azkaban-aplcache

Merge pull request #500 from logiclord/multipleexecutors Fixing

9/23/2015 10:46:59 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index 6120002..80e8167 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -171,6 +171,20 @@ public interface ExecutorLoader {
   public void removeActiveExecutableReference(int execId)
       throws ExecutorManagerException;
 
+
+  /**
+   * <pre>
+   * Unset executor Id for an execution
+   * Note:-
+   * throws an Exception in case of a SQL issue
+   * </pre>
+   *
+   * @param executionId id of the execution whose executor assignment
+   *        should be unset
+   * @throws ExecutorManagerException
+   */
+  public void unassignExecutor(int executionId) throws ExecutorManagerException;
+
   /**
    * <pre>
    * Set an executor Id to an execution
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 39dd831..cca687b 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -74,9 +74,9 @@ public class ExecutorManager extends EventHandler implements
     "azkaban.use.multiple.executors";
   private static final String AZKABAN_WEBSERVER_QUEUE_SIZE =
     "azkaban.webserver.queue.size";
-  private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_MS =
+  private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS =
     "azkaban.activeexecutor.refresh.milisecinterval";
-  private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_NUM_FLOW =
+  private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW =
     "azkaban.activeexecutor.refresh.flowinterval";
   private static final String AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS =
       "azkaban.executorinfo.refresh.maxThreads";
@@ -115,20 +115,19 @@ public class ExecutorManager extends EventHandler implements
   private long lastSuccessfulExecutorInfoRefresh;
   private ExecutorService executorInforRefresherService;
 
-  public ExecutorManager(Props props, ExecutorLoader loader,
-      Map<String, Alerter> alters) throws ExecutorManagerException {
-    azkProps = props;
+  public ExecutorManager(Props azkProps, ExecutorLoader loader,
+      Map<String, Alerter> alerters) throws ExecutorManagerException {
+    this.alerters = alerters;
+    this.azkProps = azkProps;
     this.executorLoader = loader;
     this.setupExecutors();
     this.loadRunningFlows();
 
     queuedFlows =
-        new QueuedExecutions(props.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
+        new QueuedExecutions(azkProps.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
     this.loadQueuedFlows();
 
-    alerters = alters;
-
-    cacheDir = new File(props.getString("cache.directory", "cache"));
+    cacheDir = new File(azkProps.getString("cache.directory", "cache"));
 
     executingManager = new ExecutingManagerUpdaterThread();
     executingManager.start();
@@ -138,7 +137,7 @@ public class ExecutorManager extends EventHandler implements
     }
 
     long executionLogsRetentionMs =
-      props.getLong("execution.logs.retention.ms",
+        azkProps.getLong("execution.logs.retention.ms",
         DEFAULT_EXECUTION_LOGS_RETENTION_MS);
 
     cleanerThread = new CleanerThread(executionLogsRetentionMs);
@@ -172,8 +171,8 @@ public class ExecutorManager extends EventHandler implements
     queueProcessor =
       new QueueProcessorThread(azkProps.getBoolean(
         AZKABAN_QUEUEPROCESSING_ENABLED, true), azkProps.getLong(
-        AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_MS, 1000), azkProps.getInt(
-        AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_NUM_FLOW, 1000));
+        AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS, 1000), azkProps.getInt(
+        AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW, 10));
 
     queueProcessor.start();
   }
@@ -255,8 +254,9 @@ public class ExecutorManager extends EventHandler implements
           // max 5 secs
           String jsonString = refreshPair.getSecond().get(5, TimeUnit.SECONDS);
           executor.setExecutorInfo(ExecutorInfo.fromJSONString(jsonString));
-          logger.info("Successfully refreshed ExecutorInfo for executor: "
-            + executor);
+          logger.info(String.format(
+            "Successfully refreshed executor: %s with executor info : %s",
+            executor, jsonString));
         } catch (TimeoutException e) {
           wasSuccess = false;
           logger.error("Timed out while waiting for ExecutorInfo refresh"
@@ -1035,14 +1035,11 @@ public class ExecutorManager extends EventHandler implements
           executorLoader.addActiveExecutableReference(reference);
           queuedFlows.enqueue(exflow, reference);
         } else {
-          Executor executor = activeExecutors.iterator().next();
           // assign only local executor we have
-          reference.setExecutor(executor);
+          Executor choosenExecutor = activeExecutors.iterator().next();
           executorLoader.addActiveExecutableReference(reference);
           try {
-            callExecutorServer(exflow, executor, ConnectorParams.EXECUTE_ACTION);
-            runningFlows.put(exflow.getExecutionId(),
-              new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
+            dispatch(reference, exflow, choosenExecutor);
           } catch (ExecutorManagerException e) {
             executorLoader.removeActiveExecutableReference(reference
               .getExecId());
@@ -1729,6 +1726,36 @@ public class ExecutorManager extends EventHandler implements
     }
   }
 
+  /**
+   * Assigns the executor in the db, calls the executor to dispatch the flow
+   * (rolling back the assignment on failure) and updates in-memory state
+   */
+  private void dispatch(ExecutionReference reference, ExecutableFlow exflow,
+    Executor choosenExecutor) throws ExecutorManagerException {
+    exflow.setUpdateTime(System.currentTimeMillis());
+
+    executorLoader.assignExecutor(choosenExecutor.getId(),
+      exflow.getExecutionId());
+    try {
+      callExecutorServer(exflow, choosenExecutor,
+        ConnectorParams.EXECUTE_ACTION);
+    } catch (ExecutorManagerException ex) {
+      logger.error("Rolling back executor assignment for execution id:"
+        + exflow.getExecutionId(), ex);
+      executorLoader.unassignExecutor(exflow.getExecutionId());
+      throw new ExecutorManagerException(ex);
+    }
+    reference.setExecutor(choosenExecutor);
+
+    // move from flow to running flows
+    runningFlows.put(exflow.getExecutionId(),
+      new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
+
+    logger.info(String.format(
+      "Successfully dispatched exec %d with error count %d",
+      exflow.getExecutionId(), reference.getNumErrors()));
+  }
+
   /*
    * This thread is responsible for processing queued flows using dispatcher and
    * making rest api calls to executor server
@@ -1790,7 +1817,7 @@ public class ExecutorManager extends EventHandler implements
     private void processQueuedFlows(long activeExecutorsRefreshWindow,
       int maxContinuousFlowProcessed) throws InterruptedException,
       ExecutorManagerException {
-      long lastExecutorRefreshTime = System.currentTimeMillis();
+      long lastExecutorRefreshTime = 0;
       Pair<ExecutionReference, ExecutableFlow> runningCandidate;
       int currentContinuousFlowProcessed = 0;
 
@@ -1811,15 +1838,38 @@ public class ExecutorManager extends EventHandler implements
           currentContinuousFlowProcessed = 0;
         }
 
-        exflow.setUpdateTime(currentTime);
-        // process flow with current snapshot of activeExecutors
-        processFlow(reference, exflow, new HashSet<Executor>(activeExecutors));
+        /**
+         * <pre>
+         *  TODO: Work around till we improve Filters to have a notion of GlobalSystemState.
+         *        Currently we try each queued flow once to infer a global busy state
+         * Possible improvements:-
+         *   1. Move system level filters in refreshExecutors and sleep if we have all executors busy after refresh
+         *   2. Implement GlobalSystemState in selector or in a third place to manage system filters. Basically
+         *      taking out all the filters which do not depend on the flow but are still part of Selector.
+         * Assumptions:-
+         *   1. no one else except QueueProcessor is updating ExecutableFlow update time
+         *   2. re-attempting a flow (which has been tried before) is taken to mean that all executors are busy
+         * </pre>
+         */
+        if(exflow.getUpdateTime() > lastExecutorRefreshTime) {
+          // put back in the queue
+          queuedFlows.enqueue(exflow, reference);
+          long sleepInterval =
+            activeExecutorsRefreshWindow
+              - (currentTime - lastExecutorRefreshTime);
+          // wait till next executor refresh
+          sleep(sleepInterval);
+        } else {
+          exflow.setUpdateTime(currentTime);
+          // process flow with current snapshot of activeExecutors
+          selectExecutorAndDispatchFlow(reference, exflow, new HashSet<Executor>(activeExecutors));
+        }
         currentContinuousFlowProcessed++;
       }
     }
 
     /* process flow with a snapshot of available Executors */
