package azkaban.executor;
import azkaban.Constants;
import azkaban.metrics.CommonMetrics;
import azkaban.utils.FlowUtils;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.lang.Thread.State;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import azkaban.alert.Alerter;
import azkaban.event.Event;
import azkaban.event.Event.Type;
import azkaban.event.EventData;
import azkaban.event.EventHandler;
import azkaban.executor.selector.ExecutorComparator;
import azkaban.executor.selector.ExecutorFilter;
import azkaban.executor.selector.ExecutorSelector;
import azkaban.project.Project;
import azkaban.project.ProjectWhitelist;
import azkaban.scheduler.ScheduleStatisticManager;
import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
public class ExecutorManager extends EventHandler implements
ExecutorManagerAdapter {
// ---- Property keys (package-private ones are visible to same-package code) ----
// Comma-separated list of executor filter names, read in setupMultiExecutorMode().
static final String AZKABAN_EXECUTOR_SELECTOR_FILTERS =
"azkaban.executorselector.filters";
// Prefix of the properties that carry comparator weights; each value is parsed as an Integer.
static final String AZKABAN_EXECUTOR_SELECTOR_COMPARATOR_PREFIX =
"azkaban.executorselector.comparator.";
// Boolean handed to the QueueProcessorThread constructor (defaults to true).
static final String AZKABAN_QUEUEPROCESSING_ENABLED =
"azkaban.queueprocessing.enabled";
// Switches between multi-executor mode and single local executor mode (defaults to false).
static final String AZKABAN_USE_MULTIPLE_EXECUTORS =
"azkaban.use.multiple.executors";
// Capacity passed to QueuedExecutions (defaults to 100000).
private static final String AZKABAN_WEBSERVER_QUEUE_SIZE =
"azkaban.webserver.queue.size";
// Handed to the QueueProcessorThread constructor (defaults to 50000).
private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS =
"azkaban.activeexecutor.refresh.milisecinterval";
// Handed to the QueueProcessorThread constructor (defaults to 5).
private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW =
"azkaban.activeexecutor.refresh.flowinterval";
// Size of the fixed thread pool used by refreshExecutors() (defaults to 5).
private static final String AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS =
"azkaban.executorinfo.refresh.maxThreads";
// Handed to the QueueProcessorThread constructor (defaults to the number of active executors).
private static final String AZKABAN_MAX_DISPATCHING_ERRORS_PERMITTED =
"azkaban.maxDispatchingErrors";
private static Logger logger = Logger.getLogger(ExecutorManager.class);
// Persistence layer for executors, executions, active references and logs.
private ExecutorLoader executorLoader;
private CleanerThread cleanerThread;
// Backs getRecentlyFinishedFlows(); keyed by Integer (presumably the execution id - confirm).
private ConcurrentHashMap<Integer, ExecutableFlow> recentlyFinished =
new ConcurrentHashMap<>();
// Executions waiting to be dispatched; filled from the loader at construction time.
QueuedExecutions queuedFlows;
// Executors currently in use; refreshExecutors() iterates it under its own monitor.
final private Set<Executor> activeExecutors = new HashSet<>();
// Only created in multi-executor mode (see setupMultiExecutorMode()).
private QueueProcessorThread queueProcessor;
// Volatile; reported by getRunningFlows(int, String) in addition to queued and active flows.
private volatile Pair<ExecutionReference, ExecutableFlow> runningCandidate = null;
private ExecutingManagerUpdaterThread executingManager;
// 3 * 4 * 7 = 84 days (12 weeks) expressed in milliseconds.
private static final long DEFAULT_EXECUTION_LOGS_RETENTION_MS = 3 * 4 * 7
* 24 * 60 * 60 * 1000L;
private long lastCleanerThreadCheckTime = -1;
// Timestamp written at the start of every updater-thread loop iteration.
private long lastThreadCheckTime = -1;
// Human-readable progress label maintained by the updater thread.
private String updaterStage = "not started";
private Map<String, Alerter> alerters;
// Directory named by the "cache.directory" property (defaults to "cache").
File cacheDir;
private final Props azkProps;
// Executor filter names parsed from AZKABAN_EXECUTOR_SELECTOR_FILTERS.
private List<String> filterList;
// Comparator weights parsed from properties under AZKABAN_EXECUTOR_SELECTOR_COMPARATOR_PREFIX.
private Map<String, Integer> comparatorWeightsMap;
// System.currentTimeMillis() of the last refreshExecutors() run in which every executor succeeded.
private long lastSuccessfulExecutorInfoRefresh;
// Thread pool on which refreshExecutors() issues its per-executor statistics calls.
private ExecutorService executorInforRefresherService;
/**
 * Creates the manager and starts its background threads.
 *
 * <p>The statement order matters: executors are set up first, then the queue
 * is created and refilled from the loader, and only then are the updater,
 * queue-processor (multi-executor mode only) and cleaner threads started.
 *
 * @param azkProps server properties used for every configuration lookup
 * @param loader persistence layer used to read and write executors and executions
 * @param alerters alerters keyed by name; stored as given
 * @throws ExecutorManagerException if executor setup or loading of queued flows fails
 */
public ExecutorManager(Props azkProps, ExecutorLoader loader,
Map<String, Alerter> alerters) throws ExecutorManagerException {
this.alerters = alerters;
this.azkProps = azkProps;
this.executorLoader = loader;
// Must run before the queue processor is built: its error budget defaults to activeExecutors.size().
this.setupExecutors();
queuedFlows =
new QueuedExecutions(azkProps.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
// Re-enqueue executions the loader still reports as queued.
this.loadQueuedFlows();
cacheDir = new File(azkProps.getString("cache.directory", "cache"));
executingManager = new ExecutingManagerUpdaterThread();
executingManager.start();
if(isMultiExecutorMode()) {
setupMultiExecutorMode();
}
long executionLogsRetentionMs =
azkProps.getLong("execution.logs.retention.ms",
DEFAULT_EXECUTION_LOGS_RETENTION_MS);
cleanerThread = new CleanerThread(executionLogsRetentionMs);
cleanerThread.start();
}
/**
 * Reads the executor-selector configuration, creates the thread pool used
 * for executor statistics refreshes, and starts the queue processor thread.
 */
private void setupMultiExecutorMode() {
  // Filter names arrive as one comma-separated property value.
  final String filterCsv =
      azkProps.getString(AZKABAN_EXECUTOR_SELECTOR_FILTERS, "");
  if (filterCsv != null) {
    filterList = Arrays.asList(StringUtils.split(filterCsv, ","));
  }

  // Comparator weights: one property per comparator under a common prefix.
  final Map<String, String> rawWeights =
      azkProps.getMapByPrefix(AZKABAN_EXECUTOR_SELECTOR_COMPARATOR_PREFIX);
  if (rawWeights != null) {
    comparatorWeightsMap = new TreeMap<String, Integer>();
    for (Map.Entry<String, String> rawWeight : rawWeights.entrySet()) {
      comparatorWeightsMap.put(rawWeight.getKey(),
          Integer.valueOf(rawWeight.getValue()));
    }
  }

  final int refreshThreads =
      azkProps.getInt(AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS, 5);
  executorInforRefresherService = Executors.newFixedThreadPool(refreshThreads);

  final boolean queueProcessingEnabled =
      azkProps.getBoolean(AZKABAN_QUEUEPROCESSING_ENABLED, true);
  final long refreshIntervalMs =
      azkProps.getLong(AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS, 50000);
  final int refreshIntervalFlows =
      azkProps.getInt(AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW, 5);
  final int maxDispatchingErrors =
      azkProps.getInt(AZKABAN_MAX_DISPATCHING_ERRORS_PERMITTED,
          activeExecutors.size());

  queueProcessor =
      new QueueProcessorThread(queueProcessingEnabled, refreshIntervalMs,
          refreshIntervalFlows, maxDispatchingErrors);
  queueProcessor.start();
}
/**
 * (Re)loads the set of active executors.
 *
 * <p>In multi-executor mode the executors come from the loader. Otherwise,
 * when the {@code executor.port} property is present, a single local executor
 * is looked up, created if missing, and activated if inactive.
 *
 * @throws ExecutorManagerException if no executor is available, if several
 *         executors were found outside multi-executor mode, or if the loader fails
 */
@Override
public void setupExecutors() throws ExecutorManagerException {
  final Set<Executor> newExecutors = new HashSet<Executor>();

  if (isMultiExecutorMode()) {
    logger.info("Initializing multi executors from database");
    newExecutors.addAll(executorLoader.fetchActiveExecutors());
  } else if (azkProps.containsKey("executor.port")) {
    String executorHost =
        azkProps.getString(Constants.ConfigurationKeys.EXECUTOR_HOST, "localhost");
    int executorPort = azkProps.getInt("executor.port");
    logger.info(String.format("Initializing local executor %s:%d",
        executorHost, executorPort));
    Executor executor =
        executorLoader.fetchExecutor(executorHost, executorPort);
    if (executor == null) {
      executor = executorLoader.addExecutor(executorHost, executorPort);
    } else if (!executor.isActive()) {
      executor.setActive(true);
      executorLoader.updateExecutor(executor);
    }
    newExecutors.add(new Executor(executor.getId(), executorHost,
        executorPort, true));
  }

  if (newExecutors.isEmpty()) {
    logger.error("No active executor found");
    throw new ExecutorManagerException("No active executor found");
  }
  if (newExecutors.size() > 1 && !isMultiExecutorMode()) {
    logger.error("Multiple local executors specified");
    throw new ExecutorManagerException("Multiple local executors specified");
  }

  // refreshExecutors() iterates activeExecutors while holding its monitor;
  // take the same lock so the clear/addAll swap cannot interleave with that
  // iteration.
  synchronized (activeExecutors) {
    activeExecutors.clear();
    activeExecutors.addAll(newExecutors);
  }
}
/**
 * @return the value of the {@code azkaban.use.multiple.executors} property,
 *         {@code false} when it is not set
 */
private boolean isMultiExecutorMode() {
  final boolean multiExecutorEnabled =
      azkProps.getBoolean(AZKABAN_USE_MULTIPLE_EXECUTORS, false);
  return multiExecutorEnabled;
}
/**
 * Refreshes the cached {@code ExecutorInfo} of every active executor.
 *
 * <p>One {@code /serverStatistics} request per executor is submitted to the
 * refresher pool, then each result is awaited for at most five seconds. An
 * executor's info is cleared before its result is read, so an executor whose
 * refresh fails is left without info. The last-success timestamp is updated
 * only when every executor was refreshed.
 */
private void refreshExecutors() {
  synchronized (activeExecutors) {
    final List<Pair<Executor, Future<String>>> pendingRefreshes =
        new ArrayList<Pair<Executor, Future<String>>>();
    for (final Executor executor : activeExecutors) {
      final Future<String> statsFuture =
          executorInforRefresherService.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
              return callExecutorForJsonString(executor.getHost(),
                  executor.getPort(), "/serverStatistics", null);
            }
          });
      pendingRefreshes.add(new Pair<Executor, Future<String>>(executor,
          statsFuture));
    }

    boolean wasSuccess = true;
    for (Pair<Executor, Future<String>> pending : pendingRefreshes) {
      final Executor executor = pending.getFirst();
      final Future<String> statsFuture = pending.getSecond();
      // Drop stale info up front; it is only restored on a successful fetch.
      executor.setExecutorInfo(null);
      try {
        final String jsonString = statsFuture.get(5, TimeUnit.SECONDS);
        executor.setExecutorInfo(ExecutorInfo.fromJSONString(jsonString));
        logger.info(String.format(
            "Successfully refreshed executor: %s with executor info : %s",
            executor, jsonString));
      } catch (TimeoutException e) {
        wasSuccess = false;
        // The result will never be read again; release the pool thread.
        statsFuture.cancel(true);
        logger.error("Timed out while waiting for ExecutorInfo refresh : "
            + executor, e);
      } catch (InterruptedException e) {
        wasSuccess = false;
        // Keep the interrupt visible to the code that drives this thread.
        Thread.currentThread().interrupt();
        logger.error("Failed to update ExecutorInfo for executor : "
            + executor, e);
      } catch (Exception e) {
        wasSuccess = false;
        logger.error("Failed to update ExecutorInfo for executor : "
            + executor, e);
      }
    }

    if (wasSuccess) {
      lastSuccessfulExecutorInfoRefresh = System.currentTimeMillis();
    }
  }
}
/**
 * Deactivates the queue processor.
 *
 * @throws ExecutorManagerException when not running in multi-executor mode
 */
