Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java b/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java
index f9b0cab..1e934f8 100644
--- a/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java
+++ b/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java
@@ -48,18 +48,20 @@ public class RunningExecutionsUpdater {
private final ExecutorApiGateway apiGateway;
private final RunningExecutions runningExecutions;
private final ExecutionFinalizer executionFinalizer;
+ private final ExecutorLoader executorLoader;
@Inject
public RunningExecutionsUpdater(final ExecutorManagerUpdaterStage updaterStage,
final AlerterHolder alerterHolder, final CommonMetrics commonMetrics,
final ExecutorApiGateway apiGateway, final RunningExecutions runningExecutions,
- final ExecutionFinalizer executionFinalizer) {
+ final ExecutionFinalizer executionFinalizer, final ExecutorLoader executorLoader) {
this.updaterStage = updaterStage;
this.alerterHolder = alerterHolder;
this.commonMetrics = commonMetrics;
this.apiGateway = apiGateway;
this.runningExecutions = runningExecutions;
this.executionFinalizer = executionFinalizer;
+ this.executorLoader = executorLoader;
}
/**
@@ -93,7 +95,7 @@ public class RunningExecutionsUpdater {
try {
results = this.apiGateway.updateExecutions(executor, entry.getValue());
} catch (final ExecutorManagerException e) {
- handleException(entry, executor, e);
+ handleException(entry, executor, e, finalizeFlows);
}
if (results != null) {
@@ -133,24 +135,31 @@ public class RunningExecutionsUpdater {
}
private void handleException(final Entry<Optional<Executor>, List<ExecutableFlow>> entry,
- final Executor executor, final ExecutorManagerException e) {
+ final Executor executor, final ExecutorManagerException e,
+ final ArrayList<ExecutableFlow> finalizeFlows) {
logger.error("Failed to get update from executor " + executor.getHost(), e);
boolean sendUnresponsiveEmail = false;
+ final boolean executorRemoved = isExecutorRemoved(executor.getId());
for (final ExecutableFlow flow : entry.getValue()) {
final Pair<ExecutionReference, ExecutableFlow> pair =
this.runningExecutions.get().get(flow.getExecutionId());
- // TODO can runningFlows.get ever return null, causing NPE below?
this.updaterStage
.set("Failed to get update for flow " + pair.getSecond().getExecutionId());
- final ExecutionReference ref = pair.getFirst();
- ref.setNextCheckTime(DateTime.now().getMillis() + this.errorThreshold);
- ref.setNumErrors(ref.getNumErrors() + 1);
- if (ref.getNumErrors() == this.numErrorsBeforeUnresponsiveEmail
- || ref.getNumErrors() % this.numErrorsBetweenUnresponsiveEmail == 0) {
- // if any of the executions has failed many enough updates, alert
- sendUnresponsiveEmail = true;
+ if (executorRemoved) {
+ logger.warn("Finalizing execution " + flow.getExecutionId()
+ + ". Executor is removed");
+ finalizeFlows.add(flow);
+ } else {
+ final ExecutionReference ref = pair.getFirst();
+ ref.setNextCheckTime(DateTime.now().getMillis() + this.errorThreshold);
+ ref.setNumErrors(ref.getNumErrors() + 1);
+ if (ref.getNumErrors() == this.numErrorsBeforeUnresponsiveEmail
+ || ref.getNumErrors() % this.numErrorsBetweenUnresponsiveEmail == 0) {
+ // alert if any of the executions has failed enough updates
+ sendUnresponsiveEmail = true;
+ }
}
}
if (sendUnresponsiveEmail) {
@@ -159,6 +168,18 @@ public class RunningExecutionsUpdater {
}
}
+  /**
+   * Checks whether the executor with the given id can no longer be fetched.
+   *
+   * @param id id of the executor to look up
+   * @return {@code true} only when the lookup succeeds and returns no executor; {@code false}
+   *     when an executor is found or when the lookup itself fails
+   */
+  private boolean isExecutorRemoved(final int id) {
+    try {
+      return this.executorLoader.fetchExecutor(id) == null;
+    } catch (final ExecutorManagerException e) {
+      logger.error("Couldn't check if executor exists", e);
+      // Existence is unknown at this point, so report the executor as not removed.
+      return false;
+    }
+  }
+
/* Group Executable flow by Executors to reduce number of REST calls */
private Map<Optional<Executor>, List<ExecutableFlow>> getFlowToExecutorMap() {
final HashMap<Optional<Executor>, List<ExecutableFlow>> exFlowMap =
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 81e1419..dae0552 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -151,7 +151,7 @@ public class ExecutorManagerTest {
final RunningExecutionsUpdaterThread updaterThread = new RunningExecutionsUpdaterThread(
new RunningExecutionsUpdater(
this.updaterStage, this.alertHolder, this.commonMetrics, this.apiGateway,
- this.runningExecutions, executionFinalizer), this.runningExecutions);
+ this.runningExecutions, executionFinalizer, this.loader), this.runningExecutions);
updaterThread.waitTimeIdleMs = 0;
updaterThread.waitTimeMs = 0;
final ExecutorManager executorManager = new ExecutorManager(this.props, this.loader,
diff --git a/azkaban-common/src/test/java/azkaban/executor/RunningExecutionsUpdaterTest.java b/azkaban-common/src/test/java/azkaban/executor/RunningExecutionsUpdaterTest.java
index bc166f9..c4ed295 100644
--- a/azkaban-common/src/test/java/azkaban/executor/RunningExecutionsUpdaterTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/RunningExecutionsUpdaterTest.java
@@ -1,6 +1,7 @@
package azkaban.executor;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.verify;
@@ -40,6 +41,8 @@ public class RunningExecutionsUpdaterTest {
ExecutionFinalizer executionFinalizer;
@Mock
private Alerter mailAlerter;
+ @Mock
+ private ExecutorLoader executorLoader;
private ExecutableFlow execution;
private RunningExecutions runningExecutions;
@@ -57,7 +60,8 @@ public class RunningExecutionsUpdaterTest {
this.runningExecutions.get().put(EXECUTION_ID_77, new Pair<>(
new ExecutionReference(EXECUTION_ID_77, this.activeExecutor), this.execution));
this.updater = new RunningExecutionsUpdater(this.updaterStage, this.alerterHolder,
- this.commonMetrics, this.apiGateway, this.runningExecutions, this.executionFinalizer);
+ this.commonMetrics, this.apiGateway, this.runningExecutions, this.executionFinalizer,
+ this.executorLoader);
when(this.alerterHolder.get("email")).thenReturn(this.mailAlerter);
}
@@ -100,6 +104,7 @@ public class RunningExecutionsUpdaterTest {
@Test
public void updateExecutionsUpdateCallFails() throws Exception {
mockUpdateCallFails();
+ when(this.executorLoader.fetchExecutor(anyInt())).thenReturn(this.activeExecutor);
DateTimeUtils.setCurrentMillisFixed(System.currentTimeMillis());
for (int i = 0; i < this.updater.numErrorsBeforeUnresponsiveEmail; i++) {
this.updater.updateExecutions();
@@ -108,10 +113,33 @@ public class RunningExecutionsUpdaterTest {
}
verify(this.mailAlerter).alertOnFailedUpdate(
this.activeExecutor, Collections.singletonList(this.execution), API_CALL_EXCEPTION);
+ verifyZeroInteractions(this.executionFinalizer);
+ }
- // TODO change to checking if executor exist in the DB any more
+  /**
+   * When the update call fails and fetchExecutor returns null for the executor, the execution
+   * should be passed to the finalizer.
+   */
+  @Test
+  public void updateExecutionsUpdateCallFailsExecutorDoesntExist() throws Exception {
+    mockUpdateCallFails();
+    doReturn(null).when(this.executorLoader).fetchExecutor(anyInt());
+    DateTimeUtils.setCurrentMillisFixed(System.currentTimeMillis());
+
+    this.updater.updateExecutions();
+
+    verify(this.executionFinalizer)
+        .finalizeFlow(this.execution, "Not running on the assigned executor (any more)", null);
+  }
+
+ /**
+ * Shouldn't finalize executions if executor's existence can't be checked, i.e. when
+ * fetchExecutor itself throws an ExecutorManagerException.
+ */
+ @Test
+ public void updateExecutionsUpdateCallFailsExecutorCheckThrows() throws Exception {
+ mockUpdateCallFails();
+ when(this.executorLoader.fetchExecutor(anyInt()))
+ .thenThrow(new ExecutorManagerException("Mocked fetchExecutor failure"));
+ DateTimeUtils.setCurrentMillisFixed(System.currentTimeMillis());
+ this.updater.updateExecutions();
verifyZeroInteractions(this.executionFinalizer);
- // verify(this.executionFinalizer).finalizeFlow(this.execution, "TODO", null);
}
private void mockFlowStillRunning() throws Exception {
diff --git a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
index 6021af2..8871bb4 100644
--- a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
+++ b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
@@ -86,7 +86,7 @@ public class TriggerManagerDeadlockTest {
private RunningExecutionsUpdaterThread getRunningExecutionsUpdaterThread() {
return new RunningExecutionsUpdaterThread(new RunningExecutionsUpdater(
this.updaterStage, this.alertHolder, this.commonMetrics, this.apiGateway,
- this.runningExecutions, this.executionFinalizer), runningExecutions);
+ this.runningExecutions, this.executionFinalizer, this.execLoader), this.runningExecutions);
}
@After