azkaban-aplcache

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 08d0787..9c0a032 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -1035,11 +1035,13 @@ public class ExecutorManager extends EventHandler implements
           executorLoader.addActiveExecutableReference(reference);
           try {
             dispatch(reference, exflow, choosenExecutor);
+            this.commonMetrics.markDispatchSuccess();
           } catch (ExecutorManagerException e) {
             // When flow dispatch fails, should update the flow status
             // to FAILED in execution_flows DB table as well. Currently
             // this logic is only implemented in multiExecutorMode but
             // missed in single executor case.
+            this.commonMetrics.markDispatchFail();
             finalizeFlows(exflow);
             throw e;
           }
@@ -1852,7 +1854,9 @@ public class ExecutorManager extends EventHandler implements
         if (selectedExecutor != null) {
           try {
             dispatch(reference, exflow, selectedExecutor);
+            commonMetrics.markDispatchSuccess();
           } catch (ExecutorManagerException e) {
+            commonMetrics.markDispatchFail();
             logger.warn(String.format(
               "Executor %s responded with exception for exec: %d",
               selectedExecutor, exflow.getExecutionId()), e);
@@ -1860,6 +1864,7 @@ public class ExecutorManager extends EventHandler implements
               availableExecutors);
           }
         } else {
+          commonMetrics.markDispatchFail();
           handleNoExecutorSelectedCase(reference, exflow);
         }
       }
diff --git a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
index 2d4b5c9..8e46206 100644
--- a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
+++ b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
@@ -34,6 +34,8 @@ public class CommonMetrics {
   private final MetricsManager metricsManager;
   private Meter dbConnectionMeter;
   private Meter flowFailMeter;
+  private Meter dispatchFailMeter;
+  private Meter dispatchSuccessMeter;
 
   @Inject
   public CommonMetrics(final MetricsManager metricsManager) {
@@ -44,6 +46,8 @@ public class CommonMetrics {
   private void setupAllMetrics() {
     this.dbConnectionMeter = this.metricsManager.addMeter("DB-Connection-meter");
     this.flowFailMeter = this.metricsManager.addMeter("flow-fail-meter");
+    this.dispatchFailMeter = this.metricsManager.addMeter("dispatch-fail-meter");
+    this.dispatchSuccessMeter = this.metricsManager.addMeter("dispatch-success-meter");
     this.metricsManager.addGauge("OOM-waiting-job-count", this.OOMWaitingJobCount::get);
     this.metricsManager.addGauge("dbConnectionTime", this.dbConnectionTime::get);
   }
@@ -70,6 +74,20 @@ public class CommonMetrics {
     this.flowFailMeter.mark();
   }
 
+  /**
+   * Mark dispatchFailMeter when web server fails to dispatch a flow to executor.
+   */
+  public void markDispatchFail() {
+    this.dispatchFailMeter.mark();
+  }
+
+  /**
+   * Mark dispatchSuccessMeter when web server successfully dispatches a flow to executor.
+   */
+  public void markDispatchSuccess() {
+    this.dispatchSuccessMeter.mark();
+  }
+
   public void setDBConnectionTime(final long milliseconds) {
     this.dbConnectionTime.set(milliseconds);
   }