azkaban-aplcache

Add metrics for submission and time in queue (#2128) * Add

2/26/2019 7:49:48 PM

Details

diff --git a/az-core/src/main/java/azkaban/metrics/MetricsManager.java b/az-core/src/main/java/azkaban/metrics/MetricsManager.java
index 8715320..f935eec 100644
--- a/az-core/src/main/java/azkaban/metrics/MetricsManager.java
+++ b/az-core/src/main/java/azkaban/metrics/MetricsManager.java
@@ -21,8 +21,10 @@ import static azkaban.Constants.ConfigurationKeys.METRICS_SERVER_URL;
 
 import azkaban.utils.Props;
 import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
 import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
 import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
 import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
@@ -79,6 +81,19 @@ public class MetricsManager {
   }
 
   /**
+   * A {@link Histogram} measures the statistical distribution of values in a stream of data. In
+   * addition to minimum, maximum, mean, etc., it also measures median, 75th,
+   * 90th, 95th, 98th, 99th, and 99.9th percentiles.
+   */
+  public Histogram addHistogram(final String name) { return this.registry.histogram(name); }
+
+  /**
+   * A {@link Timer} measures both the rate that a particular piece of code is called and the
+   * distribution of its duration.
+   */
+  public Timer addTimer(final String name) { return this.registry.timer(name); }
+
+  /**
    * reporting metrics to remote metrics collector. Note: this method must be synchronized, since
    * both web server and executor will call it during initialization.
    */
diff --git a/az-core/src/test/java/azkaban/metrics/MetricsTestUtility.java b/az-core/src/test/java/azkaban/metrics/MetricsTestUtility.java
index 2a7c50e..0c7d59b 100644
--- a/az-core/src/test/java/azkaban/metrics/MetricsTestUtility.java
+++ b/az-core/src/test/java/azkaban/metrics/MetricsTestUtility.java
@@ -16,8 +16,11 @@
 
 package azkaban.metrics;
 
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistry;
-
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
 
 /**
  * This class is designed for a utility class to test drop wizard metrics
@@ -34,4 +37,24 @@ public class MetricsTestUtility {
     // Assume that the gauge value can be converted to type long.
     return (long) this.registry.getGauges().get(name).getValue();
   }
+
+  /** @return the value for the specified {@link Meter} */
+  public long getMeterValue(final String name) {
+    return this.registry.getMeters().get(name).getCount();
+  }
+
+  /** @return the {@link Snapshot} for the specified {@link Histogram}. */
+  public Snapshot getHistogramSnapshot(final String name) {
+    return this.registry.getHistograms().get(name).getSnapshot();
+  }
+
+  /** @return the count for the specified {@link Timer}. */
+  public long getTimerCount(final String name) {
+    return this.registry.getTimers().get(name).getCount();
+  }
+
+  /** @return the {@link Snapshot} for the specified {@link Timer}. */
+  public Snapshot getTimerSnapshot(final String name) {
+    return this.registry.getTimers().get(name).getSnapshot();
+  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
index 20e4f15..d6f96d7 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
@@ -18,6 +18,7 @@ package azkaban.executor;
 import azkaban.Constants.ConfigurationKeys;
 import azkaban.event.EventHandler;
 import azkaban.flow.FlowUtils;
+import azkaban.metrics.CommonMetrics;
 import azkaban.project.Project;
 import azkaban.project.ProjectWhitelist;
 import azkaban.utils.FileIOUtils.LogData;
@@ -57,12 +58,15 @@ public class ExecutionController extends EventHandler implements ExecutorManager
   private final AlerterHolder alerterHolder;
   private final ExecutorHealthChecker executorHealthChecker;
   private final int maxConcurrentRunsOneFlow;
+  private final CommonMetrics commonMetrics;
 
   @Inject
   ExecutionController(final Props azkProps, final ExecutorLoader executorLoader,
+      final CommonMetrics commonMetrics,
       final ExecutorApiGateway apiGateway, final AlerterHolder alerterHolder, final
   ExecutorHealthChecker executorHealthChecker) {
     this.executorLoader = executorLoader;
+    this.commonMetrics = commonMetrics;
     this.apiGateway = apiGateway;
     this.alerterHolder = alerterHolder;
     this.executorHealthChecker = executorHealthChecker;
@@ -600,6 +604,7 @@ public class ExecutionController extends EventHandler implements ExecutorManager
 
       if (!running.isEmpty()) {
         if (running.size() > this.maxConcurrentRunsOneFlow) {
+          this.commonMetrics.markSubmitFlowSkip();
           throw new ExecutorManagerException("Flow " + flowId
               + " has more than " + this.maxConcurrentRunsOneFlow + " concurrent runs. Skipping",
               ExecutorManagerException.Reason.SkippedExecution);
@@ -615,6 +620,7 @@ public class ExecutionController extends EventHandler implements ExecutorManager
                   + options.getPipelineLevel() + ". \n";
         } else if (options.getConcurrentOption().equals(
             ExecutionOptions.CONCURRENT_OPTION_SKIP)) {
+          this.commonMetrics.markSubmitFlowSkip();
           throw new ExecutorManagerException("Flow " + flowId
               + " is already running. Skipping execution.",
               ExecutorManagerException.Reason.SkippedExecution);
@@ -635,6 +641,7 @@ public class ExecutionController extends EventHandler implements ExecutorManager
       // this call.
       this.executorLoader.uploadExecutableFlow(exflow);
 
+      this.commonMetrics.markSubmitFlowSuccess();
       message += "Execution queued successfully with exec id " + exflow.getExecutionId();
       return message;
     }
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index f852732..4f39cd9 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -949,6 +949,7 @@ public class ExecutorManager extends EventHandler implements
                     "Failed to submit %s for project %s. Azkaban has overrun its webserver queue capacity",
                     flowId, exflow.getProjectName());
         logger.error(message);
+        this.commonMetrics.markSubmitFlowFail();
       } else {
         final int projectId = exflow.getProjectId();
         exflow.setSubmitUser(userId);
@@ -968,6 +969,7 @@ public class ExecutorManager extends EventHandler implements
 
         if (!running.isEmpty()) {
           if (running.size() > this.maxConcurrentRunsOneFlow) {
+            this.commonMetrics.markSubmitFlowSkip();
             throw new ExecutorManagerException("Flow " + flowId
                 + " has more than " + this.maxConcurrentRunsOneFlow + " concurrent runs. Skipping",
                 ExecutorManagerException.Reason.SkippedExecution);
@@ -983,6 +985,7 @@ public class ExecutorManager extends EventHandler implements
                     + options.getPipelineLevel() + ". \n";
           } else if (options.getConcurrentOption().equals(
               ExecutionOptions.CONCURRENT_OPTION_SKIP)) {
+            this.commonMetrics.markSubmitFlowSkip();
             throw new ExecutorManagerException("Flow " + flowId
                 + " is already running. Skipping execution.",
                 ExecutorManagerException.Reason.SkippedExecution);
@@ -1012,6 +1015,7 @@ public class ExecutorManager extends EventHandler implements
         this.executorLoader.addActiveExecutableReference(reference);
         this.queuedFlows.enqueue(exflow, reference);
         message += "Execution queued successfully with exec id " + exflow.getExecutionId();
+        this.commonMetrics.markSubmitFlowSuccess();
       }
       return message;
     }
@@ -1097,7 +1101,7 @@ public class ExecutorManager extends EventHandler implements
 
   /**
    * Calls executor to dispatch the flow, update db to assign the executor and in-memory state of
-   * executableFlow
+   * executableFlow.
    */
   private void dispatch(final ExecutionReference reference, final ExecutableFlow exflow,
       final Executor choosenExecutor) throws ExecutorManagerException {
diff --git a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
index 3d009fc..7505be4 100644
--- a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
+++ b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
@@ -16,7 +16,9 @@
 
 package azkaban.metrics;
 
+import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.inject.Inject;
 import javax.inject.Singleton;
@@ -28,6 +30,18 @@ import javax.inject.Singleton;
  */
 @Singleton
 public class CommonMetrics {
+  public static final String FLOW_FAIL_METER_NAME = "flow-fail-meter";
+  public static final String DISPATCH_FAIL_METER_NAME = "dispatch-fail-meter";
+  public static final String DISPATCH_SUCCESS_METER_NAME = "dispatch-success-meter";
+  public static final String SEND_EMAIL_FAIL_METER_NAME = "send-email-fail-meter";
+  public static final String SEND_EMAIL_SUCCESS_METER_NAME = "send-email-success-meter";
+  public static final String SUBMIT_FLOW_SUCCESS_METER_NAME = "submit-flow-success-meter";
+  public static final String SUBMIT_FLOW_FAIL_METER_NAME = "submit-flow-fail-meter";
+  public static final String SUBMIT_FLOW_SKIP_METER_NAME = "submit-flow-skip-meter";
+  public static final String OOM_WAITING_JOB_COUNT_NAME = "OOM-waiting-job-count";
+  public static final String QUEUE_WAIT_HISTOGRAM_NAME = "queue-wait-histogram";
+  public static final String FLOW_SETUP_TIMER_NAME = "flow-setup-timer";
+
 
   private final AtomicLong OOMWaitingJobCount = new AtomicLong(0L);
   private final MetricsManager metricsManager;
@@ -36,6 +50,11 @@ public class CommonMetrics {
   private Meter dispatchSuccessMeter;
   private Meter sendEmailFailMeter;
   private Meter sendEmailSuccessMeter;
+  private Meter submitFlowSuccessMeter;
+  private Meter submitFlowFailMeter;
+  private Meter submitFlowSkipMeter;
+  private Histogram queueWaitMeter;
+  private Timer flowSetupTimer;
 
   @Inject
   public CommonMetrics(final MetricsManager metricsManager) {
@@ -44,15 +63,19 @@ public class CommonMetrics {
   }
 
   private void setupAllMetrics() {
-    this.flowFailMeter = this.metricsManager.addMeter("flow-fail-meter");
-    this.dispatchFailMeter = this.metricsManager.addMeter("dispatch-fail-meter");
-    this.dispatchSuccessMeter = this.metricsManager.addMeter("dispatch-success-meter");
-    this.sendEmailFailMeter = this.metricsManager.addMeter("send-email-fail-meter");
-    this.sendEmailSuccessMeter = this.metricsManager.addMeter("send-email-success-meter");
-    this.metricsManager.addGauge("OOM-waiting-job-count", this.OOMWaitingJobCount::get);
+    this.flowFailMeter = this.metricsManager.addMeter(FLOW_FAIL_METER_NAME);
+    this.dispatchFailMeter = this.metricsManager.addMeter(DISPATCH_FAIL_METER_NAME);
+    this.dispatchSuccessMeter = this.metricsManager.addMeter(DISPATCH_SUCCESS_METER_NAME);
+    this.sendEmailFailMeter = this.metricsManager.addMeter(SEND_EMAIL_FAIL_METER_NAME);
+    this.sendEmailSuccessMeter = this.metricsManager.addMeter(SEND_EMAIL_SUCCESS_METER_NAME);
+    this.submitFlowSuccessMeter = this.metricsManager.addMeter(SUBMIT_FLOW_SUCCESS_METER_NAME);
+    this.submitFlowFailMeter = this.metricsManager.addMeter(SUBMIT_FLOW_FAIL_METER_NAME);
+    this.submitFlowSkipMeter = this.metricsManager.addMeter(SUBMIT_FLOW_SKIP_METER_NAME);
+    this.metricsManager.addGauge(OOM_WAITING_JOB_COUNT_NAME, this.OOMWaitingJobCount::get);
+    this.queueWaitMeter = this.metricsManager.addHistogram(QUEUE_WAIT_HISTOGRAM_NAME);
+    this.flowSetupTimer = this.metricsManager.addTimer(FLOW_SETUP_TIMER_NAME);
   }
 
-
   /**
    * Mark flowFailMeter when a flow is considered as FAILED. This method could be called by Web
    * Server or Executor, as they both detect flow failure.
@@ -90,6 +113,27 @@ public class CommonMetrics {
   }
 
   /**
+   * Mark submitFlowSuccessMeter when a flow is submitted for execution successfully.
+   */
+  public void markSubmitFlowSuccess() {
+    this.submitFlowSuccessMeter.mark();
+  }
+
+  /**
+   * Mark submitFlowFailMeter when a flow submitted for execution is skipped.
+   */
+  public void markSubmitFlowSkip() {
+    this.submitFlowSkipMeter.mark();
+  }
+
+  /**
+   * Mark submitFlowFailMeter when a flow fails to be submitted for execution.
+   */
+  public void markSubmitFlowFail() {
+    this.submitFlowFailMeter.mark();
+  }
+
+  /**
    * Mark the occurrence of an job waiting event due to OOM
    */
   public void incrementOOMJobWaitCount() {
@@ -103,4 +147,15 @@ public class CommonMetrics {
     this.OOMWaitingJobCount.decrementAndGet();
   }
 
+  /**
+   * Add the queue wait time for a flow to the metrics.
+   *
+   * @param time queue wait time for a flow.
+   */
+  public void addQueueWait(long time) { this.queueWaitMeter.update(time); }
+
+  /**
+   * @return the {@link Timer.Context} for the timer.
+   */
+  public Timer.Context getFlowSetupTimerContext() { return this.flowSetupTimer.time(); }
 }
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java
index 209a75d..b7a1c1d 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java
@@ -22,10 +22,13 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import azkaban.Constants;
+import azkaban.metrics.CommonMetrics;
+import azkaban.metrics.MetricsManager;
 import azkaban.user.User;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
 import azkaban.utils.TestUtils;
+import com.codahale.metrics.MetricRegistry;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
@@ -52,6 +55,9 @@ public class ExecutionControllerTest {
   private AlerterHolder alertHolder;
   private ExecutorHealthChecker executorHealthChecker;
   private Props props;
+  private final CommonMetrics commonMetrics = new CommonMetrics(
+      new MetricsManager(new MetricRegistry()));
+
   private User user;
   private ExecutableFlow flow1;
   private ExecutableFlow flow2;
@@ -70,8 +76,8 @@ public class ExecutionControllerTest {
     this.props.put(Constants.ConfigurationKeys.MAX_CONCURRENT_RUNS_ONEFLOW, 1);
     this.alertHolder = mock(AlerterHolder.class);
     this.executorHealthChecker = mock(ExecutorHealthChecker.class);
-    this.controller = new ExecutionController(this.props, this.loader, this.apiGateway,
-        this.alertHolder, this.executorHealthChecker);
+    this.controller = new ExecutionController(this.props, this.loader, this.commonMetrics,
+        this.apiGateway, this.alertHolder, this.executorHealthChecker);
 
     final Executor executor1 = new Executor(1, "localhost", 12345, true);
     final Executor executor2 = new Executor(2, "localhost", 12346, true);
diff --git a/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java b/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
index 64ac5fd..6099ed5 100644
--- a/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
+++ b/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
@@ -17,8 +17,11 @@
 package azkaban.metrics;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -37,10 +40,55 @@ public class CommonMetricsTest {
 
   @Test
   public void testOOMWaitingJobMetrics() {
-    final String metricName = "OOM-waiting-job-count";
+    final String metricName = CommonMetrics.OOM_WAITING_JOB_COUNT_NAME;
 
     assertEquals(0, this.testUtil.getGaugeValue(metricName));
     this.metrics.incrementOOMJobWaitCount();
     assertEquals(1, this.testUtil.getGaugeValue(metricName));
   }
+
+  @Test
+  public void testSubmitMetrics() {
+    assertEquals(0, this.testUtil.getMeterValue(CommonMetrics.SUBMIT_FLOW_FAIL_METER_NAME));
+    this.metrics.markSubmitFlowFail();
+    assertEquals(1, this.testUtil.getMeterValue(CommonMetrics.SUBMIT_FLOW_FAIL_METER_NAME));
+
+    assertEquals(0, this.testUtil.getMeterValue(CommonMetrics.SUBMIT_FLOW_SKIP_METER_NAME));
+    this.metrics.markSubmitFlowSkip();
+    assertEquals(1, this.testUtil.getMeterValue(CommonMetrics.SUBMIT_FLOW_SKIP_METER_NAME));
+
+    assertEquals(0, this.testUtil.getMeterValue(CommonMetrics.SUBMIT_FLOW_SUCCESS_METER_NAME));
+    this.metrics.markSubmitFlowSuccess();
+    assertEquals(1, this.testUtil.getMeterValue(CommonMetrics.SUBMIT_FLOW_SUCCESS_METER_NAME));
+  }
+
+  @Test
+  public void testQueueWaitMetrics() {
+    final double delta = 0.001;
+
+    this.metrics.addQueueWait(500L);
+    this.metrics.addQueueWait(600L);
+    this.metrics.addQueueWait(1000L);
+    Snapshot snapshot = this.testUtil.getHistogramSnapshot(CommonMetrics.QUEUE_WAIT_HISTOGRAM_NAME);
+    assertEquals(600, snapshot.getMedian(), delta);
+    assertEquals(700, snapshot.getMean(), delta);
+    assertEquals(500, snapshot.getMin(), delta);
+    assertEquals(1000, snapshot.getMax(), delta);
+  }
+
+  @Test
+  public void testFlowSetupMetrics() throws InterruptedException {
+    assertEquals(0, this.testUtil.getTimerCount(CommonMetrics.FLOW_SETUP_TIMER_NAME));
+    Timer.Context context = this.metrics.getFlowSetupTimerContext();
+    try {
+      Thread.sleep(100);
+    }
+    finally {
+      context.stop();
+    }
+    assertEquals(1, this.testUtil.getTimerCount(CommonMetrics.FLOW_SETUP_TIMER_NAME));
+    Snapshot snapshot = this.testUtil.getTimerSnapshot(CommonMetrics.FLOW_SETUP_TIMER_NAME);
+    double val = snapshot.getMax();
+    assertTrue(snapshot.getMax() > 100);
+  }
 }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
index 66f6b8d..98a8a73 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
@@ -135,7 +135,7 @@ class FlowPreparer {
       // Download project to a temp dir if not exists in local cache.
       final long start = System.currentTimeMillis();
 
-      tempDir = downloadProjectIfNotExists(project);
+      tempDir = downloadProjectIfNotExists(project, flow.getExecutionId());
 
       log.info("Downloading zip file for project {} when preparing execution [execid {}] "
               + "completed in {} second(s)", project, flow.getExecutionId(),
@@ -247,7 +247,7 @@ class FlowPreparer {
    * @throws IOException if downloading or unzipping fails.
    */
   @VisibleForTesting
-  File downloadProjectIfNotExists(final ProjectDirectoryMetadata proj)
+  File downloadProjectIfNotExists(final ProjectDirectoryMetadata proj, int execId)
       throws IOException {
     final String projectDir = generateProjectDirName(proj);
     if (proj.getInstalledDir() == null) {
@@ -256,7 +256,7 @@ class FlowPreparer {
 
     // If directory exists, assume it's prepared and skip.
     if (proj.getInstalledDir().exists()) {
-      log.info("Project {} already cached. Skipping download.", proj);
+	log.info("Project {} already cached. Skipping download. ExecId: {}", proj, execId);
       // Hit the local cache.
       this.cacheMetrics.incrementCacheHit();
       // Update last modified time of the file keeping project dir size when the project is
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index f0b2122..f418561 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -36,6 +36,7 @@ import azkaban.executor.Status;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.jobtype.JobTypeManagerException;
 import azkaban.metric.MetricReportManager;
+import azkaban.metrics.CommonMetrics;
 import azkaban.project.ProjectLoader;
 import azkaban.project.ProjectWhitelist;
 import azkaban.project.ProjectWhitelist.WhitelistType;
@@ -52,6 +53,7 @@ import azkaban.utils.Props;
 import azkaban.utils.ThreadPoolExecutingListener;
 import azkaban.utils.TrackingThreadPool;
 import azkaban.utils.UndefinedPropertyException;
+import com.codahale.metrics.Timer;
 import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.IOException;
@@ -131,6 +133,7 @@ public class FlowRunnerManager implements EventListener,
   private final File executionDirectory;
   private final File projectDirectory;
   private final Object executionDirDeletionSync = new Object();
+  private final CommonMetrics commonMetrics;
 
   private final int numThreads;
   private final int numJobThreadPerFlow;
@@ -156,6 +159,7 @@ public class FlowRunnerManager implements EventListener,
       final StorageManager storageManager,
       final TriggerManager triggerManager,
       final AlerterHolder alerterHolder,
+      final CommonMetrics commonMetrics,
       @Nullable final AzkabanEventReporter azkabanEventReporter) throws IOException {
     this.azkabanProps = props;
 
@@ -183,6 +187,7 @@ public class FlowRunnerManager implements EventListener,
     this.projectLoader = projectLoader;
     this.triggerManager = triggerManager;
     this.alerterHolder = alerterHolder;
+    this.commonMetrics = commonMetrics;
 
     this.jobLogChunkSize = this.azkabanProps.getString("job.log.chunk.size", "5MB");
     this.jobLogNumFiles = this.azkabanProps.getInt("job.log.backup.index", 4);
@@ -336,8 +341,19 @@ public class FlowRunnerManager implements EventListener,
           + execId);
     }
 
-    // Sets up the project files and execution directory.
-    this.flowPreparer.setup(flow);
+    // Record the time between submission, and when the flow preparation/execution starts.
+    // Note that since submit time is recorded on the web server, while flow preparation is on
+    // the executor, there could be some inaccuracies due to clock skew.
+    commonMetrics.addQueueWait(System.currentTimeMillis() -
+        flow.getExecutableFlow().getSubmitTime());
+
+    final Timer.Context flowPrepTimerContext = commonMetrics.getFlowSetupTimerContext();
+    try {
+      // Sets up the project files and execution directory.
+      this.flowPreparer.setup(flow);
+    } finally {
+      flowPrepTimerContext.stop();
+    }
 
     // Setup flow runner
     FlowWatcher watcher = null;
@@ -973,9 +989,11 @@ public class FlowRunnerManager implements EventListener,
           if (execId != -1) {
             FlowRunnerManager.logger.info("Submitting flow " + execId);
             submitFlow(execId);
+            commonMetrics.markDispatchSuccess();
           }
         } catch (final Exception e) {
           FlowRunnerManager.logger.error("Failed to submit flow ", e);
+          commonMetrics.markDispatchFail();
         }
       }
     }
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
index dcb4f6c..a57b218 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
@@ -97,7 +97,7 @@ public class FlowPreparerTest {
     final ProjectDirectoryMetadata proj = new ProjectDirectoryMetadata(12, 34,
         new File(this.projectsDir, SAMPLE_FLOW_01));
 
-    final File tmp = this.instance.downloadProjectIfNotExists(proj);
+    final File tmp = this.instance.downloadProjectIfNotExists(proj, 123);
 
     final long actualDirSize = 1048835;
 
@@ -111,7 +111,7 @@ public class FlowPreparerTest {
   public void testDownloadingProjectIfNotExists() throws Exception {
     final ProjectDirectoryMetadata proj = new ProjectDirectoryMetadata(12, 34,
         new File(this.projectsDir, SAMPLE_FLOW_01));
-    final File tmp = this.instance.downloadProjectIfNotExists(proj);
+    final File tmp = this.instance.downloadProjectIfNotExists(proj, 124);
 
     final Path projectDirSizeFile = Paths.get(proj.getInstalledDir().getPath(),
         FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME);
@@ -125,11 +125,11 @@ public class FlowPreparerTest {
   public void testNotDownloadingProjectIfExists() throws Exception {
     final ProjectDirectoryMetadata proj = new ProjectDirectoryMetadata(12, 34,
         new File(this.projectsDir, SAMPLE_FLOW_01));
-    File tmp = this.instance.downloadProjectIfNotExists(proj);
+    File tmp = this.instance.downloadProjectIfNotExists(proj, 125);
     Files.move(tmp.toPath(), proj.getInstalledDir().toPath());
 
     // Try downloading the same project again
-    tmp = this.instance.downloadProjectIfNotExists(proj);
+    tmp = this.instance.downloadProjectIfNotExists(proj, 126);
 
     final Path projectDirSizeFile = Paths.get(proj.getInstalledDir().getPath(),
         FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME);