diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 92feba9..4549ef0 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -94,12 +94,14 @@ public class ExecutorManager extends EventHandler implements
private CleanerThread cleanerThread;
+ private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows =
+ new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
private ConcurrentHashMap<Integer, ExecutableFlow> recentlyFinished =
- new ConcurrentHashMap<>();
+ new ConcurrentHashMap<Integer, ExecutableFlow>();
QueuedExecutions queuedFlows;
- final private Set<Executor> activeExecutors = new HashSet<>();
+ final private Set<Executor> activeExecutors = new HashSet<Executor>();
private QueueProcessorThread queueProcessor;
private volatile Pair<ExecutionReference, ExecutableFlow> runningCandidate = null;
@@ -128,6 +130,7 @@ public class ExecutorManager extends EventHandler implements
this.azkProps = azkProps;
this.executorLoader = loader;
this.setupExecutors();
+ this.loadRunningFlows();
queuedFlows = new QueuedExecutions(azkProps.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
this.loadQueuedFlows();
@@ -417,23 +420,23 @@ public class ExecutorManager extends EventHandler implements
@Override
public Set<String> getAllActiveExecutorServerHosts() {
// Includes non primary server/hosts
- HashSet<String> ports = new HashSet<>();
+ HashSet<String> ports = new HashSet<String>();
for (Executor executor : activeExecutors) {
ports.add(executor.getHost() + ":" + executor.getPort());
}
- try {
- // include executor which were initially active and still has flows running
- for (Pair<ExecutionReference, ExecutableFlow> running :
- executorLoader.fetchActiveFlows().values()) {
- ExecutionReference ref = running.getFirst();
- ports.add(ref.getHost() + ":" + ref.getPort());
- }
- } catch(ExecutorManagerException e) {
- logger.error(e);
+ // include executor which were initially active and still has flows running
+ for (Pair<ExecutionReference, ExecutableFlow> running : runningFlows
+ .values()) {
+ ExecutionReference ref = running.getFirst();
+ ports.add(ref.getHost() + ":" + ref.getPort());
}
return ports;
}
+ private void loadRunningFlows() throws ExecutorManagerException {
+ runningFlows.putAll(executorLoader.fetchActiveFlows());
+ }
+
/*
* load queued flows i.e with active_execution_reference and not assigned to
* any executor
@@ -466,12 +469,8 @@ public class ExecutorManager extends EventHandler implements
if (runningCandidate != null) {
executionIds.addAll(getRunningFlowsHelper(projectId, flowId, Lists.newArrayList(runningCandidate)));
}
- try {
- executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
- executorLoader.fetchActiveFlows().values()));
- } catch(ExecutorManagerException e) {
- logger.error(e);
- }
+ executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
+ runningFlows.values()));
Collections.sort(executionIds);
return executionIds;
}
@@ -498,13 +497,10 @@ public class ExecutorManager extends EventHandler implements
@Override
public List<Pair<ExecutableFlow, Executor>> getActiveFlowsWithExecutor()
throws IOException {
- List<Pair<ExecutableFlow, Executor>> flows = new ArrayList<>();
+ List<Pair<ExecutableFlow, Executor>> flows =
+ new ArrayList<Pair<ExecutableFlow, Executor>>();
getActiveFlowsWithExecutorHelper(flows, queuedFlows.getAllEntries());
- try {
- getActiveFlowsWithExecutorHelper(flows, executorLoader.fetchActiveFlows().values());
- } catch(ExecutorManagerException e) {
- logger.error(e);
- }
+ getActiveFlowsWithExecutorHelper(flows, runningFlows.values());
return flows;
}
@@ -531,12 +527,9 @@ public class ExecutorManager extends EventHandler implements
isRunning =
isRunning
|| isFlowRunningHelper(projectId, flowId, queuedFlows.getAllEntries());
- try {
- isRunning = isRunning || isFlowRunningHelper(projectId, flowId,
- executorLoader.fetchActiveFlows().values());
- } catch(ExecutorManagerException e) {
- logger.error(e);
- }
+ isRunning =
+ isRunning
+ || isFlowRunningHelper(projectId, flowId, runningFlows.values());
return isRunning;
}
@@ -572,13 +565,9 @@ public class ExecutorManager extends EventHandler implements
*/
@Override
public List<ExecutableFlow> getRunningFlows() {
- ArrayList<ExecutableFlow> flows = new ArrayList<>();
+ ArrayList<ExecutableFlow> flows = new ArrayList<ExecutableFlow>();
getActiveFlowHelper(flows, queuedFlows.getAllEntries());
- try {
- getActiveFlowHelper(flows, executorLoader.fetchActiveFlows().values());
- } catch(ExecutorManagerException e) {
- logger.error(e);
- }
+ getActiveFlowHelper(flows, runningFlows.values());
return flows;
}
@@ -601,13 +590,9 @@ public class ExecutorManager extends EventHandler implements
* @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows()
*/
public String getRunningFlowIds() {
- List<Integer> allIds = new ArrayList<>();
+ List<Integer> allIds = new ArrayList<Integer>();
getRunningFlowsIdsHelper(allIds, queuedFlows.getAllEntries());
- try {
- getRunningFlowsIdsHelper(allIds, executorLoader.fetchActiveFlows().values());
- } catch(ExecutorManagerException e) {
- logger.error(e);
- }
+ getRunningFlowsIdsHelper(allIds, runningFlows.values());
Collections.sort(allIds);
return allIds.toString();
}
@@ -702,7 +687,7 @@ public class ExecutorManager extends EventHandler implements
public LogData getExecutableFlowLog(ExecutableFlow exFlow, int offset,
int length) throws ExecutorManagerException {
Pair<ExecutionReference, ExecutableFlow> pair =
- executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
+ runningFlows.get(exFlow.getExecutionId());
if (pair != null) {
Pair<String, String> typeParam = new Pair<String, String>("type", "flow");
Pair<String, String> offsetParam =
@@ -727,7 +712,7 @@ public class ExecutorManager extends EventHandler implements
public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId,
int offset, int length, int attempt) throws ExecutorManagerException {
Pair<ExecutionReference, ExecutableFlow> pair =
- executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
+ runningFlows.get(exFlow.getExecutionId());
if (pair != null) {
Pair<String, String> typeParam = new Pair<String, String>("type", "job");
Pair<String, String> jobIdParam =
@@ -756,7 +741,7 @@ public class ExecutorManager extends EventHandler implements
public List<Object> getExecutionJobStats(ExecutableFlow exFlow, String jobId,
int attempt) throws ExecutorManagerException {
Pair<ExecutionReference, ExecutableFlow> pair =
- executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
+ runningFlows.get(exFlow.getExecutionId());
if (pair == null) {
return executorLoader.fetchAttachments(exFlow.getExecutionId(), jobId,
attempt);
@@ -781,9 +766,9 @@ public class ExecutorManager extends EventHandler implements
public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow,
String jobId, int offset, int length, int attempt)
throws ExecutorManagerException {
- Pair<ExecutionReference, ExecutableFlow> activeFlow =
- executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
- if (activeFlow != null) {
+ Pair<ExecutionReference, ExecutableFlow> pair =
+ runningFlows.get(exFlow.getExecutionId());
+ if (pair != null) {
Pair<String, String> typeParam = new Pair<String, String>("type", "job");
Pair<String, String> jobIdParam =
@@ -797,7 +782,7 @@ public class ExecutorManager extends EventHandler implements
@SuppressWarnings("unchecked")
Map<String, Object> result =
- callExecutorServer(activeFlow.getFirst(), ConnectorParams.METADATA_ACTION,
+ callExecutorServer(pair.getFirst(), ConnectorParams.METADATA_ACTION,
typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
return JobMetaData.createJobMetaDataFromObject(result);
} else {
@@ -816,10 +801,10 @@ public class ExecutorManager extends EventHandler implements
public void cancelFlow(ExecutableFlow exFlow, String userId)
throws ExecutorManagerException {
synchronized (exFlow) {
- Pair<ExecutionReference, ExecutableFlow> activeFlow =
- executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
- if(activeFlow != null) {
- callExecutorServer(activeFlow.getFirst(), ConnectorParams.CANCEL_ACTION,
+ if (runningFlows.containsKey(exFlow.getExecutionId())) {
+ Pair<ExecutionReference, ExecutableFlow> pair =
+ runningFlows.get(exFlow.getExecutionId());
+ callExecutorServer(pair.getFirst(), ConnectorParams.CANCEL_ACTION,
userId);
} else if (queuedFlows.hasExecution(exFlow.getExecutionId())) {
queuedFlows.dequeue(exFlow.getExecutionId());
@@ -836,14 +821,14 @@ public class ExecutorManager extends EventHandler implements
public void resumeFlow(ExecutableFlow exFlow, String userId)
throws ExecutorManagerException {
synchronized (exFlow) {
- Pair<ExecutionReference, ExecutableFlow> activeFlow =
- executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
- if (activeFlow == null) {
+ Pair<ExecutionReference, ExecutableFlow> pair =
+ runningFlows.get(exFlow.getExecutionId());
+ if (pair == null) {
throw new ExecutorManagerException("Execution "
+ exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
+ " isn't running.");
}
- callExecutorServer(activeFlow.getFirst(), ConnectorParams.RESUME_ACTION, userId);
+ callExecutorServer(pair.getFirst(), ConnectorParams.RESUME_ACTION, userId);
}
}
@@ -851,14 +836,14 @@ public class ExecutorManager extends EventHandler implements
public void pauseFlow(ExecutableFlow exFlow, String userId)
throws ExecutorManagerException {
synchronized (exFlow) {
- Pair<ExecutionReference, ExecutableFlow> activeFlow =
- executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
- if (activeFlow == null) {
+ Pair<ExecutionReference, ExecutableFlow> pair =
+ runningFlows.get(exFlow.getExecutionId());
+ if (pair == null) {
throw new ExecutorManagerException("Execution "
+ exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
+ " isn't running.");
}
- callExecutorServer(activeFlow.getFirst(), ConnectorParams.PAUSE_ACTION, userId);
+ callExecutorServer(pair.getFirst(), ConnectorParams.PAUSE_ACTION, userId);
}
}
@@ -916,7 +901,7 @@ public class ExecutorManager extends EventHandler implements
throws ExecutorManagerException {
synchronized (exFlow) {
Pair<ExecutionReference, ExecutableFlow> pair =
- executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
+ runningFlows.get(exFlow.getExecutionId());
if (pair == null) {
throw new ExecutorManagerException("Execution "
+ exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
@@ -1157,7 +1142,6 @@ public class ExecutorManager extends EventHandler implements
paramList = new ArrayList<Pair<String, String>>();
}
- // TODO: refactor using Guice, inject ExecutorApiClient in ExecutorManager
ExecutorApiClient apiclient = ExecutorApiClient.getInstance();
@SuppressWarnings("unchecked")
URI uri =
@@ -1256,8 +1240,6 @@ public class ExecutorManager extends EventHandler implements
new ArrayList<ExecutableFlow>();
ArrayList<ExecutableFlow> finalizeFlows =
new ArrayList<ExecutableFlow>();
- Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows =
- executorLoader.fetchActiveFlows();
if (exFlowMap.size() > 0) {
for (Map.Entry<Executor, List<ExecutableFlow>> entry : exFlowMap
@@ -1293,22 +1275,15 @@ public class ExecutorManager extends EventHandler implements
} catch (IOException e) {
logger.error(e);
for (ExecutableFlow flow : entry.getValue()) {
- Pair<ExecutionReference, ExecutableFlow> activeFlow =
- activeFlows.get(flow.getExecutionId());
+ Pair<ExecutionReference, ExecutableFlow> pair =
+ runningFlows.get(flow.getExecutionId());
updaterStage =
"Failed to get update. Doing some clean up for flow "
- + flow.getExecutionId();
-
- // The failure retry logic below won't work after removing the runningFlow
- // cache. numErrors and nextCheckTime are not stored in DB. So whenever we
- // fetch active flows from DB, numErrors will be initialized to default 0
- // and nexCheckTime will be -1. numErrors will never reach threshold and
- // flows will never be finalized in below case.
- // todo: jamiesjc will remove updaterThread and add separate clean up code
- // to handle errors.
- if (activeFlow != null) {
- ExecutionReference ref = activeFlow.getFirst();
+ + pair.getSecond().getExecutionId();
+
+ if (pair != null) {
+ ExecutionReference ref = pair.getFirst();
int numErrors = ref.getNumErrors();
if (ref.getNumErrors() < this.numErrors) {
ref.setNextCheckTime(System.currentTimeMillis()
@@ -1318,7 +1293,7 @@ public class ExecutorManager extends EventHandler implements
logger.error("Evicting flow " + flow.getExecutionId()
+ ". The executor is unresponsive.");
// TODO should send out an unresponsive email here.
- finalizeFlows.add(activeFlow.getSecond());
+ finalizeFlows.add(pair.getSecond());
}
}
}
@@ -1340,9 +1315,13 @@ public class ExecutorManager extends EventHandler implements
finalizeFlows.add(flow);
}
} catch (ExecutorManagerException e) {
- // Currently just ignore the update error. Will remove UpdaterThread and
- // add separate clean up code to handle errors.
- logger.error("Update execution failed. Ignored. ", e);
+ ExecutableFlow flow = e.getExecutableFlow();
+ logger.error(e);
+
+ if (flow != null) {
+ logger.error("Finalizing flow " + flow.getExecutionId());
+ finalizeFlows.add(flow);
+ }
}
}
}
@@ -1375,7 +1354,7 @@ public class ExecutorManager extends EventHandler implements
synchronized (this) {
try {
- if (activeFlows.size() > 0) {
+ if (runningFlows.size() > 0) {
this.wait(waitTimeMs);
} else {
this.wait(waitTimeIdleMs);
@@ -1423,6 +1402,7 @@ public class ExecutorManager extends EventHandler implements
executorLoader.removeActiveExecutableReference(execId);
updaterStage = "finalizing flow " + execId + " cleaning from memory";
+ runningFlows.remove(execId);
fireEventListeners(Event.create(dsFlow, Type.FLOW_FINISHED, new EventData(dsFlow)));
recentlyFinished.put(execId, dsFlow);
@@ -1550,15 +1530,15 @@ public class ExecutorManager extends EventHandler implements
"Response is malformed. Need exec id to update.");
}
- Pair<ExecutionReference, ExecutableFlow> activeFlow =
- executorLoader.fetchActiveFlowByExecId(execId);
- if (activeFlow == null) {
+ Pair<ExecutionReference, ExecutableFlow> refPair =
+ this.runningFlows.get(execId);
+ if (refPair == null) {
throw new ExecutorManagerException(
"No running flow found with the execution id. Removing " + execId);
}
- ExecutionReference ref = activeFlow.getFirst();
- ExecutableFlow flow = activeFlow.getSecond();
+ ExecutionReference ref = refPair.getFirst();
+ ExecutableFlow flow = refPair.getSecond();
if (updateData.containsKey("error")) {
// The flow should be finished here.
throw new ExecutorManagerException((String) updateData.get("error"), flow);
@@ -1630,31 +1610,27 @@ public class ExecutorManager extends EventHandler implements
/* Group Executable flow by Executors to reduce number of REST calls */
private Map<Executor, List<ExecutableFlow>> getFlowToExecutorMap() {
HashMap<Executor, List<ExecutableFlow>> exFlowMap =
- new HashMap<>();
+ new HashMap<Executor, List<ExecutableFlow>>();
- try {
- for (Pair<ExecutionReference, ExecutableFlow> runningFlow :
- executorLoader.fetchActiveFlows().values()) {
- ExecutionReference ref = runningFlow.getFirst();
- ExecutableFlow flow = runningFlow.getSecond();
- Executor executor = ref.getExecutor();
-
- // We can set the next check time to prevent the checking of certain
- // flows.
- if (ref.getNextCheckTime() >= System.currentTimeMillis()) {
- continue;
- }
+ for (Pair<ExecutionReference, ExecutableFlow> runningFlow : runningFlows
+ .values()) {
+ ExecutionReference ref = runningFlow.getFirst();
+ ExecutableFlow flow = runningFlow.getSecond();
+ Executor executor = ref.getExecutor();
- List<ExecutableFlow> flows = exFlowMap.get(executor);
- if (flows == null) {
- flows = new ArrayList<>();
- exFlowMap.put(executor, flows);
- }
+ // We can set the next check time to prevent the checking of certain
+ // flows.
+ if (ref.getNextCheckTime() >= System.currentTimeMillis()) {
+ continue;
+ }
- flows.add(flow);
+ List<ExecutableFlow> flows = exFlowMap.get(executor);
+ if (flows == null) {
+ flows = new ArrayList<ExecutableFlow>();
+ exFlowMap.put(executor, flows);
}
- } catch(ExecutorManagerException e) {
- logger.error(e);
+
+ flows.add(flow);
}
return exFlowMap;
@@ -1756,6 +1732,10 @@ public class ExecutorManager extends EventHandler implements
}
reference.setExecutor(choosenExecutor);
+ // move from flow to running flows
+ runningFlows.put(exflow.getExecutionId(),
+ new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
+
logger.info(String.format(
"Successfully dispatched exec %d with error count %d",
exflow.getExecutionId(), reference.getNumErrors()));