Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 00c3f86..b88f8a9 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -27,14 +27,12 @@ import com.google.inject.Singleton;
import java.io.File;
import java.io.IOException;
import java.sql.Connection;
-import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
-import org.apache.commons.dbutils.ResultSetHandler;
import org.apache.log4j.Logger;
@Singleton
@@ -49,6 +47,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
private final ExecutorEventsDao executorEventsDao;
private final ActiveExecutingFlowsDao activeExecutingFlowsDao;
private final FetchActiveFlowDao fetchActiveFlowDao;
+ private final NumExecutionsDao numExecutionsDao;
private EncodingType defaultEncodingType = EncodingType.GZIP;
@Inject
@@ -59,7 +58,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
final ExecutionLogsDao executionLogsDao,
final ExecutorEventsDao executorEventsDao,
final ActiveExecutingFlowsDao activeExecutingFlowsDao,
- final FetchActiveFlowDao fetchActiveFlowDao) {
+ final FetchActiveFlowDao fetchActiveFlowDao,
+ final NumExecutionsDao numExecutionsDao) {
super(props, commonMetrics);
this.executionFlowDao = executionFlowDao;
this.executorDao = executorDao;
@@ -68,6 +68,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
this.executorEventsDao = executorEventsDao;
this.activeExecutingFlowsDao = activeExecutingFlowsDao;
this.fetchActiveFlowDao = fetchActiveFlowDao;
+ this.numExecutionsDao = numExecutionsDao;
}
public EncodingType getDefaultEncodingType() {
@@ -127,47 +128,22 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
@Override
public int fetchNumExecutableFlows() throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
- final IntHandler intHandler = new IntHandler();
- try {
- final int count = runner.query(IntHandler.NUM_EXECUTIONS, intHandler);
- return count;
- } catch (final SQLException e) {
- throw new ExecutorManagerException("Error fetching num executions", e);
- }
+ return this.numExecutionsDao.fetchNumExecutableFlows();
}
@Override
public int fetchNumExecutableFlows(final int projectId, final String flowId)
throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
- final IntHandler intHandler = new IntHandler();
- try {
- final int count =
- runner.query(IntHandler.NUM_FLOW_EXECUTIONS, intHandler, projectId,
- flowId);
- return count;
- } catch (final SQLException e) {
- throw new ExecutorManagerException("Error fetching num executions", e);
- }
+ return this.numExecutionsDao.fetchNumExecutableFlows(projectId, flowId);
}
@Override
public int fetchNumExecutableNodes(final int projectId, final String jobId)
throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
- final IntHandler intHandler = new IntHandler();
- try {
- final int count =
- runner.query(IntHandler.NUM_JOB_EXECUTIONS, intHandler, projectId,
- jobId);
- return count;
- } catch (final SQLException e) {
- throw new ExecutorManagerException("Error fetching num executions", e);
- }
+ return this.numExecutionsDao.fetchNumExecutableNodes(projectId, jobId);
}
@Override
@@ -425,23 +401,4 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
}
}
- private static class IntHandler implements ResultSetHandler<Integer> {
- private static final String NUM_EXECUTIONS =
- "SELECT COUNT(1) FROM execution_flows";
- private static final String NUM_FLOW_EXECUTIONS =
- "SELECT COUNT(1) FROM execution_flows WHERE project_id=? AND flow_id=?";
- private static final String NUM_JOB_EXECUTIONS =
- "SELECT COUNT(1) FROM execution_jobs WHERE project_id=? AND job_id=?";
- private static final String FETCH_EXECUTOR_ID =
- "SELECT executor_id FROM execution_flows WHERE exec_id=?";
-
- @Override
- public Integer handle(final ResultSet rs) throws SQLException {
- if (!rs.next()) {
- return 0;
- }
- return rs.getInt(1);
- }
- }
-
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/NumExecutionsDao.java b/azkaban-common/src/main/java/azkaban/executor/NumExecutionsDao.java
new file mode 100644
index 0000000..d2618f5
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/NumExecutionsDao.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import azkaban.db.DatabaseOperator;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.log4j.Logger;
+
+@Singleton
+public class NumExecutionsDao {
+
+ private static final Logger logger = Logger.getLogger(NumExecutionsDao.class);
+ private final DatabaseOperator dbOperator;
+
+ @Inject
+ public NumExecutionsDao(final DatabaseOperator dbOperator) {
+ this.dbOperator = dbOperator;
+ }
+
+ public int fetchNumExecutableFlows() throws ExecutorManagerException {
+ try {
+ return this.dbOperator.query(IntHandler.NUM_EXECUTIONS, new IntHandler());
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error fetching num executions", e);
+ }
+ }
+
+ public int fetchNumExecutableFlows(final int projectId, final String flowId)
+ throws ExecutorManagerException {
+ final IntHandler intHandler = new IntHandler();
+ try {
+ return this.dbOperator.query(IntHandler.NUM_FLOW_EXECUTIONS, intHandler, projectId, flowId);
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error fetching num executions", e);
+ }
+ }
+
+ public int fetchNumExecutableNodes(final int projectId, final String jobId)
+ throws ExecutorManagerException {
+ final IntHandler intHandler = new IntHandler();
+ try {
+ return this.dbOperator.query(IntHandler.NUM_JOB_EXECUTIONS, intHandler, projectId, jobId);
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error fetching num executions", e);
+ }
+ }
+
+ private static class IntHandler implements ResultSetHandler<Integer> {
+
+ private static final String NUM_EXECUTIONS =
+ "SELECT COUNT(1) FROM execution_flows";
+ private static final String NUM_FLOW_EXECUTIONS =
+ "SELECT COUNT(1) FROM execution_flows WHERE project_id=? AND flow_id=?";
+ private static final String NUM_JOB_EXECUTIONS =
+ "SELECT COUNT(1) FROM execution_jobs WHERE project_id=? AND job_id=?";
+
+ @Override
+ public Integer handle(final ResultSet rs) throws SQLException {
+ if (!rs.next()) {
+ return 0;
+ }
+ return rs.getInt(1);
+ }
+ }
+}
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index e3317f1..4c6bd2f 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -1062,7 +1062,7 @@ public class JdbcExecutorLoaderTest {
return new JdbcExecutorLoader(props,
new CommonMetrics(new MetricsManager(new MetricRegistry())), null
, null, null, null, null,
- null, null);
+ null, null, null);
}
private boolean isTestSetup() {
diff --git a/azkaban-common/src/test/java/azkaban/executor/NumExecutionsDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/NumExecutionsDaoTest.java
new file mode 100644
index 0000000..d3791e7
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/NumExecutionsDaoTest.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import azkaban.db.DatabaseOperator;
+import azkaban.test.Utils;
+import azkaban.utils.TestUtils;
+import java.sql.SQLException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class NumExecutionsDaoTest {
+
+ private static DatabaseOperator dbOperator;
+ private NumExecutionsDao numExecutionsDao;
+ private ExecutionFlowDao executionFlowDao;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ dbOperator = Utils.initTestDB();
+ }
+
+ @AfterClass
+ public static void destroyDB() throws Exception {
+ try {
+ dbOperator.update("DROP ALL OBJECTS");
+ dbOperator.update("SHUTDOWN");
+ } catch (final SQLException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Before
+ public void setup() {
+ this.executionFlowDao = new ExecutionFlowDao(dbOperator);
+ this.numExecutionsDao = new NumExecutionsDao(dbOperator);
+ }
+
+ @After
+ public void clearDB() {
+ try {
+ dbOperator.update("delete from execution_flows");
+ } catch (final SQLException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void testFetchNumExecutableFlows() throws Exception {
+ final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ flow1.setStatus(Status.PREPARING);
+ this.executionFlowDao.uploadExecutableFlow(flow1);
+
+
+ final ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+ flow2.setStatus(Status.RUNNING);
+ this.executionFlowDao.uploadExecutableFlow(flow2);
+
+ final ExecutableFlow flow2b = TestUtils.createExecutableFlow("exectest1", "exec2");
+ flow2b.setStatus(Status.FAILED);
+ this.executionFlowDao.uploadExecutableFlow(flow2b);
+
+ final int count = this.numExecutionsDao.fetchNumExecutableFlows();
+ assertThat(count).isEqualTo(3);
+
+ final int flow2Count = this.numExecutionsDao.fetchNumExecutableFlows(1, "derived-member-data-2");
+ assertThat(flow2Count).isEqualTo(2);
+ }
+
+ @Test
+ public void testFetchNumExecutableNodes() throws Exception {
+ // This test will be filled up after execution_jobs test completes.
+ }
+}