azkaban-aplcache

Faster updater thread and test for an interrupted execution

9/16/2017 5:19:54 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index c89ed33..52bd8a4 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -1385,6 +1385,13 @@ public class ExecutorManager extends EventHandler implements
     // move from flow to running flows
     this.runningFlows.put(exflow.getExecutionId(),
         new Pair<>(reference, exflow));
+    synchronized (this) {
+      // Wake up ExecutingManagerUpdaterThread from wait() so that it will immediately check status
+      // from executor(s). Normally flows will run at least some time and can't be cleaned up
+      // immediately, so there will be another wait round (or many, actually), but for unit tests
+      // this is significant to let them run quickly.
+      this.notifyAll();
+    }
 
     logger.info(String.format(
         "Successfully dispatched exec %d with error count %d",
@@ -1521,12 +1528,12 @@ public class ExecutorManager extends EventHandler implements
 
           ExecutorManager.this.updaterStage = "Updated all active flows. Waiting for next round.";
 
-          synchronized (this) {
+          synchronized (ExecutorManager.this) {
             try {
               if (ExecutorManager.this.runningFlows.size() > 0) {
-                this.wait(this.waitTimeMs);
+                ExecutorManager.this.wait(this.waitTimeMs);
               } else {
-                this.wait(this.waitTimeIdleMs);
+                ExecutorManager.this.wait(this.waitTimeIdleMs);
               }
             } catch (final InterruptedException e) {
             }
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 9ee67e3..666dfc3 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -16,7 +16,11 @@
 
 package azkaban.executor;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -30,15 +34,17 @@ import azkaban.utils.Pair;
 import azkaban.utils.Props;
 import azkaban.utils.TestUtils;
 import com.codahale.metrics.MetricRegistry;
-import java.io.IOException;
+import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -68,10 +74,17 @@ public class ExecutorManagerTest {
     this.loader = new MockExecutorLoader();
   }
 
+  @After
+  public void tearDown() {
+    if (this.manager != null) {
+      this.manager.shutdown();
+    }
+  }
+
   /*
    * Helper method to create a ExecutorManager Instance
    */
-  private ExecutorManager createMultiExecutorManagerInstance() throws ExecutorManagerException {
+  private ExecutorManager createMultiExecutorManagerInstance() throws Exception {
     this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
     this.props.put(ExecutorManager.AZKABAN_QUEUEPROCESSING_ENABLED, "false");
     this.loader.addExecutor("localhost", 12345);
@@ -84,7 +97,7 @@ public class ExecutorManagerTest {
    * remote
    */
   @Test(expected = ExecutorManagerException.class)
-  public void testNoExecutorScenario() throws ExecutorManagerException {
+  public void testNoExecutorScenario() throws Exception {
     this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
     @SuppressWarnings("unused") final ExecutorManager manager = createExecutorManager();
   }
@@ -93,7 +106,7 @@ public class ExecutorManagerTest {
    * Test backward compatibility with just local executor
    */
   @Test
-  public void testLocalExecutorScenario() throws ExecutorManagerException {
+  public void testLocalExecutorScenario() throws Exception {
     this.props.put("executor.port", 12345);
     final ExecutorManager manager = createExecutorManager();
     final Set<Executor> activeExecutors =
@@ -111,7 +124,7 @@ public class ExecutorManagerTest {
    * Test executor manager initialization with multiple executors
    */
   @Test
-  public void testMultipleExecutorScenario() throws ExecutorManagerException {
+  public void testMultipleExecutorScenario() throws Exception {
     this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
     final Executor executor1 = this.loader.addExecutor("localhost", 12345);
     final Executor executor2 = this.loader.addExecutor("localhost", 12346);
@@ -133,7 +146,7 @@ public class ExecutorManagerTest {
    * Test executor manager active executor reload
    */
   @Test
-  public void testSetupExecutorsSucess() throws ExecutorManagerException {
+  public void testSetupExecutorsSucess() throws Exception {
     this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
     final Executor executor1 = this.loader.addExecutor("localhost", 12345);
     final ExecutorManager manager = createExecutorManager();
@@ -156,7 +169,7 @@ public class ExecutorManagerTest {
    * executors
    */
   @Test(expected = ExecutorManagerException.class)
-  public void testSetupExecutorsException() throws ExecutorManagerException {
+  public void testSetupExecutorsException() throws Exception {
     this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
     final Executor executor1 = this.loader.addExecutor("localhost", 12345);
     final ExecutorManager manager = createExecutorManager();
@@ -173,7 +186,7 @@ public class ExecutorManagerTest {
 
   /* Test disabling queue process thread to pause dispatching */
   @Test
-  public void testDisablingQueueProcessThread() throws ExecutorManagerException {
+  public void testDisablingQueueProcessThread() throws Exception {
     final ExecutorManager manager = createMultiExecutorManagerInstance();
     manager.enableQueueProcessorThread();
     Assert.assertEquals(manager.isQueueProcessorThreadActive(), true);
@@ -183,7 +196,7 @@ public class ExecutorManagerTest {
 
   /* Test renabling queue process thread to pause restart dispatching */
   @Test
-  public void testEnablingQueueProcessThread() throws ExecutorManagerException {
+  public void testEnablingQueueProcessThread() throws Exception {
     final ExecutorManager manager = createMultiExecutorManagerInstance();
     Assert.assertEquals(manager.isQueueProcessorThreadActive(), false);
     manager.enableQueueProcessorThread();
@@ -192,7 +205,7 @@ public class ExecutorManagerTest {
 
   /* Test submit a non-dispatched flow */
   @Test
-  public void testQueuedFlows() throws ExecutorManagerException, IOException {
+  public void testQueuedFlows() throws Exception {
     final ExecutorManager manager = createMultiExecutorManagerInstance();
     final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
     flow1.setExecutionId(1);
@@ -226,8 +239,7 @@ public class ExecutorManagerTest {
 
   /* Test submit duplicate flow when previous instance is not dispatched */
   @Test(expected = ExecutorManagerException.class)
-  public void testDuplicateQueuedFlows() throws ExecutorManagerException,
-      IOException {
+  public void testDuplicateQueuedFlows() throws Exception {
     final ExecutorManager manager = createMultiExecutorManagerInstance();
     final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
     flow1.getExecutionOptions().setConcurrentOption(
@@ -243,7 +255,7 @@ public class ExecutorManagerTest {
    * non-dispatched flow
    */
   @Test
-  public void testKillQueuedFlow() throws ExecutorManagerException, IOException {
+  public void testKillQueuedFlow() throws Exception {
     final ExecutorManager manager = createMultiExecutorManagerInstance();
     final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
     final User testUser = TestUtils.getTestUser();
@@ -257,12 +269,38 @@ public class ExecutorManagerTest {
     Assert.assertFalse(manager.getRunningFlows().contains(flow1));
   }
 
+  /* Flow has been running on an executor but is not any more (for example because of restart) */
+  @Test
+  public void testNotFoundFlows() throws Exception {
+    testSetUpForRunningFlows();
+    final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    when(this.loader.fetchExecutableFlow(-1)).thenReturn(flow1);
+
+    mockUpdateResponse(ImmutableMap.of(ConnectorParams.RESPONSE_UPDATED_FLOWS,
+        Collections.singletonList(ImmutableMap.of(
+            ConnectorParams.UPDATE_MAP_EXEC_ID, -1,
+            "error", "Flow does not exist"))));
+
+    this.manager.submitExecutableFlow(flow1, this.user.getUserId());
+    final ExecutableFlow fetchedFlow = waitFlowFinished(flow1);
+    Assert.assertEquals(Status.FAILED, fetchedFlow.getStatus());
+  }
+
+  // Suppress "unchecked generic array creation for varargs parameter".
+  // No way to avoid this when mocking a method with generic varargs.
+  @SuppressWarnings("unchecked")
+  private void mockUpdateResponse(
+      final Map<String, List<Map<String, Object>>> map) throws Exception {
+    doReturn(map).when(this.apiGateway).callWithExecutionId(
+        any(), anyInt(), eq(ConnectorParams.UPDATE_ACTION), any(), any(), any(), any());
+  }
+
   /*
    * Added tests for runningFlows
    * TODO: When removing queuedFlows cache, will refactor rest of the ExecutorManager test cases
    */
   @Test
-  public void testSubmitFlows() throws ExecutorManagerException, IOException {
+  public void testSubmitFlows() throws Exception {
     testSetUpForRunningFlows();
     final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
     this.manager.submitExecutableFlow(flow1, this.user.getUserId());
@@ -272,7 +310,7 @@ public class ExecutorManagerTest {
 
   @Ignore
   @Test
-  public void testFetchAllActiveFlows() throws ExecutorManagerException, IOException {
+  public void testFetchAllActiveFlows() throws Exception {
     testSetUpForRunningFlows();
     final List<ExecutableFlow> flows = this.manager.getRunningFlows();
     for (final Pair<ExecutionReference, ExecutableFlow> pair : this.activeFlows.values()) {
@@ -282,7 +320,7 @@ public class ExecutorManagerTest {
 
   @Ignore
   @Test
-  public void testFetchActiveFlowByProject() throws ExecutorManagerException, IOException {
+  public void testFetchActiveFlowByProject() throws Exception {
     testSetUpForRunningFlows();
     final List<Integer> executions = this.manager.getRunningFlows(this.flow1.getProjectId(),
         this.flow1.getFlowId());
@@ -293,7 +331,7 @@ public class ExecutorManagerTest {
 
   @Ignore
   @Test
-  public void testFetchActiveFlowWithExecutor() throws ExecutorManagerException, IOException {
+  public void testFetchActiveFlowWithExecutor() throws Exception {
     testSetUpForRunningFlows();
     final List<Pair<ExecutableFlow, Executor>> activeFlowsWithExecutor =
         this.manager.getActiveFlowsWithExecutor();
@@ -304,7 +342,7 @@ public class ExecutorManagerTest {
   }
 
   @Test
-  public void testFetchAllActiveExecutorServerHosts() throws ExecutorManagerException, IOException {
+  public void testFetchAllActiveExecutorServerHosts() throws Exception {
     testSetUpForRunningFlows();
     final Set<String> activeExecutorServerHosts = this.manager.getAllActiveExecutorServerHosts();
     final Executor executor1 = this.manager.fetchExecutor(this.flow1.getExecutionId());
@@ -318,8 +356,7 @@ public class ExecutorManagerTest {
   /*
    * TODO: will move below method to setUp() and run before every test for both runningFlows and queuedFlows
    */
-  private void testSetUpForRunningFlows()
-      throws ExecutorManagerException, IOException {
+  private void testSetUpForRunningFlows() throws Exception {
     this.loader = mock(ExecutorLoader.class);
     this.apiGateway = mock(ExecutorApiGateway.class);
     this.user = TestUtils.getTestUser();
@@ -349,4 +386,24 @@ public class ExecutorManagerTest {
     this.activeFlows.put(this.flow2.getExecutionId(), new Pair<>(ref2, this.flow2));
     when(this.loader.fetchActiveFlows()).thenReturn(this.activeFlows);
   }
+
+  /* Polls the loader until the flow reports a finished status, then returns the fetched flow */
+  private ExecutableFlow waitFlowFinished(final ExecutableFlow flow) throws Exception {
+    TestUtils.await().untilAsserted(() -> assertThat(getFlowStatus(flow))
+        .matches(Status::isStatusFinished, "isStatusFinished"));
+    return fetchFlow(flow);
+  }
+
+  /* Status the loader currently reports for the flow, or null if it returns no flow */
+  private Status getFlowStatus(final ExecutableFlow flow) throws Exception {
+    // Fetch once so the null check and the status read see the same result
+    final ExecutableFlow fetched = fetchFlow(flow);
+    return fetched != null ? fetched.getStatus() : null;
+  }
+
+  /* Looks the flow up in the loader by its execution id */
+  private ExecutableFlow fetchFlow(final ExecutableFlow flow) throws ExecutorManagerException {
+    return this.loader.fetchExecutableFlow(flow.getExecutionId());
+  }
+
 }
diff --git a/azkaban-common/src/test/java/azkaban/utils/TestUtils.java b/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
index 4c3db69..aa66d9a 100644
--- a/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
+++ b/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
@@ -26,6 +26,9 @@ import azkaban.user.XmlUserManager;
 import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionFactory;
 
 /**
  * Commonly used utils method for unit/integration tests
@@ -65,4 +68,13 @@ public class TestUtils {
     final UserManager manager = new XmlUserManager(props);
     return manager;
   }
+
+  /**
+   * Returns an Awaitility condition factory that waits at most 10 seconds, polling every 10 ms.
+   */
+  public static ConditionFactory await() {
+    return Awaitility.await().atMost(10L, TimeUnit.SECONDS)
+        .pollInterval(10L, TimeUnit.MILLISECONDS);
+  }
+
 }
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
index f49e688..d7b1bb3 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
@@ -31,13 +31,12 @@ import azkaban.jobExecutor.ProcessJob;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.spi.EventType;
 import azkaban.utils.Props;
+import azkaban.utils.TestUtils;
 import java.io.File;
 import java.io.IOException;
 import java.util.HashSet;
-import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
-import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -317,8 +316,7 @@ public class JobRunnerTest {
     Assert.assertTrue(logFile.exists());
 
     // wait so that there's time to make the "DB update" for KILLED status
-    Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(10L, TimeUnit.MILLISECONDS)
-        .until(() -> loader.getNodeUpdateCount("testJob"), is(2));
+    TestUtils.await().until(() -> loader.getNodeUpdateCount("testJob"), is(2));
     eventCollector.assertEvents(EventType.JOB_FINISHED);
   }
 

build.gradle 2(+1 -1)

diff --git a/build.gradle b/build.gradle
index 7875ed3..b7d840b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -99,7 +99,7 @@ ext.deps = [
         math3               : 'org.apache.commons:commons-math3:3.0',
         metricsCore         : 'io.dropwizard.metrics:metrics-core:3.1.0',
         metricsJvm          : 'io.dropwizard.metrics:metrics-jvm:3.1.0',
-        mockito             : 'org.mockito:mockito-all:1.10.19',
+        mockito             : 'org.mockito:mockito-core:2.10.0',
         mysqlConnector      : 'mysql:mysql-connector-java:5.1.28',
         quartz              : 'org.quartz-scheduler:quartz:2.2.1',
         restliGenerator     : 'com.linkedin.pegasus:generator:' + versions.restli,