@Override
public void disableQueueProcessorThread() throws ExecutorManagerException {
  if (!isMultiExecutorMode()) {
    throw new ExecutorManagerException(
        "Cannot disable QueueProcessor in local mode");
  }
  queueProcessor.setActive(false);
}
/**
 * Activates the queue processor.
 *
 * @throws ExecutorManagerException when not running in multi-executor mode
 */
@Override
public void enableQueueProcessorThread() throws ExecutorManagerException {
  if (!isMultiExecutorMode()) {
    throw new ExecutorManagerException(
        "Cannot enable QueueProcessor in local mode");
  }
  queueProcessor.setActive(true);
}
/**
 * @return the queue processor thread's state in multi-executor mode,
 *         otherwise {@code State.NEW}
 */
public State getQueueProcessorThreadState() {
  return isMultiExecutorMode() ? queueProcessor.getState() : State.NEW;
}
/**
 * @return whether the queue processor reports itself active; always
 *         {@code false} outside multi-executor mode
 */
public boolean isQueueProcessorThreadActive() {
  return isMultiExecutorMode() && queueProcessor.isActive();
}
/**
 * @return the timestamp recorded by the last executor-info refresh in which
 *         every executor succeeded
 */
public long getLastSuccessfulExecutorInfoRefresh() {
  return lastSuccessfulExecutorInfoRefresh;
}
/** @return the comparator names reported by {@code ExecutorComparator} */
public Set<String> getAvailableExecutorComparatorNames() {
  final Set<String> comparatorNames =
      ExecutorComparator.getAvailableComparatorNames();
  return comparatorNames;
}
/** @return the filter names reported by {@code ExecutorFilter} */
public Set<String> getAvailableExecutorFilterNames() {
  final Set<String> filterNames = ExecutorFilter.getAvailableFilterNames();
  return filterNames;
}
/** @return the current {@code Thread.State} of the updater thread */
@Override
public State getExecutorManagerThreadState() {
  final State updaterState = executingManager.getState();
  return updaterState;
}
/** @return the progress label most recently stored in {@code updaterStage} */
public String getExecutorThreadStage() {
  return this.updaterStage;
}
/** @return {@code true} while the updater thread is alive */
@Override
public boolean isExecutorManagerThreadActive() {
  final boolean updaterAlive = executingManager.isAlive();
  return updaterAlive;
}
/**
 * @return the timestamp the updater thread stored at the start of its latest
 *         loop iteration, or {@code -1} if it has not run yet
 */
@Override
public long getLastExecutorManagerThreadCheckTime() {
  return this.lastThreadCheckTime;
}
/** @return the stored cleaner-thread check time ({@code -1} initially) */
public long getLastCleanerThreadCheckTime() {
  return lastCleanerThreadCheckTime;
}
/**
 * @return a read-only view (not a copy) of the active executor set
 */
@Override
public Collection<Executor> getAllActiveExecutors() {
  final Collection<Executor> readOnlyView =
      Collections.unmodifiableCollection(activeExecutors);
  return readOnlyView;
}
/**
 * Looks an executor up by id, preferring the in-memory active set and
 * falling back to the loader.
 *
 * @param executorId id of the executor to find
 * @return the matching active executor, or whatever the loader returns for the id
 * @throws ExecutorManagerException if the loader lookup fails
 */
@Override
public Executor fetchExecutor(int executorId) throws ExecutorManagerException {
  for (Executor candidate : activeExecutors) {
    if (candidate.getId() != executorId) {
      continue;
    }
    return candidate;
  }
  // Not among the active executors: ask the persistence layer.
  return executorLoader.fetchExecutor(executorId);
}
/** @return the {@code host:port} string of every active executor */
@Override
public Set<String> getPrimaryServerHosts() {
  final HashSet<String> hostPorts = new HashSet<String>();
  for (Executor active : activeExecutors) {
    final String hostPort = active.getHost() + ":" + active.getPort();
    hostPorts.add(hostPort);
  }
  return hostPorts;
}
/**
 * Collects {@code host:port} strings of the active executors plus those
 * referenced by currently active flows. A loader failure is logged and the
 * hosts gathered so far are returned.
 */
@Override
public Set<String> getAllActiveExecutorServerHosts() {
  final HashSet<String> hostPorts = new HashSet<>();
  for (Executor active : activeExecutors) {
    hostPorts.add(active.getHost() + ":" + active.getPort());
  }
  try {
    final Collection<Pair<ExecutionReference, ExecutableFlow>> activeFlows =
        executorLoader.fetchActiveFlows().values();
    for (Pair<ExecutionReference, ExecutableFlow> activeFlow : activeFlows) {
      final ExecutionReference reference = activeFlow.getFirst();
      hostPorts.add(reference.getHost() + ":" + reference.getPort());
    }
  } catch (ExecutorManagerException loadFailure) {
    logger.error(loadFailure);
  }
  return hostPorts;
}
/**
 * Re-enqueues every execution the loader reports as queued.
 *
 * @throws ExecutorManagerException if fetching or enqueueing fails
 */
private void loadQueuedFlows() throws ExecutorManagerException {
  final List<Pair<ExecutionReference, ExecutableFlow>> persistedQueue =
      executorLoader.fetchQueuedFlows();
  if (persistedQueue == null) {
    return;
  }
  for (Pair<ExecutionReference, ExecutableFlow> queued : persistedQueue) {
    final ExecutionReference reference = queued.getFirst();
    final ExecutableFlow flow = queued.getSecond();
    queuedFlows.enqueue(flow, reference);
  }
}
/**
 * Lists the execution ids of one flow that are queued, held as the current
 * running candidate, or active according to the loader.
 *
 * @param projectId project the flow belongs to
 * @param flowId id of the flow
 * @return matching execution ids in ascending order; a loader failure is
 *         logged and the ids gathered so far are returned
 */
@Override
public List<Integer> getRunningFlows(int projectId, String flowId) {
  List<Integer> executionIds = new ArrayList<Integer>();
  executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
      queuedFlows.getAllEntries()));

  // runningCandidate is volatile and may change between two reads. Read it
  // once so a concurrent reset to null cannot slip a null element into the
  // list after the null check has passed.
  final Pair<ExecutionReference, ExecutableFlow> candidate = runningCandidate;
  if (candidate != null) {
    executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
        Lists.newArrayList(candidate)));
  }

  try {
    executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
        executorLoader.fetchActiveFlows().values()));
  } catch (ExecutorManagerException e) {
    logger.error(e);
  }
  Collections.sort(executionIds);
  return executionIds;
}
/**
 * Filters {@code collection} down to the execution ids whose flow matches
 * both {@code flowId} and {@code projectId}.
 */
private List<Integer> getRunningFlowsHelper(int projectId, String flowId,
    Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
  final List<Integer> matchingIds = new ArrayList<Integer>();
  for (Pair<ExecutionReference, ExecutableFlow> entry : collection) {
    final ExecutableFlow flow = entry.getSecond();
    final boolean sameFlow =
        flow.getFlowId().equals(flowId) && flow.getProjectId() == projectId;
    if (sameFlow) {
      matchingIds.add(entry.getFirst().getExecId());
    }
  }
  return matchingIds;
}
/**
 * Pairs every queued and every active flow with the executor held by its
 * execution reference. A loader failure is logged and the pairs gathered so
 * far are returned.
 */
@Override
public List<Pair<ExecutableFlow, Executor>> getActiveFlowsWithExecutor()
    throws IOException {
  final List<Pair<ExecutableFlow, Executor>> flowsWithExecutor =
      new ArrayList<>();
  getActiveFlowsWithExecutorHelper(flowsWithExecutor,
      queuedFlows.getAllEntries());
  try {
    getActiveFlowsWithExecutorHelper(flowsWithExecutor,
        executorLoader.fetchActiveFlows().values());
  } catch (ExecutorManagerException loadFailure) {
    logger.error(loadFailure);
  }
  return flowsWithExecutor;
}
/**
 * Appends one (flow, executor) pair to {@code flows} for every entry of
 * {@code collection}; the executor is taken from the entry's reference.
 */
