azkaban-memoizeit

Merge branch 'release-2.1'

4/30/2013 7:43:24 PM

Details

diff --git a/src/java/azkaban/execapp/event/BlockingStatus.java b/src/java/azkaban/execapp/event/BlockingStatus.java
index 02c0f44..3a262b4 100644
--- a/src/java/azkaban/execapp/event/BlockingStatus.java
+++ b/src/java/azkaban/execapp/event/BlockingStatus.java
@@ -35,8 +35,10 @@ public class BlockingStatus {
 		return this.status;
 	}
 	
-	public synchronized void unblock() {
-		this.notifyAll();
+	public void unblock() {
+		synchronized(this) {
+			this.notifyAll();
+		}
 	}
 	
 	public void changeStatus(Status status) {
diff --git a/src/java/azkaban/execapp/event/FlowWatcher.java b/src/java/azkaban/execapp/event/FlowWatcher.java
index 1c9a2ad..2e4f576 100644
--- a/src/java/azkaban/execapp/event/FlowWatcher.java
+++ b/src/java/azkaban/execapp/event/FlowWatcher.java
@@ -3,11 +3,15 @@ package azkaban.execapp.event;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.log4j.Logger;
+
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.Status;
 
 public abstract class FlowWatcher {
+	private static final Logger logger = Logger.getLogger(FlowWatcher.class);
+	
 	private int execId;
 	private ExecutableFlow flow;
 	private Map<String, BlockingStatus> map = new ConcurrentHashMap<String, BlockingStatus>();
@@ -26,6 +30,10 @@ public abstract class FlowWatcher {
 	 * @param jobId
 	 */
 	protected synchronized void handleJobFinished(String jobId, Status status) {
+		if (cancelWatch) {
+			return;
+		}
+
 		BlockingStatus block = map.get(jobId);
 		if (block != null) {
 			block.changeStatus(status);
@@ -65,11 +73,15 @@ public abstract class FlowWatcher {
 	}
 	
 	public synchronized void failAllWatches() {
+		logger.info("Failing all watches on " + execId);
 		cancelWatch = true;
 		
 		for(BlockingStatus status : map.values()) {
+			status.changeStatus(Status.KILLED);
 			status.unblock();
 		}
+		
+		logger.info("Successfully failed all watches on " + execId);
 	}
 	
 	public boolean isWatchCancelled() {
diff --git a/src/java/azkaban/execapp/event/LocalFlowWatcher.java b/src/java/azkaban/execapp/event/LocalFlowWatcher.java
index b51f17b..ea78174 100644
--- a/src/java/azkaban/execapp/event/LocalFlowWatcher.java
+++ b/src/java/azkaban/execapp/event/LocalFlowWatcher.java
@@ -1,5 +1,6 @@
 package azkaban.execapp.event;
 
+
 import azkaban.execapp.FlowRunner;
 import azkaban.execapp.JobRunner;
 import azkaban.executor.ExecutableNode;
@@ -7,6 +8,7 @@ import azkaban.executor.ExecutableNode;
 public class LocalFlowWatcher extends FlowWatcher {
 	private LocalFlowWatcherListener watcherListener;
 	private FlowRunner runner;
+	private boolean isShutdown = false;
 	
 	public LocalFlowWatcher(FlowRunner runner) {
 		super(runner.getExecutableFlow().getExecutionId());
@@ -20,6 +22,11 @@ public class LocalFlowWatcher extends FlowWatcher {
 	@Override
 	public void stopWatcher() {
 		// Just freeing stuff
+		if(isShutdown) {
+			return;
+		}
+		
+		isShutdown = true;
 		runner.removeListener(watcherListener);
 		runner = null;
 		
diff --git a/src/java/azkaban/execapp/event/RemoteFlowWatcher.java b/src/java/azkaban/execapp/event/RemoteFlowWatcher.java
index 398a364..ec60025 100644
--- a/src/java/azkaban/execapp/event/RemoteFlowWatcher.java
+++ b/src/java/azkaban/execapp/event/RemoteFlowWatcher.java
@@ -95,6 +95,9 @@ public class RemoteFlowWatcher extends FlowWatcher {
 
 	@Override
 	public synchronized void stopWatcher() {
+		if(isShutdown) {
+			return;
+		}
 		isShutdown = true;
 		if (thread != null) {
 			thread.interrupt();
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index fb4c349..f6f9612 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -155,9 +155,11 @@ public class FlowRunner extends EventHandler implements Runnable {
 			if (watcher != null) {
 				watcher.stopWatcher();
 			}
-			
-			closeLogger();
+
 			flow.setEndTime(System.currentTimeMillis());
+			logger.info("Setting end time for flow " + execId + " to " + System.currentTimeMillis());
+			closeLogger();
+			
 			updateFlow();
 			this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED));
 		}
@@ -347,9 +349,11 @@ public class FlowRunner extends EventHandler implements Runnable {
 				flow.setStatus(Status.FAILED);
 			case FAILED:
 			case KILLED:
+				logger.info("Flow is set to " + flow.getStatus().toString());
 				break;
 			default:
 				flow.setStatus(Status.SUCCEEDED);
+				logger.info("Flow is set to " + flow.getStatus().toString());
 			}
 		}
 	}
@@ -520,18 +524,22 @@ public class FlowRunner extends EventHandler implements Runnable {
 	
 	private void cancel() {
 		synchronized(mainSyncObj) {
+			logger.info("Cancel has been called on flow " + execId);
 			flowPaused = false;
 			flowCancelled = true;
 			
 			if (watcher != null) {
+				logger.info("Watcher is attached. Stopping watcher.");
 				watcher.stopWatcher();
 			}
 			
+			logger.info("Cancelling " + activeJobRunners.size() + " jobs.");
 			for (JobRunner runner : activeJobRunners.values()) {
 				runner.cancel();
 			}
 			
 			if (flow.getStatus() != Status.FAILED && flow.getStatus() != Status.FAILED_FINISHING) {
+				logger.info("Setting flow status to " + Status.KILLED.toString());
 				flow.setStatus(Status.KILLED);
 			}
 		}
@@ -686,6 +694,8 @@ public class FlowRunner extends EventHandler implements Runnable {
 						jobOutputProps.put(node.getJobId(), runner.getOutputProps());
 					}
 					
+					updateFlow();
+					
 					if (node.getStatus() == Status.FAILED) {
 						// Retry failure if conditions are met.
 						if (!runner.isCancelled() && runner.getRetries() > node.getAttempt()) {
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index 59cbbb7..a293ac7 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -502,11 +502,12 @@ public class FlowRunnerManager implements EventListener {
 	@Override
 	public void handleEvent(Event event) {
 		if (event.getType() == Event.Type.FLOW_FINISHED) {
+			
 			FlowRunner flowRunner = (FlowRunner)event.getRunner();
 			ExecutableFlow flow = flowRunner.getExecutableFlow();
 
 			recentlyFinishedFlows.put(flow.getExecutionId(), flow);
-			logger.info("Flow " + flow.getFlowId() + " is finished. Adding it to recently finished flows list.");
+			logger.info("Flow " + flow.getExecutionId() + " is finished. Adding it to recently finished flows list.");
 			runningFlows.remove(flow.getExecutionId());
 		}
 	}
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index cb58324..40bc1ec 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -189,7 +189,13 @@ public class JobRunner extends EventHandler implements Runnable {
 			node.setEndTime(System.currentTimeMillis());
 			fireEvent(Event.create(this, Type.JOB_FINISHED));
 			return;
-		} else if (node.getStatus() == Status.KILLED) {
+		} else if (this.cancelled) {
+			node.setStartTime(System.currentTimeMillis());
+			fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
+			node.setStatus(Status.FAILED);
+			node.setEndTime(System.currentTimeMillis());
+			fireEvent(Event.create(this, Type.JOB_FINISHED));
+		} else if (node.getStatus() == Status.FAILED || node.getStatus() == Status.KILLED) {
 			node.setStartTime(System.currentTimeMillis());
 			fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
 			node.setEndTime(System.currentTimeMillis());
@@ -314,14 +320,13 @@ public class JobRunner extends EventHandler implements Runnable {
 	
 	private boolean prepareJob() throws RuntimeException {
 		// Check pre conditions
-		if (props == null) {
-			node.setStatus(Status.FAILED);
+		if (props == null || cancelled) {
 			logError("Failing job. The job properties don't exist");
 			return false;
 		}
 		
 		synchronized(syncObject) {
-			if (node.getStatus() == Status.FAILED) {
+			if (node.getStatus() == Status.FAILED || cancelled) {
 				return false;
 			}
 
@@ -387,7 +392,6 @@ public class JobRunner extends EventHandler implements Runnable {
 			
 			// Cancel code here
 			if (job == null) {
-				node.setStatus(Status.FAILED);
 				logError("Job hasn't started yet.");
 				// Just in case we're waiting on the delay
 				synchronized(this) {
diff --git a/src/java/azkaban/executor/ExecutionOptions.java b/src/java/azkaban/executor/ExecutionOptions.java
index 3e9460f..a91a6d0 100644
--- a/src/java/azkaban/executor/ExecutionOptions.java
+++ b/src/java/azkaban/executor/ExecutionOptions.java
@@ -48,7 +48,7 @@ public class ExecutionOptions {
 	}
 	
 	public void setFailureEmails(Collection<String> emails) {
-		failureEmails.addAll(emails);
+		failureEmails = new ArrayList<String>(emails);
 	}
 	
 	public boolean isFailureEmailsOverridden() {
@@ -72,7 +72,7 @@ public class ExecutionOptions {
 	}
 	
 	public void setSuccessEmails(Collection<String> emails) {
-		successEmails.addAll(emails);
+		successEmails = new ArrayList<String>(emails);
 	}
 	
 	public List<String> getSuccessEmails() {
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index b58c167..3f482bc 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -709,6 +709,10 @@ public class ExecutorManager {
 			}
 
 			// Delete the executing reference.
+			if (flow.getEndTime() == -1) {
+				flow.setEndTime(System.currentTimeMillis());
+				executorLoader.updateExecutableFlow(dsFlow);
+			}
 			executorLoader.removeActiveExecutableReference(execId);
 			
 			runningFlows.remove(execId);
diff --git a/src/java/azkaban/project/JdbcProjectLoader.java b/src/java/azkaban/project/JdbcProjectLoader.java
index c692c27..18d8cf3 100644
--- a/src/java/azkaban/project/JdbcProjectLoader.java
+++ b/src/java/azkaban/project/JdbcProjectLoader.java
@@ -295,6 +295,7 @@ public class JdbcProjectLoader implements ProjectLoader {
 		}
 	}
 
+	@SuppressWarnings("resource")
 	private void uploadProjectFile(Connection connection, Project project, int version, String filetype, String filename, File localFile, String uploader) throws ProjectManagerException {
 		QueryRunner runner = new QueryRunner();
 		long updateTime = System.currentTimeMillis();
@@ -384,6 +385,7 @@ public class JdbcProjectLoader implements ProjectLoader {
 		return handler;
 	}
 	
+	@SuppressWarnings("resource")
 	private ProjectFileHandler getUploadedFile(Connection connection, int projectId, int version) throws ProjectManagerException {
 		QueryRunner runner = new QueryRunner();
 		ProjectVersionResultHandler pfHandler = new ProjectVersionResultHandler();
diff --git a/src/java/azkaban/utils/FileIOUtils.java b/src/java/azkaban/utils/FileIOUtils.java
index 2b1f319..7934a19 100644
--- a/src/java/azkaban/utils/FileIOUtils.java
+++ b/src/java/azkaban/utils/FileIOUtils.java
@@ -150,6 +150,7 @@ public class FileIOUtils {
 		
 		long skipped = fileStream.skip(offset);
 		if (skipped < offset) {
+			fileStream.close();
 			return new Pair<Integer,Integer>(0, 0);
 		}
 
@@ -174,6 +175,7 @@ public class FileIOUtils {
 		
 		long skipped = fileStream.skip(fileOffset);
 		if (skipped < fileOffset) {
+			fileStream.close();
 			return new LogData(fileOffset, 0, "");
 		}
 		
diff --git a/src/web/js/azkaban.flow.execute.view.js b/src/web/js/azkaban.flow.execute.view.js
index 97609ae..c8aed58 100644
--- a/src/web/js/azkaban.flow.execute.view.js
+++ b/src/web/js/azkaban.flow.execute.view.js
@@ -88,8 +88,8 @@ azkaban.FlowExecuteDialogView= Backbone.View.extend({
 		ajax: "executeFlow",
 		flow: this.flowId,
 		disabled: disabled,
-		failureEmailOverride:failureEmailsOverride,
-		successEmailOverride:successEmailsOverride,
+		failureEmailsOverride:failureEmailsOverride,
+		successEmailsOverride:successEmailsOverride,
 		failureAction: failureAction,
 		failureEmails: failureEmails,
 		successEmails: successEmails,
diff --git a/src/web/js/azkaban.schedule.panel.view.js b/src/web/js/azkaban.schedule.panel.view.js
index e445a5e..af81c0c 100644
--- a/src/web/js/azkaban.schedule.panel.view.js
+++ b/src/web/js/azkaban.schedule.panel.view.js
@@ -54,7 +54,7 @@ azkaban.SchedulePanelView= Backbone.View.extend({
   	 console.log("Creating schedule for "+projectName+"."+scheduleData.flow);
 	var scheduleTime = $('#hour').val() + "," + $('#minutes').val() + "," + $('#am_pm').val() + "," + $('#timezone').val();
 	var scheduleDate = $('#datepicker').val();
-	var is_recurring = $('#is_recurring').val();
+	var is_recurring = document.getElementById('is_recurring').checked ? 'on' : 'off'; 
 	var period = $('#period').val() + $('#period_units').val();
 	
 	scheduleData.ajax = "scheduleFlow";