RunningExecutionsUpdater.java

279 lines | 10.173 kB Blame History Raw Download
/*
 * Copyright 2018 LinkedIn Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.executor;

import azkaban.alert.Alerter;
import azkaban.metrics.CommonMetrics;
import azkaban.utils.Pair;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import javax.inject.Inject;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;

/**
 * Updates running executions.
 *
 * <p>Each call to {@link #updateExecutions()} groups the currently running flows by their
 * assigned executor, asks every executor for an update through the {@link ExecutorApiGateway},
 * applies the returned updates to the in-memory flows and finalizes the flows that are finished
 * or that can no longer be tracked.
 */
public class RunningExecutionsUpdater {

  private static final Logger logger = Logger.getLogger(RunningExecutionsUpdater.class);
  // First email is sent after 1 minute of unresponsiveness
  final int numErrorsBeforeUnresponsiveEmail = 6;
  // Delay in milliseconds that is added to "now" to compute the next check time of a flow
  // whose update call failed.
  final long errorThreshold = 10000;
  // When we have an http error, for that flow, we'll check every 10 secs, 360
  // times (3600 seconds = 1 hour) before we send an email about unresponsive executor.
  private final int numErrorsBetweenUnresponsiveEmail = 360;
  private final ExecutorManagerUpdaterStage updaterStage;
  private final AlerterHolder alerterHolder;
  private final CommonMetrics commonMetrics;
  private final ExecutorApiGateway apiGateway;
  private final RunningExecutions runningExecutions;
  private final ExecutionFinalizer executionFinalizer;
  private final ExecutorLoader executorLoader;

  @Inject
  public RunningExecutionsUpdater(final ExecutorManagerUpdaterStage updaterStage,
      final AlerterHolder alerterHolder, final CommonMetrics commonMetrics,
      final ExecutorApiGateway apiGateway, final RunningExecutions runningExecutions,
      final ExecutionFinalizer executionFinalizer, final ExecutorLoader executorLoader) {
    this.updaterStage = updaterStage;
    this.alerterHolder = alerterHolder;
    this.commonMetrics = commonMetrics;
    this.apiGateway = apiGateway;
    this.runningExecutions = runningExecutions;
    this.executionFinalizer = executionFinalizer;
    this.executorLoader = executorLoader;
  }

  /**
   * Updates running executions.
   *
   * <p>Flows are grouped per executor and one update call is made per executor. Flows whose
   * executor is absent, whose executor has been removed, that report an error, or that are
   * finished after applying the update are collected and finalized at the end of the round.
   */
  public void updateExecutions() {
    this.updaterStage.set("Starting update all flows.");
    final Map<Optional<Executor>, List<ExecutableFlow>> exFlowMap = getFlowToExecutorMap();
    final ArrayList<ExecutableFlow> finalizeFlows = new ArrayList<>();

    for (final Map.Entry<Optional<Executor>, List<ExecutableFlow>> entry : exFlowMap
        .entrySet()) {

      final Optional<Executor> executorOption = entry.getKey();
      if (!executorOption.isPresent()) {
        for (final ExecutableFlow flow : entry.getValue()) {
          logger.warn("Finalizing execution " + flow.getExecutionId()
              + ". Executor id of this execution doesn't exist");
          finalizeFlows.add(flow);
        }
        continue;
      }
      final Executor executor = executorOption.get();

      this.updaterStage.set("Starting update flows on " + executor.getHost() + ":"
          + executor.getPort());

      Map<String, Object> results = null;
      try {
        results = this.apiGateway.updateExecutions(executor, entry.getValue());
      } catch (final ExecutorManagerException e) {
        handleException(entry, executor, e, finalizeFlows);
      }

      if (results != null) {
        applyExecutionUpdates(executor, results, finalizeFlows);
      }
    }

    this.updaterStage.set("Finalizing " + finalizeFlows.size() + " error flows.");

    for (final ExecutableFlow flow : finalizeFlows) {
      this.executionFinalizer
          .finalizeFlow(flow, "Not running on the assigned executor (any more)", null);
    }

    this.updaterStage.set("Updated all active flows. Waiting for next round.");
  }

