azkaban-developers

Remove recently finished flows cache (#1093) * Remove recentlyFinishedFlows

6/4/2017 1:25:57 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index 2bf7baf..eb8c24c 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -19,6 +19,7 @@ package azkaban.executor;
 import java.io.File;
 import java.util.List;
 import java.util.Map;
+import java.time.Duration;
 
 import azkaban.executor.ExecutorLogEvent.EventType;
 import azkaban.utils.FileIOUtils.LogData;
@@ -32,6 +33,9 @@ public interface ExecutorLoader {
   ExecutableFlow fetchExecutableFlow(int execId)
       throws ExecutorManagerException;
 
+  List<ExecutableFlow> fetchRecentlyFinishedFlows(Duration maxAge)
+      throws ExecutorManagerException;
+
   Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows()
       throws ExecutorManagerException;
 
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 4549ef0..b1e315b 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -25,6 +25,7 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.Thread.State;
 import java.net.URI;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -96,8 +97,7 @@ public class ExecutorManager extends EventHandler implements
 
   private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows =
       new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
-  private ConcurrentHashMap<Integer, ExecutableFlow> recentlyFinished =
-      new ConcurrentHashMap<Integer, ExecutableFlow>();
+
 
   QueuedExecutions queuedFlows;
 
@@ -109,6 +109,7 @@ public class ExecutorManager extends EventHandler implements
   // 12 weeks
   private static final long DEFAULT_EXECUTION_LOGS_RETENTION_MS = 3 * 4 * 7
       * 24 * 60 * 60 * 1000L;
+  private static final Duration RECENTLY_FINISHED_LIFETIME = Duration.ofMinutes(10);
   private long lastCleanerThreadCheckTime = -1;
 
   private long lastThreadCheckTime = -1;
@@ -626,7 +627,15 @@ public class ExecutorManager extends EventHandler implements
 
   @Override
   public List<ExecutableFlow> getRecentlyFinishedFlows() {
-    return new ArrayList<ExecutableFlow>(recentlyFinished.values());
+    List<ExecutableFlow> flows = new ArrayList<>();
+    try {
+      flows = executorLoader.fetchRecentlyFinishedFlows(
+          RECENTLY_FINISHED_LIFETIME);
+    } catch(ExecutorManagerException e) {
+      // TODO(jamiesjc): fix error handling; until then a failed fetch yields an empty list.
+      logger.error("Failed to fetch recently finished flows.", e);
+    }
+    return flows;
   }
 
   @Override
@@ -1212,8 +1221,6 @@ public class ExecutorManager extends EventHandler implements
       this.setName("ExecutorManagerUpdaterThread");
     }
 
-    // 10 mins recently finished threshold.
-    private long recentlyFinishedLifetimeMs = 600000;
     private int waitTimeIdleMs = 2000;
     private int waitTimeMs = 500;
 
@@ -1327,10 +1334,6 @@ public class ExecutorManager extends EventHandler implements
               }
             }
 
-            updaterStage = "Evicting old recently finished flows.";
-
-            evictOldRecentlyFinished(recentlyFinishedLifetimeMs);
-            // Add new finished
             for (ExecutableFlow flow : finishedFlows) {
               if (flow.getScheduleId() >= 0
                   && flow.getStatus() == Status.SUCCEEDED) {
@@ -1338,7 +1341,6 @@ public class ExecutorManager extends EventHandler implements
                     cacheDir);
               }
               fireEventListeners(Event.create(flow, Type.FLOW_FINISHED, new EventData(flow)));
-              recentlyFinished.put(flow.getExecutionId(), flow);
             }
 
             updaterStage =
@@ -1404,7 +1406,6 @@ public class ExecutorManager extends EventHandler implements
       updaterStage = "finalizing flow " + execId + " cleaning from memory";
       runningFlows.remove(execId);
       fireEventListeners(Event.create(dsFlow, Type.FLOW_FINISHED, new EventData(dsFlow)));
-      recentlyFinished.put(execId, dsFlow);
 
     } catch (ExecutorManagerException e) {
       alertUser = false; // failed due to azkaban internal error, not to alert user
@@ -1506,20 +1507,6 @@ public class ExecutorManager extends EventHandler implements
     exFlow.setStatus(Status.FAILED);
   }
 
