JAVA spark创建DataFrame的方法
述说正传,接下来开始说正事。
以前用Python和Scala操作Spark的时候比较多,毕竟Python和Scala代码写起来要简洁很多。
今天一起来看看Java版本怎么创建DataFrame,代码写起来其实差不多,毕竟公用同一套API。测试数据可以参考我之前的文章。
先来总结下Spark的一般流程:
1,先创建Spark基础变量,spark,sc
2,加载数据,rdd.textFile,spark.read.csv/json等
3,数据处理,mapPartition,map,filter,reduce等一系列transformation操作
4,数据保存,saveAstextFile,或者其他DataFrame方法
祭出代码
packagedev.java; importdev.utils.Utils; importorg.apache.spark.api.java.JavaRDD; importorg.apache.spark.api.java.JavaSparkContext; importorg.apache.spark.sql.Dataset; importorg.apache.spark.sql.Row; importorg.apache.spark.sql.RowFactory; importorg.apache.spark.sql.SparkSession; importorg.apache.spark.sql.types.StructType; importscala.Tuple2; importjava.util.List; publicclassSpark1{ privatestaticfinalStringfileData="seed"; privatestaticfinalStringfileSave="result"; privatestaticSparkSessionspark=SparkSession.builder() .appName("Java-Spark") .master("local[*]") .config("spark.default.parallelism",100) .config("spark.sql.shuffle.partitions",100) .config("spark.driver.maxResultSize","3g") .getOrCreate(); privatestaticJavaSparkContextsc=JavaSparkContext.fromSparkContext(spark.sparkContext()); publicstaticvoidmain(String[]args){ Utils.delete(fileSave); // t1(); } privatestaticvoidt1(){ JavaRDDrdd=sc.textFile(fileData) .map(v->{ String[]parts=v.split("\t"); returnRowFactory.create(parts[0],Long.parseLong(parts[1])); }) .filter(v->v.getLong(1)>=10000) .sortBy(v->v.getLong(1),false,100) .coalesce(2); Dataset
df=spark.createDataFrame(rdd,StructType.fromDDL("titlestring,qtylong")); df.write().csv(fileSave); spark.stop(); } }
以上就是JAVA操作spark创建DataFrame的方法的详细内容,更多关于JAVASpark创建DataFrame的资料请关注毛票票其它相关文章!
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。