azkaban-uncached

Details

diff --git a/src/java/azkaban/execapp/event/Event.java b/src/java/azkaban/execapp/event/Event.java
index 2ad400b..764b527 100644
--- a/src/java/azkaban/execapp/event/Event.java
+++ b/src/java/azkaban/execapp/event/Event.java
@@ -22,7 +22,9 @@ public class Event {
 		FLOW_FINISHED,
 		JOB_STARTED,
 		JOB_FINISHED,
-		JOB_STATUS_CHANGED
+		JOB_STATUS_CHANGED,
+		EXTERNAL_FLOW_UPDATED,
+		EXTERNAL_JOB_UPDATED
 	}
 	
 	private final Object runner;
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index e046958..1a91438 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -99,6 +99,10 @@ public class FlowRunner extends EventHandler implements Runnable {
 		return execDir;
 	}
 	
+	public void watchedExecutionUpdate(ExecutableFlow flow) {
+		
+	}
+	
 	@Override
 	public void run() {
 		try {
@@ -412,7 +416,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 			for (ExecutableNode node: pausedNode.values()) {
 				node.setStatus(Status.KILLED);
 				node.setPaused(false);
-				queueNextJob(node);
+				queueNextJob(node, "cancel-all-action");
 			}
 			
 			updateFlow();
@@ -437,7 +441,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 				node.setStatus(Status.KILLED);
 				if (node.isPaused()) {
 					node.setPaused(false);
-					queueNextJob(node);
+					queueNextJob(node, "cancel-action");
 				}
 			}
 		}
@@ -458,7 +462,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 				if (node.isPaused()) {
 					node.setPaused(false);
 					if (pausedNode.containsKey(jobId)) {
-						queueNextJob(node);
+						queueNextJob(node, "resume-action");
 					}
 					
 					updateFlow();
@@ -585,7 +589,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 			}
 			
 			for (ExecutableNode node: jobsToBeQueued) {
-				queueNextJob(node);
+				queueNextJob(node, "retry-action");
 			}
 			
 			updateFlow();
