azkaban-aplcache

Create a unit test for RunningExecutionsUpdater + minor refactoring

10/18/2018 5:15:15 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorApiGateway.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiGateway.java
index b536555..d8d98ec 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorApiGateway.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiGateway.java
@@ -125,4 +125,24 @@ public class ExecutorApiGateway {
     return this.apiClient.httpPost(uri, paramList);
   }
 
+  public Map<String, Object> updateExecutions(final Executor executor,
+      final List<ExecutableFlow> executions) throws ExecutorManagerException {
+    final List<Long> updateTimesList = new ArrayList<>();
+    final List<Integer> executionIdsList = new ArrayList<>();
+    // We pack the parameters of the same host together before query
+    for (final ExecutableFlow flow : executions) {
+      executionIdsList.add(flow.getExecutionId());
+      updateTimesList.add(flow.getUpdateTime());
+    }
+    final Pair<String, String> updateTimes = new Pair<>(
+        ConnectorParams.UPDATE_TIME_LIST_PARAM,
+        JSONUtils.toJSON(updateTimesList));
+    final Pair<String, String> executionIds = new Pair<>(
+        ConnectorParams.EXEC_ID_LIST_PARAM,
+        JSONUtils.toJSON(executionIdsList));
+
+    return callWithExecutionId(executor.getHost(), executor.getPort(),
+        ConnectorParams.UPDATE_ACTION, null, null, executionIds, updateTimes);
+  }
+
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java b/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java
index e15e0c8..f9b0cab 100644
--- a/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java
+++ b/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java
@@ -18,7 +18,6 @@ package azkaban.executor;
 
 import azkaban.alert.Alerter;
 import azkaban.metrics.CommonMetrics;
