azkaban-uncached
Changes
src/java/azkaban/executor/FlowRunner.java 200(+133 -67)
src/java/azkaban/jobExecutor/ProcessJob.java 242(+54 -188)
src/web/js/azkaban.project.view.js 4(+2 -2)
unit/executions/exectest1/exec2.flow 154(+154 -0)
unit/executions/exectest1/job2d.job 5(+5 -0)
unit/executions/exectest1/job6.job 2(+1 -1)
unit/java/azkaban/test/executor/FlowRunnerTest.java 142(+141 -1)
Details
diff --git a/src/java/azkaban/executor/event/Event.java b/src/java/azkaban/executor/event/Event.java
index 21b1ec7..555fd51 100644
--- a/src/java/azkaban/executor/event/Event.java
+++ b/src/java/azkaban/executor/event/Event.java
@@ -8,7 +8,8 @@ public class Event {
JOB_STARTED,
JOB_SUCCEEDED,
JOB_FAILED,
- JOB_KILLED
+ JOB_KILLED,
+ JOB_SKIPPED
}
private final Object runner;
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 7d0efba..5591f87 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -42,7 +42,6 @@ public class ExecutableFlow {
private Integer pipelineLevel = null;
private Map<String, String> flowParameters = new HashMap<String, String>();
- private Props globalProps;
public enum FailureAction {
FINISH_CURRENTLY_RUNNING,
@@ -64,14 +63,6 @@ public class ExecutableFlow {
this.setFlow(flow);
}
- public void setGlobalProps(Props props) {
- globalProps = props;
- }
-
- public Props getGlobalProps() {
- return globalProps;
- }
-
public ExecutableFlow() {
}
src/java/azkaban/executor/FlowRunner.java 200(+133 -67)
diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index d479d83..a47f4a2 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -5,10 +5,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -25,19 +23,18 @@ import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import azkaban.executor.ExecutableFlow.ExecutableNode;
+import azkaban.executor.ExecutableFlow.FailureAction;
import azkaban.executor.ExecutableFlow.Status;
import azkaban.executor.event.Event;
import azkaban.executor.event.Event.Type;
import azkaban.executor.event.EventHandler;
import azkaban.executor.event.EventListener;
import azkaban.flow.FlowProps;
-import azkaban.jobExecutor.utils.JobWrappingFactory;
import azkaban.utils.ExecutableFlowLoader;
import azkaban.utils.Props;
public class FlowRunner extends EventHandler implements Runnable {
- private static final Layout DEFAULT_LAYOUT = new PatternLayout(
- "%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
+ private static final Layout DEFAULT_LAYOUT = new PatternLayout("%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
public static final int NUM_CONCURRENT_THREADS = 10;
@@ -63,12 +60,12 @@ public class FlowRunner extends EventHandler implements Runnable {
private Thread currentThread;
private List<String> jobsFinished;
-
- public enum FailedFlowOptions {
- FINISH_RUNNING_JOBS, KILL_ALL
- }
-
- private FailedFlowOptions failedOptions = FailedFlowOptions.FINISH_RUNNING_JOBS;
+ // Underlying global properties
+ private Props globalProps = null;
+ // Used to override all regular properties.
+ private Props flowOverrideProps = null;
+
+ private FailureAction failedAction;
public FlowRunner(ExecutableFlow flow) {
this.flow = flow;
@@ -78,6 +75,10 @@ public class FlowRunner extends EventHandler implements Runnable {
this.listener = new JobRunnerEventListener(this);
this.jobsFinished = new ArrayList<String>();
+ if (flow.getFlowParameters() != null && !flow.getFlowParameters().isEmpty()) {
+ flowOverrideProps = new Props(null, flow.getFlowParameters());
+ }
+ failedAction = flow.getFailureAction();
createLogger();
}
@@ -120,34 +121,49 @@ public class FlowRunner extends EventHandler implements Runnable {
executorService.shutdownNow();
if (pausedJobsToRun.size() > 0) {
- logger.info("Cancelling... Clearing paused jobs queue of size "
- + pausedJobsToRun.size());
+ logger.info("Cancelling... Clearing paused jobs queue of size " + pausedJobsToRun.size());
pausedJobsToRun.clear();
}
- // Loop through job runners
for (JobRunner runner : runningJobs.values()) {
- if (runner.getStatus() == Status.WAITING
- || runner.getStatus() == Status.RUNNING
- || runner.getStatus() == Status.PAUSED) {
+ if (runner.getStatus() == Status.WAITING ||
+ runner.getStatus() == Status.RUNNING ||
+ runner.getStatus() == Status.PAUSED ) {
+
logger.info("Cancelling... Killing job "
+ runner.getNode().getId() + " with status "
+ runner.getStatus());
runner.cancel();
}
}
-
+
+ runningJobs.clear();
+ long endTime = System.currentTimeMillis();
+ for (ExecutableNode fnode : flow.getExecutableNodes()) {
+ switch (fnode.getStatus()) {
+ case UNKNOWN:
+ case READY:
+ fnode.setStatus(Status.KILLED);
+ fnode.setStartTime(endTime);
+ fnode.setEndTime(endTime);
+ default:
+ }
+ }
+
logger.info("Flow cancelled.");
- if (flow.getStatus() != Status.FAILED) {
+ if (flow.getStatus() == Status.FAILED_FINISHING) {
+ flow.setStatus(Status.FAILED);
+ }
+ else if (flow.getStatus() != Status.FAILED) {
flow.setStatus(Status.KILLED);
}
- flow.setEndTime(System.currentTimeMillis());
- this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED));
+ long time = System.currentTimeMillis();
+ flow.setStartTime(time);
+ flow.setEndTime(time);
}
public synchronized void pause(String user) {
- if (flow.getStatus() == Status.RUNNING
- || flow.getStatus() == Status.WAITING) {
+ if (flow.getStatus() == Status.RUNNING || flow.getStatus() == Status.WAITING) {
logger.info("Flow paused by " + user);
paused = true;
flow.setStatus(Status.PAUSED);
@@ -215,8 +231,7 @@ public class FlowRunner extends EventHandler implements Runnable {
runningJobs.put(startNode, jobRunner);
}
} catch (IOException e) {
- logger.error("Starting job queueing failed due to "
- + e.getMessage());
+ logger.error("Starting job queueing failed due to " + e.getMessage());
flow.setStatus(Status.FAILED);
jobsToRun.clear();
runningJobs.clear();
@@ -242,7 +257,6 @@ public class FlowRunner extends EventHandler implements Runnable {
if (runner != null) {
try {
ExecutableNode node = runner.getNode();
- node.setStatus(Status.WAITING);
executorService.submit(runner);
logger.info("Job Started " + node.getId());
} catch (RejectedExecutionException e) {
@@ -268,23 +282,18 @@ public class FlowRunner extends EventHandler implements Runnable {
try {
executorService.awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
}
}
flow.setEndTime(System.currentTimeMillis());
if (flow.getStatus() == Status.RUNNING) {
- logger.info("Flow finished successfully in "
- + (flow.getEndTime() - flow.getStartTime()) + " ms.");
+ logger.info("Flow finished successfully in " + (flow.getEndTime() - flow.getStartTime()) + " ms.");
flow.setStatus(Status.SUCCEEDED);
} else if (flow.getStatus() == Status.KILLED) {
- logger.info("Flow was killed in "
- + (flow.getEndTime() - flow.getStartTime()) + " ms.");
+ logger.info("Flow was killed in " + (flow.getEndTime() - flow.getStartTime()) + " ms.");
flow.setStatus(Status.KILLED);
} else {
- logger.info("Flow finished with failures in "
- + (flow.getEndTime() - flow.getStartTime()) + " ms.");
+ logger.info("Flow finished with failures in " + (flow.getEndTime() - flow.getStartTime()) + " ms.");
flow.setStatus(Status.FAILED);
}
@@ -293,14 +302,20 @@ public class FlowRunner extends EventHandler implements Runnable {
closeLogger();
}
- private JobRunner createJobRunner(ExecutableNode node, Props previousOutput)
- throws IOException {
+ private JobRunner createJobRunner(ExecutableNode node, Props previousOutput) throws IOException {
String source = node.getJobPropsSource();
String propsSource = node.getPropsSource();
- Props parentProps = propsSource == null ? flow.getGlobalProps() : sharedProps
- .get(propsSource);
+ // If no properties are set, we just set the global properties.
+ Props parentProps = propsSource == null ? globalProps : sharedProps.get(propsSource);
+ // Set up overrides
+ if (flowOverrideProps != null) {
+ Props clonedOverride = Props.clone(flowOverrideProps);
+ clonedOverride.setParent(parentProps);
+ parentProps = clonedOverride;
+ }
+
// We add the previous job output and put into this props.
if (previousOutput != null) {
Props earliestParent = previousOutput.getEarliestAncestor();
@@ -308,7 +323,8 @@ public class FlowRunner extends EventHandler implements Runnable {
parentProps = earliestParent;
}
-
+
+ // Load job file.
File propsFile = new File(basePath, source);
Props jobProps = new Props(parentProps, propsFile);
@@ -342,11 +358,19 @@ public class FlowRunner extends EventHandler implements Runnable {
else {
String source = fprops.getSource();
Props props = sharedProps.get(source);
- props.setParent(flow.getGlobalProps());
+ props.setParent(globalProps);
}
}
}
+ public void setGlobalProps(Props props) {
+ globalProps = props;
+ }
+
+ public Props getGlobalProps() {
+ return globalProps;
+ }
+
private void interrupt() {
currentThread.interrupt();
}
@@ -356,21 +380,31 @@ public class FlowRunner extends EventHandler implements Runnable {
return;
}
+ // Propagate failed/killed status to dependencies and check whether dependents are ready to run.
for (String dependent : node.getOutNodes()) {
ExecutableNode dependentNode = flow.getExecutableNode(dependent);
-
+
// Check all dependencies
boolean ready = true;
for (String dependency : dependentNode.getInNodes()) {
- ExecutableNode dependencyNode = flow
- .getExecutableNode(dependency);
- if (dependencyNode.getStatus() != Status.SUCCEEDED
- && dependencyNode.getStatus() != Status.DISABLED) {
+ ExecutableNode dependencyNode = flow.getExecutableNode(dependency);
+ Status depStatus = dependencyNode.getStatus();
+ if (depStatus == Status.FAILED || depStatus == Status.KILLED) {
+ // We trickle failures down the graph.
+ dependencyNode.setStatus(Status.KILLED);
+ }
+ else if (depStatus == Status.SUCCEEDED || depStatus == Status.SKIPPED) {
+ // We do nothing here. We proceed happily.
+ }
+ else {
+ // In this state, it's running, or waiting. Either way, we don't proceed.
ready = false;
break;
}
+
}
+ // All dependencies have been met.
if (ready) {
Props previousOutput = null;
// Iterate the in nodes again and create the dependencies
@@ -386,8 +420,7 @@ public class FlowRunner extends EventHandler implements Runnable {
JobRunner runner = null;
try {
- runner = this
- .createJobRunner(dependentNode, previousOutput);
+ runner = this.createJobRunner(dependentNode, previousOutput);
} catch (IOException e) {
logger.error("JobRunner creation failed due to "
+ e.getMessage());
@@ -414,24 +447,46 @@ public class FlowRunner extends EventHandler implements Runnable {
private void handleFailedJob(ExecutableNode node) {
System.err.println("Job " + node.getId() + " failed.");
- this.fireEventListeners(Event.create(this, Type.FLOW_FAILED_FINISHING));
-
- switch (failedOptions) {
+ if (flow.getStatus() != Status.FAILED_FINISHING && flow.getStatus() != Status.FAILED) {
+ this.fireEventListeners(Event.create(this, Type.FLOW_FAILED_FINISHING));
+ }
+
+ switch (failedAction) {
// We finish running current jobs and then fail. Do not accept new jobs.
- case FINISH_RUNNING_JOBS:
+ case FINISH_CURRENTLY_RUNNING:
+ logger.info("Failure Action: Finish up remaining running jobs.");
flow.setStatus(Status.FAILED_FINISHING);
runningJobs.clear();
executorService.shutdown();
+
+ // Go through and mark everything else killed.
+ long endTime = System.currentTimeMillis();
+ for (ExecutableNode fnode : flow.getExecutableNodes()) {
+ switch (fnode.getStatus()) {
+ case UNKNOWN:
+ case READY:
+ fnode.setStatus(Status.KILLED);
+ fnode.setStartTime(endTime);
+ fnode.setEndTime(endTime);
+ default:
+ }
+ }
+
break;
// We kill all running jobs and fail immediately
- case KILL_ALL:
+ case CANCEL_ALL:
+ logger.info("Failure Action: Kill flow immediately.");
+ flow.setStatus(Status.FAILED);
this.cancel("azkaban");
break;
+ default:
+ logger.info("Failure Action: Finishing accessible jobs.");
+ flow.setStatus(Status.FAILED_FINISHING);
}
runningJobs.remove(node.getId());
}
-
+
private class JobRunnerEventListener implements EventListener {
private FlowRunner flowRunner;
@@ -447,26 +502,37 @@ public class FlowRunner extends EventHandler implements Runnable {
System.out.println("Event " + jobID + " "
+ event.getType().toString());
-
-
// On Job success, we add the output props and then set up the next
// run.
- if (event.getType() == Type.JOB_SUCCEEDED) {
- logger.info("Job Succeeded " + jobID + " in "
- + (node.getEndTime() - node.getStartTime()) + " ms");
-
+ boolean handleFailure = false;
+ switch(event.getType()) {
+ case JOB_SUCCEEDED:
+ logger.info("Job Succeeded " + jobID + " in " + (node.getEndTime() - node.getStartTime()) + " ms");
+ break;
+ case JOB_FAILED:
+ logger.info("Job Failed " + jobID + " in " + (node.getEndTime() - node.getStartTime()) + " ms");
+ handleFailure = true;
+ break;
+ case JOB_KILLED:
+ logger.info("Job Killed " + jobID + " at " + node.getEndTime() + " ms");
+ break;
+ case JOB_SKIPPED:
+ logger.info("Job Disabled and skipped " + jobID + " at " + node.getEndTime() + " ms");
+ break;
+ default:
+ return;
+ }
+
+ if (handleFailure) {
+ jobsFinished.add(jobID);
+ logger.info(jobID + " FAILED");
+ flowRunner.handleFailedJob(runner.getNode());
+ }
+ else {
jobsFinished.add(jobID);
Props props = runner.getOutputProps();
outputProps.put(jobID, props);
flowRunner.handleSucceededJob(runner.getNode());
- } else if (event.getType() == Type.JOB_FAILED) {
-
- logger.info("Job Failed " + jobID + " in "
- + (node.getEndTime() - node.getStartTime()) + " ms");
-
- jobsFinished.add(jobID);
- logger.info(jobID + " FAILED");
- flowRunner.handleFailedJob(runner.getNode());
}
flowRunner.commitFlow();
diff --git a/src/java/azkaban/executor/FlowRunnerManager.java b/src/java/azkaban/executor/FlowRunnerManager.java
index 2e41f11..3107491 100644
--- a/src/java/azkaban/executor/FlowRunnerManager.java
+++ b/src/java/azkaban/executor/FlowRunnerManager.java
@@ -69,10 +69,10 @@ public class FlowRunnerManager {
File dir = new File(path);
ExecutableFlow flow = ExecutableFlowLoader.loadExecutableFlowFromDir(dir);
flow.setExecutionPath(path);
- flow.setGlobalProps(globalProps);
FlowRunner runner = new FlowRunner(flow);
runningFlows.put(id, runner);
+ runner.setGlobalProps(globalProps);
runner.addListener(eventListener);
executorService.submit(runner);
}
diff --git a/src/java/azkaban/executor/JobRunner.java b/src/java/azkaban/executor/JobRunner.java
index 71e4e87..1e454c1 100644
--- a/src/java/azkaban/executor/JobRunner.java
+++ b/src/java/azkaban/executor/JobRunner.java
@@ -140,6 +140,7 @@ public class JobRunner extends EventHandler implements Runnable {
}
public synchronized void cancel() {
+ logError("Cancel has been called.");
// Cancel code here
if (job == null) {
logError("Job doesn't exist!");
diff --git a/src/java/azkaban/jobExecutor/LongArgJob.java b/src/java/azkaban/jobExecutor/LongArgJob.java
index 7306a4e..70a35d6 100644
--- a/src/java/azkaban/jobExecutor/LongArgJob.java
+++ b/src/java/azkaban/jobExecutor/LongArgJob.java
@@ -66,7 +66,7 @@ public abstract class LongArgJob extends AbstractProcessJob {
info("Environment variables: " + builder.getEnv());
info("Working directory: " + builder.getWorkingDir());
- File [] propFiles = initPropsFiles( );
+ File [] propFiles = initPropsFiles();
//System.err.println("outputfile=" + propFiles[1]);
boolean success = false;
@@ -124,6 +124,4 @@ public abstract class LongArgJob extends AbstractProcessJob {
if(!suppressed.contains(key))
builder.addArg("--" + key, props.get(key));
}
-
-
}
src/java/azkaban/jobExecutor/ProcessJob.java 242(+54 -188)
diff --git a/src/java/azkaban/jobExecutor/ProcessJob.java b/src/java/azkaban/jobExecutor/ProcessJob.java
index 42a1925..8024fdd 100644
--- a/src/java/azkaban/jobExecutor/ProcessJob.java
+++ b/src/java/azkaban/jobExecutor/ProcessJob.java
@@ -22,11 +22,17 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
+import azkaban.jobExecutor.utils.process.AzkabanProcess;
+import azkaban.jobExecutor.utils.process.AzkabanProcessBuilder;
import azkaban.utils.Props;
/*
@@ -35,105 +41,66 @@ import azkaban.utils.Props;
* @author jkreps
*
*/
-public class ProcessJob extends AbstractProcessJob implements Job {
+public class ProcessJob extends AbstractProcessJob {
public static final String COMMAND = "command";
- public static final int CLEAN_UP_TIME_MS = 1000;
-
- private volatile Process _process;
- private volatile boolean _isComplete;
- private volatile boolean _isCancelled;
+ private static final long KILL_TIME_MS = 5000;
+ private volatile AzkabanProcess process;
public ProcessJob(final String jobId, final Props props, final Logger log) {
super(jobId, props, log);
}
@Override
- public void run() {
- synchronized (this) {
- _isCancelled = false;
- }
+ public void run() throws Exception {
resolveProps();
-
- // Sets a list of all the commands that need to be run.
List<String> commands = getCommandList();
- info(commands.size() + " commands to execute.");
- File[] propFiles = initPropsFiles();
+ long startMs = System.currentTimeMillis();
- // System.err.println("in process job outputFile=" +propFiles[1]);
+ info(commands.size() + " commands to execute.");
+ File[] propFiles = initPropsFiles();
+ Map<String, String> envVars = getEnvironmentVariables();
- // For each of the jobs, set up a process and run them.
for (String command : commands) {
- info("Executing command: " + command);
- String[] cmdPieces = partitionCommandLine(command);
-
- ProcessBuilder builder = new ProcessBuilder(cmdPieces);
+ AzkabanProcessBuilder builder = new AzkabanProcessBuilder(partitionCommandLine(command))
+ .setEnv(envVars)
+ .setWorkingDir(getCwd())
+ .setLogger(getLog());
+
+ info("Command: " + builder.getCommandString());
+ if (builder.getEnv().size() > 0) {
+ info("Environment variables: " + builder.getEnv());
+ }
+ info("Working directory: " + builder.getWorkingDir());
- builder.directory(new File(getCwd()));
- builder.environment().putAll(getEnvironmentVariables());
+ boolean success = false;
+ this.process = builder.build();
try {
- _process = builder.start();
- } catch (IOException e) {
- for (File file : propFiles) {
- if (file != null && file.exists()) {
+ this.process.run();
+ success = true;
+ } catch (Exception e) {
+ for (File file : propFiles)
+ if (file != null && file.exists())
file.delete();
- }
- }
throw new RuntimeException(e);
- }
- LoggingGobbler outputGobbler = new LoggingGobbler(
- new InputStreamReader(_process.getInputStream()),
- Level.INFO);
- LoggingGobbler errorGobbler = new LoggingGobbler(
- new InputStreamReader(_process.getErrorStream()),
- Level.ERROR);
-
- int processId = getProcessId();
- if (processId == 0) {
- info("Spawned thread. Unknowned processId");
- } else {
- info("Spawned thread with processId " + processId);
- }
- outputGobbler.start();
- errorGobbler.start();
- int exitCode = -999;
- try {
- exitCode = _process.waitFor();
-
- _isComplete = true;
- if (exitCode != 0) {
- for (File file : propFiles) {
- if (file != null && file.exists()) {
- file.delete();
- }
- }
- throw new RuntimeException(
- "Processes ended with exit code " + exitCode + ".");
- }
-
- // try to wait for everything to get logged out before exiting
- outputGobbler.join(1000);
- errorGobbler.join(1000);
- } catch (InterruptedException e) {
} finally {
- outputGobbler.close();
- errorGobbler.close();
+ this.process = null;
+ info("Process completed " + (success ? "successfully" : "unsuccessfully") + " in "
+ + ((System.currentTimeMillis() - startMs) / 1000) + " seconds.");
}
}
// Get the output properties from this job.
generateProperties(propFiles[1]);
- for (File file : propFiles) {
- if (file != null && file.exists()) {
+ for (File file : propFiles)
+ if (file != null && file.exists())
file.delete();
- }
- }
-
}
+
protected List<String> getCommandList() {
List<String> commands = new ArrayList<String>();
commands.add(_props.getString(COMMAND));
@@ -144,119 +111,24 @@ public class ProcessJob extends AbstractProcessJob implements Job {
return commands;
}
- @Override
- public void cancel() throws Exception {
- if (_process != null) {
- int processId = getProcessId();
- if (processId != 0) {
- warn("Attempting to kill the process " + processId);
- try {
- Runtime.getRuntime().exec("kill " + processId);
- synchronized (this) {
- wait(CLEAN_UP_TIME_MS);
- }
- } catch (InterruptedException e) {
- // Do nothing. We don't really care.
- }
- if (!_isComplete) {
- error("After "
- + CLEAN_UP_TIME_MS
- + " ms, the job hasn't terminated. Will force terminate the job.");
- }
- } else {
- info("Could not get process id");
- }
-
- if (!_isComplete) {
- warn("Force kill the process");
- _process.destroy();
- }
- synchronized (this) {
- _isCancelled = true;
- }
- }
- }
-
+ @Override
+ public void cancel() throws InterruptedException {
+ if(process == null)
+ throw new IllegalStateException("Not started.");
+ boolean killed = process.softKill(KILL_TIME_MS, TimeUnit.MILLISECONDS);
+ if(!killed) {
+ warn("Kill with signal TERM failed. Killing with KILL signal.");
+ process.hardKill();
+ }
+ }
+
+ @Override
+ public double getProgress() {
+ return process != null && process.isComplete()? 1.0 : 0.0;
+ }
+
public int getProcessId() {
- int processId = 0;
-
- try {
- Field f = _process.getClass().getDeclaredField("pid");
- f.setAccessible(true);
-
- processId = f.getInt(_process);
- } catch (Throwable e) {
- }
-
- return processId;
- }
-
- @Override
- public double getProgress() {
- return _isComplete ? 1.0 : 0.0;
- }
-
- private class LoggingGobbler extends Thread {
-
- private final BufferedReader _inputReader;
- private final Level _loggingLevel;
-
- public LoggingGobbler(final InputStreamReader inputReader,
- final Level level) {
- _inputReader = new BufferedReader(inputReader);
- _loggingLevel = level;
- }
-
- public void close() {
- if (_inputReader != null) {
- try {
- _inputReader.close();
- } catch (IOException e) {
- error("Error cleaning up logging stream reader:", e);
- }
- }
- }
-
- @Override
- public void run() {
- try {
- while (!Thread.currentThread().isInterrupted()) {
- String line = _inputReader.readLine();
- if (line == null) {
- return;
- }
-
- logMessage(line);
- }
- } catch (IOException e) {
- error("Error reading from logging stream:", e);
- }
- }
-
- private void logMessage(final String message) {
- if (message.startsWith(Level.DEBUG.toString())) {
- String newMsg = message.substring(Level.DEBUG.toString().length());
- getLog().debug(newMsg);
- } else if (message.startsWith(Level.ERROR.toString())) {
- String newMsg = message.substring(Level.ERROR.toString().length());
- getLog().error(newMsg);
- } else if (message.startsWith(Level.INFO.toString())) {
- String newMsg = message.substring(Level.INFO.toString().length());
- getLog().info(newMsg);
- } else if (message.startsWith(Level.WARN.toString())) {
- String newMsg = message.substring(Level.WARN.toString().length());
- getLog().warn(newMsg);
- } else if (message.startsWith(Level.FATAL.toString())) {
- String newMsg = message.substring(Level.FATAL.toString().length());
- getLog().fatal(newMsg);
- } else if (message.startsWith(Level.TRACE.toString())) {
- String newMsg = message.substring(Level.TRACE.toString().length());
- getLog().trace(newMsg);
- } else {
- getLog().log(_loggingLevel, message);
- }
-
- }
+ return process.getProcessId();
}
@Override
@@ -331,10 +203,4 @@ public class ProcessJob extends AbstractProcessJob implements Job {
return commands.toArray(new String[commands.size()]);
}
-
- @Override
- public synchronized boolean isCanceled() {
- return _isCancelled;
- }
-
}
src/web/js/azkaban.project.view.js 4(+2 -2)
diff --git a/src/web/js/azkaban.project.view.js b/src/web/js/azkaban.project.view.js
index e416a6a..6e0613f 100644
--- a/src/web/js/azkaban.project.view.js
+++ b/src/web/js/azkaban.project.view.js
@@ -30,8 +30,8 @@ azkaban.ProjectView= Backbone.View.extend({
position: ["20%",],
containerId: 'confirm-container',
containerCss: {
- 'height': '220px',
- 'width': '565px'
+ 'height': '240px',
+ 'width': '640px'
},
onShow: function (dialog) {
var modal = this;
unit/executions/exectest1/exec2.flow 154(+154 -0)
diff --git a/unit/executions/exectest1/exec2.flow b/unit/executions/exectest1/exec2.flow
new file mode 100644
index 0000000..f31c99e
--- /dev/null
+++ b/unit/executions/exectest1/exec2.flow
@@ -0,0 +1,154 @@
+{
+ "id" : "derived-member-data",
+ "success.email" : [],
+ "edges" : [ {
+ "source" : "job1",
+ "target" : "job2d"
+ }, {
+ "source" : "job2d",
+ "target" : "job3"
+ },{
+ "source" : "job2d",
+ "target" : "job4"
+ }, {
+ "source" : "job3",
+ "target" : "job5"
+ },{
+ "source" : "job4",
+ "target" : "job5"
+ },{
+ "source" : "job5",
+ "target" : "job7"
+ },{
+ "source" : "job1",
+ "target" : "job6"
+ },{
+ "source" : "job6",
+ "target" : "job7"
+ },{
+ "source" : "job7",
+ "target" : "job8"
+ },{
+ "source" : "job7",
+ "target" : "job9"
+ },
+ {
+ "source" : "job8",
+ "target" : "job10"
+ },
+ {
+ "source" : "job9",
+ "target" : "job10"
+ }
+ ],
+ "failure.email" : [],
+ "nodes" : [ {
+ "propSource" : "prop2.properties",
+ "id" : "job1",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job1.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job2d",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job2d.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job3",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job3.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job4",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job4.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job5",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job5.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job6",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job6.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job7",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job7.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job8",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job8.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job9",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job9.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job10",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job10.job",
+ "expectedRuntime" : 1
+ }
+ ],
+ "layedout" : false,
+ "type" : "flow",
+ "props" : [ {
+ "inherits" : "prop1.properties",
+ "source" : "prop2.properties"
+ },{
+ "source" : "prop1.properties"
+ }]
+}
\ No newline at end of file
unit/executions/exectest1/job2d.job 5(+5 -0)
diff --git a/unit/executions/exectest1/job2d.job b/unit/executions/exectest1/job2d.job
new file mode 100644
index 0000000..b4216ba
--- /dev/null
+++ b/unit/executions/exectest1/job2d.job
@@ -0,0 +1,5 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+dependencies=job1
+seconds=1
+fail=true
unit/executions/exectest1/job6.job 2(+1 -1)
diff --git a/unit/executions/exectest1/job6.job b/unit/executions/exectest1/job6.job
index fc4474d..b5df29c 100644
--- a/unit/executions/exectest1/job6.job
+++ b/unit/executions/exectest1/job6.job
@@ -1,5 +1,5 @@
type=java
job.class=azkaban.test.executor.SleepJavaJob
dependencies=job1
-seconds=1
+seconds=4
fail=false
diff --git a/unit/java/azkaban/test/executor/EventCollectorListener.java b/unit/java/azkaban/test/executor/EventCollectorListener.java
index 6e00681..e648c2a 100644
--- a/unit/java/azkaban/test/executor/EventCollectorListener.java
+++ b/unit/java/azkaban/test/executor/EventCollectorListener.java
@@ -18,6 +18,13 @@ public class EventCollectorListener implements EventListener {
return eventList;
}
+ public void writeAllEvents() {
+ for (Event event: eventList) {
+ System.out.print(event.getType());
+ System.out.print(",");
+ }
+ }
+
public boolean checkOrdering() {
long time = 0;
for (Event event: eventList) {
unit/java/azkaban/test/executor/FlowRunnerTest.java 142(+141 -1)
diff --git a/unit/java/azkaban/test/executor/FlowRunnerTest.java b/unit/java/azkaban/test/executor/FlowRunnerTest.java
index f6d6786..87a593c 100644
--- a/unit/java/azkaban/test/executor/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/executor/FlowRunnerTest.java
@@ -3,6 +3,7 @@ package azkaban.test.executor;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
+import java.util.HashSet;
import junit.framework.Assert;
@@ -13,7 +14,11 @@ import org.junit.Before;
import org.junit.Test;
import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableFlow.ExecutableNode;
+import azkaban.executor.ExecutableFlow.FailureAction;
import azkaban.executor.FlowRunner;
+import azkaban.executor.ExecutableFlow.Status;
+import azkaban.executor.event.Event.Type;
import azkaban.flow.Flow;
import azkaban.utils.JSONUtils;
@@ -45,13 +50,121 @@ public class FlowRunnerTest {
}
@Test
- public void exec1() throws Exception {
+ public void exec1Normal() throws Exception {
File testDir = new File("unit/executions/exectest1");
ExecutableFlow exFlow = prepareExecDir(testDir, "exec1");
+ EventCollectorListener eventCollector = new EventCollectorListener();
FlowRunner runner = new FlowRunner(exFlow);
+ runner.addListener(eventCollector);
+ Assert.assertTrue(!runner.isCancelled());
+ Assert.assertTrue(exFlow.getStatus() == Status.UNKNOWN);
+ runner.run();
+ Assert.assertTrue(exFlow.getStatus() == Status.SUCCEEDED);
+ Assert.assertTrue(runner.getJobsFinished().size() == exFlow.getExecutableNodes().size());
+ compareFinishedRuntime(runner);
+ testStatus(exFlow, "job1", Status.SUCCEEDED);
+ testStatus(exFlow, "job2", Status.SUCCEEDED);
+ testStatus(exFlow, "job3", Status.SUCCEEDED);
+ testStatus(exFlow, "job4", Status.SUCCEEDED);
+ testStatus(exFlow, "job5", Status.SUCCEEDED);
+ testStatus(exFlow, "job6", Status.SUCCEEDED);
+ testStatus(exFlow, "job7", Status.SUCCEEDED);
+ testStatus(exFlow, "job8", Status.SUCCEEDED);
+ testStatus(exFlow, "job9", Status.SUCCEEDED);
+ testStatus(exFlow, "job10", Status.SUCCEEDED);
+
+ try {
+ eventCollector.checkEventExists(new Type[] {Type.FLOW_STARTED, Type.FLOW_FINISHED});
+ }
+ catch (Exception e) {
+ System.out.println(e.getMessage());
+
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void exec1Failed() throws Exception {
+ File testDir = new File("unit/executions/exectest1");
+ ExecutableFlow exFlow = prepareExecDir(testDir, "exec2");
+
+ EventCollectorListener eventCollector = new EventCollectorListener();
+ FlowRunner runner = new FlowRunner(exFlow);
+ runner.addListener(eventCollector);
+
+ Assert.assertTrue(!runner.isCancelled());
+ Assert.assertTrue(exFlow.getStatus() == Status.UNKNOWN);
+ runner.run();
+
+ Assert.assertTrue(exFlow.getStatus() == Status.FAILED);
+
+ testStatus(exFlow, "job1", Status.SUCCEEDED);
+ testStatus(exFlow, "job2d", Status.FAILED);
+ testStatus(exFlow, "job3", Status.KILLED);
+ testStatus(exFlow, "job4", Status.KILLED);
+ testStatus(exFlow, "job5", Status.KILLED);
+ testStatus(exFlow, "job6", Status.SUCCEEDED);
+ testStatus(exFlow, "job7", Status.KILLED);
+ testStatus(exFlow, "job8", Status.KILLED);
+ testStatus(exFlow, "job9", Status.KILLED);
+ testStatus(exFlow, "job10", Status.KILLED);
+
+ try {
+ eventCollector.checkEventExists(new Type[] {Type.FLOW_STARTED, Type.FLOW_FAILED_FINISHING, Type.FLOW_FINISHED});
+ }
+ catch (Exception e) {
+ System.out.println(e.getMessage());
+
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void exec1FailedKillAll() throws Exception {
+ File testDir = new File("unit/executions/exectest1");
+ ExecutableFlow exFlow = prepareExecDir(testDir, "exec2");
+ exFlow.setFailureAction(FailureAction.CANCEL_ALL);
+
+ EventCollectorListener eventCollector = new EventCollectorListener();
+ FlowRunner runner = new FlowRunner(exFlow);
+ runner.addListener(eventCollector);
+
+ Assert.assertTrue(!runner.isCancelled());
+ Assert.assertTrue(exFlow.getStatus() == Status.UNKNOWN);
+ runner.run();
+
+ Assert.assertTrue("Expected flow " + Status.FAILED + " instead " + exFlow.getStatus(), exFlow.getStatus() == Status.FAILED);
+
+ testStatus(exFlow, "job1", Status.SUCCEEDED);
+ testStatus(exFlow, "job2d", Status.FAILED);
+ testStatus(exFlow, "job3", Status.KILLED);
+ testStatus(exFlow, "job4", Status.KILLED);
+ testStatus(exFlow, "job5", Status.KILLED);
+ testStatus(exFlow, "job6", Status.FAILED);
+ testStatus(exFlow, "job7", Status.KILLED);
+ testStatus(exFlow, "job8", Status.KILLED);
+ testStatus(exFlow, "job9", Status.KILLED);
+ testStatus(exFlow, "job10", Status.KILLED);
+
+ try {
+ eventCollector.checkEventExists(new Type[] {Type.FLOW_STARTED, Type.FLOW_FAILED_FINISHING, Type.FLOW_FINISHED});
+ }
+ catch (Exception e) {
+ System.out.println(e.getMessage());
+ eventCollector.writeAllEvents();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ private void testStatus(ExecutableFlow flow, String name, Status status) {
+ ExecutableNode node = flow.getExecutableNode(name);
+
+ if (node.getStatus() != status) {
+ Assert.fail("Status of job " + node.getId() + " is " + node.getStatus() + " not " + status + " as expected.");
+ }
}
private ExecutableFlow prepareExecDir(File execDir, String execName) throws IOException {
@@ -66,4 +179,31 @@ public class FlowRunnerTest {
return execFlow;
}
+ private void compareFinishedRuntime(FlowRunner runner) throws Exception {
+ ExecutableFlow flow = runner.getFlow();
+ for (String flowName: flow.getStartNodes()) {
+ ExecutableNode node = flow.getExecutableNode(flowName);
+ compareStartFinishTimes(flow, node, 0);
+ }
+ }
+
+ private void compareStartFinishTimes(ExecutableFlow flow, ExecutableNode node, long previousEndTime) throws Exception {
+ long startTime = node.getStartTime();
+ long endTime = node.getEndTime();
+
+ // If the start time is <= 0, the end time will be too (the node never ran).
+ if (startTime <= 0) {
+ Assert.assertTrue(endTime <=0);
+ return;
+ }
+
+ System.out.println("Node " + node.getId() + " start:" + startTime + " end:" + endTime + " previous:" + previousEndTime);
+ Assert.assertTrue("Checking start and end times", startTime > 0 && endTime >= startTime);
+ Assert.assertTrue("Start time for " + node.getId() + " is " + startTime +" and less than " + previousEndTime, startTime >= previousEndTime);
+
+ for (String outNode : node.getOutNodes()) {
+ ExecutableNode childNode = flow.getExecutableNode(outNode);
+ compareStartFinishTimes(flow, childNode, endTime);
+ }
+ }
}
diff --git a/unit/java/azkaban/test/jobExecutor/JavaJobTest.java b/unit/java/azkaban/test/jobExecutor/JavaJobTest.java
index ffe68d7..b770624 100644
--- a/unit/java/azkaban/test/jobExecutor/JavaJobTest.java
+++ b/unit/java/azkaban/test/jobExecutor/JavaJobTest.java
@@ -101,7 +101,7 @@ public class JavaJobTest
}
@Test
- public void testJavaJob() {
+ public void testJavaJob() throws Exception {
/* initialize the Props */
props.put(JavaJob.JOB_CLASS, "azkaban.test.jobExecutor.WordCountLocal");
props.put(ProcessJob.WORKING_DIR, ".");
@@ -112,7 +112,7 @@ public class JavaJobTest
}
@Test
- public void testJavaJobHashmap() {
+ public void testJavaJobHashmap() throws Exception {
/* initialize the Props */
props.put(JavaJob.JOB_CLASS, "azkaban.test.executor.SleepJavaJob");
props.put("seconds", 1);
@@ -124,7 +124,7 @@ public class JavaJobTest
}
@Test
- public void testFailedJavaJob() {
+ public void testFailedJavaJob() throws Exception {
props.put(JavaJob.JOB_CLASS, "azkaban.test.jobExecutor.WordCountLocal");
props.put(ProcessJob.WORKING_DIR, ".");
props.put("input", errorInputFile);
diff --git a/unit/java/azkaban/test/jobExecutor/ProcessJobTest.java b/unit/java/azkaban/test/jobExecutor/ProcessJobTest.java
index 1ec8c5b..48b1b1b 100644
--- a/unit/java/azkaban/test/jobExecutor/ProcessJobTest.java
+++ b/unit/java/azkaban/test/jobExecutor/ProcessJobTest.java
@@ -40,7 +40,7 @@ public class ProcessJobTest
}
@Test
- public void testOneUnixCommand() {
+ public void testOneUnixCommand() throws Exception {
/* initialize the Props */
props.put(ProcessJob.COMMAND, "ls -al");
props.put(ProcessJob.WORKING_DIR, ".");
@@ -50,7 +50,7 @@ public class ProcessJobTest
}
@Test
- public void testFailedUnixCommand() {
+ public void testFailedUnixCommand() throws Exception {
/* initialize the Props */
props.put(ProcessJob.COMMAND, "xls -al");
props.put(ProcessJob.WORKING_DIR, ".");
@@ -64,7 +64,7 @@ public class ProcessJobTest
}
@Test
- public void testMultipleUnixCommands( ) {
+ public void testMultipleUnixCommands( ) throws Exception {
/* initialize the Props */
props.put(ProcessJob.WORKING_DIR, ".");
props.put(ProcessJob.COMMAND, "pwd");