  /**
   * Applies the per-flow updates contained in one executor response.
   *
   * @param executor the executor that produced the response (used for logging only)
   * @param results the response map returned by the update call
   * @param finalizeFlows collects the flows that must be finalized after this round
   */
  // The response is an untyped map; the list-of-maps shape is what the loop below consumes.
  @SuppressWarnings("unchecked")
  private void applyExecutionUpdates(final Executor executor, final Map<String, Object> results,
      final List<ExecutableFlow> finalizeFlows) {
    final List<Map<String, Object>> executionUpdates =
        (List<Map<String, Object>>) results.get(ConnectorParams.RESPONSE_UPDATED_FLOWS);
    if (executionUpdates == null) {
      // A response without the updated-flows entry must not abort the round for the
      // remaining executors.
      logger.warn("Update response from executor " + executor.getHost() + ":"
          + executor.getPort() + " doesn't contain any flow updates");
      return;
    }

    for (final Map<String, Object> updateMap : executionUpdates) {
      try {
        final ExecutableFlow flow = updateExecution(updateMap);

        this.updaterStage.set("Updated flow " + flow.getExecutionId());

        if (ExecutionControllerUtils.isFinished(flow)) {
          finalizeFlows.add(flow);
        }
      } catch (final ExecutorManagerException e) {
        final ExecutableFlow flow = e.getExecutableFlow();
        logger.error(e);

        // The exception only carries a flow when the failure could be tied to one.
        if (flow != null) {
          logger.warn("Finalizing execution " + flow.getExecutionId());
          finalizeFlows.add(flow);
        }
      }
    }
  }

  /**
   * Handles a failed update call to an executor.
   *
   * <p>If the executor has been removed, all of its flows are queued for finalization.
   * Otherwise the error counter of every flow is increased, its next check is postponed by
   * {@link #errorThreshold} milliseconds and, when a counter hits one of the alert thresholds,
   * an "unresponsive executor" alert is sent.
   *
   * @param entry the executor together with the flows that were part of the failed call
   * @param executor the executor the call was made to
   * @param e the failure of the update call
   * @param finalizeFlows collects the flows that must be finalized after this round
   */
  private void handleException(final Entry<Optional<Executor>, List<ExecutableFlow>> entry,
      final Executor executor, final ExecutorManagerException e,
      final ArrayList<ExecutableFlow> finalizeFlows) {
    logger.error("Failed to get update from executor " + executor.getHost(), e);
    boolean sendUnresponsiveEmail = false;
    final boolean executorRemoved = isExecutorRemoved(executor.getId());
    for (final ExecutableFlow flow : entry.getValue()) {
      this.updaterStage.set("Failed to get update for flow " + flow.getExecutionId());

      if (executorRemoved) {
        logger.warn("Finalizing execution " + flow.getExecutionId()
            + ". Executor is removed");
        finalizeFlows.add(flow);
        continue;
      }

      final Pair<ExecutionReference, ExecutableFlow> pair =
          this.runningExecutions.get().get(flow.getExecutionId());
      if (pair == null) {
        // The execution is no longer in the running map, so there is no reference whose
        // error counter could be updated.
        logger.warn("Execution " + flow.getExecutionId()
            + " is not in the running executions any more. Skipping error tracking.");
        continue;
      }

      final ExecutionReference ref = pair.getFirst();
      ref.setNextCheckTime(DateTime.now().getMillis() + this.errorThreshold);
      ref.setNumErrors(ref.getNumErrors() + 1);
      if (ref.getNumErrors() == this.numErrorsBeforeUnresponsiveEmail
          || ref.getNumErrors() % this.numErrorsBetweenUnresponsiveEmail == 0) {
        // if any of the executions has failed many enough updates, alert
        sendUnresponsiveEmail = true;
      }
    }
    if (sendUnresponsiveEmail) {
      sendUnresponsiveAlert(executor, entry.getValue(), e);
    }
  }

  /**
   * Sends the "unresponsive executor" alert through the "email" alerter. A missing alerter or
   * a failure while alerting is logged instead of propagated so that the update round can
   * continue with the remaining executors.
   */
  private void sendUnresponsiveAlert(final Executor executor, final List<ExecutableFlow> flows,
      final ExecutorManagerException cause) {
    final Alerter mailAlerter = this.alerterHolder.get("email");
    if (mailAlerter == null) {
      logger.error("Alerter type email doesn't exist. Failed to alert on unresponsive executor "
          + executor.getHost());
      return;
    }
    try {
      mailAlerter.alertOnFailedUpdate(executor, flows, cause);
    } catch (final Exception alertError) {
      logger.error("Failed to send unresponsive executor alert for " + executor.getHost(),
          alertError);
    }
  }

