azkaban-aplcache

Project cache hit ratio metrics enhancement (#2137) This

3/4/2019 11:48:42 PM

Details

diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 2fb2f1c..6f7f892 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -90,7 +90,6 @@ public class AzkabanExecutorServer {
 
   private static AzkabanExecutorServer app;
 
-  private final ExecMetrics execMetrics;
   private final ExecutorLoader executionLoader;
   private final FlowRunnerManager runnerManager;
   private final MetricsManager metricsManager;
@@ -106,15 +105,13 @@ public class AzkabanExecutorServer {
       final ExecutorLoader executionLoader,
       final FlowRunnerManager runnerManager,
       final MetricsManager metricsManager,
-      final ExecMetrics execMetrics,
       @Named(EXEC_JETTY_SERVER) final Server server,
-      @Named(EXEC_ROOT_CONTEXT) final Context root) throws Exception {
+      @Named(EXEC_ROOT_CONTEXT) final Context root) {
     this.props = props;
     this.executionLoader = executionLoader;
     this.runnerManager = runnerManager;
 
     this.metricsManager = metricsManager;
-    this.execMetrics = execMetrics;
     this.server = server;
     this.root = root;
   }
@@ -260,13 +257,11 @@ public class AzkabanExecutorServer {
     logger.info("Started Executor Server on " + getExecutorHostPort());
 
     if (this.props.getBoolean(ConfigurationKeys.IS_METRICS_ENABLED, false)) {
-      startExecMetrics();
+      startReportingExecMetrics();
     }
   }
 
-  private void startExecMetrics() throws Exception {
-    this.execMetrics.addFlowRunnerManagerMetrics(getFlowRunnerManager());
-
+  private void startReportingExecMetrics() {
     logger.info("starting reporting Executor Metrics");
     this.metricsManager.startReporting("AZ-EXEC", this.props);
   }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecMetrics.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecMetrics.java
index c5185be..b0e544f 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecMetrics.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecMetrics.java
@@ -16,6 +16,7 @@
 
 package azkaban.execapp;
 
+import azkaban.execapp.metric.ProjectCacheHitRatio;
 import azkaban.metrics.MetricsManager;
 import javax.inject.Inject;
 import javax.inject.Singleton;
@@ -27,15 +28,19 @@ import javax.inject.Singleton;
 public class ExecMetrics {
 
   private final MetricsManager metricsManager;
+  private final ProjectCacheHitRatio projectCacheHitRatio;
 
   @Inject
   ExecMetrics(final MetricsManager metricsManager) {
     this.metricsManager = metricsManager;
-    setupStaticMetrics();
+    // setup project cache ratio metrics
+    this.projectCacheHitRatio = new ProjectCacheHitRatio();
+    metricsManager.addGauge("EXEC-ProjectDirCacheHitRatio",
+        this.projectCacheHitRatio::getRatio);
   }
 
-  public void setupStaticMetrics() {
-
+  ProjectCacheHitRatio getProjectCacheHitRatio() {
+    return this.projectCacheHitRatio;
   }
 
   public void addFlowRunnerManagerMetrics(final FlowRunnerManager flowRunnerManager) {
@@ -43,7 +48,5 @@ public class ExecMetrics {
         .addGauge("EXEC-NumRunningFlows", flowRunnerManager::getNumRunningFlows);
     this.metricsManager
         .addGauge("EXEC-NumQueuedFlows", flowRunnerManager::getNumQueuedFlows);
-    this.metricsManager
-        .addGauge("EXEC-ProjectDirCacheHitRatio", flowRunnerManager::getProjectDirCacheHitRatio);
   }
 }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
index 98a8a73..e444841 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
@@ -20,6 +20,7 @@ package azkaban.execapp;
 import static com.google.common.base.Preconditions.checkState;
 import static java.util.Objects.requireNonNull;
 
+import azkaban.execapp.metric.ProjectCacheHitRatio;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.project.ProjectFileHandler;
@@ -55,35 +56,15 @@ class FlowPreparer {
   private final StorageManager storageManager;
   // Null if cache clean-up is disabled
   private final Optional<ProjectCacheCleaner> projectCacheCleaner;
-  private final ProjectCacheMetrics cacheMetrics;
-
-  @VisibleForTesting
-  static class ProjectCacheMetrics {
-    private long cacheHit;
-    private long cacheMiss;
-
-    /**
-     * @return hit ratio of project dirs cache
-     */
-    synchronized double getHitRatio() {
-      final long total = this.cacheHit + this.cacheMiss;
-      return total == 0 ? 0 : this.cacheHit * 1.0 / total;
-    }
-
-    synchronized void incrementCacheHit() {
-      this.cacheHit++;
-    }
-
-    synchronized void incrementCacheMiss() {
-      this.cacheMiss++;
-    }
-  }
+  private final ProjectCacheHitRatio projectCacheHitRatio;
 
   FlowPreparer(final StorageManager storageManager, final File executionsDir,
-      final File projectsDir, final ProjectCacheCleaner cleaner) {
+      final File projectsDir, final ProjectCacheCleaner cleaner,
+      final ProjectCacheHitRatio projectCacheHitRatio) {
     Preconditions.checkNotNull(storageManager);
     Preconditions.checkNotNull(executionsDir);
     Preconditions.checkNotNull(projectsDir);
+    Preconditions.checkNotNull(projectCacheHitRatio);
 
     Preconditions.checkArgument(projectsDir.exists());
     Preconditions.checkArgument(executionsDir.exists());
@@ -92,11 +73,7 @@ class FlowPreparer {
     this.executionsDir = executionsDir;
     this.projectCacheDir = projectsDir;
     this.projectCacheCleaner = Optional.ofNullable(cleaner);
-    this.cacheMetrics = new ProjectCacheMetrics();
-  }
-
-  double getProjectDirCacheHitRatio() {
-    return this.cacheMetrics.getHitRatio();
+    this.projectCacheHitRatio = projectCacheHitRatio;
   }
 
   /**
@@ -247,7 +224,7 @@ class FlowPreparer {
    * @throws IOException if downloading or unzipping fails.
    */
   @VisibleForTesting
-  File downloadProjectIfNotExists(final ProjectDirectoryMetadata proj, int execId)
+  File downloadProjectIfNotExists(final ProjectDirectoryMetadata proj, final int execId)
       throws IOException {
     final String projectDir = generateProjectDirName(proj);
     if (proj.getInstalledDir() == null) {
@@ -256,9 +233,9 @@ class FlowPreparer {
 
     // If directory exists, assume it's prepared and skip.
     if (proj.getInstalledDir().exists()) {
-	log.info("Project {} already cached. Skipping download. ExecId: {}", proj, execId);
+      log.info("Project {} already cached. Skipping download. ExecId: {}", proj, execId);
       // Hit the local cache.
-      this.cacheMetrics.incrementCacheHit();
+      this.projectCacheHitRatio.markHit();
       // Update last modified time of the file keeping project dir size when the project is
       // accessed. This last modified time will be used to determined least recently used
       // projects when performing project directory clean-up.
@@ -267,7 +244,7 @@ class FlowPreparer {
       return null;
     }
 
-    this.cacheMetrics.incrementCacheMiss();
+    this.projectCacheHitRatio.markMiss();
     final File tempDir = createTempDir(proj);
     downloadAndUnzipProject(proj, tempDir);
     return tempDir;
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 096d4cb..78bebac 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -138,6 +138,7 @@ public class FlowRunnerManager implements EventListener,
   private final File projectDirectory;
   private final Object executionDirDeletionSync = new Object();
   private final CommonMetrics commonMetrics;
+  private final ExecMetrics execMetrics;
 
   private final int numThreads;
   private final int numJobThreadPerFlow;
@@ -164,6 +165,7 @@ public class FlowRunnerManager implements EventListener,
       final TriggerManager triggerManager,
       final AlerterHolder alerterHolder,
       final CommonMetrics commonMetrics,
+      final ExecMetrics execMetrics,
       @Nullable final AzkabanEventReporter azkabanEventReporter) throws IOException {
     this.azkabanProps = props;
 
@@ -192,6 +194,7 @@ public class FlowRunnerManager implements EventListener,
     this.triggerManager = triggerManager;
     this.alerterHolder = alerterHolder;
     this.commonMetrics = commonMetrics;
+    this.execMetrics = execMetrics;
 
     this.jobLogChunkSize = this.azkabanProps.getString("job.log.chunk.size", "5MB");
     this.jobLogNumFiles = this.azkabanProps.getInt("job.log.backup.index", 4);
@@ -219,7 +222,9 @@ public class FlowRunnerManager implements EventListener,
 
     // Create a flow preparer
     this.flowPreparer = new FlowPreparer(storageManager, this.executionDirectory,
-        this.projectDirectory, cleaner);
+        this.projectDirectory, cleaner, this.execMetrics.getProjectCacheHitRatio());
+
+    this.execMetrics.addFlowRunnerManagerMetrics(this);
 
     this.cleanerThread = new CleanerThread();
     this.cleanerThread.start();
@@ -232,10 +237,6 @@ public class FlowRunnerManager implements EventListener,
     }
   }
 
-  public double getProjectDirCacheHitRatio() {
-    return this.flowPreparer.getProjectDirCacheHitRatio();
-  }
-
   /**
    * Setting the gid bit on the execution directory forces all files/directories created within the
    * directory to be a part of the group associated with the azkaban process. Then, when users
@@ -383,10 +384,10 @@ public class FlowRunnerManager implements EventListener,
     // Record the time between submission, and when the flow preparation/execution starts.
     // Note that since submit time is recorded on the web server, while flow preparation is on
     // the executor, there could be some inaccuracies due to clock skew.
-    commonMetrics.addQueueWait(System.currentTimeMillis() -
+    this.commonMetrics.addQueueWait(System.currentTimeMillis() -
         flow.getExecutableFlow().getSubmitTime());
 
-    final Timer.Context flowPrepTimerContext = commonMetrics.getFlowSetupTimerContext();
+    final Timer.Context flowPrepTimerContext = this.commonMetrics.getFlowSetupTimerContext();
 
     try {
       if (this.active || isExecutorSpecified(flow)) {
@@ -1036,11 +1037,11 @@ public class FlowRunnerManager implements EventListener,
           if (execId != -1) {
             FlowRunnerManager.logger.info("Submitting flow " + execId);
             submitFlow(execId);
-            commonMetrics.markDispatchSuccess();
+            FlowRunnerManager.this.commonMetrics.markDispatchSuccess();
           }
         } catch (final Exception e) {
           FlowRunnerManager.logger.error("Failed to submit flow ", e);
-          commonMetrics.markDispatchFail();
+          FlowRunnerManager.this.commonMetrics.markDispatchFail();
         }
       }
     }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/metric/ProjectCacheHitRatio.java b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/ProjectCacheHitRatio.java
new file mode 100644
index 0000000..99bb3cb
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/ProjectCacheHitRatio.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2019 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp.metric;
+
+import com.codahale.metrics.RatioGauge;
+import com.codahale.metrics.SlidingWindowReservoir;
+import java.util.Arrays;
+
+/**
+ * Project cache hit ratio of last 100 cache accesses.
+ *
+ * <p>The advantage of sampling last 100 caches accesses over time-based sampling like last hour's
+ * cache accesses is the former is more deterministic. Suppose there's only few execution in last
+ * hour, then hit ratio might not be truly informative, which doesn't necessarily reflect
+ * performance of the cache.</p>
+ */
+public class ProjectCacheHitRatio extends RatioGauge {
+
+  private final SlidingWindowReservoir hits;
+  public static final int WINDOW_SIZE = 100;
+
+  public ProjectCacheHitRatio() {
+    this.hits = new SlidingWindowReservoir(WINDOW_SIZE);
+  }
+
+  public synchronized void markHit() {
+    this.hits.update(1);
+  }
+
+  public synchronized void markMiss() {
+    this.hits.update(0);
+  }
+
+  @Override
+  public synchronized Ratio getRatio() {
+    final long hitCount = Arrays.stream(this.hits.getSnapshot().getValues()).sum();
+    return Ratio.of(hitCount, this.hits.getSnapshot().size());
+  }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
index a57b218..66933b2 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
@@ -29,7 +29,7 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import azkaban.execapp.FlowPreparer.ProjectCacheMetrics;
+import azkaban.execapp.metric.ProjectCacheHitRatio;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.project.ProjectFileHandler;
@@ -88,7 +88,8 @@ public class FlowPreparerTest {
     this.projectsDir = this.temporaryFolder.newFolder("projects");
 
     this.instance = spy(
-        new FlowPreparer(createMockStorageManager(), this.executionsDir, this.projectsDir, null));
+        new FlowPreparer(createMockStorageManager(), this.executionsDir, this.projectsDir, null,
+            new ProjectCacheHitRatio()));
     doNothing().when(this.instance).updateLastModifiedTime(any());
   }
 
@@ -185,34 +186,4 @@ public class FlowPreparerTest {
     assertTrue(new File(execDir, SAMPLE_FLOW_01).exists());
   }
 
-
-  @Test
-  public void testProjectsCacheMetricsZeroHit() {
-    //given
-    final ProjectCacheMetrics cacheMetrics = new ProjectCacheMetrics();
-
-    //when zero hit and zero miss then
-    assertThat(cacheMetrics.getHitRatio()).isEqualTo(0);
-
-    //when
-    cacheMetrics.incrementCacheMiss();
-    //then
-    assertThat(cacheMetrics.getHitRatio()).isEqualTo(0);
-  }
-
-  @Test
-  public void testProjectsCacheMetricsHit() {
-    //given
-    final ProjectCacheMetrics cacheMetrics = new ProjectCacheMetrics();
-
-    //when one hit
-    cacheMetrics.incrementCacheHit();
-    //then
-    assertThat(cacheMetrics.getHitRatio()).isEqualTo(1);
-
-    //when one miss
-    cacheMetrics.incrementCacheMiss();
-    //then
-    assertThat(cacheMetrics.getHitRatio()).isEqualTo(0.5);
-  }
 }
diff --git a/azkaban-exec-server/src/test/java/azkaban/metrics/ProjectCacheHitRatioTest.java b/azkaban-exec-server/src/test/java/azkaban/metrics/ProjectCacheHitRatioTest.java
new file mode 100644
index 0000000..e781f30
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/metrics/ProjectCacheHitRatioTest.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.metrics;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import azkaban.execapp.metric.ProjectCacheHitRatio;
+import org.junit.Test;
+
+
+public class ProjectCacheHitRatioTest {
+
+  @Test
+  public void testProjectCacheZeroHit() {
+    //given
+    final ProjectCacheHitRatio hitRatio = new ProjectCacheHitRatio();
+
+    //when zero hit and zero miss then
+    assertThat(hitRatio.getRatio().getValue()).isEqualTo(Double.NaN);
+
+    //when
+    hitRatio.markMiss();
+    //then
+    assertThat(hitRatio.getRatio().getValue()).isEqualTo(0);
+  }
+
+  @Test
+  public void testProjectCacheMetricsHit() {
+    //given
+    final ProjectCacheHitRatio hitRatio = new ProjectCacheHitRatio();
+
+    //when one hit
+    hitRatio.markHit();
+    //then
+    assertThat(hitRatio.getRatio().getValue()).isEqualTo(1);
+
+    //when one miss
+    hitRatio.markMiss();
+    //then
+    assertThat(hitRatio.getRatio().getValue()).isEqualTo(0.5);
+  }
+
+  @Test
+  public void testProjectCacheAccessWindowSize() {
+    //given
+    final ProjectCacheHitRatio hitRatio = new ProjectCacheHitRatio();
+
+    //when all hits
+    for (int i = 0; i < ProjectCacheHitRatio.WINDOW_SIZE; i++) {
+      hitRatio.markHit();
+    }
+    //then
+    assertThat(hitRatio.getRatio().getValue()).isEqualTo(1);
+
+    //when the 101th access is miss
+    hitRatio.markMiss();
+
+    //then
+    assertThat(hitRatio.getRatio().getValue()).isEqualTo(0.99);
+
+    //when the 102th access is miss
+    hitRatio.markMiss();
+    assertThat(hitRatio.getRatio().getValue()).isEqualTo(0.98);
+  }
+}