Исправляем ошибку: java.lang.UnsupportedOperationException: Can't read accumulator value in task
Эта ошибка возникает в ситуациях, когда spark-streaming задача восстанавливается с последнего checkpoint-a.
1-е нужно у убрать с кода обращение к значению ".value". Тоесть читать значение акумулятора не нужно через значение ".value". Нужно обращаться просто по имени акумулятора.
Если вы используете checkpoint-ы, при восстановлении работы драйвера, акумулятор не реинициализирован, потому вы можете увидеть следующую ошибку:
Полезные ссылки:
1-е нужно у убрать с кода обращение к значению ".value". Тоесть читать значение акумулятора не нужно через значение ".value". Нужно обращаться просто по имени акумулятора.
Если вы используете checkpoint-ы, при восстановлении работы драйвера, акумулятор не реинициализирован, потому вы можете увидеть следующую ошибку:
ERROR scheduler.DAGScheduler: Failed to update accumulators for ResultTask(5, 5)
java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
Ошибка эта также говорит нам о том, что акумулятор не реиницилизирован при восстановлении с checkpoint-a. Для этого нужно добавить проверку на инициализацию акумулятора:object Accum_inst {
@volatile private var instance: Accumulator[Long] = null
def getInstance(sc: SparkContext, AccumName: String): Accumulator[Long] = {
if (instance == null) {
synchronized {
if (instance == null) {
instance = sc.accumulator(0L, AccumName)
}
}
}
instance
}
}
///....
val ssc = new StreamingContext(sparkConf, Seconds(batchInterval.toInt))
ssc.checkpoint(checkpointDirectory)
rdd.foreachRDD((recrdd, time: Time) => {
val updateAllCount = Accum_inst.getInstance(recrdd.sparkContext, "Counter all")
recrdd.foreachPartition(part => {
var partCount = 0L;
part.foreach( item_row => {
partCount += 1
})
updateAllCount += partCount;
})
System.out.println("count processed: "+ updateAllCount + ";")
})
Полезные ссылки:
Коментарі
Дописати коментар