azkaban-aplcache

rewrite assign executor jdbc methods (#1379) Still, this

8/22/2017 5:25:23 PM
3.33.0

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/AssignExecutorDao.java b/azkaban-common/src/main/java/azkaban/executor/AssignExecutorDao.java
new file mode 100644
index 0000000..c526d4f
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/AssignExecutorDao.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import azkaban.db.DatabaseOperator;
+import java.sql.SQLException;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
+@Singleton
+public class AssignExecutorDao {
+
+  private final ExecutorDao executorDao;
+  private final DatabaseOperator dbOperator;
+
+  @Inject
+  public AssignExecutorDao(final DatabaseOperator dbOperator,
+                           final ExecutorDao executorDao) {
+    this.dbOperator = dbOperator;
+    this.executorDao = executorDao;
+  }
+
+  public void assignExecutor(final int executorId, final int executionId)
+      throws ExecutorManagerException {
+    final String UPDATE =
+        "UPDATE execution_flows SET executor_id=? where exec_id=?";
+    try {
+      if (this.executorDao.fetchExecutor(executorId) == null) {
+        throw new ExecutorManagerException(String.format(
+            "Failed to assign non-existent executor Id: %d to execution : %d  ",
+            executorId, executionId));
+      }
+
+      if (this.dbOperator.update(UPDATE, executorId, executionId) == 0) {
+        throw new ExecutorManagerException(String.format(
+            "Failed to assign executor Id: %d to non-existent execution : %d  ",
+            executorId, executionId));
+      }
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Error updating executor id "
+          + executorId, e);
+    }
+  }
+
+  void unassignExecutor(final int executionId) throws ExecutorManagerException {
+    final String UPDATE = "UPDATE execution_flows SET executor_id=NULL where exec_id=?";
+    try {
+      final int rows = this.dbOperator.update(UPDATE, executionId);
+      if (rows == 0) {
+        throw new ExecutorManagerException(String.format(
+            "Failed to unassign executor for execution : %d  ", executionId));
+      }
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Error updating execution id " + executionId, e);
+    }
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index b88f8a9..0818532 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -27,19 +27,15 @@ import com.google.inject.Singleton;
 import java.io.File;
 import java.io.IOException;
 import java.sql.Connection;
-import java.sql.SQLException;
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.dbutils.DbUtils;
-import org.apache.commons.dbutils.QueryRunner;
 import org.apache.log4j.Logger;
 
 @Singleton
 public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     ExecutorLoader {
-  private static final Logger logger = Logger
-      .getLogger(JdbcExecutorLoader.class);
   private final ExecutionFlowDao executionFlowDao;
   private final ExecutorDao executorDao;
   private final ExecutionJobDao executionJobDao;
@@ -47,6 +43,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
   private final ExecutorEventsDao executorEventsDao;
   private final ActiveExecutingFlowsDao activeExecutingFlowsDao;
   private final FetchActiveFlowDao fetchActiveFlowDao;
+  private final AssignExecutorDao assignExecutorDao;
   private final NumExecutionsDao numExecutionsDao;
   private EncodingType defaultEncodingType = EncodingType.GZIP;
 
@@ -59,6 +56,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
                             final ExecutorEventsDao executorEventsDao,
                             final ActiveExecutingFlowsDao activeExecutingFlowsDao,
                             final FetchActiveFlowDao fetchActiveFlowDao,
+                            final AssignExecutorDao assignExecutorDao,
                             final NumExecutionsDao numExecutionsDao) {
     super(props, commonMetrics);
     this.executionFlowDao = executionFlowDao;
@@ -69,6 +67,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     this.activeExecutingFlowsDao = activeExecutingFlowsDao;
     this.fetchActiveFlowDao = fetchActiveFlowDao;
     this.numExecutionsDao = numExecutionsDao;
+    this.assignExecutorDao = assignExecutorDao;
   }
 
   public EncodingType getDefaultEncodingType() {
@@ -128,21 +127,18 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
 
   @Override
   public int fetchNumExecutableFlows() throws ExecutorManagerException {
-
     return this.numExecutionsDao.fetchNumExecutableFlows();
   }
 
   @Override
   public int fetchNumExecutableFlows(final int projectId, final String flowId)
       throws ExecutorManagerException {
-
     return this.numExecutionsDao.fetchNumExecutableFlows(projectId, flowId);
   }
 
   @Override
   public int fetchNumExecutableNodes(final int projectId, final String jobId)
       throws ExecutorManagerException {
-
     return this.numExecutionsDao.fetchNumExecutableNodes(projectId, jobId);
   }
 
@@ -339,35 +335,13 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
   @Override
   public List<ExecutorLogEvent> getExecutorEvents(final Executor executor, final int num,
                                                   final int offset) throws ExecutorManagerException {
-
     return this.executorEventsDao.getExecutorEvents(executor, num, offset);
   }
 
   @Override
   public void assignExecutor(final int executorId, final int executionId)
     throws ExecutorManagerException {
-    final String UPDATE =
-      "UPDATE execution_flows SET executor_id=? where exec_id=?";
-
-    final QueryRunner runner = createQueryRunner();
-    try {
-      final Executor executor = fetchExecutor(executorId);
-      if (executor == null) {
-        throw new ExecutorManagerException(String.format(
-          "Failed to assign non-existent executor Id: %d to execution : %d  ",
-          executorId, executionId));
-      }
-
-      final int rows = runner.update(UPDATE, executorId, executionId);
-      if (rows == 0) {
-        throw new ExecutorManagerException(String.format(
-          "Failed to assign executor Id: %d to non-existent execution : %d  ",
-          executorId, executionId));
-      }
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException("Error updating executor id "
-        + executorId, e);
-    }
+    this.assignExecutorDao.assignExecutor(executorId, executionId);
   }
 
   @Override
@@ -379,26 +353,11 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
   @Override
   public int removeExecutionLogsByTime(final long millis)
       throws ExecutorManagerException {
-
     return this.executionLogsDao.removeExecutionLogsByTime(millis);
   }
 
   @Override
   public void unassignExecutor(final int executionId) throws ExecutorManagerException {
-    final String UPDATE =
-      "UPDATE execution_flows SET executor_id=NULL where exec_id=?";
-
-    final QueryRunner runner = createQueryRunner();
-    try {
-      final int rows = runner.update(UPDATE, executionId);
-      if (rows == 0) {
-        throw new ExecutorManagerException(String.format(
-          "Failed to unassign executor for execution : %d  ", executionId));
-      }
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException("Error updating execution id "
-        + executionId, e);
-    }
+    this.assignExecutorDao.unassignExecutor(executionId);
   }
-
 }
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
index 08e95de..2cc254e 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
@@ -17,6 +17,7 @@
 package azkaban.executor;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import azkaban.db.DatabaseOperator;
 import azkaban.test.Utils;
@@ -40,6 +41,8 @@ public class ExecutionFlowDaoTest {
 
   private static DatabaseOperator dbOperator;
   private ExecutionFlowDao executionFlowDao;
+  private ExecutorDao executorDao;
+  private AssignExecutorDao assignExecutor;
 
   @BeforeClass
   public static void setUp() throws Exception {
@@ -59,12 +62,15 @@ public class ExecutionFlowDaoTest {
   @Before
   public void setup() {
     this.executionFlowDao = new ExecutionFlowDao(dbOperator);
+    this.executorDao= new ExecutorDao(dbOperator);
+    this.assignExecutor= new AssignExecutorDao(dbOperator, this.executorDao);
   }
 
   @After
   public void clearDB() {
     try {
       dbOperator.update("DELETE FROM execution_flows");
+      dbOperator.update("DELETE FROM executors");
     } catch (final SQLException e) {
       e.printStackTrace();
     }
@@ -176,6 +182,49 @@ public class ExecutionFlowDaoTest {
     assertTwoFlowSame(flow2, fetchedFlow2.getSecond());
   }
 
+  @Test
+  public void testAssignAndUnassignExecutor() throws Exception {
+    final String host = "localhost";
+    final int port = 12345;
+    final Executor executor = this.executorDao.addExecutor(host, port);
+    final ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+    this.executionFlowDao.uploadExecutableFlow(flow);
+    this.assignExecutor.assignExecutor(executor.getId(), flow.getExecutionId());
+
+    final Executor fetchExecutor = this.executorDao.fetchExecutorByExecutionId(flow.getExecutionId());
+    assertThat(fetchExecutor).isEqualTo(executor);
+
+    this.assignExecutor.unassignExecutor(flow.getExecutionId());
+    assertThat(this.executorDao.fetchExecutorByExecutionId(flow.getExecutionId())).isNull();
+  }
+
+  /* Test exception when assigning a non-existent executor to a flow */
+  @Test
+  public void testAssignExecutorInvalidExecutor() throws Exception {
+    final ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+    this.executionFlowDao.uploadExecutableFlow(flow);
+
+    // Since we haven't inserted any executors, 1 should be non-existent executor id.
+    assertThatThrownBy(
+        () -> this.assignExecutor.assignExecutor(1, flow.getExecutionId()))
+            .isInstanceOf(ExecutorManagerException.class)
+            .hasMessageContaining("non-existent executor");
+  }
+
+  /* Test exception when assigning an executor to a non-existent flow execution */
+  @Test
+  public void testAssignExecutorInvalidExecution() throws Exception{
+    final String host = "localhost";
+    final int port = 12345;
+    final Executor executor = this.executorDao.addExecutor(host, port);
+
+    // Make 99 a random non-existent execution id.
+    assertThatThrownBy(
+        () -> this.assignExecutor.assignExecutor(executor.getId(), 99))
+        .isInstanceOf(ExecutorManagerException.class)
+        .hasMessageContaining("non-existent execution");
+  }
+
   private void assertTwoFlowSame(final ExecutableFlow flow1, final ExecutableFlow flow2) {
     assertThat(flow1.getExecutionId()).isEqualTo(flow2.getExecutionId());
     assertThat(flow1.getStatus()).isEqualTo(flow2.getStatus());
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index 4c6bd2f..d10b2a9 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -1062,7 +1062,7 @@ public class JdbcExecutorLoaderTest {
     return new JdbcExecutorLoader(props,
         new CommonMetrics(new MetricsManager(new MetricRegistry())), null
         , null, null, null, null,
-        null, null, null);
+        null, null, null, null);
   }
 
   private boolean isTestSetup() {