Проблема с файлами маленьких размеров при сохранении данных в hdfs c помощью Аpache Kafka & Flume
Flume показался наиболее подходящим инструментом для сохранения потоковых данных в hdfs.
Изначально был ностроен flume-agent для сохранения данных в hdfs следующим образом:
Изначально был ностроен flume-agent для сохранения данных в hdfs следующим образом:
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.zookeeperConnect = 127.0.0.1:2181
tier1.sources.source1.topic = users
tier1.sources.source1.groupId = 67
tier1.sources.source1.channels = channel1
tier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = timestamp
tier1.sources.source1.kafka.consumer.timeout.ms = 100
tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000
tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = /user/hdfs/tables/%{topic}/%Y-%m-%d
# rollover file based on max time of 2 min
tier1.sinks.sink1.hdfs.rollInterval = 120
# rollover file based on maximum size of 0 MB
tier1.sinks.sink1.hdfs.rollSize = 0
# Give us a higher timeout because we are writing in batch
tier1.sinks.sink1.hdfs.callTimeout = 60000
# never rollover based on the number of events
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.channel = channel1
Идея была в том, что-бы сохранять данные по дням для удобной обработки файлов hdfs и с дальнейшим подтягиванием в Hive.
Но это привело к тому что в директории hdfs создавались файлы размером в несколько килобайт. Как выяснилось позже flume не дозаписывает файл а каждый раз создаёт новый. За размер файла отвечают следующие параметры:
Но это привело к тому что в директории hdfs создавались файлы размером в несколько килобайт. Как выяснилось позже flume не дозаписывает файл а каждый раз создаёт новый. За размер файла отвечают следующие параметры:
As described in the documentation, Flume HDFS sink has several file closing strategies:
- each N seconds (specified by
rollInterval
option) - after writing N bytes (
rollSize
option) - after writing N received events (
rollCount
option) - after N seconds of inactivity (
idleTimeout
option)
Одним из решений может быть сброс всех параметров в дефолтное значение и указания кол. строк после которых будет создаваться новый файл:
#ALWAYS PROVIDE THESE OVERRIDES OF THE DEFAULTS !!
agent3.sinks.snk1.hdfs.rollSize = 0
agent3.sinks.snk1.hdfs.rollCount = 0
agent3.sinks.snk1.hdfs.rollInterval = 0
agent3.sinks.snk1.hdfs.idleTimeout = 0
#SET TO YOUR NEEDS
#rollCount writes to hdfs every 20 lines
agent3.sinks.snk1.hdfs.rollCount = 20
Не забываем про параметр idleTimeout который говорит о том когда сохраниться файл если небыло никаких действий.Также можно попробывать a1.sinks.HDFS.hdfs.minBlockReplicas = 1 # or 3
Этот параметр действительно позволяет создвавать 1 файл, но проблема в том, что файл находится всегда в открытом состоянии и не доступен другим задачам.
Вот итоговый файл конфигурации:
Также интерестной показалась идея сохранения данных напрямую в таблицы Hive, но улыбнула надпись, что этот сценарий предполагается только как експериментальный и не советуется для использования в продакшене ;)
Вот итоговый файл конфигурации:
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.zookeeperConnect = 127.0.0.1:2181
tier1.sources.source1.topic = replica-users
tier1.sources.source1.groupId = 67
tier1.sources.source1.channels = channel1
tier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = timestamp
tier1.sources.source1.kafka.consumer.timeout.ms = 100
tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000
tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = /user/hdfs/tables/%{topic}/%Y-%m-%d
tier1.sinks.sink1.hdfs.minBlockReplicas = 2
tier1.sinks.sink1.hdfs.rollInterval = 0
tier1.sinks.sink1.hdfs.rollSize = 0
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.idleTimeout = 5
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.channel = channel1
Также интерестной показалась идея сохранения данных напрямую в таблицы Hive, но улыбнула надпись, что этот сценарий предполагается только как експериментальный и не советуется для использования в продакшене ;)
type – The component type name, needs to be hive
hive.metastore – Hive metastore URI (eg thrift://a.b.com:9083 )
hive.database – Hive database name
hive.table – Hive table name
Ещё можно соединять файлы в один большой. Для этого даже есть проект: https://github.com/edwardcapriolo/filecrush/ который создаёт в директориях один файл с многих маленьких.
Полезные ссылки:
Коментарі
Дописати коментар