Details
diff --git a/az-core/src/main/java/azkaban/metrics/MetricsManager.java b/az-core/src/main/java/azkaban/metrics/MetricsManager.java
index 8715320..f935eec 100644
--- a/az-core/src/main/java/azkaban/metrics/MetricsManager.java
+++ b/az-core/src/main/java/azkaban/metrics/MetricsManager.java
@@ -21,8 +21,10 @@ import static azkaban.Constants.ConfigurationKeys.METRICS_SERVER_URL;
import azkaban.utils.Props;
import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
@@ -79,6 +81,19 @@ public class MetricsManager {
}
/**
+ * A {@link Histogram} measures the statistical distribution of values in a stream of data. In
+ * addition to minimum, maximum, mean, etc., it also measures median, 75th,
+ * 90th, 95th, 98th, 99th, and 99.9th percentiles.
+ */
+ public Histogram addHistogram(final String name) { return this.registry.histogram(name); }
+
+ /**
+ * A {@link Timer} measures both the rate that a particular piece of code is called and the
+ * distribution of its duration.
+ */
+ public Timer addTimer(final String name) { return this.registry.timer(name); }
+
+ /**
* reporting metrics to remote metrics collector. Note: this method must be synchronized, since
* both web server and executor will call it during initialization.
*/
diff --git a/az-core/src/test/java/azkaban/metrics/MetricsTestUtility.java b/az-core/src/test/java/azkaban/metrics/MetricsTestUtility.java
index 2a7c50e..0c7d59b 100644
--- a/az-core/src/test/java/azkaban/metrics/MetricsTestUtility.java
+++ b/az-core/src/test/java/azkaban/metrics/MetricsTestUtility.java
@@ -16,8 +16,11 @@
package azkaban.metrics;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
-
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
/**
* This class is designed for a utility class to test drop wizard metrics
@@ -34,4 +37,24 @@ public class MetricsTestUtility {
// Assume that the gauge value can be converted to type long.
return (long) this.registry.getGauges().get(name).getValue();
}
+
+ /** @return the value for the specified {@link Meter} */
+ public long getMeterValue(final String name) {
+ return this.registry.getMeters().get(name).getCount();
+ }
+
+ /** @return the {@link Snapshot} for the specified {@link Histogram}. */
+ public Snapshot getHistogramSnapshot(final String name) {
+ return this.registry.getHistograms().get(name).getSnapshot();
+ }
+
+ /** @return the count for the specified {@link Timer}. */
+ public long getTimerCount(final String name) {
+ return this.registry.getTimers().get(name).getCount();
+ }
+
+ /** @return the {@link Snapshot} for the specified {@link Timer}. */
+ public Snapshot getTimerSnapshot(final String name) {
+ return this.registry.getTimers().get(name).getSnapshot();
+ }
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
index 20e4f15..d6f96d7 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
@@ -18,6 +18,7 @@ package azkaban.executor;
import azkaban.Constants.ConfigurationKeys;
import azkaban.event.EventHandler;
import azkaban.flow.FlowUtils;
+import azkaban.metrics.CommonMetrics;
import azkaban.project.Project;
import azkaban.project.ProjectWhitelist;
import azkaban.utils.FileIOUtils.LogData;
@@ -57,12 +58,15 @@ public class ExecutionController extends EventHandler implements ExecutorManager
private final AlerterHolder alerterHolder;
private final ExecutorHealthChecker executorHealthChecker;
private final int maxConcurrentRunsOneFlow;
+ private final CommonMetrics commonMetrics;
@Inject
ExecutionController(final Props azkProps, final ExecutorLoader executorLoader,
+ final CommonMetrics commonMetrics,
final ExecutorApiGateway apiGateway, final AlerterHolder alerterHolder, final
ExecutorHealthChecker executorHealthChecker) {
this.executorLoader = executorLoader;
+ this.commonMetrics = commonMetrics;
this.apiGateway = apiGateway;
this.alerterHolder = alerterHolder;
this.executorHealthChecker = executorHealthChecker;
@@ -600,6 +604,7 @@ public class ExecutionController extends EventHandler implements ExecutorManager
if (!running.isEmpty()) {
if (running.size() > this.maxConcurrentRunsOneFlow) {
+ this.commonMetrics.markSubmitFlowSkip();
throw new ExecutorManagerException("Flow " + flowId
+ " has more than " + this.maxConcurrentRunsOneFlow + " concurrent runs. Skipping",
ExecutorManagerException.Reason.SkippedExecution);
@@ -615,6 +620,7 @@ public class ExecutionController extends EventHandler implements ExecutorManager
+ options.getPipelineLevel() + ". \n";
} else if (options.getConcurrentOption().equals(
ExecutionOptions.CONCURRENT_OPTION_SKIP)) {
+ this.commonMetrics.markSubmitFlowSkip();
throw new ExecutorManagerException("Flow " + flowId
+ " is already running. Skipping execution.",
ExecutorManagerException.Reason.SkippedExecution);
@@ -635,6 +641,7 @@ public class ExecutionController extends EventHandler implements ExecutorManager
// this call.
this.executorLoader.uploadExecutableFlow(exflow);
+ this.commonMetrics.markSubmitFlowSuccess();
message += "Execution queued successfully with exec id " + exflow.getExecutionId();
return message;
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index f852732..4f39cd9 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -949,6 +949,7 @@ public class ExecutorManager extends EventHandler implements
"Failed to submit %s for project %s. Azkaban has overrun its webserver queue capacity",
flowId, exflow.getProjectName());
logger.error(message);
+ this.commonMetrics.markSubmitFlowFail();
} else {
final int projectId = exflow.getProjectId();
exflow.setSubmitUser(userId);
@@ -968,6 +969,7 @@ public class ExecutorManager extends EventHandler implements
if (!running.isEmpty()) {
if (running.size() > this.maxConcurrentRunsOneFlow) {
+ this.commonMetrics.markSubmitFlowSkip();
throw new ExecutorManagerException("Flow " + flowId
+ " has more than " + this.maxConcurrentRunsOneFlow + " concurrent runs. Skipping",
ExecutorManagerException.Reason.SkippedExecution);
@@ -983,6 +985,7 @@ public class ExecutorManager extends EventHandler implements
+ options.getPipelineLevel() + ". \n";
} else if (options.getConcurrentOption().equals(
ExecutionOptions.CONCURRENT_OPTION_SKIP)) {
+ this.commonMetrics.markSubmitFlowSkip();
throw new ExecutorManagerException("Flow " + flowId
+ " is already running. Skipping execution.",
ExecutorManagerException.Reason.SkippedExecution);
@@ -1012,6 +1015,7 @@ public class ExecutorManager extends EventHandler implements
this.executorLoader.addActiveExecutableReference(reference);
this.queuedFlows.enqueue(exflow, reference);
message += "Execution queued successfully with exec id " + exflow.getExecutionId();
+ this.commonMetrics.markSubmitFlowSuccess();
}
return message;
}
@@ -1097,7 +1101,7 @@ public class ExecutorManager extends EventHandler implements
/**
* Calls executor to dispatch the flow, update db to assign the executor and in-memory state of
- * executableFlow
+ * executableFlow.
*/
private void dispatch(final ExecutionReference reference, final ExecutableFlow exflow,
final Executor choosenExecutor) throws ExecutorManagerException {
diff --git a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
index 3d009fc..7505be4 100644
--- a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
+++ b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
@@ -16,7 +16,9 @@
package azkaban.metrics;
+import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
import java.util.concurrent.atomic.AtomicLong;
import javax.inject.Inject;
import javax.inject.Singleton;
@@ -28,6 +30,18 @@ import javax.inject.Singleton;
*/
@Singleton
public class CommonMetrics {
+ public static final String FLOW_FAIL_METER_NAME = "flow-fail-meter";
+ public static final String DISPATCH_FAIL_METER_NAME = "dispatch-fail-meter";
+ public static final String DISPATCH_SUCCESS_METER_NAME = "dispatch-success-meter";
+ public static final String SEND_EMAIL_FAIL_METER_NAME = "send-email-fail-meter";
+ public static final String SEND_EMAIL_SUCCESS_METER_NAME = "send-email-success-meter";
+ public static final String SUBMIT_FLOW_SUCCESS_METER_NAME = "submit-flow-success-meter";
+ public static final String SUBMIT_FLOW_FAIL_METER_NAME = "submit-flow-fail-meter";
+ public static final String SUBMIT_FLOW_SKIP_METER_NAME = "submit-flow-skip-meter";
+ public static final String OOM_WAITING_JOB_COUNT_NAME = "OOM-waiting-job-count";
+ public static final String QUEUE_WAIT_HISTOGRAM_NAME = "queue-wait-histogram";
+ public static final String FLOW_SETUP_TIMER_NAME = "flow-setup-timer";
+
private final AtomicLong OOMWaitingJobCount = new AtomicLong(0L);
private final MetricsManager metricsManager;
@@ -36,6 +50,11 @@ public class CommonMetrics {
private Meter dispatchSuccessMeter;
private Meter sendEmailFailMeter;
private Meter sendEmailSuccessMeter;
+ private Meter submitFlowSuccessMeter;
+ private Meter submitFlowFailMeter;
+ private Meter submitFlowSkipMeter;
+ private Histogram queueWaitMeter;
+ private Timer flowSetupTimer;
@Inject
public CommonMetrics(final MetricsManager metricsManager) {
@@ -44,15 +63,19 @@ public class CommonMetrics {
}
private void setupAllMetrics() {
- this.flowFailMeter = this.metricsManager.addMeter("flow-fail-meter");
- this.dispatchFailMeter = this.metricsManager.addMeter("dispatch-fail-meter");
- this.dispatchSuccessMeter = this.metricsManager.addMeter("dispatch-success-meter");
- this.sendEmailFailMeter = this.metricsManager.addMeter("send-email-fail-meter");
- this.sendEmailSuccessMeter = this.metricsManager.addMeter("send-email-success-meter");
- this.metricsManager.addGauge("OOM-waiting-job-count", this.OOMWaitingJobCount::get);
+ this.flowFailMeter = this.metricsManager.addMeter(FLOW_FAIL_METER_NAME);
+ this.dispatchFailMeter = this.metricsManager.addMeter(DISPATCH_FAIL_METER_NAME);
+ this.dispatchSuccessMeter = this.metricsManager.addMeter(DISPATCH_SUCCESS_METER_NAME);
+ this.sendEmailFailMeter = this.metricsManager.addMeter(SEND_EMAIL_FAIL_METER_NAME);
+ this.sendEmailSuccessMeter = this.metricsManager.addMeter(SEND_EMAIL_SUCCESS_METER_NAME);
+ this.submitFlowSuccessMeter = this.metricsManager.addMeter(SUBMIT_FLOW_SUCCESS_METER_NAME);
+ this.submitFlowFailMeter = this.metricsManager.addMeter(SUBMIT_FLOW_FAIL_METER_NAME);
+ this.submitFlowSkipMeter = this.metricsManager.addMeter(SUBMIT_FLOW_SKIP_METER_NAME);
+ this.metricsManager.addGauge(OOM_WAITING_JOB_COUNT_NAME, this.OOMWaitingJobCount::get);
+ this.queueWaitMeter = this.metricsManager.addHistogram(QUEUE_WAIT_HISTOGRAM_NAME);
+ this.flowSetupTimer = this.metricsManager.addTimer(FLOW_SETUP_TIMER_NAME);
}
-
/**
* Mark flowFailMeter when a flow is considered as FAILED. This method could be called by Web
* Server or Executor, as they both detect flow failure.
@@ -90,6 +113,27 @@ public class CommonMetrics {
}
/**
+ * Mark submitFlowSuccessMeter when a flow is submitted for execution successfully.
+ */
+ public void markSubmitFlowSuccess() {
+ this.submitFlowSuccessMeter.mark();
+ }
+
+ /**
+ * Mark submitFlowFailMeter when a flow submitted for execution is skipped.
+ */
+ public void markSubmitFlowSkip() {
+ this.submitFlowSkipMeter.mark();
+ }
+
+ /**
+ * Mark submitFlowFailMeter when a flow fails to be submitted for execution.
+ */
+ public void markSubmitFlowFail() {
+ this.submitFlowFailMeter.mark();
+ }
+
+ /**
* Mark the occurrence of an job waiting event due to OOM
*/
public void incrementOOMJobWaitCount() {
@@ -103,4 +147,15 @@ public class CommonMetrics {
this.OOMWaitingJobCount.decrementAndGet();
}
+ /**
+ * Add the queue wait time for a flow to the metrics.
+ *
+ * @param time queue wait time for a flow.
+ */
+ public void addQueueWait(long time) { this.queueWaitMeter.update(time); }
+
+ /**
+ * @return the {@link Timer.Context} for the timer.
+ */
+ public Timer.Context getFlowSetupTimerContext() { return this.flowSetupTimer.time(); }
}
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java
index 209a75d..b7a1c1d 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java
@@ -22,10 +22,13 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import azkaban.Constants;
+import azkaban.metrics.CommonMetrics;
+import azkaban.metrics.MetricsManager;
import azkaban.user.User;
import azkaban.utils.Pair;
import azkaban.utils.Props;
import azkaban.utils.TestUtils;
+import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
@@ -52,6 +55,9 @@ public class ExecutionControllerTest {
private AlerterHolder alertHolder;
private ExecutorHealthChecker executorHealthChecker;
private Props props;
+ private final CommonMetrics commonMetrics = new CommonMetrics(
+ new MetricsManager(new MetricRegistry()));
+
private User user;
private ExecutableFlow flow1;
private ExecutableFlow flow2;
@@ -70,8 +76,8 @@ public class ExecutionControllerTest {
this.props.put(Constants.ConfigurationKeys.MAX_CONCURRENT_RUNS_ONEFLOW, 1);
this.alertHolder = mock(AlerterHolder.class);
this.executorHealthChecker = mock(ExecutorHealthChecker.class);
- this.controller = new ExecutionController(this.props, this.loader, this.apiGateway,
- this.alertHolder, this.executorHealthChecker);
+ this.controller = new ExecutionController(this.props, this.loader, this.commonMetrics,
+ this.apiGateway, this.alertHolder, this.executorHealthChecker);
final Executor executor1 = new Executor(1, "localhost", 12345, true);
final Executor executor2 = new Executor(2, "localhost", 12346, true);
diff --git a/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java b/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
index 64ac5fd..6099ed5 100644
--- a/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
+++ b/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
@@ -17,8 +17,11 @@
package azkaban.metrics;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
import org.junit.Before;
import org.junit.Test;
@@ -37,10 +40,55 @@ public class CommonMetricsTest {
@Test
public void testOOMWaitingJobMetrics() {
- final String metricName = "OOM-waiting-job-count";
+ final String metricName = CommonMetrics.OOM_WAITING_JOB_COUNT_NAME;
assertEquals(0, this.testUtil.getGaugeValue(metricName));
this.metrics.incrementOOMJobWaitCount();
assertEquals(1, this.testUtil.getGaugeValue(metricName));
}
+
+ @Test
+ public void testSubmitMetrics() {
+ assertEquals(0, this.testUtil.getMeterValue(CommonMetrics.SUBMIT_FLOW_FAIL_METER_NAME));
+ this.metrics.markSubmitFlowFail();
+ assertEquals(1, this.testUtil.getMeterValue(CommonMetrics.SUBMIT_FLOW_FAIL_METER_NAME));
+
+ assertEquals(0, this.testUtil.getMeterValue(CommonMetrics.SUBMIT_FLOW_SKIP_METER_NAME));
+ this.metrics.markSubmitFlowSkip();
+ assertEquals(1, this.testUtil.getMeterValue(CommonMetrics.SUBMIT_FLOW_SKIP_METER_NAME));
+
+ assertEquals(0, this.testUtil.getMeterValue(CommonMetrics.SUBMIT_FLOW_SUCCESS_METER_NAME));
+ this.metrics.markSubmitFlowSuccess();
+ assertEquals(1, this.testUtil.getMeterValue(CommonMetrics.SUBMIT_FLOW_SUCCESS_METER_NAME));
+ }
+
+ @Test
+ public void testQueueWaitMetrics() {
+ final double delta = 0.001;
+
+ this.metrics.addQueueWait(500L);
+ this.metrics.addQueueWait(600L);
+ this.metrics.addQueueWait(1000L);
+ Snapshot snapshot = this.testUtil.getHistogramSnapshot(CommonMetrics.QUEUE_WAIT_HISTOGRAM_NAME);
+ assertEquals(600, snapshot.getMedian(), delta);
+ assertEquals(700, snapshot.getMean(), delta);
+ assertEquals(500, snapshot.getMin(), delta);
+ assertEquals(1000, snapshot.getMax(), delta);
+ }
+
+ @Test
+ public void testFlowSetupMetrics() throws InterruptedException {
+ assertEquals(0, this.testUtil.getTimerCount(CommonMetrics.FLOW_SETUP_TIMER_NAME));
+ Timer.Context context = this.metrics.getFlowSetupTimerContext();
+ try {
+ Thread.sleep(100);
+ }
+ finally {
+ context.stop();
+ }
+ assertEquals(1, this.testUtil.getTimerCount(CommonMetrics.FLOW_SETUP_TIMER_NAME));
+ Snapshot snapshot = this.testUtil.getTimerSnapshot(CommonMetrics.FLOW_SETUP_TIMER_NAME);
+ double val = snapshot.getMax();
+ assertTrue(snapshot.getMax() > 100);
+ }
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
index 66f6b8d..98a8a73 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
@@ -135,7 +135,7 @@ class FlowPreparer {
// Download project to a temp dir if not exists in local cache.
final long start = System.currentTimeMillis();
- tempDir = downloadProjectIfNotExists(project);
+ tempDir = downloadProjectIfNotExists(project, flow.getExecutionId());
log.info("Downloading zip file for project {} when preparing execution [execid {}] "
+ "completed in {} second(s)", project, flow.getExecutionId(),
@@ -247,7 +247,7 @@ class FlowPreparer {
* @throws IOException if downloading or unzipping fails.
*/
@VisibleForTesting
- File downloadProjectIfNotExists(final ProjectDirectoryMetadata proj)
+ File downloadProjectIfNotExists(final ProjectDirectoryMetadata proj, int execId)
throws IOException {
final String projectDir = generateProjectDirName(proj);
if (proj.getInstalledDir() == null) {
@@ -256,7 +256,7 @@ class FlowPreparer {
// If directory exists, assume it's prepared and skip.
if (proj.getInstalledDir().exists()) {
- log.info("Project {} already cached. Skipping download.", proj);
+ log.info("Project {} already cached. Skipping download. ExecId: {}", proj, execId);
// Hit the local cache.
this.cacheMetrics.incrementCacheHit();
// Update last modified time of the file keeping project dir size when the project is
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index f0b2122..f418561 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -36,6 +36,7 @@ import azkaban.executor.Status;
import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypeManagerException;
import azkaban.metric.MetricReportManager;
+import azkaban.metrics.CommonMetrics;
import azkaban.project.ProjectLoader;
import azkaban.project.ProjectWhitelist;
import azkaban.project.ProjectWhitelist.WhitelistType;
@@ -52,6 +53,7 @@ import azkaban.utils.Props;
import azkaban.utils.ThreadPoolExecutingListener;
import azkaban.utils.TrackingThreadPool;
import azkaban.utils.UndefinedPropertyException;
+import com.codahale.metrics.Timer;
import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
@@ -131,6 +133,7 @@ public class FlowRunnerManager implements EventListener,
private final File executionDirectory;
private final File projectDirectory;
private final Object executionDirDeletionSync = new Object();
+ private final CommonMetrics commonMetrics;
private final int numThreads;
private final int numJobThreadPerFlow;
@@ -156,6 +159,7 @@ public class FlowRunnerManager implements EventListener,
final StorageManager storageManager,
final TriggerManager triggerManager,
final AlerterHolder alerterHolder,
+ final CommonMetrics commonMetrics,
@Nullable final AzkabanEventReporter azkabanEventReporter) throws IOException {
this.azkabanProps = props;
@@ -183,6 +187,7 @@ public class FlowRunnerManager implements EventListener,
this.projectLoader = projectLoader;
this.triggerManager = triggerManager;
this.alerterHolder = alerterHolder;
+ this.commonMetrics = commonMetrics;
this.jobLogChunkSize = this.azkabanProps.getString("job.log.chunk.size", "5MB");
this.jobLogNumFiles = this.azkabanProps.getInt("job.log.backup.index", 4);
@@ -336,8 +341,19 @@ public class FlowRunnerManager implements EventListener,
+ execId);
}
- // Sets up the project files and execution directory.
- this.flowPreparer.setup(flow);
+ // Record the time between submission, and when the flow preparation/execution starts.
+ // Note that since submit time is recorded on the web server, while flow preparation is on
+ // the executor, there could be some inaccuracies due to clock skew.
+ commonMetrics.addQueueWait(System.currentTimeMillis() -
+ flow.getExecutableFlow().getSubmitTime());
+
+ final Timer.Context flowPrepTimerContext = commonMetrics.getFlowSetupTimerContext();
+ try {
+ // Sets up the project files and execution directory.
+ this.flowPreparer.setup(flow);
+ } finally {
+ flowPrepTimerContext.stop();
+ }
// Setup flow runner
FlowWatcher watcher = null;
@@ -973,9 +989,11 @@ public class FlowRunnerManager implements EventListener,
if (execId != -1) {
FlowRunnerManager.logger.info("Submitting flow " + execId);
submitFlow(execId);
+ commonMetrics.markDispatchSuccess();
}
} catch (final Exception e) {
FlowRunnerManager.logger.error("Failed to submit flow ", e);
+ commonMetrics.markDispatchFail();
}
}
}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
index dcb4f6c..a57b218 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
@@ -97,7 +97,7 @@ public class FlowPreparerTest {
final ProjectDirectoryMetadata proj = new ProjectDirectoryMetadata(12, 34,
new File(this.projectsDir, SAMPLE_FLOW_01));
- final File tmp = this.instance.downloadProjectIfNotExists(proj);
+ final File tmp = this.instance.downloadProjectIfNotExists(proj, 123);
final long actualDirSize = 1048835;
@@ -111,7 +111,7 @@ public class FlowPreparerTest {
public void testDownloadingProjectIfNotExists() throws Exception {
final ProjectDirectoryMetadata proj = new ProjectDirectoryMetadata(12, 34,
new File(this.projectsDir, SAMPLE_FLOW_01));
- final File tmp = this.instance.downloadProjectIfNotExists(proj);
+ final File tmp = this.instance.downloadProjectIfNotExists(proj, 124);
final Path projectDirSizeFile = Paths.get(proj.getInstalledDir().getPath(),
FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME);
@@ -125,11 +125,11 @@ public class FlowPreparerTest {
public void testNotDownloadingProjectIfExists() throws Exception {
final ProjectDirectoryMetadata proj = new ProjectDirectoryMetadata(12, 34,
new File(this.projectsDir, SAMPLE_FLOW_01));
- File tmp = this.instance.downloadProjectIfNotExists(proj);
+ File tmp = this.instance.downloadProjectIfNotExists(proj, 125);
Files.move(tmp.toPath(), proj.getInstalledDir().toPath());
// Try downloading the same project again
- tmp = this.instance.downloadProjectIfNotExists(proj);
+ tmp = this.instance.downloadProjectIfNotExists(proj, 126);
final Path projectDirSizeFile = Paths.get(proj.getInstalledDir().getPath(),
FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME);