diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 08d0787..9c0a032 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -1035,11 +1035,13 @@ public class ExecutorManager extends EventHandler implements
executorLoader.addActiveExecutableReference(reference);
try {
dispatch(reference, exflow, choosenExecutor);
+ this.commonMetrics.markDispatchSuccess();
} catch (ExecutorManagerException e) {
// When flow dispatch fails, should update the flow status
// to FAILED in execution_flows DB table as well. Currently
// this logic is only implemented in multiExecutorMode but
// missed in single executor case.
+ this.commonMetrics.markDispatchFail();
finalizeFlows(exflow);
throw e;
}
@@ -1852,7 +1854,9 @@ public class ExecutorManager extends EventHandler implements
if (selectedExecutor != null) {
try {
dispatch(reference, exflow, selectedExecutor);
+ commonMetrics.markDispatchSuccess();
} catch (ExecutorManagerException e) {
+ commonMetrics.markDispatchFail();
logger.warn(String.format(
"Executor %s responded with exception for exec: %d",
selectedExecutor, exflow.getExecutionId()), e);
@@ -1860,6 +1864,7 @@ public class ExecutorManager extends EventHandler implements
availableExecutors);
}
} else {
+ commonMetrics.markDispatchFail();
handleNoExecutorSelectedCase(reference, exflow);
}
}
diff --git a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
index 2d4b5c9..8e46206 100644
--- a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
+++ b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
@@ -34,6 +34,8 @@ public class CommonMetrics {
private final MetricsManager metricsManager;
private Meter dbConnectionMeter;
private Meter flowFailMeter;
+ private Meter dispatchFailMeter;
+ private Meter dispatchSuccessMeter;
@Inject
public CommonMetrics(final MetricsManager metricsManager) {
@@ -44,6 +46,8 @@ public class CommonMetrics {
private void setupAllMetrics() {
this.dbConnectionMeter = this.metricsManager.addMeter("DB-Connection-meter");
this.flowFailMeter = this.metricsManager.addMeter("flow-fail-meter");
+ this.dispatchFailMeter = this.metricsManager.addMeter("dispatch-fail-meter");
+ this.dispatchSuccessMeter = this.metricsManager.addMeter("dispatch-success-meter");
this.metricsManager.addGauge("OOM-waiting-job-count", this.OOMWaitingJobCount::get);
this.metricsManager.addGauge("dbConnectionTime", this.dbConnectionTime::get);
}
@@ -70,6 +74,20 @@ public class CommonMetrics {
this.flowFailMeter.mark();
}
+ /**
+ * Mark dispatchFailMeter when web server fails to dispatch a flow to executor.
+ */
+ public void markDispatchFail() {
+ this.dispatchFailMeter.mark();
+ }
+
+ /**
+ * Mark dispatchSuccessMeter when web server successfully dispatches a flow to executor.
+ */
+ public void markDispatchSuccess() {
+ this.dispatchSuccessMeter.mark();
+ }
+
public void setDBConnectionTime(final long milliseconds) {
this.dbConnectionTime.set(milliseconds);
}