Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/AssignExecutorDao.java b/azkaban-common/src/main/java/azkaban/executor/AssignExecutorDao.java
new file mode 100644
index 0000000..c526d4f
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/AssignExecutorDao.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import azkaban.db.DatabaseOperator;
+import java.sql.SQLException;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
+@Singleton
+public class AssignExecutorDao {
+
+ private final ExecutorDao executorDao;
+ private final DatabaseOperator dbOperator;
+
+ @Inject
+ public AssignExecutorDao(final DatabaseOperator dbOperator,
+ final ExecutorDao executorDao) {
+ this.dbOperator = dbOperator;
+ this.executorDao = executorDao;
+ }
+
+ public void assignExecutor(final int executorId, final int executionId)
+ throws ExecutorManagerException {
+ final String UPDATE =
+ "UPDATE execution_flows SET executor_id=? where exec_id=?";
+ try {
+ if (this.executorDao.fetchExecutor(executorId) == null) {
+ throw new ExecutorManagerException(String.format(
+ "Failed to assign non-existent executor Id: %d to execution : %d ",
+ executorId, executionId));
+ }
+
+ if (this.dbOperator.update(UPDATE, executorId, executionId) == 0) {
+ throw new ExecutorManagerException(String.format(
+ "Failed to assign executor Id: %d to non-existent execution : %d ",
+ executorId, executionId));
+ }
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error updating executor id "
+ + executorId, e);
+ }
+ }
+
+ void unassignExecutor(final int executionId) throws ExecutorManagerException {
+ final String UPDATE = "UPDATE execution_flows SET executor_id=NULL where exec_id=?";
+ try {
+ final int rows = this.dbOperator.update(UPDATE, executionId);
+ if (rows == 0) {
+ throw new ExecutorManagerException(String.format(
+ "Failed to unassign executor for execution : %d ", executionId));
+ }
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error updating execution id " + executionId, e);
+ }
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index b88f8a9..0818532 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -27,19 +27,15 @@ import com.google.inject.Singleton;
import java.io.File;
import java.io.IOException;
import java.sql.Connection;
-import java.sql.SQLException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import org.apache.commons.dbutils.DbUtils;
-import org.apache.commons.dbutils.QueryRunner;
import org.apache.log4j.Logger;
@Singleton
public class JdbcExecutorLoader extends AbstractJdbcLoader implements
ExecutorLoader {
- private static final Logger logger = Logger
- .getLogger(JdbcExecutorLoader.class);
private final ExecutionFlowDao executionFlowDao;
private final ExecutorDao executorDao;
private final ExecutionJobDao executionJobDao;
@@ -47,6 +43,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
private final ExecutorEventsDao executorEventsDao;
private final ActiveExecutingFlowsDao activeExecutingFlowsDao;
private final FetchActiveFlowDao fetchActiveFlowDao;
+ private final AssignExecutorDao assignExecutorDao;
private final NumExecutionsDao numExecutionsDao;
private EncodingType defaultEncodingType = EncodingType.GZIP;
@@ -59,6 +56,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
final ExecutorEventsDao executorEventsDao,
final ActiveExecutingFlowsDao activeExecutingFlowsDao,
final FetchActiveFlowDao fetchActiveFlowDao,
+ final AssignExecutorDao assignExecutorDao,
final NumExecutionsDao numExecutionsDao) {
super(props, commonMetrics);
this.executionFlowDao = executionFlowDao;
@@ -69,6 +67,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
this.activeExecutingFlowsDao = activeExecutingFlowsDao;
this.fetchActiveFlowDao = fetchActiveFlowDao;
this.numExecutionsDao = numExecutionsDao;
+ this.assignExecutorDao = assignExecutorDao;
}
public EncodingType getDefaultEncodingType() {
@@ -128,21 +127,18 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
@Override
public int fetchNumExecutableFlows() throws ExecutorManagerException {
-
return this.numExecutionsDao.fetchNumExecutableFlows();
}
@Override
public int fetchNumExecutableFlows(final int projectId, final String flowId)
throws ExecutorManagerException {
-
return this.numExecutionsDao.fetchNumExecutableFlows(projectId, flowId);
}
@Override
public int fetchNumExecutableNodes(final int projectId, final String jobId)
throws ExecutorManagerException {
-
return this.numExecutionsDao.fetchNumExecutableNodes(projectId, jobId);
}
@@ -339,35 +335,13 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
@Override
public List<ExecutorLogEvent> getExecutorEvents(final Executor executor, final int num,
final int offset) throws ExecutorManagerException {
-
return this.executorEventsDao.getExecutorEvents(executor, num, offset);
}
@Override
public void assignExecutor(final int executorId, final int executionId)
throws ExecutorManagerException {
- final String UPDATE =
- "UPDATE execution_flows SET executor_id=? where exec_id=?";
-
- final QueryRunner runner = createQueryRunner();
- try {
- final Executor executor = fetchExecutor(executorId);
- if (executor == null) {
- throw new ExecutorManagerException(String.format(
- "Failed to assign non-existent executor Id: %d to execution : %d ",
- executorId, executionId));
- }
-
- final int rows = runner.update(UPDATE, executorId, executionId);
- if (rows == 0) {
- throw new ExecutorManagerException(String.format(
- "Failed to assign executor Id: %d to non-existent execution : %d ",
- executorId, executionId));
- }
- } catch (final SQLException e) {
- throw new ExecutorManagerException("Error updating executor id "
- + executorId, e);
- }
+ this.assignExecutorDao.assignExecutor(executorId, executionId);
}
@Override
@@ -379,26 +353,11 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
@Override
public int removeExecutionLogsByTime(final long millis)
throws ExecutorManagerException {
-
return this.executionLogsDao.removeExecutionLogsByTime(millis);
}
@Override
public void unassignExecutor(final int executionId) throws ExecutorManagerException {
- final String UPDATE =
- "UPDATE execution_flows SET executor_id=NULL where exec_id=?";
-
- final QueryRunner runner = createQueryRunner();
- try {
- final int rows = runner.update(UPDATE, executionId);
- if (rows == 0) {
- throw new ExecutorManagerException(String.format(
- "Failed to unassign executor for execution : %d ", executionId));
- }
- } catch (final SQLException e) {
- throw new ExecutorManagerException("Error updating execution id "
- + executionId, e);
- }
+ this.assignExecutorDao.unassignExecutor(executionId);
}
-
}
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
index 08e95de..2cc254e 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
@@ -17,6 +17,7 @@
package azkaban.executor;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import azkaban.db.DatabaseOperator;
import azkaban.test.Utils;
@@ -40,6 +41,8 @@ public class ExecutionFlowDaoTest {
private static DatabaseOperator dbOperator;
private ExecutionFlowDao executionFlowDao;
+ private ExecutorDao executorDao;
+ private AssignExecutorDao assignExecutor;
@BeforeClass
public static void setUp() throws Exception {
@@ -59,12 +62,15 @@ public class ExecutionFlowDaoTest {
@Before
public void setup() {
this.executionFlowDao = new ExecutionFlowDao(dbOperator);
+ this.executorDao= new ExecutorDao(dbOperator);
+ this.assignExecutor= new AssignExecutorDao(dbOperator, this.executorDao);
}
@After
public void clearDB() {
try {
dbOperator.update("DELETE FROM execution_flows");
+ dbOperator.update("DELETE FROM executors");
} catch (final SQLException e) {
e.printStackTrace();
}
@@ -176,6 +182,49 @@ public class ExecutionFlowDaoTest {
assertTwoFlowSame(flow2, fetchedFlow2.getSecond());
}
+ @Test
+ public void testAssignAndUnassignExecutor() throws Exception {
+ final String host = "localhost";
+ final int port = 12345;
+ final Executor executor = this.executorDao.addExecutor(host, port);
+ final ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+ this.executionFlowDao.uploadExecutableFlow(flow);
+ this.assignExecutor.assignExecutor(executor.getId(), flow.getExecutionId());
+
+ final Executor fetchExecutor = this.executorDao.fetchExecutorByExecutionId(flow.getExecutionId());
+ assertThat(fetchExecutor).isEqualTo(executor);
+
+ this.assignExecutor.unassignExecutor(flow.getExecutionId());
+ assertThat(this.executorDao.fetchExecutorByExecutionId(flow.getExecutionId())).isNull();
+ }
+
+ /* Test exception when assigning a non-existent executor to a flow */
+ @Test
+ public void testAssignExecutorInvalidExecutor() throws Exception {
+ final ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+ this.executionFlowDao.uploadExecutableFlow(flow);
+
+ // Since we haven't inserted any executors, 1 should be non-existent executor id.
+ assertThatThrownBy(
+ () -> this.assignExecutor.assignExecutor(1, flow.getExecutionId()))
+ .isInstanceOf(ExecutorManagerException.class)
+ .hasMessageContaining("non-existent executor");
+ }
+
+ /* Test exception when assigning an executor to a non-existent flow execution */
+ @Test
+ public void testAssignExecutorInvalidExecution() throws Exception{
+ final String host = "localhost";
+ final int port = 12345;
+ final Executor executor = this.executorDao.addExecutor(host, port);
+
+ // Make 99 a random non-existent execution id.
+ assertThatThrownBy(
+ () -> this.assignExecutor.assignExecutor(executor.getId(), 99))
+ .isInstanceOf(ExecutorManagerException.class)
+ .hasMessageContaining("non-existent execution");
+ }
+
private void assertTwoFlowSame(final ExecutableFlow flow1, final ExecutableFlow flow2) {
assertThat(flow1.getExecutionId()).isEqualTo(flow2.getExecutionId());
assertThat(flow1.getStatus()).isEqualTo(flow2.getStatus());
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index 4c6bd2f..d10b2a9 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -1062,7 +1062,7 @@ public class JdbcExecutorLoaderTest {
return new JdbcExecutorLoader(props,
new CommonMetrics(new MetricsManager(new MetricRegistry())), null
, null, null, null, null,
- null, null, null);
+ null, null, null, null);
}
private boolean isTestSetup() {