azkaban-aplcache

Rewrite fetchQueuedFlows and fetchRecentlyFinishedFlows and move them from JdbcExecutorLoader into ExecutionFlowDao

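Both fetch methods now live in ExecutionFlowDao and run through DatabaseOperator; JdbcExecutorLoader simply delegates to the injected DAO. A minimal caller sketch, assuming Guice constructor injection as used elsewhere in azkaban-common (QueuedFlowPoller and its poll() method are hypothetical and only illustrate the new DAO surface; the class is placed in azkaban.executor because fetchRecentlyFinishedFlows is package-private):

    package azkaban.executor;

    import azkaban.utils.Pair;
    import com.google.inject.Inject;
    import java.time.Duration;
    import java.util.List;

    // Hypothetical caller, not part of this change; shown only to illustrate
    // the new ExecutionFlowDao methods.
    public class QueuedFlowPoller {

      private final ExecutionFlowDao executionFlowDao;

      @Inject
      public QueuedFlowPoller(final ExecutionFlowDao executionFlowDao) {
        this.executionFlowDao = executionFlowDao;
      }

      public void poll() throws ExecutorManagerException {
        // Queued flows: status PREPARING with no executor assigned yet.
        final List<Pair<ExecutionReference, ExecutableFlow>> queued =
            this.executionFlowDao.fetchQueuedFlows();

        // Flows that ended as SUCCEEDED, KILLED or FAILED within the last 10 minutes.
        final List<ExecutableFlow> recent =
            this.executionFlowDao.fetchRecentlyFinishedFlows(Duration.ofMinutes(10));

        // Hand these lists off to the dispatcher / status updater (omitted here).
      }
    }
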
8/21/2017 5:01:08 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
index 1d49c98..26046b2 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
@@ -21,9 +21,11 @@ import azkaban.db.DatabaseOperator;
 import azkaban.db.SQLTransaction;
 import azkaban.utils.GZIPUtils;
 import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
 import java.io.IOException;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -96,6 +98,16 @@ public class ExecutionFlowDao {
     }
   }
 
+  public List<Pair<ExecutionReference, ExecutableFlow>> fetchQueuedFlows()
+      throws ExecutorManagerException {
+    try {
+      return this.dbOperator.query(FetchQueuedExecutableFlows.FETCH_QUEUED_EXECUTABLE_FLOW,
+          new FetchQueuedExecutableFlows());
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Error fetching active flows", e);
+    }
+  }
+
   List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
                                         final int skip, final int num,
                                         final Status status)
@@ -108,6 +120,18 @@ public class ExecutionFlowDao {
     }
   }
 
+  List<ExecutableFlow> fetchRecentlyFinishedFlows(final Duration maxAge)
+      throws ExecutorManagerException {
+    try {
+      return this.dbOperator.query(FetchRecentlyFinishedFlows.FETCH_RECENTLY_FINISHED_FLOW,
+          new FetchRecentlyFinishedFlows(), System.currentTimeMillis() - maxAge.toMillis(),
+          Status.SUCCEEDED.getNumVal(), Status.KILLED.getNumVal(),
+          Status.FAILED.getNumVal());
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Error fetching recently finished flows", e);
+    }
+  }
+
   List<ExecutableFlow> fetchFlowHistory(final String projContain, final String flowContains,
                                         final String userNameContains, final int status,
                                         final long startTime, final long endTime,
@@ -304,4 +328,105 @@ public class ExecutionFlowDao {
       return execFlows;
     }
   }
+
+  /**
+   * JDBC ResultSetHandler to fetch queued executions
+   */
+  private static class FetchQueuedExecutableFlows implements
+      ResultSetHandler<List<Pair<ExecutionReference, ExecutableFlow>>> {
+    // Select queued unassigned flows
+    private static final String FETCH_QUEUED_EXECUTABLE_FLOW =
+        "SELECT exec_id, enc_type, flow_data FROM execution_flows"
+            + " Where executor_id is NULL AND status = "
+            + Status.PREPARING.getNumVal();
+
+    @Override
+    public List<Pair<ExecutionReference, ExecutableFlow>> handle(final ResultSet rs)
+        throws SQLException {
+      if (!rs.next()) {
+        return Collections.emptyList();
+      }
+
+      final List<Pair<ExecutionReference, ExecutableFlow>> execFlows =
+          new ArrayList<>();
+      do {
+        final int id = rs.getInt(1);
+        final int encodingType = rs.getInt(2);
+        final byte[] data = rs.getBytes(3);
+
+        if (data == null) {
+          logger.error("Found a flow with empty data blob exec_id: " + id);
+        } else {
+          final EncodingType encType = EncodingType.fromInteger(encodingType);
+          final Object flowObj;
+          try {
+            // Convoluted way to inflate strings. Should find common package or
+            // helper function.
+            if (encType == EncodingType.GZIP) {
+              // Decompress the sucker.
+              final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+              flowObj = JSONUtils.parseJSONFromString(jsonString);
+            } else {
+              final String jsonString = new String(data, "UTF-8");
+              flowObj = JSONUtils.parseJSONFromString(jsonString);
+            }
+
+            final ExecutableFlow exFlow =
+                ExecutableFlow.createExecutableFlowFromObject(flowObj);
+            final ExecutionReference ref = new ExecutionReference(id);
+            execFlows.add(new Pair<>(ref, exFlow));
+          } catch (final IOException e) {
+            throw new SQLException("Error retrieving flow data " + id, e);
+          }
+        }
+      } while (rs.next());
+
+      return execFlows;
+    }
+  }
+
+  private static class FetchRecentlyFinishedFlows implements
+      ResultSetHandler<List<ExecutableFlow>> {
+    // Execution_flows table is already indexed by end_time
+    private static final String FETCH_RECENTLY_FINISHED_FLOW =
+        "SELECT exec_id, enc_type, flow_data FROM execution_flows "
+            + "WHERE end_time > ? AND status IN (?, ?, ?)";
+
+    @Override
+    public List<ExecutableFlow> handle(
+        final ResultSet rs) throws SQLException {
+      if (!rs.next()) {
+        return Collections.emptyList();
+      }
+
+      final List<ExecutableFlow> execFlows = new ArrayList<>();
+      do {
+        final int id = rs.getInt(1);
+        final int encodingType = rs.getInt(2);
+        final byte[] data = rs.getBytes(3);
+
+        if (data != null) {
+          final EncodingType encType = EncodingType.fromInteger(encodingType);
+          final Object flowObj;
+          try {
+            if (encType == EncodingType.GZIP) {
+              final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+              flowObj = JSONUtils.parseJSONFromString(jsonString);
+            } else {
+              final String jsonString = new String(data, "UTF-8");
+              flowObj = JSONUtils.parseJSONFromString(jsonString);
+            }
+
+            final ExecutableFlow exFlow =
+                ExecutableFlow.createExecutableFlowFromObject(flowObj);
+
+            execFlows.add(exFlow);
+          } catch (final IOException e) {
+            throw new SQLException("Error retrieving flow data " + id, e);
+          }
+        }
+      } while (rs.next());
+      return execFlows;
+    }
+  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 3856122..162c34e 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -20,8 +20,6 @@ import azkaban.database.AbstractJdbcLoader;
 import azkaban.executor.ExecutorLogEvent.EventType;
 import azkaban.metrics.CommonMetrics;
 import azkaban.utils.FileIOUtils.LogData;
-import azkaban.utils.GZIPUtils;
-import azkaban.utils.JSONUtils;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
 import com.google.inject.Inject;
@@ -32,8 +30,6 @@ import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.dbutils.DbUtils;
@@ -100,25 +96,10 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     return this.executionFlowDao.fetchExecutableFlow(id);
   }
 
-  /**
-   *
-   * {@inheritDoc}
-   * @see azkaban.executor.ExecutorLoader#fetchQueuedFlows()
-   */
-  @Override
+  @Override
   public List<Pair<ExecutionReference, ExecutableFlow>> fetchQueuedFlows()
     throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
-    final FetchQueuedExecutableFlows flowHandler = new FetchQueuedExecutableFlows();
-
-    try {
-      final List<Pair<ExecutionReference, ExecutableFlow>> flows =
-        runner.query(FetchQueuedExecutableFlows.FETCH_QUEUED_EXECUTABLE_FLOW,
-          flowHandler);
-      return flows;
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException("Error fetching active flows", e);
-    }
+    return this.executionFlowDao.fetchQueuedFlows();
   }
 
   /**
@@ -127,19 +108,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
   @Override
   public List<ExecutableFlow> fetchRecentlyFinishedFlows(final Duration maxAge)
       throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
-    final FetchRecentlyFinishedFlows flowHandler = new FetchRecentlyFinishedFlows();
-
-    try {
-      final List<ExecutableFlow> flows =
-          runner.query(FetchRecentlyFinishedFlows.FETCH_RECENTLY_FINISHED_FLOW,
-              flowHandler, System.currentTimeMillis() - maxAge.toMillis(),
-              Status.SUCCEEDED.getNumVal(), Status.KILLED.getNumVal(),
-              Status.FAILED.getNumVal());
-      return flows;
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException("Error fetching recently finished flows", e);
-    }
+    return this.executionFlowDao.fetchRecentlyFinishedFlows(maxAge);
   }
 
   @Override
@@ -347,55 +316,28 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     return connection;
   }
 
-
-  /**
-   *
-   * {@inheritDoc}
-   *
-   * @see azkaban.executor.ExecutorLoader#fetchActiveExecutors()
-   */
-  @Override
+  @Override
   public List<Executor> fetchAllExecutors() throws ExecutorManagerException {
     return this.executorDao.fetchAllExecutors();
   }
 
-  /**
-   *
-   * {@inheritDoc}
-   *
-   * @see azkaban.executor.ExecutorLoader#fetchActiveExecutors()
-   */
-  @Override
+  @Override
   public List<Executor> fetchActiveExecutors() throws ExecutorManagerException {
     return this.executorDao.fetchActiveExecutors();
   }
 
-  /**
-   * {@inheritDoc}
-   *
-   * @see azkaban.executor.ExecutorLoader#fetchExecutor(java.lang.String, int)
-   */
-  @Override
+  @Override
   public Executor fetchExecutor(final String host, final int port)
     throws ExecutorManagerException {
     return this.executorDao.fetchExecutor(host, port);
   }
 
-  /**
-   * {@inheritDoc}
-   *
-   * @see azkaban.executor.ExecutorLoader#fetchExecutor(int)
-   */
-  @Override
+  @Override
   public Executor fetchExecutor(final int executorId) throws ExecutorManagerException {
     return this.executorDao.fetchExecutor(executorId);
   }
 
-  /**
-   * {@inheritDoc}
-   *
-   */
-  @Override
+  @Override
   public void updateExecutor(final Executor executor) throws ExecutorManagerException {
     final String UPDATE =
       "UPDATE executors SET host=?, port=?, active=? where id=?";
@@ -415,35 +357,17 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     }
   }
 
-  /**
-   * {@inheritDoc}
-   *
-   * @see azkaban.executor.ExecutorLoader#addExecutor(java.lang.String, int)
-   */
   @Override
   public Executor addExecutor(final String host, final int port)
     throws ExecutorManagerException {
     return this.executorDao.addExecutor(host, port);
   }
 
-
-  /**
-   * {@inheritDoc}
-   *
-   * @see azkaban.executor.ExecutorLoader#removeExecutor(String, int)
-   */
-  @Override
+  @Override
   public void removeExecutor(final String host, final int port) throws ExecutorManagerException {
     this.executorDao.removeExecutor(host, port);
   }
 
-  /**
-   * {@inheritDoc}
-   *
-   * @see azkaban.executor.ExecutorLoader#postExecutorEvent(azkaban.executor.Executor,
-   *      azkaban.executor.ExecutorLogEvent.EventType, java.lang.String,
-   *      java.lang.String)
-   */
   @Override
   public void postExecutorEvent(final Executor executor, final EventType type, final String user,
                                 final String message) throws ExecutorManagerException{
@@ -451,12 +375,6 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     this.executorEventsDao.postExecutorEvent(executor, type, user, message);
   }
 
-  /**
-   * {@inheritDoc}
-   *
-   * @see azkaban.executor.ExecutorLoader#getExecutorEvents(azkaban.executor.Executor,
-   *      int, int)
-   */
   @Override
   public List<ExecutorLogEvent> getExecutorEvents(final Executor executor, final int num,
                                                   final int offset) throws ExecutorManagerException {
@@ -464,12 +382,6 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     return this.executorEventsDao.getExecutorEvents(executor, num, offset);
   }
 
-  /**
-   *
-   * {@inheritDoc}
-   *
-   * @see azkaban.executor.ExecutorLoader#assignExecutor(int, int)
-   */
   @Override
   public void assignExecutor(final int executorId, final int executionId)
     throws ExecutorManagerException {
@@ -497,12 +409,6 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     }
   }
 
-  /**
-   *
-   * {@inheritDoc}
-   *
-   * @see azkaban.executor.ExecutorLoader#fetchExecutorByExecutionId(int)
-   */
   @Override
   public Executor fetchExecutorByExecutionId(final int executionId)
     throws ExecutorManagerException {
@@ -516,11 +422,6 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     return this.executionLogsDao.removeExecutionLogsByTime(millis);
   }
 
-  /**
-   *
-   * {@inheritDoc}
-   * @see azkaban.executor.ExecutorLoader#unassignExecutor(int)
-   */
   @Override
   public void unassignExecutor(final int executionId) throws ExecutorManagerException {
     final String UPDATE =
@@ -539,121 +440,6 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     }
   }
 
-  private static class LastInsertID implements ResultSetHandler<Long> {
-    private static final String LAST_INSERT_ID = "SELECT LAST_INSERT_ID()";
-
-    @Override
-    public Long handle(final ResultSet rs) throws SQLException {
-      if (!rs.next()) {
-        return -1L;
-      }
-      final long id = rs.getLong(1);
-      return id;
-    }
-  }
-
-  /**
-   * JDBC ResultSetHandler to fetch queued executions
-   */
-  private static class FetchQueuedExecutableFlows implements
-    ResultSetHandler<List<Pair<ExecutionReference, ExecutableFlow>>> {
-    // Select queued unassigned flows
-    private static final String FETCH_QUEUED_EXECUTABLE_FLOW =
-        "SELECT exec_id, enc_type, flow_data FROM execution_flows"
-            + " Where executor_id is NULL AND status = "
-            + Status.PREPARING.getNumVal();
-
-    @Override
-    public List<Pair<ExecutionReference, ExecutableFlow>> handle(final ResultSet rs)
-      throws SQLException {
-      if (!rs.next()) {
-        return Collections.emptyList();
-      }
-
-      final List<Pair<ExecutionReference, ExecutableFlow>> execFlows =
-        new ArrayList<>();
-      do {
-        final int id = rs.getInt(1);
-        final int encodingType = rs.getInt(2);
-        final byte[] data = rs.getBytes(3);
-
-        if (data == null) {
-          logger.error("Found a flow with empty data blob exec_id: " + id);
-        } else {
-          final EncodingType encType = EncodingType.fromInteger(encodingType);
-          final Object flowObj;
-          try {
-            // Convoluted way to inflate strings. Should find common package or
-            // helper function.
-            if (encType == EncodingType.GZIP) {
-              // Decompress the sucker.
-              final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
-              flowObj = JSONUtils.parseJSONFromString(jsonString);
-            } else {
-              final String jsonString = new String(data, "UTF-8");
-              flowObj = JSONUtils.parseJSONFromString(jsonString);
-            }
-
-            final ExecutableFlow exFlow =
-              ExecutableFlow.createExecutableFlowFromObject(flowObj);
-            final ExecutionReference ref = new ExecutionReference(id);
-            execFlows.add(new Pair<>(ref, exFlow));
-          } catch (final IOException e) {
-            throw new SQLException("Error retrieving flow data " + id, e);
-          }
-        }
-      } while (rs.next());
-
-      return execFlows;
-    }
-  }
-
-  private static class FetchRecentlyFinishedFlows implements
-    ResultSetHandler<List<ExecutableFlow>> {
-    // Execution_flows table is already indexed by end_time
-    private static final String FETCH_RECENTLY_FINISHED_FLOW =
-        "SELECT exec_id, enc_type, flow_data FROM execution_flows "
-            + "WHERE end_time > ? AND status IN (?, ?, ?)";
-
-    @Override
-    public List<ExecutableFlow> handle(
-        final ResultSet rs) throws SQLException {
-      if (!rs.next()) {
-        return Collections.emptyList();
-      }
-
-      final List<ExecutableFlow> execFlows = new ArrayList<>();
-      do {
-        final int id = rs.getInt(1);
-        final int encodingType = rs.getInt(2);
-        final byte[] data = rs.getBytes(3);
-
-        if (data != null) {
-          final EncodingType encType = EncodingType.fromInteger(encodingType);
-          final Object flowObj;
-          try {
-            if (encType == EncodingType.GZIP) {
-              final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
-              flowObj = JSONUtils.parseJSONFromString(jsonString);
-            } else {
-              final String jsonString = new String(data, "UTF-8");
-              flowObj = JSONUtils.parseJSONFromString(jsonString);
-            }
-
-            final ExecutableFlow exFlow =
-                ExecutableFlow.createExecutableFlowFromObject(flowObj);
-
-            execFlows.add(exFlow);
-          } catch (final IOException e) {
-            throw new SQLException("Error retrieving flow data " + id, e);
-          }
-        }
-      } while (rs.next());
-
-      return execFlows;
-    }
-  }
-
   private static class IntHandler implements ResultSetHandler<Integer> {
     private static final String NUM_EXECUTIONS =
         "SELECT COUNT(1) FROM execution_flows";
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
index 5d2d6f3..d1f1263 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
@@ -20,10 +20,13 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import azkaban.db.DatabaseOperator;
 import azkaban.test.Utils;
+import azkaban.utils.Pair;
 import azkaban.utils.TestUtils;
 import java.sql.SQLException;
+import java.time.Duration;
 import java.util.HashSet;
 import java.util.List;
+import org.joda.time.DateTimeUtils;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -32,6 +35,9 @@ import org.junit.Test;
 
 public class ExecutionFlowDaoTest {
 
+  private static final Duration RECENTLY_FINISHED_LIFETIME = Duration.ofMinutes(1);
+  private static final Duration FLOW_FINISHED_TIME = Duration.ofMinutes(2);
+
   private static DatabaseOperator dbOperator;
   private ExecutionFlowDao executionFlowDao;
 
@@ -115,6 +121,60 @@ public class ExecutionFlowDaoTest {
     assertTwoFlowSame(flowList1.get(0), fetchFlow);
   }
 
+  @Test
+  public void testFetchRecentlyFinishedFlows() throws Exception {
+    final ExecutableFlow flow1 = createTestFlow();
+    this.executionFlowDao.uploadExecutableFlow(flow1);
+    flow1.setStatus(Status.SUCCEEDED);
+    flow1.setEndTime(System.currentTimeMillis());
+    this.executionFlowDao.updateExecutableFlow(flow1);
+
+    //Flow just finished. Fetch recently finished flows immediately. Should get it.
+    final List<ExecutableFlow> flows = this.executionFlowDao.fetchRecentlyFinishedFlows(
+        RECENTLY_FINISHED_LIFETIME);
+    assertThat(flows.size()).isEqualTo(1);
+    assertTwoFlowSame(flow1, flows.get(0));
+  }
+
+  @Test
+  public void testFetchEmptyRecentlyFinishedFlows() throws Exception {
+    final ExecutableFlow flow1 = createTestFlow();
+    this.executionFlowDao.uploadExecutableFlow(flow1);
+    flow1.setStatus(Status.SUCCEEDED);
+    flow1.setEndTime(DateTimeUtils.currentTimeMillis());
+    this.executionFlowDao.updateExecutableFlow(flow1);
+    // TODO jamiesjc: use the java.time API instead of Joda-Time
+
+    //Mock flow finished time to be 2 min ago.
+    DateTimeUtils.setCurrentMillisOffset(-FLOW_FINISHED_TIME.toMillis());
+    flow1.setEndTime(DateTimeUtils.currentTimeMillis());
+    this.executionFlowDao.updateExecutableFlow(flow1);
+
+    //Fetch recently finished flows within 1 min. Should be empty.
+    final List<ExecutableFlow> flows = this.executionFlowDao
+        .fetchRecentlyFinishedFlows(RECENTLY_FINISHED_LIFETIME);
+    assertThat(flows.size()).isEqualTo(0);
+  }
+
+  @Test
+  public void testFetchQueuedFlows() throws Exception {
+
+    final ExecutableFlow flow = createTestFlow();
+    flow.setStatus(Status.PREPARING);
+    this.executionFlowDao.uploadExecutableFlow(flow);
+    final ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+    flow2.setStatus(Status.PREPARING);
+    this.executionFlowDao.uploadExecutableFlow(flow2);
+
+    final List<Pair<ExecutionReference, ExecutableFlow>> fetchedQueuedFlows = this.executionFlowDao.fetchQueuedFlows();
+    assertThat(fetchedQueuedFlows.size()).isEqualTo(2);
+    final Pair<ExecutionReference, ExecutableFlow> fetchedFlow1 = fetchedQueuedFlows.get(0);
+    final Pair<ExecutionReference, ExecutableFlow> fetchedFlow2 = fetchedQueuedFlows.get(1);
+
+    assertTwoFlowSame(flow, fetchedFlow1.getSecond());
+    assertTwoFlowSame(flow2, fetchedFlow2.getSecond());
+  }
+
   private void assertTwoFlowSame(final ExecutableFlow flow1, final ExecutableFlow flow2) {
     assertThat(flow1.getExecutionId()).isEqualTo(flow2.getExecutionId());
     assertThat(flow1.getStatus()).isEqualTo(flow2.getStatus());