diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index c86648e..0bc4f39 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.lang.StringUtils;
import org.apache.http.client.HttpClient;
@@ -58,23 +59,30 @@ import azkaban.utils.Props;
*
*/
public class ExecutorManager extends EventHandler implements
- ExecutorManagerAdapter {
+ ExecutorManagerAdapter {
private static Logger logger = Logger.getLogger(ExecutorManager.class);
private ExecutorLoader executorLoader;
- private String executorHost;
- private int executorPort;
private CleanerThread cleanerThread;
private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows =
- new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
+ new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
private ConcurrentHashMap<Integer, ExecutableFlow> recentlyFinished =
- new ConcurrentHashMap<Integer, ExecutableFlow>();
+ new ConcurrentHashMap<Integer, ExecutableFlow>();
+ /* all flows ExecutorManager is currently dealing with */
+ private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> queuedFlowMap =
+ new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
+ private ConcurrentLinkedQueue<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
+ new ConcurrentLinkedQueue<Pair<ExecutionReference, ExecutableFlow>>();
+
+ private Set<Executor> activeExecutors = new HashSet<Executor>();
private ExecutingManagerUpdaterThread executingManager;
+ private QueueProcessorThread queueProcessor;
private static final long DEFAULT_EXECUTION_LOGS_RETENTION_MS = 3 * 4 * 7
- * 24 * 60 * 60 * 1000l;
+ * 24 * 60 * 60 * 1000l;
+
private long lastCleanerThreadCheckTime = -1;
private long lastThreadCheckTime = -1;
@@ -85,11 +93,12 @@ public class ExecutorManager extends EventHandler implements
File cacheDir;
public ExecutorManager(Props props, ExecutorLoader loader,
- Map<String, Alerter> alters) throws ExecutorManagerException {
+ Map<String, Alerter> alters) throws ExecutorManagerException {
this.executorLoader = loader;
this.loadRunningFlows();
- executorHost = props.getString("executor.host", "localhost");
- executorPort = props.getInt("executor.port");
+ this.loadQueuedFlows();
+
+ setupExecutors(props);
alerters = alters;
@@ -98,14 +107,43 @@ public class ExecutorManager extends EventHandler implements
executingManager = new ExecutingManagerUpdaterThread();
executingManager.start();
+ queueProcessor = new QueueProcessorThread();
+ queueProcessor.start();
+
long executionLogsRetentionMs =
- props.getLong("execution.logs.retention.ms",
- DEFAULT_EXECUTION_LOGS_RETENTION_MS);
+ props.getLong("execution.logs.retention.ms",
+ DEFAULT_EXECUTION_LOGS_RETENTION_MS);
cleanerThread = new CleanerThread(executionLogsRetentionMs);
cleanerThread.start();
}
+ /* Set up activeExecutors from azkaban.properties and the executors stored in the database */
+ private void setupExecutors(Props props) throws ExecutorManagerException {
+ // Add local executor, if specified as per properties
+ if (props.containsKey("executor.port")) {
+ String executorHost = props.getString("executor.host", "localhost");
+ int executorPort = props.getInt("executor.port");
+ Executor executor =
+ executorLoader.fetchExecutor(executorHost, executorPort);
+ if (executor == null) {
+ executor = executorLoader.addExecutor(executorHost, executorPort);
+ } else if (!executor.isActive()) {
+ executorLoader.activateExecutor(executor.getId());
+ }
+ activeExecutors.add(new Executor(executor.getId(), executorHost,
+ executorPort));
+ }
+
+ if (props.getBoolean("azkaban.multiple.executors", false)) {
+ activeExecutors.addAll(executorLoader.fetchActiveExecutors());
+ }
+
+ if (activeExecutors.isEmpty()) {
+ throw new ExecutorManagerException("No active executor found");
+ }
+ }
+
@Override
public State getExecutorManagerThreadState() {
return executingManager.getState();
@@ -131,9 +169,11 @@ public class ExecutorManager extends EventHandler implements
@Override
public Set<String> getPrimaryServerHosts() {
- // Only one for now. More probably later.
+ // TODO: decide whether we still want the notion of a primary executor
HashSet<String> ports = new HashSet<String>();
- ports.add(executorHost + ":" + executorPort);
+ for (Executor executor : activeExecutors) {
+ ports.add(executor.getHost() + ":" + executor.getPort());
+ }
return ports;
}
@@ -141,11 +181,8 @@ public class ExecutorManager extends EventHandler implements
public Set<String> getAllActiveExecutorServerHosts() {
// Includes non primary server/hosts
HashSet<String> ports = new HashSet<String>();
- ports.add(executorHost + ":" + executorPort);
- for (Pair<ExecutionReference, ExecutableFlow> running : runningFlows
- .values()) {
- ExecutionReference ref = running.getFirst();
- ports.add(ref.getHost() + ":" + ref.getPort());
+ for (Executor executor : activeExecutors) {
+ ports.add(executor.getHost() + ":" + executor.getPort());
}
return ports;
@@ -155,50 +192,115 @@ public class ExecutorManager extends EventHandler implements
runningFlows.putAll(executorLoader.fetchActiveFlows());
}
+ /*
+ * Load queued flows, i.e. flows with an active_execution_reference that are
+ * not yet assigned to any executor
+ */
+ private void loadQueuedFlows() throws ExecutorManagerException {
+ queuedFlows.addAll(executorLoader.fetchQueuedFlows());
+ for (Pair<ExecutionReference, ExecutableFlow> ref : queuedFlows) {
+ queuedFlowMap.put(ref.getSecond().getExecutionId(), ref);
+ }
+ }
+
+ /**
+ * Gets a list of all the active (running, non-dispatched) executions for a
+ * given project and flow {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows(int,
+ * java.lang.String)
+ */
@Override
public List<Integer> getRunningFlows(int projectId, String flowId) {
ArrayList<Integer> executionIds = new ArrayList<Integer>();
+ for (Pair<ExecutionReference, ExecutableFlow> ref : queuedFlowMap.values()) {
+ if (ref.getSecond().getFlowId().equals(flowId)
+ && ref.getSecond().getProjectId() == projectId) {
+ executionIds.add(ref.getFirst().getExecId());
+ }
+ }
for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
if (ref.getSecond().getFlowId().equals(flowId)
- && ref.getSecond().getProjectId() == projectId) {
+ && ref.getSecond().getProjectId() == projectId) {
executionIds.add(ref.getFirst().getExecId());
}
}
return executionIds;
}
+ /**
+ * Checks whether the given flow has any active (running, non-dispatched)
+ * executions. {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorManagerAdapter#isFlowRunning(int,
+ * java.lang.String)
+ */
@Override
public boolean isFlowRunning(int projectId, String flowId) {
+ for (Pair<ExecutionReference, ExecutableFlow> ref : queuedFlowMap.values()) {
+ if (ref.getSecond().getProjectId() == projectId
+ && ref.getSecond().getFlowId().equals(flowId)) {
+ return true;
+ }
+ }
for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
if (ref.getSecond().getProjectId() == projectId
- && ref.getSecond().getFlowId().equals(flowId)) {
+ && ref.getSecond().getFlowId().equals(flowId)) {
return true;
}
}
return false;
}
+ /**
+ * Fetch the ExecutableFlow from the active (running, non-dispatched) flows,
+ * falling back to the database if not found. {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorManagerAdapter#getExecutableFlow(int)
+ */
@Override
public ExecutableFlow getExecutableFlow(int execId)
- throws ExecutorManagerException {
- Pair<ExecutionReference, ExecutableFlow> active = runningFlows.get(execId);
- if (active == null) {
+ throws ExecutorManagerException {
+ if (runningFlows.containsKey(execId)) {
+ return runningFlows.get(execId).getSecond();
+ } else if (queuedFlowMap.containsKey(execId)) {
+ return queuedFlowMap.get(execId).getSecond();
+ } else {
return executorLoader.fetchExecutableFlow(execId);
}
- return active.getSecond();
}
+ /**
+ * Get all active (running, non-dispatched) flows
+ *
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows()
+ */
@Override
public List<ExecutableFlow> getRunningFlows() {
ArrayList<ExecutableFlow> flows = new ArrayList<ExecutableFlow>();
+ for (Pair<ExecutionReference, ExecutableFlow> ref : queuedFlowMap.values()) {
+ flows.add(ref.getSecond());
+ }
for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
flows.add(ref.getSecond());
}
return flows;
}
+ /**
+ * Get execution Ids of all active (running, non-dispatched) flows
+ *
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows()
+ */
public String getRunningFlowIds() {
List<Integer> allIds = new ArrayList<Integer>();
+ for (Pair<ExecutionReference, ExecutableFlow> ref : queuedFlowMap.values()) {
+ allIds.add(ref.getSecond().getExecutionId());
+ }
for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
allIds.add(ref.getSecond().getExecutionId());
}
@@ -206,136 +308,141 @@ public class ExecutorManager extends EventHandler implements
return allIds.toString();
}
+ /**
+ * Get recently finished flows {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorManagerAdapter#getRecentlyFinishedFlows()
+ */
public List<ExecutableFlow> getRecentlyFinishedFlows() {
return new ArrayList<ExecutableFlow>(recentlyFinished.values());
}
@Override
public List<ExecutableFlow> getExecutableFlows(Project project,
- String flowId, int skip, int size) throws ExecutorManagerException {
+ String flowId, int skip, int size) throws ExecutorManagerException {
List<ExecutableFlow> flows =
- executorLoader.fetchFlowHistory(project.getId(), flowId, skip, size);
+ executorLoader.fetchFlowHistory(project.getId(), flowId, skip, size);
return flows;
}
@Override
public List<ExecutableFlow> getExecutableFlows(int skip, int size)
- throws ExecutorManagerException {
+ throws ExecutorManagerException {
List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(skip, size);
return flows;
}
@Override
public List<ExecutableFlow> getExecutableFlows(String flowIdContains,
- int skip, int size) throws ExecutorManagerException {
+ int skip, int size) throws ExecutorManagerException {
List<ExecutableFlow> flows =
- executorLoader.fetchFlowHistory(null, '%' + flowIdContains + '%', null,
- 0, -1, -1, skip, size);
+ executorLoader.fetchFlowHistory(null, '%' + flowIdContains + '%', null,
+ 0, -1, -1, skip, size);
return flows;
}
@Override
public List<ExecutableFlow> getExecutableFlows(String projContain,
- String flowContain, String userContain, int status, long begin, long end,
- int skip, int size) throws ExecutorManagerException {
+ String flowContain, String userContain, int status, long begin, long end,
+ int skip, int size) throws ExecutorManagerException {
List<ExecutableFlow> flows =
- executorLoader.fetchFlowHistory(projContain, flowContain, userContain,
- status, begin, end, skip, size);
+ executorLoader.fetchFlowHistory(projContain, flowContain, userContain,
+ status, begin, end, skip, size);
return flows;
}
@Override
public List<ExecutableJobInfo> getExecutableJobs(Project project,
- String jobId, int skip, int size) throws ExecutorManagerException {
+ String jobId, int skip, int size) throws ExecutorManagerException {
List<ExecutableJobInfo> nodes =
- executorLoader.fetchJobHistory(project.getId(), jobId, skip, size);
+ executorLoader.fetchJobHistory(project.getId(), jobId, skip, size);
return nodes;
}
@Override
public int getNumberOfJobExecutions(Project project, String jobId)
- throws ExecutorManagerException {
+ throws ExecutorManagerException {
return executorLoader.fetchNumExecutableNodes(project.getId(), jobId);
}
@Override
public int getNumberOfExecutions(Project project, String flowId)
- throws ExecutorManagerException {
+ throws ExecutorManagerException {
return executorLoader.fetchNumExecutableFlows(project.getId(), flowId);
}
@Override
public LogData getExecutableFlowLog(ExecutableFlow exFlow, int offset,
- int length) throws ExecutorManagerException {
+ int length) throws ExecutorManagerException {
Pair<ExecutionReference, ExecutableFlow> pair =
- runningFlows.get(exFlow.getExecutionId());
+ runningFlows.get(exFlow.getExecutionId());
if (pair != null) {
Pair<String, String> typeParam = new Pair<String, String>("type", "flow");
Pair<String, String> offsetParam =
- new Pair<String, String>("offset", String.valueOf(offset));
+ new Pair<String, String>("offset", String.valueOf(offset));
Pair<String, String> lengthParam =
- new Pair<String, String>("length", String.valueOf(length));
+ new Pair<String, String>("length", String.valueOf(length));
@SuppressWarnings("unchecked")
Map<String, Object> result =
- callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION,
- typeParam, offsetParam, lengthParam);
+ callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION,
+ typeParam, offsetParam, lengthParam);
return LogData.createLogDataFromObject(result);
} else {
LogData value =
- executorLoader.fetchLogs(exFlow.getExecutionId(), "", 0, offset,
- length);
+ executorLoader
+ .fetchLogs(exFlow.getExecutionId(), "", 0, offset, length);
return value;
}
}
@Override
public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId,
- int offset, int length, int attempt) throws ExecutorManagerException {
+ int offset, int length, int attempt) throws ExecutorManagerException {
Pair<ExecutionReference, ExecutableFlow> pair =
- runningFlows.get(exFlow.getExecutionId());
+ runningFlows.get(exFlow.getExecutionId());
if (pair != null) {
Pair<String, String> typeParam = new Pair<String, String>("type", "job");
Pair<String, String> jobIdParam =
- new Pair<String, String>("jobId", jobId);
+ new Pair<String, String>("jobId", jobId);
Pair<String, String> offsetParam =
- new Pair<String, String>("offset", String.valueOf(offset));
+ new Pair<String, String>("offset", String.valueOf(offset));
Pair<String, String> lengthParam =
- new Pair<String, String>("length", String.valueOf(length));
+ new Pair<String, String>("length", String.valueOf(length));
Pair<String, String> attemptParam =
- new Pair<String, String>("attempt", String.valueOf(attempt));
+ new Pair<String, String>("attempt", String.valueOf(attempt));
@SuppressWarnings("unchecked")
Map<String, Object> result =
- callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION,
- typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
+ callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION,
+ typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
return LogData.createLogDataFromObject(result);
} else {
LogData value =
- executorLoader.fetchLogs(exFlow.getExecutionId(), jobId, attempt,
- offset, length);
+ executorLoader.fetchLogs(exFlow.getExecutionId(), jobId, attempt,
+ offset, length);
return value;
}
}
@Override
public List<Object> getExecutionJobStats(ExecutableFlow exFlow, String jobId,
- int attempt) throws ExecutorManagerException {
+ int attempt) throws ExecutorManagerException {
Pair<ExecutionReference, ExecutableFlow> pair =
- runningFlows.get(exFlow.getExecutionId());
+ runningFlows.get(exFlow.getExecutionId());
if (pair == null) {
return executorLoader.fetchAttachments(exFlow.getExecutionId(), jobId,
- attempt);
+ attempt);
}
Pair<String, String> jobIdParam = new Pair<String, String>("jobId", jobId);
Pair<String, String> attemptParam =
- new Pair<String, String>("attempt", String.valueOf(attempt));
+ new Pair<String, String>("attempt", String.valueOf(attempt));
@SuppressWarnings("unchecked")
Map<String, Object> result =
- callExecutorServer(pair.getFirst(), ConnectorParams.ATTACHMENTS_ACTION,
- jobIdParam, attemptParam);
+ callExecutorServer(pair.getFirst(), ConnectorParams.ATTACHMENTS_ACTION,
+ jobIdParam, attemptParam);
@SuppressWarnings("unchecked")
List<Object> jobStats = (List<Object>) result.get("attachments");
@@ -345,57 +452,74 @@ public class ExecutorManager extends EventHandler implements
@Override
public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow,
- String jobId, int offset, int length, int attempt)
- throws ExecutorManagerException {
+ String jobId, int offset, int length, int attempt)
+ throws ExecutorManagerException {
Pair<ExecutionReference, ExecutableFlow> pair =
- runningFlows.get(exFlow.getExecutionId());
+ runningFlows.get(exFlow.getExecutionId());
if (pair != null) {
Pair<String, String> typeParam = new Pair<String, String>("type", "job");
Pair<String, String> jobIdParam =
- new Pair<String, String>("jobId", jobId);
+ new Pair<String, String>("jobId", jobId);
Pair<String, String> offsetParam =
- new Pair<String, String>("offset", String.valueOf(offset));
+ new Pair<String, String>("offset", String.valueOf(offset));
Pair<String, String> lengthParam =
- new Pair<String, String>("length", String.valueOf(length));
+ new Pair<String, String>("length", String.valueOf(length));
Pair<String, String> attemptParam =
- new Pair<String, String>("attempt", String.valueOf(attempt));
+ new Pair<String, String>("attempt", String.valueOf(attempt));
@SuppressWarnings("unchecked")
Map<String, Object> result =
- callExecutorServer(pair.getFirst(), ConnectorParams.METADATA_ACTION,
- typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
+ callExecutorServer(pair.getFirst(), ConnectorParams.METADATA_ACTION,
+ typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
return JobMetaData.createJobMetaDataFromObject(result);
} else {
return null;
}
}
+ /**
+ * if flows was dispatched to an executor, cancel by calling Executor else if
+ * flow is still in queue, remove from queue and finalize {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorManagerAdapter#cancelFlow(azkaban.executor.ExecutableFlow,
+ * java.lang.String)
+ */
@Override
public void cancelFlow(ExecutableFlow exFlow, String userId)
- throws ExecutorManagerException {
+ throws ExecutorManagerException {
synchronized (exFlow) {
- Pair<ExecutionReference, ExecutableFlow> pair =
+ if (runningFlows.containsKey(exFlow.getExecutionId())) {
+ Pair<ExecutionReference, ExecutableFlow> pair =
runningFlows.get(exFlow.getExecutionId());
- if (pair == null) {
+ callExecutorServer(pair.getFirst(), ConnectorParams.CANCEL_ACTION,
+ userId);
+ } else if (queuedFlowMap.containsKey(exFlow.getExecutionId())) {
+ Pair<ExecutionReference, ExecutableFlow> pair =
+ queuedFlowMap.get(exFlow.getExecutionId());
+ synchronized (pair) {
+ queuedFlows.remove(pair);
+ queuedFlowMap.remove(exFlow.getExecutionId());
+ finalizeFlows(exFlow);
+ }
+ } else {
throw new ExecutorManagerException("Execution "
- + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
- + " isn't running.");
+ + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
+ + " isn't running.");
}
- callExecutorServer(pair.getFirst(), ConnectorParams.CANCEL_ACTION, userId);
}
}
@Override
public void resumeFlow(ExecutableFlow exFlow, String userId)
- throws ExecutorManagerException {
+ throws ExecutorManagerException {
synchronized (exFlow) {
Pair<ExecutionReference, ExecutableFlow> pair =
- runningFlows.get(exFlow.getExecutionId());
+ runningFlows.get(exFlow.getExecutionId());
if (pair == null) {
throw new ExecutorManagerException("Execution "
- + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
- + " isn't running.");
+ + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
+ + " isn't running.");
}
callExecutorServer(pair.getFirst(), ConnectorParams.RESUME_ACTION, userId);
}
@@ -403,14 +527,14 @@ public class ExecutorManager extends EventHandler implements
@Override
public void pauseFlow(ExecutableFlow exFlow, String userId)
- throws ExecutorManagerException {
+ throws ExecutorManagerException {
synchronized (exFlow) {
Pair<ExecutionReference, ExecutableFlow> pair =
- runningFlows.get(exFlow.getExecutionId());
+ runningFlows.get(exFlow.getExecutionId());
if (pair == null) {
throw new ExecutorManagerException("Execution "
- + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
- + " isn't running.");
+ + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
+ + " isn't running.");
}
callExecutorServer(pair.getFirst(), ConnectorParams.PAUSE_ACTION, userId);
}
@@ -418,63 +542,63 @@ public class ExecutorManager extends EventHandler implements
@Override
public void pauseExecutingJobs(ExecutableFlow exFlow, String userId,
- String... jobIds) throws ExecutorManagerException {
+ String... jobIds) throws ExecutorManagerException {
modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_PAUSE_JOBS, userId,
- jobIds);
+ jobIds);
}
@Override
public void resumeExecutingJobs(ExecutableFlow exFlow, String userId,
- String... jobIds) throws ExecutorManagerException {
+ String... jobIds) throws ExecutorManagerException {
modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RESUME_JOBS, userId,
- jobIds);
+ jobIds);
}
@Override
public void retryFailures(ExecutableFlow exFlow, String userId)
- throws ExecutorManagerException {
+ throws ExecutorManagerException {
modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RETRY_FAILURES, userId);
}
@Override
public void retryExecutingJobs(ExecutableFlow exFlow, String userId,
- String... jobIds) throws ExecutorManagerException {
+ String... jobIds) throws ExecutorManagerException {
modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RETRY_JOBS, userId,
- jobIds);
+ jobIds);
}
@Override
public void disableExecutingJobs(ExecutableFlow exFlow, String userId,
- String... jobIds) throws ExecutorManagerException {
+ String... jobIds) throws ExecutorManagerException {
modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_DISABLE_JOBS, userId,
- jobIds);
+ jobIds);
}
@Override
public void enableExecutingJobs(ExecutableFlow exFlow, String userId,
- String... jobIds) throws ExecutorManagerException {
+ String... jobIds) throws ExecutorManagerException {
modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_ENABLE_JOBS, userId,
- jobIds);
+ jobIds);
}
@Override
public void cancelExecutingJobs(ExecutableFlow exFlow, String userId,
- String... jobIds) throws ExecutorManagerException {
+ String... jobIds) throws ExecutorManagerException {
modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_CANCEL_JOBS, userId,
- jobIds);
+ jobIds);
}
@SuppressWarnings("unchecked")
private Map<String, Object> modifyExecutingJobs(ExecutableFlow exFlow,
- String command, String userId, String... jobIds)
- throws ExecutorManagerException {
+ String command, String userId, String... jobIds)
+ throws ExecutorManagerException {
synchronized (exFlow) {
Pair<ExecutionReference, ExecutableFlow> pair =
- runningFlows.get(exFlow.getExecutionId());
+ runningFlows.get(exFlow.getExecutionId());
if (pair == null) {
throw new ExecutorManagerException("Execution "
- + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
- + " isn't running.");
+ + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
+ + " isn't running.");
}
Map<String, Object> response = null;
@@ -484,24 +608,24 @@ public class ExecutorManager extends EventHandler implements
ExecutableNode node = exFlow.getExecutableNode(jobId);
if (node == null) {
throw new ExecutorManagerException("Job " + jobId
- + " doesn't exist in execution " + exFlow.getExecutionId()
- + ".");
+ + " doesn't exist in execution " + exFlow.getExecutionId()
+ + ".");
}
}
}
String ids = StringUtils.join(jobIds, ',');
response =
- callExecutorServer(pair.getFirst(),
- ConnectorParams.MODIFY_EXECUTION_ACTION, userId,
- new Pair<String, String>(
- ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command),
- new Pair<String, String>(ConnectorParams.MODIFY_JOBS_LIST, ids));
+ callExecutorServer(pair.getFirst(),
+ ConnectorParams.MODIFY_EXECUTION_ACTION, userId,
+ new Pair<String, String>(
+ ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command),
+ new Pair<String, String>(ConnectorParams.MODIFY_JOBS_LIST, ids));
} else {
response =
- callExecutorServer(pair.getFirst(),
- ConnectorParams.MODIFY_EXECUTION_ACTION, userId,
- new Pair<String, String>(
- ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command));
+ callExecutorServer(pair.getFirst(),
+ ConnectorParams.MODIFY_EXECUTION_ACTION, userId,
+ new Pair<String, String>(
+ ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command));
}
return response;
@@ -509,7 +633,7 @@ public class ExecutorManager extends EventHandler implements
}
private void applyDisabledJobs(List<Object> disabledJobs,
- ExecutableFlowBase exflow) {
+ ExecutableFlowBase exflow) {
for (Object disabled : disabledJobs) {
if (disabled instanceof String) {
String nodeName = (String) disabled;
@@ -523,7 +647,7 @@ public class ExecutorManager extends EventHandler implements
String nodeName = (String) nestedDisabled.get("id");
@SuppressWarnings("unchecked")
List<Object> subDisabledJobs =
- (List<Object>) nestedDisabled.get("children");
+ (List<Object>) nestedDisabled.get("children");
if (nodeName == null || subDisabledJobs == null) {
return;
@@ -539,10 +663,10 @@ public class ExecutorManager extends EventHandler implements
@Override
public String submitExecutableFlow(ExecutableFlow exflow, String userId)
- throws ExecutorManagerException {
+ throws ExecutorManagerException {
synchronized (exflow) {
logger.info("Submitting execution flow " + exflow.getFlowId() + " by "
- + userId);
+ + userId);
int projectId = exflow.getProjectId();
String flowId = exflow.getFlowId();
@@ -563,31 +687,32 @@ public class ExecutorManager extends EventHandler implements
if (!running.isEmpty()) {
if (options.getConcurrentOption().equals(
- ExecutionOptions.CONCURRENT_OPTION_PIPELINE)) {
+ ExecutionOptions.CONCURRENT_OPTION_PIPELINE)) {
Collections.sort(running);
Integer runningExecId = running.get(running.size() - 1);
options.setPipelineExecutionId(runningExecId);
message =
- "Flow " + flowId + " is already running with exec id "
- + runningExecId + ". Pipelining level "
- + options.getPipelineLevel() + ". \n";
+ "Flow " + flowId + " is already running with exec id "
+ + runningExecId + ". Pipelining level "
+ + options.getPipelineLevel() + ". \n";
} else if (options.getConcurrentOption().equals(
- ExecutionOptions.CONCURRENT_OPTION_SKIP)) {
+ ExecutionOptions.CONCURRENT_OPTION_SKIP)) {
throw new ExecutorManagerException("Flow " + flowId
- + " is already running. Skipping execution.",
- ExecutorManagerException.Reason.SkippedExecution);
+ + " is already running. Skipping execution.",
+ ExecutorManagerException.Reason.SkippedExecution);
} else {
// The settings is to run anyways.
message =
- "Flow " + flowId + " is already running with exec id "
- + StringUtils.join(running, ",")
- + ". Will execute concurrently. \n";
+ "Flow " + flowId + " is already running with exec id "
+ + StringUtils.join(running, ",")
+ + ". Will execute concurrently. \n";
}
}
- boolean memoryCheck = !ProjectWhitelist.isProjectWhitelisted(exflow.getProjectId(),
- ProjectWhitelist.WhitelistType.MemoryCheck);
+ boolean memoryCheck =
+ !ProjectWhitelist.isProjectWhitelisted(exflow.getProjectId(),
+ ProjectWhitelist.WhitelistType.MemoryCheck);
options.setMemoryCheck(memoryCheck);
// The exflow id is set by the loader. So it's unavailable until after
@@ -597,22 +722,16 @@ public class ExecutorManager extends EventHandler implements
// We create an active flow reference in the datastore. If the upload
// fails, we remove the reference.
ExecutionReference reference =
- new ExecutionReference(exflow.getExecutionId(), executorHost,
- executorPort);
+ new ExecutionReference(exflow.getExecutionId());
+ // Persist the active execution reference so the flow is queued in the database
executorLoader.addActiveExecutableReference(reference);
- try {
- callExecutorServer(reference, ConnectorParams.EXECUTE_ACTION);
- runningFlows.put(exflow.getExecutionId(),
- new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
-
- message +=
- "Execution submitted successfully with exec id "
- + exflow.getExecutionId();
- } catch (ExecutorManagerException e) {
- executorLoader.removeActiveExecutableReference(reference.getExecId());
- throw e;
- }
-
+ Pair<ExecutionReference, ExecutableFlow> pair =
+ new Pair<ExecutionReference, ExecutableFlow>(reference, exflow);
+ queuedFlowMap.put(exflow.getExecutionId(), pair);
+ queuedFlows.add(pair);
+ message +=
+ "Execution submitted successfully with exec id "
+ + exflow.getExecutionId();
return message;
}
}
@@ -627,50 +746,50 @@ public class ExecutorManager extends EventHandler implements
}
private Map<String, Object> callExecutorServer(ExecutionReference ref,
- String action) throws ExecutorManagerException {
+ String action) throws ExecutorManagerException {
try {
return callExecutorServer(ref.getHost(), ref.getPort(), action,
- ref.getExecId(), null, (Pair<String, String>[]) null);
+ ref.getExecId(), null, (Pair<String, String>[]) null);
} catch (IOException e) {
throw new ExecutorManagerException(e);
}
}
private Map<String, Object> callExecutorServer(ExecutionReference ref,
- String action, String user) throws ExecutorManagerException {
+ String action, String user) throws ExecutorManagerException {
try {
return callExecutorServer(ref.getHost(), ref.getPort(), action,
- ref.getExecId(), user, (Pair<String, String>[]) null);
+ ref.getExecId(), user, (Pair<String, String>[]) null);
} catch (IOException e) {
throw new ExecutorManagerException(e);
}
}
private Map<String, Object> callExecutorServer(ExecutionReference ref,
- String action, Pair<String, String>... params)
- throws ExecutorManagerException {
+ String action, Pair<String, String>... params)
+ throws ExecutorManagerException {
try {
return callExecutorServer(ref.getHost(), ref.getPort(), action,
- ref.getExecId(), null, params);
+ ref.getExecId(), null, params);
} catch (IOException e) {
throw new ExecutorManagerException(e);
}
}
private Map<String, Object> callExecutorServer(ExecutionReference ref,
- String action, String user, Pair<String, String>... params)
- throws ExecutorManagerException {
+ String action, String user, Pair<String, String>... params)
+ throws ExecutorManagerException {
try {
return callExecutorServer(ref.getHost(), ref.getPort(), action,
- ref.getExecId(), user, params);
+ ref.getExecId(), user, params);
} catch (IOException e) {
throw new ExecutorManagerException(e);
}
}
private Map<String, Object> callExecutorServer(String host, int port,
- String action, Integer executionId, String user,
- Pair<String, String>... params) throws IOException {
+ String action, Integer executionId, String user,
+ Pair<String, String>... params) throws IOException {
URIBuilder builder = new URIBuilder();
builder.setScheme("http").setHost(host).setPort(port).setPath("/executor");
@@ -678,7 +797,7 @@ public class ExecutorManager extends EventHandler implements
if (executionId != null) {
builder.setParameter(ConnectorParams.EXECID_PARAM,
- String.valueOf(executionId));
+ String.valueOf(executionId));
}
if (user != null) {
@@ -713,7 +832,7 @@ public class ExecutorManager extends EventHandler implements
@SuppressWarnings("unchecked")
Map<String, Object> jsonResponse =
- (Map<String, Object>) JSONUtils.parseJSONFromString(response);
+ (Map<String, Object>) JSONUtils.parseJSONFromString(response);
String error = (String) jsonResponse.get(ConnectorParams.RESPONSE_ERROR);
if (error != null) {
throw new IOException(error);
@@ -725,13 +844,17 @@ public class ExecutorManager extends EventHandler implements
/**
* Manage servlet call for stats servlet in Azkaban execution server
* {@inheritDoc}
- * @see azkaban.executor.ExecutorManagerAdapter#callExecutorStats(java.lang.String, azkaban.utils.Pair[])
+ *
+ * @see azkaban.executor.ExecutorManagerAdapter#callExecutorStats(java.lang.String,
+ * azkaban.utils.Pair[])
*/
@Override
- public Map<String, Object> callExecutorStats(String action, Pair<String, String>... params) throws IOException {
+ public Map<String, Object> callExecutorStats(String action,
+ Pair<String, String>... params) throws IOException {
URIBuilder builder = new URIBuilder();
- builder.setScheme("http").setHost(executorHost).setPort(executorPort).setPath("/stats");
+ // TODO: fix to take host and port from the user; until then the builder has no scheme/host/port set, so this call cannot succeed
+ // builder.setScheme("http").setHost(executorHost).setPort(executorPort).setPath("/stats");
builder.setParameter(ConnectorParams.ACTION_PARAM, action);
@@ -763,20 +886,19 @@ public class ExecutorManager extends EventHandler implements
@SuppressWarnings("unchecked")
Map<String, Object> jsonResponse =
- (Map<String, Object>) JSONUtils.parseJSONFromString(response);
+ (Map<String, Object>) JSONUtils.parseJSONFromString(response);
return jsonResponse;
}
-
@Override
public Map<String, Object> callExecutorJMX(String hostPort, String action,
- String mBean) throws IOException {
+ String mBean) throws IOException {
URIBuilder builder = new URIBuilder();
String[] hostPortSplit = hostPort.split(":");
builder.setScheme("http").setHost(hostPortSplit[0])
- .setPort(Integer.parseInt(hostPortSplit[1])).setPath("/jmx");
+ .setPort(Integer.parseInt(hostPortSplit[1])).setPath("/jmx");
builder.setParameter(action, "");
if (mBean != null) {
@@ -805,7 +927,7 @@ public class ExecutorManager extends EventHandler implements
@SuppressWarnings("unchecked")
Map<String, Object> jsonResponse =
- (Map<String, Object>) JSONUtils.parseJSONFromString(response);
+ (Map<String, Object>) JSONUtils.parseJSONFromString(response);
String error = (String) jsonResponse.get(ConnectorParams.RESPONSE_ERROR);
if (error != null) {
throw new IOException(error);
@@ -815,6 +937,7 @@ public class ExecutorManager extends EventHandler implements
@Override
public void shutdown() {
+ queueProcessor.shutdown();
executingManager.shutdown();
}
@@ -846,64 +969,64 @@ public class ExecutorManager extends EventHandler implements
lastThreadCheckTime = System.currentTimeMillis();
updaterStage = "Starting update all flows.";
- Map<ConnectionInfo, List<ExecutableFlow>> exFlowMap =
- getFlowToExecutorMap();
+ Map<Executor, List<ExecutableFlow>> exFlowMap =
+ getFlowToExecutorMap();
ArrayList<ExecutableFlow> finishedFlows =
- new ArrayList<ExecutableFlow>();
+ new ArrayList<ExecutableFlow>();
ArrayList<ExecutableFlow> finalizeFlows =
- new ArrayList<ExecutableFlow>();
+ new ArrayList<ExecutableFlow>();
if (exFlowMap.size() > 0) {
- for (Map.Entry<ConnectionInfo, List<ExecutableFlow>> entry : exFlowMap
- .entrySet()) {
+ for (Map.Entry<Executor, List<ExecutableFlow>> entry : exFlowMap
+ .entrySet()) {
List<Long> updateTimesList = new ArrayList<Long>();
List<Integer> executionIdsList = new ArrayList<Integer>();
- ConnectionInfo connection = entry.getKey();
+ Executor executor = entry.getKey();
updaterStage =
- "Starting update flows on " + connection.getHost() + ":"
- + connection.getPort();
+ "Starting update flows on " + executor.getHost() + ":"
+ + executor.getPort();
// We pack the parameters of the same host together before we
// query.
fillUpdateTimeAndExecId(entry.getValue(), executionIdsList,
- updateTimesList);
+ updateTimesList);
Pair<String, String> updateTimes =
- new Pair<String, String>(
- ConnectorParams.UPDATE_TIME_LIST_PARAM,
- JSONUtils.toJSON(updateTimesList));
+ new Pair<String, String>(
+ ConnectorParams.UPDATE_TIME_LIST_PARAM,
+ JSONUtils.toJSON(updateTimesList));
Pair<String, String> executionIds =
- new Pair<String, String>(ConnectorParams.EXEC_ID_LIST_PARAM,
- JSONUtils.toJSON(executionIdsList));
+ new Pair<String, String>(ConnectorParams.EXEC_ID_LIST_PARAM,
+ JSONUtils.toJSON(executionIdsList));
Map<String, Object> results = null;
try {
results =
- callExecutorServer(connection.getHost(),
- connection.getPort(), ConnectorParams.UPDATE_ACTION,
- null, null, executionIds, updateTimes);
+ callExecutorServer(executor.getHost(), executor.getPort(),
+ ConnectorParams.UPDATE_ACTION, null, null, executionIds,
+ updateTimes);
} catch (IOException e) {
logger.error(e);
for (ExecutableFlow flow : entry.getValue()) {
Pair<ExecutionReference, ExecutableFlow> pair =
- runningFlows.get(flow.getExecutionId());
+ runningFlows.get(flow.getExecutionId());
updaterStage =
- "Failed to get update. Doing some clean up for flow "
- + pair.getSecond().getExecutionId();
+ "Failed to get update. Doing some clean up for flow "
+ + pair.getSecond().getExecutionId();
if (pair != null) {
ExecutionReference ref = pair.getFirst();
int numErrors = ref.getNumErrors();
if (ref.getNumErrors() < this.numErrors) {
ref.setNextCheckTime(System.currentTimeMillis()
- + errorThreshold);
+ + errorThreshold);
ref.setNumErrors(++numErrors);
} else {
logger.error("Evicting flow " + flow.getExecutionId()
- + ". The executor is unresponsive.");
+ + ". The executor is unresponsive.");
// TODO should send out an unresponsive email here.
finalizeFlows.add(pair.getSecond());
}
@@ -914,8 +1037,8 @@ public class ExecutorManager extends EventHandler implements
// We gets results
if (results != null) {
List<Map<String, Object>> executionUpdates =
- (List<Map<String, Object>>) results
- .get(ConnectorParams.RESPONSE_UPDATED_FLOWS);
+ (List<Map<String, Object>>) results
+ .get(ConnectorParams.RESPONSE_UPDATED_FLOWS);
for (Map<String, Object> updateMap : executionUpdates) {
try {
ExecutableFlow flow = updateExecution(updateMap);
@@ -945,16 +1068,16 @@ public class ExecutorManager extends EventHandler implements
// Add new finished
for (ExecutableFlow flow : finishedFlows) {
if (flow.getScheduleId() >= 0
- && flow.getStatus() == Status.SUCCEEDED) {
+ && flow.getStatus() == Status.SUCCEEDED) {
ScheduleStatisticManager.invalidateCache(flow.getScheduleId(),
- cacheDir);
+ cacheDir);
}
fireEventListeners(Event.create(flow, Type.FLOW_FINISHED));
recentlyFinished.put(flow.getExecutionId(), flow);
}
updaterStage =
- "Finalizing " + finalizeFlows.size() + " error flows.";
+ "Finalizing " + finalizeFlows.size() + " error flows.";
// Kill error flows
for (ExecutableFlow flow : finalizeFlows) {
@@ -1015,6 +1138,7 @@ public class ExecutorManager extends EventHandler implements
updaterStage = "finalizing flow " + execId + " cleaning from memory";
runningFlows.remove(execId);
+
fireEventListeners(Event.create(dsFlow, Type.FLOW_FINISHED));
recentlyFinished.put(execId, dsFlow);
@@ -1032,12 +1156,12 @@ public class ExecutorManager extends EventHandler implements
Alerter mailAlerter = alerters.get("email");
if (flow.getStatus() == Status.FAILED || flow.getStatus() == Status.KILLED) {
if (options.getFailureEmails() != null
- && !options.getFailureEmails().isEmpty()) {
+ && !options.getFailureEmails().isEmpty()) {
try {
mailAlerter
- .alertOnError(
- flow,
- "Executor no longer seems to be running this execution. Most likely due to executor bounce.");
+ .alertOnError(
+ flow,
+ "Executor no longer seems to be running this execution. Most likely due to executor bounce.");
} catch (Exception e) {
logger.error(e);
}
@@ -1048,9 +1172,9 @@ public class ExecutorManager extends EventHandler implements
if (alerter != null) {
try {
alerter
- .alertOnError(
- flow,
- "Executor no longer seems to be running this execution. Most likely due to executor bounce.");
+ .alertOnError(
+ flow,
+ "Executor no longer seems to be running this execution. Most likely due to executor bounce.");
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
@@ -1058,12 +1182,12 @@ public class ExecutorManager extends EventHandler implements
}
} else {
logger.error("Alerter type " + alertType
- + " doesn't exist. Failed to alert.");
+ + " doesn't exist. Failed to alert.");
}
}
} else {
if (options.getSuccessEmails() != null
- && !options.getSuccessEmails().isEmpty()) {
+ && !options.getSuccessEmails().isEmpty()) {
try {
mailAlerter.alertOnSuccess(flow);
@@ -1084,7 +1208,7 @@ public class ExecutorManager extends EventHandler implements
}
} else {
logger.error("Alerter type " + alertType
- + " doesn't exist. Failed to alert.");
+ + " doesn't exist. Failed to alert.");
}
}
}
@@ -1127,7 +1251,7 @@ public class ExecutorManager extends EventHandler implements
private void evictOldRecentlyFinished(long ageMs) {
ArrayList<Integer> recentlyFinishedKeys =
- new ArrayList<Integer>(recentlyFinished.keySet());
+ new ArrayList<Integer>(recentlyFinished.keySet());
long oldAgeThreshold = System.currentTimeMillis() - ageMs;
for (Integer key : recentlyFinishedKeys) {
ExecutableFlow flow = recentlyFinished.get(key);
@@ -1140,20 +1264,20 @@ public class ExecutorManager extends EventHandler implements
}
private ExecutableFlow updateExecution(Map<String, Object> updateData)
- throws ExecutorManagerException {
+ throws ExecutorManagerException {
Integer execId =
- (Integer) updateData.get(ConnectorParams.UPDATE_MAP_EXEC_ID);
+ (Integer) updateData.get(ConnectorParams.UPDATE_MAP_EXEC_ID);
if (execId == null) {
throw new ExecutorManagerException(
- "Response is malformed. Need exec id to update.");
+ "Response is malformed. Need exec id to update.");
}
Pair<ExecutionReference, ExecutableFlow> refPair =
- this.runningFlows.get(execId);
+ this.runningFlows.get(execId);
if (refPair == null) {
throw new ExecutorManagerException(
- "No running flow found with the execution id. Removing " + execId);
+ "No running flow found with the execution id. Removing " + execId);
}
ExecutionReference ref = refPair.getFirst();
@@ -1195,7 +1319,7 @@ public class ExecutorManager extends EventHandler implements
}
} else {
logger.error("Alerter type " + alertType
- + " doesn't exist. Failed to alert.");
+ + " doesn't exist. Failed to alert.");
}
}
}
@@ -1215,22 +1339,22 @@ public class ExecutorManager extends EventHandler implements
}
private void fillUpdateTimeAndExecId(List<ExecutableFlow> flows,
- List<Integer> executionIds, List<Long> updateTimes) {
+ List<Integer> executionIds, List<Long> updateTimes) {
for (ExecutableFlow flow : flows) {
executionIds.add(flow.getExecutionId());
updateTimes.add(flow.getUpdateTime());
}
}
- private Map<ConnectionInfo, List<ExecutableFlow>> getFlowToExecutorMap() {
- HashMap<ConnectionInfo, List<ExecutableFlow>> exFlowMap =
- new HashMap<ConnectionInfo, List<ExecutableFlow>>();
+ private Map<Executor, List<ExecutableFlow>> getFlowToExecutorMap() {
+ HashMap<Executor, List<ExecutableFlow>> exFlowMap =
+ new HashMap<Executor, List<ExecutableFlow>>();
- ConnectionInfo lastPort = new ConnectionInfo(executorHost, executorPort);
for (Pair<ExecutionReference, ExecutableFlow> runningFlow : runningFlows
- .values()) {
+ .values()) {
ExecutionReference ref = runningFlow.getFirst();
ExecutableFlow flow = runningFlow.getSecond();
+ Executor executor = ref.getExecutor();
// We can set the next check time to prevent the checking of certain
// flows.
@@ -1238,16 +1362,10 @@ public class ExecutorManager extends EventHandler implements
continue;
}
- // Just a silly way to reduce object creation construction of objects
- // since it's most likely that the values will be the same.
- if (!lastPort.isEqual(ref.getHost(), ref.getPort())) {
- lastPort = new ConnectionInfo(ref.getHost(), ref.getPort());
- }
-
- List<ExecutableFlow> flows = exFlowMap.get(lastPort);
+ List<ExecutableFlow> flows = exFlowMap.get(executor);
if (flows == null) {
flows = new ArrayList<ExecutableFlow>();
- exFlowMap.put(lastPort, flows);
+ exFlowMap.put(executor, flows);
}
flows.add(flow);
@@ -1256,76 +1374,110 @@ public class ExecutorManager extends EventHandler implements
return exFlowMap;
}
- private static class ConnectionInfo {
- private String host;
- private int port;
+ @Override
+ public int getExecutableFlows(int projectId, String flowId, int from,
+ int length, List<ExecutableFlow> outputList)
+ throws ExecutorManagerException {
+ List<ExecutableFlow> flows =
+ executorLoader.fetchFlowHistory(projectId, flowId, from, length);
+ outputList.addAll(flows);
+ return executorLoader.fetchNumExecutableFlows(projectId, flowId);
+ }
- public ConnectionInfo(String host, int port) {
- this.host = host;
- this.port = port;
+ @Override
+ public List<ExecutableFlow> getExecutableFlows(int projectId, String flowId,
+ int from, int length, Status status) throws ExecutorManagerException {
+ return executorLoader.fetchFlowHistory(projectId, flowId, from, length,
+ status);
+ }
+
+ /*
+ * This thread is responsible for processing queued flows.
+ */
+ private class QueueProcessorThread extends Thread {
+ private static final long QUEUE_PROCESSOR_WAIT_IN_MS = 1000;
+ private boolean shutdown = false;
+ private long lastProcessingTime = -1;
+ private long maxContinousSubmission = 10;
+
+ public QueueProcessorThread() {
+ this.setName("AzkabanWebServer-QueueProcessor-Thread");
}
@SuppressWarnings("unused")
- private ConnectionInfo getOuterType() {
- return ConnectionInfo.this;
+ public void shutdown() {
+ shutdown = true;
+ this.interrupt();
}
- public boolean isEqual(String host, int port) {
- return this.port == port && this.host.equals(host);
+ public void run() {
+ while (!shutdown) {
+ synchronized (this) {
+ try {
+ refreshExecutors();
+ lastCleanerThreadCheckTime = System.currentTimeMillis();
+ long currentTime = System.currentTimeMillis();
+ if (currentTime - QUEUE_PROCESSOR_WAIT_IN_MS > lastProcessingTime) {
+ processQueuedFlows(maxContinousSubmission);
+ lastProcessingTime = currentTime;
+ }
+ wait(QUEUE_PROCESSOR_WAIT_IN_MS);
+ } catch (InterruptedException e) {
+ logger.info("Interrupted. Probably to shut down.");
+ }
+ }
+ }
}
+ }
- public String getHost() {
- return host;
- }
+ private void refreshExecutors() {
+ // TODO: REST API call to refresh executor stats
+ }
- public int getPort() {
- return port;
- }
+ private void processQueuedFlows(long maxContinousSubmission) {
+ try {
+ Pair<ExecutionReference, ExecutableFlow> runningCandidate;
+ int submissionNum = 0;
+ while (submissionNum < maxContinousSubmission
+ && (runningCandidate = queuedFlows.peek()) != null) {
+ synchronized (runningCandidate) {
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((host == null) ? 0 : host.hashCode());
- result = prime * result + port;
- return result;
- }
+ ExecutionReference reference = runningCandidate.getFirst();
+ ExecutableFlow exflow = runningCandidate.getSecond();
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- ConnectionInfo other = (ConnectionInfo) obj;
- if (host == null) {
- if (other.host != null)
- return false;
- } else if (!host.equals(other.host))
- return false;
- if (port != other.port)
- return false;
- return true;
- }
- }
+ // TODO: use dispatcher
+ Executor choosenExecutor;
- @Override
- public int getExecutableFlows(int projectId, String flowId, int from,
- int length, List<ExecutableFlow> outputList)
- throws ExecutorManagerException {
- List<ExecutableFlow> flows =
- executorLoader.fetchFlowHistory(projectId, flowId, from, length);
- outputList.addAll(flows);
- return executorLoader.fetchNumExecutableFlows(projectId, flowId);
- }
+ synchronized (activeExecutors) {
+ choosenExecutor = activeExecutors.iterator().next();
+ }
- @Override
- public List<ExecutableFlow> getExecutableFlows(int projectId, String flowId,
- int from, int length, Status status) throws ExecutorManagerException {
- return executorLoader.fetchFlowHistory(projectId, flowId, from, length,
- status);
+ if (choosenExecutor != null) {
+ queuedFlows.poll();
+ queuedFlowMap.remove(exflow.getExecutionId());
+
+ try {
+ reference.setExecutor(choosenExecutor);
+ executorLoader.assignExecutor(choosenExecutor.getId(),
+ exflow.getExecutionId());
+ // TODO: add REST call to do an actual dispatch
+ callExecutorServer(reference, ConnectorParams.EXECUTE_ACTION);
+ runningFlows
+ .put(exflow.getExecutionId(),
+ new Pair<ExecutionReference, ExecutableFlow>(reference,
+ exflow));
+ } catch (ExecutorManagerException e) {
+ logger.error("Failed to process queued flow", e);
+ // TODO: allow N errors and re-try
+ finalizeFlows(exflow);
+ }
+ }
+ }
+ submissionNum++;
+ }
+ } catch (Throwable th) {
+ logger.error("Failed to process the queue", th);
+ }
}
/*
@@ -1336,7 +1488,7 @@ public class ExecutorManager extends EventHandler implements
// check every day
private static final long CLEANER_THREAD_WAIT_INTERVAL_MS =
- 24 * 60 * 60 * 1000;
+ 24 * 60 * 60 * 1000;
private final long executionLogsRetentionMs;
@@ -1379,9 +1531,9 @@ public class ExecutorManager extends EventHandler implements
logger.info("Cleaning old logs from execution_logs");
long cutoff = DateTime.now().getMillis() - executionLogsRetentionMs;
logger.info("Cleaning old log files before "
- + new DateTime(cutoff).toString());
+ + new DateTime(cutoff).toString());
cleanOldExecutionLogs(DateTime.now().getMillis()
- - executionLogsRetentionMs);
+ - executionLogsRetentionMs);
}
}
}