ExecutorManager.java

1970 lines | 68.415 kB Blame History Raw Download
/*
 * Copyright 2014 LinkedIn Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.executor;

import azkaban.metrics.CommonMetrics;
import azkaban.constants.ServerProperties;
import azkaban.utils.FlowUtils;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.lang.Thread.State;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;

import azkaban.alert.Alerter;
import azkaban.event.Event;
import azkaban.event.Event.Type;
import azkaban.event.EventData;
import azkaban.event.EventHandler;
import azkaban.executor.selector.ExecutorComparator;
import azkaban.executor.selector.ExecutorFilter;
import azkaban.executor.selector.ExecutorSelector;
import azkaban.project.Project;
import azkaban.project.ProjectWhitelist;
import azkaban.scheduler.ScheduleStatisticManager;
import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;

/**
 * Executor manager used to manage the client side job.
 *
 */
public class ExecutorManager extends EventHandler implements
    ExecutorManagerAdapter {
  // ---- Property keys looked up in azkProps ----

  // Comma-separated list of filter names for the executor selector.
  static final String AZKABAN_EXECUTOR_SELECTOR_FILTERS =
      "azkaban.executorselector.filters";
  // Prefix of the properties holding integer comparator weights.
  static final String AZKABAN_EXECUTOR_SELECTOR_COMPARATOR_PREFIX =
      "azkaban.executorselector.comparator.";
  // First constructor argument of the queue processor thread (default true).
  static final String AZKABAN_QUEUEPROCESSING_ENABLED =
    "azkaban.queueprocessing.enabled";
  // Selects multi-executor mode (default false, i.e. one local executor).
  static final String AZKABAN_USE_MULTIPLE_EXECUTORS =
    "azkaban.use.multiple.executors";
  // Capacity passed to QueuedExecutions (default 100000).
  private static final String AZKABAN_WEBSERVER_QUEUE_SIZE =
    "azkaban.webserver.queue.size";
  // Passed to the queue processor thread (default 50000); presumably the
  // executor refresh interval in milliseconds -- confirm in QueueProcessorThread.
  private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS =
    "azkaban.activeexecutor.refresh.milisecinterval";
  // Passed to the queue processor thread (default 5); presumably the executor
  // refresh interval counted in flows -- confirm in QueueProcessorThread.
  private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW =
    "azkaban.activeexecutor.refresh.flowinterval";
  // Size of the fixed thread pool used to refresh executor info (default 5).
  private static final String AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS =
      "azkaban.executorinfo.refresh.maxThreads";
  // Passed to the queue processor thread; defaults to the number of active
  // executors at set-up time.
  private static final String AZKABAN_MAX_DISPATCHING_ERRORS_PERMITTED =
    "azkaban.maxDispatchingErrors";

  private static Logger logger = Logger.getLogger(ExecutorManager.class);
  // Persistence layer for executors, executions and logs.
  private ExecutorLoader executorLoader;

  // Started by the constructor with the configured log retention.
  private CleanerThread cleanerThread;

  // Executions that have been dispatched, keyed by execution id.
  private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows =
      new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
  // Exposed through getRecentlyFinishedFlows(); populated outside this section.
  private ConcurrentHashMap<Integer, ExecutableFlow> recentlyFinished =
      new ConcurrentHashMap<Integer, ExecutableFlow>();

  // Executions submitted but not yet dispatched to an executor.
  QueuedExecutions queuedFlows;

  // refreshExecutors() iterates this set while holding its monitor.
  final private Set<Executor> activeExecutors = new HashSet<Executor>();
  // Only created in multi-executor mode (see setupMultiExecutorMode()).
  private QueueProcessorThread queueProcessor;
  // Execution currently being dispatched: no longer in queuedFlows and not yet
  // in runningFlows. null when nothing is being dispatched.
  private volatile Pair<ExecutionReference, ExecutableFlow> runningCandidate = null;

  // Started by the constructor.
  private ExecutingManagerUpdaterThread executingManager;
  // 12 weeks
  private static final long DEFAULT_EXECUTION_LOGS_RETENTION_MS = 3 * 4 * 7
      * 24 * 60 * 60 * 1000L;
  // -1 until updated; presumably written by the cleaner thread -- confirm.
  private long lastCleanerThreadCheckTime = -1;

  // -1 until updated; presumably written by the updater thread -- confirm.
  private long lastThreadCheckTime = -1;
  // Reported by getExecutorThreadStage().
  private String updaterStage = "not started";

  // Stored by the constructor; not read in this section.
  private Map<String, Alerter> alerters;

  // Taken from the "cache.directory" property (default "cache").
  File cacheDir;

  private final Props azkProps;
  // Selector filter names parsed from AZKABAN_EXECUTOR_SELECTOR_FILTERS.
  private List<String> filterList;
  // Comparator name -> weight, parsed from the comparator-prefix properties.
  private Map<String, Integer> comparatorWeightsMap;
  // Time of the last refresh in which every active executor answered; 0 before
  // the first fully successful refresh.
  private long lastSuccessfulExecutorInfoRefresh;
  // Fixed pool that runs the per-executor statistics requests.
  private ExecutorService executorInforRefresherService;

  /**
   * Creates the manager, loads executors plus running and queued executions
   * through the loader, and starts the background threads.
   *
   * The statement order matters: executors and flows are loaded before any
   * thread is started.
   *
   * @param azkProps configuration the manager reads its settings from
   * @param loader persistence layer used for executors and executions
   * @param alerters stored for later use; not read by this constructor
   * @throws ExecutorManagerException propagated from the set-up and load
   *         calls, e.g. when no active executor is found
   */
  public ExecutorManager(Props azkProps, ExecutorLoader loader,
      Map<String, Alerter> alerters) throws ExecutorManagerException {
    this.alerters = alerters;
    this.azkProps = azkProps;
    this.executorLoader = loader;
    this.setupExecutors();
    this.loadRunningFlows();

    // the queue must exist before loadQueuedFlows() enqueues into it
    queuedFlows =
        new QueuedExecutions(azkProps.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
    this.loadQueuedFlows();

    cacheDir = new File(azkProps.getString("cache.directory", "cache"));

    executingManager = new ExecutingManagerUpdaterThread();
    executingManager.start();

    // the queue processor only exists in multi-executor mode
    if(isMultiExecutorMode()) {
      setupMultiExecutorMode();
    }

    long executionLogsRetentionMs =
        azkProps.getLong("execution.logs.retention.ms",
        DEFAULT_EXECUTION_LOGS_RETENTION_MS);

    cleanerThread = new CleanerThread(executionLogsRetentionMs);
    cleanerThread.start();

  }

  /**
   * Prepares everything that only exists in multi-executor mode: the selector
   * filters and comparator weights read from azkaban.properties, the thread
   * pool used to refresh executor info, and the queue processor thread, which
   * is started before this method returns.
   */
  private void setupMultiExecutorMode() {
    // hard filters for the executor selector, given as a comma-separated list
    String configuredFilters =
        azkProps.getString(AZKABAN_EXECUTOR_SELECTOR_FILTERS, "");
    if (configuredFilters != null) {
      String[] filterNames = StringUtils.split(configuredFilters, ",");
      filterList = Arrays.asList(filterNames);
    }

    // comparator feature weights for the executor selector
    Map<String, String> rawWeights =
        azkProps.getMapByPrefix(AZKABAN_EXECUTOR_SELECTOR_COMPARATOR_PREFIX);
    if (rawWeights != null) {
      comparatorWeightsMap = new TreeMap<String, Integer>();
      for (String comparatorName : rawWeights.keySet()) {
        String weight = rawWeights.get(comparatorName);
        comparatorWeightsMap.put(comparatorName, Integer.valueOf(weight));
      }
    }

    int refreshPoolSize =
        azkProps.getInt(AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS, 5);
    executorInforRefresherService =
        Executors.newFixedThreadPool(refreshPoolSize);

    // queue processor settings, read in the order the constructor takes them
    boolean processingEnabled =
        azkProps.getBoolean(AZKABAN_QUEUEPROCESSING_ENABLED, true);
    long refreshIntervalMs =
        azkProps.getLong(AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS, 50000);
    int refreshIntervalFlows =
        azkProps.getInt(AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW, 5);
    int maxDispatchingErrors =
        azkProps.getInt(AZKABAN_MAX_DISPATCHING_ERRORS_PERMITTED,
            activeExecutors.size());

    queueProcessor =
        new QueueProcessorThread(processingEnabled, refreshIntervalMs,
            refreshIntervalFlows, maxDispatchingErrors);
    queueProcessor.start();
  }

  /**
   * Rebuilds the set of active executors.
   *
   * In multi-executor mode the active executors are fetched from the
   * database. Otherwise, if "executor.port" is configured, the single local
   * executor is looked up, registered if it is unknown, and re-activated if
   * it was inactive. The current set is only replaced once a non-empty new
   * set has been assembled.
   *
   * {@inheritDoc}
   *
   * @throws ExecutorManagerException if no active executor is found, or if
   *         more than one executor is found outside multi-executor mode
   * @see azkaban.executor.ExecutorManagerAdapter#setupExecutors()
   */
  @Override
  public void setupExecutors() throws ExecutorManagerException {
    Set<Executor> newExecutors = new HashSet<Executor>();

    if (isMultiExecutorMode()) {
      logger.info("Initializing multi executors from database");
      newExecutors.addAll(executorLoader.fetchActiveExecutors());
    } else if (azkProps.containsKey("executor.port")) {
      // Add local executor, if specified as per properties
      String executorHost = azkProps.getString(ServerProperties.EXECUTOR_HOST, "localhost");
      int executorPort = azkProps.getInt("executor.port");
      logger.info(String.format("Initializing local executor %s:%d",
        executorHost, executorPort));
      Executor executor =
        executorLoader.fetchExecutor(executorHost, executorPort);
      if (executor == null) {
        executor = executorLoader.addExecutor(executorHost, executorPort);
      } else if (!executor.isActive()) {
        executor.setActive(true);
        executorLoader.updateExecutor(executor);
      }
      newExecutors.add(new Executor(executor.getId(), executorHost,
        executorPort, true));
    }

    if (newExecutors.isEmpty()) {
      logger.error("No active executor found");
      throw new ExecutorManagerException("No active executor found");
    }
    if (newExecutors.size() > 1 && !isMultiExecutorMode()) {
      logger.error("Multiple local executors specified");
      throw new ExecutorManagerException("Multiple local executors specified");
    }

    // Swap the contents under the same monitor refreshExecutors() holds while
    // it iterates the set, so a reload cannot modify the set mid-iteration.
    synchronized (activeExecutors) {
      activeExecutors.clear();
      activeExecutors.addAll(newExecutors);
    }
  }

  /**
   * Tells whether multi-executor mode is configured.
   *
   * @return value of the AZKABAN_USE_MULTIPLE_EXECUTORS property, false when
   *         the property is absent
   */
  private boolean isMultiExecutorMode() {
    final boolean defaultMode = false;
    return azkProps.getBoolean(AZKABAN_USE_MULTIPLE_EXECUTORS, defaultMode);
  }

  /**
   * Refreshes the executor info of all active executors in this
   * executorManager.
   *
   * One "/serverStatistics" request per executor is submitted to the
   * refresher pool; each result is then awaited for at most 5 seconds. The
   * cached info of an executor is cleared before its result is awaited and
   * stays cleared if the request fails.
   * lastSuccessfulExecutorInfoRefresh is only advanced when every executor
   * was refreshed successfully.
   */
  private void refreshExecutors() {
    synchronized (activeExecutors) {

      List<Pair<Executor, Future<String>>> futures =
        new ArrayList<Pair<Executor, Future<String>>>();
      for (final Executor executor : activeExecutors) {
        // execute each executorInfo refresh task to fetch
        Future<String> fetchExecutionInfo =
          executorInforRefresherService.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
              return callExecutorForJsonString(executor.getHost(),
                executor.getPort(), "/serverStatistics", null);
            }
          });
        futures.add(new Pair<Executor, Future<String>>(executor,
          fetchExecutionInfo));
      }

      boolean wasSuccess = true;
      for (Pair<Executor, Future<String>> refreshPair : futures) {
        Executor executor = refreshPair.getFirst();
        Future<String> pendingInfo = refreshPair.getSecond();
        executor.setExecutorInfo(null); // invalidate cached ExecutorInfo
        try {
          // max 5 secs
          String jsonString = pendingInfo.get(5, TimeUnit.SECONDS);
          executor.setExecutorInfo(ExecutorInfo.fromJSONString(jsonString));
          logger.info(String.format(
            "Successfully refreshed executor: %s with executor info : %s",
            executor, jsonString));
        } catch (TimeoutException e) {
          wasSuccess = false;
          // stop the stalled request so it does not keep a pool thread busy
          pendingInfo.cancel(true);
          logger.error("Timed out while waiting for ExecutorInfo refresh"
            + " for executor : " + executor, e);
        } catch (InterruptedException e) {
          wasSuccess = false;
          pendingInfo.cancel(true);
          // keep the interrupt visible to the calling thread's own loop
          Thread.currentThread().interrupt();
          logger.error("Interrupted while waiting for ExecutorInfo refresh"
            + " for executor : " + executor, e);
        } catch (Exception e) {
          wasSuccess = false;
          logger.error("Failed to update ExecutorInfo for executor : "
            + executor, e);
        }
      }

      // update is successful for all executors
      if (wasSuccess) {
        lastSuccessfulExecutorInfoRefresh = System.currentTimeMillis();
      }
    }
  }

  /**
   * Deactivates the queue processor.
   * {@inheritDoc}
   *
   * @throws ExecutorManagerException when not running in multi-executor mode
   * @see azkaban.executor.ExecutorManagerAdapter#disableQueueProcessorThread()
   */
  @Override
  public void disableQueueProcessorThread() throws ExecutorManagerException {
    if (!isMultiExecutorMode()) {
      throw new ExecutorManagerException(
        "Cannot disable QueueProcessor in local mode");
    }
    queueProcessor.setActive(false);
  }

  /**
   * Activates the queue processor.
   * {@inheritDoc}
   *
   * @throws ExecutorManagerException when not running in multi-executor mode
   * @see azkaban.executor.ExecutorManagerAdapter#enableQueueProcessorThread()
   */
  @Override
  public void enableQueueProcessorThread() throws ExecutorManagerException {
    if (!isMultiExecutorMode()) {
      throw new ExecutorManagerException(
        "Cannot enable QueueProcessor in local mode");
    }
    queueProcessor.setActive(true);
  }

  /**
   * Reports the thread state of the queue processor.
   *
   * @return the queue processor's state in multi-executor mode; State.NEW in
   *         local mode, where the thread is never started
   */
  public State getQueueProcessorThreadState() {
    return isMultiExecutorMode() ? queueProcessor.getState() : State.NEW;
  }

  /**
   * Reports whether the queue processor is active.
   *
   * @return the queue processor's active flag in multi-executor mode; always
   *         false in local mode
   */
  public boolean isQueueProcessorThreadActive() {
    return isMultiExecutorMode() && queueProcessor.isActive();
  }

  /**
   * Time of the last executor info refresh that succeeded for every active
   * executor.
   *
   * @return the recorded System.currentTimeMillis() value, or 0 if no such
   *         refresh has completed yet
   */
  public long getLastSuccessfulExecutorInfoRefresh() {
    final long lastRefresh = lastSuccessfulExecutorInfoRefresh;
    return lastRefresh;
  }

  /**
   * Lists the comparator names known to ExecutorComparator, i.e. the names
   * that can be configured via azkaban.properties.
   *
   * @return the names reported by ExecutorComparator
   */
  public Set<String> getAvailableExecutorComparatorNames() {
    Set<String> comparatorNames =
        ExecutorComparator.getAvailableComparatorNames();
    return comparatorNames;
  }

  /**
   * Lists the filter names known to ExecutorFilter, i.e. the names that can
   * be configured via azkaban.properties.
   *
   * @return the names reported by ExecutorFilter
   */
  public Set<String> getAvailableExecutorFilterNames() {
    Set<String> filterNames = ExecutorFilter.getAvailableFilterNames();
    return filterNames;
  }

  /**
   * Reports the thread state of the executing-manager updater thread.
   */
  @Override
  public State getExecutorManagerThreadState() {
    final State updaterState = executingManager.getState();
    return updaterState;
  }

  /**
   * Returns the current updater stage description; "not started" until it is
   * changed elsewhere.
   */
  public String getExecutorThreadStage() {
    return this.updaterStage;
  }

  /**
   * Reports whether the executing-manager updater thread is alive.
   */
  @Override
  public boolean isExecutorManagerThreadActive() {
    final boolean updaterAlive = executingManager.isAlive();
    return updaterAlive;
  }

  /**
   * Returns the last recorded check time of the executor manager thread; -1
   * until the value is first updated.
   */
  @Override
  public long getLastExecutorManagerThreadCheckTime() {
    return this.lastThreadCheckTime;
  }

  /**
   * Returns the last recorded check time of the cleaner thread; -1 until the
   * value is first updated.
   */
  public long getLastCleanerThreadCheckTime() {
    return lastCleanerThreadCheckTime;
  }

  /**
   * Returns a read-only view of the active executors. The view is backed by
   * the live set, so later changes to the set show through.
   */
  @Override
  public Collection<Executor> getAllActiveExecutors() {
    Collection<Executor> readOnlyView =
        Collections.unmodifiableCollection(activeExecutors);
    return readOnlyView;
  }

  /**
   * Looks an executor up by id, preferring the in-memory active executors and
   * falling back to the loader when none of them matches.
   *
   * {@inheritDoc}
   *
   * @see azkaban.executor.ExecutorManagerAdapter#fetchExecutor(int)
   */
  @Override
  public Executor fetchExecutor(int executorId) throws ExecutorManagerException {
    Executor match = null;
    for (Executor candidate : activeExecutors) {
      if (candidate.getId() == executorId) {
        match = candidate;
        break;
      }
    }
    if (match != null) {
      return match;
    }
    return executorLoader.fetchExecutor(executorId);
  }

  /**
   * Returns the "host:port" address of every active executor.
   */
  @Override
  public Set<String> getPrimaryServerHosts() {
    // Only one for now. More probably later.
    HashSet<String> hostPorts = new HashSet<String>();
    for (Executor active : activeExecutors) {
      String address = active.getHost() + ":" + active.getPort();
      hostPorts.add(address);
    }
    return hostPorts;
  }

  /**
   * Returns the "host:port" address of every active executor plus the
   * address of every executor referenced by a running flow, which covers
   * executors that are no longer active but still run flows.
   */
  @Override
  public Set<String> getAllActiveExecutorServerHosts() {
    HashSet<String> hostPorts = new HashSet<String>();

    // executors that are currently active
    for (Executor active : activeExecutors) {
      String address = active.getHost() + ":" + active.getPort();
      hostPorts.add(address);
    }

    // executors that still have flows running on them
    for (Pair<ExecutionReference, ExecutableFlow> entry : runningFlows
      .values()) {
      ExecutionReference reference = entry.getFirst();
      String address = reference.getHost() + ":" + reference.getPort();
      hostPorts.add(address);
    }
    return hostPorts;
  }

  /**
   * Copies the active flows reported by the loader into runningFlows.
   *
   * @throws ExecutorManagerException if the loader fails
   */
  private void loadRunningFlows() throws ExecutorManagerException {
    Map<? extends Integer, ? extends Pair<ExecutionReference, ExecutableFlow>> activeFlows =
        executorLoader.fetchActiveFlows();
    runningFlows.putAll(activeFlows);
  }

  /*
   * Re-enqueues the queued flows reported by the loader, i.e. flows with an
   * active execution reference that are not assigned to any executor. A null
   * result from the loader is treated as "nothing queued".
   */
  private void loadQueuedFlows() throws ExecutorManagerException {
    List<Pair<ExecutionReference, ExecutableFlow>> queued =
      executorLoader.fetchQueuedFlows();
    if (queued == null) {
      return;
    }
    for (Pair<ExecutionReference, ExecutableFlow> entry : queued) {
      ExecutionReference reference = entry.getFirst();
      ExecutableFlow flow = entry.getSecond();
      queuedFlows.enqueue(flow, reference);
    }
  }

  /**
   * Gets the ids of all active executions (queued, being dispatched, or
   * running) of the given project and flow {@inheritDoc}. The result is
   * sorted ascending, which callers rely on when setting up the pipelined
   * execution id.
   *
   * @param projectId id of the project the flow belongs to
   * @param flowId id of the flow
   * @return sorted execution ids; empty if the flow has no active execution
   * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows(int,
   *      java.lang.String)
   */
  @Override
  public List<Integer> getRunningFlows(int projectId, String flowId) {
    List<Integer> executionIds = new ArrayList<Integer>();
    executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
      queuedFlows.getAllEntries()));
    // An execution that is being dispatched is neither in queuedFlows nor in
    // runningFlows, so the running candidate is checked as well. The volatile
    // field is read once into a local: it can be reset to null by another
    // thread between a null check and a second read.
    Pair<ExecutionReference, ExecutableFlow> candidate = runningCandidate;
    if (candidate != null) {
      executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
        Collections.singletonList(candidate)));
    }
    executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
      runningFlows.values()));
    Collections.sort(executionIds);
    return executionIds;
  }

  /*
   * Helper for getRunningFlows: collects the execution ids of the entries in
   * the collection that belong to the given project and flow.
   */
  private List<Integer> getRunningFlowsHelper(int projectId, String flowId,
    Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
    List<Integer> matchingIds = new ArrayList<Integer>();
    for (Pair<ExecutionReference, ExecutableFlow> entry : collection) {
      if (!entry.getSecond().getFlowId().equals(flowId)) {
        continue;
      }
      if (entry.getSecond().getProjectId() != projectId) {
        continue;
      }
      matchingIds.add(entry.getFirst().getExecId());
    }
    return matchingIds;
  }

  /**
   * Pairs every queued and every running flow with the executor taken from
   * its execution reference; queued flows come first.
   *
   * {@inheritDoc}
   *
   * @see azkaban.executor.ExecutorManagerAdapter#getActiveFlowsWithExecutor()
   */
  @Override
  public List<Pair<ExecutableFlow, Executor>> getActiveFlowsWithExecutor()
    throws IOException {
    List<Pair<ExecutableFlow, Executor>> activeFlows =
      new ArrayList<Pair<ExecutableFlow, Executor>>();
    // queued entries first, then the dispatched ones
    getActiveFlowsWithExecutorHelper(activeFlows, queuedFlows.getAllEntries());
    getActiveFlowsWithExecutorHelper(activeFlows, runningFlows.values());
    return activeFlows;
  }

  /*
   * Helper for getActiveFlowsWithExecutor: appends one (flow, executor) pair
   * per entry, taking the executor from the entry's execution reference.
   */
  private void getActiveFlowsWithExecutorHelper(
    List<Pair<ExecutableFlow, Executor>> flows,
    Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
    for (Pair<ExecutionReference, ExecutableFlow> entry : collection) {
      ExecutableFlow flow = entry.getSecond();
      Executor assigned = entry.getFirst().getExecutor();
      flows.add(new Pair<ExecutableFlow, Executor>(flow, assigned));
    }
  }

  /**
   * Checks whether the given flow has an active execution, i.e. one that is
   * queued, currently being dispatched, or running {@inheritDoc}
   *
   * @param projectId id of the project the flow belongs to
   * @param flowId id of the flow
   * @return true if at least one active execution of the flow exists
   * @see azkaban.executor.ExecutorManagerAdapter#isFlowRunning(int,
   *      java.lang.String)
   */
  @Override
  public boolean isFlowRunning(int projectId, String flowId) {
    if (isFlowRunningHelper(projectId, flowId, queuedFlows.getAllEntries())) {
      return true;
    }
    // An execution that is being dispatched is in neither queuedFlows nor
    // runningFlows; without this check the flow would be reported as idle
    // during that window. The volatile field is read once into a local.
    Pair<ExecutionReference, ExecutableFlow> candidate = runningCandidate;
    if (candidate != null
      && isFlowRunningHelper(projectId, flowId,
        Collections.singletonList(candidate))) {
      return true;
    }
    return isFlowRunningHelper(projectId, flowId, runningFlows.values());
  }

  /*
   * Tells whether any entry of the collection belongs to the given project
   * and flow.
   */
  private boolean isFlowRunningHelper(int projectId, String flowId,
    Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
    boolean found = false;
    for (Pair<ExecutionReference, ExecutableFlow> entry : collection) {
      ExecutableFlow flow = entry.getSecond();
      if (flow.getProjectId() != projectId) {
        continue;
      }
      if (entry.getSecond().getFlowId().equals(flowId)) {
        found = true;
        break;
      }
    }
    return found;
  }

  /**
   * Fetches the execution with the given id through the loader
   * {@inheritDoc}
   *
   * @see azkaban.executor.ExecutorManagerAdapter#getExecutableFlow(int)
   */
  @Override
  public ExecutableFlow getExecutableFlow(int execId)
    throws ExecutorManagerException {
    ExecutableFlow fetched = executorLoader.fetchExecutableFlow(execId);
    return fetched;
  }

  /**
   * Returns all active flows: the queued (non-dispatched) ones first,
   * followed by the running ones.
   *
   * {@inheritDoc}
   *
   * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows()
   */
  @Override
  public List<ExecutableFlow> getRunningFlows() {
    ArrayList<ExecutableFlow> activeFlows = new ArrayList<ExecutableFlow>();
    // queued entries first, then the dispatched ones
    getActiveFlowHelper(activeFlows, queuedFlows.getAllEntries());
    getActiveFlowHelper(activeFlows, runningFlows.values());
    return activeFlows;
  }

  /*
   * Appends the ExecutableFlow of every Pair<ExecutionReference,
   * ExecutableFlow> in the collection to the given list.
   */
  private void getActiveFlowHelper(ArrayList<ExecutableFlow> flows,
    Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
    for (Pair<ExecutionReference, ExecutableFlow> entry : collection) {
      ExecutableFlow flow = entry.getSecond();
      flows.add(flow);
    }
  }

  /**
   * Returns the execution ids of all active (queued and running) flows as the
   * string form of an ascending sorted list, e.g. "[1, 2, 3]".
   *
   * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows()
   */
  public String getRunningFlowIds() {
    List<Integer> sortedIds = new ArrayList<Integer>();
    getRunningFlowsIdsHelper(sortedIds, queuedFlows.getAllEntries());
    getRunningFlowsIdsHelper(sortedIds, runningFlows.values());
    Collections.sort(sortedIds);
    String rendered = sortedIds.toString();
    return rendered;
  }

  /**
   * Returns the execution ids of all queued (non-dispatched) flows as the
   * string form of an ascending sorted list, e.g. "[1, 2, 3]".
   *
   * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows()
   */
  public String getQueuedFlowIds() {
    List<Integer> sortedIds = new ArrayList<Integer>();
    getRunningFlowsIdsHelper(sortedIds, queuedFlows.getAllEntries());
    Collections.sort(sortedIds);
    String rendered = sortedIds.toString();
    return rendered;
  }


  /**
   * Returns the number of flows currently held in the queue.
   */
  public long getQueuedFlowSize() {
    final long queuedCount = queuedFlows.size();
    return queuedCount;
  }

  /*
   * Appends the execution id of every flow in the collection to allIds.
   */
  private void getRunningFlowsIdsHelper(List<Integer> allIds,
    Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
    for (Pair<ExecutionReference, ExecutableFlow> entry : collection) {
      ExecutableFlow flow = entry.getSecond();
      allIds.add(flow.getExecutionId());
    }
  }

  /**
   * Returns a new list holding the flows currently in the recently-finished
   * map; the list is a copy and is not backed by the map.
   */
  public List<ExecutableFlow> getRecentlyFinishedFlows() {
    Collection<ExecutableFlow> finished = recentlyFinished.values();
    return new ArrayList<ExecutableFlow>(finished);
  }

  /**
   * Fetches a page of the execution history of one flow of a project.
   *
   * @param project project the flow belongs to
   * @param flowId id of the flow
   * @param skip passed through to the loader as the skip value
   * @param size passed through to the loader as the size value
   */
  @Override
  public List<ExecutableFlow> getExecutableFlows(Project project,
      String flowId, int skip, int size) throws ExecutorManagerException {
    final int projectId = project.getId();
    return executorLoader.fetchFlowHistory(projectId, flowId, skip, size);
  }

  /**
   * Fetches a page of the execution history across all flows.
   *
   * @param skip passed through to the loader as the skip value
   * @param size passed through to the loader as the size value
   */
  @Override
  public List<ExecutableFlow> getExecutableFlows(int skip, int size)
      throws ExecutorManagerException {
    return executorLoader.fetchFlowHistory(skip, size);
  }

  /**
   * Fetches a page of the execution history, filtered by a flow id pattern.
   * The given text is wrapped in '%' on both sides before it is handed to the
   * loader; the remaining filters are passed as null, 0, -1 and -1.
   *
   * @param flowIdContains text the flow id is matched against
   * @param skip passed through to the loader as the skip value
   * @param size passed through to the loader as the size value
   */
  @Override
  public List<ExecutableFlow> getExecutableFlows(String flowIdContains,
      int skip, int size) throws ExecutorManagerException {
    String flowPattern = "%" + flowIdContains + "%";
    return executorLoader.fetchFlowHistory(null, flowPattern, null, 0, -1, -1,
        skip, size);
  }

  /**
   * Fetches a page of the execution history using the full set of filters.
   * All arguments are handed to the loader unchanged.
   */
  @Override
  public List<ExecutableFlow> getExecutableFlows(String projContain,
      String flowContain, String userContain, int status, long begin, long end,
      int skip, int size) throws ExecutorManagerException {
    return executorLoader.fetchFlowHistory(projContain, flowContain,
        userContain, status, begin, end, skip, size);
  }

  /**
   * Fetches a page of the execution history of one job of a project.
   *
   * @param project project the job belongs to
   * @param jobId id of the job
   * @param skip passed through to the loader as the skip value
   * @param size passed through to the loader as the size value
   */
  @Override
  public List<ExecutableJobInfo> getExecutableJobs(Project project,
      String jobId, int skip, int size) throws ExecutorManagerException {
    final int projectId = project.getId();
    return executorLoader.fetchJobHistory(projectId, jobId, skip, size);
  }

  /**
   * Returns the loader's count of executable nodes for the given job of the
   * project.
   */
  @Override
  public int getNumberOfJobExecutions(Project project, String jobId)
      throws ExecutorManagerException {
    final int projectId = project.getId();
    return executorLoader.fetchNumExecutableNodes(projectId, jobId);
  }

  /**
   * Returns the loader's count of executable flows for the given flow of the
   * project.
   */
  @Override
  public int getNumberOfExecutions(Project project, String flowId)
      throws ExecutorManagerException {
    final int projectId = project.getId();
    return executorLoader.fetchNumExecutableFlows(projectId, flowId);
  }

  /**
   * Returns a slice of the flow-level log of an execution. While the
   * execution is in runningFlows the log is requested from its executor;
   * otherwise it is read through the loader.
   *
   * @param exFlow execution whose log is wanted
   * @param offset forwarded as the "offset" value
   * @param length forwarded as the "length" value
   */
  @Override
  public LogData getExecutableFlowLog(ExecutableFlow exFlow, int offset,
      int length) throws ExecutorManagerException {
    Pair<ExecutionReference, ExecutableFlow> running =
        runningFlows.get(exFlow.getExecutionId());
    if (running == null) {
      // not running: read the stored log
      return executorLoader.fetchLogs(exFlow.getExecutionId(), "", 0, offset,
          length);
    }

    @SuppressWarnings("unchecked")
    Map<String, Object> response =
        callExecutorServer(running.getFirst(), ConnectorParams.LOG_ACTION,
            new Pair<String, String>("type", "flow"),
            new Pair<String, String>("offset", String.valueOf(offset)),
            new Pair<String, String>("length", String.valueOf(length)));
    return LogData.createLogDataFromObject(response);
  }

  /**
   * Returns a slice of the log of one job attempt. While the execution is in
   * runningFlows the log is requested from its executor; otherwise it is read
   * through the loader.
   *
   * @param exFlow execution the job belongs to
   * @param jobId id of the job
   * @param offset forwarded as the "offset" value
   * @param length forwarded as the "length" value
   * @param attempt attempt number of the job
   */
  @Override
  public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId,
      int offset, int length, int attempt) throws ExecutorManagerException {
    Pair<ExecutionReference, ExecutableFlow> running =
        runningFlows.get(exFlow.getExecutionId());
    if (running == null) {
      // not running: read the stored log
      return executorLoader.fetchLogs(exFlow.getExecutionId(), jobId, attempt,
          offset, length);
    }

    @SuppressWarnings("unchecked")
    Map<String, Object> response =
        callExecutorServer(running.getFirst(), ConnectorParams.LOG_ACTION,
            new Pair<String, String>("type", "job"),
            new Pair<String, String>("jobId", jobId),
            new Pair<String, String>("offset", String.valueOf(offset)),
            new Pair<String, String>("length", String.valueOf(length)),
            new Pair<String, String>("attempt", String.valueOf(attempt)));
    return LogData.createLogDataFromObject(response);
  }

  /**
   * Returns the attachments of one job attempt. While the execution is in
   * runningFlows they are taken from the "attachments" entry of the
   * executor's response; otherwise they are read through the loader.
   *
   * @param exFlow execution the job belongs to
   * @param jobId id of the job
   * @param attempt attempt number of the job
   */
  @Override
  public List<Object> getExecutionJobStats(ExecutableFlow exFlow, String jobId,
      int attempt) throws ExecutorManagerException {
    Pair<ExecutionReference, ExecutableFlow> running =
        runningFlows.get(exFlow.getExecutionId());
    if (running != null) {
      @SuppressWarnings("unchecked")
      Map<String, Object> response =
          callExecutorServer(running.getFirst(),
              ConnectorParams.ATTACHMENTS_ACTION,
              new Pair<String, String>("jobId", jobId),
              new Pair<String, String>("attempt", String.valueOf(attempt)));

      @SuppressWarnings("unchecked")
      List<Object> attachments = (List<Object>) response.get("attachments");
      return attachments;
    }

    // not running: read the stored attachments
    return executorLoader.fetchAttachments(exFlow.getExecutionId(), jobId,
        attempt);
  }

  /**
   * Requests the metadata of one job attempt from the executor running the
   * execution.
   *
   * @param exFlow execution the job belongs to
   * @param jobId id of the job
   * @param offset forwarded as the "offset" value
   * @param length forwarded as the "length" value
   * @param attempt attempt number of the job
   * @return the metadata built from the executor's response, or null when the
   *         execution is not in runningFlows
   */
  @Override
  public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow,
      String jobId, int offset, int length, int attempt)
      throws ExecutorManagerException {
    Pair<ExecutionReference, ExecutableFlow> running =
        runningFlows.get(exFlow.getExecutionId());
    if (running == null) {
      return null;
    }

    @SuppressWarnings("unchecked")
    Map<String, Object> response =
        callExecutorServer(running.getFirst(), ConnectorParams.METADATA_ACTION,
            new Pair<String, String>("type", "job"),
            new Pair<String, String>("jobId", jobId),
            new Pair<String, String>("offset", String.valueOf(offset)),
            new Pair<String, String>("length", String.valueOf(length)),
            new Pair<String, String>("attempt", String.valueOf(attempt)));
    return JobMetaData.createJobMetaDataFromObject(response);
  }

  /**
   * Cancels an execution. If the flow was dispatched to an executor, the
   * cancel request is sent to that executor; if the flow is still in the
   * queue, it is removed from the queue and finalized {@inheritDoc}
   *
   * @param exFlow execution to cancel
   * @param userId user requesting the cancellation
   * @throws ExecutorManagerException if the execution is neither running nor
   *         queued, or if the call to the executor fails
   * @see azkaban.executor.ExecutorManagerAdapter#cancelFlow(azkaban.executor.ExecutableFlow,
   *      java.lang.String)
   */
  @Override
  public void cancelFlow(ExecutableFlow exFlow, String userId)
    throws ExecutorManagerException {
    synchronized (exFlow) {
      // Single lookup: a containsKey() followed by get() can see the entry
      // disappear in between and hand back null.
      Pair<ExecutionReference, ExecutableFlow> pair =
        runningFlows.get(exFlow.getExecutionId());
      if (pair != null) {
        callExecutorServer(pair.getFirst(), ConnectorParams.CANCEL_ACTION,
          userId);
      } else if (queuedFlows.hasExecution(exFlow.getExecutionId())) {
        queuedFlows.dequeue(exFlow.getExecutionId());
        finalizeFlows(exFlow);
      } else {
        throw new ExecutorManagerException("Execution "
          + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
          + " isn't running.");
      }
    }
  }

  /**
   * Sends a resume request for the execution to the executor running it.
   *
   * @param exFlow execution to resume
   * @param userId user requesting the action
   * @throws ExecutorManagerException if the execution is not in runningFlows
   *         or the call to the executor fails
   */
  @Override
  public void resumeFlow(ExecutableFlow exFlow, String userId)
      throws ExecutorManagerException {
    synchronized (exFlow) {
      Pair<ExecutionReference, ExecutableFlow> running =
          runningFlows.get(exFlow.getExecutionId());
      if (running != null) {
        callExecutorServer(running.getFirst(), ConnectorParams.RESUME_ACTION,
            userId);
        return;
      }
      throw new ExecutorManagerException("Execution "
          + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
          + " isn't running.");
    }
  }

  /**
   * Sends a pause request for the execution to the executor running it.
   *
   * @param exFlow execution to pause
   * @param userId user requesting the action
   * @throws ExecutorManagerException if the execution is not in runningFlows
   *         or the call to the executor fails
   */
  @Override
  public void pauseFlow(ExecutableFlow exFlow, String userId)
      throws ExecutorManagerException {
    synchronized (exFlow) {
      Pair<ExecutionReference, ExecutableFlow> running =
          runningFlows.get(exFlow.getExecutionId());
      if (running != null) {
        callExecutorServer(running.getFirst(), ConnectorParams.PAUSE_ACTION,
            userId);
        return;
      }
      throw new ExecutorManagerException("Execution "
          + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
          + " isn't running.");
    }
  }

  /**
   * Issues the MODIFY_PAUSE_JOBS command for the given jobs of a running
   * execution.
   */
  @Override
  public void pauseExecutingJobs(ExecutableFlow exFlow, String userId,
      String... jobIds) throws ExecutorManagerException {
    final String command = ConnectorParams.MODIFY_PAUSE_JOBS;
    modifyExecutingJobs(exFlow, command, userId, jobIds);
  }

  /**
   * Issues the MODIFY_RESUME_JOBS command for the given jobs of a running
   * execution.
   */
  @Override
  public void resumeExecutingJobs(ExecutableFlow exFlow, String userId,
      String... jobIds) throws ExecutorManagerException {
    final String command = ConnectorParams.MODIFY_RESUME_JOBS;
    modifyExecutingJobs(exFlow, command, userId, jobIds);
  }

  /**
   * Issues the MODIFY_RETRY_FAILURES command for a running execution; no job
   * list is sent.
   */
  @Override
  public void retryFailures(ExecutableFlow exFlow, String userId)
      throws ExecutorManagerException {
    final String command = ConnectorParams.MODIFY_RETRY_FAILURES;
    modifyExecutingJobs(exFlow, command, userId);
  }

  /**
   * Issues the MODIFY_RETRY_JOBS command for the given jobs of a running
   * execution.
   */
  @Override
  public void retryExecutingJobs(ExecutableFlow exFlow, String userId,
      String... jobIds) throws ExecutorManagerException {
    final String command = ConnectorParams.MODIFY_RETRY_JOBS;
    modifyExecutingJobs(exFlow, command, userId, jobIds);
  }

  /**
   * Issues the MODIFY_DISABLE_JOBS command for the given jobs of a running
   * execution.
   */
  @Override
  public void disableExecutingJobs(ExecutableFlow exFlow, String userId,
      String... jobIds) throws ExecutorManagerException {
    final String command = ConnectorParams.MODIFY_DISABLE_JOBS;
    modifyExecutingJobs(exFlow, command, userId, jobIds);
  }

  /**
   * Issues the MODIFY_ENABLE_JOBS command for the given jobs of a running
   * execution.
   */
  @Override
  public void enableExecutingJobs(ExecutableFlow exFlow, String userId,
      String... jobIds) throws ExecutorManagerException {
    final String command = ConnectorParams.MODIFY_ENABLE_JOBS;
    modifyExecutingJobs(exFlow, command, userId, jobIds);
  }

  /**
   * Issues the MODIFY_CANCEL_JOBS command for the given jobs of a running
   * execution.
   */
  @Override
  public void cancelExecutingJobs(ExecutableFlow exFlow, String userId,
      String... jobIds) throws ExecutorManagerException {
    final String command = ConnectorParams.MODIFY_CANCEL_JOBS;
    modifyExecutingJobs(exFlow, command, userId, jobIds);
  }

  /**
   * Sends a modify-execution request to the executor running the given
   * execution.
   *
   * When job ids are supplied, every non-empty id must name a node of the
   * execution; the ids are then sent as one comma-joined list. Without job
   * ids only the command itself is sent.
   *
   * @param exFlow execution to modify; also used as the lock for the call
   * @param command value sent as the modify action type
   * @param userId user requesting the modification
   * @param jobIds optional job ids the command applies to
   * @return the executor's response
   * @throws ExecutorManagerException if the execution is not in runningFlows,
   *         a job id is unknown, or the call to the executor fails
   */
  @SuppressWarnings("unchecked")
  private Map<String, Object> modifyExecutingJobs(ExecutableFlow exFlow,
      String command, String userId, String... jobIds)
      throws ExecutorManagerException {
    synchronized (exFlow) {
      Pair<ExecutionReference, ExecutableFlow> running =
          runningFlows.get(exFlow.getExecutionId());
      if (running == null) {
        throw new ExecutorManagerException("Execution "
            + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
            + " isn't running.");
      }

      boolean noJobsGiven = jobIds == null || jobIds.length == 0;
      if (noJobsGiven) {
        return callExecutorServer(running.getFirst(),
            ConnectorParams.MODIFY_EXECUTION_ACTION, userId,
            new Pair<String, String>(
                ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command));
      }

      // reject the request before contacting the executor if a job is unknown
      for (String jobId : jobIds) {
        if (jobId.isEmpty()) {
          continue;
        }
        if (exFlow.getExecutableNode(jobId) == null) {
          throw new ExecutorManagerException("Job " + jobId
              + " doesn't exist in execution " + exFlow.getExecutionId()
              + ".");
        }
      }

      String joinedIds = StringUtils.join(jobIds, ',');
      return callExecutorServer(running.getFirst(),
          ConnectorParams.MODIFY_EXECUTION_ACTION, userId,
          new Pair<String, String>(
              ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command),
          new Pair<String, String>(ConnectorParams.MODIFY_JOBS_LIST,
              joinedIds));
    }
  }

  /**
   * Submits a flow for execution on behalf of {@code userId}.
   *
   * <p>Submissions of the same project/flow are serialized on an interned key
   * string. If {@code queuedFlows} is full, the failure is logged and returned
   * as the message; no exception is thrown in that case. Otherwise the flow is
   * uploaded through {@code executorLoader}, an active reference is recorded,
   * and the flow is either enqueued (multi-executor mode) or dispatched
   * directly to the first entry of {@code activeExecutors}.
   *
   * @param exflow the flow to submit
   * @param userId the submitting user, stored on the flow
   * @return a human-readable message describing the outcome
   * @throws ExecutorManagerException with reason {@code SkippedExecution} if
   *         the flow is already running and the concurrent option is
   *         {@code CONCURRENT_OPTION_SKIP}; also propagated from the loader,
   *         queue and dispatch calls
   */
  @Override
  public String submitExecutableFlow(ExecutableFlow exflow, String userId)
    throws ExecutorManagerException {

    String exFlowKey = exflow.getProjectName() + "." + exflow.getId() + ".submitFlow";
    // using project and flow name to prevent race condition when same flow is submitted by API and schedule at the same time
    // causing two same flow submission entering this piece.
    synchronized (exFlowKey.intern()) {
      String flowId = exflow.getFlowId();

      logger.info("Submitting execution flow " + flowId + " by " + userId);

      String message = "";
      if (queuedFlows.isFull()) {
        // Queue overrun is reported through the returned message only.
        message =
          String
            .format(
              "Failed to submit %s for project %s. Azkaban has overrun its webserver queue capacity",
              flowId, exflow.getProjectName());
        logger.error(message);
      } else {
        int projectId = exflow.getProjectId();
        exflow.setSubmitUser(userId);
        exflow.setSubmitTime(System.currentTimeMillis());

        List<Integer> running = getRunningFlows(projectId, flowId);

        // NOTE(review): when the flow has no options, the fresh instance is
        // only held in this local; nothing visible here attaches it to exflow
        // — confirm that is intended.
        ExecutionOptions options = exflow.getExecutionOptions();
        if (options == null) {
          options = new ExecutionOptions();
        }

        if (options.getDisabledJobs() != null) {
          FlowUtils.applyDisabledJobs(options.getDisabledJobs(), exflow);
        }

        // Decide how to treat executions of the same flow that are already
        // running: pipeline behind the latest, skip, or run concurrently.
        if (!running.isEmpty()) {
          if (options.getConcurrentOption().equals(
            ExecutionOptions.CONCURRENT_OPTION_PIPELINE)) {
            // After sorting, the last element is the highest exec id.
            Collections.sort(running);
            Integer runningExecId = running.get(running.size() - 1);

            options.setPipelineExecutionId(runningExecId);
            message =
              "Flow " + flowId + " is already running with exec id "
                + runningExecId + ". Pipelining level "
                + options.getPipelineLevel() + ". \n";
          } else if (options.getConcurrentOption().equals(
            ExecutionOptions.CONCURRENT_OPTION_SKIP)) {
            throw new ExecutorManagerException("Flow " + flowId
              + " is already running. Skipping execution.",
              ExecutorManagerException.Reason.SkippedExecution);
          } else {
            // The settings is to run anyways.
            message =
              "Flow " + flowId + " is already running with exec id "
                + StringUtils.join(running, ",")
                + ". Will execute concurrently. \n";
          }
        }

        // Memory check is enabled unless the project is whitelisted for
        // WhitelistType.MemoryCheck.
        boolean memoryCheck =
          !ProjectWhitelist.isProjectWhitelisted(exflow.getProjectId(),
            ProjectWhitelist.WhitelistType.MemoryCheck);
        options.setMemoryCheck(memoryCheck);

        // The exflow id is set by the loader. So it's unavailable until after
        // this call.
        executorLoader.uploadExecutableFlow(exflow);

        // We create an active flow reference in the datastore. If the upload
        // fails, we remove the reference.
        ExecutionReference reference =
          new ExecutionReference(exflow.getExecutionId());

        if (isMultiExecutorMode()) {
          //Take MultiExecutor route
          executorLoader.addActiveExecutableReference(reference);
          queuedFlows.enqueue(exflow, reference);
        } else {
          // assign only local executor we have
          Executor choosenExecutor = activeExecutors.iterator().next();
          executorLoader.addActiveExecutableReference(reference);
          try {
            dispatch(reference, exflow, choosenExecutor);
          } catch (ExecutorManagerException e) {
            // Dispatch failed: drop the active reference again and rethrow.
            executorLoader.removeActiveExecutableReference(reference
              .getExecId());
            throw e;
          }
        }
        message +=
          "Execution submitted successfully with exec id "
            + exflow.getExecutionId();
      }
      return message;
    }
  }

  /**
   * Asks {@code executorLoader} to remove execution logs by the given time
   * value and logs the number of removed entries plus the elapsed time in
   * whole seconds. A loader failure is logged and not rethrown.
   *
   * @param millis timestamp in milliseconds handed to
   *        {@code removeExecutionLogsByTime}
   */
  private void cleanOldExecutionLogs(long millis) {
    final long startedAt = System.currentTimeMillis();
    try {
      final int removed = executorLoader.removeExecutionLogsByTime(millis);
      logger.info("Cleaned up " + removed + " log entries.");
    } catch (ExecutorManagerException cleanupFailure) {
      logger.error("log clean up failed. ", cleanupFailure);
    }
    final long elapsedSeconds = (System.currentTimeMillis() - startedAt) / 1000;
    logger.info("log clean up time: " + elapsedSeconds + " seconds.");
  }

  /**
   * Calls the given executor's host and port with {@code action} for the
   * execution id of {@code exflow}, passing no user and no extra parameters.
   *
   * @param exflow flow whose execution id is sent
   * @param executor executor whose host and port are contacted
   * @param action value sent as the action parameter
   * @return the response map of the underlying call
   * @throws ExecutorManagerException wrapping any {@link IOException} raised
   *         by the underlying call
   */
  private Map<String, Object> callExecutorServer(ExecutableFlow exflow,
    Executor executor, String action) throws ExecutorManagerException {
    try {
      // The cast picks the varargs overload with an explicit null array.
      return callExecutorServer(executor.getHost(), executor.getPort(), action,
        exflow.getExecutionId(), null, (Pair<String, String>[]) null);
    } catch (IOException e) {
      throw new ExecutorManagerException(e);
    }
  }

  /**
   * Calls the host and port recorded on {@code ref} with {@code action} for
   * the reference's execution id, forwarding {@code user} and no extra
   * parameters.
   *
   * @param ref reference supplying host, port and execution id
   * @param action value sent as the action parameter
   * @param user value sent as the user parameter
   * @return the response map of the underlying call
   * @throws ExecutorManagerException wrapping any {@link IOException} raised
   *         by the underlying call
   */
  private Map<String, Object> callExecutorServer(ExecutionReference ref,
      String action, String user) throws ExecutorManagerException {
    try {
      // The cast picks the varargs overload with an explicit null array.
      return callExecutorServer(ref.getHost(), ref.getPort(), action,
          ref.getExecId(), user, (Pair<String, String>[]) null);
    } catch (IOException e) {
      throw new ExecutorManagerException(e);
    }
  }

  /**
   * Calls the host and port recorded on {@code ref} with {@code action} for
   * the reference's execution id, forwarding {@code params} and a null user.
   *
   * @param ref reference supplying host, port and execution id
   * @param action value sent as the action parameter
   * @param params extra request parameters
   * @return the response map of the underlying call
   * @throws ExecutorManagerException wrapping any {@link IOException} raised
   *         by the underlying call
   */
  private Map<String, Object> callExecutorServer(ExecutionReference ref,
      String action, Pair<String, String>... params)
      throws ExecutorManagerException {
    try {
      return callExecutorServer(ref.getHost(), ref.getPort(), action,
          ref.getExecId(), null, params);
    } catch (IOException e) {
      throw new ExecutorManagerException(e);
    }
  }

  /**
   * Calls the host and port recorded on {@code ref} with {@code action} for
   * the reference's execution id, forwarding both {@code user} and
   * {@code params}.
   *
   * @param ref reference supplying host, port and execution id
   * @param action value sent as the action parameter
   * @param user value sent as the user parameter
   * @param params extra request parameters
   * @return the response map of the underlying call
   * @throws ExecutorManagerException wrapping any {@link IOException} raised
   *         by the underlying call
   */
  private Map<String, Object> callExecutorServer(ExecutionReference ref,
      String action, String user, Pair<String, String>... params)
      throws ExecutorManagerException {
    try {
      return callExecutorServer(ref.getHost(), ref.getPort(), action,
          ref.getExecId(), user, params);
    } catch (IOException e) {
      throw new ExecutorManagerException(e);
    }
  }

  /**
   * Builds the parameter list for a request to the {@code /executor} path of
   * the given host and port, and returns the parsed JSON response.
   *
   * <p>Caller-supplied {@code params} come first, followed by the action, the
   * execution id (rendered with {@code String.valueOf}) and the user.
   *
   * @param host executor host
   * @param port executor port
   * @param action value sent as {@code ConnectorParams.ACTION_PARAM}
   * @param executionId value sent as {@code ConnectorParams.EXECID_PARAM}
   * @param user value sent as {@code ConnectorParams.USER_PARAM}
   * @param params extra request parameters; may be null
   * @return the response map from {@code callExecutorForJsonObject}
   * @throws IOException propagated from {@code callExecutorForJsonObject}
   */
  private Map<String, Object> callExecutorServer(String host, int port,
      String action, Integer executionId, String user,
      Pair<String, String>... params) throws IOException {
    final List<Pair<String, String>> query =
        new ArrayList<Pair<String, String>>();

    // A null varargs array simply contributes nothing.
    if (params != null) {
      Collections.addAll(query, params);
    }

    final String execIdText = String.valueOf(executionId);
    query.add(new Pair<String, String>(ConnectorParams.ACTION_PARAM, action));
    query.add(new Pair<String, String>(ConnectorParams.EXECID_PARAM,
        execIdText));
    query.add(new Pair<String, String>(ConnectorParams.USER_PARAM, user));

    return callExecutorForJsonObject(host, port, "/executor", query);
  }

  /*
   * Fetches the raw response through callExecutorForJsonString, parses it
   * into a map, and turns a ConnectorParams.RESPONSE_ERROR entry into an
   * IOException carrying that error text.
   */
  private Map<String, Object> callExecutorForJsonObject(String host, int port,
    String path, List<Pair<String, String>> paramList) throws IOException {
    final String rawJson =
      callExecutorForJsonString(host, port, path, paramList);

    @SuppressWarnings("unchecked")
    final Map<String, Object> parsed =
      (Map<String, Object>) JSONUtils.parseJSONFromString(rawJson);

    final String reportedError =
      (String) parsed.get(ConnectorParams.RESPONSE_ERROR);
    if (reportedError == null) {
      return parsed;
    }
    throw new IOException(reportedError);
  }

  /*
   * Builds the request URI from host, port, path and parameters (a null
   * parameter list is treated as empty), issues an HTTP GET through
   * ExecutorApiClient and returns the response body as-is.
   */
  private String callExecutorForJsonString(String host, int port, String path,
    List<Pair<String, String>> paramList) throws IOException {
    final List<Pair<String, String>> effectiveParams =
      (paramList != null) ? paramList : new ArrayList<Pair<String, String>>();

    final ExecutorApiClient client = ExecutorApiClient.getInstance();

    @SuppressWarnings("unchecked")
    final URI target =
      ExecutorApiClient.buildUri(host, port, path, true,
        effectiveParams.toArray(new Pair[0]));

    return client.httpGet(target, null);
  }

  /**
   * Calls the {@code /stats} path of the executor looked up by
   * {@code executorId} and returns the parsed JSON response.
   *
   * <p>Caller-supplied {@code params} are sent first, followed by the action
   * parameter.
   *
   * @param executorId id handed to {@code fetchExecutor}
   * @param action value sent as {@code ConnectorParams.ACTION_PARAM}
   * @param params extra request parameters; may be null
   * @return the response map from {@code callExecutorForJsonObject}
   * @throws IOException propagated from {@code callExecutorForJsonObject}
   * @throws ExecutorManagerException declared by the interface; may be raised
   *         by {@code fetchExecutor}
   */
  @Override
  public Map<String, Object> callExecutorStats(int executorId, String action,
    Pair<String, String>... params) throws IOException, ExecutorManagerException {
    final Executor target = fetchExecutor(executorId);

    final List<Pair<String, String>> query =
      new ArrayList<Pair<String, String>>();

    // A null varargs array simply contributes nothing.
    if (params != null) {
      Collections.addAll(query, params);
    }
    query.add(new Pair<String, String>(ConnectorParams.ACTION_PARAM, action));

    final String host = target.getHost();
    final int port = target.getPort();
    return callExecutorForJsonObject(host, port, "/stats", query);
  }


  /**
   * Calls the {@code /jmx} path of the executor addressed by
   * {@code hostPort} and returns the parsed JSON response.
   *
   * <p>The action is sent as a parameter name with an empty value; the MBean
   * name is added under {@code ConnectorParams.JMX_MBEAN} only when non-null.
   *
   * @param hostPort address in {@code host:port} form; it is split on
   *        {@code ':'} and the second part is parsed as the port
   * @param action parameter name identifying the JMX action
   * @param mBean optional MBean name; may be null
   * @return the response map from {@code callExecutorForJsonObject}
   * @throws IOException propagated from {@code callExecutorForJsonObject}
   */
  @Override
  public Map<String, Object> callExecutorJMX(String hostPort, String action,
      String mBean) throws IOException {
    final List<Pair<String, String>> query =
      new ArrayList<Pair<String, String>>();
    query.add(new Pair<String, String>(action, ""));
    if (mBean != null) {
      query.add(new Pair<String, String>(ConnectorParams.JMX_MBEAN, mBean));
    }

    final String[] addressParts = hostPort.split(":");
    final String host = addressParts[0];
    final int port = Integer.valueOf(addressParts[1]);
    return callExecutorForJsonObject(host, port, "/jmx", query);
  }

  /**
   * Requests shutdown of the background workers: {@code queueProcessor} (only
   * in multi-executor mode) and then {@code executingManager}.
   */
  @Override
  public void shutdown() {
    if (isMultiExecutorMode()) {
      queueProcessor.shutdown();
    }
    executingManager.shutdown();
  }

  private class ExecutingManagerUpdaterThread extends Thread {
    private boolean shutdown = false;

    public ExecutingManagerUpdaterThread() {
      this.setName("ExecutorManagerUpdaterThread");
    }

    // 10 mins recently finished threshold.
    private long recentlyFinishedLifetimeMs = 600000;
    private int waitTimeIdleMs = 2000;
    private int waitTimeMs = 500;

    // When we have an http error, for that flow, we'll check every 10 secs, 6
    // times (1 mins) before we evict.
    private int numErrors = 6;
    private long errorThreshold = 10000;

    private void shutdown() {
      shutdown = true;
    }

    @SuppressWarnings("unchecked")
    public void run() {
      while (!shutdown) {
        try {
          lastThreadCheckTime = System.currentTimeMillis();
          updaterStage = "Starting update all flows.";

          Map<Executor, List<ExecutableFlow>> exFlowMap =
              getFlowToExecutorMap();
          ArrayList<ExecutableFlow> finishedFlows =
              new ArrayList<ExecutableFlow>();
          ArrayList<ExecutableFlow> finalizeFlows =
              new ArrayList<ExecutableFlow>();

          if (exFlowMap.size() > 0) {
            for (Map.Entry<Executor, List<ExecutableFlow>> entry : exFlowMap
                .entrySet()) {
              List<Long> updateTimesList = new ArrayList<Long>();
              List<Integer> executionIdsList = new ArrayList<Integer>();

              Executor executor = entry.getKey();

              updaterStage =
                  "Starting update flows on " + executor.getHost() + ":"
                      + executor.getPort();

              // We pack the parameters of the same host together before we
              // query.
              fillUpdateTimeAndExecId(entry.getValue(), executionIdsList,
                  updateTimesList);

              Pair<String, String> updateTimes =
                  new Pair<String, String>(
                      ConnectorParams.UPDATE_TIME_LIST_PARAM,
                      JSONUtils.toJSON(updateTimesList));
              Pair<String, String> executionIds =
                  new Pair<String, String>(ConnectorParams.EXEC_ID_LIST_PARAM,
                      JSONUtils.toJSON(executionIdsList));

              Map<String, Object> results = null;
              try {
                results =
                    callExecutorServer(executor.getHost(),
                      executor.getPort(), ConnectorParams.UPDATE_ACTION,
                        null, null, executionIds, updateTimes);
              } catch (IOException e) {
                logger.error(e);
                for (ExecutableFlow flow : entry.getValue()) {
                  Pair<ExecutionReference, ExecutableFlow> pair =
                      runningFlows.get(flow.getExecutionId());

                  updaterStage =
                      "Failed to get update. Doing some clean up for flow "
                          + pair.getSecond().getExecutionId();

                  if (pair != null) {
                    ExecutionReference ref = pair.getFirst();
                    int numErrors = ref.getNumErrors();
                    if (ref.getNumErrors() < this.numErrors) {
                      ref.setNextCheckTime(System.currentTimeMillis()
                          + errorThreshold);
                      ref.setNumErrors(++numErrors);
                    } else {
                      logger.error("Evicting flow " + flow.getExecutionId()
                          + ". The executor is unresponsive.");
                      // TODO should send out an unresponsive email here.
                      finalizeFlows.add(pair.getSecond());
                    }
                  }
                }
              }

              // We gets results
              if (results != null) {
                List<Map<String, Object>> executionUpdates =
                    (List<Map<String, Object>>) results
                        .get(ConnectorParams.RESPONSE_UPDATED_FLOWS);
                for (Map<String, Object> updateMap : executionUpdates) {
                  try {
                    ExecutableFlow flow = updateExecution(updateMap);

                    updaterStage = "Updated flow " + flow.getExecutionId();

                    if (isFinished(flow)) {
                      finishedFlows.add(flow);
                      finalizeFlows.add(flow);
                    }
                  } catch (ExecutorManagerException e) {
                    ExecutableFlow flow = e.getExecutableFlow();
                    logger.error(e);

                    if (flow != null) {
                      logger.error("Finalizing flow " + flow.getExecutionId());
                      finalizeFlows.add(flow);
                    }
                  }
                }
              }
            }

            updaterStage = "Evicting old recently finished flows.";

            evictOldRecentlyFinished(recentlyFinishedLifetimeMs);
            // Add new finished
            for (ExecutableFlow flow : finishedFlows) {
              if (flow.getScheduleId() >= 0
                  && flow.getStatus() == Status.SUCCEEDED) {
                ScheduleStatisticManager.invalidateCache(flow.getScheduleId(),
                    cacheDir);
              }
              fireEventListeners(Event.create(flow, Type.FLOW_FINISHED, new EventData(flow.getStatus())));
              recentlyFinished.put(flow.getExecutionId(), flow);
            }

            updaterStage =
                "Finalizing " + finalizeFlows.size() + " error flows.";

            // Kill error flows
            for (ExecutableFlow flow : finalizeFlows) {
              finalizeFlows(flow);
            }
          }

          updaterStage = "Updated all active flows. Waiting for next round.";

          synchronized (this) {
            try {
              if (runningFlows.size() > 0) {
                this.wait(waitTimeMs);
              } else {
                this.wait(waitTimeIdleMs);
              }
            } catch (InterruptedException e) {
            }
          }
        } catch (Exception e) {
          logger.error(e);
        }
      }
    }
  }

  /**
   * Finalizes an execution: makes sure its stored state is terminal, removes
   * the active reference, moves it from {@code runningFlows} to
   * {@code recentlyFinished}, fires {@code FLOW_FINISHED}, and alerts.
   *
   * <p>If the in-memory flow is not finished, the stored copy is loaded; if
   * that copy is not finished either, every node is failed via
   * {@code failEverything} and the result is persisted. Alerts are based on
   * that finalized copy, so a flow that had to be force-failed is reported as
   * a failure rather than by its stale in-memory status.
   *
   * <p>When a datastore operation throws {@link ExecutorManagerException} the
   * error is logged and no alert is sent.
   *
   * @param flow the in-memory flow to finalize
   */
  private void finalizeFlows(ExecutableFlow flow) {

    int execId = flow.getExecutionId();
    boolean alertUser = true;
    // Flow instance carrying the terminal state; replaced by the stored copy
    // when the in-memory flow is not finished.
    ExecutableFlow dsFlow = flow;
    updaterStage = "finalizing flow " + execId;
    // First we check if the execution in the datastore is complete
    try {
      if (!isFinished(flow)) {
        updaterStage = "finalizing flow " + execId + " loading from db";
        dsFlow = executorLoader.fetchExecutableFlow(execId);

        // If it's marked finished, we're good. If not, we fail everything and
        // then mark it finished.
        if (!isFinished(dsFlow)) {
          updaterStage = "finalizing flow " + execId + " failing the flow";
          failEverything(dsFlow);
          executorLoader.updateExecutableFlow(dsFlow);
        }
      }

      updaterStage = "finalizing flow " + execId + " deleting active reference";

      // Delete the executing reference.
      if (flow.getEndTime() == -1) {
        flow.setEndTime(System.currentTimeMillis());
        executorLoader.updateExecutableFlow(dsFlow);
      }
      executorLoader.removeActiveExecutableReference(execId);

      updaterStage = "finalizing flow " + execId + " cleaning from memory";
      runningFlows.remove(execId);
      fireEventListeners(Event.create(dsFlow, Type.FLOW_FINISHED, new EventData(dsFlow.getStatus())));
      recentlyFinished.put(execId, dsFlow);

    } catch (ExecutorManagerException e) {
      alertUser = false; // failed due to azkaban internal error, not to alert user
      logger.error("Failed to finalize flow " + execId, e);
    }

    // TODO append to the flow log that we forced killed this flow because the
    // target no longer had
    // the reference.

    updaterStage = "finalizing flow " + execId + " alerting and emailing";
    if (alertUser) {
      // Reaching this point means the try block completed, so dsFlow holds a
      // terminal status; the in-memory flow may still look like it is running.
      final ExecutionOptions options = dsFlow.getExecutionOptions();
      final Status finalStatus = dsFlow.getStatus();
      final boolean failed =
          finalStatus == Status.FAILED || finalStatus == Status.KILLED;

      final boolean hasRecipients = failed
          ? options.getFailureEmails() != null
              && !options.getFailureEmails().isEmpty()
          : options.getSuccessEmails() != null
              && !options.getSuccessEmails().isEmpty();

      // But we can definitely email them.
      if (hasRecipients) {
        Alerter mailAlerter = alerters.get("email");
        try {
          if (failed) {
            mailAlerter.alertOnError(dsFlow);
          } else {
            mailAlerter.alertOnSuccess(dsFlow);
          }
        } catch (Exception e) {
          logger.error("Failed to send email alert for execution " + execId,
              e);
        }
      }

      // Optional additional alerter selected by the "alert.type" flow
      // parameter.
      if (options.getFlowParameters().containsKey("alert.type")) {
        String alertType = options.getFlowParameters().get("alert.type");
        Alerter alerter = alerters.get(alertType);
        if (alerter != null) {
          try {
            if (failed) {
              alerter.alertOnError(dsFlow);
            } else {
              alerter.alertOnSuccess(dsFlow);
            }
          } catch (Exception e) {
            logger.error("Failed to alert by " + alertType, e);
          }
        } else {
          logger.error("Alerter type " + alertType + " doesn't exist. Failed to alert.");
        }
      }
    }

  }

  /**
   * Forces a flow and all of its unfinished nodes into a terminal state.
   *
   * <p>Nodes that are SUCCEEDED, FAILED, KILLED, SKIPPED or DISABLED are left
   * untouched. READY nodes become KILLED, every other node becomes FAILED,
   * and missing start/end times of the changed nodes are filled with the
   * current time. The flow gets an end time if it has none and is marked
   * FAILED.
   *
   * @param exFlow the flow to fail
   */
  private void failEverything(ExecutableFlow exFlow) {
    final long now = System.currentTimeMillis();

    for (ExecutableNode node : exFlow.getExecutableNodes()) {
      // null means "already settled, leave the node alone".
      final Status forced;
      switch (node.getStatus()) {
      case SUCCEEDED:
      case FAILED:
      case KILLED:
      case SKIPPED:
      case DISABLED:
        forced = null;
        break;
      case READY:
        forced = Status.KILLED;
        break;
      default:
        forced = Status.FAILED;
        break;
      }

      if (forced == null) {
        continue;
      }
      node.setStatus(forced);

      final boolean neverStarted = node.getStartTime() == -1;
      if (neverStarted) {
        node.setStartTime(now);
      }
      final boolean neverEnded = node.getEndTime() == -1;
      if (neverEnded) {
        node.setEndTime(now);
      }
    }

    if (exFlow.getEndTime() == -1) {
      exFlow.setEndTime(now);
    }
    exFlow.setStatus(Status.FAILED);
  }

  /**
   * Removes entries from {@code recentlyFinished} whose flow ended more than
   * {@code ageMs} milliseconds ago.
   *
   * @param ageMs maximum age in milliseconds, measured from the flow's end
   *        time to now
   */
  private void evictOldRecentlyFinished(long ageMs) {
    // Iterate over a snapshot of the keys so removals do not disturb the loop.
    final ArrayList<Integer> execIds =
        new ArrayList<Integer>(recentlyFinished.keySet());
    final long cutoff = System.currentTimeMillis() - ageMs;

    for (Integer execId : execIds) {
      final ExecutableFlow finished = recentlyFinished.get(execId);
      final boolean expired = finished.getEndTime() < cutoff;
      if (expired) {
        recentlyFinished.remove(execId);
      }
    }
  }

  /**
   * Applies one update map received from an executor to the matching running
   * flow and returns that flow.
   *
   * <p>On a successful update the reference's error counter and next-check
   * time are reset. A transition to FAILED marks the flow-fail metric; a
   * transition to FAILED_FINISHING triggers first-failure alerts (email when
   * the options request it, plus the alerter named by the "alert.type" flow
   * parameter). Alerting failures are logged and never propagated.
   *
   * @param updateData update map; must contain
   *        {@code ConnectorParams.UPDATE_MAP_EXEC_ID}
   * @return the updated in-memory flow
   * @throws ExecutorManagerException if the exec id is missing, no running
   *         flow matches it, or the map carries an "error" entry (in which
   *         case the exception carries the flow)
   */
  private ExecutableFlow updateExecution(Map<String, Object> updateData)
      throws ExecutorManagerException {

    Integer execId =
        (Integer) updateData.get(ConnectorParams.UPDATE_MAP_EXEC_ID);
    if (execId == null) {
      throw new ExecutorManagerException(
          "Response is malformed. Need exec id to update.");
    }

    Pair<ExecutionReference, ExecutableFlow> refPair =
        this.runningFlows.get(execId);
    if (refPair == null) {
      throw new ExecutorManagerException(
          "No running flow found with the execution id. Removing " + execId);
    }

    ExecutionReference ref = refPair.getFirst();
    ExecutableFlow flow = refPair.getSecond();
    if (updateData.containsKey("error")) {
      // The flow should be finished here.
      throw new ExecutorManagerException((String) updateData.get("error"), flow);
    }

    // Reset errors.
    ref.setNextCheckTime(0);
    ref.setNumErrors(0);
    Status oldStatus = flow.getStatus();
    flow.applyUpdateObject(updateData);
    Status newStatus = flow.getStatus();

    if(oldStatus != newStatus && newStatus == Status.FAILED) {
      CommonMetrics.INSTANCE.markFlowFail();
    }

    ExecutionOptions options = flow.getExecutionOptions();
    if (oldStatus != newStatus && newStatus.equals(Status.FAILED_FINISHING)) {
      // We want to see if we should give an email status on first failure.
      if (options.getNotifyOnFirstFailure()) {
        Alerter mailAlerter = alerters.get("email");
        try {
          mailAlerter.alertOnFirstError(flow);
        } catch (Exception e) {
          // Route the stack trace through the logger instead of stderr.
          logger.error("Failed to send first error email.", e);
        }
      }
      if (options.getFlowParameters().containsKey("alert.type")) {
        String alertType = options.getFlowParameters().get("alert.type");
        Alerter alerter = alerters.get(alertType);
        if (alerter != null) {
          try {
            alerter.alertOnFirstError(flow);
          } catch (Exception e) {
            logger.error("Failed to alert by " + alertType, e);
          }
        } else {
          logger.error("Alerter type " + alertType
              + " doesn't exist. Failed to alert.");
        }
      }
    }

    return flow;
  }

  /**
   * Tells whether a flow has reached a terminal status.
   *
   * @param flow the flow to inspect
   * @return {@code true} if the status is SUCCEEDED, FAILED or KILLED;
   *         {@code false} for any other status
   */
  public boolean isFinished(ExecutableFlow flow) {
    final boolean terminal;
    switch (flow.getStatus()) {
      case SUCCEEDED:
      case FAILED:
      case KILLED:
        terminal = true;
        break;
      default:
        terminal = false;
        break;
    }
    return terminal;
  }

  /**
   * Appends the execution id and update time of every flow to the two output
   * lists, keeping both lists index-aligned with {@code flows}.
   *
   * @param flows flows to read from
   * @param executionIds output list receiving each flow's execution id
   * @param updateTimes output list receiving each flow's update time
   */
  private void fillUpdateTimeAndExecId(List<ExecutableFlow> flows,
      List<Integer> executionIds, List<Long> updateTimes) {
    for (ExecutableFlow current : flows) {
      final int execId = current.getExecutionId();
      executionIds.add(execId);

      final long lastUpdate = current.getUpdateTime();
      updateTimes.add(lastUpdate);
    }
  }

  /*
   * Buckets the running flows by the executor recorded on their reference so
   * that one request per executor suffices. Flows whose next check time has
   * not passed yet are left out.
   */
  private Map<Executor, List<ExecutableFlow>> getFlowToExecutorMap() {
    final Map<Executor, List<ExecutableFlow>> grouped =
      new HashMap<Executor, List<ExecutableFlow>>();

    for (Pair<ExecutionReference, ExecutableFlow> entry : runningFlows
      .values()) {
      final ExecutionReference ref = entry.getFirst();

      // A next-check time in the future (or exactly now) postpones the flow.
      final boolean postponed =
        ref.getNextCheckTime() >= System.currentTimeMillis();
      if (postponed) {
        continue;
      }

      final Executor owner = ref.getExecutor();
      List<ExecutableFlow> bucket = grouped.get(owner);
      if (bucket == null) {
        bucket = new ArrayList<ExecutableFlow>();
        grouped.put(owner, bucket);
      }
      bucket.add(entry.getSecond());
    }

    return grouped;
  }

  /**
   * Appends one page of a flow's execution history to {@code outputList} and
   * returns the count reported by {@code fetchNumExecutableFlows}.
   *
   * @param projectId id of the project
   * @param flowId id of the flow
   * @param from offset handed to {@code fetchFlowHistory}
   * @param length page size handed to {@code fetchFlowHistory}
   * @param outputList list receiving the fetched executions
   * @return the value of
   *         {@code executorLoader.fetchNumExecutableFlows(projectId, flowId)}
   * @throws ExecutorManagerException propagated from the loader
   */
  @Override
  public int getExecutableFlows(int projectId, String flowId, int from,
      int length, List<ExecutableFlow> outputList)
      throws ExecutorManagerException {
    outputList.addAll(
        executorLoader.fetchFlowHistory(projectId, flowId, from, length));
    final int totalCount =
        executorLoader.fetchNumExecutableFlows(projectId, flowId);
    return totalCount;
  }

  /**
   * Returns one page of a flow's execution history, delegating to the
   * status-aware {@code executorLoader.fetchFlowHistory} overload.
   *
   * @param projectId id of the project
   * @param flowId id of the flow
   * @param from offset handed to the loader
   * @param length page size handed to the loader
   * @param status status handed to the loader
   * @return the list returned by the loader
   * @throws ExecutorManagerException propagated from the loader
   */
  @Override
  public List<ExecutableFlow> getExecutableFlows(int projectId, String flowId,
      int from, int length, Status status) throws ExecutorManagerException {
    return executorLoader.fetchFlowHistory(projectId, flowId, from, length,
        status);
  }

  /*
   * Cleaner thread that removes old execution logs through
   * cleanOldExecutionLogs. It wakes up once per
   * CLEANER_THREAD_WAIT_INTERVAL_MS (one day) and cleans when at least that
   * interval has passed since the previous clean.
   */
  private class CleanerThread extends Thread {
    // check every day
    private static final long CLEANER_THREAD_WAIT_INTERVAL_MS =
        24 * 60 * 60 * 1000;

    // Logs older than this many milliseconds are removed.
    private final long executionLogsRetentionMs;

    // Written by shutdown() from another thread and read by the run() loop;
    // volatile guarantees the loop observes the request.
    private volatile boolean shutdown = false;
    private long lastLogCleanTime = -1;

    public CleanerThread(long executionLogsRetentionMs) {
      this.executionLogsRetentionMs = executionLogsRetentionMs;
      this.setName("AzkabanWebServer-Cleaner-Thread");
    }

    /** Requests termination and interrupts a pending wait. */
    @SuppressWarnings("unused")
    public void shutdown() {
      shutdown = true;
      this.interrupt();
    }

    @Override
    public void run() {
      while (!shutdown) {
        synchronized (this) {
          try {
            lastCleanerThreadCheckTime = System.currentTimeMillis();

            // Cleanup old stuff.
            long currentTime = System.currentTimeMillis();
            if (currentTime - CLEANER_THREAD_WAIT_INTERVAL_MS > lastLogCleanTime) {
              cleanExecutionLogs();
              lastLogCleanTime = currentTime;
            }

            wait(CLEANER_THREAD_WAIT_INTERVAL_MS);
          } catch (InterruptedException e) {
            // The loop condition re-checks the shutdown flag after this.
            logger.info("Interrupted. Probably to shut down.");
          }
        }
      }
    }

    /*
     * Computes the retention cutoff once so the logged cutoff is exactly the
     * one used for the removal.
     */
    private void cleanExecutionLogs() {
      logger.info("Cleaning old logs from execution_logs");
      long cutoff = DateTime.now().getMillis() - executionLogsRetentionMs;
      logger.info("Cleaning old log files before "
          + new DateTime(cutoff).toString());
      cleanOldExecutionLogs(cutoff);
    }
  }

  /**
   * Hands a flow to the chosen executor and records the assignment.
   *
   * <p>The executor is first assigned in the datastore, then asked to run the
   * flow via {@code ConnectorParams.EXECUTE_ACTION}. If that call fails, the
   * assignment is rolled back and the failure is rethrown wrapped in a new
   * {@link ExecutorManagerException}. On success the reference is pointed at
   * the executor and the flow is registered in {@code runningFlows}.
   *
   * @param reference execution reference to bind to the executor
   * @param exflow the flow to dispatch; its update time is set to now
   * @param choosenExecutor the executor to run the flow on
   * @throws ExecutorManagerException if assignment, the executor call, or the
   *         rollback fails
   */
  private void dispatch(ExecutionReference reference, ExecutableFlow exflow,
    Executor choosenExecutor) throws ExecutorManagerException {
    exflow.setUpdateTime(System.currentTimeMillis());

    executorLoader.assignExecutor(choosenExecutor.getId(),
      exflow.getExecutionId());

    try {
      callExecutorServer(exflow, choosenExecutor,
        ConnectorParams.EXECUTE_ACTION);
    } catch (ExecutorManagerException dispatchFailure) {
      final String rollbackNotice =
        "Rolling back executor assignment for execution id:"
          + exflow.getExecutionId();
      logger.error(rollbackNotice, dispatchFailure);
      executorLoader.unassignExecutor(exflow.getExecutionId());
      throw new ExecutorManagerException(dispatchFailure);
    }

    reference.setExecutor(choosenExecutor);

    // move from flow to running flows
    final Pair<ExecutionReference, ExecutableFlow> runningEntry =
      new Pair<ExecutionReference, ExecutableFlow>(reference, exflow);
    runningFlows.put(exflow.getExecutionId(), runningEntry);

    final String successNotice = String.format(
      "Successfully dispatched exec %d with error count %d",
      exflow.getExecutionId(), reference.getNumErrors());
    logger.info(successNotice);
  }

  /*
   * This thread is responsible for processing queued flows using dispatcher and
   * making rest api calls to executor server
   */
  private class QueueProcessorThread extends Thread {
    private static final long QUEUE_PROCESSOR_WAIT_IN_MS = 1000;
    private final int maxDispatchingErrors;
    private final long activeExecutorRefreshWindowInMilisec;
    private final int activeExecutorRefreshWindowInFlows;

    private volatile boolean shutdown = false;
    private volatile boolean isActive = true;

    public QueueProcessorThread(boolean isActive,
      long activeExecutorRefreshWindowInTime,
      int activeExecutorRefreshWindowInFlows,
      int maxDispatchingErrors) {
      setActive(isActive);
      this.maxDispatchingErrors = maxDispatchingErrors;
      this.activeExecutorRefreshWindowInFlows =
        activeExecutorRefreshWindowInFlows;
      this.activeExecutorRefreshWindowInMilisec =
        activeExecutorRefreshWindowInTime;
      this.setName("AzkabanWebServer-QueueProcessor-Thread");
    }

    public void setActive(boolean isActive) {
      this.isActive = isActive;
      logger.info("QueueProcessorThread active turned " + this.isActive);
    }

    public boolean isActive() {
      return isActive;
    }

    public void shutdown() {
      shutdown = true;
      this.interrupt();
    }

    public void run() {
      // Loops till QueueProcessorThread is shutdown
      while (!shutdown) {
        synchronized (this) {
          try {
            // start processing queue if active, other wait for sometime
            if (isActive) {
              processQueuedFlows(activeExecutorRefreshWindowInMilisec,
                activeExecutorRefreshWindowInFlows);
            }
            wait(QUEUE_PROCESSOR_WAIT_IN_MS);
          } catch (Exception e) {
            logger.error(
              "QueueProcessorThread Interrupted. Probably to shut down.", e);
          }
        }
      }
    }

    /**
     * Processes the non-dispatched flows. While the thread is active and
     * {@code queuedFlows.fetchHead()} keeps returning a candidate, each
     * candidate is either dispatched against a snapshot of
     * {@code activeExecutors} or, if it was already attempted since the last
     * executor refresh, put back on the queue followed by a sleep until the
     * next refresh is due.
     *
     * @param activeExecutorsRefreshWindow maximum age, in milliseconds, of the
     *        executor information before it is refreshed
     * @param maxContinuousFlowProcessed number of flows that may be processed
     *        in a row before the executor information is refreshed
     * @throws InterruptedException if the thread is interrupted while sleeping
     * @throws ExecutorManagerException propagated from the calls made while
     *         processing a flow
     */
    private void processQueuedFlows(long activeExecutorsRefreshWindow,
      int maxContinuousFlowProcessed) throws InterruptedException,
      ExecutorManagerException {
      long refreshedAt = 0;
      int processedSinceRefresh = 0;

      while (isActive()) {
        runningCandidate = queuedFlows.fetchHead();
        if (runningCandidate == null) {
          break;
        }

        final ExecutionReference reference = runningCandidate.getFirst();
        final ExecutableFlow exflow = runningCandidate.getSecond();
        final long now = System.currentTimeMillis();

        // Refresh executorInfo for all activeExecutors when it has been more
        // than activeExecutorsRefreshWindow millisec since the last refresh,
        // or when maxContinuousFlowProcessed flows were processed since then.
        final boolean windowElapsed =
          now - refreshedAt > activeExecutorsRefreshWindow;
        final boolean batchLimitReached =
          processedSinceRefresh >= maxContinuousFlowProcessed;
        if (windowElapsed || batchLimitReached) {
          refreshExecutors();
          refreshedAt = now;
          processedSinceRefresh = 0;
        }

        /*
         * TODO: Work around till we improve Filters to have a notion of
         *       GlobalSystemState. Currently we try each queued flow once to
         *       infer a global busy state.
         * Possible improvements:-
         *   1. Move system level filters in refreshExecutors and sleep if we
         *      have all executors busy after refresh
         *   2. Implement GlobalSystemState in selector or in a third place to
         *      manage system filters. Basically taking out all the filters
         *      which do not depend on the flow but are still being part of
         *      Selector.
         * Assumptions:-
         *   1. no one else except QueueProcessor is updating ExecutableFlow
         *      update time
         *   2. re-attempting a flow (which has been tried before) is
         *      considered as all executors are busy
         */
        final boolean triedSinceRefresh = exflow.getUpdateTime() > refreshedAt;
        if (!triedSinceRefresh) {
          exflow.setUpdateTime(now);
          // process flow with the current snapshot of activeExecutors
          selectExecutorAndDispatchFlow(reference, exflow,
            new HashSet<Executor>(activeExecutors));
          runningCandidate = null;
        } else {
          // put back in the queue and wait till the next executor refresh
          queuedFlows.enqueue(exflow, reference);
          runningCandidate = null;
          final long untilNextRefresh =
            activeExecutorsRefreshWindow - (now - refreshedAt);
          sleep(untilNextRefresh);
        }

        // do not count failed flow processing (flows still in the queue)
        if (queuedFlows.getFlow(exflow.getExecutionId()) == null) {
          processedSinceRefresh++;
        }
      }
    }

    /**
     * Processes a flow against a snapshot of available executors. While
     * holding the monitor of {@code exflow}, an executor is selected and the
     * flow is dispatched to it. A failed dispatch is handed to
     * {@code handleDispatchExceptionCase}; the absence of any selectable
     * executor is handed to {@code handleNoExecutorSelectedCase}.
     *
     * @param reference execution reference belonging to the flow
     * @param exflow flow to dispatch
     * @param availableExecutors executors that may be chosen from
     * @throws ExecutorManagerException propagated from the handler methods
     */
    private void selectExecutorAndDispatchFlow(ExecutionReference reference,
      ExecutableFlow exflow, Set<Executor> availableExecutors)
      throws ExecutorManagerException {
      synchronized (exflow) {
        final Executor target = selectExecutor(exflow, availableExecutors);
        if (target == null) {
          handleNoExecutorSelectedCase(reference, exflow);
          return;
        }

        try {
          dispatch(reference, exflow, target);
        } catch (ExecutorManagerException e) {
          final String warning = String.format(
            "Executor %s responded with exception for exec: %d",
            target, exflow.getExecutionId());
          logger.warn(warning, e);
          handleDispatchExceptionCase(reference, exflow, target,
            availableExecutors);
        }
      }
    }

    /**
     * Helper method to fetch the overriding executor requested through the
     * {@code ExecutionOptions.USE_EXECUTOR} flow parameter.
     *
     * The requested id is first looked up with {@code fetchExecutor} and, when
     * that yields nothing, with {@code executorLoader.fetchExecutor}. Every
     * failure mode is logged and results in {@code null}, so the caller falls
     * back to its regular executor selection. This includes a parameter value
     * that is not a valid integer, which previously escaped as an unchecked
     * {@link NumberFormatException}.
     *
     * @param options execution options of the flow; may be {@code null}
     * @param executionId execution id, used only in log messages
     * @return the requested executor, or {@code null} if none was requested,
     *         the requested id is malformed, the lookup failed, or no such
     *         executor was found
     */
    private Executor getUserSpecifiedExecutor(ExecutionOptions options,
      int executionId) {
      if (options == null) {
        return null;
      }
      final Map<String, String> flowParameters = options.getFlowParameters();
      if (flowParameters == null
        || !flowParameters.containsKey(ExecutionOptions.USE_EXECUTOR)) {
        return null;
      }

      final String requestedId =
        flowParameters.get(ExecutionOptions.USE_EXECUTOR);
      Executor executor = null;
      try {
        // parseInt also rejects a null value with NumberFormatException
        final int executorId = Integer.parseInt(requestedId);
        executor = fetchExecutor(executorId);

        if (executor == null) {
          logger
            .warn(String
              .format(
                "User specified executor id: %d for execution id: %d is not active, Looking up db.",
                executorId, executionId));
          executor = executorLoader.fetchExecutor(executorId);
          if (executor == null) {
            logger
              .warn(String
                .format(
                  "User specified executor id: %d for execution id: %d is missing from db. Defaulting to availableExecutors",
                  executorId, executionId));
          }
        }
      } catch (NumberFormatException ex) {
        logger
          .error(String
            .format(
              "User specified executor id: %s for execution id: %d is not a valid integer. Defaulting to availableExecutors",
              requestedId, executionId), ex);
      } catch (ExecutorManagerException ex) {
        logger.error("Failed to fetch user specified executor for exec_id = "
          + executionId, ex);
      }
      return executor;
    }

    /**
     * Chooses the executor for {@code exflow}. An executor explicitly
     * requested through the flow's execution options takes precedence;
     * otherwise an {@code ExecutorSelector} built from {@code filterList} and
     * {@code comparatorWeightsMap} picks one of the available executors.
     *
     * @param exflow flow that needs an executor
     * @param availableExecutors candidates handed to the selector
     * @return the user specified executor if there is one, otherwise the
     *         result of the selector's {@code getBest} call
     */
    private Executor selectExecutor(ExecutableFlow exflow,
      Set<Executor> availableExecutors) {
      final Executor requested =
        getUserSpecifiedExecutor(exflow.getExecutionOptions(),
          exflow.getExecutionId());
      if (requested != null) {
        return requested;
      }

      // No executor was specified explicitly: let the dispatcher decide
      logger.info("Using dispatcher for execution id :"
        + exflow.getExecutionId());
      return new ExecutorSelector(filterList, comparatorWeightsMap)
        .getBest(availableExecutors, exflow);
    }

    /**
     * Reacts to a failed dispatch attempt. The error count on
     * {@code reference} is incremented. If the count is still within
     * {@code maxDispatchingErrors} and more than one executor remains, the
     * executor that just failed is removed from {@code remainingExecutors} and
     * dispatching is retried with the rest. Otherwise the failure is logged
     * and {@code finalizeFlows} is called for the flow.
     *
     * @param reference execution reference carrying the error count
     * @param exflow flow whose dispatch failed
     * @param lastSelectedExecutor executor the failed dispatch was sent to
     * @param remainingExecutors executors still eligible; modified in place
     * @throws ExecutorManagerException propagated from the retry or from
     *         {@code finalizeFlows}
     */
    private void handleDispatchExceptionCase(ExecutionReference reference,
      ExecutableFlow exflow, Executor lastSelectedExecutor,
      Set<Executor> remainingExecutors) throws ExecutorManagerException {
      final String stageMessage = String.format(
        "Reached handleDispatchExceptionCase stage for exec %d with error count %d",
        exflow.getExecutionId(), reference.getNumErrors());
      logger.info(stageMessage);

      reference.setNumErrors(reference.getNumErrors() + 1);

      final boolean retryPossible =
        reference.getNumErrors() <= this.maxDispatchingErrors
          && remainingExecutors.size() > 1;
      if (retryPossible) {
        // try the other executors, excluding the one that just failed
        remainingExecutors.remove(lastSelectedExecutor);
        selectExecutorAndDispatchFlow(reference, exflow, remainingExecutors);
        return;
      }

      logger.error("Failed to process queued flow");
      finalizeFlows(exflow);
    }

    /**
     * Handles a flow for which no executor could be selected: the situation
     * is logged and the flow is put back on {@code queuedFlows} together with
     * its reference.
     *
     * @param reference execution reference belonging to the flow
     * @param exflow flow that could not be assigned to an executor
     * @throws ExecutorManagerException declared by the signature; presumably
     *         raised by the enqueue call
     */
    private void handleNoExecutorSelectedCase(ExecutionReference reference,
      ExecutableFlow exflow) throws ExecutorManagerException {
      final String stageMessage = String.format(
        "Reached handleNoExecutorSelectedCase stage for exec %d with error count %d",
        exflow.getExecutionId(), reference.getNumErrors());
      logger.info(stageMessage);

      // TODO: handle the scenario where a high priority flow that fails to
      // get scheduled can starve all others
      queuedFlows.enqueue(exflow, reference);
    }
  }
}