Real-time cохранение данных в SOLR с помощью Kafka + Flume
Для заливки данных в колекцию SOLR можно использовать сочетание Kafka + Flume MorphlineSolrSink. Это позволяет быстро настроить добавление новых данных в SOLR даже без необходимости писать код.
Предварительно сохранённая в формате json информация в Kafka topic читается, преобразовывается и сохраняется в SOLR с помощью MorphlineSolrSink. При этом все нюансы предподготовки данных перед сохранением в документ SOLR можно описать в конфигурационном файле для morphline.conf.
Файл конфигураций для flume:
Файл конфигураций /etc/flume-ng/conf/morphline.conf:
Команда extractJsonPaths необходима для того, чтобы вытащить данные с массива _attachment_body в который команда readJson сохраняет прочитаную json строку с топика Kafka. Далее команда setValues { _attachment_body: [] } просто обнуляет это поле чтобы не сохранят его в документ SOLR
Предварительно сохранённая в формате json информация в Kafka topic читается, преобразовывается и сохраняется в SOLR с помощью MorphlineSolrSink. При этом все нюансы предподготовки данных перед сохранением в документ SOLR можно описать в конфигурационном файле для morphline.conf.
Файл конфигураций для flume:
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = solrSink1
tier1.sources.source1.channels = channel1
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.zookeeperConnect = 127.0.0.1:2181 Zookeeper client port
tier1.sources.source1.topic = test_topic
tier1.sources.source1.kafka.auto.offset.reset = smallest
tier1.sources.source1.groupId = flume_source_test_topic
tier1.sources.source1.batchSize = 2000
tier1.sources.source1.batchDurationMillis = 1000
tier1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
#tier1.channels.channel1.capacity = 10000
#tier1.channels.channel1.transactionCapacity = 1000
tier1.channels.channel1.brokerList = 127.0.0.1:9092 //Kafka TCP PORT
tier1.channels.channel1.zookeeperConnect = 127.0.0.1:2181 //Zookeeper client port
tier1.channels.channel1.topic = test_topic_channel
tier1.channels.channel1.readSmallestOffset = true
tier1.channels.channel1.groupId = test_topic_group_channel
#tier1.channels.channel1.parseAsFlumeEvent = true
tier1.sinks.solrSink1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
tier1.sinks.solrSink1.channel = channel1
tier1.sinks.solrSink1.batchSize = 100
tier1.sinks.solrSink1.batchDurationMillis = 1000
tier1.sinks.solrSink1.morphlineFile = /etc/flume-ng/conf/morphline.conf
tier1.sinks.solrSink1.morphlineId = morphline_test_topic
tier1.sinks.solrSink1.isProductionMode=false
Файл конфигураций /etc/flume-ng/conf/morphline.conf:
SOLR_LOCATOR : {
# Name of solr collection
collection : collection1
# ZooKeeper ensemble
# zkHost : "$ZK_HOST"
zkHost : "127.0.0.1:2181/solr"
}
morphlines: [
{
id: morphline_test_topic
importCommands: [
"org.kitesdk.**",
"org.apache.solr.**"
]
commands: [
{
readJson {
#outputClass:java.util.Map
}
}
{
extractJsonPaths {
flatten: true
paths: {
uid:/uid
field_1:/field_1
field_2:/field_2
dt:/dt
params:/params
event_msTime:/event_msTime
}
}
}
{
convertTimestamp {
field: dt
inputFormats: [
"yyyy-MM-dd HH:mm:ss"
]
inputTimezone: Europe/Kiev
#outputFormat: "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
outputFormat: "yyyy-MM-dd'T'HH:mm:ss.SSS'Z/HOUR'"
outputTimezone: Europe/Kiev
}
}
{
convertTimestamp {
field: event_msTime
inputFormats: [
"unixTimeInMillis"
]
inputTimezone: Europe/Kiev
outputFormat: "yyyy-MM-dd HH:mm:ss"
outputTimezone: Europe/Kiev
}
}
{
generateUUID {
field: _generatedUUid
}
}
{ # Remove record fields that are unknown to Solr schema.xml.
# Recall that Solr throws an exception on any attempt to load a document that
# contains a field that isn't specified in schema.xml.
sanitizeUnknownSolrFields {
solrLocator : ${SOLR_LOCATOR} # Location from which to fetch Solr schema
}
}
{
setValues {
_attachment_body: []
}
}
{
logDebug {
format: "output record: {}",
args: [
"@{}"
]
}
}
{
loadSolr {
solrLocator: ${SOLR_LOCATOR}
}
}
]
}
]
Коментарі
Дописати коментар