Реализация real-time загрузки данных в Hive c помощью Kafka topic и Apache Flume
Для сохранения данных с Kafka topic напрямую в Hive можно использовать HiveSink:
Если данные в Kafka топике у Вас сохранены у Вас в Json-e, то для этого есть .serializer = JSON. Также возможен вариант DELIMITED с последующим указанием разделителя для значений.
Создаём таблицу в Hive для загрузки данных:
Пример настройки Hive Sink:
На этом можно было бы и остановится, но если Вы хотите использовать формат таблиц Parquet, то нужно делать доп. телодвижения.
Проблема в том, что Hive Sink походу пока ещё не умеет сохранять данные в Parquet. При попытке сохранения данных с помощью Hive Sink, возникает следующая ошибка: "parquet.MapredParquetOutputFormat cannot be cast to org.apache.hadoop.hive.ql.io.AcidOutputFormat".
Возможно проблема кроется в этом баге, который на данный момент ещё не закрыт.
Нам же остаётся только вставлять данные в аналогичную табличку в формате Parquet, с помощью запроса:
Полезные ссылки:
Если данные в Kafka топике у Вас сохранены у Вас в Json-e, то для этого есть .serializer = JSON. Также возможен вариант DELIMITED с последующим указанием разделителя для значений.
Создаём таблицу в Hive для загрузки данных:
CREATE TABLE `db_name.table_name`(
`date` string,`cost` string,`cost_origin` string,`campaign` string, `currency` string)
CLUSTERED BY(campaign) INTO 10 BUCKETS
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
'/user/flume/flumeingest/db.table_name';
Пример настройки Hive Sink:
tier1.sinks.HiveSink5.channel = channel5
tier1.sinks.HiveSink5.type = hive
tier1.sinks.HiveSink5.hive.metastore = thirft://127.0.0.1:9083
tier1.sinks.HiveSink5.hive.database = db_name
tier1.sinks.HiveSink5.hive.table = table_name
#tier1.sinks.HiveSink5.hive.partition = %{year}
tier1.sinks.HiveSink5.hive.txnsPerBatchAsk = 10
tier1.sinks.HiveSink5.batchSize = 15000
tier1.sinks.HiveSink5.serializer = JSON
#tier1.sinks.HiveSink5.serializer.delimiter = ,
tier1.sinks.HiveSink5.serializer.fieldnames =date,cost,cost_origin,currency,campaign
На этом можно было бы и остановится, но если Вы хотите использовать формат таблиц Parquet, то нужно делать доп. телодвижения.
Проблема в том, что Hive Sink походу пока ещё не умеет сохранять данные в Parquet. При попытке сохранения данных с помощью Hive Sink, возникает следующая ошибка: "parquet.MapredParquetOutputFormat cannot be cast to org.apache.hadoop.hive.ql.io.AcidOutputFormat".
Возможно проблема кроется в этом баге, который на данный момент ещё не закрыт.
Нам же остаётся только вставлять данные в аналогичную табличку в формате Parquet, с помощью запроса:
INSERT OVERWRITE TABLE `db_name.table_name_parquet` SELECT * FROM `db_name.table_name`;
Тут есть даже пример как автоматизировать этот процесс.Полезные ссылки:
- https://flume.apache.org/FlumeUserGuide.html#hive-sink
- http://henning.kropponline.de/2015/05/19/hivesink-for-flume/
- http://ctheu.com/2016/03/31/from-apache-flume-to-apache-impala-using-hdfs/
- http://henning.kropponline.de/2015/05/10/hadoop-file-ingest-and-hive/
- https://community.cloudera.com/t5/Data-Ingestion-Integration/Apache-Flume-and-parquet/td-p/8508
Коментарі
Дописати коментар