Пример работы с SparkOnHbase
SparkOnHbase
https://github.com/cloudera-labs/SparkOnHBase
Скачиваем jar архив spark-hbase-0.0.2-clabs.jar для использования возможностей SparkOnHbase
Запускаем spark-shell:
Настройка для pom.xml :
Полезные ссылки:
https://github.com/cloudera-labs/SparkOnHBase
Скачиваем jar архив spark-hbase-0.0.2-clabs.jar для использования возможностей SparkOnHbase
Запускаем spark-shell:
spark-shell --master "local[2]" --driver-class-path /home/hdfs/spark-hbase-0.0.2-clabs.jar,/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar
import com.cloudera.spark.hbase._
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Scan
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.client.Result
import org.apache.spark.SparkConf
val rdd = sc.parallelize(Array(
(Bytes.toBytes("user_id:10")),
(Bytes.toBytes("user_id:11580751")),
(Bytes.toBytes("user_id:11580752")),
(Bytes.toBytes("user_id:11580753")),
(Bytes.toBytes("user_id:11580754")),
(Bytes.toBytes("user_id:11580755")),
(Bytes.toBytes("user_id:11580756"))))
val sc = new SparkContext(sparkConf)
val confHb = HBaseConfiguration.create()
confHb.addResource(new Path("/etc/hbase/conf/core-site.xml"))
confHb.addResource(new Path("/etc/hbase/conf/hbase-site.xml"))
val hbaseContext = new HBaseContext(sc, confHb);
//val tableName = args(0);
val tableName = "db_name.table_name";
val getRdd = hbaseContext.bulkGet[Array[Byte], String](
tableName,
2,
rdd,
record => {
System.out.println("making Get" )
new Get(record)
},
(result: Result) => {
val it = result.list().iterator()
val b = new StringBuilder
b.append(Bytes.toString(result.getRow()) + ":")
while (it.hasNext()) {
val kv = it.next()
val q = Bytes.toString(kv.getQualifier())
if (q.equals("counter")) {
b.append("(" + Bytes.toString(kv.getQualifier()) + "," + Bytes.toLong(kv.getValue()) + ")")
} else {
b.append("(" + Bytes.toString(kv.getQualifier()) + "," + Bytes.toString(kv.getValue()) + ")")
}
}
b.toString
})
getRdd.collect.foreach(v => System.out.println(v))
Настройка для pom.xml :
<repository>
<id>cloudera-repo-releases</id>
<url>https://repository.cloudera.com/artifactory/repo/</url>
</repository>
<dependency>
<groupId>com.cloudera</groupId>
<artifactId>spark-hbase</artifactId>
<version>0.0.2-clabs</version>
</dependency>
Полезные ссылки:
Коментарі
Дописати коментар