azkaban-aplcache

Added Cancelled state to represent a job which cancelled due

1/23/2014 1:21:02 AM

Details

diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index f5f07f4..44b97a3 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -114,7 +114,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private boolean flowPaused = false;
 	private boolean flowFailed = false;
 	private boolean flowFinished = false;
-	private boolean flowCancelled = false;
+	private boolean flowKilled = false;
 	
 	// The following is state that will trigger a retry of all failed jobs
 	private boolean retryFailedJobs = false;
@@ -392,7 +392,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 		logger.info("Restarting all failed jobs");
 		
 		this.retryFailedJobs = false;
-		this.flowCancelled = false;
+		this.flowKilled = false;
 		this.flowFailed = false;
 		this.flow.setStatus(Status.RUNNING);
 		
@@ -415,7 +415,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 		
 		updateFlow();
 	}
-	
+
 	private boolean progressGraph() throws IOException {
 		finishedNodes.swap();
 
@@ -433,7 +433,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 				if (!retryJobIfPossible(node)) {
 					propagateStatus(node.getParentFlow(), Status.FAILED_FINISHING);
 					if (failureAction == FailureAction.CANCEL_ALL) {
-						this.cancel();
+						this.kill();
 					}
 					this.flowFailed = true;
 				}
@@ -495,9 +495,9 @@ public class FlowRunner extends EventHandler implements Runnable {
 			return false;
 		}
 		
-		if (nextNodeStatus == Status.KILLED) {
-			logger.info("Killing '" + node.getNestedId() + "' due to prior errors.");
-			node.killNode(System.currentTimeMillis());
+		if (nextNodeStatus == Status.CANCELLED) {
+			logger.info("Cancelling '" + node.getNestedId() + "' due to prior errors.");
+			node.cancelNode(System.currentTimeMillis());
 			finishExecutableNode(node);
 		}
 		else if (nextNodeStatus == Status.SKIPPED) {
@@ -573,7 +573,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 		for(String end: flow.getEndNodes()) {
 			ExecutableNode node = flow.getExecutableNode(end);
 
-			if (node.getStatus() == Status.KILLED || node.getStatus() == Status.FAILED) {
+			if (node.getStatus() == Status.KILLED || node.getStatus() == Status.FAILED || node.getStatus() == Status.CANCELLED) {
 				succeeded = false;
 			}
 			
@@ -600,6 +600,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 			break;
 		case FAILED:
 		case KILLED:
+		case CANCELLED:
 		case FAILED_SUCCEEDED:
 			logger.info("Flow '" + id + "' is set to " + flow.getStatus().toString() + " in " + durationSec + " seconds");
 			break;
@@ -675,7 +676,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 		// load the override props if any
 		try {
 			props = projectLoader.fetchProjectProperty(flow.getProjectId(), flow.getVersion(), node.getId()+".jor");
-		}
+		} 
 		catch(ProjectManagerException e) {
 			e.printStackTrace();
 			logger.error("Error loading job override property for job " + node.getId());
@@ -741,7 +742,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 			if (!Status.isStatusFinished(depStatus)) {
 				return null;
 			}
-			else if (depStatus == Status.FAILED || depStatus == Status.KILLED) {
+			else if (depStatus == Status.FAILED || depStatus == Status.CANCELLED || depStatus == Status.KILLED) {
 				// We propagate failures as KILLED states.
 				shouldKill = true;
 			}
@@ -755,10 +756,10 @@ public class FlowRunner extends EventHandler implements Runnable {
 		// If the flow has failed, and we want to finish only the currently running jobs, we just
 		// kill everything else. We also kill, if the flow has been cancelled.
 		if (flowFailed && failureAction == ExecutionOptions.FailureAction.FINISH_CURRENTLY_RUNNING) {
-			return Status.KILLED;
+			return Status.CANCELLED;
 		}
-		else if (shouldKill || isCancelled()) {
-			return Status.KILLED;
+		else if (shouldKill || isKilled()) {
+			return Status.CANCELLED;
 		}
 		
 		// All good to go, ready to run.
@@ -827,7 +828,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 				if (flowFailed) {
 					flow.setStatus(Status.FAILED_FINISHING);
 				}
-				else if (flowCancelled) {
+				else if (flowKilled) {
 					flow.setStatus(Status.KILLED);
 				}
 				else {
@@ -839,20 +840,20 @@ public class FlowRunner extends EventHandler implements Runnable {
 		}
 	}
 	
-	public void cancel(String user) {
+	public void kill(String user) {
 		synchronized(mainSyncObj) {
-			logger.info("Flow cancelled by " + user);
-			cancel();
+			logger.info("Flow killed by " + user);
+			kill();
 			updateFlow();
 		}
 		interrupt();
 	}
 	
-	private void cancel() {
+	private void kill() {
 		synchronized(mainSyncObj) {
-			logger.info("Cancel has been called on flow " + execId);
+			logger.info("Kill has been called on flow " + execId);
 			flowPaused = false;
-			flowCancelled = true;
+			flowKilled = true;
 			
 			if (watcher != null) {
 				logger.info("Watcher is attached. Stopping watcher.");
@@ -860,9 +861,9 @@ public class FlowRunner extends EventHandler implements Runnable {
 				logger.info("Watcher cancelled status is " + watcher.isWatchCancelled());
 			}
 			
-			logger.info("Cancelling " + activeJobRunners.size() + " jobs.");
+			logger.info("Killing " + activeJobRunners.size() + " jobs.");
 			for (JobRunner runner : activeJobRunners) {
-				runner.cancel();
+				runner.kill();
 			}
 		}
 	}
@@ -910,22 +911,26 @@ public class FlowRunner extends EventHandler implements Runnable {
 				case KILLED:
 				case FAILED:
 				case FAILED_FINISHING:
-					resetFailedState(base, nodesToRetry);	
+					resetFailedState(base, nodesToRetry);
+					continue;
+				case CANCELLED:
+					node.setStatus(Status.READY);
+					node.setEndTime(-1);
+					node.setStartTime(-1);
+					node.setUpdateTime(currentTime);
+					break;
 				default:
-				}
-				
-				if (base.getStatus() != Status.KILLED) {
 					continue;
 				}
 			}
-			else if (node.getStatus() == Status.KILLED) {
+			else if (node.getStatus() == Status.CANCELLED) {
 				// Not a flow, but killed
 				node.setStatus(Status.READY);
 				node.setStartTime(-1);
 				node.setEndTime(-1);
 				node.setUpdateTime(currentTime);
 			}
-			else if(node.getStatus() == Status.FAILED) {
+			else if(node.getStatus() == Status.FAILED || node.getStatus() == Status.KILLED) {
 				node.resetForRetry();
 				nodesToRetry.add(node);
 			}
@@ -992,8 +997,8 @@ public class FlowRunner extends EventHandler implements Runnable {
 		}
 	}
 
-	public boolean isCancelled() {
-		return flowCancelled;
+	public boolean isKilled() {
+		return flowKilled;
 	}
 	
 	public ExecutableFlow getExecutableFlow() {
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index 3fbdb15..f886e64 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -463,7 +463,7 @@ public class FlowRunnerManager implements EventListener {
 			throw new ExecutorManagerException("Execution " + execId + " is not running.");
 		}
 		
-		runner.cancel(user);
+		runner.kill(user);
 	}
 	
 	public void pauseFlow(int execId, String user) throws ExecutorManagerException {
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 3de3d23..cbdcecc 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -87,7 +87,7 @@ public class JobRunner extends EventHandler implements Runnable {
 	private int jobLogBackupIndex;
 
 	private long delayStartMs = 0;
-	private boolean cancelled = false;
+	private boolean killed = false;
 	private BlockingStatus currentBlockStatus = null;
 	
 	public JobRunner(ExecutableNode node, File workingDir, ExecutorLoader loader, JobTypeManager jobtypeManager) {
@@ -259,15 +259,15 @@ public class JobRunner extends EventHandler implements Runnable {
 		boolean quickFinish = false;
 		long time = System.currentTimeMillis();
 		
-		if (this.isCancelled() || Status.isStatusFinished(nodeStatus)) {
+		if (this.isKilled() || Status.isStatusFinished(nodeStatus)) {
 			quickFinish = true;
 		}
 		else if (nodeStatus == Status.DISABLED) {
 			changeStatus(Status.SKIPPED, time);
 			quickFinish = true;
 		} 
-		else if (this.cancelled) {
-			changeStatus(Status.FAILED, time);
+		else if (this.killed) {
+			changeStatus(Status.KILLED, time);
 			quickFinish = true;
 		} 
 		
@@ -286,7 +286,7 @@ public class JobRunner extends EventHandler implements Runnable {
 	 * If pipelining is set, will block on another flow's jobs.
 	 */
 	private boolean blockOnPipeLine() {
-		if (this.isCancelled()) {
+		if (this.isKilled()) {
 			return true;
 		}
 		
@@ -309,8 +309,8 @@ public class JobRunner extends EventHandler implements Runnable {
 					logger.info("Waiting on pipelined job " + bStatus.getJobId());
 					currentBlockStatus = bStatus;
 					bStatus.blockOnFinishedStatus();
-					if (this.isCancelled()) {
-						logger.info("Job was cancelled while waiting on pipeline. Quiting.");
+					if (this.isKilled()) {
+						logger.info("Job was killed while waiting on pipeline. Quiting.");
 						return true;
 					}
 					else {
@@ -325,7 +325,7 @@ public class JobRunner extends EventHandler implements Runnable {
 	}
 	
 	private boolean delayExecution() {
-		if (this.isCancelled()) {
+		if (this.isKilled()) {
 			return true;
 		}
 		
@@ -342,8 +342,8 @@ public class JobRunner extends EventHandler implements Runnable {
 				}
 			}
 			
-			if (this.isCancelled()) {
-				logger.info("Job was cancelled while in delay. Quiting.");
+			if (this.isKilled()) {
+				logger.info("Job was killed while in delay. Quiting.");
 				return true;
 			}
 		}
@@ -420,7 +420,7 @@ public class JobRunner extends EventHandler implements Runnable {
 
 		// Start the node.
 		node.setStartTime(System.currentTimeMillis());
-		if (!errorFound && !isCancelled()) {
+		if (!errorFound && !isKilled()) {
 			fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
 			try {
 				loader.uploadExecutableNode(node, props);
@@ -443,10 +443,10 @@ public class JobRunner extends EventHandler implements Runnable {
 		}
 		node.setEndTime(System.currentTimeMillis());
 
-		if (isCancelled()) {
-			changeStatus(Status.FAILED);
+		if (isKilled()) {
+			changeStatus(Status.KILLED);
 		}
-		logInfo("Finishing job " + this.jobId + " at " + node.getEndTime());
+		logInfo("Finishing job " + this.jobId + " at " + node.getEndTime() + " with status " + node.getStatus());
 		
 		fireEvent(Event.create(this, Type.JOB_FINISHED), false);
 		finalizeLogFile();
@@ -455,13 +455,13 @@ public class JobRunner extends EventHandler implements Runnable {
 	
 	private boolean prepareJob() throws RuntimeException {
 		// Check pre conditions
-		if (props == null || cancelled) {
+		if (props == null || killed) {
 			logError("Failing job. The job properties don't exist");
 			return false;
 		}
 		
 		synchronized (syncObject) {
-			if (node.getStatus() == Status.FAILED || cancelled) {
+			if (node.getStatus() == Status.FAILED || killed) {
 				return false;
 			}
 
@@ -557,10 +557,10 @@ public class JobRunner extends EventHandler implements Runnable {
 		this.fireEventListeners(event);
 	}
 	
-	public void cancel() {
+	public void kill() {
 		synchronized (syncObject) {
-			logError("Cancel has been called.");
-			this.cancelled = true;
+			logError("Kill has been called.");
+			this.killed = true;
 			
 			BlockingStatus status = currentBlockStatus;
 			if (status != null) {
@@ -587,8 +587,8 @@ public class JobRunner extends EventHandler implements Runnable {
 		}
 	}
 	
-	public boolean isCancelled() {
-		return cancelled;
+	public boolean isKilled() {
+		return killed;
 	}
 	
 	public Status getStatus() {
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index bacad6c..d444ef6 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -389,15 +389,15 @@ public class ExecutableNode {
 		applyUpdateObject(wrapper);
 	}
 	
-	public void killNode(long killTime) {
+	public void cancelNode(long cancelTime) {
 		if (this.status == Status.DISABLED) {
-			skipNode(killTime);
+			skipNode(cancelTime);
 		}
 		else {
-			this.setStatus(Status.KILLED);
-			this.setStartTime(killTime);
-			this.setEndTime(killTime);
-			this.setUpdateTime(killTime);
+			this.setStatus(Status.CANCELLED);
+			this.setStartTime(cancelTime);
+			this.setEndTime(cancelTime);
+			this.setUpdateTime(cancelTime);
 		}
 	}
 	
diff --git a/src/java/azkaban/executor/Status.java b/src/java/azkaban/executor/Status.java
index fa45066..7643d2f 100644
--- a/src/java/azkaban/executor/Status.java
+++ b/src/java/azkaban/executor/Status.java
@@ -17,7 +17,19 @@
 package azkaban.executor;
 
 public enum Status {
-	READY(10), PREPARING(20), RUNNING(30), PAUSED(40), SUCCEEDED(50), KILLED(60), FAILED(70), FAILED_FINISHING(80), SKIPPED(90), DISABLED(100), QUEUED(110), FAILED_SUCCEEDED(120);
+	READY(10), 
+	PREPARING(20), 
+	RUNNING(30), 
+	PAUSED(40), 
+	SUCCEEDED(50), 
+	KILLED(60), 
+	FAILED(70), 
+	FAILED_FINISHING(80), 
+	SKIPPED(90), 
+	DISABLED(100), 
+	QUEUED(110), 
+	FAILED_SUCCEEDED(120),
+	CANCELLED(130);
 	
 	private int numVal;
 
@@ -55,6 +67,8 @@ public enum Status {
 			return QUEUED;
 		case 120:
 			return FAILED_SUCCEEDED;
+		case 130:
+			return CANCELLED;
 		default:
 			return READY;
 		}
@@ -67,6 +81,7 @@ public enum Status {
 		case SUCCEEDED:
 		case SKIPPED:
 		case FAILED_SUCCEEDED:
+		case CANCELLED:
 			return true;
 		default:
 			return false;
diff --git a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
index 5454c99..5582114 100644
--- a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
@@ -106,7 +106,7 @@
 				<li id="statsViewLink"><a href="#stats">Stats</a></li>
 				<li class="nav-button pull-right"><button type="button" id="pausebtn" class="btn btn-primary">Pause</button></li>
 				<li class="nav-button pull-right"><button type="button" id="resumebtn" class="btn btn-primary">Resume</button></li>
-				<li class="nav-button pull-right"><button type="button" id="cancelbtn" class="btn btn-danger">Cancel</button></li>
+				<li class="nav-button pull-right"><button type="button" id="cancelbtn" class="btn btn-danger">Kill</button></li>
 				<li class="nav-button pull-right"><button type="button" id="retrybtn" class="btn btn-success">Retry Failed</button></li>
 				<li class="nav-button pull-right"><button type="button" id="executebtn" class="btn btn-success">Prepare Execution</button></li>
 			</ul>
diff --git a/src/less/azkaban-graph.less b/src/less/azkaban-graph.less
index 5a5d213..93fb36c 100644
--- a/src/less/azkaban-graph.less
+++ b/src/less/azkaban-graph.less
@@ -80,11 +80,20 @@
 }
 
 .KILLED > g > rect {
+	fill: #d2322d;
+	stroke: #d2322d;
+}
+
+.KILLED > g > text {
+	fill: #FFF;
+}
+
+.CANCELLED > g > rect {
 	fill: #FF9999;
 	stroke: #FF9999;
 }
 
-.KILLED > g > text {
+.CANCELLED > g > text {
 	fill: #FFF;
 }
 

src/less/flow.less 20(+17 -3)

diff --git a/src/less/flow.less b/src/less/flow.less
index 5e60909..b8dde8e 100644
--- a/src/less/flow.less
+++ b/src/less/flow.less
@@ -45,6 +45,10 @@
   &.FAILED {
     background-color: #d9534f;
   }
+  
+  &.KILLED {
+    background-color: #d9534f;
+  }
 
   &.RUNNING {
     background-color: #3398cc;	
@@ -61,7 +65,7 @@
     background-color: #009fc9;
   }
   
-  &.KILLED {
+  &.CANCELLED {
     background-color: #ff9999;
   }
 }
@@ -91,6 +95,10 @@ td {
     &.FAILED {
       background-color: #d9534f;
     }
+    
+    &.KILLED {
+      background-color: #d9534f;
+    }
 
     &.PAUSED {
       background-color: #c82123;
@@ -114,7 +122,7 @@ td {
       background-color: #aaa;	
     }
 
-    &.KILLED {
+    &.CANCELLED {
       background-color: #ff9999;
     }
   }
@@ -139,7 +147,8 @@ td {
 
   &.FAILED,
   &.FAILED_FINISHING,
-  &.KILLED {
+  &.KILLED,
+  &.CANCELLED {
     color: #cc0000;
   }
 }
@@ -220,6 +229,11 @@ li.tree-list-item {
       background-position: 0px 0px;
     }
     
+    &.CANCELLED .icon {
+      background-position: 0px 0px;
+      opacity: 0.5;
+    }
+    
     &.FAILED_FINISHING .icon {
       background-position: 0px 0px;
     }
diff --git a/src/web/js/azkaban/util/flow-loader.js b/src/web/js/azkaban/util/flow-loader.js
index ee5ed8b..b68dfb5 100644
--- a/src/web/js/azkaban/util/flow-loader.js
+++ b/src/web/js/azkaban/util/flow-loader.js
@@ -14,20 +14,6 @@
  * the License.
  */
 
-var statusStringMap = {
-	"FAILED": "Failed",
-	"SUCCEEDED": "Success",
-	"FAILED_FINISHING": "Running w/Failure",
-	"RUNNING": "Running",
-	"WAITING": "Waiting",
-	"KILLED": "Killed",
-	"DISABLED": "Disabled",
-	"READY": "Ready",
-	"UNKNOWN": "Unknown",
-	"QUEUED": "Queued",
-	"SKIPPED": "Skipped"
-};
-
 var extendedViewPanels = {};
 var extendedDataModels = {};
 var openJobDisplayCallback = function(nodeId, flowId, evt) {
diff --git a/src/web/js/azkaban/util/job-status.js b/src/web/js/azkaban/util/job-status.js
index ee03ae6..deef88e 100644
--- a/src/web/js/azkaban/util/job-status.js
+++ b/src/web/js/azkaban/util/job-status.js
@@ -14,8 +14,9 @@
  * the License.
  */
 
-var statusList = ["FAILED", "FAILED_FINISHING", "SUCCEEDED", "RUNNING", "WAITING", "KILLED", "DISABLED", "READY", "UNKNOWN", "PAUSED", "SKIPPED"];
+var statusList = ["FAILED", "FAILED_FINISHING", "SUCCEEDED", "RUNNING", "WAITING", "KILLED", "DISABLED", "READY", "CANCELLED", "UNKNOWN", "PAUSED", "SKIPPED", "QUEUED"];
 var statusStringMap = {
+	"QUEUED": "Queued",
 	"SKIPPED": "Skipped",
 	"PREPARING": "Preparing",
 	"FAILED": "Failed",
@@ -24,6 +25,7 @@ var statusStringMap = {
 	"RUNNING": "Running",
 	"WAITING": "Waiting",
 	"KILLED": "Killed",
+	"CANCELLED": "Cancelled",
 	"DISABLED": "Disabled",
 	"READY": "Ready",
 	"UNKNOWN": "Unknown",
diff --git a/src/web/js/azkaban/view/exflow.js b/src/web/js/azkaban/view/exflow.js
index c495584..f6ecc38 100644
--- a/src/web/js/azkaban/view/exflow.js
+++ b/src/web/js/azkaban/view/exflow.js
@@ -439,8 +439,8 @@ var updaterFunction = function() {
 
 		var data = graphModel.get("data");
 		if (data.status == "UNKNOWN" || 
-				data.status == "WAITING" || 
-				data.status == "PREPARING") {
+			data.status == "WAITING" || 
+			data.status == "PREPARING") {
 			setTimeout(function() {updaterFunction();}, 1000);
 		}
 		else if (data.status != "SUCCEEDED" && data.status != "FAILED") {
diff --git a/src/web/js/azkaban/view/flow-execute-dialog.js b/src/web/js/azkaban/view/flow-execute-dialog.js
index e8db80c..6b93bac 100644
--- a/src/web/js/azkaban/view/flow-execute-dialog.js
+++ b/src/web/js/azkaban/view/flow-execute-dialog.js
@@ -495,14 +495,14 @@ var disableFinishedJobs = function(data) {
 		else if (node.status == "SUCCEEDED" || node.status=="RUNNING") {
 			node.disabled = true;
 		}
-		else if (node.status == "KILLED") {
+		else if (node.status == "CANCELLED") {
 			node.disabled = false;
 			node.status="READY";
 		}
 		else {
 			node.disabled = false;
-			if (node.flowData) {
-				disableFinishedJobs(node.flowData);
+			if (node.type == "flow") {
+				disableFinishedJobs(node);
 			}
 		}
 	}
@@ -526,8 +526,8 @@ var recurseTree = function(data, disabled, recurse) {
 		var node = data.nodes[i];
 		node.disabled = disabled;
 		
-		if (node.flowData && recurse) {
-			recurseTree(node.flowData, disabled);
+		if (node.type == "flow" && recurse) {
+			recurseTree(node, disabled);
 		}
 	}
 }
@@ -583,8 +583,8 @@ var gatherDisabledNodes = function(data) {
 			disabled.push(node.id);
 		}
 		else {
-			if (node.flowData) {
-				var array = gatherDisabledNodes(node.flowData);
+			if (node.type == "flow") {
+				var array = gatherDisabledNodes(node);
 				if (array && array.length > 0) {
 					disabled.push({id: node.id, children: array});
 				}
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest.java b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
index c0f8025..21b43f1 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
@@ -73,7 +73,7 @@ public class FlowRunnerTest {
 		eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED, Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
 		FlowRunner runner = createFlowRunner(loader, eventCollector, "exec1");
 		
-		Assert.assertTrue(!runner.isCancelled());
+		Assert.assertTrue(!runner.isKilled());
 		runner.run();
 		ExecutableFlow exFlow = runner.getExecutableFlow();
 		Assert.assertTrue(exFlow.getStatus() == Status.SUCCEEDED);
@@ -115,7 +115,7 @@ public class FlowRunnerTest {
 		
 		FlowRunner runner = createFlowRunner(exFlow, loader, eventCollector);
 
-		Assert.assertTrue(!runner.isCancelled());
+		Assert.assertTrue(!runner.isKilled());
 		Assert.assertTrue(exFlow.getStatus() == Status.READY);
 		runner.run();
 
@@ -156,19 +156,19 @@ public class FlowRunnerTest {
 		
 		runner.run();
 		ExecutableFlow exFlow = runner.getExecutableFlow();
-		Assert.assertTrue(!runner.isCancelled());
+		Assert.assertTrue(!runner.isKilled());
 		Assert.assertTrue("Flow status " + exFlow.getStatus(), exFlow.getStatus() == Status.FAILED);
 		
 		testStatus(exFlow, "job1", Status.SUCCEEDED);
 		testStatus(exFlow, "job2d", Status.FAILED);
-		testStatus(exFlow, "job3", Status.KILLED);
-		testStatus(exFlow, "job4", Status.KILLED);
-		testStatus(exFlow, "job5", Status.KILLED);
+		testStatus(exFlow, "job3", Status.CANCELLED);
+		testStatus(exFlow, "job4", Status.CANCELLED);
+		testStatus(exFlow, "job5", Status.CANCELLED);
 		testStatus(exFlow, "job6", Status.SUCCEEDED);
-		testStatus(exFlow, "job7", Status.KILLED);
-		testStatus(exFlow, "job8", Status.KILLED);
-		testStatus(exFlow, "job9", Status.KILLED);
-		testStatus(exFlow, "job10", Status.KILLED);
+		testStatus(exFlow, "job7", Status.CANCELLED);
+		testStatus(exFlow, "job8", Status.CANCELLED);
+		testStatus(exFlow, "job9", Status.CANCELLED);
+		testStatus(exFlow, "job10", Status.CANCELLED);
 
 		try {
 			eventCollector.checkEventExists(new Type[] {Type.FLOW_STARTED, Type.FLOW_FINISHED});
@@ -195,7 +195,7 @@ public class FlowRunnerTest {
 		runner.run();
 		ExecutableFlow exFlow = runner.getExecutableFlow();
 		
-		Assert.assertTrue(runner.isCancelled());
+		Assert.assertTrue(runner.isKilled());
 		
 		Assert.assertTrue("Expected flow " + Status.FAILED + " instead " + exFlow.getStatus(), exFlow.getStatus() == Status.FAILED);
 		
@@ -209,14 +209,14 @@ public class FlowRunnerTest {
 
 		testStatus(exFlow, "job1", Status.SUCCEEDED);
 		testStatus(exFlow, "job2d", Status.FAILED);
-		testStatus(exFlow, "job3", Status.KILLED);
-		testStatus(exFlow, "job4", Status.KILLED);
-		testStatus(exFlow, "job5", Status.KILLED);
-		testStatus(exFlow, "job6", Status.FAILED);
-		testStatus(exFlow, "job7", Status.KILLED);
-		testStatus(exFlow, "job8", Status.KILLED);
-		testStatus(exFlow, "job9", Status.KILLED);
-		testStatus(exFlow, "job10", Status.KILLED);
+		testStatus(exFlow, "job3", Status.CANCELLED);
+		testStatus(exFlow, "job4", Status.CANCELLED);
+		testStatus(exFlow, "job5", Status.CANCELLED);
+		testStatus(exFlow, "job6", Status.KILLED);
+		testStatus(exFlow, "job7", Status.CANCELLED);
+		testStatus(exFlow, "job8", Status.CANCELLED);
+		testStatus(exFlow, "job9", Status.CANCELLED);
+		testStatus(exFlow, "job10", Status.CANCELLED);
 		
 		try {
 			eventCollector.checkEventExists(new Type[] {Type.FLOW_STARTED, Type.FLOW_FINISHED});
@@ -246,20 +246,19 @@ public class FlowRunnerTest {
 			try {
 				wait(500);
 			} catch(InterruptedException e) {
-				
 			}
 		}
 		
 		testStatus(exFlow, "job1", Status.SUCCEEDED);
 		testStatus(exFlow, "job2d", Status.FAILED);
 		testStatus(exFlow, "job3", Status.SUCCEEDED);
-		testStatus(exFlow, "job4", Status.KILLED);
-		testStatus(exFlow, "job5", Status.KILLED);
-		testStatus(exFlow, "job6", Status.KILLED);
+		testStatus(exFlow, "job4", Status.CANCELLED);
+		testStatus(exFlow, "job5", Status.CANCELLED);
+		testStatus(exFlow, "job6", Status.CANCELLED);
 		testStatus(exFlow, "job7", Status.SUCCEEDED);
 		testStatus(exFlow, "job8", Status.SUCCEEDED);
 		testStatus(exFlow, "job9", Status.SUCCEEDED);
-		testStatus(exFlow, "job10", Status.KILLED);
+		testStatus(exFlow, "job10", Status.CANCELLED);
 		
 		try {
 			eventCollector.checkEventExists(new Type[] {Type.FLOW_STARTED, Type.FLOW_FINISHED});
@@ -278,7 +277,7 @@ public class FlowRunnerTest {
 		eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED, Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
 		FlowRunner runner = createFlowRunner(loader, eventCollector, "exec1");
 		
-		Assert.assertTrue(!runner.isCancelled());
+		Assert.assertTrue(!runner.isKilled());
 		Thread thread = new Thread(runner);
 		thread.start();
 
@@ -290,8 +289,8 @@ public class FlowRunnerTest {
 				e.printStackTrace();
 			}
 			
-			runner.cancel("me");
-			Assert.assertTrue(runner.isCancelled());
+			runner.kill("me");
+			Assert.assertTrue(runner.isKilled());
 		}
 		
 
@@ -307,15 +306,15 @@ public class FlowRunnerTest {
 		ExecutableFlow exFlow = runner.getExecutableFlow();
 		testStatus(exFlow, "job1", Status.SUCCEEDED);
 		testStatus(exFlow, "job2", Status.SUCCEEDED);
-		testStatus(exFlow, "job5", Status.KILLED);
-		testStatus(exFlow, "job7", Status.KILLED);
-		testStatus(exFlow, "job8", Status.KILLED);
-		testStatus(exFlow, "job10", Status.KILLED);
-		testStatus(exFlow, "job3", Status.FAILED);
-		testStatus(exFlow, "job4", Status.FAILED);
-		testStatus(exFlow, "job6", Status.FAILED);
+		testStatus(exFlow, "job5", Status.CANCELLED);
+		testStatus(exFlow, "job7", Status.CANCELLED);
+		testStatus(exFlow, "job8", Status.CANCELLED);
+		testStatus(exFlow, "job10", Status.CANCELLED);
+		testStatus(exFlow, "job3", Status.KILLED);
+		testStatus(exFlow, "job4", Status.KILLED);
+		testStatus(exFlow, "job6", Status.KILLED);
 		
-		Assert.assertTrue("Expected FAILED status instead got " + exFlow.getStatus(),exFlow.getStatus() == Status.FAILED);
+		Assert.assertTrue("Expected FAILED status instead got " + exFlow.getStatus(),exFlow.getStatus() == Status.KILLED);
 		
 		try {
 			eventCollector.checkEventExists(new Type[] {Type.FLOW_STARTED, Type.FLOW_FINISHED});
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
index ada74b4..e5fa39f 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
@@ -319,21 +319,21 @@ public class FlowRunnerTest2 {
 		Assert.assertEquals(Status.FAILED_FINISHING, flow.getStatus());
 		expectedStateMap.put("joba", Status.FAILED);
 		expectedStateMap.put("joba1", Status.RUNNING);
-		expectedStateMap.put("jobb", Status.KILLED);
-		expectedStateMap.put("jobc", Status.KILLED);
-		expectedStateMap.put("jobd", Status.KILLED);
+		expectedStateMap.put("jobb", Status.CANCELLED);
+		expectedStateMap.put("jobc", Status.CANCELLED);
+		expectedStateMap.put("jobd", Status.CANCELLED);
 		expectedStateMap.put("jobd:innerJobA", Status.READY);
 		expectedStateMap.put("jobd:innerFlow2", Status.READY);
 		expectedStateMap.put("jobb:innerJobA", Status.READY);
 		expectedStateMap.put("jobb:innerFlow", Status.READY);
-		expectedStateMap.put("jobe", Status.KILLED);
+		expectedStateMap.put("jobe", Status.CANCELLED);
 		compareStates(expectedStateMap, nodeMap);
 
 		// 3. jobb:Inner completes
 		/// innerJobA completes
 		InteractiveTestJob.getTestJob("joba1").succeedJob();
 		pause(250);
-		expectedStateMap.put("jobf", Status.KILLED);
+		expectedStateMap.put("jobf", Status.CANCELLED);
 		Assert.assertEquals(Status.FAILED, flow.getStatus());
 		Assert.assertFalse(thread.isAlive());
 	}
@@ -379,10 +379,10 @@ public class FlowRunnerTest2 {
 		pause(250);
 		expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
 		expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
-		expectedStateMap.put("jobb:innerJobB", Status.KILLED);
-		expectedStateMap.put("jobb:innerJobC", Status.KILLED);
-		expectedStateMap.put("jobb:innerFlow", Status.KILLED);
-		expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
+		expectedStateMap.put("jobb:innerJobB", Status.CANCELLED);
+		expectedStateMap.put("jobb:innerJobC", Status.CANCELLED);
+		expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
+		expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
 		expectedStateMap.put("jobb", Status.KILLED);
 		expectedStateMap.put("jobd", Status.KILLED);
 		compareStates(expectedStateMap, nodeMap);
@@ -391,8 +391,8 @@ public class FlowRunnerTest2 {
 		InteractiveTestJob.getTestJob("jobc").succeedJob();
 		pause(250);
 		expectedStateMap.put("jobc", Status.SUCCEEDED);
-		expectedStateMap.put("jobe", Status.KILLED);
-		expectedStateMap.put("jobf", Status.KILLED);
+		expectedStateMap.put("jobe", Status.CANCELLED);
+		expectedStateMap.put("jobf", Status.CANCELLED);
 		compareStates(expectedStateMap, nodeMap);
 		Assert.assertEquals(Status.FAILED, flow.getStatus());
 		
@@ -401,7 +401,7 @@ public class FlowRunnerTest2 {
 	
 	@Test
 	public void testNormalFailure3() throws Exception {
-		// Test propagation of KILLED status to embedded flows different branch
+		// Test propagation of CANCELLED status to embedded flows different branch
 		EventCollectorListener eventCollector = new EventCollectorListener();
 		FlowRunner runner = createFlowRunner(eventCollector, "jobf");
 		ExecutableFlow flow = runner.getExecutableFlow();
@@ -449,10 +449,10 @@ public class FlowRunnerTest2 {
 		InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
 		pause(250);
 		expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
-		expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
+		expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
 		expectedStateMap.put("jobd", Status.KILLED);
 		expectedStateMap.put("jobb:innerJobC", Status.SUCCEEDED);
-		expectedStateMap.put("jobb:innerFlow", Status.KILLED);
+		expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
 		expectedStateMap.put("jobb", Status.FAILED);
 		compareStates(expectedStateMap, nodeMap);
 
@@ -460,8 +460,8 @@ public class FlowRunnerTest2 {
 		InteractiveTestJob.getTestJob("jobc").succeedJob();
 		pause(250);
 		expectedStateMap.put("jobc", Status.SUCCEEDED);
-		expectedStateMap.put("jobe", Status.KILLED);
-		expectedStateMap.put("jobf", Status.KILLED);
+		expectedStateMap.put("jobe", Status.CANCELLED);
+		expectedStateMap.put("jobf", Status.CANCELLED);
 		compareStates(expectedStateMap, nodeMap);
 		Assert.assertEquals(Status.FAILED, flow.getStatus());
 		
@@ -521,7 +521,7 @@ public class FlowRunnerTest2 {
 		expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
 		expectedStateMap.put("jobd:innerFlow2", Status.RUNNING);
 		expectedStateMap.put("jobb:innerJobC", Status.SUCCEEDED);
-		expectedStateMap.put("jobb:innerFlow", Status.KILLED);
+		expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
 		compareStates(expectedStateMap, nodeMap);
 
 		InteractiveTestJob.getTestJob("jobd:innerFlow2").succeedJob();
@@ -533,8 +533,8 @@ public class FlowRunnerTest2 {
 		InteractiveTestJob.getTestJob("jobc").succeedJob();
 		pause(250);
 		expectedStateMap.put("jobc", Status.SUCCEEDED);
-		expectedStateMap.put("jobe", Status.KILLED);
-		expectedStateMap.put("jobf", Status.KILLED);
+		expectedStateMap.put("jobe", Status.CANCELLED);
+		expectedStateMap.put("jobf", Status.CANCELLED);
 		compareStates(expectedStateMap, nodeMap);
 		Assert.assertEquals(Status.FAILED, flow.getStatus());
 		Assert.assertFalse(thread.isAlive());
@@ -583,14 +583,14 @@ public class FlowRunnerTest2 {
 		pause(250);
 		expectedStateMap.put("jobb", Status.FAILED);
 		expectedStateMap.put("jobb:innerJobB", Status.FAILED);
-		expectedStateMap.put("jobb:innerJobC", Status.FAILED);
-		expectedStateMap.put("jobb:innerFlow", Status.KILLED);
-		expectedStateMap.put("jobc", Status.FAILED);
-		expectedStateMap.put("jobd", Status.FAILED);
-		expectedStateMap.put("jobd:innerJobA", Status.FAILED);
-		expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
-		expectedStateMap.put("jobe", Status.KILLED);
-		expectedStateMap.put("jobf", Status.KILLED);
+		expectedStateMap.put("jobb:innerJobC", Status.KILLED);
+		expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
+		expectedStateMap.put("jobc", Status.KILLED);
+		expectedStateMap.put("jobd", Status.KILLED);
+		expectedStateMap.put("jobd:innerJobA", Status.KILLED);
+		expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
+		expectedStateMap.put("jobe", Status.CANCELLED);
+		expectedStateMap.put("jobf", Status.CANCELLED);
 		compareStates(expectedStateMap, nodeMap);
 		
 		Assert.assertFalse(thread.isAlive());
@@ -640,7 +640,7 @@ public class FlowRunnerTest2 {
 		expectedStateMap.put("jobb:innerJobB", Status.FAILED);
 		expectedStateMap.put("jobb:innerJobC", Status.FAILED);
 		expectedStateMap.put("jobb:innerFlow", Status.SKIPPED);
-		expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
+		expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
 		expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
 		expectedStateMap.put("jobd", Status.KILLED);
 		Assert.assertEquals(Status.FAILED_FINISHING, flow.getStatus());
@@ -739,21 +739,21 @@ public class FlowRunnerTest2 {
 		expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
 		compareStates(expectedStateMap, nodeMap);
 		
-		runner.cancel("me");
+		runner.kill("me");
 		pause(250);
 		
-		expectedStateMap.put("jobb", Status.FAILED);
-		expectedStateMap.put("jobb:innerJobB", Status.FAILED);
-		expectedStateMap.put("jobb:innerJobC", Status.FAILED);
-		expectedStateMap.put("jobb:innerFlow", Status.KILLED);
-		expectedStateMap.put("jobc", Status.FAILED);
-		expectedStateMap.put("jobd", Status.FAILED);
-		expectedStateMap.put("jobd:innerJobA", Status.FAILED);
-		expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
-		expectedStateMap.put("jobe", Status.KILLED);
-		expectedStateMap.put("jobf", Status.KILLED);
+		expectedStateMap.put("jobb", Status.KILLED);
+		expectedStateMap.put("jobb:innerJobB", Status.KILLED);
+		expectedStateMap.put("jobb:innerJobC", Status.KILLED);
+		expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
+		expectedStateMap.put("jobc", Status.KILLED);
+		expectedStateMap.put("jobd", Status.KILLED);
+		expectedStateMap.put("jobd:innerJobA", Status.KILLED);
+		expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
+		expectedStateMap.put("jobe", Status.CANCELLED);
+		expectedStateMap.put("jobf", Status.CANCELLED);
 		
-		Assert.assertEquals(Status.FAILED, flow.getStatus());
+		Assert.assertEquals(Status.KILLED, flow.getStatus());
 		compareStates(expectedStateMap, nodeMap);
 		Assert.assertFalse(thread.isAlive());
 	}
@@ -804,18 +804,18 @@ public class FlowRunnerTest2 {
 		Assert.assertEquals(Status.FAILED_FINISHING, flow.getStatus());
 		compareStates(expectedStateMap, nodeMap);
 		
-		runner.cancel("me");
+		runner.kill("me");
 		pause(1000);
 		
 		expectedStateMap.put("jobb", Status.FAILED);
-		expectedStateMap.put("jobb:innerJobC", Status.FAILED);
-		expectedStateMap.put("jobb:innerFlow", Status.KILLED);
-		expectedStateMap.put("jobc", Status.FAILED);
-		expectedStateMap.put("jobd", Status.FAILED);
-		expectedStateMap.put("jobd:innerJobA", Status.FAILED);
-		expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
-		expectedStateMap.put("jobe", Status.KILLED);
-		expectedStateMap.put("jobf", Status.KILLED);
+		expectedStateMap.put("jobb:innerJobC", Status.KILLED);
+		expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
+		expectedStateMap.put("jobc", Status.KILLED);
+		expectedStateMap.put("jobd", Status.KILLED);
+		expectedStateMap.put("jobd:innerJobA", Status.KILLED);
+		expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
+		expectedStateMap.put("jobe", Status.CANCELLED);
+		expectedStateMap.put("jobf", Status.CANCELLED);
 		
 		Assert.assertEquals(Status.FAILED, flow.getStatus());
 		compareStates(expectedStateMap, nodeMap);
diff --git a/unit/java/azkaban/test/execapp/JobRunnerTest.java b/unit/java/azkaban/test/execapp/JobRunnerTest.java
index 3201767..69d6296 100644
--- a/unit/java/azkaban/test/execapp/JobRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/JobRunnerTest.java
@@ -108,7 +108,7 @@ public class JobRunnerTest {
 		Assert.assertTrue(outputProps == null);
 		Assert.assertTrue(logFile.exists());
 		Assert.assertTrue(eventCollector.checkOrdering());
-		Assert.assertTrue(!runner.isCancelled());
+		Assert.assertTrue(!runner.isKilled());
 		Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
 		
 		try {
@@ -180,7 +180,7 @@ public class JobRunnerTest {
 		Props outputProps = runner.getNode().getOutputProps();
 		Assert.assertTrue(outputProps == null);
 		Assert.assertTrue(runner.getLogFilePath() == null);
-		Assert.assertTrue(!runner.isCancelled());
+		Assert.assertTrue(!runner.isKilled());
 		try {
 			eventCollector.checkEventExists(new Type[] {Type.JOB_STARTED, Type.JOB_FINISHED});
 		}
@@ -208,7 +208,7 @@ public class JobRunnerTest {
 				// TODO Auto-generated catch block
 				e.printStackTrace();
 			}
-			runner.cancel();
+			runner.kill();
 			try {
 				wait(500);
 			} catch (InterruptedException e) {
@@ -218,7 +218,7 @@ public class JobRunnerTest {
 		}
 		
 		Assert.assertTrue(runner.getStatus() == node.getStatus());
-		Assert.assertTrue("Status is " + node.getStatus(), node.getStatus() == Status.FAILED);
+		Assert.assertTrue("Status is " + node.getStatus(), node.getStatus() == Status.KILLED);
 		Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
 		// Give it 10 ms to fail.
 		Assert.assertTrue(node.getEndTime() - node.getStartTime() < 3000);
@@ -230,7 +230,7 @@ public class JobRunnerTest {
 		Assert.assertTrue(outputProps == null);
 		Assert.assertTrue(logFile.exists());
 		Assert.assertTrue(eventCollector.checkOrdering());
-		Assert.assertTrue(runner.isCancelled());
+		Assert.assertTrue(runner.isKilled());
 		try {
 			eventCollector.checkEventExists(new Type[] {Type.JOB_STARTED, Type.JOB_STATUS_CHANGED, Type.JOB_FINISHED});
 		}
@@ -266,7 +266,7 @@ public class JobRunnerTest {
 		Props outputProps = runner.getNode().getOutputProps();
 		Assert.assertTrue(outputProps != null);
 		Assert.assertTrue(logFile.exists());
-		Assert.assertFalse(runner.isCancelled());
+		Assert.assertFalse(runner.isKilled());
 		Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
 		
 		Assert.assertTrue(eventCollector.checkOrdering());
@@ -300,7 +300,7 @@ public class JobRunnerTest {
 				// TODO Auto-generated catch block
 				e.printStackTrace();
 			}
-			runner.cancel();
+			runner.kill();
 			try {
 				wait(500);
 			} catch (InterruptedException e) {
@@ -312,12 +312,12 @@ public class JobRunnerTest {
 		eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED));
 		
 		Assert.assertTrue(runner.getStatus() == node.getStatus());
-		Assert.assertTrue("Node status is " + node.getStatus(), node.getStatus() == Status.FAILED);
+		Assert.assertTrue("Node status is " + node.getStatus(), node.getStatus() == Status.KILLED);
 		Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
 		Assert.assertTrue( node.getEndTime() - node.getStartTime() < 1000);
 		Assert.assertTrue(node.getStartTime() - startTime >= 2000);
 		Assert.assertTrue(node.getStartTime() - startTime <= 5000);
-		Assert.assertTrue(runner.isCancelled());
+		Assert.assertTrue(runner.isKilled());
 		
 		File logFile = new File(runner.getLogFilePath());
 		Props outputProps = runner.getNode().getOutputProps();