Пример на Spark(Scala) рекурсивного чтения hdfs директории

import org.apache.hadoop.fs._
import org.apache.spark.deploy.SparkHadoopUtil
import java.net.URI

val hdfs_conf = SparkHadoopUtil.get.newConfiguration(sc.getConf)
val hdfs = FileSystem.get(hdfs_conf)
// source data in HDFS
val sourcePath = new Path("/data/*/*")

hdfs.globStatus( sourcePath ).foreach{ fileStatus =>
   val filePathName = fileStatus.getPath().toString()
   val fileName = fileStatus.getPath().getName()
   println("fileName: " + fileName + "; FilePath: " + filePathName)
   val fileRDD = sc.textFile(filePathName)
   val counts = fileRDD.flatMap(line => line.split("\n"))
   //val df = sqlContext.read.json( filePathName)
   //df.show()
   counts.foreachPartition(partF => {
      partF.foreach( item_row => {
         println(item_row)
      });
   });
}

Коментарі

Популярні дописи з цього блогу

Минимальные требование для кластера Hadoop(with Spark)

Apache Spark Resource Management и модель приложений от YARN

Apache Spark - основные команды