private void getActiveFlowsWithExecutorHelper(
    List<Pair<ExecutableFlow, Executor>> flows,
    Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
  for (Pair<ExecutionReference, ExecutableFlow> entry : collection) {
    final ExecutableFlow flow = entry.getSecond();
    final Executor assignedExecutor = entry.getFirst().getExecutor();
    flows.add(new Pair<ExecutableFlow, Executor>(flow, assignedExecutor));
  }
}
/**
 * Tells whether the given flow is queued or active.
 *
 * <p>The loader is consulted only when the flow is not found in the queue.
 * A loader failure is logged and reported as "not running".
 */
@Override
public boolean isFlowRunning(int projectId, String flowId) {
  if (isFlowRunningHelper(projectId, flowId, queuedFlows.getAllEntries())) {
    return true;
  }
  try {
    return isFlowRunningHelper(projectId, flowId,
        executorLoader.fetchActiveFlows().values());
  } catch (ExecutorManagerException loadFailure) {
    logger.error(loadFailure);
    return false;
  }
}
/**
 * @return {@code true} as soon as one entry of {@code collection} belongs to
 *         the given project and flow
 */
private boolean isFlowRunningHelper(int projectId, String flowId,
    Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
  for (Pair<ExecutionReference, ExecutableFlow> entry : collection) {
    final ExecutableFlow flow = entry.getSecond();
    if (flow.getProjectId() != projectId) {
      continue;
    }
    if (flow.getFlowId().equals(flowId)) {
      return true;
    }
  }
  return false;
}
/**
 * Loads one execution from the persistence layer.
 *
 * @param execId id of the execution
 * @throws ExecutorManagerException if the loader fails
 */
@Override
public ExecutableFlow getExecutableFlow(int execId)
    throws ExecutorManagerException {
  final ExecutableFlow stored = executorLoader.fetchExecutableFlow(execId);
  return stored;
}
/**
 * Returns all queued flows followed by all active flows. A loader failure is
 * logged and only the flows gathered so far are returned.
 */
@Override
public List<ExecutableFlow> getRunningFlows() {
  final ArrayList<ExecutableFlow> runningFlows = new ArrayList<>();
  getActiveFlowHelper(runningFlows, queuedFlows.getAllEntries());
  try {
    getActiveFlowHelper(runningFlows,
        executorLoader.fetchActiveFlows().values());
  } catch (ExecutorManagerException loadFailure) {
    logger.error(loadFailure);
  }
  return runningFlows;
}
/** Appends the flow of every entry in {@code collection} to {@code flows}. */
private void getActiveFlowHelper(ArrayList<ExecutableFlow> flows,
    Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
  for (Pair<ExecutionReference, ExecutableFlow> entry : collection) {
    final ExecutableFlow flow = entry.getSecond();
    flows.add(flow);
  }
}
/**
 * @return the sorted execution ids of all queued and active flows, rendered
 *         with {@code List.toString()}; a loader failure is logged and only
 *         the ids gathered so far are included
 */
public String getRunningFlowIds() {
  final List<Integer> executionIds = new ArrayList<>();
  getRunningFlowsIdsHelper(executionIds, queuedFlows.getAllEntries());
  try {
    getRunningFlowsIdsHelper(executionIds,
        executorLoader.fetchActiveFlows().values());
  } catch (ExecutorManagerException loadFailure) {
    logger.error(loadFailure);
  }
  Collections.sort(executionIds);
  return executionIds.toString();
}
/**
 * @return the sorted execution ids of all queued flows, rendered with
 *         {@code List.toString()}
 */
public String getQueuedFlowIds() {
  final List<Integer> queuedIds = new ArrayList<Integer>();
  getRunningFlowsIdsHelper(queuedIds, queuedFlows.getAllEntries());
  Collections.sort(queuedIds);
  return queuedIds.toString();
}
/** @return the size reported by the queue of pending executions */
public long getQueuedFlowSize() {
  return this.queuedFlows.size();
}
/**
 * Appends the execution id of every flow in {@code collection} to
 * {@code allIds}.
 */
private void getRunningFlowsIdsHelper(List<Integer> allIds,
    Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
  for (Pair<ExecutionReference, ExecutableFlow> entry : collection) {
    final ExecutableFlow flow = entry.getSecond();
    allIds.add(flow.getExecutionId());
  }
}
/** @return a new list holding the current values of the recently-finished map */
@Override
public List<ExecutableFlow> getRecentlyFinishedFlows() {
  final List<ExecutableFlow> snapshot =
      new ArrayList<ExecutableFlow>(recentlyFinished.values());
  return snapshot;
}
/**
 * Fetches one page of execution history for a flow of a project.
 *
 * @param skip passed to the loader as the paging offset
 * @param size passed to the loader as the page size
 * @throws ExecutorManagerException if the loader fails
 */
@Override
public List<ExecutableFlow> getExecutableFlows(Project project,
    String flowId, int skip, int size) throws ExecutorManagerException {
  return executorLoader.fetchFlowHistory(project.getId(), flowId, skip, size);
}
/**
 * Fetches one page of the unfiltered execution history.
 *
 * @throws ExecutorManagerException if the loader fails
 */
@Override
public List<ExecutableFlow> getExecutableFlows(int skip, int size)
    throws ExecutorManagerException {
  return executorLoader.fetchFlowHistory(skip, size);
}
/**
 * Fetches one page of execution history for flows whose id contains
 * {@code flowIdContains}; the text is wrapped in {@code %} wildcards and
 * every other loader filter is left at its "any" value.
 *
 * @throws ExecutorManagerException if the loader fails
 */
@Override
public List<ExecutableFlow> getExecutableFlows(String flowIdContains,
    int skip, int size) throws ExecutorManagerException {
  final String flowPattern = '%' + flowIdContains + '%';
  return executorLoader.fetchFlowHistory(null, flowPattern, null, 0, -1, -1,
      skip, size);
}
/**
 * Fetches one page of execution history using the full filter set; all
 * arguments are forwarded to the loader unchanged.
 *
 * @throws ExecutorManagerException if the loader fails
 */
@Override
public List<ExecutableFlow> getExecutableFlows(String projContain,
    String flowContain, String userContain, int status, long begin, long end,
    int skip, int size) throws ExecutorManagerException {
  return executorLoader.fetchFlowHistory(projContain, flowContain,
      userContain, status, begin, end, skip, size);
}
/**
 * Fetches one page of execution history for a job of a project.
 *
 * @throws ExecutorManagerException if the loader fails
 */
@Override
public List<ExecutableJobInfo> getExecutableJobs(Project project,
    String jobId, int skip, int size) throws ExecutorManagerException {
  return executorLoader.fetchJobHistory(project.getId(), jobId, skip, size);
}
/**
 * @return the loader's count of executable nodes for the job in the project
 * @throws ExecutorManagerException if the loader fails
 */
@Override
public int getNumberOfJobExecutions(Project project, String jobId)
    throws ExecutorManagerException {
  final int projectId = project.getId();
  return executorLoader.fetchNumExecutableNodes(projectId, jobId);
}
/**
 * @return the loader's count of executable flows for the flow in the project
 * @throws ExecutorManagerException if the loader fails
 */
@Override
public int getNumberOfExecutions(Project project, String flowId)
    throws ExecutorManagerException {
  final int projectId = project.getId();
  return executorLoader.fetchNumExecutableFlows(projectId, flowId);
}
/**
 * Reads a slice of a flow's log.
 *
 * <p>While the execution is active the slice is requested from its
 * executor; otherwise it is read from the loader.
 *
 * @param offset start position forwarded to the log source
 * @param length number of units forwarded to the log source
 * @throws ExecutorManagerException if the executor call or the loader fails
 */
@Override
public LogData getExecutableFlowLog(ExecutableFlow exFlow, int offset,
    int length) throws ExecutorManagerException {
  final Pair<ExecutionReference, ExecutableFlow> activeFlow =
      executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());

  if (activeFlow == null) {
    // Execution is no longer active: serve the persisted log.
    return executorLoader.fetchLogs(exFlow.getExecutionId(), "", 0, offset,
        length);
  }

  @SuppressWarnings("unchecked")
  final Map<String, Object> response =
      callExecutorServer(activeFlow.getFirst(), ConnectorParams.LOG_ACTION,
          new Pair<String, String>("type", "flow"),
          new Pair<String, String>("offset", String.valueOf(offset)),
          new Pair<String, String>("length", String.valueOf(length)));
  return LogData.createLogDataFromObject(response);
}
/**
 * Reads a slice of one job attempt's log.
 *
 * <p>While the execution is active the slice is requested from its
 * executor; otherwise it is read from the loader.
 *
 * @throws ExecutorManagerException if the executor call or the loader fails
 */
@Override
public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId,
    int offset, int length, int attempt) throws ExecutorManagerException {
  final Pair<ExecutionReference, ExecutableFlow> activeFlow =
      executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());

  if (activeFlow == null) {
    // Execution is no longer active: serve the persisted log.
    return executorLoader.fetchLogs(exFlow.getExecutionId(), jobId, attempt,
        offset, length);
  }

  @SuppressWarnings("unchecked")
  final Map<String, Object> response =
      callExecutorServer(activeFlow.getFirst(), ConnectorParams.LOG_ACTION,
          new Pair<String, String>("type", "job"),
          new Pair<String, String>("jobId", jobId),
          new Pair<String, String>("offset", String.valueOf(offset)),
          new Pair<String, String>("length", String.valueOf(length)),
          new Pair<String, String>("attempt", String.valueOf(attempt)));
  return LogData.createLogDataFromObject(response);
}
/**
 * Returns the attachments of one job attempt.
 *
 * <p>For an active execution they are taken from the {@code "attachments"}
 * entry of the executor's response; otherwise they are read from the loader.
 *
 * @throws ExecutorManagerException if the executor call or the loader fails
 */
