azkaban-memoizeit

Guicify ExecutorApiClient (#1425) * Guicify ExecutorApiClient -

9/6/2017 3:19:57 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java
index cd3d598..eafb823 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java
@@ -18,6 +18,7 @@ package azkaban.executor;
 
 import azkaban.utils.RestfulApiClient;
 import java.io.IOException;
+import javax.inject.Singleton;
 import org.apache.http.HttpResponse;
 import org.apache.http.StatusLine;
 import org.apache.http.client.HttpResponseException;
@@ -27,24 +28,9 @@ import org.apache.http.util.EntityUtils;
  * Client class that will be used to handle all Restful API calls between Executor and the host
  * application.
  */
+@Singleton
 public class ExecutorApiClient extends RestfulApiClient<String> {
 
-  private static ExecutorApiClient instance = null;
-
-  private ExecutorApiClient() {
-  }
-
-  /**
-   * Singleton method to return the instance of the current object.
-   */
-  public static ExecutorApiClient getInstance() {
-    if (null == instance) {
-      instance = new ExecutorApiClient();
-    }
-
-    return instance;
-  }
-
   /**
    * Implementing the parseResponse function to return de-serialized Json object.
    *
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index a83a798..65eb46f 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -99,6 +99,7 @@ public class ExecutorManager extends EventHandler implements
   private final ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows =
       new ConcurrentHashMap<>();
   private final ExecutingManagerUpdaterThread executingManager;
+  private final ExecutorApiClient apiClient;
   QueuedExecutions queuedFlows;
   File cacheDir;
   private QueueProcessorThread queueProcessor;
@@ -114,11 +115,13 @@ public class ExecutorManager extends EventHandler implements
   @Inject
   public ExecutorManager(final Props azkProps, final ExecutorLoader loader,
       final AlerterHolder alerterHolder,
-      final CommonMetrics commonMetrics) throws ExecutorManagerException {
+      final CommonMetrics commonMetrics,
+      final ExecutorApiClient apiClient) throws ExecutorManagerException {
     this.alerterHolder = alerterHolder;
     this.azkProps = azkProps;
     this.commonMetrics = commonMetrics;
     this.executorLoader = loader;
+    this.apiClient = apiClient;
     this.setupExecutors();
     this.loadRunningFlows();
 
@@ -1135,12 +1138,11 @@ public class ExecutorManager extends EventHandler implements
       paramList = new ArrayList<>();
     }
 
-    final ExecutorApiClient apiclient = ExecutorApiClient.getInstance();
     @SuppressWarnings("unchecked") final URI uri =
         ExecutorApiClient.buildUri(host, port, path, true,
             paramList.toArray(new Pair[0]));
 
-    return apiclient.httpGet(uri, null);
+    return this.apiClient.httpGet(uri, null);
   }
 
   /**
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index ce79de2..b4d195c 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -59,30 +59,24 @@ public class ExecutorManagerTest {
   private ExecutableFlow flow1;
   private ExecutableFlow flow2;
   private AlerterHolder alertHolder;
+  private ExecutorApiClient apiClient;
 
   @Before
   public void setup() {
     this.props = AbstractMailerTest.createMailProperties();
     this.alertHolder = new AlerterHolder(this.props, new Emailer(this.props, this.commonMetrics));
-  }
-
-  /* Helper method to create a ExecutorManager Instance */
-  private ExecutorManager createMultiExecutorManagerInstance()
-      throws ExecutorManagerException {
-    return createMultiExecutorManagerInstance(new MockExecutorLoader());
+    this.loader = new MockExecutorLoader();
   }
 
   /*
-   * Helper method to create a ExecutorManager Instance with the given
-   * ExecutorLoader
+   * Helper method to create a ExecutorManager Instance
    */
-  private ExecutorManager createMultiExecutorManagerInstance(
-      final ExecutorLoader loader) throws ExecutorManagerException {
+  private ExecutorManager createMultiExecutorManagerInstance() throws ExecutorManagerException {
     this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
     this.props.put(ExecutorManager.AZKABAN_QUEUEPROCESSING_ENABLED, "false");
-    loader.addExecutor("localhost", 12345);
-    loader.addExecutor("localhost", 12346);
-    return new ExecutorManager(this.props, loader, this.alertHolder, this.commonMetrics);
+    this.loader.addExecutor("localhost", 12345);
+    this.loader.addExecutor("localhost", 12346);
+    return createExecutorManager();
   }
 
   /*
@@ -92,9 +86,7 @@ public class ExecutorManagerTest {
   @Test(expected = ExecutorManagerException.class)
   public void testNoExecutorScenario() throws ExecutorManagerException {
     this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
-    final ExecutorLoader loader = new MockExecutorLoader();
-    @SuppressWarnings("unused") final ExecutorManager manager =
-        new ExecutorManager(this.props, loader, this.alertHolder, this.commonMetrics);
+    @SuppressWarnings("unused") final ExecutorManager manager = createExecutorManager();
   }
 
   /*
@@ -103,9 +95,7 @@ public class ExecutorManagerTest {
   @Test
   public void testLocalExecutorScenario() throws ExecutorManagerException {
     this.props.put("executor.port", 12345);
-    final ExecutorLoader loader = new MockExecutorLoader();
-    final ExecutorManager manager =
-        new ExecutorManager(this.props, loader, this.alertHolder, this.commonMetrics);
+    final ExecutorManager manager = createExecutorManager();
     final Set<Executor> activeExecutors =
         new HashSet(manager.getAllActiveExecutors());
 
@@ -113,7 +103,7 @@ public class ExecutorManagerTest {
     final Executor executor = activeExecutors.iterator().next();
     Assert.assertEquals(executor.getHost(), "localhost");
     Assert.assertEquals(executor.getPort(), 12345);
-    Assert.assertArrayEquals(activeExecutors.toArray(), loader
+    Assert.assertArrayEquals(activeExecutors.toArray(), this.loader
         .fetchActiveExecutors().toArray());
   }
 
@@ -123,36 +113,38 @@ public class ExecutorManagerTest {
   @Test
   public void testMultipleExecutorScenario() throws ExecutorManagerException {
     this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
-    final ExecutorLoader loader = new MockExecutorLoader();
-    final Executor executor1 = loader.addExecutor("localhost", 12345);
-    final Executor executor2 = loader.addExecutor("localhost", 12346);
+    final Executor executor1 = this.loader.addExecutor("localhost", 12345);
+    final Executor executor2 = this.loader.addExecutor("localhost", 12346);
 
-    final ExecutorManager manager =
-        new ExecutorManager(this.props, loader, this.alertHolder, this.commonMetrics);
+    final ExecutorManager manager = createExecutorManager();
     final Set<Executor> activeExecutors =
         new HashSet(manager.getAllActiveExecutors());
     Assert.assertArrayEquals(activeExecutors.toArray(), new Executor[]{
         executor1, executor2});
   }
 
+  private ExecutorManager createExecutorManager()
+      throws ExecutorManagerException {
+    return new ExecutorManager(this.props, this.loader, this.alertHolder, this.commonMetrics,
+        this.apiClient);
+  }
+
   /*
    * Test executor manager active executor reload
    */
   @Test
   public void testSetupExecutorsSucess() throws ExecutorManagerException {
     this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
-    final ExecutorLoader loader = new MockExecutorLoader();
-    final Executor executor1 = loader.addExecutor("localhost", 12345);
-    final ExecutorManager manager =
-        new ExecutorManager(this.props, loader, this.alertHolder, this.commonMetrics);
+    final Executor executor1 = this.loader.addExecutor("localhost", 12345);
+    final ExecutorManager manager = createExecutorManager();
     Assert.assertArrayEquals(manager.getAllActiveExecutors().toArray(),
         new Executor[]{executor1});
 
     // mark older executor as inactive
     executor1.setActive(false);
-    loader.updateExecutor(executor1);
-    final Executor executor2 = loader.addExecutor("localhost", 12346);
-    final Executor executor3 = loader.addExecutor("localhost", 12347);
+    this.loader.updateExecutor(executor1);
+    final Executor executor2 = this.loader.addExecutor("localhost", 12346);
+    final Executor executor3 = this.loader.addExecutor("localhost", 12347);
     manager.setupExecutors();
 
     Assert.assertArrayEquals(manager.getAllActiveExecutors().toArray(),
@@ -166,10 +158,8 @@ public class ExecutorManagerTest {
   @Test(expected = ExecutorManagerException.class)
   public void testSetupExecutorsException() throws ExecutorManagerException {
     this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
-    final ExecutorLoader loader = new MockExecutorLoader();
-    final Executor executor1 = loader.addExecutor("localhost", 12345);
-    final ExecutorManager manager =
-        new ExecutorManager(this.props, loader, this.alertHolder, this.commonMetrics);
+    final Executor executor1 = this.loader.addExecutor("localhost", 12345);
+    final ExecutorManager manager = createExecutorManager();
     final Set<Executor> activeExecutors =
         new HashSet(manager.getAllActiveExecutors());
     Assert.assertArrayEquals(activeExecutors.toArray(),
@@ -177,7 +167,7 @@ public class ExecutorManagerTest {
 
     // mark older executor as inactive
     executor1.setActive(false);
-    loader.updateExecutor(executor1);
+    this.loader.updateExecutor(executor1);
     manager.setupExecutors();
   }
 
@@ -203,8 +193,7 @@ public class ExecutorManagerTest {
   /* Test submit a non-dispatched flow */
   @Test
   public void testQueuedFlows() throws ExecutorManagerException, IOException {
-    final ExecutorLoader loader = new MockExecutorLoader();
-    final ExecutorManager manager = createMultiExecutorManagerInstance(loader);
+    final ExecutorManager manager = createMultiExecutorManagerInstance();
     final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
     flow1.setExecutionId(1);
     final ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
@@ -217,7 +206,7 @@ public class ExecutorManagerTest {
     final List<Integer> testFlows = Arrays.asList(flow1.getExecutionId(), flow2.getExecutionId());
 
     final List<Pair<ExecutionReference, ExecutableFlow>> queuedFlowsDB =
-        loader.fetchQueuedFlows();
+        this.loader.fetchQueuedFlows();
     Assert.assertEquals(queuedFlowsDB.size(), testFlows.size());
     // Verify things are correctly setup in db
     for (final Pair<ExecutionReference, ExecutableFlow> pair : queuedFlowsDB) {
@@ -255,15 +244,14 @@ public class ExecutorManagerTest {
    */
   @Test
   public void testKillQueuedFlow() throws ExecutorManagerException, IOException {
-    final ExecutorLoader loader = new MockExecutorLoader();
-    final ExecutorManager manager = createMultiExecutorManagerInstance(loader);
+    final ExecutorManager manager = createMultiExecutorManagerInstance();
     final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
     final User testUser = TestUtils.getTestUser();
     manager.submitExecutableFlow(flow1, testUser.getUserId());
 
     manager.cancelFlow(flow1, testUser.getUserId());
     final ExecutableFlow fetchedFlow =
-        loader.fetchExecutableFlow(flow1.getExecutionId());
+        this.loader.fetchExecutableFlow(flow1.getExecutionId());
     Assert.assertEquals(fetchedFlow.getStatus(), Status.FAILED);
 
     Assert.assertFalse(manager.getRunningFlows().contains(flow1));
@@ -333,6 +321,7 @@ public class ExecutorManagerTest {
   private void testSetUpForRunningFlows()
       throws ExecutorManagerException, IOException {
     this.loader = mock(ExecutorLoader.class);
+    this.apiClient = mock(ExecutorApiClient.class);
     this.user = TestUtils.getTestUser();
     this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
     //To test runningFlows, AZKABAN_QUEUEPROCESSING_ENABLED should be set to true
@@ -346,8 +335,7 @@ public class ExecutorManagerTest {
     executors.add(executor2);
 
     when(this.loader.fetchActiveExecutors()).thenReturn(executors);
-    this.manager = new ExecutorManager(this.props, this.loader, this.alertHolder,
-        this.commonMetrics);
+    this.manager = createExecutorManager();
 
     this.flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
     this.flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
diff --git a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
index d45992a..16fa1c2 100644
--- a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
+++ b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
@@ -16,7 +16,10 @@
 
 package azkaban.trigger;
 
+import static org.mockito.Mockito.mock;
+
 import azkaban.executor.AlerterHolder;
+import azkaban.executor.ExecutorApiClient;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManager;
 import azkaban.executor.ExecutorManagerException;
@@ -42,6 +45,7 @@ public class TriggerManagerDeadlockTest {
   TriggerLoader loader;
   TriggerManager triggerManager;
   ExecutorLoader execLoader;
+  ExecutorApiClient apiClient;
 
   @Before
   public void setup() throws ExecutorManagerException, TriggerManagerException {
@@ -50,10 +54,11 @@ public class TriggerManagerDeadlockTest {
     props.put("trigger.scan.interval", 1000);
     props.put("executor.port", 12321);
     this.execLoader = new MockExecutorLoader();
+    this.apiClient = mock(ExecutorApiClient.class);
     final CommonMetrics commonMetrics = new CommonMetrics(new MetricsManager(new MetricRegistry()));
     final ExecutorManager executorManager = new ExecutorManager(props, this.execLoader,
         new AlerterHolder(props, new Emailer(props, commonMetrics)),
-        commonMetrics);
+        commonMetrics, this.apiClient);
     this.triggerManager = new TriggerManager(props, this.loader, executorManager);
   }