azkaban-aplcache

Refactor only: create ExecutorApiGateway (#1436) Extract

9/7/2017 5:53:38 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorApiGateway.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiGateway.java
new file mode 100644
index 0000000..30fc44b
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiGateway.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import com.google.inject.Inject;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import javax.inject.Singleton;
+import org.codehaus.jackson.map.ObjectMapper;
+
+@Singleton
+public class ExecutorApiGateway {
+
+  private final ExecutorApiClient apiClient;
+
+  @Inject
+  public ExecutorApiGateway(final ExecutorApiClient apiClient) {
+    this.apiClient = apiClient;
+  }
+
+  Map<String, Object> callWithExecutable(final ExecutableFlow exflow,
+      final Executor executor, final String action) throws ExecutorManagerException {
+    return callWithExecutionId(executor.getHost(), executor.getPort(), action,
+        exflow.getExecutionId(), null, (Pair<String, String>[]) null);
+  }
+
+  Map<String, Object> callWithReference(final ExecutionReference ref, final String action,
+      final Pair<String, String>... params) throws ExecutorManagerException {
+    return callWithExecutionId(ref.getHost(), ref.getPort(), action, ref.getExecId(),
+        null, params);
+  }
+
+  Map<String, Object> callWithReferenceByUser(final ExecutionReference ref,
+      final String action, final String user, final Pair<String, String>... params)
+      throws ExecutorManagerException {
+    return callWithExecutionId(ref.getHost(), ref.getPort(), action,
+        ref.getExecId(), user, params);
+  }
+
+  Map<String, Object> callWithExecutionId(final String host, final int port,
+      final String action, final Integer executionId, final String user,
+      final Pair<String, String>... params) throws ExecutorManagerException {
+    try {
+      final List<Pair<String, String>> paramList = new ArrayList<>();
+
+      if (params != null) {
+        paramList.addAll(Arrays.asList(params));
+      }
+
+      paramList
+          .add(new Pair<>(ConnectorParams.ACTION_PARAM, action));
+      paramList.add(new Pair<>(ConnectorParams.EXECID_PARAM, String
+          .valueOf(executionId)));
+      paramList.add(new Pair<>(ConnectorParams.USER_PARAM, user));
+
+      return callForJsonObjectMap(host, port, "/executor", paramList);
+    } catch (final IOException e) {
+      throw new ExecutorManagerException(e);
+    }
+  }
+
+  /**
+   * Call executor and parse the JSON response as an instance of the class given as an argument.
+   */
+  <T> T callForJsonType(final String host, final int port, final String path,
+      final List<Pair<String, String>> paramList, final Class<T> valueType) throws IOException {
+    final String responseString = callForJsonString(host, port, path, paramList);
+    if (null == responseString || responseString.length() == 0) {
+      return null;
+    }
+    return new ObjectMapper().readValue(responseString, valueType);
+  }
+
+  /*
+   * Call executor and return json object map.
+   */
+  Map<String, Object> callForJsonObjectMap(final String host, final int port,
+      final String path, final List<Pair<String, String>> paramList) throws IOException {
+    final String responseString =
+        callForJsonString(host, port, path, paramList);
+
+    @SuppressWarnings("unchecked") final Map<String, Object> jsonResponse =
+        (Map<String, Object>) JSONUtils.parseJSONFromString(responseString);
+    final String error = (String) jsonResponse.get(ConnectorParams.RESPONSE_ERROR);
+    if (error != null) {
+      throw new IOException(error);
+    }
+    return jsonResponse;
+  }
+
+  /*
+   * Call executor and return raw json string.
+   */
+  private String callForJsonString(final String host, final int port, final String path,
+      List<Pair<String, String>> paramList) throws IOException {
+    if (paramList == null) {
+      paramList = new ArrayList<>();
+    }
+
+    @SuppressWarnings("unchecked") final URI uri =
+        ExecutorApiClient.buildUri(host, port, path, true,
+            paramList.toArray(new Pair[0]));
+
+    return this.apiClient.httpGet(uri, null);
+  }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorInfo.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorInfo.java
index c743499..016bdb8 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorInfo.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorInfo.java
@@ -16,9 +16,6 @@
 
 package azkaban.executor;
 
-import java.io.IOException;
-import org.codehaus.jackson.map.ObjectMapper;
-
 /**
  * Class that exposes the statistics from the executor server. List of the statistics -
  * remainingMemoryPercent; remainingMemory; remainingFlowCapacity; numberOfAssignedFlows;
@@ -51,20 +48,6 @@ public class ExecutorInfo implements java.io.Serializable {
     this.numberOfAssignedFlows = numberOfAssignedFlows;
   }
 
-  /**
-   * Helper function to get an ExecutorInfo instance from the JSon String serialized from another
-   * object.
-   *
-   * @param jsonString the string that will be de-serialized from.
-   * @return instance of the object if the parsing is successful, null other wise.
-   */
-  public static ExecutorInfo fromJSONString(final String jsonString) throws IOException {
-    if (null == jsonString || jsonString.length() == 0) {
-      return null;
-    }
-    return new ObjectMapper().readValue(jsonString, ExecutorInfo.class);
-  }
-
   public double getCpuUsage() {
     return this.cpuUsage;
   }
@@ -144,4 +127,16 @@ public class ExecutorInfo implements java.io.Serializable {
     }
     return false;
   }
+
+  @Override
+  public String toString() {
+    return "ExecutorInfo{" +
+        "remainingMemoryPercent=" + this.remainingMemoryPercent +
+        ", remainingMemoryInMB=" + this.remainingMemoryInMB +
+        ", remainingFlowCapacity=" + this.remainingFlowCapacity +
+        ", numberOfAssignedFlows=" + this.numberOfAssignedFlows +
+        ", lastDispatchedTime=" + this.lastDispatchedTime +
+        ", cpuUsage=" + this.cpuUsage +
+        '}';
+  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 65eb46f..c89ed33 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -32,12 +32,9 @@ import azkaban.utils.JSONUtils;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
 import com.google.common.collect.Lists;
-import javax.inject.Inject;
-import javax.inject.Singleton;
 import java.io.File;
 import java.io.IOException;
 import java.lang.Thread.State;
-import java.net.URI;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -49,13 +46,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import javax.inject.Inject;
+import javax.inject.Singleton;
 import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
@@ -99,7 +97,7 @@ public class ExecutorManager extends EventHandler implements
   private final ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows =
       new ConcurrentHashMap<>();
   private final ExecutingManagerUpdaterThread executingManager;
-  private final ExecutorApiClient apiClient;
+  private final ExecutorApiGateway apiGateway;
   QueuedExecutions queuedFlows;
   File cacheDir;
   private QueueProcessorThread queueProcessor;
@@ -116,12 +114,12 @@ public class ExecutorManager extends EventHandler implements
   public ExecutorManager(final Props azkProps, final ExecutorLoader loader,
       final AlerterHolder alerterHolder,
       final CommonMetrics commonMetrics,
-      final ExecutorApiClient apiClient) throws ExecutorManagerException {
+      final ExecutorApiGateway apiGateway) throws ExecutorManagerException {
     this.alerterHolder = alerterHolder;
     this.azkProps = azkProps;
     this.commonMetrics = commonMetrics;
     this.executorLoader = loader;
-    this.apiClient = apiClient;
+    this.apiGateway = apiGateway;
     this.setupExecutors();
     this.loadRunningFlows();
 
@@ -234,33 +232,30 @@ public class ExecutorManager extends EventHandler implements
   private void refreshExecutors() {
     synchronized (this.activeExecutors) {
 
-      final List<Pair<Executor, Future<String>>> futures =
+      final List<Pair<Executor, Future<ExecutorInfo>>> futures =
           new ArrayList<>();
       for (final Executor executor : this.activeExecutors) {
         // execute each executorInfo refresh task to fetch
-        final Future<String> fetchExecutionInfo =
-            this.executorInforRefresherService.submit(new Callable<String>() {
-              @Override
-              public String call() throws Exception {
-                return callExecutorForJsonString(executor.getHost(),
-                    executor.getPort(), "/serverStatistics", null);
-              }
-            });
+        final Future<ExecutorInfo> fetchExecutionInfo =
+            this.executorInforRefresherService.submit(
+                () -> this.apiGateway.callForJsonType(executor.getHost(),
+                    executor.getPort(), "/serverStatistics", null, ExecutorInfo.class));
         futures.add(new Pair<>(executor,
             fetchExecutionInfo));
       }
 
       boolean wasSuccess = true;
-      for (final Pair<Executor, Future<String>> refreshPair : futures) {
+      for (final Pair<Executor, Future<ExecutorInfo>> refreshPair : futures) {
         final Executor executor = refreshPair.getFirst();
-        executor.setExecutorInfo(null); // invalidate cached EXecutorInfo
+        executor.setExecutorInfo(null); // invalidate cached ExecutorInfo
         try {
           // max 5 secs
-          final String jsonString = refreshPair.getSecond().get(5, TimeUnit.SECONDS);
-          executor.setExecutorInfo(ExecutorInfo.fromJSONString(jsonString));
+          final ExecutorInfo executorInfo = refreshPair.getSecond().get(5, TimeUnit.SECONDS);
+          // executorInfo is null if the response was empty
+          executor.setExecutorInfo(executorInfo);
           logger.info(String.format(
               "Successfully refreshed executor: %s with executor info : %s",
-              executor, jsonString));
+              executor, executorInfo));
         } catch (final TimeoutException e) {
           wasSuccess = false;
           logger.error("Timed out while waiting for ExecutorInfo refresh"
@@ -690,7 +685,7 @@ public class ExecutorManager extends EventHandler implements
           new Pair<>("length", String.valueOf(length));
 
       @SuppressWarnings("unchecked") final Map<String, Object> result =
-          callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION,
+          this.apiGateway.callWithReference(pair.getFirst(), ConnectorParams.LOG_ACTION,
               typeParam, offsetParam, lengthParam);
       return LogData.createLogDataFromObject(result);
     } else {
@@ -718,7 +713,7 @@ public class ExecutorManager extends EventHandler implements
           new Pair<>("attempt", String.valueOf(attempt));
 
       @SuppressWarnings("unchecked") final Map<String, Object> result =
-          callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION,
+          this.apiGateway.callWithReference(pair.getFirst(), ConnectorParams.LOG_ACTION,
               typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
       return LogData.createLogDataFromObject(result);
     } else {
@@ -744,7 +739,7 @@ public class ExecutorManager extends EventHandler implements
         new Pair<>("attempt", String.valueOf(attempt));
 
     @SuppressWarnings("unchecked") final Map<String, Object> result =
-        callExecutorServer(pair.getFirst(), ConnectorParams.ATTACHMENTS_ACTION,
+        this.apiGateway.callWithReference(pair.getFirst(), ConnectorParams.ATTACHMENTS_ACTION,
             jobIdParam, attemptParam);
 
     @SuppressWarnings("unchecked") final List<Object> jobStats = (List<Object>) result
@@ -772,7 +767,7 @@ public class ExecutorManager extends EventHandler implements
           new Pair<>("attempt", String.valueOf(attempt));
 
       @SuppressWarnings("unchecked") final Map<String, Object> result =
-          callExecutorServer(pair.getFirst(), ConnectorParams.METADATA_ACTION,
+          this.apiGateway.callWithReference(pair.getFirst(), ConnectorParams.METADATA_ACTION,
               typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
       return JobMetaData.createJobMetaDataFromObject(result);
     } else {
@@ -794,7 +789,7 @@ public class ExecutorManager extends EventHandler implements
       if (this.runningFlows.containsKey(exFlow.getExecutionId())) {
         final Pair<ExecutionReference, ExecutableFlow> pair =
             this.runningFlows.get(exFlow.getExecutionId());
-        callExecutorServer(pair.getFirst(), ConnectorParams.CANCEL_ACTION,
+        this.apiGateway.callWithReferenceByUser(pair.getFirst(), ConnectorParams.CANCEL_ACTION,
             userId);
       } else if (this.queuedFlows.hasExecution(exFlow.getExecutionId())) {
         this.queuedFlows.dequeue(exFlow.getExecutionId());
@@ -818,7 +813,8 @@ public class ExecutorManager extends EventHandler implements
             + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
             + " isn't running.");
       }
-      callExecutorServer(pair.getFirst(), ConnectorParams.RESUME_ACTION, userId);
+      this.apiGateway
+          .callWithReferenceByUser(pair.getFirst(), ConnectorParams.RESUME_ACTION, userId);
     }
   }
 
@@ -833,7 +829,8 @@ public class ExecutorManager extends EventHandler implements
             + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
             + " isn't running.");
       }
-      callExecutorServer(pair.getFirst(), ConnectorParams.PAUSE_ACTION, userId);
+      this.apiGateway
+          .callWithReferenceByUser(pair.getFirst(), ConnectorParams.PAUSE_ACTION, userId);
     }
   }
 
@@ -898,7 +895,7 @@ public class ExecutorManager extends EventHandler implements
             + " isn't running.");
       }
 
-      Map<String, Object> response = null;
+      final Map<String, Object> response;
       if (jobIds != null && jobIds.length > 0) {
         for (final String jobId : jobIds) {
           if (!jobId.isEmpty()) {
@@ -912,14 +909,14 @@ public class ExecutorManager extends EventHandler implements
         }
         final String ids = StringUtils.join(jobIds, ',');
         response =
-            callExecutorServer(pair.getFirst(),
+            this.apiGateway.callWithReferenceByUser(pair.getFirst(),
                 ConnectorParams.MODIFY_EXECUTION_ACTION, userId,
                 new Pair<>(
                     ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command),
                 new Pair<>(ConnectorParams.MODIFY_JOBS_LIST, ids));
       } else {
         response =
-            callExecutorServer(pair.getFirst(),
+            this.apiGateway.callWithReferenceByUser(pair.getFirst(),
                 ConnectorParams.MODIFY_EXECUTION_ACTION, userId,
                 new Pair<>(
                     ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command));
@@ -1046,109 +1043,10 @@ public class ExecutorManager extends EventHandler implements
             + " seconds.");
   }
 
-  private Map<String, Object> callExecutorServer(final ExecutableFlow exflow,
-      final Executor executor, final String action) throws ExecutorManagerException {
-    try {
-      return callExecutorServer(executor.getHost(), executor.getPort(), action,
-          exflow.getExecutionId(), null, (Pair<String, String>[]) null);
-    } catch (final IOException e) {
-      throw new ExecutorManagerException(e);
-    }
-  }
-
-  private Map<String, Object> callExecutorServer(final ExecutionReference ref,
-      final String action, final String user) throws ExecutorManagerException {
-    try {
-      return callExecutorServer(ref.getHost(), ref.getPort(), action,
-          ref.getExecId(), user, (Pair<String, String>[]) null);
-    } catch (final IOException e) {
-      throw new ExecutorManagerException(e);
-    }
-  }
-
-  private Map<String, Object> callExecutorServer(final ExecutionReference ref,
-      final String action, final Pair<String, String>... params)
-      throws ExecutorManagerException {
-    try {
-      return callExecutorServer(ref.getHost(), ref.getPort(), action,
-          ref.getExecId(), null, params);
-    } catch (final IOException e) {
-      throw new ExecutorManagerException(e);
-    }
-  }
-
-  private Map<String, Object> callExecutorServer(final ExecutionReference ref,
-      final String action, final String user, final Pair<String, String>... params)
-      throws ExecutorManagerException {
-    try {
-      return callExecutorServer(ref.getHost(), ref.getPort(), action,
-          ref.getExecId(), user, params);
-    } catch (final IOException e) {
-      throw new ExecutorManagerException(e);
-    }
-  }
-
-  private Map<String, Object> callExecutorServer(final String host, final int port,
-      final String action, final Integer executionId, final String user,
-      final Pair<String, String>... params) throws IOException {
-    final List<Pair<String, String>> paramList = new ArrayList<>();
-
-    // if params = null
-    if (params != null) {
-      paramList.addAll(Arrays.asList(params));
-    }
-
-    paramList
-        .add(new Pair<>(ConnectorParams.ACTION_PARAM, action));
-    paramList.add(new Pair<>(ConnectorParams.EXECID_PARAM, String
-        .valueOf(executionId)));
-    paramList.add(new Pair<>(ConnectorParams.USER_PARAM, user));
-
-    final Map<String, Object> jsonResponse =
-        callExecutorForJsonObject(host, port, "/executor", paramList);
-
-    return jsonResponse;
-  }
-
-  /*
-   * Helper method used by ExecutorManager to call executor and return json
-   * object map
-   */
-  private Map<String, Object> callExecutorForJsonObject(final String host, final int port,
-      final String path, final List<Pair<String, String>> paramList) throws IOException {
-    final String responseString =
-        callExecutorForJsonString(host, port, path, paramList);
-
-    @SuppressWarnings("unchecked") final Map<String, Object> jsonResponse =
-        (Map<String, Object>) JSONUtils.parseJSONFromString(responseString);
-    final String error = (String) jsonResponse.get(ConnectorParams.RESPONSE_ERROR);
-    if (error != null) {
-      throw new IOException(error);
-    }
-    return jsonResponse;
-  }
-
-  /*
-   * Helper method used by ExecutorManager to call executor and return raw json
-   * string
-   */
-  private String callExecutorForJsonString(final String host, final int port, final String path,
-      List<Pair<String, String>> paramList) throws IOException {
-    if (paramList == null) {
-      paramList = new ArrayList<>();
-    }
-
-    @SuppressWarnings("unchecked") final URI uri =
-        ExecutorApiClient.buildUri(host, port, path, true,
-            paramList.toArray(new Pair[0]));
-
-    return this.apiClient.httpGet(uri, null);
-  }
-
   /**
    * Manage servlet call for stats servlet in Azkaban execution server {@inheritDoc}
    *
-   * @see azkaban.executor.ExecutorManagerAdapter#callExecutorStats(java.lang.String,
+   * @see azkaban.executor.ExecutorManagerAdapter#callExecutorStats(int, java.lang.String,
    * azkaban.utils.Pair[])
    */
   @Override
@@ -1167,7 +1065,7 @@ public class ExecutorManager extends EventHandler implements
     paramList
         .add(new Pair<>(ConnectorParams.ACTION_PARAM, action));
 
-    return callExecutorForJsonObject(executor.getHost(), executor.getPort(),
+    return this.apiGateway.callForJsonObjectMap(executor.getHost(), executor.getPort(),
         "/stats", paramList);
   }
 
@@ -1184,7 +1082,7 @@ public class ExecutorManager extends EventHandler implements
     }
 
     final String[] hostPortSplit = hostPort.split(":");
-    return callExecutorForJsonObject(hostPortSplit[0],
+    return this.apiGateway.callForJsonObjectMap(hostPortSplit[0],
         Integer.valueOf(hostPortSplit[1]), "/jmx", paramList);
   }
 
@@ -1474,7 +1372,7 @@ public class ExecutorManager extends EventHandler implements
     this.executorLoader.assignExecutor(choosenExecutor.getId(),
         exflow.getExecutionId());
     try {
-      callExecutorServer(exflow, choosenExecutor,
+      this.apiGateway.callWithExecutable(exflow, choosenExecutor,
           ConnectorParams.EXECUTE_ACTION);
     } catch (final ExecutorManagerException ex) {
       logger.error("Rolling back executor assignment for execution id:"
@@ -1554,10 +1452,10 @@ public class ExecutorManager extends EventHandler implements
               Map<String, Object> results = null;
               try {
                 results =
-                    callExecutorServer(executor.getHost(),
+                    ExecutorManager.this.apiGateway.callWithExecutionId(executor.getHost(),
                         executor.getPort(), ConnectorParams.UPDATE_ACTION,
                         null, null, executionIds, updateTimes);
-              } catch (final IOException e) {
+              } catch (final ExecutorManagerException e) {
                 logger.error(e);
                 for (final ExecutableFlow flow : entry.getValue()) {
                   final Pair<ExecutionReference, ExecutableFlow> pair =
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorApiGatewayTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorApiGatewayTest.java
new file mode 100644
index 0000000..7732067
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorApiGatewayTest.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import static org.mockito.Mockito.when;
+
+import azkaban.utils.JSONUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class ExecutorApiGatewayTest {
+
+  private ExecutorApiGateway gateway;
+  private ExecutorApiClient client;
+
+  @Before
+  public void setUp() throws Exception {
+    this.client = Mockito.mock(ExecutorApiClient.class);
+    this.gateway = new ExecutorApiGateway(this.client);
+  }
+
+  @Test
+  public void testExecutorInfoJsonParser() throws Exception {
+    final ExecutorInfo exeInfo = new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 89,
+        10);
+    final String json = JSONUtils.toJSON(exeInfo);
+    when(this.client.httpGet(Mockito.any(), Mockito.any())).thenReturn(json);
+    final ExecutorInfo exeInfo2 = this.gateway
+        .callForJsonType("localhost", 1234, "executor", null, ExecutorInfo.class);
+    Assert.assertTrue(exeInfo.equals(exeInfo2));
+  }
+
+}
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index b4d195c..9ee67e3 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -59,7 +59,7 @@ public class ExecutorManagerTest {
   private ExecutableFlow flow1;
   private ExecutableFlow flow2;
   private AlerterHolder alertHolder;
-  private ExecutorApiClient apiClient;
+  private ExecutorApiGateway apiGateway;
 
   @Before
   public void setup() {
@@ -126,7 +126,7 @@ public class ExecutorManagerTest {
   private ExecutorManager createExecutorManager()
       throws ExecutorManagerException {
     return new ExecutorManager(this.props, this.loader, this.alertHolder, this.commonMetrics,
-        this.apiClient);
+        this.apiGateway);
   }
 
   /*
@@ -321,7 +321,7 @@ public class ExecutorManagerTest {
   private void testSetUpForRunningFlows()
       throws ExecutorManagerException, IOException {
     this.loader = mock(ExecutorLoader.class);
-    this.apiClient = mock(ExecutorApiClient.class);
+    this.apiGateway = mock(ExecutorApiGateway.class);
     this.user = TestUtils.getTestUser();
     this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
     //To test runningFlows, AZKABAN_QUEUEPROCESSING_ENABLED should be set to true
diff --git a/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java b/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
index 713eb2e..6f55c07 100644
--- a/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
@@ -24,7 +24,6 @@ import azkaban.executor.selector.ExecutorFilter;
 import azkaban.executor.selector.ExecutorSelector;
 import azkaban.executor.selector.FactorComparator;
 import azkaban.executor.selector.FactorFilter;
-import azkaban.utils.JSONUtils;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
@@ -557,15 +556,6 @@ public class SelectorTest {
     Assert.assertEquals(executorList.get(2), executor);
   }
 
-  @Test
-  public void testExecutorInfoJsonParser() throws Exception {
-    final ExecutorInfo exeInfo = new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 89,
-        10);
-    final String json = JSONUtils.toJSON(exeInfo);
-    final ExecutorInfo exeInfo2 = ExecutorInfo.fromJSONString(json);
-    Assert.assertTrue(exeInfo.equals(exeInfo2));
-  }
-
   // mock executor object.
   static class MockExecutorObject implements Comparable<MockExecutorObject> {
 
diff --git a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
index 16fa1c2..61e5208 100644
--- a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
+++ b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
@@ -19,7 +19,7 @@ package azkaban.trigger;
 import static org.mockito.Mockito.mock;
 
 import azkaban.executor.AlerterHolder;
-import azkaban.executor.ExecutorApiClient;
+import azkaban.executor.ExecutorApiGateway;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManager;
 import azkaban.executor.ExecutorManagerException;
@@ -45,7 +45,7 @@ public class TriggerManagerDeadlockTest {
   TriggerLoader loader;
   TriggerManager triggerManager;
   ExecutorLoader execLoader;
-  ExecutorApiClient apiClient;
+  ExecutorApiGateway apiGateway;
 
   @Before
   public void setup() throws ExecutorManagerException, TriggerManagerException {
@@ -54,11 +54,11 @@ public class TriggerManagerDeadlockTest {
     props.put("trigger.scan.interval", 1000);
     props.put("executor.port", 12321);
     this.execLoader = new MockExecutorLoader();
-    this.apiClient = mock(ExecutorApiClient.class);
+    this.apiGateway = mock(ExecutorApiGateway.class);
     final CommonMetrics commonMetrics = new CommonMetrics(new MetricsManager(new MetricRegistry()));
     final ExecutorManager executorManager = new ExecutorManager(props, this.execLoader,
         new AlerterHolder(props, new Emailer(props, commonMetrics)),
-        commonMetrics, this.apiClient);
+        commonMetrics, this.apiGateway);
     this.triggerManager = new TriggerManager(props, this.loader, executorManager);
   }