import org.apache.hadoop.fs._
import org.apache.spark.deploy.SparkHadoopUtil

// Build a Hadoop configuration from the Spark context and get a handle to HDFS
val hdfsConf = SparkHadoopUtil.get.newConfiguration(sc.getConf)
val hdfs = FileSystem.get(hdfsConf)

// Source data in HDFS: match every file two levels under /data
val sourcePath = new Path("/data/*/*")

hdfs.globStatus(sourcePath).foreach { fileStatus =>
  val filePathName = fileStatus.getPath.toString
  val fileName = fileStatus.getPath.getName
  println("fileName: " + fileName + "; filePath: " + filePathName)

  // Read the matched file as an RDD of lines
  val fileRDD = sc.textFile(filePathName)
  // sc.textFile already yields one record per line, so this flatMap is a pass-through
  val lines = fileRDD.flatMap(line => line.split("\n"))

  // Alternative: load the file with the DataFrame API instead of an RDD
  //val df = sqlContext.read.json(filePathName)
  //df.show()

  // Print every line; note that this output goes to the executors' stdout, not the driver
  lines.foreachPartition { partition =>
    partition.foreach(row => println(row))
  }
}
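
If the files matched by the glob are JSON, as the commented-out sqlContext lines suggest, the same loop can load each one through the DataFrame API instead of an RDD. A minimal sketch, assuming a Spark 1.x shell where sqlContext is in scope and /data/*/* actually points at JSON files:

// Sketch: read each matched file as JSON with the DataFrame API
// (assumes sqlContext is available in the shell; adjust the glob to your layout)
hdfs.globStatus(new Path("/data/*/*")).foreach { fileStatus =>
  val filePathName = fileStatus.getPath.toString
  val df = sqlContext.read.json(filePathName)
  df.printSchema()   // inspect the schema Spark inferred for this file
  df.show(5)         // preview the first rows on the driver
}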