@Override
public List<Object> getExecutionJobStats(ExecutableFlow exFlow, String jobId,
    int attempt) throws ExecutorManagerException {
  final Pair<ExecutionReference, ExecutableFlow> activeFlow =
      executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());

  if (activeFlow != null) {
    @SuppressWarnings("unchecked")
    final Map<String, Object> response =
        callExecutorServer(activeFlow.getFirst(),
            ConnectorParams.ATTACHMENTS_ACTION,
            new Pair<String, String>("jobId", jobId),
            new Pair<String, String>("attempt", String.valueOf(attempt)));
    @SuppressWarnings("unchecked")
    final List<Object> attachments =
        (List<Object>) response.get("attachments");
    return attachments;
  }

  return executorLoader.fetchAttachments(exFlow.getExecutionId(), jobId,
      attempt);
}
/**
 * Requests job metadata from the executor running the flow.
 *
 * @return the metadata built from the executor's response, or {@code null}
 *         when the execution is not active
 * @throws ExecutorManagerException if the loader or the executor call fails
 */
@Override
public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow,
    String jobId, int offset, int length, int attempt)
    throws ExecutorManagerException {
  final Pair<ExecutionReference, ExecutableFlow> running =
      executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
  if (running == null) {
    return null;
  }

  @SuppressWarnings("unchecked")
  final Map<String, Object> response =
      callExecutorServer(running.getFirst(), ConnectorParams.METADATA_ACTION,
          new Pair<String, String>("type", "job"),
          new Pair<String, String>("jobId", jobId),
          new Pair<String, String>("offset", String.valueOf(offset)),
          new Pair<String, String>("length", String.valueOf(length)),
          new Pair<String, String>("attempt", String.valueOf(attempt)));
  return JobMetaData.createJobMetaDataFromObject(response);
}
/**
 * Cancels an execution.
 *
 * <p>An active execution is cancelled through its executor. An execution
 * that is still queued is dequeued and passed to {@code finalizeFlows}.
 *
 * @throws ExecutorManagerException if the execution is neither active nor
 *         queued, or if the loader or the executor call fails
 */
@Override
public void cancelFlow(ExecutableFlow exFlow, String userId)
    throws ExecutorManagerException {
  synchronized (exFlow) {
    final Pair<ExecutionReference, ExecutableFlow> running =
        executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());

    if (running != null) {
      callExecutorServer(running.getFirst(), ConnectorParams.CANCEL_ACTION,
          userId);
      return;
    }

    if (queuedFlows.hasExecution(exFlow.getExecutionId())) {
      queuedFlows.dequeue(exFlow.getExecutionId());
      finalizeFlows(exFlow);
      return;
    }

    throw new ExecutorManagerException("Execution "
        + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
        + " isn't running.");
  }
}
/**
 * Sends the resume action to the executor running the flow.
 *
 * @throws ExecutorManagerException if the execution is not active, or if the
 *         loader or the executor call fails
 */
@Override
public void resumeFlow(ExecutableFlow exFlow, String userId)
    throws ExecutorManagerException {
  synchronized (exFlow) {
    final Pair<ExecutionReference, ExecutableFlow> running =
        executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
    if (running != null) {
      callExecutorServer(running.getFirst(), ConnectorParams.RESUME_ACTION,
          userId);
      return;
    }
    throw new ExecutorManagerException("Execution "
        + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
        + " isn't running.");
  }
}
/**
 * Sends the pause action to the executor running the flow.
 *
 * @throws ExecutorManagerException if the execution is not active, or if the
 *         loader or the executor call fails
 */
@Override
public void pauseFlow(ExecutableFlow exFlow, String userId)
    throws ExecutorManagerException {
  synchronized (exFlow) {
    final Pair<ExecutionReference, ExecutableFlow> running =
        executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
    if (running != null) {
      callExecutorServer(running.getFirst(), ConnectorParams.PAUSE_ACTION,
          userId);
      return;
    }
    throw new ExecutorManagerException("Execution "
        + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
        + " isn't running.");
  }
}
/**
 * Issues the {@code MODIFY_PAUSE_JOBS} command for the given jobs of an
 * active execution; the executor's response is discarded.
 */
@Override
public void pauseExecutingJobs(ExecutableFlow exFlow, String userId,
    String... jobIds) throws ExecutorManagerException {
  final String command = ConnectorParams.MODIFY_PAUSE_JOBS;
  modifyExecutingJobs(exFlow, command, userId, jobIds);
}
/**
 * Issues the {@code MODIFY_RESUME_JOBS} command for the given jobs of an
 * active execution; the executor's response is discarded.
 */
@Override
public void resumeExecutingJobs(ExecutableFlow exFlow, String userId,
    String... jobIds) throws ExecutorManagerException {
  final String command = ConnectorParams.MODIFY_RESUME_JOBS;
  modifyExecutingJobs(exFlow, command, userId, jobIds);
}
/**
 * Issues the {@code MODIFY_RETRY_FAILURES} command for an active execution
 * without naming any job; the executor's response is discarded.
 */
@Override
public void retryFailures(ExecutableFlow exFlow, String userId)
    throws ExecutorManagerException {
  final String command = ConnectorParams.MODIFY_RETRY_FAILURES;
  modifyExecutingJobs(exFlow, command, userId);
}
/**
 * Issues the {@code MODIFY_RETRY_JOBS} command for the given jobs of an
 * active execution; the executor's response is discarded.
 */
@Override
public void retryExecutingJobs(ExecutableFlow exFlow, String userId,
    String... jobIds) throws ExecutorManagerException {
  final String command = ConnectorParams.MODIFY_RETRY_JOBS;
  modifyExecutingJobs(exFlow, command, userId, jobIds);
}
/**
 * Issues the {@code MODIFY_DISABLE_JOBS} command for the given jobs of an
 * active execution; the executor's response is discarded.
 */
@Override
public void disableExecutingJobs(ExecutableFlow exFlow, String userId,
    String... jobIds) throws ExecutorManagerException {
  final String command = ConnectorParams.MODIFY_DISABLE_JOBS;
  modifyExecutingJobs(exFlow, command, userId, jobIds);
}
/**
 * Issues the {@code MODIFY_ENABLE_JOBS} command for the given jobs of an
 * active execution; the executor's response is discarded.
 */
@Override
public void enableExecutingJobs(ExecutableFlow exFlow, String userId,
    String... jobIds) throws ExecutorManagerException {
  final String command = ConnectorParams.MODIFY_ENABLE_JOBS;
  modifyExecutingJobs(exFlow, command, userId, jobIds);
}
/**
 * Issues the {@code MODIFY_CANCEL_JOBS} command for the given jobs of an
 * active execution; the executor's response is discarded.
 */
@Override
public void cancelExecutingJobs(ExecutableFlow exFlow, String userId,
    String... jobIds) throws ExecutorManagerException {
  final String command = ConnectorParams.MODIFY_CANCEL_JOBS;
  modifyExecutingJobs(exFlow, command, userId, jobIds);
}
/**
 * Sends a modify-execution command to the executor running {@code exFlow}.
 *
 * <p>When job ids are supplied, every non-empty id must name a node of the
 * flow; the ids are then sent as one comma-joined parameter. Without job ids
 * only the command type is sent.
 *
 * @param command value sent as the modify-execution action type
 * @param jobIds optional job ids the command applies to
 * @return the executor's response
 * @throws ExecutorManagerException if the execution is not active, a job id
 *         is unknown to the flow, or the loader or the executor call fails
 */
@SuppressWarnings("unchecked")
private Map<String, Object> modifyExecutingJobs(ExecutableFlow exFlow,
    String command, String userId, String... jobIds)
    throws ExecutorManagerException {
  synchronized (exFlow) {
    final Pair<ExecutionReference, ExecutableFlow> running =
        executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
    if (running == null) {
      throw new ExecutorManagerException("Execution "
          + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
          + " isn't running.");
    }

    final boolean hasJobIds = jobIds != null && jobIds.length > 0;
    if (!hasJobIds) {
      return callExecutorServer(running.getFirst(),
          ConnectorParams.MODIFY_EXECUTION_ACTION, userId,
          new Pair<String, String>(
              ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command));
    }

    // Reject the whole request if any named job is not part of the flow.
    for (String jobId : jobIds) {
      if (jobId.isEmpty()) {
        continue;
      }
      if (exFlow.getExecutableNode(jobId) == null) {
        throw new ExecutorManagerException("Job " + jobId
            + " doesn't exist in execution " + exFlow.getExecutionId()
            + ".");
      }
    }

    final String joinedIds = StringUtils.join(jobIds, ',');
    return callExecutorServer(running.getFirst(),
        ConnectorParams.MODIFY_EXECUTION_ACTION, userId,
        new Pair<String, String>(
            ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command),
        new Pair<String, String>(ConnectorParams.MODIFY_JOBS_LIST, joinedIds));
  }
}
/**
 * Submits a flow for execution.
 *
 * <p>Submissions sharing the same project name and flow id are serialized
 * on an interned key. If the queue is full the flow is not submitted and the
 * error text is returned as the message. Otherwise the flow is stamped with
 * user and time, the concurrent-execution option is applied when the flow is
 * already running, and the flow is uploaded. It is then enqueued in
 * multi-executor mode, or dispatched straight to the single executor.
 *
 * @param exflow flow to submit; mutated with submit user, time and options
 * @param userId submitting user
 * @return a human-readable status message
 * @throws ExecutorManagerException if the flow is already running and the
 *         skip option is set, or if uploading or dispatching fails
 */
