程序问答   发布时间:2022-06-02  发布网站:大佬教程  code.js-code.com
大佬教程收集整理的这篇文章主要介绍了ParallelStream 上的 CompletableFuture 被批处理并且运行速度比顺序流慢? 所以,问题是:大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。

如何解决ParallelStream 上的 CompletableFuture 被批处理并且运行速度比顺序流慢? 所以,问题是:?

开发过程中遇到ParallelStream 上的 CompletableFuture 被批处理并且运行速度比顺序流慢? 所以,问题是:的问题如何解决?下面主要结合日常开发的经验,给出你关于ParallelStream 上的 CompletableFuture 被批处理并且运行速度比顺序流慢? 所以,问题是:的解决方法建议,希望对你解决ParallelStream 上的 CompletableFuture 被批处理并且运行速度比顺序流慢? 所以,问题是:有所启发或帮助; @H_419_0@方法一

@H_419_0@通常,非常快,而且效果很好。

public static int loops = 500;
private static Executorservice customPool = Executors.newFixedThreadPool(loops);
.
.
Instant start = Instant.Now();
LongSumMaryStatistics stats = LongStream.range(0,loops).boxed()
        .map(number -> CompletableFuture.supplyAsync(() -> DummyProcess.slownetworkCall(number),customPool))
        .collect(Collectors.toList()).stream() // collect first,else will be sequential
        .map(CompletableFuture::join)
        .mapTolong(Long::longvalue)
        .sumMaryStatistics();

log.info("cf completed in :: {},sumMaryStats :: {} ",Duration.between(start,Instant.Now()).toMillis(),stats);
// ... cf completed in :: 1054,sumMaryStats :: LongSumMaryStatistics{Count=500,sum=504008,min=1000,average=1008.016000,max=1017} 
@H_419_0@我明白,如果我不先收集流,那么由于懒惰的性质,流会一个一个地弹出CompletableFutures,并同步运行。 所以,作为一个实验:

@H_419_0@方法二

@H_419_0@删除中间收集步骤,但也要使流平行!

Instant start = Instant.Now();
LongSumMaryStatistics stats = LongStream.range(0,loops).boxed()
        .parallel()
        .map(number -> CompletableFuture.supplyAsync(() -> DummyProcess.slownetworkCall(number),customPool))
        .map(CompletableFuture::join) // direct join
        .mapTolong(Long::longvalue).sumMaryStatistics();

log.info("cfps_directJoin completed in :: {},stats);
// ... cfps_directJoin completed in :: 8098,sum=505002,average=1010.004000,max=1015}
@H_419_0@总结:

  • 方法 1 :: 1 秒
  • 方法 2 :: 8 秒
@H_419_0@我观察到的一种模式:

  1. 并行流方法一次“批处理”60 个调用,因此有 500 个循环,500/60 ~ 8 个批处理,每个需要 1 秒,因此总共 8 个
  2. 所以,当我将循环次数减少到 300 时,有 300/60 = 5 个批次,实际需要 5 秒才能完成。

所以,问题是:

@H_419_0@为什么在并行+直接收集方式中会有这种批处理调用?


@H_419_0@为了完成,这是我的虚拟网络调用方法:

    public static Long slownetworkCall(Long i) {
        Instant start = Instant.Now();
        log.info(" {} going to sleep..",i);
        try {
            TimeUnit.MILliSECONDs.sleep(1000); // 1 second
        } catch (InterruptedException E) {
            e.printstacktrace();
        }
        log.info(" {} woke up..",i);
        return Duration.between(start,Instant.Now()).toMillis();
    }

解决方法

这是当您阻塞其内部线程时 ForJoinPool 如何处理事物以及它产生多少新线程的工件。然,我可能会找到发生这种情况的确切线路,但我不确定这是否值得。有两个原因:

  • 逻辑可以改变

  • ForkJoinPool 中的代码远非微不足道

似乎对我们俩来说,ForkJoinPool.commonPool().getParallelism() 都会返回 11,所以我得到的结果与您相同。如果您登录 ForkJoinPool.commonPool().getPoolSize() 以了解您的代码使用了多少活动线程,您会看到在一段时间后,它只会在 64 处稳定下来。因此可以同时处理的最大任务数是 64,这与您看到的结果(那些 8 seconds)相当。

如果我用 -Djava.util.concurrent.ForkJoinPool.common.parallelism=50 运行您的代码,它现在在 2 seconds 中执行,并且池大小增加到 256。这意味着,有一个内部逻辑可以调整这些事情。

大佬总结

以上是大佬教程为你收集整理的ParallelStream 上的 CompletableFuture 被批处理并且运行速度比顺序流慢? 所以,问题是:全部内容,希望文章能够帮你解决ParallelStream 上的 CompletableFuture 被批处理并且运行速度比顺序流慢? 所以,问题是:所遇到的程序开发问题。

如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。