azkaban-memoizeit

Cancel does a harder kill now.

5/7/2013 9:50:35 PM

Details

diff --git a/src/java/azkaban/execapp/event/FlowWatcher.java b/src/java/azkaban/execapp/event/FlowWatcher.java
index 2e4f576..4a13a4b 100644
--- a/src/java/azkaban/execapp/event/FlowWatcher.java
+++ b/src/java/azkaban/execapp/event/FlowWatcher.java
@@ -10,7 +10,7 @@ import azkaban.executor.ExecutableNode;
 import azkaban.executor.Status;
 
 public abstract class FlowWatcher {
-	private static final Logger logger = Logger.getLogger(FlowWatcher.class);
+	private Logger logger;
 	
 	private int execId;
 	private ExecutableFlow flow;
@@ -25,15 +25,19 @@ public abstract class FlowWatcher {
 		this.flow = flow;
 	}
 	
+	public void setLogger(Logger logger) {
+		this.logger = logger;
+	}
+	
+	protected Logger getLogger() {
+		return this.logger;
+	}
+	
 	/**
 	 * Called to fire events to the JobRunner listeners
 	 * @param jobId
 	 */
 	protected synchronized void handleJobFinished(String jobId, Status status) {
-		if (cancelWatch) {
-			return;
-		}
-
 		BlockingStatus block = map.get(jobId);
 		if (block != null) {
 			block.changeStatus(status);
diff --git a/src/java/azkaban/execapp/event/LocalFlowWatcher.java b/src/java/azkaban/execapp/event/LocalFlowWatcher.java
index ea78174..afe9248 100644
--- a/src/java/azkaban/execapp/event/LocalFlowWatcher.java
+++ b/src/java/azkaban/execapp/event/LocalFlowWatcher.java
@@ -3,6 +3,7 @@ package azkaban.execapp.event;
 
 import azkaban.execapp.FlowRunner;
 import azkaban.execapp.JobRunner;
+import azkaban.execapp.event.Event.Type;
 import azkaban.executor.ExecutableNode;
 
 public class LocalFlowWatcher extends FlowWatcher {
@@ -30,17 +31,30 @@ public class LocalFlowWatcher extends FlowWatcher {
 		runner.removeListener(watcherListener);
 		runner = null;
 		
+		getLogger().info("Stopping watcher, and unblocking pipeline");
 		super.failAllWatches();
 	}
 
 	public class LocalFlowWatcherListener implements EventListener {
 		@Override
 		public void handleEvent(Event event) {
-			if (event.getRunner() instanceof JobRunner) {
-				JobRunner runner = (JobRunner)event.getRunner();
-				ExecutableNode node = runner.getNode();
-				
-				handleJobFinished(node.getJobId(), node.getStatus());
+			if (event.getType() == Type.JOB_FINISHED) {
+				if (event.getRunner() instanceof FlowRunner) {
+					Object data = event.getData();
+					if (data instanceof ExecutableNode) {
+						ExecutableNode node = (ExecutableNode)data;
+						handleJobFinished(node.getJobId(), node.getStatus());
+					}
+				}
+				else if (event.getRunner() instanceof JobRunner) {
+					JobRunner runner = (JobRunner)event.getRunner();
+					ExecutableNode node = runner.getNode();
+					
+					handleJobFinished(node.getJobId(), node.getStatus());
+				}
+			}
+			else if (event.getType() == Type.FLOW_FINISHED) {
+				stopWatcher();
 			}
 		}
 	}
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 7d68d80..99c1173 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -40,7 +40,7 @@ import azkaban.utils.PropsUtils;
 public class FlowRunner extends EventHandler implements Runnable {
 	private static final Layout DEFAULT_LAYOUT = new PatternLayout("%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
 	// We check update every 5 minutes, just in case things get stuck. But for the most part, we'll be idling.
-	private static final long CHECK_WAIT_MS = 5*60*60*1000;
+	private static final long CHECK_WAIT_MS = 5*60*1000;
 	
 	private Logger logger;
 	private Layout loggerLayout = DEFAULT_LAYOUT;
@@ -160,7 +160,9 @@ public class FlowRunner extends EventHandler implements Runnable {
 		}
 		finally {
 			if (watcher != null) {
+				logger.info("Watcher is attached. Stopping watcher.");
 				watcher.stopWatcher();
+				logger.info("Watcher cancelled status is " + watcher.isWatchCancelled());
 			}
 
 			flow.setEndTime(System.currentTimeMillis());
@@ -182,6 +184,11 @@ public class FlowRunner extends EventHandler implements Runnable {
 		
 		// Create execution dir
 		createLogger(flowId);
+		
+		if (this.watcher != null) {
+			this.watcher.setLogger(logger);
+		}
+		
 		logger.info("Running execid:" + execId + " flow:" + flowId + " project:" + projectId + " version:" + version);
 		if (pipelineExecId != null) {
 			logger.info("Running simulateously with " + pipelineExecId + ". Pipelining level " + pipelineLevel);
@@ -296,7 +303,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 				else {
 					List<ExecutableNode> jobsReadyToRun = findReadyJobsToRun();
 					
-					if (!jobsReadyToRun.isEmpty()) {
+					if (!jobsReadyToRun.isEmpty() && !flowCancelled) {
 						for (ExecutableNode node : jobsReadyToRun) {
 							long currentTime = System.currentTimeMillis();
 							
@@ -320,19 +327,21 @@ public class FlowRunner extends EventHandler implements Runnable {
 								logger.info("Killing " + node.getJobId() + " due to prior errors.");
 								node.setStartTime(currentTime);
 								node.setEndTime(currentTime);
+								fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
 							} // If disabled, then we auto skip
 							else if (node.getStatus() == Status.DISABLED) {
 								logger.info("Skipping disabled job " + node.getJobId() + ".");
 								node.setStartTime(currentTime);
 								node.setEndTime(currentTime);
 								node.setStatus(Status.SKIPPED);
+								fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
 							}
 						}
 						
 						updateFlow();
 					}
 					else {
-						if (isFlowFinished()) {
+						if (isFlowFinished() || flowCancelled ) {
 							flowFinished = true;
 							break;
 						}
@@ -346,6 +355,32 @@ public class FlowRunner extends EventHandler implements Runnable {
 			}
 		}
 		
+		if (flowCancelled) {
+			try {
+				logger.info("Flow was force cancelled cleaning up.");
+				for(JobRunner activeRunner : activeJobRunners.values()) {
+					activeRunner.cancel();
+				}
+				
+				for (ExecutableNode node: flow.getExecutableNodes()) {
+					if (Status.isStatusFinished(node.getStatus())) {
+						continue;
+					}
+					else if (node.getStatus() == Status.DISABLED) {
+						node.setStatus(Status.SKIPPED);
+					}
+					else {
+						node.setStatus(Status.KILLED);
+					}
+					fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
+				}
+			} catch (Exception e) {
+				logger.error(e);
+			}
+	
+			updateFlow();
+		}
+		
 		logger.info("Finishing up flow. Awaiting Termination");
 		executorService.shutdown();
 		
@@ -538,6 +573,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 			if (watcher != null) {
 				logger.info("Watcher is attached. Stopping watcher.");
 				watcher.stopWatcher();
+				logger.info("Watcher cancelled status is " + watcher.isWatchCancelled());
 			}
 			
 			logger.info("Cancelling " + activeJobRunners.size() + " jobs.");
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 16d80a5..4e7413b 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -85,6 +85,7 @@ public class JobRunner extends EventHandler implements Runnable {
 
 	private long delayStartMs = 0;
 	private boolean cancelled = false;
+	private BlockingStatus currentBlockStatus = null;
 	
 	public JobRunner(ExecutableNode node, Props props, File workingDir, ExecutorLoader loader, JobTypeManager jobtypeManager) {
 		this.props = props;
@@ -223,8 +224,12 @@ public class JobRunner extends EventHandler implements Runnable {
 					
 					for(BlockingStatus bStatus: blockingStatus) {
 						logger.info("Waiting on pipelined job " + bStatus.getJobId());
+						currentBlockStatus = bStatus;
 						bStatus.blockOnFinishedStatus();
 						logger.info("Pipelined job " + bStatus.getJobId() + " finished.");
+						if (watcher.isWatchCancelled()) {
+							break;
+						}
 					}
 				}
 				if (watcher.isWatchCancelled()) {
@@ -237,6 +242,7 @@ public class JobRunner extends EventHandler implements Runnable {
 				}
 			}
 			
+			currentBlockStatus = null;
 			long currentTime = System.currentTimeMillis();
 			if (delayStartMs > 0) {
 				logger.info("Delaying start of execution for " + delayStartMs + " milliseconds.");
@@ -390,6 +396,11 @@ public class JobRunner extends EventHandler implements Runnable {
 			logError("Cancel has been called.");
 			this.cancelled = true;
 			
+			BlockingStatus status = currentBlockStatus;
+			if (status != null) {
+				status.unblock();
+			}
+			
 			// Cancel code here
 			if (job == null) {
 				logError("Job hasn't started yet.");
diff --git a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index 8567895..0b2d413 100644
--- a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -1170,6 +1170,9 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 				if (perm.isPermissionSet(Type.EXECUTE) || adminPerm) {
 					page.add("exec", true);
 				}
+				else {
+					page.add("exec", false);
+				}
 				
 				List<Flow> flows = project.getFlows();
 				if (!flows.isEmpty()) {