@Override
public String submitExecutableFlow(ExecutableFlow exflow, String userId)
    throws ExecutorManagerException {
  final String submitLockKey =
      exflow.getProjectName() + "." + exflow.getId() + ".submitFlow";
  synchronized (submitLockKey.intern()) {
    final String flowId = exflow.getFlowId();
    logger.info("Submitting execution flow " + flowId + " by " + userId);

    if (queuedFlows.isFull()) {
      final String rejection =
          String
              .format(
                  "Failed to submit %s for project %s. Azkaban has overrun its webserver queue capacity",
                  flowId, exflow.getProjectName());
      logger.error(rejection);
      return rejection;
    }

    final int projectId = exflow.getProjectId();
    exflow.setSubmitUser(userId);
    exflow.setSubmitTime(System.currentTimeMillis());

    final List<Integer> running = getRunningFlows(projectId, flowId);

    ExecutionOptions options = exflow.getExecutionOptions();
    if (options == null) {
      options = new ExecutionOptions();
    }
    if (options.getDisabledJobs() != null) {
      FlowUtils.applyDisabledJobs(options.getDisabledJobs(), exflow);
    }

    String notice = "";
    if (!running.isEmpty()) {
      if (options.getConcurrentOption().equals(
          ExecutionOptions.CONCURRENT_OPTION_PIPELINE)) {
        // Pipeline behind the execution with the highest id.
        Collections.sort(running);
        final Integer runningExecId = running.get(running.size() - 1);
        options.setPipelineExecutionId(runningExecId);
        notice =
            "Flow " + flowId + " is already running with exec id "
                + runningExecId + ". Pipelining level "
                + options.getPipelineLevel() + ". \n";
      } else if (options.getConcurrentOption().equals(
          ExecutionOptions.CONCURRENT_OPTION_SKIP)) {
        throw new ExecutorManagerException("Flow " + flowId
            + " is already running. Skipping execution.",
            ExecutorManagerException.Reason.SkippedExecution);
      } else {
        notice =
            "Flow " + flowId + " is already running with exec id "
                + StringUtils.join(running, ",")
                + ". Will execute concurrently. \n";
      }
    }

    // The memory check is enabled unless the project is whitelisted for it.
    final boolean memoryCheck =
        !ProjectWhitelist.isProjectWhitelisted(exflow.getProjectId(),
            ProjectWhitelist.WhitelistType.MemoryCheck);
    options.setMemoryCheck(memoryCheck);

    executorLoader.uploadExecutableFlow(exflow);
    final ExecutionReference reference =
        new ExecutionReference(exflow.getExecutionId());

    if (isMultiExecutorMode()) {
      executorLoader.addActiveExecutableReference(reference);
      queuedFlows.enqueue(exflow, reference);
    } else {
      final Executor choosenExecutor = activeExecutors.iterator().next();
      executorLoader.addActiveExecutableReference(reference);
      try {
        dispatch(reference, exflow, choosenExecutor);
      } catch (ExecutorManagerException dispatchFailure) {
        // Undo the active reference so the failed execution is not tracked.
        executorLoader.removeActiveExecutableReference(reference
            .getExecId());
        throw dispatchFailure;
      }
    }

    return notice + "Execution submitted successfully with exec id "
        + exflow.getExecutionId();
  }
}
/**
 * Asks the loader to remove execution logs by time and logs how many entries
 * were removed and how long the call took. A loader failure is logged, not
 * rethrown.
 *
 * @param millis time value forwarded to the loader's removal call
 */
private void cleanOldExecutionLogs(long millis) {
  final long startedAt = System.currentTimeMillis();
  try {
    final int removed = executorLoader.removeExecutionLogsByTime(millis);
    logger.info("Cleaned up " + removed + " log entries.");
  } catch (ExecutorManagerException cleanupFailure) {
    logger.error("log clean up failed. ", cleanupFailure);
  }
  final long elapsedSeconds = (System.currentTimeMillis() - startedAt) / 1000;
  logger.info("log clean up time: " + elapsedSeconds + " seconds.");
}
/**
 * Calls {@code action} for {@code exflow} on an explicitly chosen executor,
 * without user or extra parameters.
 *
 * @throws ExecutorManagerException wrapping any {@code IOException} of the call
 */
private Map<String, Object> callExecutorServer(ExecutableFlow exflow,
    Executor executor, String action) throws ExecutorManagerException {
  try {
    final Map<String, Object> response =
        callExecutorServer(executor.getHost(), executor.getPort(), action,
            exflow.getExecutionId(), null, (Pair<String, String>[]) null);
    return response;
  } catch (IOException ioFailure) {
    throw new ExecutorManagerException(ioFailure);
  }
}
/**
 * Calls {@code action} on the executor named by {@code ref} on behalf of
 * {@code user}, without extra parameters.
 *
 * @throws ExecutorManagerException wrapping any {@code IOException} of the call
 */
private Map<String, Object> callExecutorServer(ExecutionReference ref,
    String action, String user) throws ExecutorManagerException {
  try {
    final Map<String, Object> response =
        callExecutorServer(ref.getHost(), ref.getPort(), action,
            ref.getExecId(), user, (Pair<String, String>[]) null);
    return response;
  } catch (IOException ioFailure) {
    throw new ExecutorManagerException(ioFailure);
  }
}
/**
 * Calls {@code action} on the executor named by {@code ref} with extra
 * request parameters and no user.
 *
 * @throws ExecutorManagerException wrapping any {@code IOException} of the call
 */
private Map<String, Object> callExecutorServer(ExecutionReference ref,
    String action, Pair<String, String>... params)
    throws ExecutorManagerException {
  try {
    final Map<String, Object> response =
        callExecutorServer(ref.getHost(), ref.getPort(), action,
            ref.getExecId(), null, params);
    return response;
  } catch (IOException ioFailure) {
    throw new ExecutorManagerException(ioFailure);
  }
}
/**
 * Calls {@code action} on the executor named by {@code ref} on behalf of
 * {@code user} with extra request parameters.
 *
 * @throws ExecutorManagerException wrapping any {@code IOException} of the call
 */
private Map<String, Object> callExecutorServer(ExecutionReference ref,
    String action, String user, Pair<String, String>... params)
    throws ExecutorManagerException {
  try {
    final Map<String, Object> response =
        callExecutorServer(ref.getHost(), ref.getPort(), action,
            ref.getExecId(), user, params);
    return response;
  } catch (IOException ioFailure) {
    throw new ExecutorManagerException(ioFailure);
  }
}
/**
 * Issues a request to the {@code /executor} path of the given host.
 *
 * <p>Caller-supplied parameters come first, followed by the action, the
 * execution id (rendered with {@code String.valueOf}, so {@code null}
 * becomes the text "null") and the user.
 *
 * @param params optional extra parameters; may be {@code null}
 * @return the parsed JSON response
 * @throws IOException if the call fails or the response carries an error
 */
private Map<String, Object> callExecutorServer(String host, int port,
    String action, Integer executionId, String user,
    Pair<String, String>... params) throws IOException {
  final List<Pair<String, String>> requestParams =
      new ArrayList<Pair<String, String>>();
  if (params != null) {
    requestParams.addAll(Arrays.asList(params));
  }
  requestParams.add(
      new Pair<String, String>(ConnectorParams.ACTION_PARAM, action));
  requestParams.add(
      new Pair<String, String>(ConnectorParams.EXECID_PARAM,
          String.valueOf(executionId)));
  requestParams.add(
      new Pair<String, String>(ConnectorParams.USER_PARAM, user));

  return callExecutorForJsonObject(host, port, "/executor", requestParams);
}
/**
 * Calls the executor and parses the response body as a JSON object.
 *
 * @return the parsed response
 * @throws IOException if the call fails, or with the response's error text
 *         when the response contains an error entry
 */
private Map<String, Object> callExecutorForJsonObject(String host, int port,
    String path, List<Pair<String, String>> paramList) throws IOException {
  final String body = callExecutorForJsonString(host, port, path, paramList);

  @SuppressWarnings("unchecked")
  final Map<String, Object> parsed =
      (Map<String, Object>) JSONUtils.parseJSONFromString(body);

  final String reportedError =
      (String) parsed.get(ConnectorParams.RESPONSE_ERROR);
  if (reportedError == null) {
    return parsed;
  }
  throw new IOException(reportedError);
}
/**
 * Performs an HTTP GET against the executor and returns the raw response.
 *
 * @param paramList query parameters; {@code null} is treated as none
 * @throws IOException if building the URI or the HTTP call fails
 */
private String callExecutorForJsonString(String host, int port, String path,
    List<Pair<String, String>> paramList) throws IOException {
  final List<Pair<String, String>> queryParams =
      (paramList == null) ? new ArrayList<Pair<String, String>>() : paramList;

  final ExecutorApiClient client = ExecutorApiClient.getInstance();
  @SuppressWarnings("unchecked")
  final URI target =
      ExecutorApiClient.buildUri(host, port, path, true,
          queryParams.toArray(new Pair[0]));
  return client.httpGet(target, null);
}
/**
 * Calls the {@code /stats} path of one executor.
 *
 * @param executorId id of the executor to call
 * @param action value sent as the action parameter
 * @param params optional extra parameters, sent before the action; may be {@code null}
 * @return the parsed JSON response
 * @throws IOException if the call fails or the response carries an error
 * @throws ExecutorManagerException if the executor lookup fails or finds no
 *         executor for the id
 */
@Override
public Map<String, Object> callExecutorStats(int executorId, String action,
    Pair<String, String>... params) throws IOException, ExecutorManagerException {
  final Executor executor = fetchExecutor(executorId);
  // Guard against a lookup that yields no executor (assumes the loader can
  // return null for an unknown id) so the caller gets a descriptive checked
  // exception instead of a NullPointerException.
  if (executor == null) {
    throw new ExecutorManagerException("Unable to find executor with id "
        + executorId);
  }

  final List<Pair<String, String>> paramList =
      new ArrayList<Pair<String, String>>();
  if (params != null) {
    paramList.addAll(Arrays.asList(params));
  }
  paramList
      .add(new Pair<String, String>(ConnectorParams.ACTION_PARAM, action));

  return callExecutorForJsonObject(executor.getHost(), executor.getPort(),
      "/stats", paramList);
}
/**
 * Calls the {@code /jmx} path of an executor.
 *
 * @param hostPort executor address in {@code host:port} form
 * @param action sent as a parameter name with an empty value
 * @param mBean optional MBean name; omitted from the request when {@code null}
 * @return the parsed JSON response
 * @throws IOException if the call fails or the response carries an error
 * @throws IllegalArgumentException if {@code hostPort} has no port part or
 *         the port is not a number
 */
@Override
public Map<String, Object> callExecutorJMX(String hostPort, String action,
    String mBean) throws IOException {
  final String[] hostPortSplit = hostPort.split(":");
  // Fail with a message naming the bad input rather than an index error.
  if (hostPortSplit.length < 2) {
    throw new IllegalArgumentException(
        "Expected executor address as host:port but got '" + hostPort + "'");
  }
  final String host = hostPortSplit[0];
  final int port = Integer.parseInt(hostPortSplit[1]);

  final List<Pair<String, String>> paramList =
      new ArrayList<Pair<String, String>>();
  paramList.add(new Pair<String, String>(action, ""));
  if (mBean != null) {
    paramList.add(new Pair<String, String>(ConnectorParams.JMX_MBEAN, mBean));
  }

  return callExecutorForJsonObject(host, port, "/jmx", paramList);
}
/**
 * Stops the background threads owned by this manager. The queue processor is
 * only asked to stop when running in multi-executor mode; the updater thread
 * is always asked to stop.
 */
