flink - accumulator
Published: 2019-06-28


 

Reading the accumulators

JobManager

When a job finishes, the JobManager aggregates the accumulator values:

newJobStatus match {
  case JobStatus.FINISHED =>
    try {
      val accumulatorResults = executionGraph.getAccumulatorsSerialized()
      val result = new SerializedJobExecutionResult(
        jobID,
        jobInfo.duration,
        accumulatorResults)
      jobInfo.client ! decorateMessage(JobResultSuccess(result))
    }

 

When the client requests the accumulators:

public Map<String, Object> getAccumulators(JobID jobID, ClassLoader loader) throws Exception {
    ActorGateway jobManagerGateway = getJobManagerGateway();

    Future<Object> response;
    try {
        response = jobManagerGateway.ask(new RequestAccumulatorResults(jobID), timeout);
    } catch (Exception e) {
        throw new Exception("Failed to query the job manager gateway for accumulators.", e);
    }
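The quoted excerpt stops after the ask. A minimal sketch of how the reply could then be unwrapped, assuming the AccumulatorResultsFound message produced by the JobManager handler shown below (the accessor name result() and the error handling are assumptions, not the exact Client code):

    // Sketch only: block on the Scala future, then deserialize each value with the
    // user class loader, since user-defined accumulator classes live in the user jar.
    Object result = Await.result(response, timeout);
    if (result instanceof AccumulatorResultsFound) {
        Map<String, SerializedValue<Object>> serialized =
                ((AccumulatorResultsFound) result).result();   // accessor name assumed
        Map<String, Object> accumulators = new HashMap<>();
        for (Map.Entry<String, SerializedValue<Object>> entry : serialized.entrySet()) {
            accumulators.put(entry.getKey(), entry.getValue().deserializeValue(loader));
        }
        return accumulators;
    } else {
        throw new Exception("Failed to fetch accumulators for job " + jobID);
    }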

 

The message arrives at the JobManager:

case message: AccumulatorMessage => handleAccumulatorMessage(message)

private def handleAccumulatorMessage(message: AccumulatorMessage): Unit = {
  message match {
    case RequestAccumulatorResults(jobID) =>
      try {
        currentJobs.get(jobID) match {
          case Some((graph, jobInfo)) =>
            val accumulatorValues = graph.getAccumulatorsSerialized()
            sender() ! decorateMessage(AccumulatorResultsFound(jobID, accumulatorValues))
          case None =>
            archive.forward(message)
        }
      }

 

ExecutionGraph

Getting the accumulator values:

/**
 * Gets a serialized accumulator map.
 * @return The accumulator map with serialized accumulator values.
 * @throws IOException
 */
public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() throws IOException {
    Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();

    Map<String, SerializedValue<Object>> result = new HashMap<String, SerializedValue<Object>>();
    for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) {
        result.put(entry.getKey(), new SerializedValue<Object>(entry.getValue().getLocalValue()));
    }
    return result;
}

 

Aggregating the user accumulators across the executions:

/**
 * Merges all accumulator results from the tasks previously executed in the Executions.
 * @return The accumulator map
 */
public Map<String, Accumulator<?, ?>> aggregateUserAccumulators() {
    Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<String, Accumulator<?, ?>>();

    for (ExecutionVertex vertex : getAllExecutionVertices()) {
        Map<String, Accumulator<?, ?>> next = vertex.getCurrentExecutionAttempt().getUserAccumulators();
        if (next != null) {
            AccumulatorHelper.mergeInto(userAccumulators, next);
        }
    }
    return userAccumulators;
}

The actual merge logic:

public static void mergeInto(Map<String, Accumulator<?, ?>> target, Map<String, Accumulator<?, ?>> toMerge) {
    for (Map.Entry<String, Accumulator<?, ?>> otherEntry : toMerge.entrySet()) {
        Accumulator<?, ?> ownAccumulator = target.get(otherEntry.getKey());
        if (ownAccumulator == null) {
            // Create initial counter (copy!)
            target.put(otherEntry.getKey(), otherEntry.getValue().clone());
        } else {
            // Both should have the same type
            AccumulatorHelper.compareAccumulatorTypes(otherEntry.getKey(),
                    ownAccumulator.getClass(), otherEntry.getValue().getClass());
            // Merge target counter with other counter
            mergeSingle(ownAccumulator, otherEntry.getValue());
        }
    }
}
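mergeSingle is not quoted above; conceptually it just casts both sides to a common accumulator type and delegates to Accumulator.merge. A minimal sketch of that step (the generic signature is an assumption, not the exact AccumulatorHelper code):

@SuppressWarnings("unchecked")
private static <V, R extends Serializable> void mergeSingle(Accumulator<?, ?> target,
                                                            Accumulator<?, ?> toMerge) {
    // cast away the wildcards and let the accumulator implementation do the merge,
    // e.g. LongCounter.merge simply adds the other counter's local value
    ((Accumulator<V, R>) target).merge((Accumulator<V, R>) toMerge);
}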

 

Updating the accumulators

JobManager

It receives heartbeats from the TaskManagers, which carry the accumulators:

case Heartbeat(instanceID, metricsReport, accumulators) =>
  updateAccumulators(accumulators)

Using the job ID, the update is routed to the corresponding ExecutionGraph:

private def updateAccumulators(accumulators : Seq[AccumulatorSnapshot]) = {
  accumulators foreach {
    case accumulatorEvent =>
      currentJobs.get(accumulatorEvent.getJobID) match {
        case Some((jobGraph, jobInfo)) =>
          future {
            jobGraph.updateAccumulators(accumulatorEvent)
          }(context.dispatcher)
        case None =>
          // ignore accumulator values for old job
      }
  }
}

Using the ExecutionAttemptID, the values are applied to the corresponding Execution:

/**
 * Updates the accumulators during the runtime of a job. Final accumulator results are transferred
 * through the UpdateTaskExecutionState message.
 * @param accumulatorSnapshot The serialized flink and user-defined accumulators
 */
public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
    Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators;
    Map<String, Accumulator<?, ?>> userAccumulators;
    try {
        flinkAccumulators = accumulatorSnapshot.deserializeFlinkAccumulators();
        userAccumulators = accumulatorSnapshot.deserializeUserAccumulators(userClassLoader);

        ExecutionAttemptID execID = accumulatorSnapshot.getExecutionAttemptID();
        Execution execution = currentExecutions.get(execID);
        if (execution != null) {
            execution.setAccumulators(flinkAccumulators, userAccumulators);
        }
    } catch (Exception e) {
        // catch clause omitted in the original excerpt; failures are only logged here
        LOG.error("Cannot update accumulators for job.", e);
    }
}

On the Execution, the accumulators are simply replaced as long as the state is not terminal:

/**
 * Update accumulators (discarded when the Execution has already been terminated).
 * @param flinkAccumulators the flink internal accumulators
 * @param userAccumulators the user accumulators
 */
public void setAccumulators(Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators,
                            Map<String, Accumulator<?, ?>> userAccumulators) {
    synchronized (accumulatorLock) {
        if (!state.isTerminal()) {
            this.flinkAccumulators = flinkAccumulators;
            this.userAccumulators = userAccumulators;
        }
    }
}

 

Now let's see how the TaskManager collects the accumulators and sends the heartbeat:

/**
 * Sends a heartbeat message to the JobManager (if connected) with the current
 * metrics report.
 */
protected def sendHeartbeatToJobManager(): Unit = {
  try {
    val metricsReport: Array[Byte] = metricRegistryMapper.writeValueAsBytes(metricRegistry)

    val accumulatorEvents =
      scala.collection.mutable.Buffer[AccumulatorSnapshot]()

    runningTasks foreach {
      case (execID, task) =>
        val registry = task.getAccumulatorRegistry
        val accumulators = registry.getSnapshot
        accumulatorEvents.append(accumulators)
    }

    currentJobManager foreach {
      jm => jm ! decorateMessage(Heartbeat(instanceID, metricsReport, accumulatorEvents))
    }
  }
}

Every running task's accumulators are collected into accumulatorEvents and then sent out in the Heartbeat message.

 

A task's accumulators are obtained via task.getAccumulatorRegistry.getSnapshot.

Let's look at AccumulatorRegistry:

/**
 * Main accumulator registry which encapsulates internal and user-defined accumulators.
 */
public class AccumulatorRegistry {

    protected static final Logger LOG = LoggerFactory.getLogger(AccumulatorRegistry.class);

    protected final JobID jobID;               // the job these accumulators belong to
    protected final ExecutionAttemptID taskID; // the execution attempt (task) ID

    /* Flink's internal Accumulator values stored for the executing task. */
    private final Map<Metric, Accumulator<?, ?>> flinkAccumulators =
            new HashMap<Metric, Accumulator<?, ?>>();

    /* User-defined Accumulator values stored for the executing task. */
    private final Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<>();

    /* The reporter reference that is handed to the reporting tasks. */
    private final ReadWriteReporter reporter;

    /**
     * Creates a snapshot of this accumulator registry.
     * @return a serialized accumulator map
     */
    public AccumulatorSnapshot getSnapshot() {
        try {
            return new AccumulatorSnapshot(jobID, taskID, flinkAccumulators, userAccumulators);
        } catch (IOException e) {
            LOG.warn("Failed to serialize accumulators for task.", e);
            return null;
        }
    }
}

The snapshot logic is straightforward as well:

public AccumulatorSnapshot(JobID jobID, ExecutionAttemptID executionAttemptID,
                           Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators,
                           Map<String, Accumulator<?, ?>> userAccumulators) throws IOException {
    this.jobID = jobID;
    this.executionAttemptID = executionAttemptID;
    this.flinkAccumulators =
            new SerializedValue<Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>>(flinkAccumulators);
    this.userAccumulators =
            new SerializedValue<Map<String, Accumulator<?, ?>>>(userAccumulators);
}
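The deserialization counterpart, used by ExecutionGraph.updateAccumulators above, is just the inverse: unwrap each SerializedValue with an appropriate class loader. A minimal sketch of the two methods (the bodies and the class loader choices are assumptions, not quoted from the source):

// Sketch: Flink's internal accumulators are framework classes, while user-defined
// accumulator classes come from the user code jar and need the user class loader.
public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> deserializeFlinkAccumulators()
        throws IOException, ClassNotFoundException {
    return flinkAccumulators.deserializeValue(getClass().getClassLoader());
}

public Map<String, Accumulator<?, ?>> deserializeUserAccumulators(ClassLoader classLoader)
        throws IOException, ClassNotFoundException {
    return userAccumulators.deserializeValue(classLoader);
}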

 

Finally, how do the statistics actually get added to the accumulators?

Let's look directly at how Flink's internal accumulators are updated; they are all updated through this reporter:

/**
 * Accumulator based reporter for keeping track of internal metrics (e.g. bytes and records in/out)
 */
private static class ReadWriteReporter implements Reporter {

    private LongCounter numRecordsIn = new LongCounter();
    private LongCounter numRecordsOut = new LongCounter();
    private LongCounter numBytesIn = new LongCounter();
    private LongCounter numBytesOut = new LongCounter();

    private ReadWriteReporter(Map<Metric, Accumulator<?, ?>> accumulatorMap) {
        accumulatorMap.put(Metric.NUM_RECORDS_IN, numRecordsIn);
        accumulatorMap.put(Metric.NUM_RECORDS_OUT, numRecordsOut);
        accumulatorMap.put(Metric.NUM_BYTES_IN, numBytesIn);
        accumulatorMap.put(Metric.NUM_BYTES_OUT, numBytesOut);
    }

    @Override
    public void reportNumRecordsIn(long value) {
        numRecordsIn.add(value);
    }

    @Override
    public void reportNumRecordsOut(long value) {
        numRecordsOut.add(value);
    }

    @Override
    public void reportNumBytesIn(long value) {
        numBytesIn.add(value);
    }

    @Override
    public void reportNumBytesOut(long value) {
        numBytesOut.add(value);
    }
}

 

Where is this reporter interface invoked?

On the input side, bytes-in and records-in are counted when the incoming data is deserialized into a record:

AdaptiveSpanningRecordDeserializer
public DeserializationResult getNextRecord(T target) throws IOException {
    // check if we can get a full length;
    if (nonSpanningRemaining >= 4) {
        int len = this.nonSpanningWrapper.readInt();

        if (reporter != null) {
            reporter.reportNumBytesIn(len);
        }

        if (len <= nonSpanningRemaining - 4) {
            // we can get a full record from here
            target.read(this.nonSpanningWrapper);

            if (reporter != null) {
                reporter.reportNumRecordsIn(1);
            }

 

Conversely, on the output side, the counters are updated when the record is serialized:

SpanningRecordSerializer
@Override
public SerializationResult addRecord(T record) throws IOException {
    int len = this.serializationBuffer.length();
    this.lengthBuffer.putInt(0, len);

    if (reporter != null) {
        reporter.reportNumBytesOut(len);
        reporter.reportNumRecordsOut(1);
    }

 

To use an accumulator in your own code, you first register it in a rich function by calling getRuntimeContext().addAccumulator, as in the sketch below.
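A minimal usage sketch (the accumulator name "num-lines" and the LineCounter class are made up for illustration):

public static class LineCounter extends RichMapFunction<String, String> {

    // user-defined accumulator; the name must be unique within the job
    private final IntCounter numLines = new IntCounter();

    @Override
    public void open(Configuration parameters) {
        // register the accumulator with the runtime context
        getRuntimeContext().addAccumulator("num-lines", numLines);
    }

    @Override
    public String map(String value) {
        numLines.add(1); // local add; merged across tasks by the mergeInto logic shown above
        return value;
    }
}

// After the job finishes, the merged value is available on the JobExecutionResult:
// JobExecutionResult result = env.execute("count lines");
// int total = result.getAccumulatorResult("num-lines");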

Reprinted from: http://pojtl.baihongyu.com/