@@ -633,6 +637,9 @@ public class FlowRunner extends EventHandler implements Runnable {
 			case SKIPPED:
 			case SUCCEEDED:
 				continue;
+			case RUNNING:
+			case QUEUED:
+				return null;
 			default:
 				// Return null means it's not ready to run.
 				return null;
@@ -660,7 +667,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private synchronized void queueNextJobs(ExecutableNode finishedNode) {
 		for (String dependent : finishedNode.getOutNodes()) {
 			ExecutableNode dependentNode = flow.getExecutableNode(dependent);
-			queueNextJob(dependentNode);
+			queueNextJob(dependentNode, finishedNode.getJobId());
 		}
 	}
 
@@ -669,7 +676,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 	 * 
 	 * @param node
 	 */
-	private void queueNextJob(ExecutableNode node) {
+	private void queueNextJob(ExecutableNode node, String trigger) {
 		Status nextStatus = getImpliedStatus(node);
 		if (nextStatus == null) {
 			// Not yet ready or not applicable
@@ -706,11 +713,10 @@ public class FlowRunner extends EventHandler implements Runnable {
 				logger.info("Flow Paused. Pausing " + node.getJobId());
 			}
 			else {
-				logger.info("Adding " + node.getJobId() + " to run queue.");
 				if (node.getStatus() != Status.DISABLED && node.getStatus() != Status.KILLED) {
 					node.setStatus(Status.QUEUED);
 				}
-
+				logger.info("Adding " + node.getJobId() + " to run queue with status " + node.getStatus().toString() + " triggered by '" + trigger + "'.");
 				jobsToRun.add(runner);
 			}
 		}
@@ -726,7 +732,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 			if (event.getType() == Type.JOB_FINISHED) {
 				ExecutableNode node = runner.getNode();
 
-				logger.info("Job Finished " + node.getJobId());
+				logger.info("Job Finished " + node.getJobId() + " with status " + node.getStatus());
 				synchronized (actionSyncObj) {
 					if (node.getStatus() == Status.FAILED) {
 						// Setting failure
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index 68f5882..b0da031 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -38,7 +38,6 @@ import azkaban.project.ProjectLoader;
 import azkaban.execapp.event.Event;
 import azkaban.execapp.event.EventListener;
 import azkaban.executor.ExecutableFlow;
-import azkaban.executor.ExecutableFlow.Status;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.jobtype.JobTypeManager;
diff --git a/src/java/azkaban/execapp/FlowWatcher.java b/src/java/azkaban/execapp/FlowWatcher.java
index 5bcb4d0..22c0d24 100644
--- a/src/java/azkaban/execapp/FlowWatcher.java
+++ b/src/java/azkaban/execapp/FlowWatcher.java
@@ -1,9 +1,132 @@
 package azkaban.execapp;
 
+import azkaban.execapp.event.Event;
+import azkaban.execapp.event.EventListener;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutorLoader;
+import azkaban.executor.ExecutorManagerException;
+
 /**
  * Class that watches and updates execution flows that are being listened to by
  * other executing flows.
  */
 public class FlowWatcher {
+	private FlowRunnerManager manager;
+	private ExecutorLoader loader;
+	private WatcherThread watcherThread;
+	
+	public FlowWatcher(FlowRunnerManager manager, ExecutorLoader loader) {
+		this.manager = manager;
+		this.loader = loader;
+		
+		watcherThread = new WatcherThread();
+		watcherThread.start();
+	}
+	
+	public void watchExecution(int execId, EventListener listener) throws ExecutorManagerException {
+		watchExecution(execId, null, listener);
+	}
+	
+	public void watchExecution(int execId, String jobId, EventListener listener) throws ExecutorManagerException {
+		Watch watcher = new Watch(execId, jobId, listener);
+		
+		
+	}
+	
+	public void unwatchAll(EventListener listener, int execId, String jobId) {
+		
+	}
+	
+	private class WatcherThread extends Thread {
+		boolean isShutdown = false;
+		
+		public WatcherThread() {
+			this.setName("Execution-watcher-Thread");
+		}
+		
+		public void run() {
+			while (!isShutdown) {
+				
+			}
+		}
+	}
+	
+	public class Watch {
+		private final EventListener listener;
+
+		private final int execId;
+		private final String jobId;
+		private long updateTime = -1;
+
+		public Watch(int execId, EventListener listener) {
+			this(execId, null, listener);
+		}
+
+		@Override
+		public int hashCode() {
+			final int prime = 31;
+			int result = 1;
+			result = prime * result + getOuterType().hashCode();
+			result = prime * result + execId;
+			result = prime * result + ((jobId == null) ? 0 : jobId.hashCode());
+			result = prime * result + ((listener == null) ? 0 : listener.hashCode());
+			return result;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (this == obj)
+				return true;
+			if (obj == null)
+				return false;
+			if (getClass() != obj.getClass())
+				return false;
+			Watch other = (Watch) obj;
+			if (!getOuterType().equals(other.getOuterType()))
+				return false;
+			if (execId != other.execId)
+				return false;
+			if (jobId == null) {
+				if (other.jobId != null)
+					return false;
+			} else if (!jobId.equals(other.jobId))
+				return false;
+			if (listener == null) {
+				if (other.listener != null)
+					return false;
+			} else if (!listener.equals(other.listener))
+				return false;
+			return true;
+		}
+
+		public Watch(int execId, String jobId, EventListener listener) {
+			this.execId = execId;
+			this.listener = listener;
+			this.jobId = jobId;
+		}
+		
+		private void notifyIfUpdate(ExecutableFlow flow) {
+			if (jobId == null) {
+				if (flow.getUpdateTime() > updateTime) {
+					listener.handleEvent(Event.create(flow, Event.Type.EXTERNAL_FLOW_UPDATED));
+
+					updateTime = flow.getUpdateTime();
+				}
+			}
+			else {
+				ExecutableNode node = flow.getExecutableNode(jobId);
+
+				if (node.getUpdateTime() > updateTime) {
+					listener.handleEvent(Event.create(node, Event.Type.EXTERNAL_JOB_UPDATED));
+
+					updateTime = node.getUpdateTime();
+				}
+			}
+		}
 
+		private FlowWatcher getOuterType() {
+			return FlowWatcher.this;
+		}
+	}
 }
diff --git a/src/java/azkaban/project/JdbcProjectLoader.java b/src/java/azkaban/project/JdbcProjectLoader.java
index 2d9c475..01976b7 100644
--- a/src/java/azkaban/project/JdbcProjectLoader.java
+++ b/src/java/azkaban/project/JdbcProjectLoader.java
@@ -801,7 +801,7 @@ public class JdbcProjectLoader implements ProjectLoader {
 			runner.update(connection, INSERT_PROPERTIES, project.getId(), project.getVersion(), name, System.currentTimeMillis(), defaultEncodingType.getNumVal(), data);
 			connection.commit();
 		} catch (SQLException e) {
-			throw new ProjectManagerException("Error fetching flows from project " + project.getName() + " version " + project.getVersion(), e);
+			throw new ProjectManagerException("Error uploading project properties " + name + " into " + project.getName() + " version " + project.getVersion(), e);
 		}
 	}