@Override
public void shutdown() {
  final boolean multiExecutorMode = isMultiExecutorMode();
  if (multiExecutorMode) {
    queueProcessor.shutdown();
  }

  executingManager.shutdown();
}
private class ExecutingManagerUpdaterThread extends Thread {
private boolean shutdown = false;
public ExecutingManagerUpdaterThread() {
this.setName("ExecutorManagerUpdaterThread");
}
private long recentlyFinishedLifetimeMs = 600000;
private int waitTimeIdleMs = 2000;
private int waitTimeMs = 500;
private int numErrors = 6;
private long errorThreshold = 10000;
private void shutdown() {
shutdown = true;
}
@Override
@SuppressWarnings("unchecked")
public void run() {
while (!shutdown) {
try {
lastThreadCheckTime = System.currentTimeMillis();
updaterStage = "Starting update all flows.";
Map<Executor, List<ExecutableFlow>> exFlowMap =
getFlowToExecutorMap();
ArrayList<ExecutableFlow> finishedFlows =
new ArrayList<ExecutableFlow>();
ArrayList<ExecutableFlow> finalizeFlows =
new ArrayList<ExecutableFlow>();
Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows =
executorLoader.fetchActiveFlows();
if (exFlowMap.size() > 0) {
for (Map.Entry<Executor, List<ExecutableFlow>> entry : exFlowMap
.entrySet()) {
List<Long> updateTimesList = new ArrayList<Long>();
List<Integer> executionIdsList = new ArrayList<Integer>();
Executor executor = entry.getKey();
updaterStage =
"Starting update flows on " + executor.getHost() + ":"
+ executor.getPort();
fillUpdateTimeAndExecId(entry.getValue(), executionIdsList,
updateTimesList);
Pair<String, String> updateTimes =
new Pair<String, String>(
ConnectorParams.UPDATE_TIME_LIST_PARAM,
JSONUtils.toJSON(updateTimesList));
Pair<String, String> executionIds =
new Pair<String, String>(ConnectorParams.EXEC_ID_LIST_PARAM,
JSONUtils.toJSON(executionIdsList));
Map<String, Object> results = null;
try {
results =
callExecutorServer(executor.getHost(),
executor.getPort(), ConnectorParams.UPDATE_ACTION,
null, null, executionIds, updateTimes);
} catch (IOException e) {
logger.error(e);
for (ExecutableFlow flow : entry.getValue()) {
Pair<ExecutionReference, ExecutableFlow> activeFlow =
activeFlows.get(flow.getExecutionId());
updaterStage =
"Failed to get update. Doing some clean up for flow "
+ flow.getExecutionId();
if (activeFlow != null) {
ExecutionReference ref = activeFlow.getFirst();
int numErrors = ref.getNumErrors();
if (ref.getNumErrors() < this.numErrors) {
ref.setNextCheckTime(System.currentTimeMillis()
+ errorThreshold);
ref.setNumErrors(++numErrors);
} else {
logger.error("Evicting flow " + flow.getExecutionId()
+ ". The executor is unresponsive.");
finalizeFlows.add(activeFlow.getSecond());
}
}
}
}
if (results != null) {
List<Map<String, Object>> executionUpdates =
(List<Map<String, Object>>) results
.get(ConnectorParams.RESPONSE_UPDATED_FLOWS);
for (Map<String, Object> updateMap : executionUpdates) {
try {
ExecutableFlow flow = updateExecution(updateMap);
updaterStage = "Updated flow " + flow.getExecutionId();
if (isFinished(flow)) {
finishedFlows.add(flow);
finalizeFlows.add(flow);
}
} catch (ExecutorManagerException e) {
ExecutableFlow flow = e.getExecutableFlow();
logger.error(e);
if (flow != null) {
logger.error("Finalizing flow " + flow.getExecutionId());
finalizeFlows.add(flow);
}
}
}
}
}
updaterStage = "Evicting old recently finished flows.";
evictOldRecentlyFinished(recentlyFinishedLifetimeMs);
for (ExecutableFlow flow : finishedFlows) {
if (flow.getScheduleId() >= 0
&& flow.getStatus() == Status.SUCCEEDED) {
ScheduleStatisticManager.invalidateCache(flow.getScheduleId(),
cacheDir);
}
fireEventListeners(Event.create(flow, Type.FLOW_FINISHED, new EventData(flow.getStatus())));
recentlyFinished.put(flow.getExecutionId(), flow);
}
updaterStage =
"Finalizing " + finalizeFlows.size() + " error flows.";
for (ExecutableFlow flow : finalizeFlows) {
finalizeFlows(flow);
}
}
updaterStage = "Updated all active flows. Waiting for next round.";
synchronized (this) {
try {
if (activeFlows.size() > 0) {
this.wait(waitTimeMs);
} else {
this.wait(waitTimeIdleMs);
}
} catch (InterruptedException e) {
}
}
} catch (Exception e) {
logger.error(e);
}
}
}
}
/**
 * Finalizes a flow: makes sure a finished version of it is persisted,
 * removes its active reference, publishes a FLOW_FINISHED event, remembers
 * it in recentlyFinished and finally sends the configured alerts.
 *
 * If the given flow is not finished, the flow is reloaded through the
 * executor loader and, if that copy is not finished either, every
 * unfinished node is failed and the result is stored.
 *
 * Alerts are skipped when finalization throws. The choice between error and
 * success alerts follows the status of the finalized copy, so a flow that
 * had to be force-failed here is reported as an error even though the
 * in-memory flow still carries a non-terminal status.
 *
 * @param flow the flow to finalize
 */
private void finalizeFlows(ExecutableFlow flow) {
  int execId = flow.getExecutionId();
  boolean alertUser = true;
  // Replaced below by the status of the finalized copy.
  Status finalStatus = flow.getStatus();
  updaterStage = "finalizing flow " + execId;
  try {
    ExecutableFlow dsFlow;
    if (isFinished(flow)) {
      dsFlow = flow;
    } else {
      updaterStage = "finalizing flow " + execId + " loading from db";
      dsFlow = executorLoader.fetchExecutableFlow(execId);

      if (!isFinished(dsFlow)) {
        updaterStage = "finalizing flow " + execId + " failing the flow";
        failEverything(dsFlow);
        executorLoader.updateExecutableFlow(dsFlow);
      }
    }
    finalStatus = dsFlow.getStatus();

    updaterStage = "finalizing flow " + execId + " deleting active reference";

    if (flow.getEndTime() == -1) {
      flow.setEndTime(System.currentTimeMillis());
      executorLoader.updateExecutableFlow(dsFlow);
    }
    executorLoader.removeActiveExecutableReference(execId);

    updaterStage = "finalizing flow " + execId + " cleaning from memory";
    fireEventListeners(Event.create(dsFlow, Type.FLOW_FINISHED,
        new EventData(dsFlow.getStatus())));
    recentlyFinished.put(execId, dsFlow);
  } catch (ExecutorManagerException e) {
    alertUser = false;
    logger.error("Failed to finalize flow " + execId, e);
  }

  updaterStage = "finalizing flow " + execId + " alerting and emailing";
  if (!alertUser) {
    return;
  }

  ExecutionOptions options = flow.getExecutionOptions();
  if (options == null) {
    logger.error("Flow " + execId
        + " has no execution options. Skipping alerts.");
    return;
  }

  boolean failed =
      finalStatus == Status.FAILED || finalStatus == Status.KILLED;

  // E-mail alerts are only sent when recipients are configured for the outcome.
  boolean hasEmailRecipients = failed
      ? options.getFailureEmails() != null
          && !options.getFailureEmails().isEmpty()
      : options.getSuccessEmails() != null
          && !options.getSuccessEmails().isEmpty();
  if (hasEmailRecipients) {
    sendFlowFinishedAlert(alerters.get("email"), "email", flow, failed);
  }

  // An additional alerter can be requested through the "alert.type" flow parameter.
  Map<String, String> flowParameters = options.getFlowParameters();
  if (flowParameters != null && flowParameters.containsKey("alert.type")) {
    String alertType = flowParameters.get("alert.type");
    sendFlowFinishedAlert(alerters.get(alertType), alertType, flow, failed);
  }
}

/**
 * Sends the error or success alert for a finished flow through one alerter,
 * logging (instead of propagating) a missing alerter or any alerter failure.
 */
private void sendFlowFinishedAlert(Alerter alerter, String alertType,
    ExecutableFlow flow, boolean failed) {
  if (alerter == null) {
    logger.error("Alerter type " + alertType
        + " doesn't exist. Failed to alert.");
    return;
  }
  try {
    if (failed) {
      alerter.alertOnError(flow);
    } else {
      alerter.alertOnSuccess(flow);
    }
  } catch (Exception e) {
    logger.error("Failed to alert by " + alertType, e);
  }
}
/**
 * Marks a flow as FAILED and closes out all of its nodes that have not
 * reached a terminal state. Nodes that are SUCCEEDED, FAILED, KILLED,
 * SKIPPED or DISABLED are left untouched; READY nodes become KILLED and all
 * others become FAILED. Missing start/end times (-1) are set to now.
 *
 * @param exFlow the flow to fail
 */
private void failEverything(ExecutableFlow exFlow) {
  final long now = System.currentTimeMillis();

  for (ExecutableNode node : exFlow.getExecutableNodes()) {
    // null means "leave this node as it is".
    final Status replacement;
    switch (node.getStatus()) {
      case SUCCEEDED:
      case FAILED:
      case KILLED:
      case SKIPPED:
      case DISABLED:
        replacement = null;
        break;
      case READY:
        replacement = Status.KILLED;
        break;
      default:
        replacement = Status.FAILED;
        break;
    }
    if (replacement == null) {
      continue;
    }

    node.setStatus(replacement);
    if (node.getStartTime() == -1) {
      node.setStartTime(now);
    }
    if (node.getEndTime() == -1) {
      node.setEndTime(now);
    }
  }

  if (exFlow.getEndTime() == -1) {
    exFlow.setEndTime(now);
  }
  exFlow.setStatus(Status.FAILED);
}
/**
 * Removes flows from recentlyFinished whose end time lies more than
 * {@code ageMs} milliseconds in the past.
 *
 * @param ageMs maximum age, in milliseconds, a finished flow may have
 */