-    private void processFlow(ExecutionReference reference,
+    private void selectExecutorAndDispatchFlow(ExecutionReference reference,
       ExecutableFlow exflow, Set<Executor> availableExecutors)
       throws ExecutorManagerException {
       synchronized (exflow) {
@@ -1910,7 +1960,7 @@ public class ExecutorManager extends EventHandler implements
       } else {
         remainingExecutors.remove(lastSelectedExecutor);
         // try other executors except chosenExecutor
-        processFlow(reference, exflow, remainingExecutors);
+        selectExecutorAndDispatchFlow(reference, exflow, remainingExecutors);
       }
     }
 
@@ -1925,23 +1975,5 @@ public class ExecutorManager extends EventHandler implements
       // schedule can starve all others
       queuedFlows.enqueue(exflow, reference);
     }
-
-    private void dispatch(ExecutionReference reference, ExecutableFlow exflow,
-      Executor choosenExecutor) throws ExecutorManagerException {
-      exflow.setUpdateTime(System.currentTimeMillis());
-      callExecutorServer(exflow, choosenExecutor,
-        ConnectorParams.EXECUTE_ACTION);
-      executorLoader.assignExecutor(choosenExecutor.getId(),
-        exflow.getExecutionId());
-      reference.setExecutor(choosenExecutor);
-
-      // move from flow to running flows
-      runningFlows.put(exflow.getExecutionId(),
-        new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
-
-      logger.info(String.format(
-        "Successfully dispatched exec %d with error count %d",
-        exflow.getExecutionId(), reference.getNumErrors()));
-    }
   }
 }
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 7740163..d6ffb4b 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -1559,4 +1559,27 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
       return events;
     }
   }
