azkaban-aplcache

Incorporating feedback pr #500

9/23/2015 8:14:10 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index 6120002..80e8167 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -171,6 +171,20 @@ public interface ExecutorLoader {
   public void removeActiveExecutableReference(int execId)
       throws ExecutorManagerException;
 
+
+  /**
+   * <pre>
+   * Unset executor Id for an execution
+   * Note:-
+   * throws an Exception in case of a SQL issue
+   * </pre>
+   *
+   * @param executorId
+   * @param execId
+   * @throws ExecutorManagerException
+   */
+  public void unassignExecutor(int executionId) throws ExecutorManagerException;
+
   /**
    * <pre>
    * Set an executor Id to an execution
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index e622a15..cca687b 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -74,9 +74,9 @@ public class ExecutorManager extends EventHandler implements
     "azkaban.use.multiple.executors";
   private static final String AZKABAN_WEBSERVER_QUEUE_SIZE =
     "azkaban.webserver.queue.size";
-  private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_MS =
+  private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS =
     "azkaban.activeexecutor.refresh.milisecinterval";
-  private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_NUM_FLOW =
+  private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW =
     "azkaban.activeexecutor.refresh.flowinterval";
   private static final String AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS =
       "azkaban.executorinfo.refresh.maxThreads";
@@ -115,20 +115,19 @@ public class ExecutorManager extends EventHandler implements
   private long lastSuccessfulExecutorInfoRefresh;
   private ExecutorService executorInforRefresherService;
 
-  public ExecutorManager(Props props, ExecutorLoader loader,
-      Map<String, Alerter> alters) throws ExecutorManagerException {
-    alerters = alters;
-    azkProps = props;
+  public ExecutorManager(Props azkProps, ExecutorLoader loader,
+      Map<String, Alerter> alerters) throws ExecutorManagerException {
+    this.alerters = alerters;
+    this.azkProps = azkProps;
     this.executorLoader = loader;
-
     this.setupExecutors();
     this.loadRunningFlows();
 
     queuedFlows =
-        new QueuedExecutions(props.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
+        new QueuedExecutions(azkProps.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
     this.loadQueuedFlows();
 
-    cacheDir = new File(props.getString("cache.directory", "cache"));
+    cacheDir = new File(azkProps.getString("cache.directory", "cache"));
 
     executingManager = new ExecutingManagerUpdaterThread();
     executingManager.start();
@@ -138,7 +137,7 @@ public class ExecutorManager extends EventHandler implements
     }
 
     long executionLogsRetentionMs =
-      props.getLong("execution.logs.retention.ms",
+        azkProps.getLong("execution.logs.retention.ms",
         DEFAULT_EXECUTION_LOGS_RETENTION_MS);
 
     cleanerThread = new CleanerThread(executionLogsRetentionMs);
@@ -172,8 +171,8 @@ public class ExecutorManager extends EventHandler implements
     queueProcessor =
       new QueueProcessorThread(azkProps.getBoolean(
         AZKABAN_QUEUEPROCESSING_ENABLED, true), azkProps.getLong(
-        AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_MS, 1000), azkProps.getInt(
-        AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_NUM_FLOW, 10));
+        AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS, 1000), azkProps.getInt(
+        AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW, 10));
 
     queueProcessor.start();
   }
@@ -1734,10 +1733,18 @@ public class ExecutorManager extends EventHandler implements
   private void dispatch(ExecutionReference reference, ExecutableFlow exflow,
     Executor choosenExecutor) throws ExecutorManagerException {
     exflow.setUpdateTime(System.currentTimeMillis());
-    callExecutorServer(exflow, choosenExecutor,
-      ConnectorParams.EXECUTE_ACTION);
+
     executorLoader.assignExecutor(choosenExecutor.getId(),
       exflow.getExecutionId());
+    try {
+      callExecutorServer(exflow, choosenExecutor,
+        ConnectorParams.EXECUTE_ACTION);
+    } catch (ExecutorManagerException ex) {
+      logger.error("Rolling back executor assignment for execution id:"
+        + exflow.getExecutionId(), ex);
+      executorLoader.unassignExecutor(exflow.getExecutionId());
+      throw new ExecutorManagerException(ex);
+    }
     reference.setExecutor(choosenExecutor);
 
     // move from flow to running flows
@@ -1855,14 +1862,14 @@ public class ExecutorManager extends EventHandler implements
         } else {
           exflow.setUpdateTime(currentTime);
           // process flow with current snapshot of activeExecutors
-          processFlow(reference, exflow, new HashSet<Executor>(activeExecutors));
+          selectExecutorAndDispatchFlow(reference, exflow, new HashSet<Executor>(activeExecutors));
         }
         currentContinuousFlowProcessed++;
       }
     }
 
     /* process flow with a snapshot of available Executors */
-    private void processFlow(ExecutionReference reference,
+    private void selectExecutorAndDispatchFlow(ExecutionReference reference,
       ExecutableFlow exflow, Set<Executor> availableExecutors)
       throws ExecutorManagerException {
       synchronized (exflow) {
@@ -1953,7 +1960,7 @@ public class ExecutorManager extends EventHandler implements
       } else {
         remainingExecutors.remove(lastSelectedExecutor);
         // try other executors except chosenExecutor
-        processFlow(reference, exflow, remainingExecutors);
+        selectExecutorAndDispatchFlow(reference, exflow, remainingExecutors);
       }
     }
 
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 7740163..d6ffb4b 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -1559,4 +1559,27 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
       return events;
     }
   }