private void evictOldRecentlyFinished(long ageMs) {
  // Iterate over a snapshot of the keys so entries can be removed while looping.
  final List<Integer> candidateIds =
      new ArrayList<Integer>(recentlyFinished.keySet());
  final long cutoff = System.currentTimeMillis() - ageMs;

  for (Integer execId : candidateIds) {
    final ExecutableFlow finished = recentlyFinished.get(execId);
    final boolean expired = finished.getEndTime() < cutoff;
    if (expired) {
      recentlyFinished.remove(execId);
    }
  }
}
/**
 * Applies one update object received from an executor to the matching
 * active flow.
 *
 * On success the flow's error bookkeeping is reset, the update is applied,
 * a flow-failure metric is recorded when the status changes to FAILED, and
 * first-failure alerts are sent when the status changes to FAILED_FINISHING.
 * Alerting problems are logged and never fail the update.
 *
 * @param updateData update object of a single execution
 * @return the updated flow
 * @throws ExecutorManagerException if the update has no execution id, no
 *         active flow exists for the id, or the update carries an "error"
 *         entry (in which case the exception references the flow)
 */
private ExecutableFlow updateExecution(Map<String, Object> updateData)
    throws ExecutorManagerException {
  Integer execId =
      (Integer) updateData.get(ConnectorParams.UPDATE_MAP_EXEC_ID);
  if (execId == null) {
    throw new ExecutorManagerException(
        "Response is malformed. Need exec id to update.");
  }

  Pair<ExecutionReference, ExecutableFlow> activeFlow =
      executorLoader.fetchActiveFlowByExecId(execId);
  if (activeFlow == null) {
    throw new ExecutorManagerException(
        "No running flow found with the execution id. Removing " + execId);
  }

  ExecutionReference ref = activeFlow.getFirst();
  ExecutableFlow flow = activeFlow.getSecond();
  if (updateData.containsKey("error")) {
    throw new ExecutorManagerException((String) updateData.get("error"), flow);
  }

  // A successful update clears the failure bookkeeping of the reference.
  ref.setNextCheckTime(0);
  ref.setNumErrors(0);
  Status oldStatus = flow.getStatus();
  flow.applyUpdateObject(updateData);
  Status newStatus = flow.getStatus();

  if (oldStatus != newStatus && newStatus == Status.FAILED) {
    CommonMetrics.INSTANCE.markFlowFail();
  }

  ExecutionOptions options = flow.getExecutionOptions();
  if (oldStatus != newStatus && newStatus.equals(Status.FAILED_FINISHING)) {
    if (options.getNotifyOnFirstFailure()) {
      Alerter mailAlerter = alerters.get("email");
      if (mailAlerter == null) {
        logger.error("Alerter type email doesn't exist."
            + " Failed to send first error email.");
      } else {
        try {
          mailAlerter.alertOnFirstError(flow);
        } catch (Exception e) {
          logger.error("Failed to send first error email." + e.getMessage(),
              e);
        }
      }
    }

    // An additional alerter can be requested through the "alert.type" flow parameter.
    Map<String, String> flowParameters = options.getFlowParameters();
    if (flowParameters != null && flowParameters.containsKey("alert.type")) {
      String alertType = flowParameters.get("alert.type");
      Alerter alerter = alerters.get(alertType);
      if (alerter != null) {
        try {
          alerter.alertOnFirstError(flow);
        } catch (Exception e) {
          logger.error("Failed to alert by " + alertType, e);
        }
      } else {
        logger.error("Alerter type " + alertType
            + " doesn't exist. Failed to alert.");
      }
    }
  }

  return flow;
}
/**
 * Tells whether a flow has reached a terminal status.
 *
 * @param flow the flow to inspect
 * @return {@code true} if the status is SUCCEEDED, FAILED or KILLED;
 *         {@code false} for every other status
 */
public boolean isFinished(ExecutableFlow flow) {
  final Status status = flow.getStatus();
  final boolean finished;
  switch (status) {
    case SUCCEEDED:
    case FAILED:
    case KILLED:
      finished = true;
      break;
    default:
      finished = false;
      break;
  }
  return finished;
}
/**
 * Appends, for every flow and in list order, its execution id to
 * {@code executionIds} and its update time to {@code updateTimes}, so that
 * both output lists stay index-aligned.
 *
 * @param flows flows to read from
 * @param executionIds output list receiving the execution ids
 * @param updateTimes output list receiving the update times
 */
private void fillUpdateTimeAndExecId(List<ExecutableFlow> flows,
    List<Integer> executionIds, List<Long> updateTimes) {
  for (ExecutableFlow current : flows) {
    final int execId = current.getExecutionId();
    executionIds.add(execId);

    final long lastUpdate = current.getUpdateTime();
    updateTimes.add(lastUpdate);
  }
}
/**
 * Groups the active flows by the executor stored in their reference. Flows
 * whose next check time has not passed yet are left out. If loading the
 * active flows fails, the error is logged and whatever was collected so far
 * (possibly nothing) is returned.
 *
 * @return map from executor to the flows due for an update check
 */
private Map<Executor, List<ExecutableFlow>> getFlowToExecutorMap() {
  final HashMap<Executor, List<ExecutableFlow>> flowsByExecutor =
      new HashMap<>();

  try {
    for (Pair<ExecutionReference, ExecutableFlow> active :
        executorLoader.fetchActiveFlows().values()) {
      final ExecutionReference activeRef = active.getFirst();
      final ExecutableFlow activeFlow = active.getSecond();
      final Executor owner = activeRef.getExecutor();

      final boolean dueForCheck =
          activeRef.getNextCheckTime() < System.currentTimeMillis();
      if (dueForCheck) {
        List<ExecutableFlow> bucket = flowsByExecutor.get(owner);
        if (bucket == null) {
          bucket = new ArrayList<>();
          flowsByExecutor.put(owner, bucket);
        }
        bucket.add(activeFlow);
      }
    }
  } catch (ExecutorManagerException e) {
    logger.error(e);
  }

  return flowsByExecutor;
}
/**
 * Loads one page of a flow's execution history into {@code outputList} and
 * reports the count returned by the loader for the flow.
 *
 * @param projectId id of the project owning the flow
 * @param flowId id of the flow
 * @param from passed to the loader as the page start
 * @param length passed to the loader as the page size
 * @param outputList list the fetched executions are appended to
 * @return the number of executable flows reported by the loader
 * @throws ExecutorManagerException if the loader fails
 */
@Override
public int getExecutableFlows(int projectId, String flowId, int from,
    int length, List<ExecutableFlow> outputList)
    throws ExecutorManagerException {
  outputList.addAll(
      executorLoader.fetchFlowHistory(projectId, flowId, from, length));

  final int totalExecutions =
      executorLoader.fetchNumExecutableFlows(projectId, flowId);
  return totalExecutions;
}
/**
 * Fetches one page of a flow's execution history restricted to a status.
 *
 * @param projectId id of the project owning the flow
 * @param flowId id of the flow
 * @param from passed to the loader as the page start
 * @param length passed to the loader as the page size
 * @param status status passed to the loader as filter
 * @return the executions returned by the loader
 * @throws ExecutorManagerException if the loader fails
 */
@Override
public List<ExecutableFlow> getExecutableFlows(int projectId, String flowId,
    int from, int length, Status status) throws ExecutorManagerException {
  final List<ExecutableFlow> history =
      executorLoader.fetchFlowHistory(projectId, flowId, from, length, status);
  return history;
}
/**
 * Background thread that periodically removes execution logs older than the
 * configured retention period.
 */
private class CleanerThread extends Thread {
  // Log cleanup runs at most once per interval (one day).
  private static final long CLEANER_THREAD_WAIT_INTERVAL_MS =
      24 * 60 * 60 * 1000;

  private final long executionLogsRetentionMs;

  // Written by shutdown() from another thread and polled by run(); volatile
  // guarantees the loop observes the change.
  private volatile boolean shutdown = false;
  // Time of the last cleanup; -1 until the first cleanup has run.
  private long lastLogCleanTime = -1;

  /**
   * @param executionLogsRetentionMs how long, in milliseconds, execution
   *        logs are kept before they become eligible for cleanup
   */
  public CleanerThread(long executionLogsRetentionMs) {
    this.executionLogsRetentionMs = executionLogsRetentionMs;
    this.setName("AzkabanWebServer-Cleaner-Thread");
  }

  /** Requests the loop to stop and interrupts the thread to end its wait. */
  @SuppressWarnings("unused")
  public void shutdown() {
    shutdown = true;
    this.interrupt();
  }

  @Override
  public void run() {
    while (!shutdown) {
      synchronized (this) {
        try {
          lastCleanerThreadCheckTime = System.currentTimeMillis();

          // Clean only if more than one interval has passed since the last run.
          long currentTime = System.currentTimeMillis();
          if (currentTime - CLEANER_THREAD_WAIT_INTERVAL_MS > lastLogCleanTime) {
            cleanExecutionLogs();
            lastLogCleanTime = currentTime;
          }

          wait(CLEANER_THREAD_WAIT_INTERVAL_MS);
        } catch (InterruptedException e) {
          // The loop condition decides whether this was a shutdown request.
          logger.info("Interrupted. Probably to shut down.");
        }
      }
    }
  }

  /**
   * Removes execution logs older than the retention period. The cutoff is
   * computed once so that the logged value is exactly the one that is used.
   */
  private void cleanExecutionLogs() {
    logger.info("Cleaning old logs from execution_logs");
    long cutoff = DateTime.now().getMillis() - executionLogsRetentionMs;
    logger.info("Cleaning old log files before "
        + new DateTime(cutoff).toString());
    cleanOldExecutionLogs(cutoff);
  }
}
/**
 * Assigns a flow to an executor and asks that executor to run it. If the
 * executor call fails, the assignment is rolled back and the failure is
 * rethrown wrapped in a new ExecutorManagerException. On success the
 * executor is recorded on the reference.
 *
 * @param reference execution reference of the flow
 * @param exflow flow to dispatch
 * @param choosenExecutor executor that should run the flow
 * @throws ExecutorManagerException if the assignment or the dispatch fails
 */
