stream和parallelStream

HanGR 于 2024-09-10 发布


stream()和parallelStream()


Stream()的特点


parallelStream()方法的特点


stream()和parallelStream()的区别


踩坑

  1. 问题汇总

    • ParallelStreams()使用JVM默认的forkJoin框架的线程池由当前线程去执行并行操作
    • 阻塞操作: 调用第三方API时, 由于响应时间较长, 会导致线程阻塞, 影响整体性能.
    • 线程池耗尽: ForkJoinPool.common() 的线程池可能会被耗尽, 导致后续任务性能下降.
    • 并行流的影响: 使用 ParallelStream() 时, 长时间运行的函数或阻塞操作会影响整个程序的性能, 导致其他部分的执行变得不可预测.
  2. 解决方案

    • (1) 异步处理: 使用 CompletableFuture 或其他异步编程模型来处理第三方API的调用, 避免阻塞主线程或其他工作线程
     List<CompletableFuture<Response>> futures = apiUrls.stream()
         .map(url -> CompletableFuture.supplyAsync(() -> callApi(url), executor))
         .collect(Collectors.toList());
        
     CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    
    • (2) 自定义线程池: 为不同的任务类型创建不同的线程池, 避免所有任务共享同一个线程池导致的资源竞争问题.
     ExecutorService apiExecutor = Executors.newFixedThreadPool(10);
     ExecutorService computeExecutor = Executors.newFixedThreadPool(5);
        
     // 使用apiExecutor处理API调用
     CompletableFuture<Response> apiFuture = CompletableFuture.supplyAsync(() -> callApi(url), apiExecutor);
        
     // 使用computeExecutor处理计算任务
     CompletableFuture<Result> computeFuture = CompletableFuture.supplyAsync(() -> computeData(data), computeExecutor);
    
    • (3) ManagedBlocker: 在某些情况下, 可以使用 ManagedBlocker 来帮助 ForkJoinPool 管理阻塞操作, 确保线程池的线程不会被完全阻塞.
     class ApiBlocker implements ManagedBlocker {
         private volatile boolean done = false;
         private final String url;
         private Response response;
        
         public ApiBlocker(String url) {
             this.url = url;
         }
        
         @Override
         public boolean block() {
             response = callApi(url);
             done = true;
             return true;
         }
        
         @Override
         public boolean isReleasable() {
             return done;
         }
        
         public Response getResponse() {
             return response;
         }
     }
        
     ForkJoinPool.managedBlock(new ApiBlocker(url));
    
    • (4) 避免长时间运行的函数: 在 ParallelStream() 中尽量避免使用长时间运行的函数, 或者将这些函数拆分成更小的任务, 以减少对线程池的影响.


总结