本文转载自微信公众号「小姐姐味道」,小妹妹养的狗 。转载本文请联系小妹妹味道微信官方账号。
许多学生喜欢使用它lambda表达式允许你定义短小精悍的函数,反映你高超的编码水平。当然,这个功能对于一些用代码行数来衡量工作量的公司来说是一个损失。
比如下面的代码片段,让人阅读的时候就像是读诗一样。但是一旦用不好,也是会要命的。
List<Integer>transactionsIds=widgets.stream().filter(b->b.getColor()==RED).sorted((x,y)->x.getWeight()-y.getWeight()).mapToInt(Widget::getWeight).sum();这个代码有一个关键函数,即stream。通过它,一个普通一个普通的list,转化为流,然后可以使用类似管道的方法list操作。总之,用过的都说好。
不太熟悉这些函数吗?可以参考:到处都是map、flatMap,啥意思?》
问题来了
假如我们把stream换成parallelStream,会发生什么?
从字面上看,流会从串行 变为并行。
既然是平行的,就用屁股想想,就知道里面肯定会有线程安全问题。然而,我们在这里讨论的不是使用线程安全集合。这个话题太低级了。在这个阶段,在线程不安全的环境中使用线程安全集合是一项基本技能。
踩坑的地方是并行流的性能问题。
我们用代码说话。
下面的代码打开了8个线程,都使用并行流进行数据计算。在执行逻辑中,我们让每个任务sleep 1秒,可以模拟一些I/O请求需要时间等待。
使用stream,程序将在30秒后返回,但我们希望程序能在1秒以上返回,因为它是平行的,值得称号。
测试发现,我们等了很长时间才完成任务。
staticvoidparalleTest(){List<Integer>numbers=Arrays.asList(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29);finallongbegin=System.currentTimeMillis();numbers.parallelStream().map(k->{try{Thread.sleep(1000);System.out.println((System.currentTimeMillis()-begin) "ms=>" k "\t" Thread.currentThread());}catch(InterruptedExceptione){e.printStackTrace();}returnk;}).collect(Collectors.toList());}publicstaticvoidmain(String[]args){//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","20");newThread(()->paralleTest()).start();newThread(()->paralleTest()).start();newThread(()->paralleTest()).start();newThread(()->paralleTest()).start();newThread(()->paralleTest()).start();newThread(()->paralleTest()).start();newThread(()->paralleTest()).start();newThread(()->paralleTest()).start();}坑
事实上,在不同的机器上执行代码需要不同的时间。
既然是平行的,那一定要有平行的程度。太低,无法反映平行能力;太大了,浪费了上下文切换的时间。我很沮丧地发现,许多高级研发人员背诵了线程池的各种参数,并进行了各种调整。他们敢于视而不见I/O密集型业务中用上parallelStream。
要理解这种并行性,我们需要检查具体的结构方法。ForkJoinPool在类中找到这样的代码。
try{//ignoreexceptionsinaccessing/parsingpropertiesStringpp=System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism");if(pp!=null)parallelism=Integer.parseInt(pp);fac=(ForkJoinWorkerThreadFactory)newInstanceFromSystemProperty("java.util.concurrent.ForkJoinPool.common.threadFactory");handler=(UncaughtExceptionHandler)newInstanceFromSystemProperty("java.util.concurrent.ForkJoinPool.common.exceptionHandler");}catch(Exceptionignore){}if(fac==null){if(System.getSecurityManager()==null)fac=defaultForkJoinWorkerThreadFactory;else//usesecurity-manageddefaultfac=newInnocuousForkJoinWorkerThreadFactory();}if(parallelism<0&&//default1lessthan#cores(parallelism=Runtime.getRuntime().availableProcessors()-1)<=0)parallelism=1;if(parallelism>MAX_CAP)parallelism=MAX_CAP;可以下参数控制,可以看出并行控制的。如果无法获得此参数,则默认使用 CPU个数-1 并行度。
可以看出,该函数是为计算密集型业务而设计的。如果你喂它很多任务,它将从并行退回到类似的串行效果。
-Djava.util.concurrent.ForkJoinPool.common.parallelism=N即使你使用-Djava.util.concurrent.ForkJoinPool.common.parallelism=N设置初始值大小,仍有问题。
因为,parallelism这个变量是final是的,一旦设置,就不允许修改。也就是说,上述参数只生效一次。
张三可以使用以下代码设置并行度为20。
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","20");李四可以同样的方式设置这个值为30。那么项目中实际使用的值是什么呢?你必须问JVM如何加载类信息。
这种方式并不太非常靠谱。
一种解决方案
我们可以提供外部forkjoinpool,即改变提交方式,实现不同类型的任务分离。
如下所示,任务分离可以通过显式代码提交来实现。
ForkJoinPoolpool=newForkJoinPool(30);finallongbegin=System.currentTimeMillis();try{pool.submit(()->numbers.parallelStream().map(k->{try{Thread.sleep(1000);System.out.println((System.currentTimeMillis()-begin) "ms=>" k "\t" Thread.currentThread());}catch(InterruptedExceptione){e.printStackTrace();}returnk;}).collect(Collectors.toList())).get();}catch(InterruptedExceptione){e.printStackTrace();}catch(ExecutionExceptione){e.printStackTrace();}这样,不同的场景就可以有不同的平行度。这种方和CountDownLatch我们需要手动管理资源。
这样,代码量的增加与优雅无关,不仅不优雅,而且丑陋。白天鹅变成了丑小鸭,你还会爱它吗?
作者简介:小妹妹味 (xjjdog),不允许程序员走弯路的微信官方账号。聚焦基础设施和Linux。十年架构,日流量100亿,和你讨论高并发世界,给你不一样的味道。我的个人微信xjjdog0,欢迎添加好友,进一步交流。