-  private void evictOldRecentlyFinished(long ageMs) {
-    ArrayList<Integer> recentlyFinishedKeys =
-        new ArrayList<Integer>(recentlyFinished.keySet());
-    long oldAgeThreshold = System.currentTimeMillis() - ageMs;
-    for (Integer key : recentlyFinishedKeys) {
-      ExecutableFlow flow = recentlyFinished.get(key);
-
-      if (flow.getEndTime() < oldAgeThreshold) {
-        // Evict
-        recentlyFinished.remove(key);
-      }
-    }
-  }
-
   private ExecutableFlow updateExecution(Map<String, Object> updateData)
       throws ExecutorManagerException {
 
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 0b79bb7..e7fff84 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -32,6 +32,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.time.Duration;
 
 import org.apache.commons.dbutils.DbUtils;
 import org.apache.commons.dbutils.QueryRunner;
@@ -202,6 +203,27 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     }
   }
 
+  /**
+   * Fetches flows that finished recently, for the Recently Finished flow page.
+   *
+   * @param maxAge how long ago, at most, a flow may have ended and still be returned
+   * @return flows in SUCCEEDED, KILLED or FAILED status with end_time after now - maxAge
+   * @throws ExecutorManagerException if the database query fails
+   */
+  @Override
+  public List<ExecutableFlow> fetchRecentlyFinishedFlows(Duration maxAge)
+      throws ExecutorManagerException {
+    QueryRunner runner = createQueryRunner();
+    try {
+      long oldestEndTime = System.currentTimeMillis() - maxAge.toMillis();
+      return runner.query(FetchRecentlyFinishedFlows.FETCH_RECENTLY_FINISHED_FLOW,
+          new FetchRecentlyFinishedFlows(), oldestEndTime, Status.SUCCEEDED.getNumVal(),
+          Status.KILLED.getNumVal(), Status.FAILED.getNumVal());
+    } catch (SQLException e) {
+      throw new ExecutorManagerException("Error fetching recently finished flows", e);
+    }
+  }
+
   @Override
   public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows()
       throws ExecutorManagerException {
@@ -1364,6 +1386,52 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     }
   }
 
