scala> var rdd1 = sc.textFile("hdfs://bigdata111:9000/spark/test_Cache.txt") rdd1: org.apache.spark.rdd.RDD[String] = hdfs://bigdata166:9000/spark/test_Cache.txt MapPartitionsRDD[1] at textFile at <console>:24 scala> sc.setCheckpointDir("hdfs://bigdata166:9000/spark/") scala> sc.setCheckpointDir("hdfs://bigdata166:9000/spark/checkpoint") scala> rdd1.count scala> rdd1.cache //缓存 scala> rdd1.count res2: Long = 1846904 // scala> def fun1(index:Int,itea:Iterator[Int]):Iterator[String] = { | itea.toList.map(x => "[partid:" + index +", value="+x+"]" ).iterator | } fun1: (index: Int, itea: Iterator[Int])Iterator[String] scala> rdd1.mapPartitions mapPartitions mapPartitionsWithIndex scala> rdd1.mapPartitionsWithIndex(fun1).collect res3: Array[String] = Array( [partid:0, value=1], [partid:0, value=2], [partid:0, value=3], [partid:1, value=4], [partid:1, value=5], [partid:1, value=6], [partid:2, value=7], [partid:2, value=8], [partid:2, value=9])
:聚合,
先局部聚合,然后全局聚合
求每个分区的最大值,然后求和
现求每个分区的最大值
求和
zeroValue: U:初始化,需要赋值:初始值在局部操作起作用,全局操作也起作用
后面两个函数参数
第一个函数:表示局部操作
第二个:表示全局操作
scala> var rdd1 = sc.parallelize(List(1,2,3,4,5),2) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24 scala> rdd1.mapPartitionsWithIndex(fun1).collect res4: Array[String] = Array( [partid:0, value=1], [partid:0, value=2], [partid:1, value=3], [partid:1, value=4], [partid:1, value=5]) scala> rdd1.aggregate(0)(max(_,_),_+_) res5: Int = 7 scala> rdd1.aggregate(10)(max(_,_),_+_) res6: Int = 30 //第一个分区数据:10(初始值),1,2————10 //第二个分区:10(初始值),3,4,5————10 //求和:10(初始值)+10+10=30 //第一个分区的最大值:2 //二:5 //求和:2+5=7 scala> rdd1.aggregate(0)(_+_,_+_) res7: Int = 15 scala> rdd1.aggregate(10)(_+_,_+_) res8: Int = 45 //第一个分区:10+1+2=3 //第二个分区:10+3+4+5=12 //求和:10+3+12=15 //其它操作:</pre> scala> var rdd1 = sc.parallelize(List("12","34","567","8901"),2) rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24 scala> def fun1(index:Int,iter:Iterator[String]):Iterator[String]={ | iter.toList.map(x => "[partID: "+index+",value:"+x+"]").iterator} fun1: (index: Int, iter: Iterator[String])Iterator[String] scala> rdd1.mapPartitionsWithIndex(fun1).collect res1: Array[String] = Array( [partID: 0,value:12], [partID: 0,value:34], [partID: 1,value:567], [partID: 1,value:8901]) scala> rdd1.aggregate("")((x,y)=>math.max(x.length,y.length).toString,(x,y)=>x+y) scala> rdd1.aggregate("")((x,y)=>math.max(x.length,y.length).toString,(x,y)=>x+y) res3: String = 42 scala> rdd1.aggregate("")((x,y)=>math.max(x.length,y.length).toString,(x,y)=>x+y) res4: String = 24 分析: 第一个分区:“12”,“34” 第一次比较:“”,“12”=2.toString ==》 “2” 第二次比较:“2”,“34”=2.toString ==》 “2” 第二个分区:“567”,“8901” 第一次比较:“”,“567”=3.toString ==》“3” 第二次比较:“3”,“8901”=4.toString ==》 “4” “24”或者“42” scala> var rdd1 = sc.parallelize(List("12","23","345",""),2) rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at <console>:24 scala> rdd1.mapPartitionsWithIndex(fun1).collect res6: Array[String] = Array([partID: 0,value:12], [partID: 0,value:23], [partID: 1,value:345], [partID: 1,value:]) scala> rdd1.aggregate("")((x,y)=>math.min(x.length,y.length).toString,(x,y)=>x+y) res7: String = 10 scala> rdd1.aggregate("")((x,y)=>math.min(x.length,y.length).toString,(x,y)=>x+y) res9: String = 01 分析: 第一个分区:“12”,“34” 第一次比较:“”,“12”=0.toString ==》 “0” 第二次比较:“0”,“34”=1.toString ==》 “1” 第二个分区:“345”,“” 第一次比较:“”,“345”=0.toString ==》“0” 第二次比较:“0”,“”=0.toString ==》 “0” “10”或者“01” scala> var rdd1 = sc.parallelize(List("12","23","","345"),2) rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:24 scala> rdd1.mapPartitionsWithIndex(fun1).collect res10: Array[String] = Array([partID: 0,value:12], [partID: 0,value:23], [partID: 1,value:], [partID: 1,value:345]) scala> rdd1.aggregate("")((x,y)=>math.min(x.length,y.length).toString,(x,y)=>x+y) res11: String = 11 分析: 第一个分区:“12”,“34” 第一次比较:“”,“12”=0.toString ==》 “0” 第二次比较:“0”,“34”=1.toString ==》 “1” 第二个分区:“”,“345” 第一次比较:“”,“”=0.toString ==》“0” 第二次比较:“0”,“345”=1.toString ==》 “1” “11”