diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index c9633ba..ed64364 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -24,7 +24,6 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -32,7 +31,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.commons.lang.StringUtils;
@@ -63,50 +62,53 @@ import azkaban.utils.Props;
*
*/
public class ExecutorManager extends EventHandler implements
- ExecutorManagerAdapter {
+ ExecutorManagerAdapter {
+ static final String AZKABAN_QUEUEPROCESSING_ENABLED =
+ "azkaban.queueprocessing.enabled";
static final String AZKABAN_USE_MULTIPLE_EXECUTORS =
"azkaban.use.multiple.executors";
- static final String AZKABAN_WEBSERVER_QUEUE_SIZE =
+ private static final String AZKABAN_WEBSERVER_QUEUE_SIZE =
"azkaban.webserver.queue.size";
+
private static Logger logger = Logger.getLogger(ExecutorManager.class);
- final private ExecutorLoader executorLoader;
+ private ExecutorLoader executorLoader;
- final private CleanerThread cleanerThread;
+ private CleanerThread cleanerThread;
- final private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows =
- new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
- final private ConcurrentHashMap<Integer, ExecutableFlow> recentlyFinished =
- new ConcurrentHashMap<Integer, ExecutableFlow>();
+ private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows =
+ new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
+ private ConcurrentHashMap<Integer, ExecutableFlow> recentlyFinished =
+ new ConcurrentHashMap<Integer, ExecutableFlow>();
/* map to easily access queued flows */
final private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> queuedFlowMap =
new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
/* web server side queue */
- final private BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
+ final private BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queuedFlowList =
new PriorityBlockingQueue<Pair<ExecutionReference, ExecutableFlow>>(10,
new ExecutableFlowPriorityComparator());
final private Set<Executor> activeExecutors = new HashSet<Executor>();
+ final private long webserverQueueCapacity;
+ private QueueProcessorThread queueProcessor;
- final private ExecutingManagerUpdaterThread executingManager;
- final private QueueProcessorThread queueProcessor;
-
+ private ExecutingManagerUpdaterThread executingManager;
+ // 12 weeks
private static final long DEFAULT_EXECUTION_LOGS_RETENTION_MS = 3 * 4 * 7
- * 24 * 60 * 60 * 1000l;
-
+ * 24 * 60 * 60 * 1000l;
private long lastCleanerThreadCheckTime = -1;
- final private long webserverQueueCapacity;
+
private long lastThreadCheckTime = -1;
private String updaterStage = "not started";
- final private Map<String, Alerter> alerters;
-
- final Props azkProps;
+ private Map<String, Alerter> alerters;
File cacheDir;
+ final Props azkProps;
+
public ExecutorManager(Props props, ExecutorLoader loader,
- Map<String, Alerter> alters) throws ExecutorManagerException {
+ Map<String, Alerter> alters) throws ExecutorManagerException {
azkProps = props;
this.executorLoader = loader;
@@ -121,28 +123,34 @@ public class ExecutorManager extends EventHandler implements
executingManager = new ExecutingManagerUpdaterThread();
executingManager.start();
- queueProcessor = new QueueProcessorThread();
- queueProcessor.start();
+ if(isMultiExecutorMode()) {
+ queueProcessor =
+ new QueueProcessorThread(azkProps.getBoolean(
+ AZKABAN_QUEUEPROCESSING_ENABLED, true));
+ queueProcessor.start();
+ }
long executionLogsRetentionMs =
- props.getLong("execution.logs.retention.ms",
- DEFAULT_EXECUTION_LOGS_RETENTION_MS);
+ props.getLong("execution.logs.retention.ms",
+ DEFAULT_EXECUTION_LOGS_RETENTION_MS);
webserverQueueCapacity =
- props.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000);
+ props.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000);
+
cleanerThread = new CleanerThread(executionLogsRetentionMs);
cleanerThread.start();
+
}
/**
* <pre>
* Setup activeExecutors using azkaban.properties and database executors
* Note:
- * 1. If a local executor is specified and it is missing from db,
+ * 1. If azkaban.use.multiple.executors is set true, this method will
+ * load all active executors
+ * 2. In local mode, If a local executor is specified and it is missing from db,
* this method add local executor as active in DB
- * 2. If a local executor is specified and it is marked inactive in db,
+ * 3. In local mode, If a local executor is specified and it is marked inactive in db,
* this method will convert local executor as active in DB
- * 3. If azkaban.use.multiple.executors is set true, this method will
- * load all active projects
* </pre>
*
* @throws ExecutorManagerException
@@ -150,10 +158,15 @@ public class ExecutorManager extends EventHandler implements
public void setupExecutors() throws ExecutorManagerException {
Set<Executor> newExecutors = new HashSet<Executor>();
- // Add local executor, if specified as per properties
- if (azkProps.containsKey("executor.port")) {
+ if (isMultiExecutorMode()) {
+ logger.info("Initializing multi executors from database");
+ newExecutors.addAll(executorLoader.fetchActiveExecutors());
+ } else if (azkProps.containsKey("executor.port")) {
+ // Add local executor, if specified as per properties
String executorHost = azkProps.getString("executor.host", "localhost");
int executorPort = azkProps.getInt("executor.port");
+ logger.info(String.format("Initializing local executor %s:%d",
+ executorHost, executorPort));
Executor executor =
executorLoader.fetchExecutor(executorHost, executorPort);
if (executor == null) {
@@ -166,12 +179,10 @@ public class ExecutorManager extends EventHandler implements
executorPort, true));
}
- if (azkProps.getBoolean(AZKABAN_USE_MULTIPLE_EXECUTORS, false)) {
- newExecutors.addAll(executorLoader.fetchActiveExecutors());
- }
-
if (newExecutors.isEmpty()) {
throw new ExecutorManagerException("No active executor found");
+ } else if(newExecutors.size() > 1 && !isMultiExecutorMode()) {
+ throw new ExecutorManagerException("Multiple local executors specified");
} else {
// clear all active executors, only if we have at least one new active
// executors
@@ -180,22 +191,52 @@ public class ExecutorManager extends EventHandler implements
}
}
+ private boolean isMultiExecutorMode() {
+ return azkProps.getBoolean(AZKABAN_USE_MULTIPLE_EXECUTORS, false);
+ }
+
+ /**
+ * Refresh Executor stats for all the actie executors in this executorManager
+ */
+ private void refreshExecutors() {
+ synchronized (activeExecutors) {
+ // TODO: rest api call to refresh executor stats
+ }
+ }
+
/**
* Disable flow dispatching in QueueProcessor
+ *
+ * @throws ExecutorManagerException
*/
- public void disableQueueProcessorThread() {
- queueProcessor.setActive(false);
+ public void disableQueueProcessorThread() throws ExecutorManagerException {
+ if (isMultiExecutorMode()) {
+ queueProcessor.setActive(false);
+ } else {
+ throw new ExecutorManagerException(
+ "Cannot disable QueueProcessor in local mode");
+ }
}
/**
* Enable flow dispatching in QueueProcessor
+ *
+ * @throws ExecutorManagerException
*/
- public void enableQueueProcessorThread() {
- queueProcessor.setActive(true);
+ public void enableQueueProcessorThread() throws ExecutorManagerException {
+ if (isMultiExecutorMode()) {
+ queueProcessor.setActive(true);
+ } else {
+ throw new ExecutorManagerException(
+ "Cannot enable QueueProcessor in local mode");
+ }
}
public State getQueueProcessorThreadState() {
- return queueProcessor.getState();
+ if (isMultiExecutorMode())
+ return queueProcessor.getState();
+ else
+ return State.NEW; // not started in local mode
}
/**
@@ -205,7 +246,10 @@ public class ExecutorManager extends EventHandler implements
* @return
*/
public boolean isQueueProcessorThreadActive() {
- return queueProcessor.isActive();
+ if (isMultiExecutorMode())
+ return queueProcessor.isActive();
+ else
+ return false;
}
@Override
@@ -232,8 +276,8 @@ public class ExecutorManager extends EventHandler implements
}
@Override
- public Set<Executor> getAllActiveExecutors() {
- return activeExecutors;
+ public Collection<Executor> getAllActiveExecutors() {
+ return Collections.unmodifiableCollection(activeExecutors);
}
/**
@@ -254,7 +298,7 @@ public class ExecutorManager extends EventHandler implements
@Override
public Set<String> getPrimaryServerHosts() {
- // TODO: do we want to have a primary
+ // Only one for now. More probably later.
HashSet<String> ports = new HashSet<String>();
for (Executor executor : activeExecutors) {
ports.add(executor.getHost() + ":" + executor.getPort());
@@ -296,32 +340,33 @@ public class ExecutorManager extends EventHandler implements
}
/**
- * Gets a list of all the active (running, non-dispatched) executions for a
- * given project and flow {@inheritDoc}
+ * Gets a list of all the active (running flows and non-dispatched flows)
+ * executions for a given project and flow {@inheritDoc}
*
* @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows(int,
* java.lang.String)
*/
@Override
public List<Integer> getRunningFlows(int projectId, String flowId) {
- ArrayList<Integer> executionIds = new ArrayList<Integer>();
- getRunningFlowsHelper(projectId, flowId, executionIds,
- queuedFlowMap.values());
- getRunningFlowsHelper(projectId, flowId, executionIds,
- runningFlows.values());
+ List<Integer> executionIds = new ArrayList<Integer>();
+ executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
+ queuedFlowMap.values()));
+ executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
+ runningFlows.values()));
return executionIds;
}
/* Helper method for getRunningFlows */
- private void getRunningFlowsHelper(int projectId, String flowId,
- ArrayList<Integer> executionIds,
+ private List<Integer> getRunningFlowsHelper(int projectId, String flowId,
Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+ List<Integer> executionIds = new ArrayList<Integer>();
for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
if (ref.getSecond().getFlowId().equals(flowId)
&& ref.getSecond().getProjectId() == projectId) {
executionIds.add(ref.getFirst().getExecId());
}
}
+ return executionIds;
}
/**
@@ -462,141 +507,136 @@ public class ExecutorManager extends EventHandler implements
}
}
- /**
- * Get recently finished flows {@inheritDoc}
- *
- * @see azkaban.executor.ExecutorManagerAdapter#getRecentlyFinishedFlows()
- */
public List<ExecutableFlow> getRecentlyFinishedFlows() {
return new ArrayList<ExecutableFlow>(recentlyFinished.values());
}
@Override
public List<ExecutableFlow> getExecutableFlows(Project project,
- String flowId, int skip, int size) throws ExecutorManagerException {
+ String flowId, int skip, int size) throws ExecutorManagerException {
List<ExecutableFlow> flows =
- executorLoader.fetchFlowHistory(project.getId(), flowId, skip, size);
+ executorLoader.fetchFlowHistory(project.getId(), flowId, skip, size);
return flows;
}
@Override
public List<ExecutableFlow> getExecutableFlows(int skip, int size)
- throws ExecutorManagerException {
+ throws ExecutorManagerException {
List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(skip, size);
return flows;
}
@Override
public List<ExecutableFlow> getExecutableFlows(String flowIdContains,
- int skip, int size) throws ExecutorManagerException {
+ int skip, int size) throws ExecutorManagerException {
List<ExecutableFlow> flows =
- executorLoader.fetchFlowHistory(null, '%' + flowIdContains + '%', null,
- 0, -1, -1, skip, size);
+ executorLoader.fetchFlowHistory(null, '%' + flowIdContains + '%', null,
+ 0, -1, -1, skip, size);
return flows;
}
@Override
public List<ExecutableFlow> getExecutableFlows(String projContain,
- String flowContain, String userContain, int status, long begin, long end,
- int skip, int size) throws ExecutorManagerException {
+ String flowContain, String userContain, int status, long begin, long end,
+ int skip, int size) throws ExecutorManagerException {
List<ExecutableFlow> flows =
- executorLoader.fetchFlowHistory(projContain, flowContain, userContain,
- status, begin, end, skip, size);
+ executorLoader.fetchFlowHistory(projContain, flowContain, userContain,
+ status, begin, end, skip, size);
return flows;
}
@Override
public List<ExecutableJobInfo> getExecutableJobs(Project project,
- String jobId, int skip, int size) throws ExecutorManagerException {
+ String jobId, int skip, int size) throws ExecutorManagerException {
List<ExecutableJobInfo> nodes =
- executorLoader.fetchJobHistory(project.getId(), jobId, skip, size);
+ executorLoader.fetchJobHistory(project.getId(), jobId, skip, size);
return nodes;
}
@Override
public int getNumberOfJobExecutions(Project project, String jobId)
- throws ExecutorManagerException {
+ throws ExecutorManagerException {
return executorLoader.fetchNumExecutableNodes(project.getId(), jobId);
}
@Override
public int getNumberOfExecutions(Project project, String flowId)
- throws ExecutorManagerException {
+ throws ExecutorManagerException {
return executorLoader.fetchNumExecutableFlows(project.getId(), flowId);
}
@Override
public LogData getExecutableFlowLog(ExecutableFlow exFlow, int offset,
- int length) throws ExecutorManagerException {
+ int length) throws ExecutorManagerException {
Pair<ExecutionReference, ExecutableFlow> pair =
- runningFlows.get(exFlow.getExecutionId());
+ runningFlows.get(exFlow.getExecutionId());
if (pair != null) {
Pair<String, String> typeParam = new Pair<String, String>("type", "flow");
Pair<String, String> offsetParam =
- new Pair<String, String>("offset", String.valueOf(offset));
+ new Pair<String, String>("offset", String.valueOf(offset));
Pair<String, String> lengthParam =
- new Pair<String, String>("length", String.valueOf(length));
+ new Pair<String, String>("length", String.valueOf(length));
@SuppressWarnings("unchecked")
Map<String, Object> result =
- callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION,
- typeParam, offsetParam, lengthParam);
+ callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION,
+ typeParam, offsetParam, lengthParam);
return LogData.createLogDataFromObject(result);
} else {
LogData value =
- executorLoader
- .fetchLogs(exFlow.getExecutionId(), "", 0, offset, length);
+ executorLoader.fetchLogs(exFlow.getExecutionId(), "", 0, offset,
+ length);
return value;
}
}
@Override
public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId,
- int offset, int length, int attempt) throws ExecutorManagerException {
+ int offset, int length, int attempt) throws ExecutorManagerException {
Pair<ExecutionReference, ExecutableFlow> pair =
- runningFlows.get(exFlow.getExecutionId());
+ runningFlows.get(exFlow.getExecutionId());
if (pair != null) {
Pair<String, String> typeParam = new Pair<String, String>("type", "job");
Pair<String, String> jobIdParam =
- new Pair<String, String>("jobId", jobId);
+ new Pair<String, String>("jobId", jobId);
Pair<String, String> offsetParam =
- new Pair<String, String>("offset", String.valueOf(offset));
+ new Pair<String, String>("offset", String.valueOf(offset));
Pair<String, String> lengthParam =
- new Pair<String, String>("length", String.valueOf(length));
+ new Pair<String, String>("length", String.valueOf(length));
Pair<String, String> attemptParam =
- new Pair<String, String>("attempt", String.valueOf(attempt));
+ new Pair<String, String>("attempt", String.valueOf(attempt));
@SuppressWarnings("unchecked")
Map<String, Object> result =
- callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION,
- typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
+ callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION,
+ typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
return LogData.createLogDataFromObject(result);
} else {
LogData value =
- executorLoader.fetchLogs(exFlow.getExecutionId(), jobId, attempt,
- offset, length);
+ executorLoader.fetchLogs(exFlow.getExecutionId(), jobId, attempt,
+ offset, length);
return value;
}
}
@Override
public List<Object> getExecutionJobStats(ExecutableFlow exFlow, String jobId,
- int attempt) throws ExecutorManagerException {
+ int attempt) throws ExecutorManagerException {
Pair<ExecutionReference, ExecutableFlow> pair =
- runningFlows.get(exFlow.getExecutionId());
+ runningFlows.get(exFlow.getExecutionId());
if (pair == null) {
return executorLoader.fetchAttachments(exFlow.getExecutionId(), jobId,
- attempt);
+ attempt);
}
Pair<String, String> jobIdParam = new Pair<String, String>("jobId", jobId);
Pair<String, String> attemptParam =
- new Pair<String, String>("attempt", String.valueOf(attempt));
+ new Pair<String, String>("attempt", String.valueOf(attempt));
@SuppressWarnings("unchecked")
Map<String, Object> result =
- callExecutorServer(pair.getFirst(), ConnectorParams.ATTACHMENTS_ACTION,
- jobIdParam, attemptParam);
+ callExecutorServer(pair.getFirst(), ConnectorParams.ATTACHMENTS_ACTION,
+ jobIdParam, attemptParam);
@SuppressWarnings("unchecked")
List<Object> jobStats = (List<Object>) result.get("attachments");
@@ -606,26 +646,26 @@ public class ExecutorManager extends EventHandler implements
@Override
public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow,
- String jobId, int offset, int length, int attempt)
- throws ExecutorManagerException {
+ String jobId, int offset, int length, int attempt)
+ throws ExecutorManagerException {
Pair<ExecutionReference, ExecutableFlow> pair =
- runningFlows.get(exFlow.getExecutionId());
+ runningFlows.get(exFlow.getExecutionId());
if (pair != null) {
Pair<String, String> typeParam = new Pair<String, String>("type", "job");
Pair<String, String> jobIdParam =
- new Pair<String, String>("jobId", jobId);
+ new Pair<String, String>("jobId", jobId);
Pair<String, String> offsetParam =
- new Pair<String, String>("offset", String.valueOf(offset));
+ new Pair<String, String>("offset", String.valueOf(offset));
Pair<String, String> lengthParam =
- new Pair<String, String>("length", String.valueOf(length));
+ new Pair<String, String>("length", String.valueOf(length));
Pair<String, String> attemptParam =
- new Pair<String, String>("attempt", String.valueOf(attempt));
+ new Pair<String, String>("attempt", String.valueOf(attempt));
@SuppressWarnings("unchecked")
Map<String, Object> result =
- callExecutorServer(pair.getFirst(), ConnectorParams.METADATA_ACTION,
- typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
+ callExecutorServer(pair.getFirst(), ConnectorParams.METADATA_ACTION,
+ typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
return JobMetaData.createJobMetaDataFromObject(result);
} else {
return null;
@@ -665,14 +705,14 @@ public class ExecutorManager extends EventHandler implements
@Override
public void resumeFlow(ExecutableFlow exFlow, String userId)
- throws ExecutorManagerException {
+ throws ExecutorManagerException {
synchronized (exFlow) {
Pair<ExecutionReference, ExecutableFlow> pair =
- runningFlows.get(exFlow.getExecutionId());
+ runningFlows.get(exFlow.getExecutionId());
if (pair == null) {
throw new ExecutorManagerException("Execution "
- + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
- + " isn't running.");
+ + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
+ + " isn't running.");
}
callExecutorServer(pair.getFirst(), ConnectorParams.RESUME_ACTION, userId);
}
@@ -680,14 +720,14 @@ public class ExecutorManager extends EventHandler implements
@Override
public void pauseFlow(ExecutableFlow exFlow, String userId)
- throws ExecutorManagerException {
+ throws ExecutorManagerException {
synchronized (exFlow) {
Pair<ExecutionReference, ExecutableFlow> pair =
- runningFlows.get(exFlow.getExecutionId());
+ runningFlows.get(exFlow.getExecutionId());
if (pair == null) {
throw new ExecutorManagerException("Execution "
- + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
- + " isn't running.");
+ + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
+ + " isn't running.");
}
callExecutorServer(pair.getFirst(), ConnectorParams.PAUSE_ACTION, userId);
}
@@ -695,63 +735,63 @@ public class ExecutorManager extends EventHandler implements
@Override
public void pauseExecutingJobs(ExecutableFlow exFlow, String userId,
- String... jobIds) throws ExecutorManagerException {
+ String... jobIds) throws ExecutorManagerException {
modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_PAUSE_JOBS, userId,
- jobIds);
+ jobIds);
}
@Override
public void resumeExecutingJobs(ExecutableFlow exFlow, String userId,
- String... jobIds) throws ExecutorManagerException {
+ String... jobIds) throws ExecutorManagerException {
modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RESUME_JOBS, userId,
- jobIds);
+ jobIds);
}
@Override
public void retryFailures(ExecutableFlow exFlow, String userId)
- throws ExecutorManagerException {
+ throws ExecutorManagerException {
modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RETRY_FAILURES, userId);
}
@Override
public void retryExecutingJobs(ExecutableFlow exFlow, String userId,
- String... jobIds) throws ExecutorManagerException {
+ String... jobIds) throws ExecutorManagerException {
modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RETRY_JOBS, userId,
- jobIds);
+ jobIds);
}
@Override
public void disableExecutingJobs(ExecutableFlow exFlow, String userId,
- String... jobIds) throws ExecutorManagerException {
+ String... jobIds) throws ExecutorManagerException {
modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_DISABLE_JOBS, userId,
- jobIds);
+ jobIds);
}
@Override
public void enableExecutingJobs(ExecutableFlow exFlow, String userId,
- String... jobIds) throws ExecutorManagerException {
+ String... jobIds) throws ExecutorManagerException {
modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_ENABLE_JOBS, userId,
- jobIds);
+ jobIds);
}
@Override
public void cancelExecutingJobs(ExecutableFlow exFlow, String userId,
- String... jobIds) throws ExecutorManagerException {
+ String... jobIds) throws ExecutorManagerException {
modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_CANCEL_JOBS, userId,
- jobIds);
+ jobIds);
}
@SuppressWarnings("unchecked")
private Map<String, Object> modifyExecutingJobs(ExecutableFlow exFlow,
- String command, String userId, String... jobIds)
- throws ExecutorManagerException {
+ String command, String userId, String... jobIds)
+ throws ExecutorManagerException {
synchronized (exFlow) {
Pair<ExecutionReference, ExecutableFlow> pair =
- runningFlows.get(exFlow.getExecutionId());
+ runningFlows.get(exFlow.getExecutionId());
if (pair == null) {
throw new ExecutorManagerException("Execution "
- + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
- + " isn't running.");
+ + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
+ + " isn't running.");
}
Map<String, Object> response = null;
@@ -761,24 +801,24 @@ public class ExecutorManager extends EventHandler implements
ExecutableNode node = exFlow.getExecutableNode(jobId);
if (node == null) {
throw new ExecutorManagerException("Job " + jobId
- + " doesn't exist in execution " + exFlow.getExecutionId()
- + ".");
+ + " doesn't exist in execution " + exFlow.getExecutionId()
+ + ".");
}
}
}
String ids = StringUtils.join(jobIds, ',');
response =
- callExecutorServer(pair.getFirst(),
- ConnectorParams.MODIFY_EXECUTION_ACTION, userId,
- new Pair<String, String>(
- ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command),
- new Pair<String, String>(ConnectorParams.MODIFY_JOBS_LIST, ids));
+ callExecutorServer(pair.getFirst(),
+ ConnectorParams.MODIFY_EXECUTION_ACTION, userId,
+ new Pair<String, String>(
+ ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command),
+ new Pair<String, String>(ConnectorParams.MODIFY_JOBS_LIST, ids));
} else {
response =
- callExecutorServer(pair.getFirst(),
- ConnectorParams.MODIFY_EXECUTION_ACTION, userId,
- new Pair<String, String>(
- ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command));
+ callExecutorServer(pair.getFirst(),
+ ConnectorParams.MODIFY_EXECUTION_ACTION, userId,
+ new Pair<String, String>(
+ ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command));
}
return response;
@@ -786,7 +826,7 @@ public class ExecutorManager extends EventHandler implements
}
private void applyDisabledJobs(List<Object> disabledJobs,
- ExecutableFlowBase exflow) {
+ ExecutableFlowBase exflow) {
for (Object disabled : disabledJobs) {
if (disabled instanceof String) {
String nodeName = (String) disabled;
@@ -800,7 +840,7 @@ public class ExecutorManager extends EventHandler implements
String nodeName = (String) nestedDisabled.get("id");
@SuppressWarnings("unchecked")
List<Object> subDisabledJobs =
- (List<Object>) nestedDisabled.get("children");
+ (List<Object>) nestedDisabled.get("children");
if (nodeName == null || subDisabledJobs == null) {
return;
@@ -823,7 +863,7 @@ public class ExecutorManager extends EventHandler implements
logger.info("Submitting execution flow " + flowId + " by " + userId);
String message = "";
- if (queuedFlows.size() >= webserverQueueCapacity) {
+ if (queuedFlowList.size() >= webserverQueueCapacity) {
message =
String
.format(
@@ -841,6 +881,7 @@ public class ExecutorManager extends EventHandler implements
if (options == null) {
options = new ExecutionOptions();
}
+
if (options.getDisabledJobs() != null) {
applyDisabledJobs(options.getDisabledJobs(), exflow);
}
@@ -883,9 +924,25 @@ public class ExecutorManager extends EventHandler implements
// fails, we remove the reference.
ExecutionReference reference =
new ExecutionReference(exflow.getExecutionId());
- // Added to db queue
- executorLoader.addActiveExecutableReference(reference);
- enqueueFlow(exflow, reference);
+
+ if (isMultiExecutorMode()) {
+ //Take MultiExecutor route
+ executorLoader.addActiveExecutableReference(reference);
+ enqueueFlow(exflow, reference);
+ } else {
+ // assign only local executor we have
+ reference.setExecutor(activeExecutors.iterator().next());
+ executorLoader.addActiveExecutableReference(reference);
+ try {
+ callExecutorServer(reference, ConnectorParams.EXECUTE_ACTION);
+ runningFlows.put(exflow.getExecutionId(),
+ new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
+ } catch (ExecutorManagerException e) {
+ executorLoader.removeActiveExecutableReference(reference
+ .getExecId());
+ throw e;
+ }
+ }
message +=
"Execution submitted successfully with exec id "
+ exflow.getExecutionId();
@@ -894,10 +951,27 @@ public class ExecutorManager extends EventHandler implements
}
}
+
+ /**
+ * Wraps BoundedQueue Take method to have a corresponding update in
+ * queuedFlowMap lookup table
+ *
+ * @return
+ * @throws InterruptedException
+ */
+ private Pair<ExecutionReference, ExecutableFlow> waitAndFetchQueueHead()
+ throws InterruptedException {
+ Pair<ExecutionReference, ExecutableFlow> pair = queuedFlowList.take();
+ if (pair != null && pair.getFirst() != null) {
+ queuedFlowMap.remove(pair.getFirst().getExecId());
+ }
+ return pair;
+ }
+
/* Helper method to have a single point of deletion in the queued flows */
private void dequeueFlow(int executionId) {
- if (queuedFlowMap.contains(executionId)) {
- queuedFlows.remove(queuedFlowMap.get(executionId));
+ if (queuedFlowMap.containsKey(executionId)) {
+ queuedFlowList.remove(queuedFlowMap.get(executionId));
queuedFlowMap.remove(executionId);
}
}
@@ -909,7 +983,7 @@ public class ExecutorManager extends EventHandler implements
new Pair<ExecutionReference, ExecutableFlow>(ref, exflow);
try {
queuedFlowMap.put(exflow.getExecutionId(), pair);
- queuedFlows.put(pair);
+ queuedFlowList.put(pair);
} catch (InterruptedException e) {
String errMsg = "Failed to queue flow " + exflow.getExecutionId();
logger.error(errMsg, e);
@@ -928,54 +1002,50 @@ public class ExecutorManager extends EventHandler implements
}
private Map<String, Object> callExecutorServer(ExecutionReference ref,
- String action) throws ExecutorManagerException {
+ String action) throws ExecutorManagerException {
try {
- Executor executor = ref.getExecutor();
- return callExecutorServer(executor.getHost(), executor.getPort(), action,
- ref.getExecId(), null, (Pair<String, String>[]) null);
+ return callExecutorServer(ref.getHost(), ref.getPort(), action,
+ ref.getExecId(), null, (Pair<String, String>[]) null);
} catch (IOException e) {
throw new ExecutorManagerException(e);
}
}
private Map<String, Object> callExecutorServer(ExecutionReference ref,
- String action, String user) throws ExecutorManagerException {
+ String action, String user) throws ExecutorManagerException {
try {
- Executor executor = ref.getExecutor();
- return callExecutorServer(executor.getHost(), executor.getPort(), action,
- ref.getExecId(), user, (Pair<String, String>[]) null);
+ return callExecutorServer(ref.getHost(), ref.getPort(), action,
+ ref.getExecId(), user, (Pair<String, String>[]) null);
} catch (IOException e) {
throw new ExecutorManagerException(e);
}
}
private Map<String, Object> callExecutorServer(ExecutionReference ref,
- String action, Pair<String, String>... params)
- throws ExecutorManagerException {
+ String action, Pair<String, String>... params)
+ throws ExecutorManagerException {
try {
- Executor executor = ref.getExecutor();
- return callExecutorServer(executor.getHost(), executor.getPort(), action,
- ref.getExecId(), null, params);
+ return callExecutorServer(ref.getHost(), ref.getPort(), action,
+ ref.getExecId(), null, params);
} catch (IOException e) {
throw new ExecutorManagerException(e);
}
}
private Map<String, Object> callExecutorServer(ExecutionReference ref,
- String action, String user, Pair<String, String>... params)
- throws ExecutorManagerException {
+ String action, String user, Pair<String, String>... params)
+ throws ExecutorManagerException {
try {
- Executor executor = ref.getExecutor();
- return callExecutorServer(executor.getHost(), executor.getPort(), action,
- ref.getExecId(), user, params);
+ return callExecutorServer(ref.getHost(), ref.getPort(), action,
+ ref.getExecId(), user, params);
} catch (IOException e) {
throw new ExecutorManagerException(e);
}
}
private Map<String, Object> callExecutorServer(String host, int port,
- String action, Integer executionId, String user,
- Pair<String, String>... params) throws IOException {
+ String action, Integer executionId, String user,
+ Pair<String, String>... params) throws IOException {
URIBuilder builder = new URIBuilder();
builder.setScheme("http").setHost(host).setPort(port).setPath("/executor");
@@ -983,7 +1053,7 @@ public class ExecutorManager extends EventHandler implements
if (executionId != null) {
builder.setParameter(ConnectorParams.EXECID_PARAM,
- String.valueOf(executionId));
+ String.valueOf(executionId));
}
if (user != null) {
@@ -1018,7 +1088,7 @@ public class ExecutorManager extends EventHandler implements
@SuppressWarnings("unchecked")
Map<String, Object> jsonResponse =
- (Map<String, Object>) JSONUtils.parseJSONFromString(response);
+ (Map<String, Object>) JSONUtils.parseJSONFromString(response);
String error = (String) jsonResponse.get(ConnectorParams.RESPONSE_ERROR);
if (error != null) {
throw new IOException(error);
@@ -1038,8 +1108,7 @@ public class ExecutorManager extends EventHandler implements
*/
@Override
public Map<String, Object> callExecutorStats(int executorId, String action,
- Pair<String, String>... params) throws IOException,
- ExecutorManagerException {
+ Pair<String, String>... params) throws IOException, ExecutorManagerException {
URIBuilder builder = new URIBuilder();
Executor executor = fetchExecutor(executorId);
@@ -1081,14 +1150,15 @@ public class ExecutorManager extends EventHandler implements
return jsonResponse;
}
+
@Override
public Map<String, Object> callExecutorJMX(String hostPort, String action,
- String mBean) throws IOException {
+ String mBean) throws IOException {
URIBuilder builder = new URIBuilder();
String[] hostPortSplit = hostPort.split(":");
builder.setScheme("http").setHost(hostPortSplit[0])
- .setPort(Integer.parseInt(hostPortSplit[1])).setPath("/jmx");
+ .setPort(Integer.parseInt(hostPortSplit[1])).setPath("/jmx");
builder.setParameter(action, "");
if (mBean != null) {
@@ -1117,7 +1187,7 @@ public class ExecutorManager extends EventHandler implements
@SuppressWarnings("unchecked")
Map<String, Object> jsonResponse =
- (Map<String, Object>) JSONUtils.parseJSONFromString(response);
+ (Map<String, Object>) JSONUtils.parseJSONFromString(response);
String error = (String) jsonResponse.get(ConnectorParams.RESPONSE_ERROR);
if (error != null) {
throw new IOException(error);
@@ -1127,10 +1197,175 @@ public class ExecutorManager extends EventHandler implements
@Override
public void shutdown() {
- queueProcessor.shutdown();
+ if (isMultiExecutorMode()) {
+ queueProcessor.shutdown();
+ }
executingManager.shutdown();
}
+ private class ExecutingManagerUpdaterThread extends Thread {
+ private boolean shutdown = false;
+
+ public ExecutingManagerUpdaterThread() {
+ this.setName("ExecutorManagerUpdaterThread");
+ }
+
+ // 10 mins recently finished threshold.
+ private long recentlyFinishedLifetimeMs = 600000;
+ private int waitTimeIdleMs = 2000;
+ private int waitTimeMs = 500;
+
+ // When we have an http error, for that flow, we'll check every 10 secs, 6
+ // times (1 mins) before we evict.
+ private int numErrors = 6;
+ private long errorThreshold = 10000;
+
+ private void shutdown() {
+ shutdown = true;
+ }
+
+ @SuppressWarnings("unchecked")
+ public void run() {
+ while (!shutdown) {
+ try {
+ lastThreadCheckTime = System.currentTimeMillis();
+ updaterStage = "Starting update all flows.";
+
+ Map<Executor, List<ExecutableFlow>> exFlowMap =
+ getFlowToExecutorMap();
+ ArrayList<ExecutableFlow> finishedFlows =
+ new ArrayList<ExecutableFlow>();
+ ArrayList<ExecutableFlow> finalizeFlows =
+ new ArrayList<ExecutableFlow>();
+
+ if (exFlowMap.size() > 0) {
+ for (Map.Entry<Executor, List<ExecutableFlow>> entry : exFlowMap
+ .entrySet()) {
+ List<Long> updateTimesList = new ArrayList<Long>();
+ List<Integer> executionIdsList = new ArrayList<Integer>();
+
+ Executor executor = entry.getKey();
+
+ updaterStage =
+ "Starting update flows on " + executor.getHost() + ":"
+ + executor.getPort();
+
+ // We pack the parameters of the same host together before we
+ // query.
+ fillUpdateTimeAndExecId(entry.getValue(), executionIdsList,
+ updateTimesList);
+
+ Pair<String, String> updateTimes =
+ new Pair<String, String>(
+ ConnectorParams.UPDATE_TIME_LIST_PARAM,
+ JSONUtils.toJSON(updateTimesList));
+ Pair<String, String> executionIds =
+ new Pair<String, String>(ConnectorParams.EXEC_ID_LIST_PARAM,
+ JSONUtils.toJSON(executionIdsList));
+
+ Map<String, Object> results = null;
+ try {
+ results =
+ callExecutorServer(executor.getHost(),
+ executor.getPort(), ConnectorParams.UPDATE_ACTION,
+ null, null, executionIds, updateTimes);
+ } catch (IOException e) {
+ logger.error(e);
+ for (ExecutableFlow flow : entry.getValue()) {
+ Pair<ExecutionReference, ExecutableFlow> pair =
+ runningFlows.get(flow.getExecutionId());
+
+ updaterStage =
+ "Failed to get update. Doing some clean up for flow "
+ + pair.getSecond().getExecutionId();
+
+ if (pair != null) {
+ ExecutionReference ref = pair.getFirst();
+ int numErrors = ref.getNumErrors();
+ if (ref.getNumErrors() < this.numErrors) {
+ ref.setNextCheckTime(System.currentTimeMillis()
+ + errorThreshold);
+ ref.setNumErrors(++numErrors);
+ } else {
+ logger.error("Evicting flow " + flow.getExecutionId()
+ + ". The executor is unresponsive.");
+ // TODO should send out an unresponsive email here.
+ finalizeFlows.add(pair.getSecond());
+ }
+ }
+ }
+ }
+
+ // We gets results
+ if (results != null) {
+ List<Map<String, Object>> executionUpdates =
+ (List<Map<String, Object>>) results
+ .get(ConnectorParams.RESPONSE_UPDATED_FLOWS);
+ for (Map<String, Object> updateMap : executionUpdates) {
+ try {
+ ExecutableFlow flow = updateExecution(updateMap);
+
+ updaterStage = "Updated flow " + flow.getExecutionId();
+
+ if (isFinished(flow)) {
+ finishedFlows.add(flow);
+ finalizeFlows.add(flow);
+ }
+ } catch (ExecutorManagerException e) {
+ ExecutableFlow flow = e.getExecutableFlow();
+ logger.error(e);
+
+ if (flow != null) {
+ logger.error("Finalizing flow " + flow.getExecutionId());
+ finalizeFlows.add(flow);
+ }
+ }
+ }
+ }
+ }
+
+ updaterStage = "Evicting old recently finished flows.";
+
+ evictOldRecentlyFinished(recentlyFinishedLifetimeMs);
+ // Add new finished
+ for (ExecutableFlow flow : finishedFlows) {
+ if (flow.getScheduleId() >= 0
+ && flow.getStatus() == Status.SUCCEEDED) {
+ ScheduleStatisticManager.invalidateCache(flow.getScheduleId(),
+ cacheDir);
+ }
+ fireEventListeners(Event.create(flow, Type.FLOW_FINISHED));
+ recentlyFinished.put(flow.getExecutionId(), flow);
+ }
+
+ updaterStage =
+ "Finalizing " + finalizeFlows.size() + " error flows.";
+
+ // Kill error flows
+ for (ExecutableFlow flow : finalizeFlows) {
+ finalizeFlows(flow);
+ }
+ }
+
+ updaterStage = "Updated all active flows. Waiting for next round.";
+
+ synchronized (this) {
+ try {
+ if (runningFlows.size() > 0) {
+ this.wait(waitTimeMs);
+ } else {
+ this.wait(waitTimeIdleMs);
+ }
+ } catch (InterruptedException e) {
+ }
+ }
+ } catch (Exception e) {
+ logger.error(e);
+ }
+ }
+ }
+ }
+
private void finalizeFlows(ExecutableFlow flow) {
int execId = flow.getExecutionId();
@@ -1165,7 +1400,6 @@ public class ExecutorManager extends EventHandler implements
updaterStage = "finalizing flow " + execId + " cleaning from memory";
runningFlows.remove(execId);
-
fireEventListeners(Event.create(dsFlow, Type.FLOW_FINISHED));
recentlyFinished.put(execId, dsFlow);
@@ -1183,12 +1417,12 @@ public class ExecutorManager extends EventHandler implements
Alerter mailAlerter = alerters.get("email");
if (flow.getStatus() == Status.FAILED || flow.getStatus() == Status.KILLED) {
if (options.getFailureEmails() != null
- && !options.getFailureEmails().isEmpty()) {
+ && !options.getFailureEmails().isEmpty()) {
try {
mailAlerter
- .alertOnError(
- flow,
- "Executor no longer seems to be running this execution. Most likely due to executor bounce.");
+ .alertOnError(
+ flow,
+ "Executor no longer seems to be running this execution. Most likely due to executor bounce.");
} catch (Exception e) {
logger.error(e);
}
@@ -1199,9 +1433,9 @@ public class ExecutorManager extends EventHandler implements
if (alerter != null) {
try {
alerter
- .alertOnError(
- flow,
- "Executor no longer seems to be running this execution. Most likely due to executor bounce.");
+ .alertOnError(
+ flow,
+ "Executor no longer seems to be running this execution. Most likely due to executor bounce.");
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
@@ -1209,12 +1443,12 @@ public class ExecutorManager extends EventHandler implements
}
} else {
logger.error("Alerter type " + alertType
- + " doesn't exist. Failed to alert.");
+ + " doesn't exist. Failed to alert.");
}
}
} else {
if (options.getSuccessEmails() != null
- && !options.getSuccessEmails().isEmpty()) {
+ && !options.getSuccessEmails().isEmpty()) {
try {
mailAlerter.alertOnSuccess(flow);
@@ -1235,7 +1469,7 @@ public class ExecutorManager extends EventHandler implements
}
} else {
logger.error("Alerter type " + alertType
- + " doesn't exist. Failed to alert.");
+ + " doesn't exist. Failed to alert.");
}
}
}
@@ -1278,7 +1512,7 @@ public class ExecutorManager extends EventHandler implements
private void evictOldRecentlyFinished(long ageMs) {
ArrayList<Integer> recentlyFinishedKeys =
- new ArrayList<Integer>(recentlyFinished.keySet());
+ new ArrayList<Integer>(recentlyFinished.keySet());
long oldAgeThreshold = System.currentTimeMillis() - ageMs;
for (Integer key : recentlyFinishedKeys) {
ExecutableFlow flow = recentlyFinished.get(key);
@@ -1291,20 +1525,20 @@ public class ExecutorManager extends EventHandler implements
}
private ExecutableFlow updateExecution(Map<String, Object> updateData)
- throws ExecutorManagerException {
+ throws ExecutorManagerException {
Integer execId =
- (Integer) updateData.get(ConnectorParams.UPDATE_MAP_EXEC_ID);
+ (Integer) updateData.get(ConnectorParams.UPDATE_MAP_EXEC_ID);
if (execId == null) {
throw new ExecutorManagerException(
- "Response is malformed. Need exec id to update.");
+ "Response is malformed. Need exec id to update.");
}
Pair<ExecutionReference, ExecutableFlow> refPair =
- this.runningFlows.get(execId);
+ this.runningFlows.get(execId);
if (refPair == null) {
throw new ExecutorManagerException(
- "No running flow found with the execution id. Removing " + execId);
+ "No running flow found with the execution id. Removing " + execId);
}
ExecutionReference ref = refPair.getFirst();
@@ -1346,7 +1580,7 @@ public class ExecutorManager extends EventHandler implements
}
} else {
logger.error("Alerter type " + alertType
- + " doesn't exist. Failed to alert.");
+ + " doesn't exist. Failed to alert.");
}
}
}
@@ -1366,13 +1600,14 @@ public class ExecutorManager extends EventHandler implements
}
private void fillUpdateTimeAndExecId(List<ExecutableFlow> flows,
- List<Integer> executionIds, List<Long> updateTimes) {
+ List<Integer> executionIds, List<Long> updateTimes) {
for (ExecutableFlow flow : flows) {
executionIds.add(flow.getExecutionId());
updateTimes.add(flow.getUpdateTime());
}
}
+ /* Group Executable flow by Executors to reduce number of REST calls */
private Map<Executor, List<ExecutableFlow>> getFlowToExecutorMap() {
HashMap<Executor, List<ExecutableFlow>> exFlowMap =
new HashMap<Executor, List<ExecutableFlow>>();
@@ -1403,194 +1638,93 @@ public class ExecutorManager extends EventHandler implements
@Override
public int getExecutableFlows(int projectId, String flowId, int from,
- int length, List<ExecutableFlow> outputList)
- throws ExecutorManagerException {
+ int length, List<ExecutableFlow> outputList)
+ throws ExecutorManagerException {
List<ExecutableFlow> flows =
- executorLoader.fetchFlowHistory(projectId, flowId, from, length);
+ executorLoader.fetchFlowHistory(projectId, flowId, from, length);
outputList.addAll(flows);
return executorLoader.fetchNumExecutableFlows(projectId, flowId);
}
@Override
public List<ExecutableFlow> getExecutableFlows(int projectId, String flowId,
- int from, int length, Status status) throws ExecutorManagerException {
+ int from, int length, Status status) throws ExecutorManagerException {
return executorLoader.fetchFlowHistory(projectId, flowId, from, length,
- status);
+ status);
}
- private class ExecutingManagerUpdaterThread extends Thread {
- private boolean shutdown = false;
+ /*
+ * cleaner thread to clean up execution_logs, etc in DB. Runs every day.
+ */
+ private class CleanerThread extends Thread {
+ // log file retention is 1 month.
- public ExecutingManagerUpdaterThread() {
- this.setName("ExecutorManagerUpdaterThread");
- }
+ // check every day
+ private static final long CLEANER_THREAD_WAIT_INTERVAL_MS =
+ 24 * 60 * 60 * 1000;
- // 10 mins recently finished threshold.
- private long recentlyFinishedLifetimeMs = 600000;
- private int waitTimeIdleMs = 2000;
- private int waitTimeMs = 500;
+ private final long executionLogsRetentionMs;
- // When we have an http error, for that flow, we'll check every 10 secs, 6
- // times (1 mins) before we evict.
- private int numErrors = 6;
- private long errorThreshold = 10000;
+ private boolean shutdown = false;
+ private long lastLogCleanTime = -1;
- private void shutdown() {
+ public CleanerThread(long executionLogsRetentionMs) {
+ this.executionLogsRetentionMs = executionLogsRetentionMs;
+ this.setName("AzkabanWebServer-Cleaner-Thread");
+ }
+
+ @SuppressWarnings("unused")
+ public void shutdown() {
shutdown = true;
+ this.interrupt();
}
- @SuppressWarnings("unchecked")
public void run() {
while (!shutdown) {
- try {
- lastThreadCheckTime = System.currentTimeMillis();
- updaterStage = "Starting update all flows.";
-
- Map<Executor, List<ExecutableFlow>> exFlowMap =
- getFlowToExecutorMap();
- ArrayList<ExecutableFlow> finishedFlows =
- new ArrayList<ExecutableFlow>();
- ArrayList<ExecutableFlow> finalizeFlows =
- new ArrayList<ExecutableFlow>();
-
- if (exFlowMap.size() > 0) {
- for (Map.Entry<Executor, List<ExecutableFlow>> entry : exFlowMap
- .entrySet()) {
- List<Long> updateTimesList = new ArrayList<Long>();
- List<Integer> executionIdsList = new ArrayList<Integer>();
-
- Executor executor = entry.getKey();
-
- updaterStage =
- "Starting update flows on " + executor.getHost() + ":"
- + executor.getPort();
-
- // We pack the parameters of the same host together before we
- // query.
- fillUpdateTimeAndExecId(entry.getValue(), executionIdsList,
- updateTimesList);
-
- Pair<String, String> updateTimes =
- new Pair<String, String>(
- ConnectorParams.UPDATE_TIME_LIST_PARAM,
- JSONUtils.toJSON(updateTimesList));
- Pair<String, String> executionIds =
- new Pair<String, String>(ConnectorParams.EXEC_ID_LIST_PARAM,
- JSONUtils.toJSON(executionIdsList));
-
- Map<String, Object> results = null;
- try {
- results =
- callExecutorServer(executor.getHost(), executor.getPort(),
- ConnectorParams.UPDATE_ACTION, null, null, executionIds,
- updateTimes);
- } catch (IOException e) {
- logger.error(e);
- for (ExecutableFlow flow : entry.getValue()) {
- Pair<ExecutionReference, ExecutableFlow> pair =
- runningFlows.get(flow.getExecutionId());
-
- updaterStage =
- "Failed to get update. Doing some clean up for flow "
- + pair.getSecond().getExecutionId();
-
- if (pair != null) {
- ExecutionReference ref = pair.getFirst();
- int numErrors = ref.getNumErrors();
- if (ref.getNumErrors() < this.numErrors) {
- ref.setNextCheckTime(System.currentTimeMillis()
- + errorThreshold);
- ref.setNumErrors(++numErrors);
- } else {
- logger.error("Evicting flow " + flow.getExecutionId()
- + ". The executor is unresponsive.");
- // TODO should send out an unresponsive email here.
- finalizeFlows.add(pair.getSecond());
- }
- }
- }
- }
-
- // We gets results
- if (results != null) {
- List<Map<String, Object>> executionUpdates =
- (List<Map<String, Object>>) results
- .get(ConnectorParams.RESPONSE_UPDATED_FLOWS);
- for (Map<String, Object> updateMap : executionUpdates) {
- try {
- ExecutableFlow flow = updateExecution(updateMap);
-
- updaterStage = "Updated flow " + flow.getExecutionId();
-
- if (isFinished(flow)) {
- finishedFlows.add(flow);
- finalizeFlows.add(flow);
- }
- } catch (ExecutorManagerException e) {
- ExecutableFlow flow = e.getExecutableFlow();
- logger.error(e);
-
- if (flow != null) {
- logger.error("Finalizing flow " + flow.getExecutionId());
- finalizeFlows.add(flow);
- }
- }
- }
- }
- }
-
- updaterStage = "Evicting old recently finished flows.";
-
- evictOldRecentlyFinished(recentlyFinishedLifetimeMs);
- // Add new finished
- for (ExecutableFlow flow : finishedFlows) {
- if (flow.getScheduleId() >= 0
- && flow.getStatus() == Status.SUCCEEDED) {
- ScheduleStatisticManager.invalidateCache(flow.getScheduleId(),
- cacheDir);
- }
- fireEventListeners(Event.create(flow, Type.FLOW_FINISHED));
- recentlyFinished.put(flow.getExecutionId(), flow);
- }
-
- updaterStage =
- "Finalizing " + finalizeFlows.size() + " error flows.";
+ synchronized (this) {
+ try {
+ lastCleanerThreadCheckTime = System.currentTimeMillis();
- // Kill error flows
- for (ExecutableFlow flow : finalizeFlows) {
- finalizeFlows(flow);
+ // Cleanup old stuff.
+ long currentTime = System.currentTimeMillis();
+ if (currentTime - CLEANER_THREAD_WAIT_INTERVAL_MS > lastLogCleanTime) {
+ cleanExecutionLogs();
+ lastLogCleanTime = currentTime;
}
- }
- updaterStage = "Updated all active flows. Waiting for next round.";
-
- synchronized (this) {
- try {
- if (runningFlows.size() > 0) {
- this.wait(waitTimeMs);
- } else {
- this.wait(waitTimeIdleMs);
- }
- } catch (InterruptedException e) {
- }
+ wait(CLEANER_THREAD_WAIT_INTERVAL_MS);
+ } catch (InterruptedException e) {
+ logger.info("Interrupted. Probably to shut down.");
}
- } catch (Exception e) {
- logger.error(e);
}
}
}
+
+ private void cleanExecutionLogs() {
+ logger.info("Cleaning old logs from execution_logs");
+ long cutoff = DateTime.now().getMillis() - executionLogsRetentionMs;
+ logger.info("Cleaning old log files before "
+ + new DateTime(cutoff).toString());
+ cleanOldExecutionLogs(DateTime.now().getMillis()
+ - executionLogsRetentionMs);
+ }
}
+
/*
* This thread is responsible for processing queued flows.
*/
private class QueueProcessorThread extends Thread {
private static final long QUEUE_PROCESSOR_WAIT_IN_MS = 1000;
private static final long ACTIVE_EXECUTOR_REFRESH_WINDOW_IN_MS = 1000;
+ private static final int MAX_DISPATCHING_ERRORS_PERMITTED = 5;
+ private static final int MAX_CONTINUOUS_FLOW_PROCESSED = 10;
+
private boolean shutdown = false;
private boolean isActive = true;
- public QueueProcessorThread() {
+ public QueueProcessorThread(boolean isActive) {
+ setActive(isActive);
this.setName("AzkabanWebServer-QueueProcessor-Thread");
}
@@ -1614,130 +1748,117 @@ public class ExecutorManager extends EventHandler implements
try {
// start processing queue if active, other wait for sometime
if (isActive) {
- processQueuedFlows(ACTIVE_EXECUTOR_REFRESH_WINDOW_IN_MS);
+ processQueuedFlows(ACTIVE_EXECUTOR_REFRESH_WINDOW_IN_MS,
+ MAX_CONTINUOUS_FLOW_PROCESSED);
}
wait(QUEUE_PROCESSOR_WAIT_IN_MS);
- } catch (InterruptedException e) {
+ } catch (Exception e) {
logger.info(
"QueueProcessorThread Interrupted. Probably to shut down.", e);
}
}
}
}
- }
- private void refreshExecutors() {
- synchronized (activeExecutors) {
- // TODO: rest api call to refresh executor stats
- }
- }
+ /* Method responsible for processing the non-dispatched flows */
+ private void processQueuedFlows(long activeExecutorsRefreshWindow,
+ int maxContinuousFlowProcessed) throws InterruptedException,
+ ExecutorManagerException {
+ long lastProcessingTime = System.currentTimeMillis();
+ Pair<ExecutionReference, ExecutableFlow> runningCandidate;
+ int currentContinuousFlowProcessed = 0;
+
+ while (isActive() && (runningCandidate = waitAndFetchQueueHead()) != null) {
+ ExecutionReference reference = runningCandidate.getFirst();
+ ExecutableFlow exflow = runningCandidate.getSecond();
+
+ long currentTime = System.currentTimeMillis();
+ if (currentTime - lastProcessingTime > activeExecutorsRefreshWindow
+ || currentContinuousFlowProcessed >= maxContinuousFlowProcessed) {
+ refreshExecutors(); // Refresh executor stats to be used by selector
+ lastProcessingTime = currentTime;
+ currentContinuousFlowProcessed = 0;
+ }
- private void processQueuedFlows(long activeExecutorsRefreshWindow)
- throws InterruptedException {
- long lastProcessingTime = System.currentTimeMillis();
- Pair<ExecutionReference, ExecutableFlow> runningCandidate;
- while ((runningCandidate = queuedFlows.take()) != null) {
- // stop queue processing, if queueProcessor is marked inactive
- if (!queueProcessor.isActive())
- return;
-
- long currentTime = System.currentTimeMillis();
- if (currentTime - lastProcessingTime > activeExecutorsRefreshWindow) {
- // Refresh executor stats to be used by selector
- refreshExecutors();
- lastProcessingTime = currentTime;
+ // process flow with current snapshot of activeExecutors
+ processFlow(reference, exflow, new HashSet<Executor>(activeExecutors));
+ currentContinuousFlowProcessed++;
}
+ }
- ExecutionReference reference = runningCandidate.getFirst();
- ExecutableFlow exflow = runningCandidate.getSecond();
+ /* process flow with a snapshot of available Executors */
+ private void processFlow(ExecutionReference reference,
+ ExecutableFlow exflow, Set<Executor> availableExecutors)
+ throws ExecutorManagerException {
synchronized (exflow) {
- Executor choosenExecutor;
-
- // TODO: use dispatcher
- synchronized (activeExecutors) {
- choosenExecutor = activeExecutors.iterator().next();
- }
-
- if (choosenExecutor != null) {
-
+ Executor selectedExecutor = selectExecutor(exflow, availableExecutors);
+ if (selectedExecutor != null) {
try {
- // TODO: ADD rest call to do an actual dispatch
- callExecutorServer(reference, ConnectorParams.EXECUTE_ACTION);
-
- executorLoader.assignExecutor(exflow.getExecutionId(),
- choosenExecutor.getId());
- reference.setExecutor(choosenExecutor);
-
- // move from queuedFlows to running flows
- dequeueFlow(exflow.getExecutionId());
- runningFlows.put(exflow.getExecutionId(),
- new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
+ dispatch(reference, exflow, selectedExecutor);
} catch (ExecutorManagerException e) {
- logger.error("Failed to process queued flow", e);
- // TODO: allow N errors and re-try
- finalizeFlows(exflow);
+ logger.debug(String.format(
+ "Executor %s responded with exception for exec: %d",
+ selectedExecutor, exflow.getExecutionId()), e);
+ handleDispatchExceptionCase(reference, exflow, selectedExecutor,
+ availableExecutors);
}
} else {
- // TODO: handle scenario where dispatcher didn't assigned any executor
+ handleNoExecutorSelectedCase(reference, exflow);
}
}
}
- }
-
- /*
- * cleaner thread to clean up execution_logs, etc in DB. Runs every day.
- */
- private class CleanerThread extends Thread {
- // log file retention is 1 month.
- // check every day
- private static final long CLEANER_THREAD_WAIT_INTERVAL_MS =
- 24 * 60 * 60 * 1000;
-
- private final long executionLogsRetentionMs;
-
- private boolean shutdown = false;
- private long lastLogCleanTime = -1;
-
- public CleanerThread(long executionLogsRetentionMs) {
- this.executionLogsRetentionMs = executionLogsRetentionMs;
- this.setName("AzkabanWebServer-Cleaner-Thread");
+ /* Choose Executor for exflow among the available executors */
+ private Executor selectExecutor(ExecutableFlow exflow,
+ Set<Executor> availableExecutors) {
+ Executor choosenExecutor;
+ // TODO: use dispatcher
+ choosenExecutor = availableExecutors.iterator().next();
+ return choosenExecutor;
+ }
+
+ private void handleDispatchExceptionCase(ExecutionReference reference,
+ ExecutableFlow exflow, Executor lastSelectedExecutor,
+ Set<Executor> remainingExecutors) throws ExecutorManagerException {
+ reference.setNumErrors(reference.getNumErrors() + 1);
+ reference.setExecutor(null);
+ if (reference.getNumErrors() >= MAX_DISPATCHING_ERRORS_PERMITTED
+ || remainingExecutors.size() <= 1) {
+ logger.error("Failed to process queued flow");
+ finalizeFlows(exflow);
+ } else {
+ remainingExecutors.remove(lastSelectedExecutor);
+ // try other executors except chosenExecutor
+ processFlow(reference, exflow, remainingExecutors);
+ }
}
- @SuppressWarnings("unused")
- public void shutdown() {
- shutdown = true;
- this.interrupt();
+ private void handleNoExecutorSelectedCase(ExecutionReference reference,
+ ExecutableFlow exflow) throws ExecutorManagerException {
+ reference.setNumErrors(reference.getNumErrors() + 1);
+ // Scenario: when dispatcher didn't assigned any executor
+ if (reference.getNumErrors() >= MAX_DISPATCHING_ERRORS_PERMITTED) {
+ finalizeFlows(exflow);
+ } else {
+ // again queue this flow
+ enqueueFlow(exflow, reference);
+ }
}
- public void run() {
- while (!shutdown) {
- synchronized (this) {
- try {
- lastCleanerThreadCheckTime = System.currentTimeMillis();
+ private void dispatch(ExecutionReference reference, ExecutableFlow exflow,
+ Executor choosenExecutor) throws ExecutorManagerException {
+ exflow.setUpdateTime(System.currentTimeMillis());
- // Cleanup old stuff.
- long currentTime = System.currentTimeMillis();
- if (currentTime - CLEANER_THREAD_WAIT_INTERVAL_MS > lastLogCleanTime) {
- cleanExecutionLogs();
- lastLogCleanTime = currentTime;
- }
-
- wait(CLEANER_THREAD_WAIT_INTERVAL_MS);
- } catch (InterruptedException e) {
- logger.info("Interrupted. Probably to shut down.");
- }
- }
- }
- }
+ // to be moved after db update once we integrate rest api changes
+ reference.setExecutor(choosenExecutor);
+ // TODO: ADD rest call to do an actual dispatch
+ callExecutorServer(reference, ConnectorParams.EXECUTE_ACTION);
+ executorLoader.assignExecutor(exflow.getExecutionId(),
+ choosenExecutor.getId());
- private void cleanExecutionLogs() {
- logger.info("Cleaning old logs from execution_logs");
- long cutoff = DateTime.now().getMillis() - executionLogsRetentionMs;
- logger.info("Cleaning old log files before "
- + new DateTime(cutoff).toString());
- cleanOldExecutionLogs(DateTime.now().getMillis()
- - executionLogsRetentionMs);
+ // move from flow to running flows
+ runningFlows.put(exflow.getExecutionId(),
+ new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
}
}
-}
+}
\ No newline at end of file