Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorApiGateway.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiGateway.java
index b536555..d8d98ec 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorApiGateway.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiGateway.java
@@ -125,4 +125,24 @@ public class ExecutorApiGateway {
return this.apiClient.httpPost(uri, paramList);
}
+ public Map<String, Object> updateExecutions(final Executor executor,
+ final List<ExecutableFlow> executions) throws ExecutorManagerException {
+ final List<Long> updateTimesList = new ArrayList<>();
+ final List<Integer> executionIdsList = new ArrayList<>();
+ // We pack the parameters of the same host together before query
+ for (final ExecutableFlow flow : executions) {
+ executionIdsList.add(flow.getExecutionId());
+ updateTimesList.add(flow.getUpdateTime());
+ }
+ final Pair<String, String> updateTimes = new Pair<>(
+ ConnectorParams.UPDATE_TIME_LIST_PARAM,
+ JSONUtils.toJSON(updateTimesList));
+ final Pair<String, String> executionIds = new Pair<>(
+ ConnectorParams.EXEC_ID_LIST_PARAM,
+ JSONUtils.toJSON(executionIdsList));
+
+ return callWithExecutionId(executor.getHost(), executor.getPort(),
+ ConnectorParams.UPDATE_ACTION, null, null, executionIds, updateTimes);
+ }
+
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java b/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java
index e15e0c8..f9b0cab 100644
--- a/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java
+++ b/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java
@@ -18,7 +18,6 @@ package azkaban.executor;
import azkaban.alert.Alerter;
import azkaban.metrics.CommonMetrics;
-import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import java.util.ArrayList;
import java.util.HashMap;
@@ -28,6 +27,7 @@ import java.util.Map.Entry;
import java.util.Optional;
import javax.inject.Inject;
import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
/**
* Updates running executions.
@@ -40,8 +40,8 @@ public class RunningExecutionsUpdater {
// times (3600 seconds = 1 hour) before we send an email about unresponsive executor.
private final int numErrorsBetweenUnresponsiveEmail = 360;
// First email is sent after 1 minute of unresponsiveness
- private final int numErrorsBeforeUnresponsiveEmail = 6;
- private final long errorThreshold = 10000;
+ final int numErrorsBeforeUnresponsiveEmail = 6;
+ final long errorThreshold = 10000;
private final ExecutorManagerUpdaterStage updaterStage;
private final AlerterHolder alerterHolder;
private final CommonMetrics commonMetrics;
@@ -74,8 +74,6 @@ public class RunningExecutionsUpdater {
for (final Map.Entry<Optional<Executor>, List<ExecutableFlow>> entry : exFlowMap
.entrySet()) {
- final List<Long> updateTimesList = new ArrayList<>();
- final List<Integer> executionIdsList = new ArrayList<>();
final Optional<Executor> executorOption = entry.getKey();
if (!executorOption.isPresent()) {
@@ -91,24 +89,9 @@ public class RunningExecutionsUpdater {
this.updaterStage.set("Starting update flows on " + executor.getHost() + ":"
+ executor.getPort());
- // We pack the parameters of the same host together before we
- // query.
- fillUpdateTimeAndExecId(entry.getValue(), executionIdsList,
- updateTimesList);
-
- final Pair<String, String> updateTimes =
- new Pair<>(
- ConnectorParams.UPDATE_TIME_LIST_PARAM,
- JSONUtils.toJSON(updateTimesList));
- final Pair<String, String> executionIds =
- new Pair<>(ConnectorParams.EXEC_ID_LIST_PARAM,
- JSONUtils.toJSON(executionIdsList));
-
Map<String, Object> results = null;
try {
- results = this.apiGateway.callWithExecutionId(executor.getHost(),
- executor.getPort(), ConnectorParams.UPDATE_ACTION,
- null, null, executionIds, updateTimes);
+ results = this.apiGateway.updateExecutions(executor, entry.getValue());
} catch (final ExecutorManagerException e) {
handleException(entry, executor, e);
}
@@ -149,8 +132,8 @@ public class RunningExecutionsUpdater {
this.updaterStage.set("Updated all active flows. Waiting for next round.");
}
- private void handleException(Entry<Optional<Executor>, List<ExecutableFlow>> entry,
- Executor executor, ExecutorManagerException e) {
+ private void handleException(final Entry<Optional<Executor>, List<ExecutableFlow>> entry,
+ final Executor executor, final ExecutorManagerException e) {
logger.error("Failed to get update from executor " + executor.getHost(), e);
boolean sendUnresponsiveEmail = false;
for (final ExecutableFlow flow : entry.getValue()) {
@@ -162,7 +145,7 @@ public class RunningExecutionsUpdater {
.set("Failed to get update for flow " + pair.getSecond().getExecutionId());
final ExecutionReference ref = pair.getFirst();
- ref.setNextCheckTime(System.currentTimeMillis() + this.errorThreshold);
+ ref.setNextCheckTime(DateTime.now().getMillis() + this.errorThreshold);
ref.setNumErrors(ref.getNumErrors() + 1);
if (ref.getNumErrors() == this.numErrorsBeforeUnresponsiveEmail
|| ref.getNumErrors() % this.numErrorsBetweenUnresponsiveEmail == 0) {
@@ -189,7 +172,7 @@ public class RunningExecutionsUpdater {
// We can set the next check time to prevent the checking of certain
// flows.
- if (ref.getNextCheckTime() >= System.currentTimeMillis()) {
+ if (ref.getNextCheckTime() >= DateTime.now().getMillis()) {
continue;
}
@@ -205,14 +188,6 @@ public class RunningExecutionsUpdater {
return exFlowMap;
}
- private void fillUpdateTimeAndExecId(final List<ExecutableFlow> flows,
- final List<Integer> executionIds, final List<Long> updateTimes) {
- for (final ExecutableFlow flow : flows) {
- executionIds.add(flow.getExecutionId());
- updateTimes.add(flow.getUpdateTime());
- }
- }
-
private ExecutableFlow updateExecution(final Map<String, Object> updateData)
throws ExecutorManagerException {
@@ -226,8 +201,10 @@ public class RunningExecutionsUpdater {
final Pair<ExecutionReference, ExecutableFlow> refPair =
this.runningExecutions.get().get(execId);
if (refPair == null) {
+ // this shouldn't ever happen on real azkaban runtime.
+ // but this can easily happen in unit tests if there's some inconsistent mocking.
throw new ExecutorManagerException(
- "No running flow found with the execution id. Removing " + execId);
+ "No execution found in the map with the execution id any more. Removing " + execId);
}
final ExecutionReference ref = refPair.getFirst();
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorApiGatewayTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorApiGatewayTest.java
index aad96b2..59bb3ce 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorApiGatewayTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorApiGatewayTest.java
@@ -16,21 +16,36 @@
package azkaban.executor;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import com.google.common.collect.ImmutableMap;
+import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
public class ExecutorApiGatewayTest {
private ExecutorApiGateway gateway;
private ExecutorApiClient client;
+ @Captor
+ ArgumentCaptor<List<Pair<String, String>>> params;
@Before
public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
this.client = Mockito.mock(ExecutorApiClient.class);
this.gateway = new ExecutorApiGateway(this.client);
}
@@ -40,10 +55,26 @@ public class ExecutorApiGatewayTest {
final ExecutorInfo exeInfo = new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 89,
10);
final String json = JSONUtils.toJSON(exeInfo);
- when(this.client.httpPost(Mockito.any(), Mockito.any())).thenReturn(json);
+ when(this.client.httpPost(any(), any())).thenReturn(json);
final ExecutorInfo exeInfo2 = this.gateway
.callForJsonType("localhost", 1234, "executor", null, ExecutorInfo.class);
Assert.assertTrue(exeInfo.equals(exeInfo2));
}
+ @Test
+ public void updateExecutions() throws Exception {
+ final ImmutableMap<String, String> map = ImmutableMap.of("test", "response");
+ when(this.client
+ .httpPost(eq(new URI("http://executor-2:1234/executor")), this.params.capture()))
+ .thenReturn(JSONUtils.toJSON(map));
+ final Map<String, Object> response = this.gateway
+ .updateExecutions(new Executor(2, "executor-2", 1234, true),
+ Collections.singletonList(new ExecutableFlow()));
+ assertEquals(map, response);
+ assertEquals(new Pair<>("executionId", "[-1]"), this.params.getValue().get(0));
+ assertEquals(new Pair<>("updatetime", "[-1]"), this.params.getValue().get(1));
+ assertEquals(new Pair<>("action", "update"), this.params.getValue().get(2));
+ assertEquals(new Pair<>("execid", "null"), this.params.getValue().get(3));
+ assertEquals(new Pair<>("user", null), this.params.getValue().get(4));
+ }
}
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index fedc383..48017c6 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -18,7 +18,6 @@ package azkaban.executor;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.contains;
-import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
@@ -367,8 +366,7 @@ public class ExecutorManagerTest {
@SuppressWarnings("unchecked")
private void mockUpdateResponse(
final Map<String, List<Map<String, Object>>> map) throws Exception {
- doReturn(map).when(this.apiGateway).callWithExecutionId(
- any(), anyInt(), eq(ConnectorParams.UPDATE_ACTION), any(), any(), any(), any());
+ doReturn(map).when(this.apiGateway).updateExecutions(any(), any());
}
/*
diff --git a/azkaban-common/src/test/java/azkaban/executor/RunningExecutionsUpdaterTest.java b/azkaban-common/src/test/java/azkaban/executor/RunningExecutionsUpdaterTest.java
new file mode 100644
index 0000000..bc166f9
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/RunningExecutionsUpdaterTest.java
@@ -0,0 +1,166 @@
+package azkaban.executor;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import azkaban.alert.Alerter;
+import azkaban.metrics.CommonMetrics;
+import azkaban.utils.Pair;
+import com.google.common.collect.ImmutableMap;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.joda.time.DateTimeUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+public class RunningExecutionsUpdaterTest {
+
+ private static final int EXECUTION_ID_77 = 77;
+ private static final ExecutorManagerException API_CALL_EXCEPTION =
+ new ExecutorManagerException("Mocked API timeout");
+
+ @Mock
+ ExecutorManagerUpdaterStage updaterStage;
+ @Mock
+ AlerterHolder alerterHolder;
+ @Mock
+ CommonMetrics commonMetrics;
+ @Mock
+ ExecutorApiGateway apiGateway;
+ @Mock
+ ExecutionFinalizer executionFinalizer;
+ @Mock
+ private Alerter mailAlerter;
+
+ private ExecutableFlow execution;
+ private RunningExecutions runningExecutions;
+ private Executor activeExecutor;
+
+ private RunningExecutionsUpdater updater;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ this.execution = new ExecutableFlow();
+ this.execution.setExecutionId(EXECUTION_ID_77);
+ this.activeExecutor = new Executor(1, "activeExecutor-1", 9999, true);
+ this.runningExecutions = new RunningExecutions();
+ this.runningExecutions.get().put(EXECUTION_ID_77, new Pair<>(
+ new ExecutionReference(EXECUTION_ID_77, this.activeExecutor), this.execution));
+ this.updater = new RunningExecutionsUpdater(this.updaterStage, this.alerterHolder,
+ this.commonMetrics, this.apiGateway, this.runningExecutions, this.executionFinalizer);
+ when(this.alerterHolder.get("email")).thenReturn(this.mailAlerter);
+ }
+
+ @After
+ public void tearDown() {
+ DateTimeUtils.setCurrentMillisSystem();
+ }
+
+ @Test
+ public void updateExecutionsStillRunning() throws Exception {
+ mockFlowStillRunning();
+ this.updater.updateExecutions();
+ verifyCallUpdateApi();
+ verifyZeroInteractions(this.executionFinalizer);
+ }
+
+ @Test
+ public void updateExecutionsSucceeded() throws Exception {
+ mockFlowSucceeded();
+ this.updater.updateExecutions();
+ verifyCallUpdateApi();
+ verifyFinalizeFlow();
+ }
+
+ @Test
+ public void updateExecutionsExecutorDoesNotExist() throws Exception {
+ mockExecutorDoesNotExist();
+ this.updater.updateExecutions();
+ verifyFinalizeFlow();
+ }
+
+ @Test
+ public void updateExecutionsFlowDoesNotExist() throws Exception {
+ mockFlowDoesNotExist();
+ this.updater.updateExecutions();
+ verifyCallUpdateApi();
+ verifyFinalizeFlow();
+ }
+
+ @Test
+ public void updateExecutionsUpdateCallFails() throws Exception {
+ mockUpdateCallFails();
+ DateTimeUtils.setCurrentMillisFixed(System.currentTimeMillis());
+ for (int i = 0; i < this.updater.numErrorsBeforeUnresponsiveEmail; i++) {
+ this.updater.updateExecutions();
+ DateTimeUtils.setCurrentMillisFixed(
+ DateTimeUtils.currentTimeMillis() + this.updater.errorThreshold + 1L);
+ }
+ verify(this.mailAlerter).alertOnFailedUpdate(
+ this.activeExecutor, Collections.singletonList(this.execution), API_CALL_EXCEPTION);
+
+ // TODO change to checking if executor exist in the DB any more
+ verifyZeroInteractions(this.executionFinalizer);
+ // verify(this.executionFinalizer).finalizeFlow(this.execution, "TODO", null);
+ }
+
+ private void mockFlowStillRunning() throws Exception {
+ mockUpdateResponse();
+ }
+
+ private void mockFlowSucceeded() throws Exception {
+ final Map<String, Object> executionMap = mockUpdateResponse();
+ executionMap.put(ExecutableNode.STATUS_PARAM, Status.SUCCEEDED.getNumVal());
+ }
+
+ private void mockExecutorDoesNotExist() {
+ this.runningExecutions.get().put(EXECUTION_ID_77, new Pair<>(
+ new ExecutionReference(EXECUTION_ID_77, null), this.execution));
+ }
+
+ private void mockUpdateCallFails() throws ExecutorManagerException {
+ doThrow(API_CALL_EXCEPTION).when(this.apiGateway).updateExecutions(any(), any());
+ }
+
+ private void verifyCallUpdateApi() throws ExecutorManagerException {
+ verify(this.apiGateway).updateExecutions(
+ this.activeExecutor, Collections.singletonList(this.execution));
+ }
+
+ private void mockFlowDoesNotExist() throws Exception {
+ final Map<String, Object> executionMap = mockUpdateResponse();
+ executionMap.put(ConnectorParams.RESPONSE_ERROR, "Flow does not exist");
+ }
+
+ private Map<String, Object> mockUpdateResponse() throws Exception {
+ final Map<String, Object> executionMap = new HashMap<>(ImmutableMap.of(
+ ConnectorParams.UPDATE_MAP_EXEC_ID, EXECUTION_ID_77));
+ mockUpdateResponse(ImmutableMap.of(
+ ConnectorParams.RESPONSE_UPDATED_FLOWS, Collections.singletonList(executionMap)));
+ return executionMap;
+ }
+
+ // Suppress "unchecked generic array creation for varargs parameter".
+ // No way to avoid this when mocking a method with generic varags.
+ @SuppressWarnings("unchecked")
+ private void mockUpdateResponse(
+ final Map<String, List<Map<String, Object>>> map) throws Exception {
+ doReturn(map).when(this.apiGateway).updateExecutions(any(), any());
+ }
+
+ private void verifyFinalizeFlow() {
+ verify(this.executionFinalizer).finalizeFlow(this.execution,
+ "Not running on the assigned executor (any more)", null);
+ }
+
+}