大佬教程收集整理的这篇文章主要介绍了在 akka 流中的操作之间添加状态,大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.source;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
public class sd001 {
private static final ActorSystem system = ActorSystem.create("akkassembly");
private static List<RData> ls = new ArrayList();
private static class RData {
private String ID;
public RData(String ID){
this.ID = ID;
}
public List<Integer> getValues(){
if(this.ID.equalsIgnoreCase("1")) {
return Arrays.asList(1,2,3,4,5);
}
else {
return Arrays.asList(1,3);
}
}
public String getID() {
return this.ID;
}
}
final static List<RData> builderFunction() {
try {
ls.add(new RData("1"));
ls.add(new RData("2"));
ls.add(new RData("3"));
Thread.sleep(3000);
} catch (InterruptedException E) {
e.printstacktrace();
}
return ls;
}
private static double calculateAverage(List <Integer> marks) {
return marks.stream()
.mapTodouble(d -> d)
.average()
.orElse(0.0);
}
public static voID main(String[] args) throws ExecutionException,InterruptedException,TimeoutException {
final source<List<RData>,NotUsed> source2 =
source.repeat(NotUsed.geTinstance()).map(elem -> builderFunction());
source2.mapConcat(i -> i)
.groupBy(3,x -> x.getID())
.map(v -> calculateAverage(v.getValues()))
.to(Sink.foreach(x -> System.out.println(X)))
.run(system);
}
}
@H_674_2@结果输出:
11:55:27.477 [akkassembly-akka.actor.default-dispatcher-4] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
3.0
2.0
2.0
3.0
@H_674_2@似乎按预期工作。
@H_674_2@我使用 groupBy
方法 (https://doc.akka.io/docs/akka/current/stream/stream-substream.html) 按其关联的 ID 值对 List
项进行分组。如何将 ID 值添加到输出平均值的阶段,以便将 ID 打印到屏幕而不是仅输出平均值?我所指的阶段是:
.to(Sink.foreach(x -> System.out.println(X)))
@H_674_2@一种可能的解决方案是修改方法 getValues
并创建一个新参数 ID
并返回 ID
以及平均值,这将允许访问 println
中的值{1}} 为 Sink
。这个解决方案似乎过于复杂。看起来我需要在 ID
和 @H_85_5@map 函数之间携带一个额外的状态(在本例中为 to
)吗?
sourceWithContext
/FlowWithContext
:
@H_674_2@本质上,FlowWithContext
只是一个包含元素和上下文元组的 Flow
,但优势在于运算符:FlowWithContext
上的大多数运算符将作用于元素而不是元组上,让您可以专注于您的应用程序逻辑,而不必担心上下文。
@H_674_2@在这种特殊情况下,由于 groupBy
的作用类似于重新排序元素,FlowWithContext
不支持 groupBy
,因此您必须将 ID 嵌入到流元素中...
@H_674_2@(...除非您想深入了解自定义图形阶段的深度,这可能会使将 ID 嵌入到流元素中的复杂性相形见绌。)
以上是大佬教程为你收集整理的在 akka 流中的操作之间添加状态全部内容,希望文章能够帮你解决在 akka 流中的操作之间添加状态所遇到的程序开发问题。
如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。