azkaban-aplcache

Details

build.properties 2(+1 -1)

diff --git a/build.properties b/build.properties
index aa6322d..745ac17 100644
--- a/build.properties
+++ b/build.properties
@@ -1,3 +1,3 @@
 name=azkaban
-version=2.2
+version=2.5
 spec.file=azkaban.spec
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index f5f07f4..77c9016 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -114,7 +114,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private boolean flowPaused = false;
 	private boolean flowFailed = false;
 	private boolean flowFinished = false;
-	private boolean flowCancelled = false;
+	private boolean flowKilled = false;
 	
 	// The following is state that will trigger a retry of all failed jobs
 	private boolean retryFailedJobs = false;
@@ -392,7 +392,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 		logger.info("Restarting all failed jobs");
 		
 		this.retryFailedJobs = false;
-		this.flowCancelled = false;
+		this.flowKilled = false;
 		this.flowFailed = false;
 		this.flow.setStatus(Status.RUNNING);
 		
@@ -400,7 +400,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 		resetFailedState(this.flow, retryJobs);
 		
 		for (ExecutableNode node: retryJobs) {
-			if(node.getStatus() == Status.READY || node.getStatus() == Status.DISABLED) {
+			if (node.getStatus() == Status.READY || node.getStatus() == Status.DISABLED) {
 				runReadyJob(node);
 			}
 			else if (node.getStatus() == Status.SUCCEEDED){
@@ -415,7 +415,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 		
 		updateFlow();
 	}
-	
+
 	private boolean progressGraph() throws IOException {
 		finishedNodes.swap();
 
@@ -433,7 +433,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 				if (!retryJobIfPossible(node)) {
 					propagateStatus(node.getParentFlow(), Status.FAILED_FINISHING);
 					if (failureAction == FailureAction.CANCEL_ALL) {
-						this.cancel();
+						this.kill();
 					}
 					this.flowFailed = true;
 				}
@@ -495,9 +495,9 @@ public class FlowRunner extends EventHandler implements Runnable {
 			return false;
 		}
 		
-		if (nextNodeStatus == Status.KILLED) {
-			logger.info("Killing '" + node.getNestedId() + "' due to prior errors.");
-			node.killNode(System.currentTimeMillis());
+		if (nextNodeStatus == Status.CANCELLED) {
+			logger.info("Cancelling '" + node.getNestedId() + "' due to prior errors.");
+			node.cancelNode(System.currentTimeMillis());
 			finishExecutableNode(node);
 		}
 		else if (nextNodeStatus == Status.SKIPPED) {
@@ -570,10 +570,12 @@ public class FlowRunner extends EventHandler implements Runnable {
 		boolean succeeded = true;
 		Props previousOutput = null;
 		
-		for(String end: flow.getEndNodes()) {
+		for (String end: flow.getEndNodes()) {
 			ExecutableNode node = flow.getExecutableNode(end);
 
-			if (node.getStatus() == Status.KILLED || node.getStatus() == Status.FAILED) {
+			if (node.getStatus() == Status.KILLED || 
+				node.getStatus() == Status.FAILED || 
+				node.getStatus() == Status.CANCELLED) {
 				succeeded = false;
 			}
 			
@@ -600,6 +602,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 			break;
 		case FAILED:
 		case KILLED:
+		case CANCELLED:
 		case FAILED_SUCCEEDED:
 			logger.info("Flow '" + id + "' is set to " + flow.getStatus().toString() + " in " + durationSec + " seconds");
 			break;
@@ -675,14 +678,14 @@ public class FlowRunner extends EventHandler implements Runnable {
 		// load the override props if any
 		try {
 			props = projectLoader.fetchProjectProperty(flow.getProjectId(), flow.getVersion(), node.getId()+".jor");
-		}
+		} 
 		catch(ProjectManagerException e) {
 			e.printStackTrace();
 			logger.error("Error loading job override property for job " + node.getId());
 		}
 		
 		File path = new File(execDir, source);
-		if(props == null) {
+		if (props == null) {
 			// if no override prop, load the original one on disk
 			try {
 				props = new Props(null, path);				
@@ -741,7 +744,9 @@ public class FlowRunner extends EventHandler implements Runnable {
 			if (!Status.isStatusFinished(depStatus)) {
 				return null;
 			}
-			else if (depStatus == Status.FAILED || depStatus == Status.KILLED) {
+			else if (depStatus == Status.FAILED || 
+					depStatus == Status.CANCELLED || 
+					depStatus == Status.KILLED) {
 				// We propagate failures as KILLED states.
 				shouldKill = true;
 			}
@@ -755,10 +760,10 @@ public class FlowRunner extends EventHandler implements Runnable {
 		// If the flow has failed, and we want to finish only the currently running jobs, we just
 		// kill everything else. We also kill, if the flow has been cancelled.
 		if (flowFailed && failureAction == ExecutionOptions.FailureAction.FINISH_CURRENTLY_RUNNING) {
-			return Status.KILLED;
+			return Status.CANCELLED;
 		}
-		else if (shouldKill || isCancelled()) {
-			return Status.KILLED;
+		else if (shouldKill || isKilled()) {
+			return Status.CANCELLED;
 		}
 		
 		// All good to go, ready to run.
@@ -827,7 +832,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 				if (flowFailed) {
 					flow.setStatus(Status.FAILED_FINISHING);
 				}
-				else if (flowCancelled) {
+				else if (flowKilled) {
 					flow.setStatus(Status.KILLED);
 				}
 				else {
@@ -839,20 +844,20 @@ public class FlowRunner extends EventHandler implements Runnable {
 		}
 	}
 	
-	public void cancel(String user) {
+	public void kill(String user) {
 		synchronized(mainSyncObj) {
-			logger.info("Flow cancelled by " + user);
-			cancel();
+			logger.info("Flow killed by " + user);
+			kill();
 			updateFlow();
 		}
 		interrupt();
 	}
 	
-	private void cancel() {
+	private void kill() {
 		synchronized(mainSyncObj) {
-			logger.info("Cancel has been called on flow " + execId);
+			logger.info("Kill has been called on flow " + execId);
 			flowPaused = false;
-			flowCancelled = true;
+			flowKilled = true;
 			
 			if (watcher != null) {
 				logger.info("Watcher is attached. Stopping watcher.");
@@ -860,9 +865,9 @@ public class FlowRunner extends EventHandler implements Runnable {
 				logger.info("Watcher cancelled status is " + watcher.isWatchCancelled());
 			}
 			
-			logger.info("Cancelling " + activeJobRunners.size() + " jobs.");
+			logger.info("Killing " + activeJobRunners.size() + " jobs.");
 			for (JobRunner runner : activeJobRunners) {
-				runner.cancel();
+				runner.kill();
 			}
 		}
 	}
@@ -907,25 +912,32 @@ public class FlowRunner extends EventHandler implements Runnable {
 			else if (node instanceof ExecutableFlowBase) {
 				ExecutableFlowBase base = (ExecutableFlowBase)node;
 				switch (base.getStatus()) {
+				case CANCELLED:
+					node.setStatus(Status.READY);
+					node.setEndTime(-1);
+					node.setStartTime(-1);
+					node.setUpdateTime(currentTime);
+					// Break out of the switch. We'll reset the flow just like a normal node
+					break;
 				case KILLED:
 				case FAILED:
 				case FAILED_FINISHING:
-					resetFailedState(base, nodesToRetry);	
+					resetFailedState(base, nodesToRetry);
+					continue;
 				default:
-				}
-				
-				if (base.getStatus() != Status.KILLED) {
+					// Continue the while loop. If the job is in a finished state that's not
+					// a failure, we don't want to reset the job.
 					continue;
 				}
 			}
-			else if (node.getStatus() == Status.KILLED) {
+			else if (node.getStatus() == Status.CANCELLED) {
 				// Not a flow, but killed
 				node.setStatus(Status.READY);
 				node.setStartTime(-1);
 				node.setEndTime(-1);
 				node.setUpdateTime(currentTime);
 			}
-			else if(node.getStatus() == Status.FAILED) {
+			else if (node.getStatus() == Status.FAILED || node.getStatus() == Status.KILLED) {
 				node.resetForRetry();
 				nodesToRetry.add(node);
 			}
@@ -934,12 +946,13 @@ public class FlowRunner extends EventHandler implements Runnable {
 				logger.info("Resetting job '" + node.getNestedId() + "' from " + oldStatus + " to " + node.getStatus());
 			}
 			
-			for(String inId: node.getInNodes()) {
+			for (String inId: node.getInNodes()) {
 				ExecutableNode nodeUp = flow.getExecutableNode(inId);
 				queue.add(nodeUp);
 			}
 		}
 		
+		// At this point, the following code will reset the flow
 		Status oldFlowState = flow.getStatus();
 		if (maxStartTime == -1) {
 			// Nothing has run inside the flow, so we assume the flow hasn't even started running yet.
@@ -992,8 +1005,8 @@ public class FlowRunner extends EventHandler implements Runnable {
 		}
 	}
 
-	public boolean isCancelled() {
-		return flowCancelled;
+	public boolean isKilled() {
+		return flowKilled;
 	}
 	
 	public ExecutableFlow getExecutableFlow() {
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index 3fbdb15..f886e64 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -463,7 +463,7 @@ public class FlowRunnerManager implements EventListener {
 			throw new ExecutorManagerException("Execution " + execId + " is not running.");
 		}
 		
-		runner.cancel(user);
+		runner.kill(user);
 	}
 	
 	public void pauseFlow(int execId, String user) throws ExecutorManagerException {
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 3de3d23..cbdcecc 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -87,7 +87,7 @@ public class JobRunner extends EventHandler implements Runnable {
 	private int jobLogBackupIndex;
 
 	private long delayStartMs = 0;
-	private boolean cancelled = false;
+	private boolean killed = false;
 	private BlockingStatus currentBlockStatus = null;
 	
 	public JobRunner(ExecutableNode node, File workingDir, ExecutorLoader loader, JobTypeManager jobtypeManager) {
@@ -259,15 +259,15 @@ public class JobRunner extends EventHandler implements Runnable {
 		boolean quickFinish = false;
 		long time = System.currentTimeMillis();
 		
-		if (this.isCancelled() || Status.isStatusFinished(nodeStatus)) {
+		if (this.isKilled() || Status.isStatusFinished(nodeStatus)) {
 			quickFinish = true;
 		}
 		else if (nodeStatus == Status.DISABLED) {
 			changeStatus(Status.SKIPPED, time);
 			quickFinish = true;
 		} 
-		else if (this.cancelled) {
-			changeStatus(Status.FAILED, time);
+		else if (this.killed) {
+			changeStatus(Status.KILLED, time);
 			quickFinish = true;
 		} 
 		
@@ -286,7 +286,7 @@ public class JobRunner extends EventHandler implements Runnable {
 	 * If pipelining is set, will block on another flow's jobs.
 	 */
 	private boolean blockOnPipeLine() {
-		if (this.isCancelled()) {
+		if (this.isKilled()) {
 			return true;
 		}
 		
@@ -309,8 +309,8 @@ public class JobRunner extends EventHandler implements Runnable {
 					logger.info("Waiting on pipelined job " + bStatus.getJobId());
 					currentBlockStatus = bStatus;
 					bStatus.blockOnFinishedStatus();
-					if (this.isCancelled()) {
-						logger.info("Job was cancelled while waiting on pipeline. Quiting.");
+					if (this.isKilled()) {
+						logger.info("Job was killed while waiting on pipeline. Quiting.");
 						return true;
 					}
 					else {
@@ -325,7 +325,7 @@ public class JobRunner extends EventHandler implements Runnable {
 	}
 	
 	private boolean delayExecution() {
-		if (this.isCancelled()) {
+		if (this.isKilled()) {
 			return true;
 		}
 		
@@ -342,8 +342,8 @@ public class JobRunner extends EventHandler implements Runnable {
 				}
 			}
 			
-			if (this.isCancelled()) {
-				logger.info("Job was cancelled while in delay. Quiting.");
+			if (this.isKilled()) {
+				logger.info("Job was killed while in delay. Quiting.");
 				return true;
 			}
 		}
@@ -420,7 +420,7 @@ public class JobRunner extends EventHandler implements Runnable {
 
 		// Start the node.
 		node.setStartTime(System.currentTimeMillis());
-		if (!errorFound && !isCancelled()) {
+		if (!errorFound && !isKilled()) {
 			fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
 			try {
 				loader.uploadExecutableNode(node, props);
@@ -443,10 +443,10 @@ public class JobRunner extends EventHandler implements Runnable {
 		}
 		node.setEndTime(System.currentTimeMillis());
 
-		if (isCancelled()) {
-			changeStatus(Status.FAILED);
+		if (isKilled()) {
+			changeStatus(Status.KILLED);
 		}
-		logInfo("Finishing job " + this.jobId + " at " + node.getEndTime());
+		logInfo("Finishing job " + this.jobId + " at " + node.getEndTime() + " with status " + node.getStatus());
 		
 		fireEvent(Event.create(this, Type.JOB_FINISHED), false);
 		finalizeLogFile();
@@ -455,13 +455,13 @@ public class JobRunner extends EventHandler implements Runnable {
 	
 	private boolean prepareJob() throws RuntimeException {
 		// Check pre conditions
-		if (props == null || cancelled) {
+		if (props == null || killed) {
 			logError("Failing job. The job properties don't exist");
 			return false;
 		}
 		
 		synchronized (syncObject) {
-			if (node.getStatus() == Status.FAILED || cancelled) {
+			if (node.getStatus() == Status.FAILED || killed) {
 				return false;
 			}
 
@@ -557,10 +557,10 @@ public class JobRunner extends EventHandler implements Runnable {
 		this.fireEventListeners(event);
 	}
 	
-	public void cancel() {
+	public void kill() {
 		synchronized (syncObject) {
-			logError("Cancel has been called.");
-			this.cancelled = true;
+			logError("Kill has been called.");
+			this.killed = true;
 			
 			BlockingStatus status = currentBlockStatus;
 			if (status != null) {
@@ -587,8 +587,8 @@ public class JobRunner extends EventHandler implements Runnable {
 		}
 	}
 	
-	public boolean isCancelled() {
-		return cancelled;
+	public boolean isKilled() {
+		return killed;
 	}
 	
 	public Status getStatus() {
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index bacad6c..d444ef6 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -389,15 +389,15 @@ public class ExecutableNode {
 		applyUpdateObject(wrapper);
 	}
 	
-	public void killNode(long killTime) {
+	public void cancelNode(long cancelTime) {
 		if (this.status == Status.DISABLED) {
-			skipNode(killTime);
+			skipNode(cancelTime);
 		}
 		else {
-			this.setStatus(Status.KILLED);
-			this.setStartTime(killTime);
-			this.setEndTime(killTime);
-			this.setUpdateTime(killTime);
+			this.setStatus(Status.CANCELLED);
+			this.setStartTime(cancelTime);
+			this.setEndTime(cancelTime);
+			this.setUpdateTime(cancelTime);
 		}
 	}
 	
diff --git a/src/java/azkaban/executor/Status.java b/src/java/azkaban/executor/Status.java
index fa45066..7643d2f 100644
--- a/src/java/azkaban/executor/Status.java
+++ b/src/java/azkaban/executor/Status.java
@@ -17,7 +17,19 @@
 package azkaban.executor;
 
 public enum Status {
-	READY(10), PREPARING(20), RUNNING(30), PAUSED(40), SUCCEEDED(50), KILLED(60), FAILED(70), FAILED_FINISHING(80), SKIPPED(90), DISABLED(100), QUEUED(110), FAILED_SUCCEEDED(120);
+	READY(10), 
+	PREPARING(20), 
+	RUNNING(30), 
+	PAUSED(40), 
+	SUCCEEDED(50), 
+	KILLED(60), 
+	FAILED(70), 
+	FAILED_FINISHING(80), 
+	SKIPPED(90), 
+	DISABLED(100), 
+	QUEUED(110), 
+	FAILED_SUCCEEDED(120),
+	CANCELLED(130);
 	
 	private int numVal;
 
@@ -55,6 +67,8 @@ public enum Status {
 			return QUEUED;
 		case 120:
 			return FAILED_SUCCEEDED;
+		case 130:
+			return CANCELLED;
 		default:
 			return READY;
 		}
@@ -67,6 +81,7 @@ public enum Status {
 		case SUCCEEDED:
 		case SKIPPED:
 		case FAILED_SUCCEEDED:
+		case CANCELLED:
 			return true;
 		default:
 			return false;
diff --git a/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
index 66bc178..7648a05 100644
--- a/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
+++ b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
@@ -30,9 +30,6 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 	public TriggerBasedScheduleLoader(TriggerManager triggerManager, String triggerSource) {
 		this.triggerManager = triggerManager;
 		this.triggerSource = triggerSource;
-//		// need to init the action types and condition checker types 
-//		ExecuteFlowAction.setExecutorManager(executorManager);
-//		ExecuteFlowAction.setProjectManager(projectManager);
 	}
 	
 	private Trigger scheduleToTrigger(Schedule s) {
@@ -52,15 +49,6 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 		List<TriggerAction> actions = new ArrayList<TriggerAction>();
 		ExecuteFlowAction executeAct = new ExecuteFlowAction("executeFlowAction", s.getProjectId(), s.getProjectName(), s.getFlowName(), s.getSubmitUser(), s.getExecutionOptions(), s.getSlaOptions());
 		actions.add(executeAct);
-//		List<SlaOption> slaOptions = s.getSlaOptions();
-//		if(slaOptions != null && slaOptions.size() > 0) {
-//			// insert a trigger to keep watching that execution
-//			for(SlaOption sla : slaOptions) {
-//				// need to create triggers for each sla
-//				SlaChecker slaChecker = new SlaChecker("slaChecker", sla, executeAct.getId());
-//				
-//			}
-//		}
 		
 		return actions;
 	}
@@ -178,9 +166,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 	@Override
 	public void updateNextExecTime(Schedule s)
 			throws ScheduleManagerException {
-//		Trigger t = triggersLocalCopy.get(s.getScheduleId());
-//		BasicTimeChecker ck = (BasicTimeChecker) t.getTriggerCondition().getCheckers().values().toArray()[0];
-//		s.setNextExecTime(ck.getNextCheckTime().getMillis());
+
 	}
 
 	@Override
diff --git a/src/java/azkaban/trigger/builtin/SlaAlertAction.java b/src/java/azkaban/trigger/builtin/SlaAlertAction.java
index 1f6eb00..f340497 100644
--- a/src/java/azkaban/trigger/builtin/SlaAlertAction.java
+++ b/src/java/azkaban/trigger/builtin/SlaAlertAction.java
@@ -166,7 +166,7 @@ public class SlaAlertAction implements TriggerAction{
 
 	@Override
 	public String getDescription() {
-		return type + " with " + slaOption.toString();
+		return type + " for " + execId + " with " + slaOption.toString();
 	}
 
 }
diff --git a/src/java/azkaban/trigger/TriggerManager.java b/src/java/azkaban/trigger/TriggerManager.java
index 594f759..6f88fd6 100644
--- a/src/java/azkaban/trigger/TriggerManager.java
+++ b/src/java/azkaban/trigger/TriggerManager.java
@@ -53,7 +53,7 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
 	
 	private ExecutorManagerEventListener listener = new ExecutorManagerEventListener();
 	
-	private Object syncObj = new Object();
+	private final Object syncObj = new Object();
 	
 	private String scannerStage = "";
 	
@@ -85,7 +85,7 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
 	@Override
 	public void start() throws TriggerManagerException{
 		
-		try{
+		try {
 			// expect loader to return valid triggers
 			List<Trigger> triggers = triggerLoader.loadTriggers();
 			for(Trigger t : triggers) {
@@ -185,7 +185,7 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
 			triggers = new PriorityBlockingQueue<Trigger>(1, new TriggerComparator());
 			justFinishedFlows = new ConcurrentHashMap<Integer, ExecutableFlow>();
 			this.setName("TriggerRunnerManager-Trigger-Scanner-Thread");
-			this.scannerInterval = scannerInterval;;
+			this.scannerInterval = scannerInterval;
 		}
 
 		public void shutdown() {
@@ -216,12 +216,12 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
 			//while(stillAlive.get()) {
 			while(!shutdown) {
 				synchronized (syncObj) {
-					try{
+					try {
 						lastRunnerThreadCheckTime = System.currentTimeMillis();
 						
 						scannerStage = "Ready to start a new scan cycle at " + lastRunnerThreadCheckTime;
 						
-						try{
+						try {
 							checkAllTriggers();
 							justFinishedFlows.clear();
 						} catch(Exception e) {
@@ -231,15 +231,15 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
 							t.printStackTrace();
 							logger.error(t.getMessage());
 						}
-						
+					
 						scannerStage = "Done flipping all triggers.";
 						
 						runnerThreadIdleTime = scannerInterval - (System.currentTimeMillis() - lastRunnerThreadCheckTime);
-
+	
 						if(runnerThreadIdleTime < 0) {
 							logger.error("Trigger manager thread " + this.getName() + " is too busy!");
 						} else {
-							wait(runnerThreadIdleTime);
+							syncObj.wait(runnerThreadIdleTime);
 						}
 					} catch(InterruptedException e) {
 						logger.info("Interrupted. Probably to shut down.");
diff --git a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
index 5454c99..5582114 100644
--- a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
@@ -106,7 +106,7 @@
 				<li id="statsViewLink"><a href="#stats">Stats</a></li>
 				<li class="nav-button pull-right"><button type="button" id="pausebtn" class="btn btn-primary">Pause</button></li>
 				<li class="nav-button pull-right"><button type="button" id="resumebtn" class="btn btn-primary">Resume</button></li>
-				<li class="nav-button pull-right"><button type="button" id="cancelbtn" class="btn btn-danger">Cancel</button></li>
+				<li class="nav-button pull-right"><button type="button" id="cancelbtn" class="btn btn-danger">Kill</button></li>
 				<li class="nav-button pull-right"><button type="button" id="retrybtn" class="btn btn-success">Retry Failed</button></li>
 				<li class="nav-button pull-right"><button type="button" id="executebtn" class="btn btn-success">Prepare Execution</button></li>
 			</ul>
diff --git a/src/less/azkaban-graph.less b/src/less/azkaban-graph.less
index 5a5d213..93fb36c 100644
--- a/src/less/azkaban-graph.less
+++ b/src/less/azkaban-graph.less
@@ -80,11 +80,20 @@
 }
 
 .KILLED > g > rect {
+	fill: #d2322d;
+	stroke: #d2322d;
+}
+
+.KILLED > g > text {
+	fill: #FFF;
+}
+
+.CANCELLED > g > rect {
 	fill: #FF9999;
 	stroke: #FF9999;
 }
 
-.KILLED > g > text {
+.CANCELLED > g > text {
 	fill: #FFF;
 }
 

src/less/flow.less 20(+17 -3)

diff --git a/src/less/flow.less b/src/less/flow.less
index 5e60909..b8dde8e 100644
--- a/src/less/flow.less
+++ b/src/less/flow.less
@@ -45,6 +45,10 @@
   &.FAILED {
     background-color: #d9534f;
   }
+  
+  &.KILLED {
+    background-color: #d9534f;
+  }
 
   &.RUNNING {
     background-color: #3398cc;	
@@ -61,7 +65,7 @@
     background-color: #009fc9;
   }
   
-  &.KILLED {
+  &.CANCELLED {
     background-color: #ff9999;
   }
 }
@@ -91,6 +95,10 @@ td {
     &.FAILED {
       background-color: #d9534f;
     }
+    
+    &.KILLED {
+      background-color: #d9534f;
+    }
 
     &.PAUSED {
       background-color: #c82123;
@@ -114,7 +122,7 @@ td {
       background-color: #aaa;	
     }
 
-    &.KILLED {
+    &.CANCELLED {
       background-color: #ff9999;
     }
   }
@@ -139,7 +147,8 @@ td {
 
   &.FAILED,
   &.FAILED_FINISHING,
-  &.KILLED {
+  &.KILLED,
+  &.CANCELLED {
     color: #cc0000;
   }
 }
@@ -220,6 +229,11 @@ li.tree-list-item {
       background-position: 0px 0px;
     }
     
+    &.CANCELLED .icon {
+      background-position: 0px 0px;
+      opacity: 0.5;
+    }
+    
     &.FAILED_FINISHING .icon {
       background-position: 0px 0px;
     }
diff --git a/src/web/js/azkaban/util/flow-loader.js b/src/web/js/azkaban/util/flow-loader.js
index ee5ed8b..b68dfb5 100644
--- a/src/web/js/azkaban/util/flow-loader.js
+++ b/src/web/js/azkaban/util/flow-loader.js
@@ -14,20 +14,6 @@
  * the License.
  */
 
-var statusStringMap = {
-	"FAILED": "Failed",
-	"SUCCEEDED": "Success",
-	"FAILED_FINISHING": "Running w/Failure",
-	"RUNNING": "Running",
-	"WAITING": "Waiting",
-	"KILLED": "Killed",
-	"DISABLED": "Disabled",
-	"READY": "Ready",
-	"UNKNOWN": "Unknown",
-	"QUEUED": "Queued",
-	"SKIPPED": "Skipped"
-};
-
 var extendedViewPanels = {};
 var extendedDataModels = {};
 var openJobDisplayCallback = function(nodeId, flowId, evt) {
diff --git a/src/web/js/azkaban/util/job-status.js b/src/web/js/azkaban/util/job-status.js
index ee03ae6..deef88e 100644
--- a/src/web/js/azkaban/util/job-status.js
+++ b/src/web/js/azkaban/util/job-status.js
@@ -14,8 +14,9 @@
  * the License.
  */
 
-var statusList = ["FAILED", "FAILED_FINISHING", "SUCCEEDED", "RUNNING", "WAITING", "KILLED", "DISABLED", "READY", "UNKNOWN", "PAUSED", "SKIPPED"];
+var statusList = ["FAILED", "FAILED_FINISHING", "SUCCEEDED", "RUNNING", "WAITING", "KILLED", "DISABLED", "READY", "CANCELLED", "UNKNOWN", "PAUSED", "SKIPPED", "QUEUED"];
 var statusStringMap = {
+	"QUEUED": "Queued",
 	"SKIPPED": "Skipped",
 	"PREPARING": "Preparing",
 	"FAILED": "Failed",
@@ -24,6 +25,7 @@ var statusStringMap = {
 	"RUNNING": "Running",
 	"WAITING": "Waiting",
 	"KILLED": "Killed",
+	"CANCELLED": "Cancelled",
 	"DISABLED": "Disabled",
 	"READY": "Ready",
 	"UNKNOWN": "Unknown",
diff --git a/src/web/js/azkaban/view/exflow.js b/src/web/js/azkaban/view/exflow.js
index c495584..f6ecc38 100644
--- a/src/web/js/azkaban/view/exflow.js
+++ b/src/web/js/azkaban/view/exflow.js
@@ -439,8 +439,8 @@ var updaterFunction = function() {
 
 		var data = graphModel.get("data");
 		if (data.status == "UNKNOWN" || 
-				data.status == "WAITING" || 
-				data.status == "PREPARING") {
+			data.status == "WAITING" || 
+			data.status == "PREPARING") {
 			setTimeout(function() {updaterFunction();}, 1000);
 		}
 		else if (data.status != "SUCCEEDED" && data.status != "FAILED") {
diff --git a/src/web/js/azkaban/view/flow-execute-dialog.js b/src/web/js/azkaban/view/flow-execute-dialog.js
index e8db80c..6b93bac 100644
--- a/src/web/js/azkaban/view/flow-execute-dialog.js
+++ b/src/web/js/azkaban/view/flow-execute-dialog.js
@@ -495,14 +495,14 @@ var disableFinishedJobs = function(data) {
 		else if (node.status == "SUCCEEDED" || node.status=="RUNNING") {
 			node.disabled = true;
 		}
-		else if (node.status == "KILLED") {
+		else if (node.status == "CANCELLED") {
 			node.disabled = false;
 			node.status="READY";
 		}
 		else {
 			node.disabled = false;
-			if (node.flowData) {
-				disableFinishedJobs(node.flowData);
+			if (node.type == "flow") {
+				disableFinishedJobs(node);
 			}
 		}
 	}
@@ -526,8 +526,8 @@ var recurseTree = function(data, disabled, recurse) {
 		var node = data.nodes[i];
 		node.disabled = disabled;
 		
-		if (node.flowData && recurse) {
-			recurseTree(node.flowData, disabled);
+		if (node.type == "flow" && recurse) {
+			recurseTree(node, disabled);
 		}
 	}
 }
@@ -583,8 +583,8 @@ var gatherDisabledNodes = function(data) {
 			disabled.push(node.id);
 		}
 		else {
-			if (node.flowData) {
-				var array = gatherDisabledNodes(node.flowData);
+			if (node.type == "flow") {
+				var array = gatherDisabledNodes(node);
 				if (array && array.length > 0) {
 					disabled.push({id: node.id, children: array});
 				}
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest.java b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
index c0f8025..21b43f1 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
@@ -73,7 +73,7 @@ public class FlowRunnerTest {
 		eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED, Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
 		FlowRunner runner = createFlowRunner(loader, eventCollector, "exec1");
 		
-		Assert.assertTrue(!runner.isCancelled());
+		Assert.assertTrue(!runner.isKilled());
 		runner.run();
 		ExecutableFlow exFlow = runner.getExecutableFlow();
 		Assert.assertTrue(exFlow.getStatus() == Status.SUCCEEDED);
@@ -115,7 +115,7 @@ public class FlowRunnerTest {
 		
 		FlowRunner runner = createFlowRunner(exFlow, loader, eventCollector);
 
-		Assert.assertTrue(!runner.isCancelled());
+		Assert.assertTrue(!runner.isKilled());
 		Assert.assertTrue(exFlow.getStatus() == Status.READY);
 		runner.run();
 
@@ -156,19 +156,19 @@ public class FlowRunnerTest {
 		
 		runner.run();
 		ExecutableFlow exFlow = runner.getExecutableFlow();
-		Assert.assertTrue(!runner.isCancelled());
+		Assert.assertTrue(!runner.isKilled());
 		Assert.assertTrue("Flow status " + exFlow.getStatus(), exFlow.getStatus() == Status.FAILED);
 		
 		testStatus(exFlow, "job1", Status.SUCCEEDED);
 		testStatus(exFlow, "job2d", Status.FAILED);
-		testStatus(exFlow, "job3", Status.KILLED);
-		testStatus(exFlow, "job4", Status.KILLED);
-		testStatus(exFlow, "job5", Status.KILLED);
+		testStatus(exFlow, "job3", Status.CANCELLED);
+		testStatus(exFlow, "job4", Status.CANCELLED);
+		testStatus(exFlow, "job5", Status.CANCELLED);
 		testStatus(exFlow, "job6", Status.SUCCEEDED);
-		testStatus(exFlow, "job7", Status.KILLED);
-		testStatus(exFlow, "job8", Status.KILLED);
-		testStatus(exFlow, "job9", Status.KILLED);
-		testStatus(exFlow, "job10", Status.KILLED);
+		testStatus(exFlow, "job7", Status.CANCELLED);
+		testStatus(exFlow, "job8", Status.CANCELLED);
+		testStatus(exFlow, "job9", Status.CANCELLED);
+		testStatus(exFlow, "job10", Status.CANCELLED);
 
 		try {
 			eventCollector.checkEventExists(new Type[] {Type.FLOW_STARTED, Type.FLOW_FINISHED});
@@ -195,7 +195,7 @@ public class FlowRunnerTest {
 		runner.run();
 		ExecutableFlow exFlow = runner.getExecutableFlow();
 		
-		Assert.assertTrue(runner.isCancelled());
+		Assert.assertTrue(runner.isKilled());
 		
 		Assert.assertTrue("Expected flow " + Status.FAILED + " instead " + exFlow.getStatus(), exFlow.getStatus() == Status.FAILED);
 		
@@ -209,14 +209,14 @@ public class FlowRunnerTest {
 
 		testStatus(exFlow, "job1", Status.SUCCEEDED);
 		testStatus(exFlow, "job2d", Status.FAILED);
-		testStatus(exFlow, "job3", Status.KILLED);
-		testStatus(exFlow, "job4", Status.KILLED);
-		testStatus(exFlow, "job5", Status.KILLED);
-		testStatus(exFlow, "job6", Status.FAILED);
-		testStatus(exFlow, "job7", Status.KILLED);
-		testStatus(exFlow, "job8", Status.KILLED);
-		testStatus(exFlow, "job9", Status.KILLED);
-		testStatus(exFlow, "job10", Status.KILLED);
+		testStatus(exFlow, "job3", Status.CANCELLED);
+		testStatus(exFlow, "job4", Status.CANCELLED);
+		testStatus(exFlow, "job5", Status.CANCELLED);
+		testStatus(exFlow, "job6", Status.KILLED);
+		testStatus(exFlow, "job7", Status.CANCELLED);
+		testStatus(exFlow, "job8", Status.CANCELLED);
+		testStatus(exFlow, "job9", Status.CANCELLED);
+		testStatus(exFlow, "job10", Status.CANCELLED);
 		
 		try {
 			eventCollector.checkEventExists(new Type[] {Type.FLOW_STARTED, Type.FLOW_FINISHED});
@@ -246,20 +246,19 @@ public class FlowRunnerTest {
 			try {
 				wait(500);
 			} catch(InterruptedException e) {
-				
 			}
 		}
 		
 		testStatus(exFlow, "job1", Status.SUCCEEDED);
 		testStatus(exFlow, "job2d", Status.FAILED);
 		testStatus(exFlow, "job3", Status.SUCCEEDED);
-		testStatus(exFlow, "job4", Status.KILLED);
-		testStatus(exFlow, "job5", Status.KILLED);
-		testStatus(exFlow, "job6", Status.KILLED);
+		testStatus(exFlow, "job4", Status.CANCELLED);
+		testStatus(exFlow, "job5", Status.CANCELLED);
+		testStatus(exFlow, "job6", Status.CANCELLED);
 		testStatus(exFlow, "job7", Status.SUCCEEDED);
 		testStatus(exFlow, "job8", Status.SUCCEEDED);
 		testStatus(exFlow, "job9", Status.SUCCEEDED);
-		testStatus(exFlow, "job10", Status.KILLED);
+		testStatus(exFlow, "job10", Status.CANCELLED);
 		
 		try {
 			eventCollector.checkEventExists(new Type[] {Type.FLOW_STARTED, Type.FLOW_FINISHED});
@@ -278,7 +277,7 @@ public class FlowRunnerTest {
 		eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED, Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
 		FlowRunner runner = createFlowRunner(loader, eventCollector, "exec1");
 		
-		Assert.assertTrue(!runner.isCancelled());
+		Assert.assertTrue(!runner.isKilled());
 		Thread thread = new Thread(runner);
 		thread.start();
 
@@ -290,8 +289,8 @@ public class FlowRunnerTest {
 				e.printStackTrace();
 			}
 			
-			runner.cancel("me");
-			Assert.assertTrue(runner.isCancelled());
+			runner.kill("me");
+			Assert.assertTrue(runner.isKilled());
 		}
 		
 
@@ -307,15 +306,15 @@ public class FlowRunnerTest {
 		ExecutableFlow exFlow = runner.getExecutableFlow();
 		testStatus(exFlow, "job1", Status.SUCCEEDED);
 		testStatus(exFlow, "job2", Status.SUCCEEDED);
-		testStatus(exFlow, "job5", Status.KILLED);
-		testStatus(exFlow, "job7", Status.KILLED);
-		testStatus(exFlow, "job8", Status.KILLED);
-		testStatus(exFlow, "job10", Status.KILLED);
-		testStatus(exFlow, "job3", Status.FAILED);
-		testStatus(exFlow, "job4", Status.FAILED);
-		testStatus(exFlow, "job6", Status.FAILED);
+		testStatus(exFlow, "job5", Status.CANCELLED);
+		testStatus(exFlow, "job7", Status.CANCELLED);
+		testStatus(exFlow, "job8", Status.CANCELLED);
+		testStatus(exFlow, "job10", Status.CANCELLED);
+		testStatus(exFlow, "job3", Status.KILLED);
+		testStatus(exFlow, "job4", Status.KILLED);
+		testStatus(exFlow, "job6", Status.KILLED);
 		
-		Assert.assertTrue("Expected FAILED status instead got " + exFlow.getStatus(),exFlow.getStatus() == Status.FAILED);
+		Assert.assertTrue("Expected FAILED status instead got " + exFlow.getStatus(),exFlow.getStatus() == Status.KILLED);
 		
 		try {
 			eventCollector.checkEventExists(new Type[] {Type.FLOW_STARTED, Type.FLOW_FINISHED});
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
index ada74b4..e5fa39f 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
@@ -319,21 +319,21 @@ public class FlowRunnerTest2 {
 		Assert.assertEquals(Status.FAILED_FINISHING, flow.getStatus());
 		expectedStateMap.put("joba", Status.FAILED);
 		expectedStateMap.put("joba1", Status.RUNNING);
-		expectedStateMap.put("jobb", Status.KILLED);
-		expectedStateMap.put("jobc", Status.KILLED);
-		expectedStateMap.put("jobd", Status.KILLED);
+		expectedStateMap.put("jobb", Status.CANCELLED);
+		expectedStateMap.put("jobc", Status.CANCELLED);
+		expectedStateMap.put("jobd", Status.CANCELLED);
 		expectedStateMap.put("jobd:innerJobA", Status.READY);
 		expectedStateMap.put("jobd:innerFlow2", Status.READY);
 		expectedStateMap.put("jobb:innerJobA", Status.READY);
 		expectedStateMap.put("jobb:innerFlow", Status.READY);
-		expectedStateMap.put("jobe", Status.KILLED);
+		expectedStateMap.put("jobe", Status.CANCELLED);
 		compareStates(expectedStateMap, nodeMap);
 
 		// 3. jobb:Inner completes
 		/// innerJobA completes
 		InteractiveTestJob.getTestJob("joba1").succeedJob();
 		pause(250);
-		expectedStateMap.put("jobf", Status.KILLED);
+		expectedStateMap.put("jobf", Status.CANCELLED);
 		Assert.assertEquals(Status.FAILED, flow.getStatus());
 		Assert.assertFalse(thread.isAlive());
 	}
@@ -379,10 +379,10 @@ public class FlowRunnerTest2 {
 		pause(250);
 		expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
 		expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
-		expectedStateMap.put("jobb:innerJobB", Status.KILLED);
-		expectedStateMap.put("jobb:innerJobC", Status.KILLED);
-		expectedStateMap.put("jobb:innerFlow", Status.KILLED);
-		expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
+		expectedStateMap.put("jobb:innerJobB", Status.CANCELLED);
+		expectedStateMap.put("jobb:innerJobC", Status.CANCELLED);
+		expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
+		expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
 		expectedStateMap.put("jobb", Status.KILLED);
 		expectedStateMap.put("jobd", Status.KILLED);
 		compareStates(expectedStateMap, nodeMap);
@@ -391,8 +391,8 @@ public class FlowRunnerTest2 {
 		InteractiveTestJob.getTestJob("jobc").succeedJob();
 		pause(250);
 		expectedStateMap.put("jobc", Status.SUCCEEDED);
-		expectedStateMap.put("jobe", Status.KILLED);
-		expectedStateMap.put("jobf", Status.KILLED);
+		expectedStateMap.put("jobe", Status.CANCELLED);
+		expectedStateMap.put("jobf", Status.CANCELLED);
 		compareStates(expectedStateMap, nodeMap);
 		Assert.assertEquals(Status.FAILED, flow.getStatus());
 		
@@ -401,7 +401,7 @@ public class FlowRunnerTest2 {
 	
 	@Test
 	public void testNormalFailure3() throws Exception {
-		// Test propagation of KILLED status to embedded flows different branch
+		// Test propagation of CANCELLED status to embedded flows different branch
 		EventCollectorListener eventCollector = new EventCollectorListener();
 		FlowRunner runner = createFlowRunner(eventCollector, "jobf");
 		ExecutableFlow flow = runner.getExecutableFlow();
@@ -449,10 +449,10 @@ public class FlowRunnerTest2 {
 		InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
 		pause(250);
 		expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
-		expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
+		expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
 		expectedStateMap.put("jobd", Status.KILLED);
 		expectedStateMap.put("jobb:innerJobC", Status.SUCCEEDED);
-		expectedStateMap.put("jobb:innerFlow", Status.KILLED);
+		expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
 		expectedStateMap.put("jobb", Status.FAILED);
 		compareStates(expectedStateMap, nodeMap);
 
@@ -460,8 +460,8 @@ public class FlowRunnerTest2 {
 		InteractiveTestJob.getTestJob("jobc").succeedJob();
 		pause(250);
 		expectedStateMap.put("jobc", Status.SUCCEEDED);
-		expectedStateMap.put("jobe", Status.KILLED);
-		expectedStateMap.put("jobf", Status.KILLED);
+		expectedStateMap.put("jobe", Status.CANCELLED);
+		expectedStateMap.put("jobf", Status.CANCELLED);
 		compareStates(expectedStateMap, nodeMap);
 		Assert.assertEquals(Status.FAILED, flow.getStatus());
 		
@@ -521,7 +521,7 @@ public class FlowRunnerTest2 {
 		expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
 		expectedStateMap.put("jobd:innerFlow2", Status.RUNNING);
 		expectedStateMap.put("jobb:innerJobC", Status.SUCCEEDED);
-		expectedStateMap.put("jobb:innerFlow", Status.KILLED);
+		expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
 		compareStates(expectedStateMap, nodeMap);
 
 		InteractiveTestJob.getTestJob("jobd:innerFlow2").succeedJob();
@@ -533,8 +533,8 @@ public class FlowRunnerTest2 {
 		InteractiveTestJob.getTestJob("jobc").succeedJob();
 		pause(250);
 		expectedStateMap.put("jobc", Status.SUCCEEDED);
-		expectedStateMap.put("jobe", Status.KILLED);
-		expectedStateMap.put("jobf", Status.KILLED);
+		expectedStateMap.put("jobe", Status.CANCELLED);
+		expectedStateMap.put("jobf", Status.CANCELLED);
 		compareStates(expectedStateMap, nodeMap);
 		Assert.assertEquals(Status.FAILED, flow.getStatus());
 		Assert.assertFalse(thread.isAlive());
@@ -583,14 +583,14 @@ public class FlowRunnerTest2 {
 		pause(250);
 		expectedStateMap.put("jobb", Status.FAILED);
 		expectedStateMap.put("jobb:innerJobB", Status.FAILED);
-		expectedStateMap.put("jobb:innerJobC", Status.FAILED);
-		expectedStateMap.put("jobb:innerFlow", Status.KILLED);
-		expectedStateMap.put("jobc", Status.FAILED);
-		expectedStateMap.put("jobd", Status.FAILED);
-		expectedStateMap.put("jobd:innerJobA", Status.FAILED);
-		expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
-		expectedStateMap.put("jobe", Status.KILLED);
-		expectedStateMap.put("jobf", Status.KILLED);
+		expectedStateMap.put("jobb:innerJobC", Status.KILLED);
+		expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
+		expectedStateMap.put("jobc", Status.KILLED);
+		expectedStateMap.put("jobd", Status.KILLED);
+		expectedStateMap.put("jobd:innerJobA", Status.KILLED);
+		expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
+		expectedStateMap.put("jobe", Status.CANCELLED);
+		expectedStateMap.put("jobf", Status.CANCELLED);
 		compareStates(expectedStateMap, nodeMap);
 		
 		Assert.assertFalse(thread.isAlive());
@@ -640,7 +640,7 @@ public class FlowRunnerTest2 {
 		expectedStateMap.put("jobb:innerJobB", Status.FAILED);
 		expectedStateMap.put("jobb:innerJobC", Status.FAILED);
 		expectedStateMap.put("jobb:innerFlow", Status.SKIPPED);
-		expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
+		expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
 		expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
 		expectedStateMap.put("jobd", Status.KILLED);
 		Assert.assertEquals(Status.FAILED_FINISHING, flow.getStatus());
@@ -739,21 +739,21 @@ public class FlowRunnerTest2 {
 		expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
 		compareStates(expectedStateMap, nodeMap);
 		
-		runner.cancel("me");
+		runner.kill("me");
 		pause(250);
 		
-		expectedStateMap.put("jobb", Status.FAILED);
-		expectedStateMap.put("jobb:innerJobB", Status.FAILED);
-		expectedStateMap.put("jobb:innerJobC", Status.FAILED);
-		expectedStateMap.put("jobb:innerFlow", Status.KILLED);
-		expectedStateMap.put("jobc", Status.FAILED);
-		expectedStateMap.put("jobd", Status.FAILED);
-		expectedStateMap.put("jobd:innerJobA", Status.FAILED);
-		expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
-		expectedStateMap.put("jobe", Status.KILLED);
-		expectedStateMap.put("jobf", Status.KILLED);
+		expectedStateMap.put("jobb", Status.KILLED);
+		expectedStateMap.put("jobb:innerJobB", Status.KILLED);
+		expectedStateMap.put("jobb:innerJobC", Status.KILLED);
+		expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
+		expectedStateMap.put("jobc", Status.KILLED);
+		expectedStateMap.put("jobd", Status.KILLED);
+		expectedStateMap.put("jobd:innerJobA", Status.KILLED);
+		expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
+		expectedStateMap.put("jobe", Status.CANCELLED);
+		expectedStateMap.put("jobf", Status.CANCELLED);
 		
-		Assert.assertEquals(Status.FAILED, flow.getStatus());
+		Assert.assertEquals(Status.KILLED, flow.getStatus());
 		compareStates(expectedStateMap, nodeMap);
 		Assert.assertFalse(thread.isAlive());
 	}
@@ -804,18 +804,18 @@ public class FlowRunnerTest2 {
 		Assert.assertEquals(Status.FAILED_FINISHING, flow.getStatus());
 		compareStates(expectedStateMap, nodeMap);
 		
-		runner.cancel("me");
+		runner.kill("me");
 		pause(1000);
 		
 		expectedStateMap.put("jobb", Status.FAILED);
-		expectedStateMap.put("jobb:innerJobC", Status.FAILED);
-		expectedStateMap.put("jobb:innerFlow", Status.KILLED);
-		expectedStateMap.put("jobc", Status.FAILED);
-		expectedStateMap.put("jobd", Status.FAILED);
-		expectedStateMap.put("jobd:innerJobA", Status.FAILED);
-		expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
-		expectedStateMap.put("jobe", Status.KILLED);
-		expectedStateMap.put("jobf", Status.KILLED);
+		expectedStateMap.put("jobb:innerJobC", Status.KILLED);
+		expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
+		expectedStateMap.put("jobc", Status.KILLED);
+		expectedStateMap.put("jobd", Status.KILLED);
+		expectedStateMap.put("jobd:innerJobA", Status.KILLED);
+		expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
+		expectedStateMap.put("jobe", Status.CANCELLED);
+		expectedStateMap.put("jobf", Status.CANCELLED);
 		
 		Assert.assertEquals(Status.FAILED, flow.getStatus());
 		compareStates(expectedStateMap, nodeMap);
diff --git a/unit/java/azkaban/test/execapp/JobRunnerTest.java b/unit/java/azkaban/test/execapp/JobRunnerTest.java
index 3201767..69d6296 100644
--- a/unit/java/azkaban/test/execapp/JobRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/JobRunnerTest.java
@@ -108,7 +108,7 @@ public class JobRunnerTest {
 		Assert.assertTrue(outputProps == null);
 		Assert.assertTrue(logFile.exists());
 		Assert.assertTrue(eventCollector.checkOrdering());
-		Assert.assertTrue(!runner.isCancelled());
+		Assert.assertTrue(!runner.isKilled());
 		Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
 		
 		try {
@@ -180,7 +180,7 @@ public class JobRunnerTest {
 		Props outputProps = runner.getNode().getOutputProps();
 		Assert.assertTrue(outputProps == null);
 		Assert.assertTrue(runner.getLogFilePath() == null);
-		Assert.assertTrue(!runner.isCancelled());
+		Assert.assertTrue(!runner.isKilled());
 		try {
 			eventCollector.checkEventExists(new Type[] {Type.JOB_STARTED, Type.JOB_FINISHED});
 		}
@@ -208,7 +208,7 @@ public class JobRunnerTest {
 				// TODO Auto-generated catch block
 				e.printStackTrace();
 			}
-			runner.cancel();
+			runner.kill();
 			try {
 				wait(500);
 			} catch (InterruptedException e) {
@@ -218,7 +218,7 @@ public class JobRunnerTest {
 		}
 		
 		Assert.assertTrue(runner.getStatus() == node.getStatus());
-		Assert.assertTrue("Status is " + node.getStatus(), node.getStatus() == Status.FAILED);
+		Assert.assertTrue("Status is " + node.getStatus(), node.getStatus() == Status.KILLED);
 		Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
 		// Give it 10 ms to fail.
 		Assert.assertTrue(node.getEndTime() - node.getStartTime() < 3000);
@@ -230,7 +230,7 @@ public class JobRunnerTest {
 		Assert.assertTrue(outputProps == null);
 		Assert.assertTrue(logFile.exists());
 		Assert.assertTrue(eventCollector.checkOrdering());
-		Assert.assertTrue(runner.isCancelled());
+		Assert.assertTrue(runner.isKilled());
 		try {
 			eventCollector.checkEventExists(new Type[] {Type.JOB_STARTED, Type.JOB_STATUS_CHANGED, Type.JOB_FINISHED});
 		}
@@ -266,7 +266,7 @@ public class JobRunnerTest {
 		Props outputProps = runner.getNode().getOutputProps();
 		Assert.assertTrue(outputProps != null);
 		Assert.assertTrue(logFile.exists());
-		Assert.assertFalse(runner.isCancelled());
+		Assert.assertFalse(runner.isKilled());
 		Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
 		
 		Assert.assertTrue(eventCollector.checkOrdering());
@@ -300,7 +300,7 @@ public class JobRunnerTest {
 				// TODO Auto-generated catch block
 				e.printStackTrace();
 			}
-			runner.cancel();
+			runner.kill();
 			try {
 				wait(500);
 			} catch (InterruptedException e) {
@@ -312,12 +312,12 @@ public class JobRunnerTest {
 		eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED));
 		
 		Assert.assertTrue(runner.getStatus() == node.getStatus());
-		Assert.assertTrue("Node status is " + node.getStatus(), node.getStatus() == Status.FAILED);
+		Assert.assertTrue("Node status is " + node.getStatus(), node.getStatus() == Status.KILLED);
 		Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
 		Assert.assertTrue( node.getEndTime() - node.getStartTime() < 1000);
 		Assert.assertTrue(node.getStartTime() - startTime >= 2000);
 		Assert.assertTrue(node.getStartTime() - startTime <= 5000);
-		Assert.assertTrue(runner.isCancelled());
+		Assert.assertTrue(runner.isKilled());
 		
 		File logFile = new File(runner.getLogFilePath());
 		Props outputProps = runner.getNode().getOutputProps();
diff --git a/unit/java/azkaban/test/execapp/MockExecutorLoader.java b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
index 6c1246d..92c1dee 100644
--- a/unit/java/azkaban/test/execapp/MockExecutorLoader.java
+++ b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
@@ -23,6 +23,7 @@ public class MockExecutorLoader implements ExecutorLoader {
 	HashMap<Integer, ExecutionReference> refs = new HashMap<Integer, ExecutionReference>();
 	int flowUpdateCount = 0;
 	HashMap<String, Integer> jobUpdateCount = new HashMap<String,Integer>();
+	Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows = new HashMap<Integer, Pair<ExecutionReference,ExecutableFlow>>();
 	
 	@Override
 	public void uploadExecutableFlow(ExecutableFlow flow) throws ExecutorManagerException {
@@ -38,7 +39,7 @@ public class MockExecutorLoader implements ExecutorLoader {
 
 	@Override
 	public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows() throws ExecutorManagerException {
-		return null;
+		return activeFlows;
 	}
 
 	@Override
diff --git a/unit/java/azkaban/test/trigger/MockTriggerLoader.java b/unit/java/azkaban/test/trigger/MockTriggerLoader.java
new file mode 100644
index 0000000..67ef5c7
--- /dev/null
+++ b/unit/java/azkaban/test/trigger/MockTriggerLoader.java
@@ -0,0 +1,53 @@
+package azkaban.test.trigger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import azkaban.trigger.Trigger;
+import azkaban.trigger.TriggerLoader;
+import azkaban.trigger.TriggerLoaderException;
+
+public class MockTriggerLoader implements TriggerLoader {
+
+	Map<Integer, Trigger> triggers = new HashMap<Integer, Trigger>();
+	int triggerCount = 0;
+	
+	@Override
+	public synchronized void addTrigger(Trigger t) throws TriggerLoaderException {
+		t.setTriggerId(triggerCount);
+		t.setLastModifyTime(System.currentTimeMillis());
+		triggers.put(t.getTriggerId(), t);
+		triggerCount++;
+	}
+
+	@Override
+	public synchronized void removeTrigger(Trigger s) throws TriggerLoaderException {
+		triggers.remove(s);
+	}
+
+	@Override
+	public synchronized void updateTrigger(Trigger t) throws TriggerLoaderException {
+		t.setLastModifyTime(System.currentTimeMillis());
+		triggers.put(t.getTriggerId(), t);
+	}
+
+	@Override
+	public synchronized List<Trigger> loadTriggers() throws TriggerLoaderException {
+		return new ArrayList<Trigger>(triggers.values());
+	}
+
+	@Override
+	public synchronized Trigger loadTrigger(int triggerId) throws TriggerLoaderException {
+		return triggers.get(triggerId);
+	}
+
+	@Override
+	public List<Trigger> getUpdatedTriggers(long lastUpdateTime)
+			throws TriggerLoaderException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+}
diff --git a/unit/java/azkaban/test/trigger/TriggerManagerDeadlockTest.java b/unit/java/azkaban/test/trigger/TriggerManagerDeadlockTest.java
new file mode 100644
index 0000000..02c7cc1
--- /dev/null
+++ b/unit/java/azkaban/test/trigger/TriggerManagerDeadlockTest.java
@@ -0,0 +1,186 @@
+package azkaban.test.trigger;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.joda.time.DateTime;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.alert.Alerter;
+import azkaban.executor.ExecutorLoader;
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.test.execapp.MockExecutorLoader;
+import azkaban.trigger.Condition;
+import azkaban.trigger.ConditionChecker;
+import azkaban.trigger.Trigger;
+import azkaban.trigger.TriggerAction;
+import azkaban.trigger.TriggerLoader;
+import azkaban.trigger.TriggerLoaderException;
+import azkaban.trigger.TriggerManager;
+import azkaban.trigger.TriggerManagerException;
+import azkaban.trigger.builtin.CreateTriggerAction;
+import azkaban.utils.Props;
+
+public class TriggerManagerDeadlockTest {
+	
+	TriggerLoader loader;
+	TriggerManager triggerManager;
+	ExecutorLoader execLoader;
+	
+	@Before
+	public void setup() throws ExecutorManagerException, TriggerManagerException {
+		loader = new MockTriggerLoader();
+		Props props = new Props();
+		props.put("trigger.scan.interval", 1000);
+		props.put("executor.port", 12321);
+		execLoader = new MockExecutorLoader();
+		Map<String, Alerter> alerters = new HashMap<String, Alerter>();
+		ExecutorManager executorManager = new ExecutorManager(props, execLoader, alerters);
+		triggerManager = new TriggerManager(props, loader, executorManager);
+	}
+
+	@After
+	public void tearDown() {
+		
+	}
+	
+	@Test
+	public void deadlockTest() throws TriggerLoaderException, TriggerManagerException {
+		// this should well saturate it
+		for(int i = 0; i < 1000; i++) {
+			Trigger t = createSelfRegenTrigger();
+			loader.addTrigger(t);
+		}
+		// keep going and add more
+		for(int i = 0; i < 10000; i++) {
+			Trigger d = createDummyTrigger();
+			triggerManager.insertTrigger(d);
+			triggerManager.removeTrigger(d);
+		}
+		
+		System.out.println("No dead lock.");
+	}
+	
+	public class AlwaysOnChecker implements ConditionChecker {
+
+		public static final String type = "AlwaysOnChecker";
+		
+		private final String id;
+		private final Boolean alwaysOn;
+		
+		public AlwaysOnChecker(String id, Boolean alwaysOn) {
+			this.id = id;
+			this.alwaysOn = alwaysOn;
+		}
+		
+		@Override
+		public Object eval() {
+			// TODO Auto-generated method stub
+			return alwaysOn;
+		}
+
+		@Override
+		public Object getNum() {
+			// TODO Auto-generated method stub
+			return null;
+		}
+
+		@Override
+		public void reset() {
+			// TODO Auto-generated method stub
+			
+		}
+
+		@Override
+		public String getId() {
+			return id;
+		}
+
+		@Override
+		public String getType() {
+			// TODO Auto-generated method stub
+			return type;
+		}
+
+		@Override
+		public ConditionChecker fromJson(Object obj) throws Exception {
+			// TODO Auto-generated method stub
+			return null;
+		}
+
+		@Override
+		public Object toJson() {
+			// TODO Auto-generated method stub
+			return null;
+		}
+
+		@Override
+		public void stopChecker() {
+			// TODO Auto-generated method stub
+			
+		}
+
+		@Override
+		public void setContext(Map<String, Object> context) {
+			// TODO Auto-generated method stub
+			
+		}
+
+		@Override
+		public long getNextCheckTime() {
+			// TODO Auto-generated method stub
+			return 0;
+		}
+		
+	}
+	
+	private Trigger createSelfRegenTrigger() {
+		ConditionChecker alwaysOnChecker = new AlwaysOnChecker("alwaysOn", Boolean.TRUE);
+		String triggerExpr = alwaysOnChecker.getId() + ".eval()";
+		Map<String, ConditionChecker> triggerCheckers = new HashMap<String, ConditionChecker>();
+		triggerCheckers.put(alwaysOnChecker.getId(), alwaysOnChecker);
+		Condition triggerCond = new Condition(triggerCheckers, triggerExpr);
+		
+		TriggerAction triggerAct = new CreateTriggerAction("dummyTrigger", createDummyTrigger());
+		List<TriggerAction> actions = new ArrayList<TriggerAction>();
+		actions.add(triggerAct);
+		
+		ConditionChecker alwaysOffChecker = new AlwaysOnChecker("alwaysOff", Boolean.FALSE);
+		String expireExpr = alwaysOffChecker.getId() + ".eval()";
+		Map<String, ConditionChecker> expireCheckers = new HashMap<String, ConditionChecker>();
+		expireCheckers.put(alwaysOffChecker.getId(), alwaysOffChecker);
+		Condition expireCond = new Condition(expireCheckers, expireExpr);
+			
+		Trigger t = new Trigger("azkaban", "azkabanTest", triggerCond, expireCond, actions);
+		return t;
+	}
+
+	private Trigger createDummyTrigger() {
+		ConditionChecker alwaysOnChecker = new AlwaysOnChecker("alwaysOn", Boolean.TRUE);
+		String triggerExpr = alwaysOnChecker.getId() + ".eval()";
+		Map<String, ConditionChecker> triggerCheckers = new HashMap<String, ConditionChecker>();
+		triggerCheckers.put(alwaysOnChecker.getId(), alwaysOnChecker);
+		Condition triggerCond = new Condition(triggerCheckers, triggerExpr);
+		
+		TriggerAction triggerAct = new DummyTriggerAction("howdy!");
+		List<TriggerAction> actions = new ArrayList<TriggerAction>();
+		actions.add(triggerAct);
+		
+		ConditionChecker alwaysOffChecker = new AlwaysOnChecker("alwaysOff", Boolean.FALSE);
+		String expireExpr = alwaysOffChecker.getId() + ".eval()";
+		Map<String, ConditionChecker> expireCheckers = new HashMap<String, ConditionChecker>();
+		expireCheckers.put(alwaysOffChecker.getId(), alwaysOffChecker);
+		Condition expireCond = new Condition(expireCheckers, expireExpr);
+
+		Trigger t = new Trigger("azkaban", "azkabanTest", triggerCond, expireCond, actions);
+		return t;
+	}
+	
+}