azkaban-uncached
Changes
src/java/azkaban/execapp/ExecutorServlet.java 41(+21 -20)
src/java/azkaban/execapp/FlowRunner.java 612(+230 -382)
src/java/azkaban/executor/ExecutorManager.java 38(+26 -12)
src/web/js/azkaban.exflow.view.js 43(+2 -41)
Details
src/java/azkaban/execapp/ExecutorServlet.java 41(+21 -20)
diff --git a/src/java/azkaban/execapp/ExecutorServlet.java b/src/java/azkaban/execapp/ExecutorServlet.java
index 1475a9a..00f19a2 100644
--- a/src/java/azkaban/execapp/ExecutorServlet.java
+++ b/src/java/azkaban/execapp/ExecutorServlet.java
@@ -58,7 +58,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
@Override
public void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
HashMap<String,Object> respMap= new HashMap<String,Object>();
- logger.info("ExecutorServer called by " + req.getRemoteAddr());
+ //logger.info("ExecutorServer called by " + req.getRemoteAddr());
try {
if (!hasParam(req, ACTION_PARAM)) {
logger.error("Parameter action not set");
@@ -67,7 +67,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
else {
String action = getParam(req, ACTION_PARAM);
if (action.equals(UPDATE_ACTION)) {
- logger.info("Updated called");
+ //logger.info("Updated called");
handleAjaxUpdateRequest(req, respMap);
}
else if (action.equals(PING_ACTION)) {
@@ -122,27 +122,28 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
respMap.put(RESPONSE_ERROR, "Modification type not set.");
}
String modificationType = getParam(req, MODIFY_EXECUTION_ACTION_TYPE);
- String modifiedJobList = getParam(req, MODIFY_JOBS_LIST);
- String[] jobIds = modifiedJobList.split("\\s*,\\s*");
+
try {
- if (MODIFY_RETRY_JOBS.equals(modificationType)) {
- flowRunnerManager.retryJobs(execId, user, jobIds);
- }
- else if (MODIFY_CANCEL_JOBS.equals(modificationType)) {
-
- }
- else if (MODIFY_DISABLE_JOBS.equals(modificationType)) {
-
+ if (MODIFY_RETRY_FAILURES.equals(modificationType)) {
+ flowRunnerManager.retryFailures(execId, user);
}
- else if (MODIFY_ENABLE_JOBS.equals(modificationType)) {
-
- }
- else if (MODIFY_PAUSE_JOBS.equals(modificationType)) {
-
- }
- else if (MODIFY_RESUME_JOBS.equals(modificationType)) {
-
+ else {
+// String modifiedJobList = getParam(req, MODIFY_JOBS_LIST);
+// String[] jobIds = modifiedJobList.split("\\s*,\\s*");
+//
+// if (MODIFY_RETRY_JOBS.equals(modificationType)) {
+// }
+// else if (MODIFY_CANCEL_JOBS.equals(modificationType)) {
+// }
+// else if (MODIFY_DISABLE_JOBS.equals(modificationType)) {
+// }
+// else if (MODIFY_ENABLE_JOBS.equals(modificationType)) {
+// }
+// else if (MODIFY_PAUSE_JOBS.equals(modificationType)) {
+// }
+// else if (MODIFY_RESUME_JOBS.equals(modificationType)) {
+// }
}
} catch (ExecutorManagerException e) {
logger.error(e);
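Only retryFailures is wired through on the executor side in this change; the per-job modification branches above remain commented out. If the retry-jobs branch were re-enabled later, it would presumably parse MODIFY_JOBS_LIST and call the new List-based FlowRunnerManager API, roughly like the sketch below (illustrative only, not part of the patch; assumes java.util.Arrays and java.util.List are imported):

    else if (MODIFY_RETRY_JOBS.equals(modificationType)) {
        // Hypothetical re-enabled branch: split the comma-separated job list and retry those jobs.
        String modifiedJobList = getParam(req, MODIFY_JOBS_LIST);
        List<String> jobIds = Arrays.asList(modifiedJobList.split("\\s*,\\s*"));
        flowRunnerManager.retryJobs(execId, user, jobIds);
    }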
src/java/azkaban/execapp/FlowRunner.java 612(+230 -382)
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 0fbe48d..b58ae29 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -3,18 +3,14 @@ package azkaban.execapp;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
import org.apache.log4j.Appender;
import org.apache.log4j.FileAppender;
@@ -38,29 +34,32 @@ import azkaban.flow.FlowProps;
import azkaban.jobtype.JobTypeManager;
import azkaban.project.ProjectLoader;
import azkaban.project.ProjectManagerException;
-import azkaban.utils.Pair;
import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
public class FlowRunner extends EventHandler implements Runnable {
private static final Layout DEFAULT_LAYOUT = new PatternLayout("%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
- private int execId;
-
- private File execDir;
-
- private ExecutorService executorService;
- private ExecutorLoader executorLoader;
- private ProjectLoader projectLoader;
-
- private ExecutableFlow flow;
- private Thread currentThread;
- private int numThreads = 10;
+ // We check for updates every 5 minutes, just in case things get stuck. But for the most part, we'll be idling.
+ private static final long CHECK_WAIT_MS = 5*60*1000;
private Logger logger;
private Layout loggerLayout = DEFAULT_LAYOUT;
private Appender flowAppender;
private File logFile;
+ private ExecutorService executorService;
+ private ExecutorLoader executorLoader;
+ private ProjectLoader projectLoader;
+
+ private int execId;
+ private File execDir;
+ private ExecutableFlow flow;
+ private Thread flowRunnerThread;
+ private int numJobThreads = 10;
+
+ // Sync object for queuing
+ private Object mainSyncObj = new Object();
+
// Properties map
private Map<String, Props> sharedProps = new HashMap<String, Props>();
private Map<String, Props> jobOutputProps = new HashMap<String, Props>();
@@ -69,21 +68,8 @@ public class FlowRunner extends EventHandler implements Runnable {
private final JobTypeManager jobtypeManager;
private JobRunnerEventListener listener = new JobRunnerEventListener();
- private BlockingQueue<JobRunner> jobsToRun = new LinkedBlockingQueue<JobRunner>();
private Map<String, JobRunner> runningJob = new ConcurrentHashMap<String, JobRunner>();
- private Map<Pair<String, Integer>, JobRunner> allJobs = new ConcurrentHashMap<Pair<String, Integer>, JobRunner>();
- private List<JobRunner> pausedJobsToRun = Collections.synchronizedList(new ArrayList<JobRunner>());
-
- // Used for individual job pausing
- private Map<String, ExecutableNode> pausedNode = new ConcurrentHashMap<String, ExecutableNode>();
-
- private Object actionSyncObj = new Object();
- private boolean flowPaused = false;
- private boolean flowFailed = false;
- private boolean flowFinished = false;
- private boolean flowCancelled = false;
-
// Used for pipelining
private Integer pipelineLevel = null;
private Integer pipelineExecId = null;
@@ -97,12 +83,17 @@ public class FlowRunner extends EventHandler implements Runnable {
private String jobLogFileSize = "5MB";
private int jobLogNumFiles = 4;
+ private boolean flowPaused = false;
+ private boolean flowFailed = false;
+ private boolean flowFinished = false;
+ private boolean flowCancelled = false;
+
public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader, JobTypeManager jobtypeManager) throws ExecutorManagerException {
this.execId = flow.getExecutionId();
this.flow = flow;
this.executorLoader = executorLoader;
this.projectLoader = projectLoader;
- this.executorService = Executors.newFixedThreadPool(numThreads);
+ this.executorService = Executors.newFixedThreadPool(numJobThreads);
this.execDir = new File(flow.getExecutionPath());
this.jobtypeManager = jobtypeManager;
@@ -139,40 +130,17 @@ public class FlowRunner extends EventHandler implements Runnable {
return execDir;
}
- @Override
public void run() {
try {
- int projectId = flow.getProjectId();
- int version = flow.getVersion();
- String flowId = flow.getFlowId();
-
- // Add a bunch of common azkaban properties
- PropsUtils.produceParentProperties(flow);
-
- // Create execution dir
- createLogger(flowId);
- logger.info("Running execid:" + execId + " flow:" + flowId + " project:" + projectId + " version:" + version);
- if (pipelineExecId != null) {
- logger.info("Running simulateously with " + pipelineExecId + ". Pipelining level " + pipelineLevel);
- }
-
- // The current thread is used for interrupting blocks
- currentThread = Thread.currentThread();
- currentThread.setName("FlowRunner-exec-" + flow.getExecutionId());
-
+ setupFlowExecution();
flow.setStartTime(System.currentTimeMillis());
- logger.info("Creating active reference");
- if (!executorLoader.updateExecutableReference(execId, System.currentTimeMillis())) {
- throw new ExecutorManagerException("The executor reference doesn't exist. May have been killed prematurely.");
- }
+ updateFlowReference();
+
logger.info("Updating initial flow directory.");
updateFlow();
-
logger.info("Fetching job and shared properties.");
loadAllProperties();
- logger.info("Queuing initial jobs.");
- queueStartingJobs();
this.fireEventListeners(Event.create(this, Type.FLOW_STARTED));
runFlow();
@@ -194,6 +162,34 @@ public class FlowRunner extends EventHandler implements Runnable {
}
}
+ private void setupFlowExecution() {
+ int projectId = flow.getProjectId();
+ int version = flow.getVersion();
+ String flowId = flow.getFlowId();
+
+ // Add a bunch of common azkaban properties
+ PropsUtils.addCommonFlowProperties(flow);
+
+ // Create execution dir
+ createLogger(flowId);
+ logger.info("Running execid:" + execId + " flow:" + flowId + " project:" + projectId + " version:" + version);
+ if (pipelineExecId != null) {
+ logger.info("Running simulateously with " + pipelineExecId + ". Pipelining level " + pipelineLevel);
+ }
+
+ // The current thread is used for interrupting blocks
+ flowRunnerThread = Thread.currentThread();
+ flowRunnerThread.setName("FlowRunner-exec-" + flow.getExecutionId());
+
+ }
+
+ private void updateFlowReference() throws ExecutorManagerException {
+ logger.info("Update active reference");
+ if (!executorLoader.updateExecutableReference(execId, System.currentTimeMillis())) {
+ throw new ExecutorManagerException("The executor reference doesn't exist. May have been killed prematurely.");
+ }
+ }
+
private void updateFlow() {
updateFlow(System.currentTimeMillis());
}
@@ -276,47 +272,69 @@ public class FlowRunner extends EventHandler implements Runnable {
logger.info("Starting flows");
flow.setStatus(Status.RUNNING);
updateFlow();
+
while (!flowFinished) {
- JobRunner runner = null;
- try {
- runner = jobsToRun.poll(5, TimeUnit.MINUTES);
- } catch (InterruptedException e) {
- logger.info("FlowRunner thread has been interrupted.");
- continue;
- }
-
- if(runner == null) continue;
-
- try {
- synchronized(actionSyncObj) {
- ExecutableNode node = runner.getNode();
- if (flowPaused) {
- logger.info("Job Paused " + node.getJobId());
- node.setStatus(Status.PAUSED);
- pausedJobsToRun.add(runner);
+ synchronized(mainSyncObj) {
+ if (flowPaused) {
+ try {
+ mainSyncObj.wait(CHECK_WAIT_MS);
+ } catch (InterruptedException e) {
+ }
+
+ continue;
+ }
+ else {
+ List<ExecutableNode> jobsReadyToRun = findReadyJobsToRun();
+
+ if (!jobsReadyToRun.isEmpty()) {
+ for (ExecutableNode node : jobsReadyToRun) {
+ long currentTime = System.currentTimeMillis();
+
+ // Queue a job only if it's ready to run.
+ if (node.getStatus() == Status.READY) {
+ // Collect output props from the job's dependencies.
+ Props outputProps = collectOutputProps(node);
+ node.setStatus(Status.QUEUED);
+ JobRunner runner = createJobRunner(node, outputProps);
+ try {
+ executorService.submit(runner);
+ runningJob.put(node.getJobId(), runner);
+ } catch (RejectedExecutionException e) {
+ logger.error(e);
+ }
+
+ } // If the node is already KILLED, stamp start/end times to auto-complete it
+ else if (node.getStatus() == Status.KILLED) {
+ node.setStartTime(currentTime);
+ node.setEndTime(currentTime);
+ } // If disabled, then we auto skip
+ else if (node.getStatus() == Status.DISABLED) {
+ node.setStartTime(currentTime);
+ node.setEndTime(currentTime);
+ node.setStatus(Status.SKIPPED);
+ }
+ }
+
+ updateFlow();
}
else {
- runningJob.put(node.getJobId(), runner);
- allJobs.put(new Pair<String, Integer>(node.getJobId(), node.getAttempt()), runner);
- executorService.submit(runner);
- logger.info("Job Started " + node.getJobId());
+ if (isFlowFinished()) {
+ flowFinished = true;
+ break;
+ }
+
+ try {
+ mainSyncObj.wait(CHECK_WAIT_MS);
+ } catch (InterruptedException e) {
+ }
}
}
- } catch (RejectedExecutionException e) {
- logger.error(e);
}
}
logger.info("Finishing up flow. Awaiting Termination");
executorService.shutdown();
- while (!executorService.isTerminated()) {
- try {
- executorService.awaitTermination(1, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- }
- };
-
switch(flow.getStatus()) {
case FAILED_FINISHING:
logger.info("Setting flow status to Failed.");
@@ -329,13 +347,50 @@ public class FlowRunner extends EventHandler implements Runnable {
}
}
- private void queueStartingJobs() {
- for (String startNode : flow.getStartNodes()) {
- ExecutableNode node = flow.getExecutableNode(startNode);
- JobRunner jobRunner = createJobRunner(node, null);
- logger.info("Adding initial job " + startNode + " to run queue.");
- jobsToRun.add(jobRunner);
+ private List<ExecutableNode> findReadyJobsToRun() {
+ ArrayList<ExecutableNode> jobsToRun = new ArrayList<ExecutableNode>();
+ for (ExecutableNode node : flow.getExecutableNodes()) {
+ if(Status.isStatusFinished(node.getStatus())) {
+ continue;
+ }
+ else {
+ // Check the dependencies to see if execution conditions are met,
+ // and what the status should be set to.
+ Status impliedStatus = getImpliedStatus(node);
+ if (impliedStatus != null) {
+ node.setStatus(impliedStatus);
+ jobsToRun.add(node);
+ }
+ }
+ }
+
+ return jobsToRun;
+ }
+
+ private boolean isFlowFinished() {
+ for (String end: flow.getEndNodes()) {
+ ExecutableNode node = flow.getExecutableNode(end);
+ if (!Status.isStatusFinished(node.getStatus())) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ private Props collectOutputProps(ExecutableNode node) {
+ Props previousOutput = null;
+ // Iterate the in nodes again and create the dependencies
+ for (String dependency : node.getInNodes()) {
+ Props output = jobOutputProps.get(dependency);
+ if (output != null) {
+ output = Props.clone(output);
+ output.setParent(previousOutput);
+ previousOutput = output;
+ }
}
+
+ return previousOutput;
}
private JobRunner createJobRunner(ExecutableNode node, Props previousOutput) {
@@ -403,64 +458,62 @@ public class FlowRunner extends EventHandler implements Runnable {
}
public void pause(String user) {
- synchronized(actionSyncObj) {
- if (flow.getStatus() == Status.RUNNING || flow.getStatus() == Status.PREPARING) {
+ synchronized(mainSyncObj) {
+ if (!flowFinished) {
logger.info("Flow paused by " + user);
flowPaused = true;
flow.setStatus(Status.PAUSED);
updateFlow();
}
+ else {
+ logger.info("Cannot pause finished flow. Called by user " + user);
+ }
}
+
+ interrupt();
}
public void resume(String user) {
- synchronized(actionSyncObj) {
+ synchronized(mainSyncObj) {
if (!flowPaused) {
logger.info("Cannot resume flow that isn't paused");
}
else {
logger.info("Flow resumed by " + user);
flowPaused = false;
- if (!flowCancelled) {
- flow.setStatus(Status.RUNNING);
+ if (flowFailed) {
+ flow.setStatus(Status.FAILED_FINISHING);
}
-
- for (JobRunner runner: pausedJobsToRun) {
- ExecutableNode node = runner.getNode();
- if (flowCancelled) {
- logger.info("Resumed flow is cancelled. Job killed " + node.getJobId());
- node.setStatus(Status.KILLED);
- }
- else {
- node.setStatus(Status.QUEUED);
- }
-
- jobsToRun.add(runner);
+ else if (flowCancelled) {
+ flow.setStatus(Status.KILLED);
}
+ else {
+ flow.setStatus(Status.RUNNING);
+ }
+
updateFlow();
}
}
}
public void cancel(String user) {
- synchronized(actionSyncObj) {
+ synchronized(mainSyncObj) {
logger.info("Flow cancelled by " + user);
+ cancel();
+ updateFlow();
+ }
+ interrupt();
+ }
+
+ private void cancel() {
+ synchronized(mainSyncObj) {
flowPaused = false;
flowCancelled = true;
-
if (watcher != null) {
watcher.stopWatcher();
}
- for (JobRunner runner: pausedJobsToRun) {
- ExecutableNode node = runner.getNode();
- logger.info("Resumed flow is cancelled. Job killed " + node.getJobId() + " by " + user);
- node.setStatus(Status.KILLED);
-
- jobsToRun.add(runner);
- }
-
for (JobRunner runner : runningJob.values()) {
runner.cancel();
}
@@ -468,165 +521,47 @@ public class FlowRunner extends EventHandler implements Runnable {
if (flow.getStatus() != Status.FAILED && flow.getStatus() != Status.FAILED_FINISHING) {
flow.setStatus(Status.KILLED);
}
-
- for (ExecutableNode node: pausedNode.values()) {
- node.setStatus(Status.KILLED);
- node.setPaused(false);
- queueNextJob(node, "cancel-all-action");
- }
-
- updateFlow();
- interrupt();
}
}
- public void cancelJob(String jobId, String user) throws ExecutorManagerException {
- synchronized(actionSyncObj) {
- logger.info("Cancel of job " + jobId + " called by user " + user);
- JobRunner runner = runningJob.get(jobId);
- ExecutableNode node = flow.getExecutableNode(jobId);
- if (runner != null) {
- runner.cancel();
- }
- else {
- Status status = node.getStatus();
- if(status == Status.FAILED || status == Status.SUCCEEDED || status == Status.SKIPPED) {
- throw new ExecutorManagerException("Can't cancel finished job " + jobId + " with status " + status);
- }
-
- node.setStatus(Status.KILLED);
- if (node.isPaused()) {
- node.setPaused(false);
- queueNextJob(node, "cancel-action");
- }
- }
- }
- }
-
- public void resumeJob(String jobId, String user) throws ExecutorManagerException {
- synchronized(actionSyncObj) {
- if (runningJob.containsKey(jobId)) {
- throw new ExecutorManagerException("Resume of job " + jobId + " failed since it's already running. User " + user);
- }
- else {
- logger.info("Resume of job " + jobId + " requested by " + user);
- ExecutableNode node = flow.getExecutableNode(jobId);
- if (node == null) {
- throw new ExecutorManagerException("Job " + jobId + " doesn't exist in execution " + flow.getExecutionId() + ". Cannot pause.");
+ public void retryFailures(String user) {
+ synchronized(mainSyncObj) {
+ logger.info("Retrying failures invoked by " + user);
+ ArrayList<String> failures = new ArrayList<String>();
+ for (ExecutableNode node: flow.getExecutableNodes()) {
+ if (node.getStatus() == Status.FAILED) {
+ failures.add(node.getJobId());
}
-
- if (node.isPaused()) {
- node.setPaused(false);
- if (pausedNode.containsKey(jobId)) {
- queueNextJob(node, "resume-action");
- }
-
- updateFlow();
+ else if (node.getStatus() == Status.KILLED) {
+ node.setStartTime(-1);
+ node.setEndTime(-1);
+ node.setStatus(Status.READY);
}
}
- }
- }
-
- public void pauseJob(String jobId, String user) throws ExecutorManagerException {
- synchronized(actionSyncObj) {
- if (runningJob.containsKey(jobId)) {
- throw new ExecutorManagerException("Pause of job " + jobId + " failed since it's already running. User " + user);
- }
- else {
- logger.info("Pause of job " + jobId + " requested by " + user);
- ExecutableNode node = flow.getExecutableNode(jobId);
- if (node == null) {
- throw new ExecutorManagerException("Job " + jobId + " doesn't exist in execution " + flow.getExecutionId() + ". Cannot pause.");
- }
- long startTime = node.getStartTime();
- if (startTime < 0) {
- node.setPaused(true);
- updateFlow();
- }
- else {
- throw new ExecutorManagerException("Cannot pause job " + jobId + " that's started.");
- }
- }
+ retryJobs(failures, user);
}
}
- public void disableJob(String jobId, String user) throws ExecutorManagerException {
- // Disable and then check to see if it's set.
- synchronized(actionSyncObj) {
- if (runningJob.containsKey(jobId)) {
- throw new ExecutorManagerException("Disable of job " + jobId + " failed since it's already running. User " + user);
- }
- else {
- logger.info("Disable of job " + jobId + " requested by " + user);
- ExecutableNode node = flow.getExecutableNode(jobId);
- if (node == null) {
- throw new ExecutorManagerException("Job " + jobId + " doesn't exist in execution " + flow.getExecutionId() + ". Cannot disable.");
- }
-
- Status status = node.getStatus();
- if (status == Status.DISABLED || status == Status.READY) {
- node.setStatus(Status.DISABLED);
- updateFlow();
- }
- else {
- throw new ExecutorManagerException("Cannot disable job " + jobId + " with status " + status.toString());
- }
- }
- }
- }
-
- public void enableJob(String jobId, String user) throws ExecutorManagerException {
- // Disable and then check to see if it's set.
- synchronized(actionSyncObj) {
- if (runningJob.containsKey(jobId)) {
- throw new ExecutorManagerException("Enable of job " + jobId + " failed since it's already running. User " + user);
- }
- else {
- logger.info("Enable of job " + jobId + " requested by " + user);
+ public void retryJobs(List<String> jobIds, String user) {
+ synchronized(mainSyncObj) {
+ for (String jobId: jobIds) {
ExecutableNode node = flow.getExecutableNode(jobId);
if (node == null) {
- throw new ExecutorManagerException("Job " + jobId + " doesn't exist in execution " + flow.getExecutionId() + ". Cannot enable.");
+ logger.error("Job " + jobId + " doesn't exist in execution " + flow.getExecutionId() + ". Cannot retry.");
+ continue;
}
-
- Status status = node.getStatus();
- if (status == Status.DISABLED || status == Status.READY) {
- node.setStatus(Status.READY);
- updateFlow();
+
+ if (Status.isStatusFinished(node.getStatus())) {
+ // Resets the status and increments the attempt number
+ node.resetForRetry();
+ reEnableDependents(node);
+ logger.info("Re-enabling job " + node.getJobId() + " attempt " + node.getAttempt());
}
else {
- throw new ExecutorManagerException("Cannot enable job " + jobId + " with status " + status.toString());
- }
- }
- }
- }
-
- public void retryJobs(String[] jobIds, String user) {
- synchronized(actionSyncObj) {
- ArrayList<ExecutableNode> jobsToBeQueued = new ArrayList<ExecutableNode>();
- for (String jobId: jobIds) {
- if (runningJob.containsKey(jobId)) {
- logger.error("Cannot retry job " + jobId + " since it's already running. User " + user);
+ logger.error("Cannot retry job " + jobId + " since it hasn't run yet. User " + user);
continue;
}
- else {
- logger.info("Retry of job " + jobId + " requested by " + user);
- ExecutableNode node = flow.getExecutableNode(jobId);
- if (node == null) {
- logger.error("Job " + jobId + " doesn't exist in execution " + flow.getExecutionId() + ". Cannot disable.");
- }
-
- Status status = node.getStatus();
- if (status == Status.FAILED || status == Status.READY || status == Status.KILLED) {
- node.resetForRetry();
- reEnableDependents(node);
- }
- else {
- logger.error("Cannot retry a job that hasn't finished. " + jobId);
- }
-
- jobsToBeQueued.add(node);
- }
}
boolean isFailureFound = false;
@@ -644,11 +579,8 @@ public class FlowRunner extends EventHandler implements Runnable {
flowFailed = false;
}
- for (ExecutableNode node: jobsToBeQueued) {
- queueNextJob(node, "retry-action");
- }
-
updateFlow();
+ interrupt();
}
}
@@ -661,11 +593,16 @@ public class FlowRunner extends EventHandler implements Runnable {
dependentNode.setUpdateTime(System.currentTimeMillis());
reEnableDependents(dependentNode);
}
+ else if (dependentNode.getStatus() == Status.SKIPPED) {
+ dependentNode.setStatus(Status.DISABLED);
+ dependentNode.setUpdateTime(System.currentTimeMillis());
+ reEnableDependents(dependentNode);
+ }
}
}
private void interrupt() {
- currentThread.interrupt();
+ flowRunnerThread.interrupt();
}
private Status getImpliedStatus(ExecutableNode node) {
@@ -695,6 +632,7 @@ public class FlowRunner extends EventHandler implements Runnable {
continue;
case RUNNING:
case QUEUED:
+ case DISABLED:
return null;
default:
// Return null means it's not ready to run.
@@ -716,70 +654,6 @@ public class FlowRunner extends EventHandler implements Runnable {
return Status.READY;
}
- /**
- * Iterates through the finished jobs dependents.
- *
- * @param node
- */
- private synchronized void queueNextJobs(ExecutableNode finishedNode) {
- String trigger = finishedNode.getAttempt() > 0 ? finishedNode.getJobId() + "." + finishedNode.getAttempt() : finishedNode.getJobId();
- for (String dependent : finishedNode.getOutNodes()) {
- ExecutableNode dependentNode = flow.getExecutableNode(dependent);
- queueNextJob(dependentNode, trigger);
- }
- }
-
- /**
- * Queues node for running if it's ready to be run.
- *
- * @param node
- */
- private void queueNextJob(ExecutableNode node, String trigger) {
- Status nextStatus = getImpliedStatus(node);
- if (nextStatus == null) {
- // Not yet ready or not applicable
- return;
- }
-
- node.setStatus(nextStatus);
-
- Props previousOutput = null;
- // Iterate the in nodes again and create the dependencies
- for (String dependency : node.getInNodes()) {
- Props output = jobOutputProps.get(dependency);
- if (output != null) {
- output = Props.clone(output);
- output.setParent(previousOutput);
- previousOutput = output;
- }
- }
-
- synchronized(actionSyncObj) {
- //pausedNode
- if (node.isPaused()) {
- pausedNode.put(node.getJobId(), node);
- logger.info("Job Paused " + node.getJobId());
- return;
- }
-
- JobRunner runner = this.createJobRunner(node, previousOutput);
- if (flowPaused) {
- if (node.getStatus() != Status.DISABLED && node.getStatus() != Status.KILLED) {
- node.setStatus(Status.PAUSED);
- }
- pausedJobsToRun.add(runner);
- logger.info("Flow Paused. Pausing " + node.getJobId());
- }
- else {
- if (node.getStatus() != Status.DISABLED && node.getStatus() != Status.KILLED) {
- node.setStatus(Status.QUEUED);
- }
- logger.info("Adding " + node.getJobId() + " to run queue with status " + node.getStatus().toString() + " triggered by '" + trigger + "'.");
- jobsToRun.add(runner);
- }
- }
- }
-
private class JobRunnerEventListener implements EventListener {
public JobRunnerEventListener() {
}
@@ -787,65 +661,34 @@ public class FlowRunner extends EventHandler implements Runnable {
@Override
public synchronized void handleEvent(Event event) {
JobRunner runner = (JobRunner)event.getRunner();
+
if (event.getType() == Type.JOB_FINISHED) {
- ExecutableNode node = runner.getNode();
-
- logger.info("Job Finished " + node.getJobId() + " with status " + node.getStatus());
- synchronized (actionSyncObj) {
+ synchronized(mainSyncObj) {
+ ExecutableNode node = runner.getNode();
+
+ logger.info("Job Finished " + node.getJobId() + " with status " + node.getStatus());
+
if (node.getStatus() == Status.FAILED) {
- // Setting failure
flowFailed = true;
- if (!isFailedStatus(flow.getStatus())) {
+
+ ExecutionOptions options = flow.getExecutionOptions();
+ // The KILLED status occurs when cancel is invoked. We want to keep this
+ // status even in failure conditions.
+ if (flow.getStatus() != Status.KILLED) {
flow.setStatus(Status.FAILED_FINISHING);
- ExecutionOptions options = flow.getExecutionOptions();
- if (options.getFailureAction() == FailureAction.CANCEL_ALL) {
- cancel("azkaban");
+ if (options.getFailureAction() == FailureAction.CANCEL_ALL && !flowCancelled) {
+ logger.info("Flow failed. Failure option is Cancel All. Stopping execution.");
+ cancel();
}
}
}
- jobOutputProps.put(node.getJobId(), runner.getOutputProps());
-
- runningJob.remove(node.getJobId());
-
- fireEventListeners(event);
- queueNextJobs(node);
- }
-
- if (isFlowFinished()) {
- logger.info("Flow appears finished. Cleaning up.");
- flowFinished = true;
interrupt();
- }
- }
-
- if (event.isShouldUpdate()) {
- ExecutableNode node = runner.getNode();
- updateFlow(node.getUpdateTime());
- }
- }
- }
- private boolean isFailedStatus(Status status) {
- switch (status) {
- case FAILED_FINISHING:
- case FAILED:
- case KILLED:
- return true;
- default:
- return false;
- }
- }
-
- private boolean isFlowFinished() {
- for (String end: flow.getEndNodes()) {
- ExecutableNode node = flow.getExecutableNode(end);
- if (!Status.isStatusFinished(node.getStatus())) {
- return false;
+ fireEventListeners(event);
+ }
}
}
-
- return true;
}
public boolean isCancelled() {
@@ -861,17 +704,22 @@ public class FlowRunner extends EventHandler implements Runnable {
}
public File getJobLogFile(String jobId, int attempt) {
- JobRunner runner = allJobs.get(new Pair<String, Integer>(jobId, attempt));
- if (runner == null) {
+ ExecutableNode node = flow.getExecutableNode(jobId);
+ File path = new File(execDir, node.getJobPropsSource());
+
+ String logFileName = JobRunner.createLogFileName(execId, jobId, attempt);
+ File logFile = new File(path.getParentFile(), logFileName);
+
+ if (!logFile.exists()) {
return null;
}
- return runner.getLogFile();
+ return logFile;
}
public boolean isRunnerThreadAlive() {
- if (currentThread != null) {
- return currentThread.isAlive();
+ if (flowRunnerThread != null) {
+ return flowRunnerThread.isAlive();
}
return false;
}
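The rewritten run() loop replaces the per-job blocking queue with a single polling loop: the runner thread idles on mainSyncObj with a long timeout and is nudged awake via interrupt() whenever a job finishes or the flow is paused, resumed, cancelled, or retried. A self-contained toy illustration of that wake-up pattern (not code from the patch; the class name and timings are made up):

    public class WakeupDemo {
        private static final Object syncObj = new Object();

        public static void main(String[] args) throws Exception {
            Thread runner = new Thread(new Runnable() {
                public void run() {
                    synchronized (syncObj) {
                        try {
                            // Long timeout, like CHECK_WAIT_MS: mostly idle, but never stuck forever.
                            syncObj.wait(5 * 60 * 1000L);
                        } catch (InterruptedException e) {
                            System.out.println("Woken early; re-check the job graph");
                        }
                    }
                }
            });
            runner.start();
            Thread.sleep(100);
            runner.interrupt(); // what JobRunnerEventListener effectively does on JOB_FINISHED
            runner.join();
        }
    }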
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index 58beb8e..79192c0 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -468,27 +469,17 @@ public class FlowRunnerManager implements EventListener {
runner.resume(user);
}
- public void pauseJob(int execId, String jobId, String user) throws ExecutorManagerException {
+ public void retryFailures(int execId, String user) throws ExecutorManagerException {
FlowRunner runner = runningFlows.get(execId);
if (runner == null) {
throw new ExecutorManagerException("Execution " + execId + " is not running.");
}
- runner.pauseJob(jobId, user);
+ runner.retryFailures(user);
}
- public void resumeJob(int execId, String jobId, String user) throws ExecutorManagerException {
- FlowRunner runner = runningFlows.get(execId);
-
- if (runner == null) {
- throw new ExecutorManagerException("Execution " + execId + " is not running.");
- }
-
- runner.resumeJob(jobId, user);
- }
-
- public void retryJobs(int execId, String user, String ... jobId) throws ExecutorManagerException {
+ public void retryJobs(int execId, String user, List<String> jobId) throws ExecutorManagerException {
FlowRunner runner = runningFlows.get(execId);
if (runner == null) {
@@ -498,36 +489,6 @@ public class FlowRunnerManager implements EventListener {
runner.retryJobs(jobId, user);
}
- public void disableJob(int execId, String user, String jobId) throws ExecutorManagerException {
- FlowRunner runner = runningFlows.get(execId);
-
- if (runner == null) {
- throw new ExecutorManagerException("Execution " + execId + " is not running.");
- }
-
- runner.disableJob(jobId, user);
- }
-
- public void enableJob(int execId, String user, String jobId) throws ExecutorManagerException {
- FlowRunner runner = runningFlows.get(execId);
-
- if (runner == null) {
- throw new ExecutorManagerException("Execution " + execId + " is not running.");
- }
-
- runner.enableJob(jobId, user);
- }
-
- public void cancelJob(int execId, String user, String jobId) throws ExecutorManagerException {
- FlowRunner runner = runningFlows.get(execId);
-
- if (runner == null) {
- throw new ExecutorManagerException("Execution " + execId + " is not running.");
- }
-
- runner.cancelJob(jobId, user);
- }
-
public ExecutableFlow getExecutableFlow(int execId) {
FlowRunner runner = runningFlows.get(execId);
if (runner == null) {
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index e5e0d02..9ceb7c0 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -130,7 +130,7 @@ public class JobRunner extends EventHandler implements Runnable {
logger = Logger.getLogger(loggerName);
// Create file appender
- String logName = node.getAttempt() > 0 ? "_job." + executionId + "." + node.getAttempt() + "." + node.getJobId() + ".log" : "_job." + executionId + "." + node.getJobId() + ".log";
+ String logName = createLogFileName(node.getExecutionId(), node.getJobId(), node.getAttempt());
logFile = new File(workingDir, logName);
String absolutePath = logFile.getAbsolutePath();
@@ -378,4 +378,8 @@ public class JobRunner extends EventHandler implements Runnable {
public File getLogFile() {
return logFile;
}
+
+ public static String createLogFileName(int executionId, String jobId, int attempt) {
+ return attempt > 0 ? "_job." + executionId + "." + attempt + "." + jobId + ".log" : "_job." + executionId + "." + jobId + ".log";
+ }
}
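The log-file name building moves into a static helper so FlowRunner can locate a job's log file without keeping a JobRunner reference around. For example (execution id and job name are made up):

    String firstRun = JobRunner.createLogFileName(284, "load-data", 0); // "_job.284.load-data.log"
    String retryRun = JobRunner.createLogFileName(284, "load-data", 2); // "_job.284.2.load-data.log"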
diff --git a/src/java/azkaban/executor/ConnectorParams.java b/src/java/azkaban/executor/ConnectorParams.java
index 6bda5cd..7bac5d1 100644
--- a/src/java/azkaban/executor/ConnectorParams.java
+++ b/src/java/azkaban/executor/ConnectorParams.java
@@ -33,6 +33,7 @@ public interface ConnectorParams {
public static final String MODIFY_EXECUTION_ACTION = "modifyExecution";
public static final String MODIFY_EXECUTION_ACTION_TYPE = "modifyType";
+ public static final String MODIFY_RETRY_FAILURES = "retryFailures";
public static final String MODIFY_RETRY_JOBS = "retryJobs";
public static final String MODIFY_CANCEL_JOBS = "cancelJobs";
public static final String MODIFY_DISABLE_JOBS = "skipJobs";
src/java/azkaban/executor/ExecutorManager.java 38(+26 -12)
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 84781a4..76f76ec 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -261,6 +261,10 @@ public class ExecutorManager {
modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RESUME_JOBS, userId, jobIds);
}
+ public void retryFailures(ExecutableFlow exFlow, String userId) throws ExecutorManagerException {
+ modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RETRY_FAILURES, userId);
+ }
+
public void retryExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RETRY_JOBS, userId, jobIds);
}
@@ -285,21 +289,31 @@ public class ExecutorManager {
throw new ExecutorManagerException("Execution " + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId() + " isn't running.");
}
- for (String jobId: jobIds) {
- if (!jobId.isEmpty()) {
- ExecutableNode node = exFlow.getExecutableNode(jobId);
- if (node == null) {
- throw new ExecutorManagerException("Job " + jobId + " doesn't exist in execution " + exFlow.getExecutionId() + ".");
+ Map<String, Object> response = null;
+ if (jobIds != null && jobIds.length > 0) {
+ for (String jobId: jobIds) {
+ if (!jobId.isEmpty()) {
+ ExecutableNode node = exFlow.getExecutableNode(jobId);
+ if (node == null) {
+ throw new ExecutorManagerException("Job " + jobId + " doesn't exist in execution " + exFlow.getExecutionId() + ".");
+ }
}
}
+ String ids = StringUtils.join(jobIds, ',');
+ response = callExecutorServer(
+ pair.getFirst(),
+ ConnectorParams.MODIFY_EXECUTION_ACTION,
+ userId,
+ new Pair<String,String>(ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command),
+ new Pair<String,String>(ConnectorParams.MODIFY_JOBS_LIST, ids));
+ }
+ else {
+ response = callExecutorServer(
+ pair.getFirst(),
+ ConnectorParams.MODIFY_EXECUTION_ACTION,
+ userId,
+ new Pair<String,String>(ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command));
}
- String ids = StringUtils.join(jobIds, ',');
- Map<String, Object> response = callExecutorServer(
- pair.getFirst(),
- ConnectorParams.MODIFY_EXECUTION_ACTION,
- userId,
- new Pair<String,String>(ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command),
- new Pair<String,String>(ConnectorParams.MODIFY_JOBS_LIST, ids));
return response;
}
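On the web-server side the new flow-level entry point sits alongside the existing per-job one: retryFailures sends only the action type, while targeted retries still pass a job list. A usage sketch (exFlow, user, and the job ids are placeholders):

    // Retry everything that failed in a running execution.
    executorManager.retryFailures(exFlow, user.getUserId());

    // Retry specific jobs by id; this path still sends MODIFY_JOBS_LIST to the executor.
    executorManager.retryExecutingJobs(exFlow, user.getUserId(), "jobA", "jobB");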
diff --git a/src/java/azkaban/utils/PropsUtils.java b/src/java/azkaban/utils/PropsUtils.java
index 86109bf..e3c9c2d 100644
--- a/src/java/azkaban/utils/PropsUtils.java
+++ b/src/java/azkaban/utils/PropsUtils.java
@@ -187,7 +187,7 @@ public class PropsUtils {
return resolvedProps;
}
- public static Props produceParentProperties(final ExecutableFlow flow) {
+ public static Props addCommonFlowProperties(final ExecutableFlow flow) {
Props parentProps = new Props();
parentProps.put(CommonJobProperties.FLOW_ID, flow.getFlowId());
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index f5f8c51..2536fbd 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -351,11 +351,8 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
return;
}
- String jobs = getParam(req, "jobIds");
- String[] jobIds = jobs.split("\\s*,\\s*");
-
try {
- executorManager.retryExecutingJobs(exFlow, user.getUserId(), jobIds);
+ executorManager.retryFailures(exFlow, user.getUserId());
} catch (ExecutorManagerException e) {
ret.put("error", e.getMessage());
}
src/web/js/azkaban.exflow.view.js 43(+2 -41)
diff --git a/src/web/js/azkaban.exflow.view.js b/src/web/js/azkaban.exflow.view.js
index 3b6ed15..59dd6cb 100644
--- a/src/web/js/azkaban.exflow.view.js
+++ b/src/web/js/azkaban.exflow.view.js
@@ -180,50 +180,11 @@ azkaban.FlowTabView= Backbone.View.extend({
},
handleRetryClick : function(evt) {
var graphData = graphModel.get("data");
-
- var failedJobs = new Array();
- var failedJobStr = "";
- var nodes = graphData.nodes;
-
- for (var i = 0; i < nodes.length; ++i) {
- var node = nodes[i];
- if(node.status=='FAILED') {
- failedJobs.push(node.id);
- }
- else if (node.status=='KILLED') {
- // Nodes can be in a killed state, even if the parents have succeeded due to failure option Finish running
- // We want to re-enable those.
- var shouldAdd = true;
- if (node.in) {
- var size = 0;
- for(var key in node.in) {
- size++;
- var dependency = node.in[key];
- if (dependency.status != 'SUCCEEDED' && dependency.status!='SKIPPED') {
- shouldAdd = false;
- break;
- }
- }
-
- if (size == 0) {
- shouldAdd = false;
- }
- }
- else {
- shouldAdd = false;
- }
-
- if (shouldAdd) {
- failedJobs.push(node.id);
- }
- }
- }
- failedJobStr = failedJobs.join();
-
+
var requestURL = contextURL + "/executor";
ajaxCall(
requestURL,
- {"execid": execId, "ajax":"retryFailedJobs", "jobIds":failedJobStr},
+ {"execid": execId, "ajax":"retryFailedJobs"},
function(data) {
console.log("cancel clicked");
if (data.error) {
diff --git a/unit/java/azkaban/test/executor/SleepJavaJob.java b/unit/java/azkaban/test/executor/SleepJavaJob.java
index 689dee1..b75f7c6 100644
--- a/unit/java/azkaban/test/executor/SleepJavaJob.java
+++ b/unit/java/azkaban/test/executor/SleepJavaJob.java
@@ -8,11 +8,13 @@ public class SleepJavaJob {
private boolean fail;
private String seconds;
private int attempts;
+ private int currentAttempt;
private String id;
public SleepJavaJob(String id, Map<String, String> parameters) {
this.id = id;
String failStr = parameters.get("fail");
+
if (failStr == null || failStr.equals("false")) {
fail = false;
}
@@ -20,6 +22,7 @@ public class SleepJavaJob {
fail = true;
}
+ currentAttempt = parameters.containsKey("azkaban.job.attempt") ? Integer.parseInt(parameters.get("azkaban.job.attempt")) : 0;
String attemptString = parameters.get("passRetry");
if (attemptString == null) {
attempts = -1;
@@ -28,7 +31,13 @@ public class SleepJavaJob {
attempts = Integer.valueOf(attemptString);
}
seconds = parameters.get("seconds");
- System.out.println("Properly created");
+
+ if (fail) {
+ System.out.println("Planning to fail after " + seconds + " seconds. Attempts left " + currentAttempt + " of " + attempts);
+ }
+ else {
+ System.out.println("Planning to succeed after " + seconds + " seconds.");
+ }
}
public void run() throws Exception {
@@ -45,19 +54,9 @@ public class SleepJavaJob {
System.out.println("Interrupted " + fail);
}
}
-
- File file = new File("");
- File[] attemptFiles = file.listFiles(new FileFilter() {
- @Override
- public boolean accept(File pathname) {
- return pathname.getName().startsWith(id);
- }});
-
- if (fail) {
- if (attempts <= 0 || attemptFiles == null || attemptFiles.length > attempts) {
- File attemptFile = new File(file, id + "." + (attemptFiles == null ? 0 : attemptFiles.length));
- attemptFile.mkdirs();
+ if (fail) {
+ if (attempts <= 0 || currentAttempt <= attempts) {
throw new Exception("I failed because I had to.");
}
}
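With the attempt number now read from the azkaban.job.attempt parameter instead of marker files on disk, the retry behaviour of the test job can be exercised directly. A minimal sketch (parameter values are made up; azkaban.job.attempt is normally injected by the framework, and the HashMap/Map imports are assumed):

    Map<String, String> params = new HashMap<String, String>();
    params.put("fail", "true");
    params.put("seconds", "1");
    params.put("passRetry", "2");           // fail on attempts 0-2, succeed afterwards
    params.put("azkaban.job.attempt", "3"); // pretend this is the fourth run
    SleepJavaJob job = new SleepJavaJob("sleepTest", params);
    job.run();                              // attempt 3 > passRetry 2, so no exception is thrown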