azkaban-memoizeit

Details

diff --git a/src/java/azkaban/execapp/event/BlockingStatus.java b/src/java/azkaban/execapp/event/BlockingStatus.java
index 02c0f44..3a262b4 100644
--- a/src/java/azkaban/execapp/event/BlockingStatus.java
+++ b/src/java/azkaban/execapp/event/BlockingStatus.java
@@ -35,8 +35,10 @@ public class BlockingStatus {
 		return this.status;
 	}
 	
-	public synchronized void unblock() {
-		this.notifyAll();
+	public void unblock() {
+		synchronized(this) {
+			this.notifyAll();
+		}
 	}
 	
 	public void changeStatus(Status status) {
diff --git a/src/java/azkaban/execapp/event/FlowWatcher.java b/src/java/azkaban/execapp/event/FlowWatcher.java
index 1c9a2ad..2e4f576 100644
--- a/src/java/azkaban/execapp/event/FlowWatcher.java
+++ b/src/java/azkaban/execapp/event/FlowWatcher.java
@@ -3,11 +3,15 @@ package azkaban.execapp.event;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.log4j.Logger;
+
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.Status;
 
 public abstract class FlowWatcher {
+	private static final Logger logger = Logger.getLogger(FlowWatcher.class);
+	
 	private int execId;
 	private ExecutableFlow flow;
 	private Map<String, BlockingStatus> map = new ConcurrentHashMap<String, BlockingStatus>();
@@ -26,6 +30,10 @@ public abstract class FlowWatcher {
 	 * @param jobId
 	 */
 	protected synchronized void handleJobFinished(String jobId, Status status) {
+		if (cancelWatch) {
+			return;
+		}
+
 		BlockingStatus block = map.get(jobId);
 		if (block != null) {
 			block.changeStatus(status);
@@ -65,11 +73,15 @@ public abstract class FlowWatcher {
 	}
 	
 	public synchronized void failAllWatches() {
+		logger.info("Failing all watches on " + execId);
 		cancelWatch = true;
 		
 		for(BlockingStatus status : map.values()) {
+			status.changeStatus(Status.KILLED);
 			status.unblock();
 		}
+		
+		logger.info("Successfully failed all watches on " + execId);
 	}
 	
 	public boolean isWatchCancelled() {
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index c80a9ff..c91bb08 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -155,9 +155,11 @@ public class FlowRunner extends EventHandler implements Runnable {
 			if (watcher != null) {
 				watcher.stopWatcher();
 			}
-			
-			closeLogger();
+
 			flow.setEndTime(System.currentTimeMillis());
+			logger.info("Setting end time for flow " + execId + " to " + System.currentTimeMillis());
+			closeLogger();
+			
 			updateFlow();
 			this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED));
 		}
@@ -347,9 +349,11 @@ public class FlowRunner extends EventHandler implements Runnable {
 				flow.setStatus(Status.FAILED);
 			case FAILED:
 			case KILLED:
+				logger.info("Flow is set to " + flow.getStatus().toString());
 				break;
 			default:
 				flow.setStatus(Status.SUCCEEDED);
+				logger.info("Flow is set to " + flow.getStatus().toString());
 			}
 		}
 	}
@@ -520,18 +524,22 @@ public class FlowRunner extends EventHandler implements Runnable {
 	
 	private void cancel() {
 		synchronized(mainSyncObj) {
+			logger.info("Cancel has been called on flow " + execId);
 			flowPaused = false;
 			flowCancelled = true;
 			
 			if (watcher != null) {
+				logger.info("Watcher is attached. Stopping watcher.");
 				watcher.stopWatcher();
 			}
 			
+			logger.info("Cancelling " + activeJobRunners.size() + " jobs.");
 			for (JobRunner runner : activeJobRunners.values()) {
 				runner.cancel();
 			}
 			
 			if (flow.getStatus() != Status.FAILED && flow.getStatus() != Status.FAILED_FINISHING) {
+				logger.info("Setting flow status to " + Status.KILLED.toString());
 				flow.setStatus(Status.KILLED);
 			}
 		}
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index d4c4d6e..f7ce2c7 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -501,11 +501,12 @@ public class FlowRunnerManager implements EventListener {
 	@Override
 	public void handleEvent(Event event) {
 		if (event.getType() == Event.Type.FLOW_FINISHED) {
+			
 			FlowRunner flowRunner = (FlowRunner)event.getRunner();
 			ExecutableFlow flow = flowRunner.getExecutableFlow();
 
 			recentlyFinishedFlows.put(flow.getExecutionId(), flow);
-			logger.info("Flow " + flow.getFlowId() + " is finished. Adding it to recently finished flows list.");
+			logger.info("Flow " + flow.getExecutionId() + " is finished. Adding it to recently finished flows list.");
 			runningFlows.remove(flow.getExecutionId());
 		}
 	}
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 458ae84..9671a70 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -689,6 +689,10 @@ public class ExecutorManager {
 			}
 
 			// Delete the executing reference.
+			if (flow.getEndTime() == -1) {
+				flow.setEndTime(System.currentTimeMillis());
+				executorLoader.updateExecutableFlow(dsFlow);
+			}
 			executorLoader.removeActiveExecutableReference(execId);
 			
 			runningFlows.remove(execId);