大数据技术之SparkCore(七)数据读取与保存--文件类

  • 作者:kane0409
  • 时间:2019-10-21 20:22:18

Spark的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。

文件格式分为:Text文件、Json文件、Csv文件、Sequence文件以及Object文件;

文件系统分为:本地文件系统、HDFS、HBASE以及数据库。

1.1 文件类数据读取与保存

4.1.1 Text文件

1)数据读取:textFile(String)

scala> val hdfsFile = sc.textFile("hdfs://hadoop102:9000/fruit.txt")
hdfsFile: org.apache.spark.rdd.RDD[String] = hdfs://hadoop102:9000/fruit.txt MapPartitionsRDD[21] at textFile at <console>:24

2)数据保存: saveAsTextFile(String)

scala> hdfsFile.saveAsTextFile("/fruitOut") 

4.1.2 Json文件

如果JSON文件中每一行就是一个JSON记录,那么可以通过将JSON文件当做文本文件来读取,然后利用相关的JSON库对每一条数据进行JSON解析。

注意:使用RDD读取JSON文件处理很复杂,同时SparkSQL集成了很好的处理JSON文件的方式,所以应用中多是采用SparkSQL处理JSON文件。

(1)导入解析json所需的包

scala> import scala.util.parsing.json.JSON

(2)上传json文件到HDFS

[root@hadoop102 spark]$ hadoop fs -put ./examples/src/main/resources/people.json /

(3)读取文件

scala> val json = sc.textFile("/people.json")
json: org.apache.spark.rdd.RDD[String] = /people.json MapPartitionsRDD[8] at textFile at <console>:24

(4)解析json数据

scala> val result = json.map(JSON.parseFull)
result: org.apache.spark.rdd.RDD[Option[Any]] = MapPartitionsRDD[10] at map at <console>:27

(5)打印

scala> result.collect
res11: Array[Option[Any]] = Array(Some(Map(name -> Michael)), Some(Map(name -> Andy, age -> 30.0)), Some(Map(name -> Justin, age -> 19.0)))

4.1.3 Sequence文件

SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。Spark 有专门用来读取 SequenceFile 的接口。在 SparkContext 中,可以调用 sequenceFile[ keyClass, valueClass](path)。

注意:SequenceFile文件只针对PairRDD

(1)创建一个RDD

scala> val rdd = sc.parallelize(Array((1,2),(3,4),(5,6)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[13] at parallelize at <console>:24

(2)将RDD保存为Sequence文件

scala> rdd.saveAsSequenceFile("file:///opt/module/spark/seqFile")

(3)查看该文件

[root@hadoop102 seqFile]$ pwd
/opt/module/spark/seqFile
[root @hadoop102 seqFile]$ ll
总用量 8
-rw-r--r-- 1 root root 108 10月 9 10:29 part-00000
-rw-r--r-- 1 root root 124 10月 9 10:29 part-00001
-rw-r--r-- 1 root root 0 10月 9 10:29 _SUCCESS
[root @hadoop102 seqFile]$ cat part-00000
SEQ org.apache.hadoop.io.IntWritable org.apache.hadoop.io.IntWritableط

(4)读取Sequence文件

scala> val seq = sc.sequenceFile[Int,Int]("file:///opt/module/spark/seqFile")
seq: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[18] at sequenceFile at <console>:24

(5)打印读取后的Sequence文件

scala> seq.collect
res14: Array[(Int, Int)] = Array((1,2), (3,4), (5,6))

4.1.4 对象文件

对象文件是将对象序列化后保存的文件,采用Java的序列化机制。可以通过objectFile[k,v](path) 函数接收一个路径,读取对象文件,返回对应的 RDD,也可以通过调用saveAsObjectFile() 实现对对象文件的输出。因为是序列化所以要指定类型。

(1)创建一个RDD

scala> val rdd = sc.parallelize(Array(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at parallelize at <console>:24

(2)将RDD保存为Object文件

scala> rdd.saveAsObjectFile("file:///opt/module/spark/objectFile")

(3)查看该文件

[root @hadoop102 objectFile]$ pwd
/opt/module/spark/objectFile
[root @hadoop102 objectFile]$ ll
总用量 8
-rw-r--r-- 1 root root 142 10月 9 10:37 part-00000
-rw-r--r-- 1 root root 142 10月 9 10:37 part-00001
-rw-r--r-- 1 root root 0 10月 9 10:37 _SUCCESS
[root @hadoop102 objectFile]$ cat part-00000
SEQ!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritableW@`l

(4)读取Object文件

scala> val objFile = sc.objectFile[Int]("file:///opt/module/spark/objectFile")
objFile: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[31] at objectFile at <console>:24

(5)打印读取后的Sequence文件

scala> objFile.collect
res19: Array[Int] = Array(1, 2, 3, 4)

(本文为系列文章,关注作者阅读其它部分内容,总有一篇是你欠缺的,技术无止境,且学且珍惜!!!)

今日热门