+
+  /**
+   *
+   * {@inheritDoc}
+   * @see azkaban.executor.ExecutorLoader#unassignExecutor(int)
+   */
+  @Override
+  public void unassignExecutor(int executionId) throws ExecutorManagerException {
+    final String UPDATE =
+      "UPDATE execution_flows SET executor_id=NULL where exec_id=?";
+
+    QueryRunner runner = createQueryRunner();
+    try {
+      int rows = runner.update(UPDATE, executionId);
+      if (rows == 0) {
+        throw new ExecutorManagerException(String.format(
+          "Failed to unassign executor for execution : %d  ", executionId));
+      }
+    } catch (SQLException e) {
+      throw new ExecutorManagerException("Error updating execution id "
+        + executionId, e);
+    }
+  }
 }
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index a7e2b5f..dd32302 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -335,6 +335,43 @@ public class JdbcExecutorLoaderTest {
 
   }
 
+  /* Test exception when unassigning a missing execution */
+  @Test
+  public void testUnassignExecutorException() throws ExecutorManagerException,
+    IOException {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    try {
+      loader.unassignExecutor(2);
+      Assert.fail("Expecting exception, but didn't get one");
+    } catch (ExecutorManagerException ex) {
+      System.out.println("Test true");
+    }
+  }
+
+  /* Test happy case when unassigning executor for a flow execution */
+  @Test
+  public void testUnassignExecutor() throws ExecutorManagerException,
+    IOException {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    String host = "localhost";
+    int port = 12345;
+    Executor executor = loader.addExecutor(host, port);
+    ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+    loader.uploadExecutableFlow(flow);
+    loader.assignExecutor(executor.getId(), flow.getExecutionId());
+    Assert.assertEquals(
+      loader.fetchExecutorByExecutionId(flow.getExecutionId()), executor);
+    loader.unassignExecutor(flow.getExecutionId());
+    Assert.assertEquals(
+      loader.fetchExecutorByExecutionId(flow.getExecutionId()), null);
+  }
+
   /* Test exception when assigning a non-existent executor to a flow */
   @Test
   public void testAssignExecutorInvalidExecutor()
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index 833e0c6..0db2de5 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -368,4 +368,9 @@ public class MockExecutorLoader implements ExecutorLoader {
     }
     return queuedFlows;
   }
+
+  @Override
+  public void unassignExecutor(int executionId) throws ExecutorManagerException {
+    executionExecutorMapping.remove(executionId);
+  }
 }