Публікації

Показано дописи з 2016

Apache Oozie - решаем ошибку java.lang.ClassNotFoundException

   При запуске задач с помощью Apache Oozie часто возникает ошибка: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.ipc.RpcControllerFactory   Суть в том, что по умолчанию у Вас не все библиотеки подключены в ShareLib. После настройки и запуска job-a может возникнуть ошибка "java.lang.ClassNotFoundException", которая и говорит нам, что Oozie попросту не видит данную реализацию в подключённых jar-никах в директории ShareLib.    Вам нужно указать Oozie где в HDFS лежат нужные jar файлы. Для этого создаём директорию в HDFS где будут лежать наши jar файлы: hadoop fs -mkdir /user/oozie/custom_share   Далее закачиваем jar в hdfs эту директорию: hadoop fs -put target/HBase-1.0-SNAPSHOT-jar-with-dependencies.jar hdfs:///user/oozie/custom_share/ Далее нужно в файле job.properties указать эту директорию в переменной  oozie.libpath oozie.libpath=/user/oozie/custom_share Полезные ссылки: Add a common HBase lib in hdfs on clus...

Apache Oozie - краткое описание

   Oozie - это система для планирования выполнения повторяющихся задач в экосистеме Hadoop. В этой системе можно сконфигурировать цыкл задач написаных на Java,  Apache Hive, Apache Pig и Apache Sqoop, Apache Spark, UNIX Shell  и т.д. Oozie задачи могут быть быть сконфигурированы для регулярного выполнения или для выполнения при возникновении некоторого события. Задачи, которые должны быть запущены периодично - это задачи типа  Oozie coordinator. Задачи, которые должны быть запущены последовательно - это задачи типа Oozie Workflow. Задачи, которые вмещают в себе и задачи типа  Oozie coordinator и типа   Oozie Workflow, называются задачами типа Oozie Bundle и предоставляют возможность мониторить и координировать жизненный цикл всех типов задач в виде одного целого. Oozie Coordinators    Oozie coordinator планирует задачу на основе время начала и частоты выполнения задачи и также когда все необходимые входящие данные доступны....

Пример работы c supervisor в кластере Hadoop(Cloudera) на примере сервиса Livy Spark Server

В дистрибутиве от Cloudera напрямую работать с supervisor не получится. Дело в том, что саму службу они скрыли и она не доступна по стандартной команде "service supervisord" а файли настроек не находятся на своем привычном месте "/etc/supervisor/supervisord.conf". Так что же делать, если у Вас возникла например необходимость настроить стабильную работу  REST API для работы c Spark-ом на основе Livy Spark Server  ? Проблема усложняется тем, что на данный момент дистрибутив Cloudera официально не поддерживает эту службу и потому нужно самому собрать и запустить Livy сервер из исходников. После установки самого Livy сервера возникает необходимость  повысить отказоустойчивость путём мониторинга и автоматического перезапуска службы в случае падения. Настройки supervisor-a в случае собранного кластера на основе дистрибутива от Cloudera находится по этому адресу: "/var/run/cloudera-scm-agent/supervisor/include/" Создадим в этой директории файл "li...

Исправляем ошибку "SolrException: org.apache.hadoop.ipc.RemoteException Requested replication factor of 3 exceeds maximum of 2 for /solr/collection"

Эта ошибка возникает потому, что Solr берёт фактор репликации(replication factor) с конфигов hdfs только для индекса. Для Tlog нужно фактор репликации устанавливать в локальном конфигурационном файле /var/lib/solr/solr_configs/conf/solrconfig.xml или /var/lib/solr_configs/conf/solrconfig.xml: <updatelog> <int name="tlogDfsReplication">2</int> ... </updatelog>

Пример подключения к Impala в RStudio с помощью jdbc драйвера

Устанавливаем R на ubuntu: sudo apt-get -y install r-base sudo R CMD javareconf Далее скачиваем и устанавливаем RStudio  . Также скачиваем и распаковываем архив jdbc драйвера для подключения к Impala  . После запуска RStudio указываем путь к jdbc драйверу и работаем с базой: install.packages("RImpala") library(RImpala) rimpala.init(libs="/tmp/impala/jars/") rimpala.connect("192.168.10.1","21050") rimpala.invalidate() rimpala.showdatabases() rimpala.usedatabase("yourdatabase") rimpala.showtables() rimpala.describe("yourtablename") Полезные ссылки: http://blog.cloudera.com/blog/2013/12/how-to-do-statistical-analysis-with-impala-and-r/ https://github.com/Mu-Sigma/RImpala

