Serializing Java objects to HDFS from Spark
Abstract: A common requirement in Spark applications is to serialize a Java object and store it on HDFS, especially models produced with MLlib, so that a trained model can be reused later. The example below demonstrates reading data from HBase in a Spark job, training a word2vec model, and persisting that model to HDFS.
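Before walking through the full pipeline, note that the core technique is simply to wrap the streams returned by Hadoop's FileSystem API in Java's standard object streams. The following is a minimal, generic sketch of that idea; the helper name, example URI, path, and types are placeholders for illustration, not part of the original program.

import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object HdfsObjectIO {
  // Serialize any java.io.Serializable object to an HDFS path.
  def write(obj: java.io.Serializable, fs: FileSystem, path: Path): Unit = {
    val oos = new ObjectOutputStream(fs.create(path))
    try oos.writeObject(obj) finally oos.close()
  }

  // Read the object back and cast it to the expected type.
  def read[T](fs: FileSystem, path: Path): T = {
    val ois = new ObjectInputStream(fs.open(path))
    try ois.readObject().asInstanceOf[T] finally ois.close()
  }
}

// Hypothetical usage (URI and path are assumptions; adjust to your cluster):
// val conf = new Configuration()
// conf.set("fs.defaultFS", "hdfs://myhadoop:9000")
// val fs = FileSystem.get(conf)
// HdfsObjectIO.write(myModel, fs, new Path("/tmp/my-model.obj"))
// val restored = HdfsObjectIO.read[MyModelType](fs, new Path("/tmp/my-model.obj"))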
Without further ado, here is the full program. Environment: Spark 1.4 + HBase 0.98.
import org.apache.spark.storage.StorageLevel
import scala.collection.JavaConverters._
import java.io.File
import java.io.FileInputStream
import java.io.FileOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.net.URI
import java.util.Date
import org.ansj.library.UserDefineLibrary
import org.ansj.splitWord.analysis.NlpAnalysis
import org.ansj.splitWord.analysis.ToAnalysis
import org.apache.hadoop.fs.FSDataInputStream
import org.apache.hadoop.fs.FSDataOutputStream
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.filter.FilterList
import org.apache.hadoop.hbase.filter.PageFilter
import org.apache.hadoop.hbase.filter.RegexStringComparator
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import com.feheadline.fespark.db.Neo4jManager
import com.feheadline.fespark.util.Env
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd._
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
import scala.math.log
import scala.io.Source

object Word2VecDemo {

  // Encode a Scan so it can be passed to TableInputFormat via the job configuration.
  def convertScanToString(scan: Scan) = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("Word2VecDemo")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.set("spark.kryoserializer.buffer", "256m")
    sparkConf.set("spark.kryoserializer.buffer.max", "2046m")
    sparkConf.set("spark.akka.frameSize", "500")
    sparkConf.set("spark.rpc.askTimeout", "30")

    val sc = new SparkContext(sparkConf)

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "myzookeeper")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "crawled")

    // Only keep rows whose data:article column has at least 1500 characters,
    // and cap the scan at 100 rows.
    val scan = new Scan()
    val filterList: FilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL)
    val comp: RegexStringComparator = new RegexStringComparator(""".{1500,}""")
    val articleFilter: SingleColumnValueFilter = new SingleColumnValueFilter(
      "data".getBytes,
      "article".getBytes,
      CompareOp.EQUAL,
      comp
    )
    filterList.addFilter(articleFilter)
    filterList.addFilter(new PageFilter(100))

    scan.setFilter(filterList)
    scan.setCaching(50)
    scan.setCacheBlocks(false)
    hbaseConf.set(TableInputFormat.SCAN, convertScanToString(scan))

    val crawledRDD = sc.newAPIHadoopRDD(
      hbaseConf,
      classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result]
    )

    val articlesRDD = crawledRDD.filter {
      case (_, result) =>
        val content = Bytes.toString(result.getValue("data".getBytes, "article".getBytes))
        content != null
    }

    // Segment each article into words with the ansj analyzer.
    val wordsInDoc = articlesRDD.map {
      case (_, result) =>
        val content = Bytes.toString(result.getValue("data".getBytes, "article".getBytes))
        if (content != null) ToAnalysis.parse(content).asScala.map(_.getName).toSeq
        else Seq("")
    }

    val filteredWordsInDoc = wordsInDoc.filter(_.nonEmpty)

    val word2vec = new Word2Vec()
    val model = word2vec.fit(filteredWordsInDoc)

    // ------------------------------- The key part starts here -------------------------------
    // Store the model trained above to HDFS.
    val hadoopConf = sc.hadoopConfiguration
    hadoopConf.set("fs.defaultFS", "hdfs://myhadoop:9000/")
    val fileSystem = FileSystem.get(hadoopConf)
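    // Note: this pattern works because Word2VecModel, like most MLlib models, implements
    // java.io.Serializable; the same approach applies to any Serializable object you want
    // to persist on HDFS.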
    val path = new Path("/user/hadoop/data/mllib/word2vec-object")
    // fileSystem.create already returns an FSDataOutputStream, which is a plain OutputStream,
    // so it can be handed to ObjectOutputStream directly.
    val oos = new ObjectOutputStream(fileSystem.create(path))
    oos.writeObject(model)
    oos.close()

    // Another program can read the serialized object back from HDFS and use the model directly:
    val ois = new ObjectInputStream(fileSystem.open(path))
    val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
    ois.close()

    /*
     * // You can also copy the serialized file from HDFS to the local file system
     * // and use the model from a standalone Scala program:
     * import java.io._
     * import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
     * val ois = new ObjectInputStream(new FileInputStream("/home/cherokee/tmp/word2vec-object"))
     * val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
     * ois.close
     */
    // -----------------------------------------------------------------------------------------
  }
}
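If your cluster runs Spark 1.4 or newer, MLlib's Word2VecModel should also offer built-in save/load methods (Parquet-based persistence), which sidestep Java serialization compatibility concerns across Spark upgrades. A minimal sketch under that assumption; the path is a placeholder:

// Assumed alternative using MLlib's built-in persistence (Spark 1.4+).
model.save(sc, "hdfs://myhadoop:9000/user/hadoop/data/mllib/word2vec-model")
val reloadedModel = Word2VecModel.load(sc, "hdfs://myhadoop:9000/user/hadoop/data/mllib/word2vec-model")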
Thanks for reading. I hope this helps, and thank you for supporting this site!