diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorDao.java
new file mode 100644
index 0000000..dd26850
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorDao.java
@@ -0,0 +1,213 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import azkaban.database.AbstractJdbcLoader;
+import azkaban.db.DatabaseOperator;
+import azkaban.metrics.CommonMetrics;
+import azkaban.utils.Props;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.log4j.Logger;
+
+@Singleton
+public class ExecutorDao extends AbstractJdbcLoader {
+
+ private static final Logger logger = Logger.getLogger(ExecutorDao.class);
+ private final DatabaseOperator dbOperator;
+
+ @Inject
+ public ExecutorDao(final Props props, final CommonMetrics commonMetrics,
+ final DatabaseOperator dbOperator) {
+ super(props, commonMetrics);
+ this.dbOperator = dbOperator;
+ }
+
+ public List<Executor> fetchAllExecutors() throws ExecutorManagerException {
+ final QueryRunner runner = createQueryRunner();
+ final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+
+ try {
+ final List<Executor> executors =
+ runner.query(FetchExecutorHandler.FETCH_ALL_EXECUTORS, executorHandler);
+ return executors;
+ } catch (final Exception e) {
+ throw new ExecutorManagerException("Error fetching executors", e);
+ }
+ }
+
+ public List<Executor> fetchActiveExecutors() throws ExecutorManagerException {
+ final QueryRunner runner = createQueryRunner();
+ final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+
+ try {
+ final List<Executor> executors =
+ runner.query(FetchExecutorHandler.FETCH_ACTIVE_EXECUTORS,
+ executorHandler);
+ return executors;
+ } catch (final Exception e) {
+ throw new ExecutorManagerException("Error fetching active executors", e);
+ }
+ }
+
+ public Executor fetchExecutor(final String host, final int port)
+ throws ExecutorManagerException {
+ final QueryRunner runner = createQueryRunner();
+ final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+
+ try {
+ final List<Executor> executors =
+ runner.query(FetchExecutorHandler.FETCH_EXECUTOR_BY_HOST_PORT,
+ executorHandler, host, port);
+ if (executors.isEmpty()) {
+ return null;
+ } else {
+ return executors.get(0);
+ }
+ } catch (final Exception e) {
+ throw new ExecutorManagerException(String.format(
+ "Error fetching executor %s:%d", host, port), e);
+ }
+ }
+
+ public Executor fetchExecutor(final int executorId) throws ExecutorManagerException {
+ final QueryRunner runner = createQueryRunner();
+ final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+
+ try {
+ final List<Executor> executors =
+ runner.query(FetchExecutorHandler.FETCH_EXECUTOR_BY_ID,
+ executorHandler, executorId);
+ if (executors.isEmpty()) {
+ return null;
+ } else {
+ return executors.get(0);
+ }
+ } catch (final Exception e) {
+ throw new ExecutorManagerException(String.format(
+ "Error fetching executor with id: %d", executorId), e);
+ }
+ }
+
+ public Executor fetchExecutorByExecutionId(final int executionId)
+ throws ExecutorManagerException {
+ final QueryRunner runner = createQueryRunner();
+ final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+ Executor executor = null;
+ try {
+ final List<Executor> executors =
+ runner.query(FetchExecutorHandler.FETCH_EXECUTION_EXECUTOR,
+ executorHandler, executionId);
+ if (executors.size() > 0) {
+ executor = executors.get(0);
+ }
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException(
+ "Error fetching executor for exec_id : " + executionId, e);
+ }
+ return executor;
+ }
+
+ public Executor addExecutor(final String host, final int port)
+ throws ExecutorManagerException {
+ // verify, if executor already exists
+ Executor executor = fetchExecutor(host, port);
+ if (executor != null) {
+ throw new ExecutorManagerException(String.format(
+ "Executor %s:%d already exist", host, port));
+ }
+ // add new executor
+ addExecutorHelper(host, port);
+ // fetch newly added executor
+ executor = fetchExecutor(host, port);
+
+ return executor;
+ }
+
+ private void addExecutorHelper(final String host, final int port)
+ throws ExecutorManagerException {
+ final String INSERT = "INSERT INTO executors (host, port) values (?,?)";
+ final QueryRunner runner = createQueryRunner();
+ try {
+ runner.update(INSERT, host, port);
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException(String.format("Error adding %s:%d ",
+ host, port), e);
+ }
+ }
+
+ public void removeExecutor(final String host, final int port) throws ExecutorManagerException {
+ final String DELETE = "DELETE FROM executors WHERE host=? AND port=?";
+ final QueryRunner runner = createQueryRunner();
+ try {
+ final int rows = runner.update(DELETE, host, port);
+ if (rows == 0) {
+ throw new ExecutorManagerException("No executor with host, port :"
+ + "(" + host + "," + port + ")");
+ }
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error removing executor with host, port : "
+ + "(" + host + "," + port + ")", e);
+ }
+ }
+
+ /**
+ * JDBC ResultSetHandler to fetch records from executors table
+ */
+ public static class FetchExecutorHandler implements
+ ResultSetHandler<List<Executor>> {
+
+ static String FETCH_ALL_EXECUTORS =
+ "SELECT id, host, port, active FROM executors";
+ static String FETCH_ACTIVE_EXECUTORS =
+ "SELECT id, host, port, active FROM executors where active=true";
+ static String FETCH_EXECUTOR_BY_ID =
+ "SELECT id, host, port, active FROM executors where id=?";
+ static String FETCH_EXECUTOR_BY_HOST_PORT =
+ "SELECT id, host, port, active FROM executors where host=? AND port=?";
+ static String FETCH_EXECUTION_EXECUTOR =
+ "SELECT ex.id, ex.host, ex.port, ex.active FROM "
+ + " executors ex INNER JOIN execution_flows ef "
+ + "on ex.id = ef.executor_id where exec_id=?";
+
+ @Override
+ public List<Executor> handle(final ResultSet rs) throws SQLException {
+ if (!rs.next()) {
+ return Collections.emptyList();
+ }
+
+ final List<Executor> executors = new ArrayList<>();
+ do {
+ final int id = rs.getInt(1);
+ final String host = rs.getString(2);
+ final int port = rs.getInt(3);
+ final boolean active = rs.getBoolean(4);
+ final Executor executor = new Executor(id, host, port, active);
+ executors.add(executor);
+ } while (rs.next());
+
+ return executors;
+ }
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 207796c..9c1d579 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -58,13 +58,17 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
ExecutorLoader {
private static final Logger logger = Logger
.getLogger(JdbcExecutorLoader.class);
- private final ExecutionFlowDBManager executionFlowDBManager;
+ private final ExecutionFlowDao executionFlowDao;
+ private final ExecutorDao executorDao;
private EncodingType defaultEncodingType = EncodingType.GZIP;
@Inject
- public JdbcExecutorLoader(final Props props, final CommonMetrics commonMetrics, final ExecutionFlowDBManager executionFlowDBManager) {
+ public JdbcExecutorLoader(final Props props, final CommonMetrics commonMetrics,
+ final ExecutionFlowDao executionFlowDao,
+ final ExecutorDao executorDao) {
super(props, commonMetrics);
- this.executionFlowDBManager = executionFlowDBManager;
+ this.executionFlowDao = executionFlowDao;
+ this.executorDao = executorDao;
}
public EncodingType getDefaultEncodingType() {
@@ -78,19 +82,19 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
@Override
public synchronized void uploadExecutableFlow(final ExecutableFlow flow)
throws ExecutorManagerException {
- this.executionFlowDBManager.uploadExecutableFlow(flow);
+ this.executionFlowDao.uploadExecutableFlow(flow);
}
@Override
public void updateExecutableFlow(final ExecutableFlow flow)
throws ExecutorManagerException {
- this.executionFlowDBManager.updateExecutableFlow(flow);
+ this.executionFlowDao.updateExecutableFlow(flow);
}
@Override
public ExecutableFlow fetchExecutableFlow(final int id)
throws ExecutorManagerException {
- return this.executionFlowDBManager.fetchExecutableFlow(id);
+ return this.executionFlowDao.fetchExecutableFlow(id);
}
/**
@@ -220,26 +224,26 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
@Override
public List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
final int skip, final int num) throws ExecutorManagerException {
- return this.executionFlowDBManager.fetchFlowHistory(projectId, flowId, skip, num);
+ return this.executionFlowDao.fetchFlowHistory(projectId, flowId, skip, num);
}
@Override
public List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
final int skip, final int num, final Status status) throws ExecutorManagerException {
- return this.executionFlowDBManager.fetchFlowHistory(projectId, flowId, skip, num, status);
+ return this.executionFlowDao.fetchFlowHistory(projectId, flowId, skip, num, status);
}
@Override
public List<ExecutableFlow> fetchFlowHistory(final int skip, final int num)
throws ExecutorManagerException {
- return this.executionFlowDBManager.fetchFlowHistory(skip,num);
+ return this.executionFlowDao.fetchFlowHistory(skip,num);
}
@Override
public List<ExecutableFlow> fetchFlowHistory(final String projContain,
final String flowContains, final String userNameContains, final int status, final long startTime,
final long endTime, final int skip, final int num) throws ExecutorManagerException {
- return this.executionFlowDBManager.fetchFlowHistory(projContain, flowContains,
+ return this.executionFlowDao.fetchFlowHistory(projContain, flowContains,
userNameContains, status, startTime, endTime, skip, num);
}
@@ -648,16 +652,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
*/
@Override
public List<Executor> fetchAllExecutors() throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
- final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
-
- try {
- final List<Executor> executors =
- runner.query(FetchExecutorHandler.FETCH_ALL_EXECUTORS, executorHandler);
- return executors;
- } catch (final Exception e) {
- throw new ExecutorManagerException("Error fetching executors", e);
- }
+ return this.executorDao.fetchAllExecutors();
}
/**
@@ -668,17 +663,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
*/
@Override
public List<Executor> fetchActiveExecutors() throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
- final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
-
- try {
- final List<Executor> executors =
- runner.query(FetchExecutorHandler.FETCH_ACTIVE_EXECUTORS,
- executorHandler);
- return executors;
- } catch (final Exception e) {
- throw new ExecutorManagerException("Error fetching active executors", e);
- }
+ return this.executorDao.fetchActiveExecutors();
}
/**
@@ -689,22 +674,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
@Override
public Executor fetchExecutor(final String host, final int port)
throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
- final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
-
- try {
- final List<Executor> executors =
- runner.query(FetchExecutorHandler.FETCH_EXECUTOR_BY_HOST_PORT,
- executorHandler, host, port);
- if (executors.isEmpty()) {
- return null;
- } else {
- return executors.get(0);
- }
- } catch (final Exception e) {
- throw new ExecutorManagerException(String.format(
- "Error fetching executor %s:%d", host, port), e);
- }
+ return this.executorDao.fetchExecutor(host, port);
}
/**
@@ -714,22 +684,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
*/
@Override
public Executor fetchExecutor(final int executorId) throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
- final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
-
- try {
- final List<Executor> executors =
- runner.query(FetchExecutorHandler.FETCH_EXECUTOR_BY_ID,
- executorHandler, executorId);
- if (executors.isEmpty()) {
- return null;
- } else {
- return executors.get(0);
- }
- } catch (final Exception e) {
- throw new ExecutorManagerException(String.format(
- "Error fetching executor with id: %d", executorId), e);
- }
+ return this.executorDao.fetchExecutor(executorId);
}
/**
@@ -764,30 +719,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
@Override
public Executor addExecutor(final String host, final int port)
throws ExecutorManagerException {
- // verify, if executor already exists
- Executor executor = fetchExecutor(host, port);
- if (executor != null) {
- throw new ExecutorManagerException(String.format(
- "Executor %s:%d already exist", host, port));
- }
- // add new executor
- addExecutorHelper(host, port);
- // fetch newly added executor
- executor = fetchExecutor(host, port);
-
- return executor;
- }
-
- private void addExecutorHelper(final String host, final int port)
- throws ExecutorManagerException {
- final String INSERT = "INSERT INTO executors (host, port) values (?,?)";
- final QueryRunner runner = createQueryRunner();
- try {
- runner.update(INSERT, host, port);
- } catch (final SQLException e) {
- throw new ExecutorManagerException(String.format("Error adding %s:%d ",
- host, port), e);
- }
+ return this.executorDao.addExecutor(host, port);
}
@@ -798,18 +730,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
*/
@Override
public void removeExecutor(final String host, final int port) throws ExecutorManagerException {
- final String DELETE = "DELETE FROM executors WHERE host=? AND port=?";
- final QueryRunner runner = createQueryRunner();
- try {
- final int rows = runner.update(DELETE, host, port);
- if (rows == 0) {
- throw new ExecutorManagerException("No executor with host, port :"
- + "(" + host + "," + port + ")");
- }
- } catch (final SQLException e) {
- throw new ExecutorManagerException("Error removing executor with host, port : "
- + "(" + host + "," + port + ")", e);
- }
+ this.executorDao.removeExecutor(host, port);
}
/**
@@ -902,21 +823,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
@Override
public Executor fetchExecutorByExecutionId(final int executionId)
throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
- final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
- Executor executor = null;
- try {
- final List<Executor> executors =
- runner.query(FetchExecutorHandler.FETCH_EXECUTION_EXECUTOR,
- executorHandler, executionId);
- if (executors.size() > 0) {
- executor = executors.get(0);
- }
- } catch (final SQLException e) {
- throw new ExecutorManagerException(
- "Error fetching executor for exec_id : " + executionId, e);
- }
- return executor;
+ return this.executorDao.fetchExecutorByExecutionId(executionId);
}
@Override
@@ -1406,45 +1313,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
}
}
- /**
- * JDBC ResultSetHandler to fetch records from executors table
- */
- private static class FetchExecutorHandler implements
- ResultSetHandler<List<Executor>> {
- private static final String FETCH_ALL_EXECUTORS =
- "SELECT id, host, port, active FROM executors";
- private static final String FETCH_ACTIVE_EXECUTORS =
- "SELECT id, host, port, active FROM executors where active=true";
- private static final String FETCH_EXECUTOR_BY_ID =
- "SELECT id, host, port, active FROM executors where id=?";
- private static final String FETCH_EXECUTOR_BY_HOST_PORT =
- "SELECT id, host, port, active FROM executors where host=? AND port=?";
- private static final String FETCH_EXECUTION_EXECUTOR =
- "SELECT ex.id, ex.host, ex.port, ex.active FROM "
- + " executors ex INNER JOIN execution_flows ef "
- + "on ex.id = ef.executor_id where exec_id=?";
-
- @Override
- public List<Executor> handle(final ResultSet rs) throws SQLException {
- if (!rs.next()) {
- return Collections.<Executor> emptyList();
- }
-
- final List<Executor> executors = new ArrayList<>();
- do {
- final int id = rs.getInt(1);
- final String host = rs.getString(2);
- final int port = rs.getInt(3);
- final boolean active = rs.getBoolean(4);
- final Executor executor = new Executor(id, host, port, active);
- executors.add(executor);
- } while (rs.next());
-
- return executors;
- }
- }
-
- /**
+ /**
* JDBC ResultSetHandler to fetch records from executor_events table
*/
private static class ExecutorLogsResultHandler implements