Пример работы с SparkOnHbase

SparkOnHbase
https://github.com/cloudera-labs/SparkOnHBase
Скачиваем jar архив spark-hbase-0.0.2-clabs.jar для использования возможностей SparkOnHbase
Запускаем spark-shell:

spark-shell --master "local[2]" --driver-class-path /home/hdfs/spark-hbase-0.0.2-clabs.jar,/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar

import com.cloudera.spark.hbase._
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Scan
import org.apache.spark.sql.SQLContext

import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.client.Result
import org.apache.spark.SparkConf

val rdd = sc.parallelize(Array(
      (Bytes.toBytes("user_id:10")),
      (Bytes.toBytes("user_id:11580751")),
      (Bytes.toBytes("user_id:11580752")),
      (Bytes.toBytes("user_id:11580753")),
      (Bytes.toBytes("user_id:11580754")),
      (Bytes.toBytes("user_id:11580755")),
      (Bytes.toBytes("user_id:11580756"))))

val sc = new SparkContext(sparkConf)
val confHb = HBaseConfiguration.create()
confHb.addResource(new Path("/etc/hbase/conf/core-site.xml"))
confHb.addResource(new Path("/etc/hbase/conf/hbase-site.xml"))

val hbaseContext = new HBaseContext(sc, confHb);
//val tableName = args(0);
val tableName = "db_name.table_name";

val getRdd = hbaseContext.bulkGet[Array[Byte], String](
tableName,
2,
rdd,
record => { 
System.out.println("making Get" )
new Get(record)
},
(result: Result) => {

val it = result.list().iterator()
val b = new StringBuilder

b.append(Bytes.toString(result.getRow()) + ":")

while (it.hasNext()) {
  val kv = it.next()
  val q = Bytes.toString(kv.getQualifier())
  if (q.equals("counter")) {
    b.append("(" + Bytes.toString(kv.getQualifier()) + "," + Bytes.toLong(kv.getValue()) + ")")
  } else {
    b.append("(" + Bytes.toString(kv.getQualifier()) + "," + Bytes.toString(kv.getValue()) + ")")  
  }
}
b.toString
})
          
getRdd.collect.foreach(v => System.out.println(v))

Настройка для pom.xml :
<repository>
 <id>cloudera-repo-releases</id>
 <url>https://repository.cloudera.com/artifactory/repo/</url>
</repository>
<dependency>
 <groupId>com.cloudera</groupId>
 <artifactId>spark-hbase</artifactId>
 <version>0.0.2-clabs</version>
</dependency>

Полезные ссылки:


Коментарі

Популярні дописи з цього блогу

Минимальные требование для кластера Hadoop(with Spark)

Apache Spark Resource Management и модель приложений от YARN

Apache Spark - основные команды