Проблема с файлами маленьких размеров при сохранении данных в hdfs c помощью Аpache Kafka & Flume

Flume показался наиболее подходящим инструментом для сохранения потоковых данных в hdfs.
Изначально был ностроен flume-agent для сохранения данных в hdfs следующим образом:
tier1.sources  = source1
tier1.channels = channel1
tier1.sinks = sink1

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.zookeeperConnect = 127.0.0.1:2181
tier1.sources.source1.topic = users
tier1.sources.source1.groupId = 67
tier1.sources.source1.channels = channel1
tier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = timestamp
tier1.sources.source1.kafka.consumer.timeout.ms = 100

tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000

tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = /user/hdfs/tables/%{topic}/%Y-%m-%d
# rollover file based on max time of 2 min
tier1.sinks.sink1.hdfs.rollInterval = 120
 # rollover file based on maximum size of 0 MB
tier1.sinks.sink1.hdfs.rollSize = 0
# Give us a higher timeout because we are writing in batch
tier1.sinks.sink1.hdfs.callTimeout = 60000
# never rollover based on the number of events
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.channel = channel1 

Идея была в том, что-бы сохранять данные по дням для удобной обработки файлов hdfs и с дальнейшим подтягиванием в Hive.
Но это привело к тому что в директории hdfs создавались файлы размером в несколько килобайт. Как выяснилось позже flume не дозаписывает файл а каждый раз создаёт новый. За размер файла отвечают следующие параметры:
As described in the documentation, Flume HDFS sink has several file closing strategies:
  • each N seconds (specified by rollInterval option)
  • after writing N bytes (rollSize option)
  • after writing N received events (rollCount option)
  • after N seconds of inactivity (idleTimeout option)

Одним из решений может быть сброс всех параметров в дефолтное значение и указания кол. строк после которых будет создаваться новый файл:

#ALWAYS PROVIDE THESE OVERRIDES OF THE DEFAULTS !!

agent3.sinks.snk1.hdfs.rollSize = 0
agent3.sinks.snk1.hdfs.rollCount = 0
agent3.sinks.snk1.hdfs.rollInterval = 0
agent3.sinks.snk1.hdfs.idleTimeout = 0

#SET TO YOUR NEEDS

#rollCount writes to hdfs every 20 lines
agent3.sinks.snk1.hdfs.rollCount = 20
Не забываем про параметр idleTimeout который говорит о том когда сохраниться файл если небыло никаких действий.

Также можно попробывать a1.sinks.HDFS.hdfs.minBlockReplicas = 1 # or 3
Этот параметр действительно позволяет создвавать 1 файл, но проблема в том, что файл находится всегда в открытом состоянии и не доступен другим задачам.
Вот итоговый файл конфигурации:
tier1.sources  = source1
tier1.channels = channel1
tier1.sinks = sink1

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.zookeeperConnect = 127.0.0.1:2181
tier1.sources.source1.topic = replica-users
tier1.sources.source1.groupId = 67
tier1.sources.source1.channels = channel1
tier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = timestamp
tier1.sources.source1.kafka.consumer.timeout.ms = 100

tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000

tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = /user/hdfs/tables/%{topic}/%Y-%m-%d
tier1.sinks.sink1.hdfs.minBlockReplicas = 2
tier1.sinks.sink1.hdfs.rollInterval = 0
tier1.sinks.sink1.hdfs.rollSize = 0
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.idleTimeout = 5
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.channel = channel1

Также интерестной показалась идея сохранения данных напрямую в таблицы Hive, но улыбнула надпись, что этот сценарий предполагается только как експериментальный и не советуется для использования в продакшене ;)
type – The component type name, needs to be hive
hive.metastore – Hive metastore URI (eg thrift://a.b.com:9083 )
hive.database – Hive database name
hive.table – Hive table name

Ещё можно соединять файлы в один большой. Для этого даже есть проект: https://github.com/edwardcapriolo/filecrush/ который создаёт в директориях один файл с многих маленьких. 



Полезные ссылки:

Коментарі

Популярні дописи з цього блогу

Минимальные требование для кластера Hadoop(with Spark)

Линейная регрессия простыми словами

Исправляем ошибку HDFS Under-Replicated Blocks