java8中Stream的使用以及分割list案例
一、Steam的优势
java8中Stream配合Lambda表达式极大提高了编程效率,代码简洁易懂(可能刚接触的人会觉得晦涩难懂),不需要写传统的多线程代码就能写出高性能的并发程序
二、项目中遇到的问题
由于微信接口限制,每次导入code只能100个,所以需要分割list。但是由于code数量可能很大,这样执行效率就会很低。
1.首先想到是用多线程写传统并行程序,但是博主不是很熟练,写出代码可能会出现不可预料的结果,容易出错也难以维护。
2.然后就想到Steam中的parallel,能提高性能又能利用java8的特性,何乐而不为。
三、废话不多说,直接先贴代码,然后再解释(java8分割list代码在标题四)。
1.该方法是根据传入数量生成codes,privateStringgetGeneratorCode(inttenantId)是我根据编码规则生成唯一code这个不需要管,我们要看的是Stream.iterate
2.iterate()第一个参数为起始值,第二个函数表达式(看自己想要生成什么样的流关键在这里),http://write.blog.csdn.net/postedit然后必须要通过limit方法来限制自己生成的Stream大小。parallel()是开启并行处理。map()就是一对一的把Stream中的元素映射成ouputSteam中的元素。最后用collect收集,
2.1构造流的方法还有Stream.of(),结合或者数组可直接list.stream();
String[]array=newString[]{"1","2","3"};
stream=Stream.of(array)或者Arrays.Stream(array);
2.2数值流IntStream
int[]array=newint[]{1,2,3};
IntStream.of(array)或者IntStream.ranage(0,3)
3.以上构造流的方法都是已经知道大小,对于通过入参确定的应该图中方法自己生成流。
四、java8分割list,利用StreamApi实现。
没用java8前代码,做个鲜明对比():
1.list是我的编码集合(codes)。MAX_SEND为100(即每次100的大小去分割list),limit为按编码集合大小算出的本次需要分割多少次。
2.我们可以看到其实就是多了个skip跟limit方法。skip就是舍弃stream前多少个元素,那么limit就是返回流前面多少个元素(如果流里元素少于该值,则返回全部)。然后开启并行处理。通过循环我们的分割list的目标就达到了,每次取到的sendList就是100,100这样子的。
3.因为我这里业务就只需要到这里,如果我们分割之后需要收集之后再做处理,那只需要改写一下就ok;如:
List>splitList=Stream.iterate(0,n->n+1).limit(limit).parallel().map(a->{ List
sendList=list.stream().skip(a*MAX_SEND).limit(MAX_SEND).parallel().collect(Collectors.toList()); }).collect(Collectors.toList());
五、java8流里好像拿不到下标,所以我才用到构造一个递增数列当下标用,这就是我用java8分割list的过程,比以前的for循环看的爽心悦目,优雅些,性能功也提高了。
如果各位有更好的实现方式,欢迎留言指教。
补充知识:聊聊flinkDataStream的split操作
序
本文主要研究一下flinkDataStream的split操作
实例
SplitStreamsplit=someDataStream.split(newOutputSelector (){ @Override publicIterable select(Integervalue){ List output=newArrayList (); if(value%2==0){ output.add("even"); } else{ output.add("odd"); } returnoutput; } });
本实例将dataStreamsplit为两个dataStream,一个outputName为even,另一个outputName为odd
DataStream.split
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java
@Public publicclassDataStream{ //...... publicSplitStream split(OutputSelector outputSelector){ returnnewSplitStream<>(this,clean(outputSelector)); } //...... }
DataStream的split操作接收OutputSelector参数,然后创建并返回SplitStream
OutputSelector
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/collector/selector/OutputSelector.java
@PublicEvolving publicinterfaceOutputSelectorextendsSerializable{ Iterable select(OUTvalue); }
OutputSelector定义了select方法用于给element打上outputNames
SplitStream
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/SplitStream.java
@PublicEvolving publicclassSplitStreamextendsDataStream { protectedSplitStream(DataStream dataStream,OutputSelector outputSelector){ super(dataStream.getExecutionEnvironment(),newSplitTransformation (dataStream.getTransformation(),outputSelector)); } publicDataStream select(String...outputNames){ returnselectOutput(outputNames); } privateDataStream selectOutput(String[]outputNames){ for(StringoutName:outputNames){ if(outName==null){ thrownewRuntimeException("Selectednamesmustnotbenull"); } } SelectTransformation selectTransform=newSelectTransformation (this.getTransformation(),Lists.newArrayList(outputNames)); returnnewDataStream (this.getExecutionEnvironment(),selectTransform); } }
SplitStream继承了DataStream,它定义了select方法,可以用来根据outputNames选择split出来的dataStream;select方法创建了SelectTransformation
StreamGraphGenerator
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@Internal publicclassStreamGraphGenerator{ //...... privateCollectiontransform(StreamTransformation>transform){ if(alreadyTransformed.containsKey(transform)){ returnalreadyTransformed.get(transform); } LOG.debug("Transforming"+transform); if(transform.getMaxParallelism()<=0){ //ifthemaxparallelismhasn'tbeenset,thenfirstusethejobwidemaxparallelism //fromtheExecutionConfig. intglobalMaxParallelismFromConfig=env.getConfig().getMaxParallelism(); if(globalMaxParallelismFromConfig>0){ transform.setMaxParallelism(globalMaxParallelismFromConfig); } } //callatleastoncetotriggerexceptionsaboutMissingTypeInfo transform.getOutputType(); Collection transformedIds; if(transforminstanceofOneInputTransformation,?>){ transformedIds=transformOneInputTransform((OneInputTransformation,?>)transform); }elseif(transforminstanceofTwoInputTransformation,?,?>){ transformedIds=transformTwoInputTransform((TwoInputTransformation,?,?>)transform); }elseif(transforminstanceofSourceTransformation>){ transformedIds=transformSource((SourceTransformation>)transform); }elseif(transforminstanceofSinkTransformation>){ transformedIds=transformSink((SinkTransformation>)transform); }elseif(transforminstanceofUnionTransformation>){ transformedIds=transformUnion((UnionTransformation>)transform); }elseif(transforminstanceofSplitTransformation>){ transformedIds=transformSplit((SplitTransformation>)transform); }elseif(transforminstanceofSelectTransformation>){ transformedIds=transformSelect((SelectTransformation>)transform); }elseif(transforminstanceofFeedbackTransformation>){ transformedIds=transformFeedback((FeedbackTransformation>)transform); }elseif(transforminstanceofCoFeedbackTransformation>){ transformedIds=transformCoFeedback((CoFeedbackTransformation>)transform); }elseif(transforminstanceofPartitionTransformation>){ transformedIds=transformPartition((PartitionTransformation>)transform); }elseif(transforminstanceofSideOutputTransformation>){ transformedIds=transformSideOutput((SideOutputTransformation>)transform); }else{ thrownewIllegalStateException("Unknowntransformation:"+transform); } //needthischeckbecausetheiteratetransformationaddsitselfbefore //transformingthefeedbackedges if(!alreadyTransformed.containsKey(transform)){ alreadyTransformed.put(transform,transformedIds); } if(transform.getBufferTimeout()>=0){ streamGraph.setBufferTimeout(transform.getId(),transform.getBufferTimeout()); } if(transform.getUid()!=null){ streamGraph.setTransformationUID(transform.getId(),transform.getUid()); } if(transform.getUserProvidedNodeHash()!=null){ streamGraph.setTransformationUserHash(transform.getId(),transform.getUserProvidedNodeHash()); } if(transform.getMinResources()!=null&&transform.getPreferredResources()!=null){ streamGraph.setResources(transform.getId(),transform.getMinResources(),transform.getPreferredResources()); } returntransformedIds; } private Collection transformSelect(SelectTransformation select){ StreamTransformation input=select.getInput(); Collection resultIds=transform(input); //therecursivetransformmighthavealreadytransformedthis if(alreadyTransformed.containsKey(select)){ returnalreadyTransformed.get(select); } List virtualResultIds=newArrayList<>(); for(intinputId:resultIds){ intvirtualId=StreamTransformation.getNewNodeId(); streamGraph.addVirtualSelectNode(inputId,virtualId,select.getSelectedNames()); virtualResultIds.add(virtualId); } returnvirtualResultIds; } private Collection transformSplit(SplitTransformation split){ StreamTransformation input=split.getInput(); Collection resultIds=transform(input); //therecursivetransformcallmighthavetransformedthisalready if(alreadyTransformed.containsKey(split)){ returnalreadyTransformed.get(split); } for(intinputId:resultIds){ streamGraph.addOutputSelector(inputId,split.getOutputSelector()); } returnresultIds; } //...... }
StreamGraphGenerator里头的transform会对SelectTransformation以及SplitTransformation进行相应的处理
transformSelect方法会根据select.getSelectedNames()来addVirtualSelectNode
transformSplit方法则根据split.getOutputSelector()来addOutputSelector
小结
DataStream的split操作接收OutputSelector参数,然后创建并返回SplitStream
OutputSelector定义了select方法用于给element打上outputNames
SplitStream继承了DataStream,它定义了select方法,可以用来根据outputNames选择split出来的dataStream
doc
DataStreamTransformations
以上这篇java8中Stream的使用以及分割list案例就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。