Details
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 2fb2f1c..6f7f892 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -90,7 +90,6 @@ public class AzkabanExecutorServer {
private static AzkabanExecutorServer app;
- private final ExecMetrics execMetrics;
private final ExecutorLoader executionLoader;
private final FlowRunnerManager runnerManager;
private final MetricsManager metricsManager;
@@ -106,15 +105,13 @@ public class AzkabanExecutorServer {
final ExecutorLoader executionLoader,
final FlowRunnerManager runnerManager,
final MetricsManager metricsManager,
- final ExecMetrics execMetrics,
@Named(EXEC_JETTY_SERVER) final Server server,
- @Named(EXEC_ROOT_CONTEXT) final Context root) throws Exception {
+ @Named(EXEC_ROOT_CONTEXT) final Context root) {
this.props = props;
this.executionLoader = executionLoader;
this.runnerManager = runnerManager;
this.metricsManager = metricsManager;
- this.execMetrics = execMetrics;
this.server = server;
this.root = root;
}
@@ -260,13 +257,11 @@ public class AzkabanExecutorServer {
logger.info("Started Executor Server on " + getExecutorHostPort());
if (this.props.getBoolean(ConfigurationKeys.IS_METRICS_ENABLED, false)) {
- startExecMetrics();
+ startReportingExecMetrics();
}
}
- private void startExecMetrics() throws Exception {
- this.execMetrics.addFlowRunnerManagerMetrics(getFlowRunnerManager());
-
+ private void startReportingExecMetrics() {
logger.info("starting reporting Executor Metrics");
this.metricsManager.startReporting("AZ-EXEC", this.props);
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecMetrics.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecMetrics.java
index c5185be..b0e544f 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecMetrics.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecMetrics.java
@@ -16,6 +16,7 @@
package azkaban.execapp;
+import azkaban.execapp.metric.ProjectCacheHitRatio;
import azkaban.metrics.MetricsManager;
import javax.inject.Inject;
import javax.inject.Singleton;
@@ -27,15 +28,19 @@ import javax.inject.Singleton;
public class ExecMetrics {
private final MetricsManager metricsManager;
+ private final ProjectCacheHitRatio projectCacheHitRatio;
@Inject
ExecMetrics(final MetricsManager metricsManager) {
this.metricsManager = metricsManager;
- setupStaticMetrics();
+ // setup project cache ratio metrics
+ this.projectCacheHitRatio = new ProjectCacheHitRatio();
+ metricsManager.addGauge("EXEC-ProjectDirCacheHitRatio",
+ this.projectCacheHitRatio::getRatio);
}
- public void setupStaticMetrics() {
-
+ ProjectCacheHitRatio getProjectCacheHitRatio() {
+ return this.projectCacheHitRatio;
}
public void addFlowRunnerManagerMetrics(final FlowRunnerManager flowRunnerManager) {
@@ -43,7 +48,5 @@ public class ExecMetrics {
.addGauge("EXEC-NumRunningFlows", flowRunnerManager::getNumRunningFlows);
this.metricsManager
.addGauge("EXEC-NumQueuedFlows", flowRunnerManager::getNumQueuedFlows);
- this.metricsManager
- .addGauge("EXEC-ProjectDirCacheHitRatio", flowRunnerManager::getProjectDirCacheHitRatio);
}
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
index 98a8a73..e444841 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
@@ -20,6 +20,7 @@ package azkaban.execapp;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
+import azkaban.execapp.metric.ProjectCacheHitRatio;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutorManagerException;
import azkaban.project.ProjectFileHandler;
@@ -55,35 +56,15 @@ class FlowPreparer {
private final StorageManager storageManager;
// Null if cache clean-up is disabled
private final Optional<ProjectCacheCleaner> projectCacheCleaner;
- private final ProjectCacheMetrics cacheMetrics;
-
- @VisibleForTesting
- static class ProjectCacheMetrics {
- private long cacheHit;
- private long cacheMiss;
-
- /**
- * @return hit ratio of project dirs cache
- */
- synchronized double getHitRatio() {
- final long total = this.cacheHit + this.cacheMiss;
- return total == 0 ? 0 : this.cacheHit * 1.0 / total;
- }
-
- synchronized void incrementCacheHit() {
- this.cacheHit++;
- }
-
- synchronized void incrementCacheMiss() {
- this.cacheMiss++;
- }
- }
+ private final ProjectCacheHitRatio projectCacheHitRatio;
FlowPreparer(final StorageManager storageManager, final File executionsDir,
- final File projectsDir, final ProjectCacheCleaner cleaner) {
+ final File projectsDir, final ProjectCacheCleaner cleaner,
+ final ProjectCacheHitRatio projectCacheHitRatio) {
Preconditions.checkNotNull(storageManager);
Preconditions.checkNotNull(executionsDir);
Preconditions.checkNotNull(projectsDir);
+ Preconditions.checkNotNull(projectCacheHitRatio);
Preconditions.checkArgument(projectsDir.exists());
Preconditions.checkArgument(executionsDir.exists());
@@ -92,11 +73,7 @@ class FlowPreparer {
this.executionsDir = executionsDir;
this.projectCacheDir = projectsDir;
this.projectCacheCleaner = Optional.ofNullable(cleaner);
- this.cacheMetrics = new ProjectCacheMetrics();
- }
-
- double getProjectDirCacheHitRatio() {
- return this.cacheMetrics.getHitRatio();
+ this.projectCacheHitRatio = projectCacheHitRatio;
}
/**
@@ -247,7 +224,7 @@ class FlowPreparer {
* @throws IOException if downloading or unzipping fails.
*/
@VisibleForTesting
- File downloadProjectIfNotExists(final ProjectDirectoryMetadata proj, int execId)
+ File downloadProjectIfNotExists(final ProjectDirectoryMetadata proj, final int execId)
throws IOException {
final String projectDir = generateProjectDirName(proj);
if (proj.getInstalledDir() == null) {
@@ -256,9 +233,9 @@ class FlowPreparer {
// If directory exists, assume it's prepared and skip.
if (proj.getInstalledDir().exists()) {
- log.info("Project {} already cached. Skipping download. ExecId: {}", proj, execId);
+ log.info("Project {} already cached. Skipping download. ExecId: {}", proj, execId);
// Hit the local cache.
- this.cacheMetrics.incrementCacheHit();
+ this.projectCacheHitRatio.markHit();
// Update last modified time of the file keeping project dir size when the project is
// accessed. This last modified time will be used to determined least recently used
// projects when performing project directory clean-up.
@@ -267,7 +244,7 @@ class FlowPreparer {
return null;
}
- this.cacheMetrics.incrementCacheMiss();
+ this.projectCacheHitRatio.markMiss();
final File tempDir = createTempDir(proj);
downloadAndUnzipProject(proj, tempDir);
return tempDir;
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 096d4cb..78bebac 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -138,6 +138,7 @@ public class FlowRunnerManager implements EventListener,
private final File projectDirectory;
private final Object executionDirDeletionSync = new Object();
private final CommonMetrics commonMetrics;
+ private final ExecMetrics execMetrics;
private final int numThreads;
private final int numJobThreadPerFlow;
@@ -164,6 +165,7 @@ public class FlowRunnerManager implements EventListener,
final TriggerManager triggerManager,
final AlerterHolder alerterHolder,
final CommonMetrics commonMetrics,
+ final ExecMetrics execMetrics,
@Nullable final AzkabanEventReporter azkabanEventReporter) throws IOException {
this.azkabanProps = props;
@@ -192,6 +194,7 @@ public class FlowRunnerManager implements EventListener,
this.triggerManager = triggerManager;
this.alerterHolder = alerterHolder;
this.commonMetrics = commonMetrics;
+ this.execMetrics = execMetrics;
this.jobLogChunkSize = this.azkabanProps.getString("job.log.chunk.size", "5MB");
this.jobLogNumFiles = this.azkabanProps.getInt("job.log.backup.index", 4);
@@ -219,7 +222,9 @@ public class FlowRunnerManager implements EventListener,
// Create a flow preparer
this.flowPreparer = new FlowPreparer(storageManager, this.executionDirectory,
- this.projectDirectory, cleaner);
+ this.projectDirectory, cleaner, this.execMetrics.getProjectCacheHitRatio());
+
+ this.execMetrics.addFlowRunnerManagerMetrics(this);
this.cleanerThread = new CleanerThread();
this.cleanerThread.start();
@@ -232,10 +237,6 @@ public class FlowRunnerManager implements EventListener,
}
}
- public double getProjectDirCacheHitRatio() {
- return this.flowPreparer.getProjectDirCacheHitRatio();
- }
-
/**
* Setting the gid bit on the execution directory forces all files/directories created within the
* directory to be a part of the group associated with the azkaban process. Then, when users
@@ -383,10 +384,10 @@ public class FlowRunnerManager implements EventListener,
// Record the time between submission, and when the flow preparation/execution starts.
// Note that since submit time is recorded on the web server, while flow preparation is on
// the executor, there could be some inaccuracies due to clock skew.
- commonMetrics.addQueueWait(System.currentTimeMillis() -
+ this.commonMetrics.addQueueWait(System.currentTimeMillis() -
flow.getExecutableFlow().getSubmitTime());
- final Timer.Context flowPrepTimerContext = commonMetrics.getFlowSetupTimerContext();
+ final Timer.Context flowPrepTimerContext = this.commonMetrics.getFlowSetupTimerContext();
try {
if (this.active || isExecutorSpecified(flow)) {
@@ -1036,11 +1037,11 @@ public class FlowRunnerManager implements EventListener,
if (execId != -1) {
FlowRunnerManager.logger.info("Submitting flow " + execId);
submitFlow(execId);
- commonMetrics.markDispatchSuccess();
+ FlowRunnerManager.this.commonMetrics.markDispatchSuccess();
}
} catch (final Exception e) {
FlowRunnerManager.logger.error("Failed to submit flow ", e);
- commonMetrics.markDispatchFail();
+ FlowRunnerManager.this.commonMetrics.markDispatchFail();
}
}
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/metric/ProjectCacheHitRatio.java b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/ProjectCacheHitRatio.java
new file mode 100644
index 0000000..99bb3cb
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/ProjectCacheHitRatio.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2019 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp.metric;
+
+import com.codahale.metrics.RatioGauge;
+import com.codahale.metrics.SlidingWindowReservoir;
+import java.util.Arrays;
+
+/**
+ * Project cache hit ratio of last 100 cache accesses.
+ *
+ * <p>The advantage of sampling last 100 caches accesses over time-based sampling like last hour's
+ * cache accesses is the former is more deterministic. Suppose there's only few execution in last
+ * hour, then hit ratio might not be truly informative, which doesn't necessarily reflect
+ * performance of the cache.</p>
+ */
+public class ProjectCacheHitRatio extends RatioGauge {
+
+ private final SlidingWindowReservoir hits;
+ public static final int WINDOW_SIZE = 100;
+
+ public ProjectCacheHitRatio() {
+ this.hits = new SlidingWindowReservoir(WINDOW_SIZE);
+ }
+
+ public synchronized void markHit() {
+ this.hits.update(1);
+ }
+
+ public synchronized void markMiss() {
+ this.hits.update(0);
+ }
+
+ @Override
+ public synchronized Ratio getRatio() {
+ final long hitCount = Arrays.stream(this.hits.getSnapshot().getValues()).sum();
+ return Ratio.of(hitCount, this.hits.getSnapshot().size());
+ }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
index a57b218..66933b2 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
@@ -29,7 +29,7 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import azkaban.execapp.FlowPreparer.ProjectCacheMetrics;
+import azkaban.execapp.metric.ProjectCacheHitRatio;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutorManagerException;
import azkaban.project.ProjectFileHandler;
@@ -88,7 +88,8 @@ public class FlowPreparerTest {
this.projectsDir = this.temporaryFolder.newFolder("projects");
this.instance = spy(
- new FlowPreparer(createMockStorageManager(), this.executionsDir, this.projectsDir, null));
+ new FlowPreparer(createMockStorageManager(), this.executionsDir, this.projectsDir, null,
+ new ProjectCacheHitRatio()));
doNothing().when(this.instance).updateLastModifiedTime(any());
}
@@ -185,34 +186,4 @@ public class FlowPreparerTest {
assertTrue(new File(execDir, SAMPLE_FLOW_01).exists());
}
-
- @Test
- public void testProjectsCacheMetricsZeroHit() {
- //given
- final ProjectCacheMetrics cacheMetrics = new ProjectCacheMetrics();
-
- //when zero hit and zero miss then
- assertThat(cacheMetrics.getHitRatio()).isEqualTo(0);
-
- //when
- cacheMetrics.incrementCacheMiss();
- //then
- assertThat(cacheMetrics.getHitRatio()).isEqualTo(0);
- }
-
- @Test
- public void testProjectsCacheMetricsHit() {
- //given
- final ProjectCacheMetrics cacheMetrics = new ProjectCacheMetrics();
-
- //when one hit
- cacheMetrics.incrementCacheHit();
- //then
- assertThat(cacheMetrics.getHitRatio()).isEqualTo(1);
-
- //when one miss
- cacheMetrics.incrementCacheMiss();
- //then
- assertThat(cacheMetrics.getHitRatio()).isEqualTo(0.5);
- }
}
diff --git a/azkaban-exec-server/src/test/java/azkaban/metrics/ProjectCacheHitRatioTest.java b/azkaban-exec-server/src/test/java/azkaban/metrics/ProjectCacheHitRatioTest.java
new file mode 100644
index 0000000..e781f30
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/metrics/ProjectCacheHitRatioTest.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.metrics;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import azkaban.execapp.metric.ProjectCacheHitRatio;
+import org.junit.Test;
+
+
+public class ProjectCacheHitRatioTest {
+
+ @Test
+ public void testProjectCacheZeroHit() {
+ //given
+ final ProjectCacheHitRatio hitRatio = new ProjectCacheHitRatio();
+
+ //when zero hit and zero miss then
+ assertThat(hitRatio.getRatio().getValue()).isEqualTo(Double.NaN);
+
+ //when
+ hitRatio.markMiss();
+ //then
+ assertThat(hitRatio.getRatio().getValue()).isEqualTo(0);
+ }
+
+ @Test
+ public void testProjectCacheMetricsHit() {
+ //given
+ final ProjectCacheHitRatio hitRatio = new ProjectCacheHitRatio();
+
+ //when one hit
+ hitRatio.markHit();
+ //then
+ assertThat(hitRatio.getRatio().getValue()).isEqualTo(1);
+
+ //when one miss
+ hitRatio.markMiss();
+ //then
+ assertThat(hitRatio.getRatio().getValue()).isEqualTo(0.5);
+ }
+
+ @Test
+ public void testProjectCacheAccessWindowSize() {
+ //given
+ final ProjectCacheHitRatio hitRatio = new ProjectCacheHitRatio();
+
+ //when all hits
+ for (int i = 0; i < ProjectCacheHitRatio.WINDOW_SIZE; i++) {
+ hitRatio.markHit();
+ }
+ //then
+ assertThat(hitRatio.getRatio().getValue()).isEqualTo(1);
+
+ //when the 101th access is miss
+ hitRatio.markMiss();
+
+ //then
+ assertThat(hitRatio.getRatio().getValue()).isEqualTo(0.99);
+
+ //when the 102th access is miss
+ hitRatio.markMiss();
+ assertThat(hitRatio.getRatio().getValue()).isEqualTo(0.98);
+ }
+}