+  /**
+   * Converts rows of the recently-finished query into ExecutableFlow objects.
+   * Rows whose flow_data is null are skipped.
+   */
+  private static class FetchRecentlyFinishedFlows implements
+    ResultSetHandler<List<ExecutableFlow>> {
+    // Execution_flows table is already indexed by end_time
+    private static final String FETCH_RECENTLY_FINISHED_FLOW =
+        "SELECT exec_id, enc_type, flow_data FROM execution_flows "
+            + "WHERE end_time > ? AND status IN (?, ?, ?)";
+
+    /**
+     * @param rs result of FETCH_RECENTLY_FINISHED_FLOW; its cursor is advanced with next() here
+     * @return one flow per row with non-null flow_data; an empty list if there are no rows
+     * @throws SQLException if a column cannot be read or flow data cannot be decoded
+     */
+    @Override
+    public List<ExecutableFlow> handle(ResultSet rs) throws SQLException {
+      if (!rs.next()) {
+        return Collections.emptyList();
+      }
+
+      List<ExecutableFlow> execFlows = new ArrayList<>();
+      do {
+        int id = rs.getInt(1);
+        int encodingType = rs.getInt(2);
+        byte[] data = rs.getBytes(3);
+
+        if (data != null) {
+          try {
+            // flow_data holds JSON text, gzip-compressed when enc_type is GZIP.
+            String jsonString = EncodingType.fromInteger(encodingType) == EncodingType.GZIP
+                ? GZIPUtils.unGzipString(data, "UTF-8")
+                : new String(data, "UTF-8");
+            execFlows.add(ExecutableFlow.createExecutableFlowFromObject(
+                JSONUtils.parseJSONFromString(jsonString)));
+          } catch (IOException e) {
+            throw new SQLException("Error retrieving flow data " + id, e);
+          }
+        }
+      } while (rs.next());
+
+      return execFlows;
+    }
+  }
+
   private static class FetchActiveExecutableFlows implements
       ResultSetHandler<Map<Integer, Pair<ExecutionReference, ExecutableFlow>>> {
     // Select running and executor assigned flows
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index 3a41b56..ea0d43d 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -33,6 +34,7 @@ import org.apache.commons.dbutils.DbUtils;
 import org.apache.commons.dbutils.QueryRunner;
 import org.apache.commons.dbutils.ResultSetHandler;
 import org.joda.time.DateTime;
+import org.joda.time.DateTimeUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -59,6 +61,8 @@ public class JdbcExecutorLoaderTest {
   private static final String user = "azkaban";
   private static final String password = "azkaban";
   private static final int numConnections = 10;
+  private static final Duration RECENTLY_FINISHED_LIFETIME = Duration.ofMinutes(1);
+  private static final Duration FLOW_FINISHED_TIME = Duration.ofMinutes(2);
 
   @BeforeClass
   public static void setupDB() {
@@ -873,6 +877,51 @@ public class JdbcExecutorLoaderTest {
     Assert.assertEquals(flow1.getVersion(), execFlow1.getVersion());
   }
 
+  @Test
+  public void testFetchRecentlyFinishedFlows() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+
+    ExecutorLoader loader = createLoader();
+    ExecutableFlow uploaded = TestUtils.createExecutableFlow("exectest1", "exec1");
+    loader.uploadExecutableFlow(uploaded);
+    uploaded.setStatus(Status.SUCCEEDED);
+    uploaded.setEndTime(DateTimeUtils.currentTimeMillis());
+    loader.updateExecutableFlow(uploaded);
+    // The flow ended just now, so an immediate fetch must return exactly that flow.
+    List<ExecutableFlow> recent = loader.fetchRecentlyFinishedFlows(RECENTLY_FINISHED_LIFETIME);
+    Assert.assertEquals(1, recent.size());
+    ExecutableFlow fetched = recent.get(0);
+    Assert.assertEquals(uploaded.getExecutionId(), fetched.getExecutionId());
+    Assert.assertEquals(uploaded.getProjectName(), fetched.getProjectName());
+    Assert.assertEquals(uploaded.getFlowId(), fetched.getFlowId());
+    Assert.assertEquals(uploaded.getVersion(), fetched.getVersion());
+  }
+
+  @Test
+  public void testFetchEmptyRecentlyFinishedFlows() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+
+    ExecutorLoader loader = createLoader();
+    ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    loader.uploadExecutableFlow(flow1);
+    flow1.setStatus(Status.SUCCEEDED);
+    //Todo jamiesjc: use java8.java.time api instead of jodatime
+    // Mock the flow's end time to be 2 min ago. The joda offset is global static state,
+    // so restore the system clock right away to keep other tests unaffected.
+    DateTimeUtils.setCurrentMillisOffset(-FLOW_FINISHED_TIME.toMillis());
+    long mockedEndTime = DateTimeUtils.currentTimeMillis();
+    DateTimeUtils.setCurrentMillisSystem();
+    flow1.setEndTime(mockedEndTime);
+    loader.updateExecutableFlow(flow1);
+    // Fetch recently finished flows within 1 min. Should be empty.
+    List<ExecutableFlow> flows = loader.fetchRecentlyFinishedFlows(RECENTLY_FINISHED_LIFETIME);
+    Assert.assertTrue(flows.isEmpty());
+  }
+
   @Ignore @Test
   public void testSmallUploadLog() throws ExecutorManagerException {
     File logDir = new File(UNIT_BASE_DIR + "logtest");
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index 1d320c5..adadb2d 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.time.Duration;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
@@ -410,4 +411,10 @@ public class MockExecutorLoader implements ExecutorLoader {
   public void unassignExecutor(final int executionId) throws ExecutorManagerException {
     this.executionExecutorMapping.remove(executionId);
   }
+
+  @Override
+  public List<ExecutableFlow> fetchRecentlyFinishedFlows(Duration maxAge)
+      throws ExecutorManagerException {
+    return new ArrayList<>(); // this mock ignores maxAge and always returns a new empty list
+  }
 }