Real-time сохранение изменяющихся данных в Apache SOLR с помощью Kafka + Lily HBase Batch Indexer

   Apache Solr очень хороший инструмент как для поиска так и для аналитики. Но изменение данных является дорогой операцией и желательно это не делать вообще или делать это как можно реже.    Для этого был разработан подход на базе key-value базы HBase которая хорошо работает с обновлением данных. После сохранение обновлений в HBase, данные с помощью Lily HBase Batch Indexer считываются изменения с update log-a в HBase и заливают изменения в индекс Solr. Для начала создаём таблицу в HBase: hbase shell hbase(main):021:0> create 'table_for_index', {NAME => 'data', REPLICATION_SCOPE => 1} hbase(main):021:0> put 'table_for_index', 'row1', 'data', 'value' hbase(main):022:0> put 'table_for_index', 'row2', 'data', 'value2' Далее создаём индекс в Solr в который будут сохранятся изменения из HBase: solrctl instancedir --generate $HOME/hbase-index //Вносим необходимые поля из HBase таблички v...

Реализация real-time загрузки данных в Hive c помощью Kafka topic и Apache Flume

Для сохранения данных с Kafka topic напрямую в Hive можно использовать HiveSink: Если данные в Kafka топике у Вас сохранены у Вас  в Json-e, то для этого есть  .serializer = JSON. Также возможен вариант DELIMITED с последующим указанием разделителя для значений. Создаём таблицу в Hive для загрузки данных: CREATE TABLE `db_name.table_name`( `date` string,`cost` string,`cost_origin` string,`campaign` string, `currency` string) CLUSTERED BY(campaign) INTO 10 BUCKETS ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat' LOCATION '/user/flume/flumeingest/db.table_name'; Пример настройки Hive Sink: tier1.sinks.HiveSink5.channel = channel5 tier1.sinks.HiveSink5.type = hive tier1.sinks.HiveSink5.hive.metastore = thirft://127.0.0.1:9083 tier1.sinks.HiveSink5.hive.database = db_name tier1.sinks.HiveSink5.hive...

Пример использования "Regular expression filtering" в Flume

   Основываясь на регулярном выражении можно фильтровать сообщения на основе контента сообщения. К примеру Вы можете передавать на сохранение только те сообщения, которые совпадают регулярному выражению и наоборот Вы можете отфильтровывать ненужные Вам сообщение регулярным выражением. За это отвечает флаг excludeEvents = true | false. Для регулярных выражений используется Java-style синтаксис . Пример использования: agent.sources.projectSource1.interceptors=filterErr agent.sources.projectSource1.interceptors.filterErr.type=regex_filter agent.sources.projectSource1.interceptors.filterErr.regex= ERROR [0-4]: #agent.sources.projectSource1.interceptors.filterErr.regex= ".*(\\_event\\_name\\\"\\:\\\"cost\\_importer\\_data).*" agent.sources.projectSource1.interceptors.filterErr.excludeEvents=false Полезные ссылки: http://flume.apache.org/FlumeUserGuide.html#regex-filtering-interceptor http://docs.oracle.com/javase/6/docs/api/java/util/regex/Pattern.html

Импорт данных с Vertica в Hive помощью Sqoop

sqoop import --m=1 --connection-manager="org.apache.sqoop.manager.GenericJdbcManager" --driver='com.vertica.jdbc.Driver' --connect "jdbc:vertica://127.0.0.1:5433/schema?searchpath=db" --username username --password-file="/user/sqoop/vertica_db_name_pwd.txt" --compression-codec=snappy --as-parquetfile --hive-import --hive-database db --hive-table table_name --check-column="user_id" --last-value="000" --verbose --query 'select * from db.table_name tableSel WHERE $CONDITIONS' --split-by="user_id" --target-dir="/user/sqoop/import_sqoop/sqoop_dump_table_name" --incremental=append --map-column-hive user_id=String,uid=String,user_uid=String,funnel_id=String --map-column-java user_id=String,uid=String,user_uid=String,funnel_id=String

Импорт данных с Vertica в HBase помощью Sqoop

sqoop import -Doraoop.timestamp.string=false --m=1 --connection-manager="org.apache.sqoop.manager.GenericJdbcManager" --driver='com.vertica.jdbc.Driver' --connect "jdbc:vertica://127.0.0.1:5433/schema?searchpath=db" --username username --password yourpassword --verbose --table table_name --hbase-create-table --hbase-table db.table_name --column-family data --columns "user_id,uid,user_uid,funnel_id" --hbase-row-key user_id

solrctl - создание колекции для SOLR