  /**
   * Checks whether the executor with the given id can still be fetched from the loader.
   *
   * @return {@code true} if the loader returned no executor for the id; {@code false} if it
   *     returned one or if the lookup itself failed
   */
  private boolean isExecutorRemoved(final int id) {
    final Executor fetchedExecutor;
    try {
      fetchedExecutor = this.executorLoader.fetchExecutor(id);
    } catch (final ExecutorManagerException e) {
      logger.error("Couldn't check if executor exists", e);
      // don't know if removed or not -> default to false
      return false;
    }
    return fetchedExecutor == null;
  }

  /* Group Executable flow by Executors to reduce number of REST calls */
  private Map<Optional<Executor>, List<ExecutableFlow>> getFlowToExecutorMap() {
    final Map<Optional<Executor>, List<ExecutableFlow>> exFlowMap = new HashMap<>();

    for (final Pair<ExecutionReference, ExecutableFlow> runningFlow : this.runningExecutions.get()
        .values()) {
      final ExecutionReference ref = runningFlow.getFirst();
      final ExecutableFlow flow = runningFlow.getSecond();
      final Optional<Executor> executor = ref.getExecutor();

      // We can set the next check time to prevent the checking of certain
      // flows.
      if (ref.getNextCheckTime() >= DateTime.now().getMillis()) {
        continue;
      }

      exFlowMap.computeIfAbsent(executor, key -> new ArrayList<>()).add(flow);
    }

    return exFlowMap;
  }

  /**
   * Applies a single update map to the matching running flow.
   *
   * <p>On success the error state of the execution reference is reset, the update is applied
   * to the flow and, when the status changed, the failure metric and first-error alerts are
   * triggered.
   *
   * @param updateData the update for one execution as returned by the executor
   * @return the updated flow
   * @throws ExecutorManagerException if the update has no execution id, if the execution is
   *     not in the running map, or if the update carries an "error" entry (in which case the
   *     exception carries the flow)
   */
  private ExecutableFlow updateExecution(final Map<String, Object> updateData)
      throws ExecutorManagerException {

    final Integer execId =
        (Integer) updateData.get(ConnectorParams.UPDATE_MAP_EXEC_ID);
    if (execId == null) {
      throw new ExecutorManagerException(
          "Response is malformed. Need exec id to update.");
    }

    final Pair<ExecutionReference, ExecutableFlow> refPair =
        this.runningExecutions.get().get(execId);
    if (refPair == null) {
      // this shouldn't ever happen on real azkaban runtime.
      // but this can easily happen in unit tests if there's some inconsistent mocking.
      throw new ExecutorManagerException(
          "No execution found in the map with the execution id any more. Removing " + execId);
    }

    final ExecutionReference ref = refPair.getFirst();
    final ExecutableFlow flow = refPair.getSecond();
    if (updateData.containsKey("error")) {
      // The flow should be finished here.
      throw new ExecutorManagerException((String) updateData.get("error"), flow);
    }

    // Reset errors.
    ref.setNextCheckTime(0);
    ref.setNumErrors(0);
    final Status oldStatus = flow.getStatus();
    flow.applyUpdateObject(updateData);
    final Status newStatus = flow.getStatus();
    final boolean statusChanged = oldStatus != newStatus;

    if (statusChanged && newStatus == Status.FAILED) {
      this.commonMetrics.markFlowFail();
    }

    if (statusChanged && newStatus.equals(Status.FAILED_FINISHING)) {
      alertOnFirstError(flow);
    }

    return flow;
  }

  /**
   * Sends the first-error alerts for a flow: through the "email" alerter when the flow asks
   * to be notified on first failure, and through the alerter named by the "alert.type" flow
   * parameter when that parameter is present. Alerting failures are logged, never propagated.
   */
  private void alertOnFirstError(final ExecutableFlow flow) {
    final ExecutionOptions options = flow.getExecutionOptions();
    // We want to see if we should give an email status on first failure.
    if (options.getNotifyOnFirstFailure()) {
      final Alerter mailAlerter = this.alerterHolder.get("email");
      if (mailAlerter != null) {
        try {
          mailAlerter.alertOnFirstError(flow);
        } catch (final Exception e) {
          logger.error("Failed to send first error email." + e.getMessage(), e);
        }
      } else {
        logger.error("Alerter type email doesn't exist. Failed to alert.");
      }
    }
    if (options.getFlowParameters().containsKey("alert.type")) {
      final String alertType = options.getFlowParameters().get("alert.type");
      final Alerter alerter = this.alerterHolder.get(alertType);
      if (alerter != null) {
        try {
          alerter.alertOnFirstError(flow);
        } catch (final Exception e) {
          logger.error("Failed to alert by " + alertType, e);
        }
      } else {
        logger.error("Alerter type " + alertType
            + " doesn't exist. Failed to alert.");
      }
    }
  }

}