azkaban-aplcache

create num execution DAO class (#1378) As a follow-up of #1345,

8/21/2017 9:03:02 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 00c3f86..b88f8a9 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -27,14 +27,12 @@ import com.google.inject.Singleton;
 import java.io.File;
 import java.io.IOException;
 import java.sql.Connection;
-import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.dbutils.DbUtils;
 import org.apache.commons.dbutils.QueryRunner;
-import org.apache.commons.dbutils.ResultSetHandler;
 import org.apache.log4j.Logger;
 
 @Singleton
@@ -49,6 +47,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
   private final ExecutorEventsDao executorEventsDao;
   private final ActiveExecutingFlowsDao activeExecutingFlowsDao;
   private final FetchActiveFlowDao fetchActiveFlowDao;
+  private final NumExecutionsDao numExecutionsDao;
   private EncodingType defaultEncodingType = EncodingType.GZIP;
 
   @Inject
@@ -59,7 +58,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
                             final ExecutionLogsDao executionLogsDao,
                             final ExecutorEventsDao executorEventsDao,
                             final ActiveExecutingFlowsDao activeExecutingFlowsDao,
-                            final FetchActiveFlowDao fetchActiveFlowDao) {
+                            final FetchActiveFlowDao fetchActiveFlowDao,
+                            final NumExecutionsDao numExecutionsDao) {
     super(props, commonMetrics);
     this.executionFlowDao = executionFlowDao;
     this.executorDao = executorDao;
@@ -68,6 +68,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     this.executorEventsDao = executorEventsDao;
     this.activeExecutingFlowsDao = activeExecutingFlowsDao;
     this.fetchActiveFlowDao = fetchActiveFlowDao;
+    this.numExecutionsDao = numExecutionsDao;
   }
 
   public EncodingType getDefaultEncodingType() {
@@ -127,47 +128,22 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
 
   @Override
   public int fetchNumExecutableFlows() throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
 
-    final IntHandler intHandler = new IntHandler();
-    try {
-      final int count = runner.query(IntHandler.NUM_EXECUTIONS, intHandler);
-      return count;
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException("Error fetching num executions", e);
-    }
+    return this.numExecutionsDao.fetchNumExecutableFlows();
   }
 
   @Override
   public int fetchNumExecutableFlows(final int projectId, final String flowId)
       throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
 
-    final IntHandler intHandler = new IntHandler();
-    try {
-      final int count =
-          runner.query(IntHandler.NUM_FLOW_EXECUTIONS, intHandler, projectId,
-              flowId);
-      return count;
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException("Error fetching num executions", e);
-    }
+    return this.numExecutionsDao.fetchNumExecutableFlows(projectId, flowId);
   }
 
   @Override
   public int fetchNumExecutableNodes(final int projectId, final String jobId)
       throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
 
-    final IntHandler intHandler = new IntHandler();
-    try {
-      final int count =
-          runner.query(IntHandler.NUM_JOB_EXECUTIONS, intHandler, projectId,
-              jobId);
-      return count;
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException("Error fetching num executions", e);
-    }
+    return this.numExecutionsDao.fetchNumExecutableNodes(projectId, jobId);
   }
 
   @Override
@@ -425,23 +401,4 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     }
   }
 
