Real-time cохранение данных в SOLR с помощью Kafka + Flume

   Для заливки данных в колекцию SOLR можно использовать сочетание Kafka + Flume MorphlineSolrSink. Это позволяет быстро настроить добавление новых данных в SOLR даже без необходимости писать код.
    Предварительно сохранённая в формате json информация в Kafka topic читается, преобразовывается и сохраняется в SOLR с помощью MorphlineSolrSink. При этом все нюансы предподготовки данных перед сохранением в документ SOLR можно описать в конфигурационном файле для morphline.conf.

Файл конфигураций для flume:
tier1.sources  = source1
tier1.channels = channel1
tier1.sinks = solrSink1 

tier1.sources.source1.channels = channel1
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.zookeeperConnect = 127.0.0.1:2181 Zookeeper client port
tier1.sources.source1.topic = test_topic
tier1.sources.source1.kafka.auto.offset.reset = smallest
tier1.sources.source1.groupId = flume_source_test_topic
tier1.sources.source1.batchSize = 2000
tier1.sources.source1.batchDurationMillis = 1000

tier1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
#tier1.channels.channel1.capacity = 10000
#tier1.channels.channel1.transactionCapacity = 1000
tier1.channels.channel1.brokerList = 127.0.0.1:9092 //Kafka TCP PORT
tier1.channels.channel1.zookeeperConnect = 127.0.0.1:2181 //Zookeeper client port
tier1.channels.channel1.topic = test_topic_channel
tier1.channels.channel1.readSmallestOffset = true
tier1.channels.channel1.groupId = test_topic_group_channel
#tier1.channels.channel1.parseAsFlumeEvent = true

tier1.sinks.solrSink1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
tier1.sinks.solrSink1.channel = channel1
tier1.sinks.solrSink1.batchSize = 100
tier1.sinks.solrSink1.batchDurationMillis = 1000
tier1.sinks.solrSink1.morphlineFile = /etc/flume-ng/conf/morphline.conf
tier1.sinks.solrSink1.morphlineId = morphline_test_topic
tier1.sinks.solrSink1.isProductionMode=false 

Файл конфигураций /etc/flume-ng/conf/morphline.conf:

SOLR_LOCATOR : {
  # Name of solr collection
  collection : collection1

  # ZooKeeper ensemble
#  zkHost : "$ZK_HOST"
   zkHost : "127.0.0.1:2181/solr"
}

morphlines: [
  {
    id: morphline_test_topic
    importCommands: [
      "org.kitesdk.**",
      "org.apache.solr.**"
    ]
    commands: [
      {
        readJson {
          #outputClass:java.util.Map
        }
      }
      {
        extractJsonPaths {
          flatten: true
          paths: {
            uid:/uid
            field_1:/field_1
            field_2:/field_2
            dt:/dt
            params:/params
            event_msTime:/event_msTime
          }
        }
      }
      {
        convertTimestamp {
          field: dt
          inputFormats: [
            "yyyy-MM-dd HH:mm:ss"
          ]
          inputTimezone: Europe/Kiev
          #outputFormat: "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
          outputFormat: "yyyy-MM-dd'T'HH:mm:ss.SSS'Z/HOUR'"
          outputTimezone: Europe/Kiev
        }
      }
      {
        convertTimestamp {
          field: event_msTime
          inputFormats: [
            "unixTimeInMillis"
          ]
          inputTimezone: Europe/Kiev
          outputFormat: "yyyy-MM-dd HH:mm:ss"
          outputTimezone: Europe/Kiev
        }
      }
      {
        generateUUID {
          field: _generatedUUid
        }
      }
      { # Remove record fields that are unknown to Solr schema.xml.
        # Recall that Solr throws an exception on any attempt to load a document that
        # contains a field that isn't specified in schema.xml.
        sanitizeUnknownSolrFields {
          solrLocator : ${SOLR_LOCATOR} # Location from which to fetch Solr schema
        }
      }
      {
        setValues {
          _attachment_body: []
        }
      }
      {
        logDebug {
          format: "output record: {}",
          args: [
            "@{}"
          ]
        }
      }

      {
        loadSolr {
          solrLocator: ${SOLR_LOCATOR}
        }
      }
    ]
  }
]

 



Команда extractJsonPaths необходима для того, чтобы вытащить данные с массива _attachment_body в который команда  readJson  сохраняет прочитаную json строку с топика Kafka. Далее команда setValues { _attachment_body: [] } просто обнуляет это поле чтобы не сохранят его в документ SOLR

Коментарі

Популярні дописи з цього блогу

Минимальные требование для кластера Hadoop(with Spark)

Apache Spark - основные команды

Apache Spark Resource Management и модель приложений от YARN