Исправляем ошибку: java.lang.UnsupportedOperationException: Can't read accumulator value in task

Эта ошибка возникает в ситуациях, когда spark-streaming задача восстанавливается с последнего checkpoint-a.
1-е нужно у убрать с кода обращение к значению ".value". Тоесть читать значение акумулятора не нужно через значение ".value". Нужно обращаться просто по имени акумулятора.

Если вы используете checkpoint-ы, при восстановлении работы драйвера, акумулятор не реинициализирован, потому вы можете увидеть следующую ошибку:
ERROR scheduler.DAGScheduler: Failed to update accumulators for ResultTask(5, 5)
java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
Ошибка эта также говорит нам о том, что акумулятор не реиницилизирован при восстановлении с checkpoint-a. Для этого нужно добавить проверку на инициализацию акумулятора:
object Accum_inst {

  @volatile private var instance: Accumulator[Long] = null

  def getInstance(sc: SparkContext, AccumName: String): Accumulator[Long] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.accumulator(0L, AccumName)
        }
      }
    }
    instance
  }
}
///....
val ssc = new StreamingContext(sparkConf, Seconds(batchInterval.toInt))
ssc.checkpoint(checkpointDirectory)
rdd.foreachRDD((recrdd, time: Time) => {
   val updateAllCount = Accum_inst.getInstance(recrdd.sparkContext, "Counter all")
   recrdd.foreachPartition(part => {
     var partCount = 0L;
     part.foreach( item_row => {
       partCount += 1
     })
     updateAllCount += partCount;
   })
   System.out.println("count processed: "+ updateAllCount + ";")
})

Полезные ссылки:

Коментарі

Популярні дописи з цього блогу

Минимальные требование для кластера Hadoop(with Spark)

Apache Spark Resource Management и модель приложений от YARN

Apache Spark - основные команды