diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorApiGateway.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiGateway.java
new file mode 100644
index 0000000..30fc44b
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiGateway.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import com.google.inject.Inject;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import javax.inject.Singleton;
+import org.codehaus.jackson.map.ObjectMapper;
+
+@Singleton
+public class ExecutorApiGateway {
+
+ private final ExecutorApiClient apiClient;
+
+ @Inject
+ public ExecutorApiGateway(final ExecutorApiClient apiClient) {
+ this.apiClient = apiClient;
+ }
+
+ Map<String, Object> callWithExecutable(final ExecutableFlow exflow,
+ final Executor executor, final String action) throws ExecutorManagerException {
+ return callWithExecutionId(executor.getHost(), executor.getPort(), action,
+ exflow.getExecutionId(), null, (Pair<String, String>[]) null);
+ }
+
+ Map<String, Object> callWithReference(final ExecutionReference ref, final String action,
+ final Pair<String, String>... params) throws ExecutorManagerException {
+ return callWithExecutionId(ref.getHost(), ref.getPort(), action, ref.getExecId(),
+ null, params);
+ }
+
+ Map<String, Object> callWithReferenceByUser(final ExecutionReference ref,
+ final String action, final String user, final Pair<String, String>... params)
+ throws ExecutorManagerException {
+ return callWithExecutionId(ref.getHost(), ref.getPort(), action,
+ ref.getExecId(), user, params);
+ }
+
+ Map<String, Object> callWithExecutionId(final String host, final int port,
+ final String action, final Integer executionId, final String user,
+ final Pair<String, String>... params) throws ExecutorManagerException {
+ try {
+ final List<Pair<String, String>> paramList = new ArrayList<>();
+
+ if (params != null) {
+ paramList.addAll(Arrays.asList(params));
+ }
+
+ paramList
+ .add(new Pair<>(ConnectorParams.ACTION_PARAM, action));
+ paramList.add(new Pair<>(ConnectorParams.EXECID_PARAM, String
+ .valueOf(executionId)));
+ paramList.add(new Pair<>(ConnectorParams.USER_PARAM, user));
+
+ return callForJsonObjectMap(host, port, "/executor", paramList);
+ } catch (final IOException e) {
+ throw new ExecutorManagerException(e);
+ }
+ }
+
+ /**
+ * Call executor and parse the JSON response as an instance of the class given as an argument.
+ */
+ <T> T callForJsonType(final String host, final int port, final String path,
+ final List<Pair<String, String>> paramList, final Class<T> valueType) throws IOException {
+ final String responseString = callForJsonString(host, port, path, paramList);
+ if (null == responseString || responseString.length() == 0) {
+ return null;
+ }
+ return new ObjectMapper().readValue(responseString, valueType);
+ }
+
+ /*
+ * Call executor and return json object map.
+ */
+ Map<String, Object> callForJsonObjectMap(final String host, final int port,
+ final String path, final List<Pair<String, String>> paramList) throws IOException {
+ final String responseString =
+ callForJsonString(host, port, path, paramList);
+
+ @SuppressWarnings("unchecked") final Map<String, Object> jsonResponse =
+ (Map<String, Object>) JSONUtils.parseJSONFromString(responseString);
+ final String error = (String) jsonResponse.get(ConnectorParams.RESPONSE_ERROR);
+ if (error != null) {
+ throw new IOException(error);
+ }
+ return jsonResponse;
+ }
+
+ /*
+ * Call executor and return raw json string.
+ */
+ private String callForJsonString(final String host, final int port, final String path,
+ List<Pair<String, String>> paramList) throws IOException {
+ if (paramList == null) {
+ paramList = new ArrayList<>();
+ }
+
+ @SuppressWarnings("unchecked") final URI uri =
+ ExecutorApiClient.buildUri(host, port, path, true,
+ paramList.toArray(new Pair[0]));
+
+ return this.apiClient.httpGet(uri, null);
+ }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 65eb46f..c89ed33 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -32,12 +32,9 @@ import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
import com.google.common.collect.Lists;
-import javax.inject.Inject;
-import javax.inject.Singleton;
import java.io.File;
import java.io.IOException;
import java.lang.Thread.State;
-import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@@ -49,13 +46,14 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import javax.inject.Inject;
+import javax.inject.Singleton;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
@@ -99,7 +97,7 @@ public class ExecutorManager extends EventHandler implements
private final ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows =
new ConcurrentHashMap<>();
private final ExecutingManagerUpdaterThread executingManager;
- private final ExecutorApiClient apiClient;
+ private final ExecutorApiGateway apiGateway;
QueuedExecutions queuedFlows;
File cacheDir;
private QueueProcessorThread queueProcessor;
@@ -116,12 +114,12 @@ public class ExecutorManager extends EventHandler implements
public ExecutorManager(final Props azkProps, final ExecutorLoader loader,
final AlerterHolder alerterHolder,
final CommonMetrics commonMetrics,
- final ExecutorApiClient apiClient) throws ExecutorManagerException {
+ final ExecutorApiGateway apiGateway) throws ExecutorManagerException {
this.alerterHolder = alerterHolder;
this.azkProps = azkProps;
this.commonMetrics = commonMetrics;
this.executorLoader = loader;
- this.apiClient = apiClient;
+ this.apiGateway = apiGateway;
this.setupExecutors();
this.loadRunningFlows();
@@ -234,33 +232,30 @@ public class ExecutorManager extends EventHandler implements
private void refreshExecutors() {
synchronized (this.activeExecutors) {
- final List<Pair<Executor, Future<String>>> futures =
+ final List<Pair<Executor, Future<ExecutorInfo>>> futures =
new ArrayList<>();
for (final Executor executor : this.activeExecutors) {
// execute each executorInfo refresh task to fetch
- final Future<String> fetchExecutionInfo =
- this.executorInforRefresherService.submit(new Callable<String>() {
- @Override
- public String call() throws Exception {
- return callExecutorForJsonString(executor.getHost(),
- executor.getPort(), "/serverStatistics", null);
- }
- });
+ final Future<ExecutorInfo> fetchExecutionInfo =
+ this.executorInforRefresherService.submit(
+ () -> this.apiGateway.callForJsonType(executor.getHost(),
+ executor.getPort(), "/serverStatistics", null, ExecutorInfo.class));
futures.add(new Pair<>(executor,
fetchExecutionInfo));
}
boolean wasSuccess = true;
- for (final Pair<Executor, Future<String>> refreshPair : futures) {
+ for (final Pair<Executor, Future<ExecutorInfo>> refreshPair : futures) {
final Executor executor = refreshPair.getFirst();
- executor.setExecutorInfo(null); // invalidate cached EXecutorInfo
+ executor.setExecutorInfo(null); // invalidate cached ExecutorInfo
try {
// max 5 secs
- final String jsonString = refreshPair.getSecond().get(5, TimeUnit.SECONDS);
- executor.setExecutorInfo(ExecutorInfo.fromJSONString(jsonString));
+ final ExecutorInfo executorInfo = refreshPair.getSecond().get(5, TimeUnit.SECONDS);
+ // executorInfo is null if the response was empty
+ executor.setExecutorInfo(executorInfo);
logger.info(String.format(
"Successfully refreshed executor: %s with executor info : %s",
- executor, jsonString));
+ executor, executorInfo));
} catch (final TimeoutException e) {
wasSuccess = false;
logger.error("Timed out while waiting for ExecutorInfo refresh"
@@ -690,7 +685,7 @@ public class ExecutorManager extends EventHandler implements
new Pair<>("length", String.valueOf(length));
@SuppressWarnings("unchecked") final Map<String, Object> result =
- callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION,
+ this.apiGateway.callWithReference(pair.getFirst(), ConnectorParams.LOG_ACTION,
typeParam, offsetParam, lengthParam);
return LogData.createLogDataFromObject(result);
} else {
@@ -718,7 +713,7 @@ public class ExecutorManager extends EventHandler implements
new Pair<>("attempt", String.valueOf(attempt));
@SuppressWarnings("unchecked") final Map<String, Object> result =
- callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION,
+ this.apiGateway.callWithReference(pair.getFirst(), ConnectorParams.LOG_ACTION,
typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
return LogData.createLogDataFromObject(result);
} else {
@@ -744,7 +739,7 @@ public class ExecutorManager extends EventHandler implements
new Pair<>("attempt", String.valueOf(attempt));
@SuppressWarnings("unchecked") final Map<String, Object> result =
- callExecutorServer(pair.getFirst(), ConnectorParams.ATTACHMENTS_ACTION,
+ this.apiGateway.callWithReference(pair.getFirst(), ConnectorParams.ATTACHMENTS_ACTION,
jobIdParam, attemptParam);
@SuppressWarnings("unchecked") final List<Object> jobStats = (List<Object>) result
@@ -772,7 +767,7 @@ public class ExecutorManager extends EventHandler implements
new Pair<>("attempt", String.valueOf(attempt));
@SuppressWarnings("unchecked") final Map<String, Object> result =
- callExecutorServer(pair.getFirst(), ConnectorParams.METADATA_ACTION,
+ this.apiGateway.callWithReference(pair.getFirst(), ConnectorParams.METADATA_ACTION,
typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
return JobMetaData.createJobMetaDataFromObject(result);
} else {
@@ -794,7 +789,7 @@ public class ExecutorManager extends EventHandler implements
if (this.runningFlows.containsKey(exFlow.getExecutionId())) {
final Pair<ExecutionReference, ExecutableFlow> pair =
this.runningFlows.get(exFlow.getExecutionId());
- callExecutorServer(pair.getFirst(), ConnectorParams.CANCEL_ACTION,
+ this.apiGateway.callWithReferenceByUser(pair.getFirst(), ConnectorParams.CANCEL_ACTION,
userId);
} else if (this.queuedFlows.hasExecution(exFlow.getExecutionId())) {
this.queuedFlows.dequeue(exFlow.getExecutionId());
@@ -818,7 +813,8 @@ public class ExecutorManager extends EventHandler implements
+ exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
+ " isn't running.");
}
- callExecutorServer(pair.getFirst(), ConnectorParams.RESUME_ACTION, userId);
+ this.apiGateway
+ .callWithReferenceByUser(pair.getFirst(), ConnectorParams.RESUME_ACTION, userId);
}
}
@@ -833,7 +829,8 @@ public class ExecutorManager extends EventHandler implements
+ exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
+ " isn't running.");
}
- callExecutorServer(pair.getFirst(), ConnectorParams.PAUSE_ACTION, userId);
+ this.apiGateway
+ .callWithReferenceByUser(pair.getFirst(), ConnectorParams.PAUSE_ACTION, userId);
}
}
@@ -898,7 +895,7 @@ public class ExecutorManager extends EventHandler implements
+ " isn't running.");
}
- Map<String, Object> response = null;
+ final Map<String, Object> response;
if (jobIds != null && jobIds.length > 0) {
for (final String jobId : jobIds) {
if (!jobId.isEmpty()) {
@@ -912,14 +909,14 @@ public class ExecutorManager extends EventHandler implements
}
final String ids = StringUtils.join(jobIds, ',');
response =
- callExecutorServer(pair.getFirst(),
+ this.apiGateway.callWithReferenceByUser(pair.getFirst(),
ConnectorParams.MODIFY_EXECUTION_ACTION, userId,
new Pair<>(
ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command),
new Pair<>(ConnectorParams.MODIFY_JOBS_LIST, ids));
} else {
response =
- callExecutorServer(pair.getFirst(),
+ this.apiGateway.callWithReferenceByUser(pair.getFirst(),
ConnectorParams.MODIFY_EXECUTION_ACTION, userId,
new Pair<>(
ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command));
@@ -1046,109 +1043,10 @@ public class ExecutorManager extends EventHandler implements
+ " seconds.");
}
- private Map<String, Object> callExecutorServer(final ExecutableFlow exflow,
- final Executor executor, final String action) throws ExecutorManagerException {
- try {
- return callExecutorServer(executor.getHost(), executor.getPort(), action,
- exflow.getExecutionId(), null, (Pair<String, String>[]) null);
- } catch (final IOException e) {
- throw new ExecutorManagerException(e);
- }
- }
-
- private Map<String, Object> callExecutorServer(final ExecutionReference ref,
- final String action, final String user) throws ExecutorManagerException {
- try {
- return callExecutorServer(ref.getHost(), ref.getPort(), action,
- ref.getExecId(), user, (Pair<String, String>[]) null);
- } catch (final IOException e) {
- throw new ExecutorManagerException(e);
- }
- }
-
- private Map<String, Object> callExecutorServer(final ExecutionReference ref,
- final String action, final Pair<String, String>... params)
- throws ExecutorManagerException {
- try {
- return callExecutorServer(ref.getHost(), ref.getPort(), action,
- ref.getExecId(), null, params);
- } catch (final IOException e) {
- throw new ExecutorManagerException(e);
- }
- }
-
- private Map<String, Object> callExecutorServer(final ExecutionReference ref,
- final String action, final String user, final Pair<String, String>... params)
- throws ExecutorManagerException {
- try {
- return callExecutorServer(ref.getHost(), ref.getPort(), action,
- ref.getExecId(), user, params);
- } catch (final IOException e) {
- throw new ExecutorManagerException(e);
- }
- }
-
- private Map<String, Object> callExecutorServer(final String host, final int port,
- final String action, final Integer executionId, final String user,
- final Pair<String, String>... params) throws IOException {
- final List<Pair<String, String>> paramList = new ArrayList<>();
-
- // if params = null
- if (params != null) {
- paramList.addAll(Arrays.asList(params));
- }
-
- paramList
- .add(new Pair<>(ConnectorParams.ACTION_PARAM, action));
- paramList.add(new Pair<>(ConnectorParams.EXECID_PARAM, String
- .valueOf(executionId)));
- paramList.add(new Pair<>(ConnectorParams.USER_PARAM, user));
-
- final Map<String, Object> jsonResponse =
- callExecutorForJsonObject(host, port, "/executor", paramList);
-
- return jsonResponse;
- }
-
- /*
- * Helper method used by ExecutorManager to call executor and return json
- * object map
- */
- private Map<String, Object> callExecutorForJsonObject(final String host, final int port,
- final String path, final List<Pair<String, String>> paramList) throws IOException {
- final String responseString =
- callExecutorForJsonString(host, port, path, paramList);
-
- @SuppressWarnings("unchecked") final Map<String, Object> jsonResponse =
- (Map<String, Object>) JSONUtils.parseJSONFromString(responseString);
- final String error = (String) jsonResponse.get(ConnectorParams.RESPONSE_ERROR);
- if (error != null) {
- throw new IOException(error);
- }
- return jsonResponse;
- }
-
- /*
- * Helper method used by ExecutorManager to call executor and return raw json
- * string
- */
- private String callExecutorForJsonString(final String host, final int port, final String path,
- List<Pair<String, String>> paramList) throws IOException {
- if (paramList == null) {
- paramList = new ArrayList<>();
- }
-
- @SuppressWarnings("unchecked") final URI uri =
- ExecutorApiClient.buildUri(host, port, path, true,
- paramList.toArray(new Pair[0]));
-
- return this.apiClient.httpGet(uri, null);
- }
-
/**
* Manage servlet call for stats servlet in Azkaban execution server {@inheritDoc}
*
- * @see azkaban.executor.ExecutorManagerAdapter#callExecutorStats(java.lang.String,
+ * @see azkaban.executor.ExecutorManagerAdapter#callExecutorStats(int, java.lang.String,
* azkaban.utils.Pair[])
*/
@Override
@@ -1167,7 +1065,7 @@ public class ExecutorManager extends EventHandler implements
paramList
.add(new Pair<>(ConnectorParams.ACTION_PARAM, action));
- return callExecutorForJsonObject(executor.getHost(), executor.getPort(),
+ return this.apiGateway.callForJsonObjectMap(executor.getHost(), executor.getPort(),
"/stats", paramList);
}
@@ -1184,7 +1082,7 @@ public class ExecutorManager extends EventHandler implements
}
final String[] hostPortSplit = hostPort.split(":");
- return callExecutorForJsonObject(hostPortSplit[0],
+ return this.apiGateway.callForJsonObjectMap(hostPortSplit[0],
Integer.valueOf(hostPortSplit[1]), "/jmx", paramList);
}
@@ -1474,7 +1372,7 @@ public class ExecutorManager extends EventHandler implements
this.executorLoader.assignExecutor(choosenExecutor.getId(),
exflow.getExecutionId());
try {
- callExecutorServer(exflow, choosenExecutor,
+ this.apiGateway.callWithExecutable(exflow, choosenExecutor,
ConnectorParams.EXECUTE_ACTION);
} catch (final ExecutorManagerException ex) {
logger.error("Rolling back executor assignment for execution id:"
@@ -1554,10 +1452,10 @@ public class ExecutorManager extends EventHandler implements
Map<String, Object> results = null;
try {
results =
- callExecutorServer(executor.getHost(),
+ ExecutorManager.this.apiGateway.callWithExecutionId(executor.getHost(),
executor.getPort(), ConnectorParams.UPDATE_ACTION,
null, null, executionIds, updateTimes);
- } catch (final IOException e) {
+ } catch (final ExecutorManagerException e) {
logger.error(e);
for (final ExecutableFlow flow : entry.getValue()) {
final Pair<ExecutionReference, ExecutableFlow> pair =