-  private static class IntHandler implements ResultSetHandler<Integer> {
-    private static final String NUM_EXECUTIONS =
-        "SELECT COUNT(1) FROM execution_flows";
-    private static final String NUM_FLOW_EXECUTIONS =
-        "SELECT COUNT(1) FROM execution_flows WHERE project_id=? AND flow_id=?";
-    private static final String NUM_JOB_EXECUTIONS =
-        "SELECT COUNT(1) FROM execution_jobs WHERE project_id=? AND job_id=?";
-    private static final String FETCH_EXECUTOR_ID =
-        "SELECT executor_id FROM execution_flows WHERE exec_id=?";
-
-    @Override
-    public Integer handle(final ResultSet rs) throws SQLException {
-      if (!rs.next()) {
-        return 0;
-      }
-      return rs.getInt(1);
-    }
-  }
-
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/NumExecutionsDao.java b/azkaban-common/src/main/java/azkaban/executor/NumExecutionsDao.java
new file mode 100644
index 0000000..d2618f5
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/NumExecutionsDao.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import azkaban.db.DatabaseOperator;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.log4j.Logger;
+
+@Singleton
+public class NumExecutionsDao {
+
+  private static final Logger logger = Logger.getLogger(NumExecutionsDao.class);
+  private final DatabaseOperator dbOperator;
+
+  @Inject
+  public NumExecutionsDao(final DatabaseOperator dbOperator) {
+    this.dbOperator = dbOperator;
+  }
+
+  public int fetchNumExecutableFlows() throws ExecutorManagerException {
+    try {
+      return this.dbOperator.query(IntHandler.NUM_EXECUTIONS, new IntHandler());
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Error fetching num executions", e);
+    }
+  }
+
+  public int fetchNumExecutableFlows(final int projectId, final String flowId)
+      throws ExecutorManagerException {
+    final IntHandler intHandler = new IntHandler();
+    try {
+      return this.dbOperator.query(IntHandler.NUM_FLOW_EXECUTIONS, intHandler, projectId, flowId);
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Error fetching num executions", e);
+    }
+  }
+
+  public int fetchNumExecutableNodes(final int projectId, final String jobId)
+      throws ExecutorManagerException {
+    final IntHandler intHandler = new IntHandler();
+    try {
+      return this.dbOperator.query(IntHandler.NUM_JOB_EXECUTIONS, intHandler, projectId, jobId);
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Error fetching num executions", e);
+    }
+  }
+
+  private static class IntHandler implements ResultSetHandler<Integer> {
+
+    private static final String NUM_EXECUTIONS =
+        "SELECT COUNT(1) FROM execution_flows";
+    private static final String NUM_FLOW_EXECUTIONS =
+        "SELECT COUNT(1) FROM execution_flows WHERE project_id=? AND flow_id=?";
+    private static final String NUM_JOB_EXECUTIONS =
+        "SELECT COUNT(1) FROM execution_jobs WHERE project_id=? AND job_id=?";
+
+    @Override
+    public Integer handle(final ResultSet rs) throws SQLException {
+      if (!rs.next()) {
+        return 0;
+      }
+      return rs.getInt(1);
+    }
+  }
+}
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index e3317f1..4c6bd2f 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -1062,7 +1062,7 @@ public class JdbcExecutorLoaderTest {
     return new JdbcExecutorLoader(props,
         new CommonMetrics(new MetricsManager(new MetricRegistry())), null
         , null, null, null, null,
-        null, null);
+        null, null, null);
   }
 
   private boolean isTestSetup() {
diff --git a/azkaban-common/src/test/java/azkaban/executor/NumExecutionsDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/NumExecutionsDaoTest.java
new file mode 100644
index 0000000..d3791e7
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/NumExecutionsDaoTest.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import azkaban.db.DatabaseOperator;
+import azkaban.test.Utils;
+import azkaban.utils.TestUtils;
+import java.sql.SQLException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class NumExecutionsDaoTest {
+
+  private static DatabaseOperator dbOperator;
+  private NumExecutionsDao numExecutionsDao;
+  private ExecutionFlowDao executionFlowDao;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    dbOperator = Utils.initTestDB();
+  }
+
+  @AfterClass
+  public static void destroyDB() throws Exception {
+    try {
+      dbOperator.update("DROP ALL OBJECTS");
+      dbOperator.update("SHUTDOWN");
+    } catch (final SQLException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Before
+  public void setup() {
+    this.executionFlowDao = new ExecutionFlowDao(dbOperator);
+    this.numExecutionsDao = new NumExecutionsDao(dbOperator);
+  }
+
+  @After
+  public void clearDB() {
+    try {
+      dbOperator.update("delete from execution_flows");
+    } catch (final SQLException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Test
+  public void testFetchNumExecutableFlows() throws Exception {
+    final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    flow1.setStatus(Status.PREPARING);
+    this.executionFlowDao.uploadExecutableFlow(flow1);
+
+
+    final ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+    flow2.setStatus(Status.RUNNING);
+    this.executionFlowDao.uploadExecutableFlow(flow2);
+
+    final ExecutableFlow flow2b = TestUtils.createExecutableFlow("exectest1", "exec2");
+    flow2b.setStatus(Status.FAILED);
+    this.executionFlowDao.uploadExecutableFlow(flow2b);
+
+    final int count = this.numExecutionsDao.fetchNumExecutableFlows();
+    assertThat(count).isEqualTo(3);
+
+    final int flow2Count = this.numExecutionsDao.fetchNumExecutableFlows(1, "derived-member-data-2");
+    assertThat(flow2Count).isEqualTo(2);
+  }
+
+  @Test
+  public void testFetchNumExecutableNodes() throws Exception {
+    // This test will be filled up after execution_jobs test completes.
+  }
+}