程序问答   发布时间:2022-06-02  发布网站:大佬教程  code.js-code.com
大佬教程收集整理的这篇文章主要介绍了在 akka 流中的操作之间添加状态大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。

如何解决在 akka 流中的操作之间添加状态?

开发过程中遇到在 akka 流中的操作之间添加状态的问题如何解决?下面主要结合日常开发的经验,给出你关于在 akka 流中的操作之间添加状态的解决方法建议,希望对你解决在 akka 流中的操作之间添加状态有所启发或帮助; @H_674_2@下面是我用来计算对象列表中数据流平均值的代码:

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.source;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

public class sd001 {

    private static final ActorSystem system = ActorSystem.create("akkassembly");
    private static List<RData> ls = new ArrayList();

    private static class RData {
        private String ID;

        public RData(String ID){
            this.ID = ID;
        }

        public List<Integer> getValues(){
            if(this.ID.equalsIgnoreCase("1")) {
                return Arrays.asList(1,2,3,4,5);
            }
            else {
                return Arrays.asList(1,3);
            }
        }

        public String getID() {
            return this.ID;
        }
    }

    final static List<RData> builderFunction() {
        try {
            ls.add(new RData("1"));
            ls.add(new RData("2"));
            ls.add(new RData("3"));
            Thread.sleep(3000);
        } catch (InterruptedException E) {
            e.printstacktrace();
        }
        return ls;
    }

    private static double calculateAverage(List <Integer> marks) {
        return marks.stream()
                .mapTodouble(d -> d)
                .average()
                .orElse(0.0);
    }

    public static voID main(String[] args) throws ExecutionException,InterruptedException,TimeoutException {

        final source<List<RData>,NotUsed> source2 =
                source.repeat(NotUsed.geTinstance()).map(elem -> builderFunction());

                source2.mapConcat(i -> i)
                .groupBy(3,x -> x.getID())
                .map(v -> calculateAverage(v.getValues()))
                .to(Sink.foreach(x -> System.out.println(X)))
                .run(system);

    }

}
@H_674_2@结果输出:

11:55:27.477 [akkassembly-akka.actor.default-dispatcher-4] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
3.0
2.0
2.0
3.0
@H_674_2@似乎按预期工作。

@H_674_2@我使用 groupBy 方法 (https://doc.akka.io/docs/akka/current/stream/stream-substream.html) 按其关联的 ID 值对 List 项进行分组。如何将 ID 值添加到输出平均值的阶段,以便将 ID 打印到屏幕而不是仅输出平均值?我所指的阶段是:

.to(Sink.foreach(x -> System.out.println(X)))
@H_674_2@一种可能的解决方案是修改方法 getValues 并创建一个新参数 ID 并返ID 以及平均值,这将允许访问 println 中的值{1}} 为 Sink。这个解决方案似乎过于复杂。看起来我需要在 ID 和 @H_85_5@map 函数之间携带一个额外的状态(在本例中为 to)吗?

解决方法

@H_674_2@通常,Akka Streams 中的阶段不共享状态:它们仅在它们之间传递流的元素。因此,在流的各个阶段之间传递状态的唯一通用方法是将状态嵌入到正在传递的元素中。

@H_674_2@在某些情况下,可以使用 sourceWithContext/FlowWithContext

@H_674_2@本质上,FlowWithContext 只是一个包含元素和上下文元组的 Flow,但优势在于运算符:FlowWithContext 上的大多数运算符将作用于元素而不是元组上,让您可以专注于您的应用程序逻辑,而不必担心上下文。

@H_674_2@在这种特殊情况下,由于 groupBy 的作用类似于重新排序元素,FlowWithContext 不支持 groupBy,因此您必须将 ID 嵌入到流元素中...

@H_674_2@(...除非您想深入了解自定义图形阶段的深度,这可能会使将 ID 嵌入到流元素中的复杂性相形见绌。)

大佬总结

以上是大佬教程为你收集整理的在 akka 流中的操作之间添加状态全部内容,希望文章能够帮你解决在 akka 流中的操作之间添加状态所遇到的程序开发问题。

如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。