diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index a83a798..65eb46f 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -99,6 +99,7 @@ public class ExecutorManager extends EventHandler implements
private final ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows =
new ConcurrentHashMap<>();
private final ExecutingManagerUpdaterThread executingManager;
+ private final ExecutorApiClient apiClient;
QueuedExecutions queuedFlows;
File cacheDir;
private QueueProcessorThread queueProcessor;
@@ -114,11 +115,13 @@ public class ExecutorManager extends EventHandler implements
@Inject
public ExecutorManager(final Props azkProps, final ExecutorLoader loader,
final AlerterHolder alerterHolder,
- final CommonMetrics commonMetrics) throws ExecutorManagerException {
+ final CommonMetrics commonMetrics,
+ final ExecutorApiClient apiClient) throws ExecutorManagerException {
this.alerterHolder = alerterHolder;
this.azkProps = azkProps;
this.commonMetrics = commonMetrics;
this.executorLoader = loader;
+ this.apiClient = apiClient;
this.setupExecutors();
this.loadRunningFlows();
@@ -1135,12 +1138,11 @@ public class ExecutorManager extends EventHandler implements
paramList = new ArrayList<>();
}
- final ExecutorApiClient apiclient = ExecutorApiClient.getInstance();
@SuppressWarnings("unchecked") final URI uri =
ExecutorApiClient.buildUri(host, port, path, true,
paramList.toArray(new Pair[0]));
- return apiclient.httpGet(uri, null);
+ return this.apiClient.httpGet(uri, null);
}
/**
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index ce79de2..b4d195c 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -59,30 +59,24 @@ public class ExecutorManagerTest {
private ExecutableFlow flow1;
private ExecutableFlow flow2;
private AlerterHolder alertHolder;
+ private ExecutorApiClient apiClient;
@Before
public void setup() {
this.props = AbstractMailerTest.createMailProperties();
this.alertHolder = new AlerterHolder(this.props, new Emailer(this.props, this.commonMetrics));
- }
-
- /* Helper method to create a ExecutorManager Instance */
- private ExecutorManager createMultiExecutorManagerInstance()
- throws ExecutorManagerException {
- return createMultiExecutorManagerInstance(new MockExecutorLoader());
+ this.loader = new MockExecutorLoader();
}
/*
- * Helper method to create a ExecutorManager Instance with the given
- * ExecutorLoader
+ * Helper method to create a ExecutorManager Instance
*/
- private ExecutorManager createMultiExecutorManagerInstance(
- final ExecutorLoader loader) throws ExecutorManagerException {
+ private ExecutorManager createMultiExecutorManagerInstance() throws ExecutorManagerException {
this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
this.props.put(ExecutorManager.AZKABAN_QUEUEPROCESSING_ENABLED, "false");
- loader.addExecutor("localhost", 12345);
- loader.addExecutor("localhost", 12346);
- return new ExecutorManager(this.props, loader, this.alertHolder, this.commonMetrics);
+ this.loader.addExecutor("localhost", 12345);
+ this.loader.addExecutor("localhost", 12346);
+ return createExecutorManager();
}
/*
@@ -92,9 +86,7 @@ public class ExecutorManagerTest {
@Test(expected = ExecutorManagerException.class)
public void testNoExecutorScenario() throws ExecutorManagerException {
this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
- final ExecutorLoader loader = new MockExecutorLoader();
- @SuppressWarnings("unused") final ExecutorManager manager =
- new ExecutorManager(this.props, loader, this.alertHolder, this.commonMetrics);
+ @SuppressWarnings("unused") final ExecutorManager manager = createExecutorManager();
}
/*
@@ -103,9 +95,7 @@ public class ExecutorManagerTest {
@Test
public void testLocalExecutorScenario() throws ExecutorManagerException {
this.props.put("executor.port", 12345);
- final ExecutorLoader loader = new MockExecutorLoader();
- final ExecutorManager manager =
- new ExecutorManager(this.props, loader, this.alertHolder, this.commonMetrics);
+ final ExecutorManager manager = createExecutorManager();
final Set<Executor> activeExecutors =
new HashSet(manager.getAllActiveExecutors());
@@ -113,7 +103,7 @@ public class ExecutorManagerTest {
final Executor executor = activeExecutors.iterator().next();
Assert.assertEquals(executor.getHost(), "localhost");
Assert.assertEquals(executor.getPort(), 12345);
- Assert.assertArrayEquals(activeExecutors.toArray(), loader
+ Assert.assertArrayEquals(activeExecutors.toArray(), this.loader
.fetchActiveExecutors().toArray());
}
@@ -123,36 +113,38 @@ public class ExecutorManagerTest {
@Test
public void testMultipleExecutorScenario() throws ExecutorManagerException {
this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
- final ExecutorLoader loader = new MockExecutorLoader();
- final Executor executor1 = loader.addExecutor("localhost", 12345);
- final Executor executor2 = loader.addExecutor("localhost", 12346);
+ final Executor executor1 = this.loader.addExecutor("localhost", 12345);
+ final Executor executor2 = this.loader.addExecutor("localhost", 12346);
- final ExecutorManager manager =
- new ExecutorManager(this.props, loader, this.alertHolder, this.commonMetrics);
+ final ExecutorManager manager = createExecutorManager();
final Set<Executor> activeExecutors =
new HashSet(manager.getAllActiveExecutors());
Assert.assertArrayEquals(activeExecutors.toArray(), new Executor[]{
executor1, executor2});
}
+ private ExecutorManager createExecutorManager()
+ throws ExecutorManagerException {
+ return new ExecutorManager(this.props, this.loader, this.alertHolder, this.commonMetrics,
+ this.apiClient);
+ }
+
/*
* Test executor manager active executor reload
*/
@Test
public void testSetupExecutorsSucess() throws ExecutorManagerException {
this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
- final ExecutorLoader loader = new MockExecutorLoader();
- final Executor executor1 = loader.addExecutor("localhost", 12345);
- final ExecutorManager manager =
- new ExecutorManager(this.props, loader, this.alertHolder, this.commonMetrics);
+ final Executor executor1 = this.loader.addExecutor("localhost", 12345);
+ final ExecutorManager manager = createExecutorManager();
Assert.assertArrayEquals(manager.getAllActiveExecutors().toArray(),
new Executor[]{executor1});
// mark older executor as inactive
executor1.setActive(false);
- loader.updateExecutor(executor1);
- final Executor executor2 = loader.addExecutor("localhost", 12346);
- final Executor executor3 = loader.addExecutor("localhost", 12347);
+ this.loader.updateExecutor(executor1);
+ final Executor executor2 = this.loader.addExecutor("localhost", 12346);
+ final Executor executor3 = this.loader.addExecutor("localhost", 12347);
manager.setupExecutors();
Assert.assertArrayEquals(manager.getAllActiveExecutors().toArray(),
@@ -166,10 +158,8 @@ public class ExecutorManagerTest {
@Test(expected = ExecutorManagerException.class)
public void testSetupExecutorsException() throws ExecutorManagerException {
this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
- final ExecutorLoader loader = new MockExecutorLoader();
- final Executor executor1 = loader.addExecutor("localhost", 12345);
- final ExecutorManager manager =
- new ExecutorManager(this.props, loader, this.alertHolder, this.commonMetrics);
+ final Executor executor1 = this.loader.addExecutor("localhost", 12345);
+ final ExecutorManager manager = createExecutorManager();
final Set<Executor> activeExecutors =
new HashSet(manager.getAllActiveExecutors());
Assert.assertArrayEquals(activeExecutors.toArray(),
@@ -177,7 +167,7 @@ public class ExecutorManagerTest {
// mark older executor as inactive
executor1.setActive(false);
- loader.updateExecutor(executor1);
+ this.loader.updateExecutor(executor1);
manager.setupExecutors();
}
@@ -203,8 +193,7 @@ public class ExecutorManagerTest {
/* Test submit a non-dispatched flow */
@Test
public void testQueuedFlows() throws ExecutorManagerException, IOException {
- final ExecutorLoader loader = new MockExecutorLoader();
- final ExecutorManager manager = createMultiExecutorManagerInstance(loader);
+ final ExecutorManager manager = createMultiExecutorManagerInstance();
final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
flow1.setExecutionId(1);
final ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
@@ -217,7 +206,7 @@ public class ExecutorManagerTest {
final List<Integer> testFlows = Arrays.asList(flow1.getExecutionId(), flow2.getExecutionId());
final List<Pair<ExecutionReference, ExecutableFlow>> queuedFlowsDB =
- loader.fetchQueuedFlows();
+ this.loader.fetchQueuedFlows();
Assert.assertEquals(queuedFlowsDB.size(), testFlows.size());
// Verify things are correctly setup in db
for (final Pair<ExecutionReference, ExecutableFlow> pair : queuedFlowsDB) {
@@ -255,15 +244,14 @@ public class ExecutorManagerTest {
*/
@Test
public void testKillQueuedFlow() throws ExecutorManagerException, IOException {
- final ExecutorLoader loader = new MockExecutorLoader();
- final ExecutorManager manager = createMultiExecutorManagerInstance(loader);
+ final ExecutorManager manager = createMultiExecutorManagerInstance();
final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
final User testUser = TestUtils.getTestUser();
manager.submitExecutableFlow(flow1, testUser.getUserId());
manager.cancelFlow(flow1, testUser.getUserId());
final ExecutableFlow fetchedFlow =
- loader.fetchExecutableFlow(flow1.getExecutionId());
+ this.loader.fetchExecutableFlow(flow1.getExecutionId());
Assert.assertEquals(fetchedFlow.getStatus(), Status.FAILED);
Assert.assertFalse(manager.getRunningFlows().contains(flow1));
@@ -333,6 +321,7 @@ public class ExecutorManagerTest {
private void testSetUpForRunningFlows()
throws ExecutorManagerException, IOException {
this.loader = mock(ExecutorLoader.class);
+ this.apiClient = mock(ExecutorApiClient.class);
this.user = TestUtils.getTestUser();
this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
//To test runningFlows, AZKABAN_QUEUEPROCESSING_ENABLED should be set to true
@@ -346,8 +335,7 @@ public class ExecutorManagerTest {
executors.add(executor2);
when(this.loader.fetchActiveExecutors()).thenReturn(executors);
- this.manager = new ExecutorManager(this.props, this.loader, this.alertHolder,
- this.commonMetrics);
+ this.manager = createExecutorManager();
this.flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
this.flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
diff --git a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
index d45992a..16fa1c2 100644
--- a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
+++ b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
@@ -16,7 +16,10 @@
package azkaban.trigger;
+import static org.mockito.Mockito.mock;
+
import azkaban.executor.AlerterHolder;
+import azkaban.executor.ExecutorApiClient;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutorManagerException;
@@ -42,6 +45,7 @@ public class TriggerManagerDeadlockTest {
TriggerLoader loader;
TriggerManager triggerManager;
ExecutorLoader execLoader;
+ ExecutorApiClient apiClient;
@Before
public void setup() throws ExecutorManagerException, TriggerManagerException {
@@ -50,10 +54,11 @@ public class TriggerManagerDeadlockTest {
props.put("trigger.scan.interval", 1000);
props.put("executor.port", 12321);
this.execLoader = new MockExecutorLoader();
+ this.apiClient = mock(ExecutorApiClient.class);
final CommonMetrics commonMetrics = new CommonMetrics(new MetricsManager(new MetricRegistry()));
final ExecutorManager executorManager = new ExecutorManager(props, this.execLoader,
new AlerterHolder(props, new Emailer(props, commonMetrics)),
- commonMetrics);
+ commonMetrics, this.apiClient);
this.triggerManager = new TriggerManager(props, this.loader, executorManager);
}