azkaban-aplcache

Finalize execution if executor doesn't exist (#2016) * Finalize

11/6/2018 7:29:42 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java b/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java
index f9b0cab..1e934f8 100644
--- a/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java
+++ b/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java
@@ -48,18 +48,20 @@ public class RunningExecutionsUpdater {
   private final ExecutorApiGateway apiGateway;
   private final RunningExecutions runningExecutions;
   private final ExecutionFinalizer executionFinalizer;
+  private final ExecutorLoader executorLoader;
 
   @Inject
   public RunningExecutionsUpdater(final ExecutorManagerUpdaterStage updaterStage,
       final AlerterHolder alerterHolder, final CommonMetrics commonMetrics,
       final ExecutorApiGateway apiGateway, final RunningExecutions runningExecutions,
-      final ExecutionFinalizer executionFinalizer) {
+      final ExecutionFinalizer executionFinalizer, final ExecutorLoader executorLoader) {
     this.updaterStage = updaterStage;
     this.alerterHolder = alerterHolder;
     this.commonMetrics = commonMetrics;
     this.apiGateway = apiGateway;
     this.runningExecutions = runningExecutions;
     this.executionFinalizer = executionFinalizer;
+    this.executorLoader = executorLoader;
   }
 
   /**
@@ -93,7 +95,7 @@ public class RunningExecutionsUpdater {
       try {
         results = this.apiGateway.updateExecutions(executor, entry.getValue());
       } catch (final ExecutorManagerException e) {
-        handleException(entry, executor, e);
+        handleException(entry, executor, e, finalizeFlows);
       }
 
       if (results != null) {
@@ -133,24 +135,31 @@ public class RunningExecutionsUpdater {
   }
 
   private void handleException(final Entry<Optional<Executor>, List<ExecutableFlow>> entry,
-      final Executor executor, final ExecutorManagerException e) {
+      final Executor executor, final ExecutorManagerException e,
+      final ArrayList<ExecutableFlow> finalizeFlows) {
     logger.error("Failed to get update from executor " + executor.getHost(), e);
     boolean sendUnresponsiveEmail = false;
+    final boolean executorRemoved = isExecutorRemoved(executor.getId());
     for (final ExecutableFlow flow : entry.getValue()) {
       final Pair<ExecutionReference, ExecutableFlow> pair =
           this.runningExecutions.get().get(flow.getExecutionId());
-      // TODO can runningFlows.get ever return null, causing NPE below?
 
       this.updaterStage
           .set("Failed to get update for flow " + pair.getSecond().getExecutionId());
 
-      final ExecutionReference ref = pair.getFirst();
-      ref.setNextCheckTime(DateTime.now().getMillis() + this.errorThreshold);
-      ref.setNumErrors(ref.getNumErrors() + 1);
-      if (ref.getNumErrors() == this.numErrorsBeforeUnresponsiveEmail
-          || ref.getNumErrors() % this.numErrorsBetweenUnresponsiveEmail == 0) {
-        // if any of the executions has failed many enough updates, alert
-        sendUnresponsiveEmail = true;
+      if (executorRemoved) {
+        logger.warn("Finalizing execution " + flow.getExecutionId()
+            + ". Executor is removed");
+        finalizeFlows.add(flow);
+      } else {
+        final ExecutionReference ref = pair.getFirst();
+        ref.setNextCheckTime(DateTime.now().getMillis() + this.errorThreshold);
+        ref.setNumErrors(ref.getNumErrors() + 1);
+        if (ref.getNumErrors() == this.numErrorsBeforeUnresponsiveEmail
+            || ref.getNumErrors() % this.numErrorsBetweenUnresponsiveEmail == 0) {
+          // if any of the executions has failed many enough updates, alert
+          sendUnresponsiveEmail = true;
+        }
       }
     }
     if (sendUnresponsiveEmail) {
@@ -159,6 +168,18 @@ public class RunningExecutionsUpdater {
     }
   }
 
+  // Returns true only when the loader finds no executor for the given id.
+  private boolean isExecutorRemoved(final int id) {
+    try {
+      final Executor current = this.executorLoader.fetchExecutor(id);
+      return current == null;
+    } catch (final ExecutorManagerException e) {
+      logger.error("Couldn't check if executor exists", e);
+      // lookup failed, so removal is unconfirmed -> report "not removed"
+      return false;
+    }
+  }
+
   /* Group Executable flow by Executors to reduce number of REST calls */
   private Map<Optional<Executor>, List<ExecutableFlow>> getFlowToExecutorMap() {
     final HashMap<Optional<Executor>, List<ExecutableFlow>> exFlowMap =
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 81e1419..dae0552 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -151,7 +151,7 @@ public class ExecutorManagerTest {
     final RunningExecutionsUpdaterThread updaterThread = new RunningExecutionsUpdaterThread(
         new RunningExecutionsUpdater(
             this.updaterStage, this.alertHolder, this.commonMetrics, this.apiGateway,
-            this.runningExecutions, executionFinalizer), this.runningExecutions);
+            this.runningExecutions, executionFinalizer, this.loader), this.runningExecutions);
     updaterThread.waitTimeIdleMs = 0;
     updaterThread.waitTimeMs = 0;
     final ExecutorManager executorManager = new ExecutorManager(this.props, this.loader,
diff --git a/azkaban-common/src/test/java/azkaban/executor/RunningExecutionsUpdaterTest.java b/azkaban-common/src/test/java/azkaban/executor/RunningExecutionsUpdaterTest.java
index bc166f9..c4ed295 100644
--- a/azkaban-common/src/test/java/azkaban/executor/RunningExecutionsUpdaterTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/RunningExecutionsUpdaterTest.java
@@ -1,6 +1,7 @@
 package azkaban.executor;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.verify;
@@ -40,6 +41,8 @@ public class RunningExecutionsUpdaterTest {
   ExecutionFinalizer executionFinalizer;
   @Mock
   private Alerter mailAlerter;
+  @Mock
+  private ExecutorLoader executorLoader;
 
   private ExecutableFlow execution;
   private RunningExecutions runningExecutions;
@@ -57,7 +60,8 @@ public class RunningExecutionsUpdaterTest {
     this.runningExecutions.get().put(EXECUTION_ID_77, new Pair<>(
         new ExecutionReference(EXECUTION_ID_77, this.activeExecutor), this.execution));
     this.updater = new RunningExecutionsUpdater(this.updaterStage, this.alerterHolder,
-        this.commonMetrics, this.apiGateway, this.runningExecutions, this.executionFinalizer);
+        this.commonMetrics, this.apiGateway, this.runningExecutions, this.executionFinalizer,
+        this.executorLoader);
     when(this.alerterHolder.get("email")).thenReturn(this.mailAlerter);
   }
 
@@ -100,6 +104,7 @@ public class RunningExecutionsUpdaterTest {
   @Test
   public void updateExecutionsUpdateCallFails() throws Exception {
     mockUpdateCallFails();
+    when(this.executorLoader.fetchExecutor(anyInt())).thenReturn(this.activeExecutor);
     DateTimeUtils.setCurrentMillisFixed(System.currentTimeMillis());
     for (int i = 0; i < this.updater.numErrorsBeforeUnresponsiveEmail; i++) {
       this.updater.updateExecutions();
@@ -108,10 +113,33 @@ public class RunningExecutionsUpdaterTest {
     }
     verify(this.mailAlerter).alertOnFailedUpdate(
         this.activeExecutor, Collections.singletonList(this.execution), API_CALL_EXCEPTION);
+    verifyZeroInteractions(this.executionFinalizer);
+  }
 
-    // TODO change to checking if executor exist in the DB any more
+  /**
+   * A failed update call must finalize the execution when the executor is gone from the DB.
+   */
+  @Test
+  public void updateExecutionsUpdateCallFailsExecutorDoesntExist() throws Exception {
+    mockUpdateCallFails();
+    when(this.executorLoader.fetchExecutor(anyInt())).thenReturn(null);
+    DateTimeUtils.setCurrentMillisFixed(System.currentTimeMillis());
+    this.updater.updateExecutions();
+    final String reason = "Not running on the assigned executor (any more)";
+    verify(this.executionFinalizer).finalizeFlow(this.execution, reason, null);
+  }
+
+  /**
+   * Shouldn't finalize executions if executor's existence can't be checked.
+   */
+  @Test
+  public void updateExecutionsUpdateCallFailsExecutorCheckThrows() throws Exception {
+    mockUpdateCallFails();
+    when(this.executorLoader.fetchExecutor(anyInt()))
+        .thenThrow(new ExecutorManagerException("Mocked fetchExecutor failure"));
+    DateTimeUtils.setCurrentMillisFixed(System.currentTimeMillis());
+    this.updater.updateExecutions();
     verifyZeroInteractions(this.executionFinalizer);
-    // verify(this.executionFinalizer).finalizeFlow(this.execution, "TODO", null);
   }
 
   private void mockFlowStillRunning() throws Exception {
diff --git a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
index 6021af2..8871bb4 100644
--- a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
+++ b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
@@ -86,7 +86,7 @@ public class TriggerManagerDeadlockTest {
   private RunningExecutionsUpdaterThread getRunningExecutionsUpdaterThread() {
     return new RunningExecutionsUpdaterThread(new RunningExecutionsUpdater(
         this.updaterStage, this.alertHolder, this.commonMetrics, this.apiGateway,
-        this.runningExecutions, this.executionFinalizer), runningExecutions);
+        this.runningExecutions, this.executionFinalizer, this.execLoader), this.runningExecutions);
   }
 
   @After