+
+  /**
+   *
+   * {@inheritDoc}
+   * @see azkaban.executor.ExecutorLoader#unassignExecutor(int)
+   */
+  @Override
+  public void unassignExecutor(int executionId) throws ExecutorManagerException {
+    final String UPDATE =
+      "UPDATE execution_flows SET executor_id=NULL where exec_id=?";
+
+    QueryRunner runner = createQueryRunner();
+    try {
+      int rows = runner.update(UPDATE, executionId);
+      if (rows == 0) {
+        throw new ExecutorManagerException(String.format(
+          "Failed to unassign executor for execution : %d  ", executionId));
+      }
+    } catch (SQLException e) {
+      throw new ExecutorManagerException("Error updating execution id "
+        + executionId, e);
+    }
+  }
 }
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index a7e2b5f..dd32302 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -335,6 +335,43 @@ public class JdbcExecutorLoaderTest {
 
   }
 
+  /* Test exception when unassigning an missing execution */
+  @Test
+  public void testUnassignExecutorException() throws ExecutorManagerException,
+    IOException {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    try {
+      loader.unassignExecutor(2);
+      Assert.fail("Expecting exception, but didn't get one");
+    } catch (ExecutorManagerException ex) {
+      System.out.println("Test true");
+    }
+  }
+
+  /* Test happy case when unassigning executor for a flow execution */
+  @Test
+  public void testUnassignExecutor() throws ExecutorManagerException,
+    IOException {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    String host = "localhost";
+    int port = 12345;
+    Executor executor = loader.addExecutor(host, port);
+    ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+    loader.uploadExecutableFlow(flow);
+    loader.assignExecutor(executor.getId(), flow.getExecutionId());
+    Assert.assertEquals(
+      loader.fetchExecutorByExecutionId(flow.getExecutionId()), executor);
+    loader.unassignExecutor(flow.getExecutionId());
+    Assert.assertEquals(
+      loader.fetchExecutorByExecutionId(flow.getExecutionId()), null);
+  }
+
   /* Test exception when assigning a non-existent executor to a flow */
   @Test
   public void testAssignExecutorInvalidExecutor()
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index 833e0c6..0db2de5 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -368,4 +368,9 @@ public class MockExecutorLoader implements ExecutorLoader {
     }
     return queuedFlows;
   }
+
+  @Override
+  public void unassignExecutor(int executionId) throws ExecutorManagerException {
+    executionExecutorMapping.remove(executionId);
+  }
 }