Создадим директорию со всеми необходимыми файлами настройки для будующей колекции документов (schema.xml, solrconf.xml) solrctl instancedir --generate /var/lib/solr/collection3 -schemaless Далее можно внести нужные нужные изменения в конфигурационные файлы. Например описать нужные поля в документе в schema.xml: <field name="dt" type="date" indexed="true" stored="true"/> <field name="hash" type="string" indexed="true" stored="true"/> <field name="funnel_id" type="string" indexed="false" stored="true"/> Или изменить фактор репликации для лога транзакций в solrconf.xml: <updateLog> <str name="dir">${solr.ulog.dir:}</str> <int name="tlogDfsReplication">2</int> </updateLog> Далее загружаем созданную директорию с настройками для колекции в SolrCloud: solrctl instancedir --create coll...

Real-time cохранение данных в SOLR с помощью Kafka + Flume

   Для заливки данных в колекцию SOLR можно использовать сочетание Kafka  + Flume MorphlineSolrSink. Это позволяет быстро настроить добавление новых данных в SOLR даже без необходимости писать код.     Предварительно сохранённая в формате json информация в Kafka topic читается, преобразовывается и сохраняется в SOLR с помощью MorphlineSolrSink. При этом все нюансы предподготовки данных перед сохранением в документ SOLR можно описать в конфигурационном файле для morphline.conf. Файл конфигураций для flume: tier1.sources = source1 tier1.channels = channel1 tier1.sinks = solrSink1 tier1.sources.source1.channels = channel1 tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource tier1.sources.source1.zookeeperConnect = 127.0.0.1:2181 Zookeeper client port tier1.sources.source1.topic = test_topic tier1.sources.source1.kafka.auto.offset.reset = smallest tier1.sources.source1.groupId = flume_source_test_topic tier1.sources.source1.batch...

Исправляем ошибку "The app won't work without a running Livy Spark Server"

В Hue 3.8 появилясь возможность использовать интерактивный Web-based блокнот для анализа данных " Notebook Application for Spark & SQL ". Вдохновление ребята взяли с аналогичной разработки  IPython Notebook. Идея в том, чтобы избавить аналитиков от изнурительной работы над компиляцией Jar файлов и их запуска с консоли. Это очень удобно, когда нужно сделать быструю проверку данных, проверку теории и т.д.. Проблема в том, что по умолчанию для запуска этого блокнота должен быть настроен Livy Spark Server. Об этом нам говорит следующее сообщение: "The app won't work without a running Livy Spark Server" В настройках hue.ini устанавливаем следующее: [desktop] app_blacklist= [notebook] show_notebooks=true [beeswax] use_new_editor=true Скачиваем и устанавливаем Livy server https://github.com/cloudera/livy  cd /usr/local/src git clone git@github.com:cloudera/livy.git cd livy-master export JAVA_HOME=/usr/java/jdk1.7.0_67-cloudera export PATH=$JAVA_HO...

Пример работы с таблицами в Imapala написанных на Scala для Spark 1.5

Загружаем spark-shell и указываем путь к jdbc драйверу для работы с Impala : spark-shell --master "local[2]" --driver-class-path /opt/cloudera/parcels/CDH/jars/ImpalaJDBC41.jar После запуска spark-shell нужно импортировать все зависимости и октрыть соединение с помощью DriverManager. Далее приведены примеры чтения с таблицы и добавления данных: import java.util.Properties import org.apache.spark._ import org.apache.spark.sql.SQLContext import java.sql.Connection import java.sql.DriverManager Class.forName("com.cloudera.impala.jdbc41.Driver") var conn: java.sql.Connection = null conn = DriverManager.getConnection("jdbc:impala://127.0.0.1:21050/default;auth=noSasl", "", "") val statement = conn.createStatement(); val result = statement.executeQuery("SELECT * FROM users limit 10") result.next() result.getString("user_id")val sql_insert = "INSERT INTO users VALUES('user_id','email','gend...

Исправляем ошибку "Could not insert row with null value for row-key column" при импорте данных в HBase с помощью Sqoop

При иморте данных в HBase может возникнуть следующая ошибка: "Could not insert row with null value for row-key column". Один из вариантов, является добавления параметра "--columns", где нужно перечислить все поля для импорта вместе с row-key. Но это решение не всегда работает. Альтернативой может быть использование параметра "--query" где нужно sql функцией преобразовать все null значения в int к примеру. Вот пример конвертации null значения в int для Vertica: --query 'select funnel_id,dt,COALESCE(landing_user_uid, 0) AS landing_user_uid from landings_visits tableSel WHERE $CONDITIONS'

Ошибка при импорте данных с Vertica с помощью Sqoop: "java.io.IOException: SQLException in nextKeyValue"

