只要使用这个功能,程序运行速度瞬间提升,高到离谱!

本文转载自微信公众号「小明菜市场」,作者小明菜市场 。转载本文请联系小明菜市场公众号。

云安网站制作公司哪家好,找创新互联!从网页设计、网站建设、微信开发、APP开发、响应式网站建设等网站项目制作,到程序开发,运营维护。创新互联2013年至今到现在10年的时间,我们拥有了丰富的建站经验和运维经验,来保证我们的工作的顺利进行。专注于网站建设就选创新互联

 前言

在之前如果需要处理集合需要先手动分成几部分,然后为每部分创建线程,最后在合适的时候合并,这是手动处理并行集合的方法,在java8中,有了新功能,可以一下开启并行模式。

并行流

认识开启并行流

并行流是什么?是把一个流内容分成多个数据块,并用不同线程分别处理每个不同数据块的流。例如,有下面一个例子,在List中,需要对List数据进行分别计算,其代码如下所示:

 
 
 
  1. List appleList = new ArrayList<>(); // 假装数据是从库里查出来的
  2. for (Apple apple : appleList) {
  3.     apple.setPrice(5.0 * apple.getWeight() / 1000);
  4. }

在这里,时间复杂度为O(list.size),随着list的增加,耗时也在增加。并行流可以解决这个问题,代码如下所示:

appleList.parallelStream().forEach(apple -> apple.setPrice(5.0 * apple.getWeight() / 1000));

这里通过调parallelStream()说明当前流为并行流,然后进行并行执行。并行流内部使用了默认的ForkJoinPool线程池,默认线程数为处理器的核心数。

测试并行流

普通代码如下所示:

 
 
 
  1. public static void main(String[] args) throws InterruptedException {
  2.     List appleList = initAppleList();
  3.     Date begin = new Date();
  4.     for (Apple apple : appleList) {
  5.         apple.setPrice(5.0 * apple.getWeight() / 1000);
  6.         Thread.sleep(1000);
  7.     }
  8.     Date end = new Date();
  9.     log.info("苹果数量:{}个, 耗时:{}s", appleList.size(), (end.getTime() - begin.getTime()) /1000);
  10. }

输出的内容为耗时4s。

并行代码如下所示:

 
 
 
  1. List appleList = initAppleList();
  2. Date begin = new Date();
  3. appleList.parallelStream().forEach(apple ->
  4.                                    {
  5.                                        apple.setPrice(5.0 * apple.getWeight() / 1000);
  6.                                        try {
  7.                                            Thread.sleep(1000);
  8.                                        } catch (InterruptedException e) {
  9.                                            e.printStackTrace();
  10.                                        }
  11.                                    }
  12.                                   );
  13. Date end = new Date();
  14. log.info("苹果数量:{}个, 耗时:{}s", appleList.size(), (end.getTime() - begin.getTime()) /1000);

输出结果为耗时1s。可以看到耗时大大提升了3s。

并行流拆分会影响流的速度

对于并行流来说需要注意以下几点:

对于 iterate 方法来处理的前 n 个数字来说,不管并行与否,它总是慢于循环的,

而对于 LongStream.rangeClosed() 方法来说,就不存在 iterate 的第两个痛点了。它生成的是基本类型的值,不用拆装箱操作,另外它可以直接将要生成的数字 1 - n 拆分成 1 - n/4, 1n/4 - 2n/4, ... 3n/4 - n 这样四部分。因此并行状态下的 rangeClosed() 是快于 for 循环外部迭代的

代码如下所示:

 
 
 
  1. package lambdasinaction.chap7;
  2. import java.util.stream.*;
  3. public class ParallelStreams {
  4.     public static long iterativeSum(long n) {
  5.         long result = 0;
  6.         for (long i = 0; i <= n; i++) {
  7.             result += i;
  8.         }
  9.         return result;
  10.     }
  11.     public static long sequentialSum(long n) {
  12.         return Stream.iterate(1L, i -> i + 1).limit(n).reduce(Long::sum).get();
  13.     }
  14.     public static long parallelSum(long n) {
  15.         return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(Long::sum).get();
  16.     }
  17.     public static long rangedSum(long n) {
  18.         return LongStream.rangeClosed(1, n).reduce(Long::sum).getAsLong();
  19.     }
  20.     public static long parallelRangedSum(long n) {
  21.         return LongStream.rangeClosed(1, n).parallel().reduce(Long::sum).getAsLong();
  22.     }
  23. }
  24. package lambdasinaction.chap7;
  25. import java.util.concurrent.*;
  26. import java.util.function.*;
  27. public class ParallelStreamsHarness {
  28.     public static final ForkJoinPool FORK_JOIN_POOL = new ForkJoinPool();
  29.     public static void main(String[] args) {
  30.         System.out.println("Iterative Sum done in: " + measurePerf(ParallelStreams::iterativeSum, 10_000_000L) + " msecs");
  31.         System.out.println("Sequential Sum done in: " + measurePerf(ParallelStreams::sequentialSum, 10_000_000L) + " msecs");
  32.         System.out.println("Parallel forkJoinSum done in: " + measurePerf(ParallelStreams::parallelSum, 10_000_000L) + " msecs" );
  33.         System.out.println("Range forkJoinSum done in: " + measurePerf(ParallelStreams::rangedSum, 10_000_000L) + " msecs");
  34.         System.out.println("Parallel range forkJoinSum done in: " + measurePerf(ParallelStreams::parallelRangedSum, 10_000_000L) + " msecs" );
  35.     }
  36.     public static  long measurePerf(Function f, T input) {
  37.         long fastest = Long.MAX_VALUE;
  38.         for (int i = 0; i < 10; i++) {
  39.             long start = System.nanoTime();
  40.             R result = f.apply(input);
  41.             long duration = (System.nanoTime() - start) / 1_000_000;
  42.             System.out.println("Result: " + result);
  43.             if (duration < fastest) fastest = duration;
  44.         }
  45.         return fastest;
  46.     }
  47. }

共享变量会造成数据出现问题

 
 
 
  1. public static long sideEffectSum(long n) {
  2.     Accumulator accumulator = new Accumulator();
  3.     LongStream.rangeClosed(1, n).forEach(accumulator::add);
  4.     return accumulator.total;
  5. }
  6. public static long sideEffectParallelSum(long n) {
  7.     Accumulator accumulator = new Accumulator();
  8.     LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
  9.     return accumulator.total;
  10. }
  11. public static class Accumulator {
  12.     private long total = 0;
  13.     public void add(long value) {
  14.         total += value;
  15.     }
  16. }

并行流的注意

  1. 尽量使用 LongStream / IntStream / DoubleStream 等原始数据流代替 Stream 来处理数字,以避免频繁拆装箱带来的额外开销
  2. 要考虑流的操作流水线的总计算成本,假设 N 是要操作的任务总数,Q 是每次操作的时间。N * Q 就是操作的总时间,Q 值越大就意味着使用并行流带来收益的可能性越大
  3. 对于较少的数据量,不建议使用并行流
  4. 容易拆分成块的流数据,建议使用并行流

新闻标题:只要使用这个功能,程序运行速度瞬间提升,高到离谱!
链接地址:http://www.shufengxianlan.com/qtweb/news43/9743.html

网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联