azkaban-aplcache
Changes
build.gradle 2(+1 -1)
Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index c89ed33..52bd8a4 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -1385,6 +1385,13 @@ public class ExecutorManager extends EventHandler implements
// move from flow to running flows
this.runningFlows.put(exflow.getExecutionId(),
new Pair<>(reference, exflow));
+ synchronized (this) {
+ // Wake up ExecutingManagerUpdaterThread from wait() so that it will immediately check status
+ // from executor(s). Normally flows will run at least some time and can't be cleaned up
+ // immediately, so there will be another wait round (or many, actually), but for unit tests
+ // this is significant to let them run quickly.
+ this.notifyAll();
+ }
logger.info(String.format(
"Successfully dispatched exec %d with error count %d",
@@ -1521,12 +1528,12 @@ public class ExecutorManager extends EventHandler implements
ExecutorManager.this.updaterStage = "Updated all active flows. Waiting for next round.";
- synchronized (this) {
+ synchronized (ExecutorManager.this) {
try {
if (ExecutorManager.this.runningFlows.size() > 0) {
- this.wait(this.waitTimeMs);
+ ExecutorManager.this.wait(this.waitTimeMs);
} else {
- this.wait(this.waitTimeIdleMs);
+ ExecutorManager.this.wait(this.waitTimeIdleMs);
}
} catch (final InterruptedException e) {
}
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 9ee67e3..666dfc3 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -16,7 +16,11 @@
package azkaban.executor;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -30,15 +34,17 @@ import azkaban.utils.Pair;
import azkaban.utils.Props;
import azkaban.utils.TestUtils;
import com.codahale.metrics.MetricRegistry;
-import java.io.IOException;
+import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
@@ -68,10 +74,17 @@ public class ExecutorManagerTest {
this.loader = new MockExecutorLoader();
}
+ @After
+ public void tearDown() {
+ if (this.manager != null) {
+ this.manager.shutdown();
+ }
+ }
+
/*
* Helper method to create a ExecutorManager Instance
*/
- private ExecutorManager createMultiExecutorManagerInstance() throws ExecutorManagerException {
+ private ExecutorManager createMultiExecutorManagerInstance() throws Exception {
this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
this.props.put(ExecutorManager.AZKABAN_QUEUEPROCESSING_ENABLED, "false");
this.loader.addExecutor("localhost", 12345);
@@ -84,7 +97,7 @@ public class ExecutorManagerTest {
* remote
*/
@Test(expected = ExecutorManagerException.class)
- public void testNoExecutorScenario() throws ExecutorManagerException {
+ public void testNoExecutorScenario() throws Exception {
this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
@SuppressWarnings("unused") final ExecutorManager manager = createExecutorManager();
}
@@ -93,7 +106,7 @@ public class ExecutorManagerTest {
* Test backward compatibility with just local executor
*/
@Test
- public void testLocalExecutorScenario() throws ExecutorManagerException {
+ public void testLocalExecutorScenario() throws Exception {
this.props.put("executor.port", 12345);
final ExecutorManager manager = createExecutorManager();
final Set<Executor> activeExecutors =
@@ -111,7 +124,7 @@ public class ExecutorManagerTest {
* Test executor manager initialization with multiple executors
*/
@Test
- public void testMultipleExecutorScenario() throws ExecutorManagerException {
+ public void testMultipleExecutorScenario() throws Exception {
this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
final Executor executor1 = this.loader.addExecutor("localhost", 12345);
final Executor executor2 = this.loader.addExecutor("localhost", 12346);
@@ -133,7 +146,7 @@ public class ExecutorManagerTest {
* Test executor manager active executor reload
*/
@Test
- public void testSetupExecutorsSucess() throws ExecutorManagerException {
+ public void testSetupExecutorsSucess() throws Exception {
this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
final Executor executor1 = this.loader.addExecutor("localhost", 12345);
final ExecutorManager manager = createExecutorManager();
@@ -156,7 +169,7 @@ public class ExecutorManagerTest {
* executors
*/
@Test(expected = ExecutorManagerException.class)
- public void testSetupExecutorsException() throws ExecutorManagerException {
+ public void testSetupExecutorsException() throws Exception {
this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
final Executor executor1 = this.loader.addExecutor("localhost", 12345);
final ExecutorManager manager = createExecutorManager();
@@ -173,7 +186,7 @@ public class ExecutorManagerTest {
/* Test disabling queue process thread to pause dispatching */
@Test
- public void testDisablingQueueProcessThread() throws ExecutorManagerException {
+ public void testDisablingQueueProcessThread() throws Exception {
final ExecutorManager manager = createMultiExecutorManagerInstance();
manager.enableQueueProcessorThread();
Assert.assertEquals(manager.isQueueProcessorThreadActive(), true);
@@ -183,7 +196,7 @@ public class ExecutorManagerTest {
/* Test renabling queue process thread to pause restart dispatching */
@Test
- public void testEnablingQueueProcessThread() throws ExecutorManagerException {
+ public void testEnablingQueueProcessThread() throws Exception {
final ExecutorManager manager = createMultiExecutorManagerInstance();
Assert.assertEquals(manager.isQueueProcessorThreadActive(), false);
manager.enableQueueProcessorThread();
@@ -192,7 +205,7 @@ public class ExecutorManagerTest {
/* Test submit a non-dispatched flow */
@Test
- public void testQueuedFlows() throws ExecutorManagerException, IOException {
+ public void testQueuedFlows() throws Exception {
final ExecutorManager manager = createMultiExecutorManagerInstance();
final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
flow1.setExecutionId(1);
@@ -226,8 +239,7 @@ public class ExecutorManagerTest {
/* Test submit duplicate flow when previous instance is not dispatched */
@Test(expected = ExecutorManagerException.class)
- public void testDuplicateQueuedFlows() throws ExecutorManagerException,
- IOException {
+ public void testDuplicateQueuedFlows() throws Exception {
final ExecutorManager manager = createMultiExecutorManagerInstance();
final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
flow1.getExecutionOptions().setConcurrentOption(
@@ -243,7 +255,7 @@ public class ExecutorManagerTest {
* non-dispatched flow
*/
@Test
- public void testKillQueuedFlow() throws ExecutorManagerException, IOException {
+ public void testKillQueuedFlow() throws Exception {
final ExecutorManager manager = createMultiExecutorManagerInstance();
final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
final User testUser = TestUtils.getTestUser();
@@ -257,12 +269,38 @@ public class ExecutorManagerTest {
Assert.assertFalse(manager.getRunningFlows().contains(flow1));
}
+ /* Flow has been running on an executor but is not any more (for example because of restart) */
+ @Test
+ public void testNotFoundFlows() throws Exception {
+ testSetUpForRunningFlows();
+ final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ when(this.loader.fetchExecutableFlow(-1)).thenReturn(flow1);
+
+ mockUpdateResponse(ImmutableMap.of(ConnectorParams.RESPONSE_UPDATED_FLOWS,
+ Collections.singletonList(ImmutableMap.of(
+ ConnectorParams.UPDATE_MAP_EXEC_ID, -1,
+ "error", "Flow does not exist"))));
+
+ this.manager.submitExecutableFlow(flow1, this.user.getUserId());
+ final ExecutableFlow fetchedFlow = waitFlowFinished(flow1);
+ Assert.assertEquals(fetchedFlow.getStatus(), Status.FAILED);
+ }
+
+ // Suppress "unchecked generic array creation for varargs parameter".
+ // No way to avoid this when mocking a method with generic varags.
+ @SuppressWarnings("unchecked")
+ private void mockUpdateResponse(
+ final Map<String, List<Map<String, Object>>> map) throws Exception {
+ doReturn(map).when(this.apiGateway).callWithExecutionId(
+ any(), anyInt(), eq(ConnectorParams.UPDATE_ACTION), any(), any(), any(), any());
+ }
+
/*
* Added tests for runningFlows
* TODO: When removing queuedFlows cache, will refactor rest of the ExecutorManager test cases
*/
@Test
- public void testSubmitFlows() throws ExecutorManagerException, IOException {
+ public void testSubmitFlows() throws Exception {
testSetUpForRunningFlows();
final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
this.manager.submitExecutableFlow(flow1, this.user.getUserId());
@@ -272,7 +310,7 @@ public class ExecutorManagerTest {
@Ignore
@Test
- public void testFetchAllActiveFlows() throws ExecutorManagerException, IOException {
+ public void testFetchAllActiveFlows() throws Exception {
testSetUpForRunningFlows();
final List<ExecutableFlow> flows = this.manager.getRunningFlows();
for (final Pair<ExecutionReference, ExecutableFlow> pair : this.activeFlows.values()) {
@@ -282,7 +320,7 @@ public class ExecutorManagerTest {
@Ignore
@Test
- public void testFetchActiveFlowByProject() throws ExecutorManagerException, IOException {
+ public void testFetchActiveFlowByProject() throws Exception {
testSetUpForRunningFlows();
final List<Integer> executions = this.manager.getRunningFlows(this.flow1.getProjectId(),
this.flow1.getFlowId());
@@ -293,7 +331,7 @@ public class ExecutorManagerTest {
@Ignore
@Test
- public void testFetchActiveFlowWithExecutor() throws ExecutorManagerException, IOException {
+ public void testFetchActiveFlowWithExecutor() throws Exception {
testSetUpForRunningFlows();
final List<Pair<ExecutableFlow, Executor>> activeFlowsWithExecutor =
this.manager.getActiveFlowsWithExecutor();
@@ -304,7 +342,7 @@ public class ExecutorManagerTest {
}
@Test
- public void testFetchAllActiveExecutorServerHosts() throws ExecutorManagerException, IOException {
+ public void testFetchAllActiveExecutorServerHosts() throws Exception {
testSetUpForRunningFlows();
final Set<String> activeExecutorServerHosts = this.manager.getAllActiveExecutorServerHosts();
final Executor executor1 = this.manager.fetchExecutor(this.flow1.getExecutionId());
@@ -318,8 +356,7 @@ public class ExecutorManagerTest {
/*
* TODO: will move below method to setUp() and run before every test for both runningFlows and queuedFlows
*/
- private void testSetUpForRunningFlows()
- throws ExecutorManagerException, IOException {
+ private void testSetUpForRunningFlows() throws Exception {
this.loader = mock(ExecutorLoader.class);
this.apiGateway = mock(ExecutorApiGateway.class);
this.user = TestUtils.getTestUser();
@@ -349,4 +386,19 @@ public class ExecutorManagerTest {
this.activeFlows.put(this.flow2.getExecutionId(), new Pair<>(ref2, this.flow2));
when(this.loader.fetchActiveFlows()).thenReturn(this.activeFlows);
}
+
+ private ExecutableFlow waitFlowFinished(final ExecutableFlow flow) throws Exception {
+ TestUtils.await().untilAsserted(() -> assertThat(getFlowStatus(flow))
+ .matches(Status::isStatusFinished, "isStatusFinished"));
+ return fetchFlow(flow);
+ }
+
+ private Status getFlowStatus(final ExecutableFlow flow) throws Exception {
+ return fetchFlow(flow) != null ? fetchFlow(flow).getStatus() : null;
+ }
+
+ private ExecutableFlow fetchFlow(final ExecutableFlow flow) throws ExecutorManagerException {
+ return this.loader.fetchExecutableFlow(flow.getExecutionId());
+ }
+
}
diff --git a/azkaban-common/src/test/java/azkaban/utils/TestUtils.java b/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
index 4c3db69..aa66d9a 100644
--- a/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
+++ b/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
@@ -26,6 +26,9 @@ import azkaban.user.XmlUserManager;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionFactory;
/**
* Commonly used utils method for unit/integration tests
@@ -65,4 +68,13 @@ public class TestUtils {
final UserManager manager = new XmlUserManager(props);
return manager;
}
+
+ /**
+ * Wait for 10 seconds, max. Poll every 10ms.
+ */
+ public static ConditionFactory await() {
+ return Awaitility.await().atMost(10L, TimeUnit.SECONDS)
+ .pollInterval(10L, TimeUnit.MILLISECONDS);
+ }
+
}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
index f49e688..d7b1bb3 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
@@ -31,13 +31,12 @@ import azkaban.jobExecutor.ProcessJob;
import azkaban.jobtype.JobTypeManager;
import azkaban.spi.EventType;
import azkaban.utils.Props;
+import azkaban.utils.TestUtils;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
-import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
-import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -317,8 +316,7 @@ public class JobRunnerTest {
Assert.assertTrue(logFile.exists());
// wait so that there's time to make the "DB update" for KILLED status
- Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(10L, TimeUnit.MILLISECONDS)
- .until(() -> loader.getNodeUpdateCount("testJob"), is(2));
+ TestUtils.await().until(() -> loader.getNodeUpdateCount("testJob"), is(2));
eventCollector.assertEvents(EventType.JOB_FINISHED);
}
build.gradle 2(+1 -1)
diff --git a/build.gradle b/build.gradle
index 7875ed3..b7d840b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -99,7 +99,7 @@ ext.deps = [
math3 : 'org.apache.commons:commons-math3:3.0',
metricsCore : 'io.dropwizard.metrics:metrics-core:3.1.0',
metricsJvm : 'io.dropwizard.metrics:metrics-jvm:3.1.0',
- mockito : 'org.mockito:mockito-all:1.10.19',
+ mockito : 'org.mockito:mockito-core:2.10.0',
mysqlConnector : 'mysql:mysql-connector-java:5.1.28',
quartz : 'org.quartz-scheduler:quartz:2.2.1',
restliGenerator : 'com.linkedin.pegasus:generator:' + versions.restli,