-import azkaban.utils.JSONUtils;
 import azkaban.utils.Pair;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -28,6 +27,7 @@ import java.util.Map.Entry;
 import java.util.Optional;
 import javax.inject.Inject;
 import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
 
 /**
  * Updates running executions.
@@ -40,8 +40,8 @@ public class RunningExecutionsUpdater {
   // times (3600 seconds = 1 hour) before we send an email about unresponsive executor.
   private final int numErrorsBetweenUnresponsiveEmail = 360;
   // First email is sent after 1 minute of unresponsiveness
-  private final int numErrorsBeforeUnresponsiveEmail = 6;
-  private final long errorThreshold = 10000;
+  final int numErrorsBeforeUnresponsiveEmail = 6;
+  final long errorThreshold = 10000;
   private final ExecutorManagerUpdaterStage updaterStage;
   private final AlerterHolder alerterHolder;
   private final CommonMetrics commonMetrics;
@@ -74,8 +74,6 @@ public class RunningExecutionsUpdater {
 
     for (final Map.Entry<Optional<Executor>, List<ExecutableFlow>> entry : exFlowMap
         .entrySet()) {
-      final List<Long> updateTimesList = new ArrayList<>();
-      final List<Integer> executionIdsList = new ArrayList<>();
 
       final Optional<Executor> executorOption = entry.getKey();
       if (!executorOption.isPresent()) {
@@ -91,24 +89,9 @@ public class RunningExecutionsUpdater {
       this.updaterStage.set("Starting update flows on " + executor.getHost() + ":"
           + executor.getPort());
 
-      // We pack the parameters of the same host together before we
-      // query.
-      fillUpdateTimeAndExecId(entry.getValue(), executionIdsList,
-          updateTimesList);
-
-      final Pair<String, String> updateTimes =
-          new Pair<>(
-              ConnectorParams.UPDATE_TIME_LIST_PARAM,
-              JSONUtils.toJSON(updateTimesList));
-      final Pair<String, String> executionIds =
-          new Pair<>(ConnectorParams.EXEC_ID_LIST_PARAM,
-              JSONUtils.toJSON(executionIdsList));
-
       Map<String, Object> results = null;
       try {
-        results = this.apiGateway.callWithExecutionId(executor.getHost(),
-            executor.getPort(), ConnectorParams.UPDATE_ACTION,
-            null, null, executionIds, updateTimes);
+        results = this.apiGateway.updateExecutions(executor, entry.getValue());
       } catch (final ExecutorManagerException e) {
         handleException(entry, executor, e);
       }
@@ -149,8 +132,8 @@ public class RunningExecutionsUpdater {
     this.updaterStage.set("Updated all active flows. Waiting for next round.");
   }
 
-  private void handleException(Entry<Optional<Executor>, List<ExecutableFlow>> entry,
-      Executor executor, ExecutorManagerException e) {
+  private void handleException(final Entry<Optional<Executor>, List<ExecutableFlow>> entry,
+      final Executor executor, final ExecutorManagerException e) {
     logger.error("Failed to get update from executor " + executor.getHost(), e);
     boolean sendUnresponsiveEmail = false;
     for (final ExecutableFlow flow : entry.getValue()) {
@@ -162,7 +145,7 @@ public class RunningExecutionsUpdater {
           .set("Failed to get update for flow " + pair.getSecond().getExecutionId());
 
       final ExecutionReference ref = pair.getFirst();
-      ref.setNextCheckTime(System.currentTimeMillis() + this.errorThreshold);
+      ref.setNextCheckTime(DateTime.now().getMillis() + this.errorThreshold);
       ref.setNumErrors(ref.getNumErrors() + 1);
       if (ref.getNumErrors() == this.numErrorsBeforeUnresponsiveEmail
           || ref.getNumErrors() % this.numErrorsBetweenUnresponsiveEmail == 0) {
@@ -189,7 +172,7 @@ public class RunningExecutionsUpdater {
 
       // We can set the next check time to prevent the checking of certain
       // flows.
-      if (ref.getNextCheckTime() >= System.currentTimeMillis()) {
+      if (ref.getNextCheckTime() >= DateTime.now().getMillis()) {
         continue;
       }
 
@@ -205,14 +188,6 @@ public class RunningExecutionsUpdater {
     return exFlowMap;
   }
 
-  private void fillUpdateTimeAndExecId(final List<ExecutableFlow> flows,
-      final List<Integer> executionIds, final List<Long> updateTimes) {
-    for (final ExecutableFlow flow : flows) {
-      executionIds.add(flow.getExecutionId());
-      updateTimes.add(flow.getUpdateTime());
-    }
-  }
-
   private ExecutableFlow updateExecution(final Map<String, Object> updateData)
       throws ExecutorManagerException {
 
@@ -226,8 +201,10 @@ public class RunningExecutionsUpdater {
     final Pair<ExecutionReference, ExecutableFlow> refPair =
         this.runningExecutions.get().get(execId);
     if (refPair == null) {
+      // this shouldn't ever happen on real azkaban runtime.
+      // but this can easily happen in unit tests if there's some inconsistent mocking.
       throw new ExecutorManagerException(
-          "No running flow found with the execution id. Removing " + execId);
+          "No execution found in the map with the execution id any more. Removing " + execId);
     }
 
     final ExecutionReference ref = refPair.getFirst();
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorApiGatewayTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorApiGatewayTest.java
index aad96b2..59bb3ce 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorApiGatewayTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorApiGatewayTest.java
@@ -16,21 +16,36 @@
 
 package azkaban.executor;
 
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.when;
 
 import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import com.google.common.collect.ImmutableMap;
+import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
 import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
 
 public class ExecutorApiGatewayTest {
 
   private ExecutorApiGateway gateway;
   private ExecutorApiClient client;
+  @Captor
+  ArgumentCaptor<List<Pair<String, String>>> params;
 
   @Before
   public void setUp() throws Exception {
+    MockitoAnnotations.initMocks(this);
     this.client = Mockito.mock(ExecutorApiClient.class);
     this.gateway = new ExecutorApiGateway(this.client);
   }
@@ -40,10 +55,26 @@ public class ExecutorApiGatewayTest {
     final ExecutorInfo exeInfo = new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 89,
         10);
     final String json = JSONUtils.toJSON(exeInfo);
-    when(this.client.httpPost(Mockito.any(), Mockito.any())).thenReturn(json);
+    when(this.client.httpPost(any(), any())).thenReturn(json);
     final ExecutorInfo exeInfo2 = this.gateway
         .callForJsonType("localhost", 1234, "executor", null, ExecutorInfo.class);
     Assert.assertTrue(exeInfo.equals(exeInfo2));
   }
 
+  @Test
+  public void updateExecutions() throws Exception {
+    final ImmutableMap<String, String> map = ImmutableMap.of("test", "response");
+    when(this.client
+        .httpPost(eq(new URI("http://executor-2:1234/executor")), this.params.capture()))
+        .thenReturn(JSONUtils.toJSON(map));
+    final Map<String, Object> response = this.gateway
+        .updateExecutions(new Executor(2, "executor-2", 1234, true),
+            Collections.singletonList(new ExecutableFlow()));
+    assertEquals(map, response);
+    assertEquals(new Pair<>("executionId", "[-1]"), this.params.getValue().get(0));
+    assertEquals(new Pair<>("updatetime", "[-1]"), this.params.getValue().get(1));
+    assertEquals(new Pair<>("action", "update"), this.params.getValue().get(2));
+    assertEquals(new Pair<>("execid", "null"), this.params.getValue().get(3));
+    assertEquals(new Pair<>("user", null), this.params.getValue().get(4));
+  }
 }
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index fedc383..48017c6 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -18,7 +18,6 @@ package azkaban.executor;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.contains;
-import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doReturn;
@@ -367,8 +366,7 @@ public class ExecutorManagerTest {
   @SuppressWarnings("unchecked")
   private void mockUpdateResponse(
       final Map<String, List<Map<String, Object>>> map) throws Exception {
-    doReturn(map).when(this.apiGateway).callWithExecutionId(
-        any(), anyInt(), eq(ConnectorParams.UPDATE_ACTION), any(), any(), any(), any());
+    doReturn(map).when(this.apiGateway).updateExecutions(any(), any());
   }
 
   /*
diff --git a/azkaban-common/src/test/java/azkaban/executor/RunningExecutionsUpdaterTest.java b/azkaban-common/src/test/java/azkaban/executor/RunningExecutionsUpdaterTest.java
new file mode 100644
index 0000000..bc166f9
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/RunningExecutionsUpdaterTest.java
@@ -0,0 +1,166 @@
+package azkaban.executor;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import azkaban.alert.Alerter;
+import azkaban.metrics.CommonMetrics;
+import azkaban.utils.Pair;
+import com.google.common.collect.ImmutableMap;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.joda.time.DateTimeUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+public class RunningExecutionsUpdaterTest {
+
+  private static final int EXECUTION_ID_77 = 77;
+  private static final ExecutorManagerException API_CALL_EXCEPTION =
+      new ExecutorManagerException("Mocked API timeout");
+
+  @Mock
+  ExecutorManagerUpdaterStage updaterStage;
+  @Mock
+  AlerterHolder alerterHolder;
+  @Mock
+  CommonMetrics commonMetrics;
+  @Mock
+  ExecutorApiGateway apiGateway;
+  @Mock
+  ExecutionFinalizer executionFinalizer;
+  @Mock
+  private Alerter mailAlerter;
+
+  private ExecutableFlow execution;
+  private RunningExecutions runningExecutions;
+  private Executor activeExecutor;
+
+  private RunningExecutionsUpdater updater;
+
+  @Before
+  public void setUp() throws Exception {
+    MockitoAnnotations.initMocks(this);
+    this.execution = new ExecutableFlow();
+    this.execution.setExecutionId(EXECUTION_ID_77);
+    this.activeExecutor = new Executor(1, "activeExecutor-1", 9999, true);
+    this.runningExecutions = new RunningExecutions();
+    this.runningExecutions.get().put(EXECUTION_ID_77, new Pair<>(
+        new ExecutionReference(EXECUTION_ID_77, this.activeExecutor), this.execution));
+    this.updater = new RunningExecutionsUpdater(this.updaterStage, this.alerterHolder,
+        this.commonMetrics, this.apiGateway, this.runningExecutions, this.executionFinalizer);
+    when(this.alerterHolder.get("email")).thenReturn(this.mailAlerter);
+  }
+
+  @After
+  public void tearDown() {
+    DateTimeUtils.setCurrentMillisSystem();
+  }
+
+  @Test
+  public void updateExecutionsStillRunning() throws Exception {
+    mockFlowStillRunning();
+    this.updater.updateExecutions();
+    verifyCallUpdateApi();
+    verifyZeroInteractions(this.executionFinalizer);
+  }
+
+  @Test
+  public void updateExecutionsSucceeded() throws Exception {
+    mockFlowSucceeded();
+    this.updater.updateExecutions();
+    verifyCallUpdateApi();
+    verifyFinalizeFlow();
+  }
+
+  @Test
+  public void updateExecutionsExecutorDoesNotExist() throws Exception {
+    mockExecutorDoesNotExist();
+    this.updater.updateExecutions();
+    verifyFinalizeFlow();
+  }
+
+  @Test
+  public void updateExecutionsFlowDoesNotExist() throws Exception {
+    mockFlowDoesNotExist();
+    this.updater.updateExecutions();
+    verifyCallUpdateApi();
+    verifyFinalizeFlow();
+  }
+
+  @Test
+  public void updateExecutionsUpdateCallFails() throws Exception {
+    mockUpdateCallFails();
+    DateTimeUtils.setCurrentMillisFixed(System.currentTimeMillis());
+    for (int i = 0; i < this.updater.numErrorsBeforeUnresponsiveEmail; i++) {
+      this.updater.updateExecutions();
+      DateTimeUtils.setCurrentMillisFixed(
+          DateTimeUtils.currentTimeMillis() + this.updater.errorThreshold + 1L);
+    }
+    verify(this.mailAlerter).alertOnFailedUpdate(
+        this.activeExecutor, Collections.singletonList(this.execution), API_CALL_EXCEPTION);
+
+    // TODO change to checking if executor exist in the DB any more
+    verifyZeroInteractions(this.executionFinalizer);
+    // verify(this.executionFinalizer).finalizeFlow(this.execution, "TODO", null);
+  }
+
+  private void mockFlowStillRunning() throws Exception {
+    mockUpdateResponse();
+  }
+
+  private void mockFlowSucceeded() throws Exception {
+    final Map<String, Object> executionMap = mockUpdateResponse();
+    executionMap.put(ExecutableNode.STATUS_PARAM, Status.SUCCEEDED.getNumVal());
+  }
+
+  private void mockExecutorDoesNotExist() {
+    this.runningExecutions.get().put(EXECUTION_ID_77, new Pair<>(
+        new ExecutionReference(EXECUTION_ID_77, null), this.execution));
+  }
+
+  private void mockUpdateCallFails() throws ExecutorManagerException {
+    doThrow(API_CALL_EXCEPTION).when(this.apiGateway).updateExecutions(any(), any());
+  }
+
+  private void verifyCallUpdateApi() throws ExecutorManagerException {
+    verify(this.apiGateway).updateExecutions(
+        this.activeExecutor, Collections.singletonList(this.execution));
+  }
+
+  private void mockFlowDoesNotExist() throws Exception {
+    final Map<String, Object> executionMap = mockUpdateResponse();
+    executionMap.put(ConnectorParams.RESPONSE_ERROR, "Flow does not exist");
+  }
+
+  private Map<String, Object> mockUpdateResponse() throws Exception {
+    final Map<String, Object> executionMap = new HashMap<>(ImmutableMap.of(
+        ConnectorParams.UPDATE_MAP_EXEC_ID, EXECUTION_ID_77));
+    mockUpdateResponse(ImmutableMap.of(
+        ConnectorParams.RESPONSE_UPDATED_FLOWS, Collections.singletonList(executionMap)));
+    return executionMap;
+  }
+
+  // Suppress "unchecked generic array creation for varargs parameter".
+  // No way to avoid this when mocking a method with generic varags.
+  @SuppressWarnings("unchecked")
+  private void mockUpdateResponse(
+      final Map<String, List<Map<String, Object>>> map) throws Exception {
+    doReturn(map).when(this.apiGateway).updateExecutions(any(), any());
+  }
+
+  private void verifyFinalizeFlow() {
+    verify(this.executionFinalizer).finalizeFlow(this.execution,
+        "Not running on the assigned executor (any more)", null);
+  }
+
+}