private void dispatch(ExecutionReference reference, ExecutableFlow exflow,
    Executor choosenExecutor) throws ExecutorManagerException {
  exflow.setUpdateTime(System.currentTimeMillis());

  final int execId = exflow.getExecutionId();
  executorLoader.assignExecutor(choosenExecutor.getId(), execId);

  try {
    callExecutorServer(exflow, choosenExecutor,
        ConnectorParams.EXECUTE_ACTION);
  } catch (ExecutorManagerException dispatchFailure) {
    final String rollbackMessage =
        "Rolling back executor assignment for execution id:" + execId;
    logger.error(rollbackMessage, dispatchFailure);
    executorLoader.unassignExecutor(execId);
    throw new ExecutorManagerException(dispatchFailure);
  }

  reference.setExecutor(choosenExecutor);

  final String successMessage = String.format(
      "Successfully dispatched exec %d with error count %d",
      execId, reference.getNumErrors());
  logger.info(successMessage);
}
/**
 * Background thread that takes flows from queuedFlows and dispatches each of
 * them to an executor, either the one requested by the user or the best one
 * picked by the executor selector.
 */
private class QueueProcessorThread extends Thread {
  // Pause between two passes over the queue.
  private static final long QUEUE_PROCESSOR_WAIT_IN_MS = 1000;
  // A flow is finalized once its dispatch error count exceeds this value.
  private final int maxDispatchingErrors;
  // Executors are refreshed when this much time has passed ...
  private final long activeExecutorRefreshWindowInMilisec;
  // ... or when this many flows were processed since the last refresh.
  private final int activeExecutorRefreshWindowInFlows;

  private volatile boolean shutdown = false;
  private volatile boolean isActive = true;

  public QueueProcessorThread(boolean isActive,
      long activeExecutorRefreshWindowInTime,
      int activeExecutorRefreshWindowInFlows,
      int maxDispatchingErrors) {
    setActive(isActive);
    this.maxDispatchingErrors = maxDispatchingErrors;
    this.activeExecutorRefreshWindowInFlows =
        activeExecutorRefreshWindowInFlows;
    this.activeExecutorRefreshWindowInMilisec =
        activeExecutorRefreshWindowInTime;
    this.setName("AzkabanWebServer-QueueProcessor-Thread");
  }

  /** Enables or disables queue processing without stopping the thread. */
  public void setActive(boolean isActive) {
    this.isActive = isActive;
    logger.info("QueueProcessorThread active turned " + this.isActive);
  }

  public boolean isActive() {
    return isActive;
  }

  /** Requests the loop to stop and interrupts the thread to end its wait. */
  public void shutdown() {
    shutdown = true;
    this.interrupt();
  }

  @Override
  public void run() {
    while (!shutdown) {
      synchronized (this) {
        try {
          if (isActive) {
            processQueuedFlows(activeExecutorRefreshWindowInMilisec,
                activeExecutorRefreshWindowInFlows);
          }
          wait(QUEUE_PROCESSOR_WAIT_IN_MS);
        } catch (InterruptedException e) {
          // The loop condition decides whether this was a shutdown request.
          logger.error(
              "QueueProcessorThread Interrupted. Probably to shut down.", e);
        } catch (Exception e) {
          // Not an interrupt: report it as a processing failure so the log
          // does not misattribute it to shutdown.
          logger.error(
              "QueueProcessorThread failed to process queued flows.", e);
        }
      }
    }
  }

  /**
   * Processes queued flows while the thread is active and the queue yields
   * a head element.
   *
   * @param activeExecutorsRefreshWindow maximum time in milliseconds between
   *        two executor refreshes
   * @param maxContinuousFlowProcessed maximum number of flows processed
   *        between two executor refreshes
   */
  private void processQueuedFlows(long activeExecutorsRefreshWindow,
      int maxContinuousFlowProcessed) throws InterruptedException,
      ExecutorManagerException {
    long lastExecutorRefreshTime = 0;
    int currentContinuousFlowProcessed = 0;

    while (isActive() && (runningCandidate = queuedFlows.fetchHead()) != null) {
      ExecutionReference reference = runningCandidate.getFirst();
      ExecutableFlow exflow = runningCandidate.getSecond();

      long currentTime = System.currentTimeMillis();

      // Refresh executor information when it is too old or too many flows
      // were processed with it.
      if (currentTime - lastExecutorRefreshTime > activeExecutorsRefreshWindow
          || currentContinuousFlowProcessed >= maxContinuousFlowProcessed) {
        refreshExecutors();
        lastExecutorRefreshTime = currentTime;
        currentContinuousFlowProcessed = 0;
      }

      if (exflow.getUpdateTime() > lastExecutorRefreshTime) {
        // The flow was touched after the last executor refresh: put it back
        // and sleep for the remainder of the refresh window.
        queuedFlows.enqueue(exflow, reference);
        runningCandidate = null;
        long sleepInterval =
            activeExecutorsRefreshWindow
                - (currentTime - lastExecutorRefreshTime);
        sleep(sleepInterval);
      } else {
        exflow.setUpdateTime(currentTime);
        // Dispatch works on a copy so failed executors can be removed from
        // the candidate set without touching activeExecutors.
        selectExecutorAndDispatchFlow(reference, exflow,
            new HashSet<Executor>(activeExecutors));
        runningCandidate = null;
      }

      // Only flows that are no longer in the queue count as processed.
      if (queuedFlows.getFlow(exflow.getExecutionId()) == null) {
        currentContinuousFlowProcessed++;
      }
    }
  }

  /**
   * Picks an executor for the flow and dispatches it. A failed dispatch is
   * handed to handleDispatchExceptionCase; if no executor could be picked
   * the flow is handed to handleNoExecutorSelectedCase.
   */
  private void selectExecutorAndDispatchFlow(ExecutionReference reference,
      ExecutableFlow exflow, Set<Executor> availableExecutors)
      throws ExecutorManagerException {
    synchronized (exflow) {
      Executor selectedExecutor = selectExecutor(exflow, availableExecutors);
      if (selectedExecutor != null) {
        try {
          dispatch(reference, exflow, selectedExecutor);
        } catch (ExecutorManagerException e) {
          logger.warn(String.format(
              "Executor %s responded with exception for exec: %d",
              selectedExecutor, exflow.getExecutionId()), e);
          handleDispatchExceptionCase(reference, exflow, selectedExecutor,
              availableExecutors);
        }
      } else {
        handleNoExecutorSelectedCase(reference, exflow);
      }
    }
  }

  /**
   * Resolves the executor requested through the
   * {@link ExecutionOptions#USE_EXECUTOR} flow parameter, if any.
   *
   * @return the requested executor, or {@code null} when none was requested,
   *         the requested id is not a valid number, the executor cannot be
   *         found, or the lookup fails
   */
  private Executor getUserSpecifiedExecutor(ExecutionOptions options,
      int executionId) {
    Executor executor = null;
    if (options != null
        && options.getFlowParameters() != null
        && options.getFlowParameters().containsKey(
            ExecutionOptions.USE_EXECUTOR)) {
      String requestedExecutor =
          options.getFlowParameters().get(ExecutionOptions.USE_EXECUTOR);
      try {
        int executorId = Integer.valueOf(requestedExecutor);
        executor = fetchExecutor(executorId);

        if (executor == null) {
          logger
              .warn(String
                  .format(
                      "User specified executor id: %d for execution id: %d is not active, Looking up db.",
                      executorId, executionId));
          executor = executorLoader.fetchExecutor(executorId);
          if (executor == null) {
            logger
                .warn(String
                    .format(
                        "User specified executor id: %d for execution id: %d is missing from db. Defaulting to availableExecutors",
                        executorId, executionId));
          }
        }
      } catch (NumberFormatException ex) {
        // The value is user input; a malformed id must not abort queue
        // processing, so fall back to the regular executor selection.
        logger.error(String.format(
            "User specified executor id: %s for execution id: %d is not a valid number. Defaulting to availableExecutors",
            requestedExecutor, executionId), ex);
      } catch (ExecutorManagerException ex) {
        logger.error("Failed to fetch user specified executor for exec_id = "
            + executionId, ex);
      }
    }
    return executor;
  }

  /**
   * Chooses the executor for a flow: the user-specified one when it can be
   * resolved, otherwise the best candidate according to the selector.
   *
   * @return the chosen executor, or whatever the selector returns when no
   *         user-specified executor is available
   */
  private Executor selectExecutor(ExecutableFlow exflow,
      Set<Executor> availableExecutors) {
    Executor choosenExecutor =
        getUserSpecifiedExecutor(exflow.getExecutionOptions(),
            exflow.getExecutionId());

    if (choosenExecutor == null) {
      logger.info("Using dispatcher for execution id :"
          + exflow.getExecutionId());
      ExecutorSelector selector =
          new ExecutorSelector(filterList, comparatorWeightsMap);
      choosenExecutor = selector.getBest(availableExecutors, exflow);
    }
    return choosenExecutor;
  }

  /**
   * Reacts to a failed dispatch: increases the error count of the reference
   * and either finalizes the flow (error limit exceeded or at most one
   * candidate executor left) or retries without the executor that failed.
   */
  private void handleDispatchExceptionCase(ExecutionReference reference,
      ExecutableFlow exflow, Executor lastSelectedExecutor,
      Set<Executor> remainingExecutors) throws ExecutorManagerException {
    logger
        .info(String
            .format(
                "Reached handleDispatchExceptionCase stage for exec %d with error count %d",
                exflow.getExecutionId(), reference.getNumErrors()));
    reference.setNumErrors(reference.getNumErrors() + 1);
    if (reference.getNumErrors() > this.maxDispatchingErrors
        || remainingExecutors.size() <= 1) {
      logger.error("Failed to process queued flow");
      finalizeFlows(exflow);
    } else {
      remainingExecutors.remove(lastSelectedExecutor);
      selectExecutorAndDispatchFlow(reference, exflow, remainingExecutors);
    }
  }

  /**
   * Puts a flow back into the queue when no executor could be selected for it.
   */
  private void handleNoExecutorSelectedCase(ExecutionReference reference,
      ExecutableFlow exflow) throws ExecutorManagerException {
    logger
        .info(String
            .format(
                "Reached handleNoExecutorSelectedCase stage for exec %d with error count %d",
                exflow.getExecutionId(), reference.getNumErrors()));
    queuedFlows.enqueue(exflow, reference);
  }
}
}