При импорте данных с Vertica в Hive может возникнуть ошибка: "Error: java.io.IOException: SQLException in nextKeyValue". Дело в том, что при импорте с Vertica аргумент "–table" не работает. При попытке импорта с помощью sqoop, часто возникает следующая ошибка: Error: java.io.IOException: SQLException in nextKeyValue …… Caused by: com.vertica.support.exceptions.SyntaxErrorException: [Vertica][VJDBC](4856) ERROR: Syntax error at or near “.” Нужно изменить аргумент "--table" на "--query" в строке запуска импорта с помощью Sqoop. Пример изменённой строки запуска импорта с Vertica в Hive с помощью Sqoop: sqoop import -m 1 --connection-manager="org.apache.sqoop.manager.GenericJdbcManager" \ --driver='com.vertica.jdbc.Driver' \ --connect "jdbc:vertica://yourhost:5433/databasename?searchpath=schemaname" --username youruser --password yourpassword --compression-codec=snappy \ --as-parquetfile --hive-import --hiv...

Пример на Spark(Scala) рекурсивного чтения hdfs директории

import org.apache.hadoop.fs._ import org.apache.spark.deploy.SparkHadoopUtil import java.net.URI val hdfs_conf = SparkHadoopUtil.get.newConfiguration(sc.getConf) val hdfs = FileSystem.get(hdfs_conf) // source data in HDFS val sourcePath = new Path("/data/*/*") hdfs.globStatus( sourcePath ).foreach{ fileStatus => val filePathName = fileStatus.getPath().toString() val fileName = fileStatus.getPath().getName() println("fileName: " + fileName + "; FilePath: " + filePathName) val fileRDD = sc.textFile(filePathName) val counts = fileRDD.flatMap(line => line.split("\n")) //val df = sqlContext.read.json( filePathName) //df.show() counts.foreachPartition(partF => { partF.foreach( item_row => { println(item_row) }); }); }

Исправляем ошибку: java.lang.UnsupportedOperationException: Can't read accumulator value in task

Эта ошибка возникает в ситуациях, когда spark-streaming задача восстанавливается с последнего checkpoint-a. 1-е нужно у убрать с кода обращение к значению ".value". Тоесть читать значение акумулятора не нужно через значение ".value". Нужно обращаться просто по имени акумулятора. Если вы используете checkpoint-ы, при восстановлении работы драйвера, акумулятор не реинициализирован, потому вы можете увидеть следующую ошибку: ERROR scheduler.DAGScheduler: Failed to update accumulators for ResultTask(5, 5) java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long Ошибка эта также говорит нам о том, что акумулятор не реиницилизирован при восстановлении с checkpoint-a. Для этого нужно добавить проверку на инициализацию акумулятора: object Accum_inst { @volatile private var instance: Accumulator[Long] = null def getInstance(sc: SparkContext, AccumName: String): Accumulator[Long] = { if (instance == null) { synchronized { ...

Пример работы с SparkOnHbase

SparkOnHbase https://github.com/cloudera-labs/SparkOnHBase Скачиваем jar архив spark-hbase-0.0.2-clabs.jar для использования возможностей SparkOnHbase Запускаем spark-shell: spark-shell --master "local[2]" --driver-class-path /home/hdfs/spark-hbase-0.0.2-clabs.jar,/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar import com.cloudera.spark.hbase._ import org.apache.hadoop.hbase.spark.HBaseContext import org.apache.hadoop.hbase.{TableName, HBaseConfiguration} import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.client.Scan import org.apache.spark.sql.SQLContext import org.apache.spark.SparkContext import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.fs.Path import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.client.Get import org.apache.hadoop.hbase.client.Result import org.apache.spark.SparkConf val rdd = sc.parallelize(Array( (Bytes.toBytes("user_id:10")), (By...

PHP клиент для работы с HBase и Hive

При работе с кластером Hadoop, может возникнуть необходимость обратится к данным, которые хранятся в HBase и Hive в рамках Hadoop кластера. Для этого можно воспользоваться Apache Thrift фреймворком, который обеспечивает кросс-языковое решения обмена данными. Для доступа к HBase и Hive с PHP можно использовать php trhift клиент. Реализовывать самому этот клиент может быть долго и сложно. Поэтому лучше использовать готовые наработки. К примеру вот хорошая реализация  клиента на PHP. После скачивания файлов можно работать с данными. Попробуем реализовать пару простых методов доступа к данным: //thrift php $GLOBALS [ ‘THRIFT_ROOT’ ] = dirname ( __FILE__ ) . ‘/thrift/src’ ; require_once ( $GLOBALS [ ‘THRIFT_ROOT’ ] . ‘/Thrift.php’ ) ; require_once ( $GLOBALS [ ‘THRIFT_ROOT’ ] . ‘/transport/TSocket.php’ ) ; require_once ( $GLOBALS [ ‘THRIFT_ROOT’ ] . ‘/transport/TBufferedTransport.php’ ) ; require_once ( $GLOBALS [ ‘THRIFT_ROOT’ ] . ‘/protocol/TBinaryProtocol.php’ ...