azkaban-uncached

Adding Watchers for pipelining.

2/28/2013 6:13:03 PM

Changes

src/java/azkaban/execapp/FlowWatcher.java 132(+0 -132)

Details

diff --git a/src/java/azkaban/execapp/AzkabanExecutorServer.java b/src/java/azkaban/execapp/AzkabanExecutorServer.java
index 8770e0e..7c25028 100644
--- a/src/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/src/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -36,11 +36,8 @@ import org.mortbay.thread.QueuedThreadPool;
 
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.JdbcExecutorLoader;
-import azkaban.jmx.JmxExecutorManager;
 import azkaban.jmx.JmxFlowRunnerManager;
 import azkaban.jmx.JmxJettyServer;
-import azkaban.jmx.JmxSLAManager;
-import azkaban.jmx.JmxScheduler;
 import azkaban.project.JdbcProjectLoader;
 import azkaban.project.ProjectLoader;
 import azkaban.utils.Props;
diff --git a/src/java/azkaban/execapp/event/BlockingStatus.java b/src/java/azkaban/execapp/event/BlockingStatus.java
new file mode 100644
index 0000000..02c0f44
--- /dev/null
+++ b/src/java/azkaban/execapp/event/BlockingStatus.java
@@ -0,0 +1,58 @@
+package azkaban.execapp.event;
+
+import azkaban.executor.Status;
+
+public class BlockingStatus {
+	private static final long WAIT_TIME = 5*60*1000;
+	private final int execId;
+	private final String jobId;
+	private Status status;
+	
+	public BlockingStatus(int execId, String jobId, Status initialStatus) {
+		this.execId = execId;
+		this.jobId = jobId;
+		this.status = initialStatus;
+	}
+	
+	public Status blockOnFinishedStatus() {
+		if (status == null) {
+			return null;
+		}
+		
+		while (!Status.isStatusFinished(status)) {
+			synchronized(this) {
+				try {
+					this.wait(WAIT_TIME);
+				} catch (InterruptedException e) {
+				}
+			}
+		}
+		
+		return status;
+	}
+
+	public Status viewStatus() {
+		return this.status;
+	}
+	
+	public synchronized void unblock() {
+		this.notifyAll();
+	}
+	
+	public void changeStatus(Status status) {
+		synchronized(this) {
+			this.status = status;
+			if (Status.isStatusFinished(status)) {
+				unblock();
+			}
+		}
+	}
+	
+	public int getExecId() {
+		return execId;
+	}
+
+	public String getJobId() {
+		return jobId;
+	}
+}
diff --git a/src/java/azkaban/execapp/event/EventHandler.java b/src/java/azkaban/execapp/event/EventHandler.java
index d1337b6..a71de50 100644
--- a/src/java/azkaban/execapp/event/EventHandler.java
+++ b/src/java/azkaban/execapp/event/EventHandler.java
@@ -33,4 +33,8 @@ public class EventHandler {
 			listener.handleEvent(event);
 		}
 	}
+	
+	public void removeListener(EventListener listener) {
+		listeners.remove(listener);
+	}
 }
diff --git a/src/java/azkaban/execapp/event/FlowWatcher.java b/src/java/azkaban/execapp/event/FlowWatcher.java
new file mode 100644
index 0000000..9a46168
--- /dev/null
+++ b/src/java/azkaban/execapp/event/FlowWatcher.java
@@ -0,0 +1,76 @@
+package azkaban.execapp.event;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.Status;
+
+public abstract class FlowWatcher {
+	private int execId;
+	private ExecutableFlow flow;
+	private Map<String, BlockingStatus> map = new ConcurrentHashMap<String, BlockingStatus>();
+	private boolean cancelWatch = false;
+	
+	public FlowWatcher(int execId) {
+		this.execId = execId;
+	}
+	
+	public void setFlow(ExecutableFlow flow) {
+		this.flow = flow;
+	}
+	
+	/**
+	 * Called to fire events to the JobRunner listeners
+	 * @param jobId
+	 */
+	protected synchronized void handleJobFinished(String jobId, Status status) {
+		BlockingStatus block = map.get(jobId);
+		if (block != null) {
+			block.changeStatus(status);
+		}
+	}
+
+	public int getExecId() {
+		return execId;
+	}
+	
+	public synchronized BlockingStatus getBlockingStatus(String jobId) {
+		if (cancelWatch) {
+			return null;
+		}
+		
+		ExecutableNode node = flow.getExecutableNode(jobId);
+		if (node == null) {
+			return null;
+		}
+		
+		BlockingStatus blockingStatus = map.get(jobId);
+		if (blockingStatus == null) {
+			blockingStatus = new BlockingStatus(execId, jobId, node.getStatus());
+			map.put(jobId, blockingStatus);
+		}
+
+		return blockingStatus;
+	}
+	
+	public Status peekStatus(String jobId) {
+		ExecutableNode node = flow.getExecutableNode(jobId);
+		if (node != null) {
+			return node.getStatus();
+		}
+		
+		return null;
+	}
+	
+	public synchronized void failAllWatches() {
+		cancelWatch = true;
+		
+		for(BlockingStatus status : map.values()) {
+			status.unblock();
+		}
+	}
+	
+	public abstract void stopWatcher();
+}
diff --git a/src/java/azkaban/execapp/event/LocalFlowWatcher.java b/src/java/azkaban/execapp/event/LocalFlowWatcher.java
new file mode 100644
index 0000000..b51f17b
--- /dev/null
+++ b/src/java/azkaban/execapp/event/LocalFlowWatcher.java
@@ -0,0 +1,40 @@
+package azkaban.execapp.event;
+
+import azkaban.execapp.FlowRunner;
+import azkaban.execapp.JobRunner;
+import azkaban.executor.ExecutableNode;
+
+public class LocalFlowWatcher extends FlowWatcher {
+	private LocalFlowWatcherListener watcherListener;
+	private FlowRunner runner;
+	
+	public LocalFlowWatcher(FlowRunner runner) {
+		super(runner.getExecutableFlow().getExecutionId());
+		super.setFlow(runner.getExecutableFlow());
+		
+		watcherListener = new LocalFlowWatcherListener();
+		this.runner = runner;
+		runner.addListener(watcherListener);
+	}
+
+	@Override
+	public void stopWatcher() {
+		// Just freeing stuff
+		runner.removeListener(watcherListener);
+		runner = null;
+		
+		super.failAllWatches();
+	}
+
+	public class LocalFlowWatcherListener implements EventListener {
+		@Override
+		public void handleEvent(Event event) {
+			if (event.getRunner() instanceof JobRunner) {
+				JobRunner runner = (JobRunner)event.getRunner();
+				ExecutableNode node = runner.getNode();
+				
+				handleJobFinished(node.getJobId(), node.getStatus());
+			}
+		}
+	}
+}
diff --git a/src/java/azkaban/execapp/event/RemoteFlowWatcher.java b/src/java/azkaban/execapp/event/RemoteFlowWatcher.java
new file mode 100644
index 0000000..398a364
--- /dev/null
+++ b/src/java/azkaban/execapp/event/RemoteFlowWatcher.java
@@ -0,0 +1,106 @@
+package azkaban.execapp.event;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutorLoader;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.Status;
+
+public class RemoteFlowWatcher extends FlowWatcher {
+	private final static long CHECK_INTERVAL_MS = 60*1000;
+	
+	private int execId;
+	private ExecutorLoader loader;
+	private ExecutableFlow flow;
+	private RemoteUpdaterThread thread;
+	private boolean isShutdown = false;
+	
+	// Every minute
+	private long checkIntervalMs = CHECK_INTERVAL_MS;
+	
+	public RemoteFlowWatcher(int execId, ExecutorLoader loader) {
+		this(execId, loader, CHECK_INTERVAL_MS);
+	}
+	
+	public RemoteFlowWatcher(int execId, ExecutorLoader loader, long interval) {
+		super(execId);
+		checkIntervalMs = interval;
+		
+		try {
+			flow = loader.fetchExecutableFlow(execId);
+		} catch (ExecutorManagerException e) {
+			return;
+		}
+		
+		super.setFlow(flow);
+		this.loader = loader;
+		this.execId = execId;
+		if (flow != null) {
+			this.thread = new RemoteUpdaterThread();
+			this.thread.setName("Remote-watcher-flow-" + execId);
+			this.thread.start();
+		}
+	}
+	
+	private class RemoteUpdaterThread extends Thread {
+		@Override
+		public void run() {
+			do {
+				ExecutableFlow updateFlow = null;
+				try {
+					updateFlow = loader.fetchExecutableFlow(execId);
+				} catch (ExecutorManagerException e) {
+					e.printStackTrace();
+					isShutdown = true;
+				}
+				
+				if (flow == null) {
+					flow = updateFlow;
+				}
+				else {
+					flow.setStatus(updateFlow.getStatus());
+					flow.setEndTime(updateFlow.getEndTime());
+					flow.setUpdateTime(updateFlow.getUpdateTime());
+					
+					for (ExecutableNode node : flow.getExecutableNodes()) {
+						String jobId = node.getJobId();
+						ExecutableNode newNode = updateFlow.getExecutableNode(jobId);
+						long updateTime = node.getUpdateTime();
+						node.setUpdateTime(newNode.getUpdateTime());
+						node.setStatus(newNode.getStatus());
+						node.setStartTime(newNode.getStartTime());
+						node.setEndTime(newNode.getEndTime());
+						
+						if (updateTime < newNode.getUpdateTime()) {
+							handleJobFinished(jobId, newNode.getStatus());
+						}
+					}
+				}
+				
+				if (Status.isStatusFinished(flow.getStatus())) {
+					isShutdown = true;
+				}
+				else {
+					synchronized(this) {
+						try {
+							wait(checkIntervalMs);
+						} catch (InterruptedException e) {
+						}
+					}
+				}
+			} while (!isShutdown);
+		}
+		
+	}
+
+	@Override
+	public synchronized void stopWatcher() {
+		isShutdown = true;
+		if (thread != null) {
+			thread.interrupt();
+		}
+		super.failAllWatches();
+		loader = null;
+		flow = null;
+	}
+}
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index c912ad9..c61d724 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -25,12 +25,13 @@ import azkaban.execapp.event.Event;
 import azkaban.execapp.event.Event.Type;
 import azkaban.execapp.event.EventHandler;
 import azkaban.execapp.event.EventListener;
+import azkaban.execapp.event.FlowWatcher;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableFlow.FailureAction;
-import azkaban.executor.ExecutableFlow.Status;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.Status;
 import azkaban.flow.FlowProps;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.project.ProjectLoader;
@@ -68,6 +69,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private JobRunnerEventListener listener = new JobRunnerEventListener();
 	private BlockingQueue<JobRunner> jobsToRun = new LinkedBlockingQueue<JobRunner>();
 	private Map<String, JobRunner> runningJob = new ConcurrentHashMap<String, JobRunner>();
+	
 	private Map<Pair<String, Integer>, JobRunner> allJobs = new ConcurrentHashMap<Pair<String, Integer>, JobRunner>();
 	private List<JobRunner> pausedJobsToRun = Collections.synchronizedList(new ArrayList<JobRunner>());
 	
@@ -80,7 +82,14 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private boolean flowFinished = false;
 	private boolean flowCancelled = false;
 	
-	public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader, JobTypeManager jobtypeManager) throws ExecutorManagerException {
+	// Used for pipelining
+	private Integer pipelineLevel = null;
+	private Integer pipelineExecId = null;
+	
+	// Watches external flows for execution.
+	private FlowWatcher watcher = null;
+	
+	public FlowRunner(ExecutableFlow flow, FlowWatcher watcher, ExecutorLoader executorLoader, ProjectLoader projectLoader, JobTypeManager jobtypeManager) throws ExecutorManagerException {
 		this.execId = flow.getExecutionId();
 		this.flow = flow;
 		this.executorLoader = executorLoader;
@@ -88,6 +97,10 @@ public class FlowRunner extends EventHandler implements Runnable {
 		this.executorService = Executors.newFixedThreadPool(numThreads);
 		this.execDir = new File(flow.getExecutionPath());
 		this.jobtypeManager = jobtypeManager;
+		
+		this.pipelineLevel = flow.getPipelineLevel();
+		this.pipelineExecId = flow.getPipelineExecutionId();
+		this.watcher = watcher;
 	}
 
 	public FlowRunner setGlobalProps(Props globalProps) {
@@ -99,10 +112,6 @@ public class FlowRunner extends EventHandler implements Runnable {
 		return execDir;
 	}
 	
-	public void watchedExecutionUpdate(ExecutableFlow flow) {
-		
-	}
-	
 	@Override
 	public void run() {
 		try {
@@ -116,6 +125,9 @@ public class FlowRunner extends EventHandler implements Runnable {
 			// Create execution dir
 			createLogger(flowId);
 			logger.info("Running execid:" + execId + " flow:" + flowId + " project:" + projectId + " version:" + version);
+			if (pipelineExecId != null) {
+				logger.info("Running simultaneously with " + pipelineExecId + ". Pipelining level " + pipelineLevel);
+			}
 			
 			// The current thread is used for interrupting blocks
 			currentThread = Thread.currentThread();
@@ -144,6 +156,10 @@ public class FlowRunner extends EventHandler implements Runnable {
 			flow.setStatus(Status.FAILED);
 		}
 		finally {
+			if (watcher != null) {
+				watcher.stopWatcher();
+			}
+			
 			closeLogger();
 			flow.setEndTime(System.currentTimeMillis());
 			updateFlow();
@@ -346,6 +362,10 @@ public class FlowRunner extends EventHandler implements Runnable {
 		
 		// should have one prop with system secrets, the other user level props
 		JobRunner jobRunner = new JobRunner(node, prop, path.getParentFile(), executorLoader, jobtypeManager, logger);
+		if (watcher != null) {
+			jobRunner.setPipeline(watcher, pipelineLevel);
+		}
+		
 		jobRunner.addListener(listener);
 
 		return jobRunner;
@@ -398,6 +418,10 @@ public class FlowRunner extends EventHandler implements Runnable {
 			flowPaused = false;
 			flowCancelled = true;
 			
+			if (watcher != null) {
+				watcher.stopWatcher();
+			}
+			
 			for (JobRunner runner: pausedJobsToRun) {
 				ExecutableNode node = runner.getNode();
 				logger.info("Resumed flow is cancelled. Job killed " + node.getJobId() + " by " + user);
@@ -669,7 +693,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 		String trigger = finishedNode.getAttempt() > 0 ? finishedNode.getJobId() + "." + finishedNode.getAttempt() : finishedNode.getJobId();
 		for (String dependent : finishedNode.getOutNodes()) {
 			ExecutableNode dependentNode = flow.getExecutableNode(dependent);
-			queueNextJob(dependentNode, finishedNode.getJobId());
+			queueNextJob(dependentNode, trigger);
 		}
 	}
 
@@ -750,6 +774,8 @@ public class FlowRunner extends EventHandler implements Runnable {
 					jobOutputProps.put(node.getJobId(), runner.getOutputProps());
 					
 					runningJob.remove(node.getJobId());
+					
+					fireEventListeners(event);
 					queueNextJobs(node);
 				}
 				
@@ -781,13 +807,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private boolean isFlowFinished() {
 		for (String end: flow.getEndNodes()) {
 			ExecutableNode node = flow.getExecutableNode(end);
-			switch(node.getStatus()) {
-			case KILLED:
-			case SKIPPED:
-			case FAILED:
-			case SUCCEEDED:
-				continue;
-			default:
+			if (!Status.isStatusFinished(node.getStatus())) {
 				return false;
 			}
 		}
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index b0da031..6871e08 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -37,6 +37,9 @@ import org.apache.log4j.Logger;
 import azkaban.project.ProjectLoader;
 import azkaban.execapp.event.Event;
 import azkaban.execapp.event.EventListener;
+import azkaban.execapp.event.FlowWatcher;
+import azkaban.execapp.event.LocalFlowWatcher;
+import azkaban.execapp.event.RemoteFlowWatcher;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManagerException;
@@ -319,7 +322,20 @@ public class FlowRunnerManager implements EventListener {
 		setupFlow(flow);
 		
 		// Setup flow runner
-		FlowRunner runner = new FlowRunner(flow, executorLoader, projectLoader, jobtypeManager);
+		FlowWatcher watcher = null;
+		if (flow.getPipelineExecutionId() != null) {
+			int pipelineExecId = flow.getPipelineExecutionId();
+			FlowRunner runner = runningFlows.get(pipelineExecId);
+			
+			if (runner != null) {
+				watcher = new LocalFlowWatcher(runner);
+			}
+			else {
+				watcher = new RemoteFlowWatcher(pipelineExecId, executorLoader);
+			}
+		}
+		
+		FlowRunner runner = new FlowRunner(flow, watcher, executorLoader, projectLoader, jobtypeManager);
 		runner.setGlobalProps(globalProps);
 		runner.addListener(this);
 		
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index e34132e..f6c482b 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -17,6 +17,10 @@ package azkaban.execapp;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
 import org.apache.log4j.Appender;
 import org.apache.log4j.FileAppender;
@@ -24,13 +28,15 @@ import org.apache.log4j.Layout;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PatternLayout;
 
+import azkaban.execapp.event.BlockingStatus;
 import azkaban.execapp.event.Event;
 import azkaban.execapp.event.Event.Type;
 import azkaban.execapp.event.EventHandler;
-import azkaban.executor.ExecutableFlow.Status;
+import azkaban.execapp.event.FlowWatcher;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.Status;
 import azkaban.flow.CommonJobProperties;
 import azkaban.jobExecutor.AbstractProcessJob;
 import azkaban.jobExecutor.Job;
@@ -61,7 +67,10 @@ public class JobRunner extends EventHandler implements Runnable {
 	private Object syncObject = new Object();
 	
 	private final JobTypeManager jobtypeManager;
-
+	private Integer pipelineLevel = null;
+	private FlowWatcher watcher = null;
+	private Set<String> pipelineJobs = new HashSet<String>();
+	
 	public JobRunner(ExecutableNode node, Props props, File workingDir, ExecutorLoader loader, JobTypeManager jobtypeManager, Logger flowLogger) {
 		this.props = props;
 		this.node = node;
@@ -72,6 +81,19 @@ public class JobRunner extends EventHandler implements Runnable {
 		this.flowLogger = flowLogger;
 	}
 	
+	public void setPipeline(FlowWatcher watcher, int pipelineLevel) {
+		this.watcher = watcher;
+		this.pipelineLevel = pipelineLevel;
+
+		if (pipelineLevel == 1) {
+			pipelineJobs.add(node.getJobId());
+		}
+		else if (pipelineLevel == 2) {
+			pipelineJobs.add(node.getJobId());
+			pipelineJobs.addAll(node.getOutNodes());
+		}
+	}
+	
 	public ExecutableNode getNode() {
 		return node;
 	}
@@ -122,16 +144,16 @@ public class JobRunner extends EventHandler implements Runnable {
 	@Override
 	public void run() {
 		Thread.currentThread().setName("JobRunner-" + node.getJobId() + "-" + executionId);
-
-		node.setStartTime(System.currentTimeMillis());
 		
 		if (node.getStatus() == Status.DISABLED) {
+			node.setStartTime(System.currentTimeMillis());
 			fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
 			node.setStatus(Status.SKIPPED);
 			node.setEndTime(System.currentTimeMillis());
 			fireEvent(Event.create(this, Type.JOB_FINISHED));
 			return;
 		} else if (node.getStatus() == Status.KILLED) {
+			node.setStartTime(System.currentTimeMillis());
 			fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
 			node.setEndTime(System.currentTimeMillis());
 			fireEvent(Event.create(this, Type.JOB_FINISHED));
@@ -139,7 +161,32 @@ public class JobRunner extends EventHandler implements Runnable {
 		}
 		else {
 			createLogger();
-			
+			node.setUpdateTime(System.currentTimeMillis());
+
+			// For pipelining of jobs. Will watch other jobs.
+			if (!pipelineJobs.isEmpty()) {
+				String blockedList = "";
+				ArrayList<BlockingStatus> blockingStatus = new ArrayList<BlockingStatus>();
+				for (String waitingJobId : pipelineJobs) {
+					Status status = watcher.peekStatus(waitingJobId);
+					if (status != null && !Status.isStatusFinished(status)) {
+						BlockingStatus block = watcher.getBlockingStatus(waitingJobId);
+						blockingStatus.add(block);
+						blockedList += waitingJobId + ",";
+					}
+				}
+				if (!blockingStatus.isEmpty()) {
+					logger.info("Pipeline job " + node.getJobId() + " waiting on " + blockedList + " in execution " + watcher.getExecId());
+					
+					for(BlockingStatus bStatus: blockingStatus) {
+						logger.info("Waiting on pipelined job " + bStatus.getJobId());
+						bStatus.blockOnFinishedStatus();
+						logger.info("Pipelined job " + bStatus.getJobId() + " finished.");
+					}
+				}
+			}
+
+			node.setStartTime(System.currentTimeMillis());
 			fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
 			try {
 				loader.uploadExecutableNode(node, props);
@@ -185,7 +232,7 @@ public class JobRunner extends EventHandler implements Runnable {
 		this.fireEventListeners(event);
 	}
 	
-	private boolean prepareJob() throws RuntimeException{
+	private boolean prepareJob() throws RuntimeException {
 		// Check pre conditions
 		if (props == null) {
 			node.setStatus(Status.FAILED);
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index ce88cb8..d2d44dc 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -58,6 +58,7 @@ public class ExecutableFlow {
 	private boolean notifyOnLastFailure = false;
 	
 	private Integer pipelineLevel = null;
+	private Integer pipelineExecId = null;
 	private Map<String, String> flowParameters = new HashMap<String, String>();
 
 	public enum FailureAction {
@@ -67,49 +68,6 @@ public class ExecutableFlow {
 	}
 	
 	private FailureAction failureAction = FailureAction.FINISH_CURRENTLY_RUNNING;
-
-	public static enum Status {
-		READY(10), PREPARING(20), RUNNING(30), PAUSED(40), SUCCEEDED(50), KILLED(60), FAILED(70), FAILED_FINISHING(80), SKIPPED(90), DISABLED(100), QUEUED(110);
-		
-		private int numVal;
-
-		Status(int numVal) {
-			this.numVal = numVal;
-		}
-
-		public int getNumVal() {
-			return numVal;
-		}
-		
-		public static Status fromInteger(int x) {
-			switch (x) {
-			case 10:
-				return READY;
-			case 20:
-				return PREPARING;
-			case 30:
-				return RUNNING;
-			case 40:
-				return PAUSED;
-			case 50:
-				return SUCCEEDED;
-			case 60:
-				return KILLED;
-			case 70:
-				return FAILED;
-			case 80:
-				return FAILED_FINISHING;
-			case 90:
-				return SKIPPED;
-			case 100:
-				return DISABLED;
-			case 110:
-				return QUEUED;
-			default:
-				return READY;
-			}
-		}
-	}
 	
 	public ExecutableFlow(Flow flow) {
 		this.projectId = flow.getProjectId();
@@ -330,6 +288,7 @@ public class ExecutableFlow {
 		flowObj.put("failureEmails", failureEmails);
 		flowObj.put("failureAction", failureAction.toString());
 		flowObj.put("pipelineLevel", pipelineLevel);
+		flowObj.put("pipelineExecId", pipelineExecId);
 		flowObj.put("version", version);
 		
 		ArrayList<Object> props = new ArrayList<Object>();
@@ -464,6 +423,7 @@ public class ExecutableFlow {
 			exFlow.failureAction = FailureAction.valueOf((String)flowObj.get("failureAction"));
 		}
 		exFlow.pipelineLevel = (Integer)flowObj.get("pipelineLevel");
+		exFlow.pipelineExecId = (Integer)flowObj.get("pipelineExecId");
 		
 		// Copy nodes
 		List<Object> nodes = (List<Object>)flowObj.get("nodes");
@@ -532,10 +492,14 @@ public class ExecutableFlow {
 		this.submitUser = submitUser;
 	}
 
-	public void setPipelineLevel(int level) {
+	public void setPipelineLevel(Integer level) {
 		pipelineLevel = level;
 	}
 	
+	public void setPipelineExecutionId(Integer execId) {
+		pipelineExecId = execId;
+	}
+	
 	public void setNotifyOnFirstFailure(boolean notify) {
 		this.notifyOnFirstFailure = notify;
 	}
@@ -559,4 +523,12 @@ public class ExecutableFlow {
 	public void setVersion(int version) {
 		this.version = version;
 	}
+	
+	public Integer getPipelineLevel() {
+		return pipelineLevel;
+	}
+	
+	public Integer getPipelineExecutionId() {
+		return pipelineExecId;
+	}
 }
diff --git a/src/java/azkaban/executor/ExecutableJobInfo.java b/src/java/azkaban/executor/ExecutableJobInfo.java
index b716b5f..b096505 100644
--- a/src/java/azkaban/executor/ExecutableJobInfo.java
+++ b/src/java/azkaban/executor/ExecutableJobInfo.java
@@ -3,8 +3,6 @@ package azkaban.executor;
 import java.util.HashMap;
 import java.util.Map;
 
-import azkaban.executor.ExecutableFlow.Status;
-
 public class ExecutableJobInfo {
 	private final int execId;
 	private final int projectId;
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index aaa6992..35483c8 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -23,7 +23,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import azkaban.executor.ExecutableFlow.Status;
 import azkaban.flow.Node;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Props;
diff --git a/src/java/azkaban/executor/ExecutorMailer.java b/src/java/azkaban/executor/ExecutorMailer.java
index 75b2723..59d7727 100644
--- a/src/java/azkaban/executor/ExecutorMailer.java
+++ b/src/java/azkaban/executor/ExecutorMailer.java
@@ -8,7 +8,6 @@ import javax.mail.MessagingException;
 import org.apache.log4j.Logger;
 
 import azkaban.executor.ExecutableFlow.FailureAction;
-import azkaban.executor.ExecutableFlow.Status;
 import azkaban.utils.EmailMessage;
 import azkaban.utils.Props;
 import azkaban.utils.Utils;
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 72c8e3d..975a58e 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -35,7 +35,6 @@ import org.apache.http.impl.client.BasicResponseHandler;
 import org.apache.http.impl.client.DefaultHttpClient;
 import org.apache.log4j.Logger;
 
-import azkaban.executor.ExecutableFlow.Status;
 import azkaban.project.Project;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.JSONUtils;
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index c70d9f5..2fb2dbe 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -23,7 +23,6 @@ import org.apache.commons.dbutils.ResultSetHandler;
 import org.apache.commons.io.IOUtils;
 import org.apache.log4j.Logger;
 
-import azkaban.executor.ExecutableFlow.Status;
 import azkaban.utils.DataSourceUtils;
 import azkaban.utils.FileIOUtils;
 import azkaban.utils.FileIOUtils.LogData;
@@ -110,8 +109,8 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 
 		long id;
 		try {
-			flow.setStatus(ExecutableFlow.Status.PREPARING);
-			runner.update(connection, INSERT_EXECUTABLE_FLOW, flow.getProjectId(), flow.getFlowId(), flow.getVersion(), ExecutableFlow.Status.PREPARING.getNumVal(), submitTime, flow.getSubmitUser(), submitTime);
+			flow.setStatus(Status.PREPARING);
+			runner.update(connection, INSERT_EXECUTABLE_FLOW, flow.getProjectId(), flow.getFlowId(), flow.getVersion(), Status.PREPARING.getNumVal(), submitTime, flow.getSubmitUser(), submitTime);
 			connection.commit();
 			id = runner.query(connection, LastInsertID.LAST_INSERT_ID, new LastInsertID());
 
diff --git a/src/java/azkaban/executor/Status.java b/src/java/azkaban/executor/Status.java
new file mode 100644
index 0000000..1f4ee39
--- /dev/null
+++ b/src/java/azkaban/executor/Status.java
@@ -0,0 +1,56 @@
+package azkaban.executor;
+
+public enum Status {
+	READY(10), PREPARING(20), RUNNING(30), PAUSED(40), SUCCEEDED(50), KILLED(60), FAILED(70), FAILED_FINISHING(80), SKIPPED(90), DISABLED(100), QUEUED(110);
+	
+	private int numVal;
+
+	Status(int numVal) {
+		this.numVal = numVal;
+	}
+
+	public int getNumVal() {
+		return numVal;
+	}
+	
+	public static Status fromInteger(int x) {
+		switch (x) {
+		case 10:
+			return READY;
+		case 20:
+			return PREPARING;
+		case 30:
+			return RUNNING;
+		case 40:
+			return PAUSED;
+		case 50:
+			return SUCCEEDED;
+		case 60:
+			return KILLED;
+		case 70:
+			return FAILED;
+		case 80:
+			return FAILED_FINISHING;
+		case 90:
+			return SKIPPED;
+		case 100:
+			return DISABLED;
+		case 110:
+			return QUEUED;
+		default:
+			return READY;
+		}
+	}
+	
+	public static boolean isStatusFinished(Status status) {
+		switch (status) {
+		case FAILED:
+		case KILLED:
+		case SUCCEEDED:
+		case SKIPPED:
+			return true;
+		default:
+			return false;
+		}
+	}
+}
\ No newline at end of file
diff --git a/src/java/azkaban/flow/CommonJobProperties.java b/src/java/azkaban/flow/CommonJobProperties.java
index 319ce96..5d71d30 100644
--- a/src/java/azkaban/flow/CommonJobProperties.java
+++ b/src/java/azkaban/flow/CommonJobProperties.java
@@ -1,8 +1,5 @@
 package azkaban.flow;
 
-import java.util.UUID;
-
-import org.joda.time.DateTime;
 
 public class CommonJobProperties {
 	/*
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 9a53f37..540fea6 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -35,7 +35,7 @@ import org.joda.time.format.DateTimeFormatter;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutorManager;
 import azkaban.executor.ExecutorManagerException;
-import azkaban.executor.ExecutableFlow.Status;
+import azkaban.executor.Status;
 
 import azkaban.flow.Flow;
 import azkaban.project.Project;
diff --git a/src/java/azkaban/sla/SlaMailer.java b/src/java/azkaban/sla/SlaMailer.java
index 17a67d0..9b19da8 100644
--- a/src/java/azkaban/sla/SlaMailer.java
+++ b/src/java/azkaban/sla/SlaMailer.java
@@ -9,8 +9,8 @@ import org.apache.log4j.Logger;
 
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableFlow.FailureAction;
-import azkaban.executor.ExecutableFlow.Status;
 import azkaban.executor.ExecutableNode;
+import azkaban.executor.Status;
 import azkaban.sla.SLA;
 import azkaban.utils.EmailMessage;
 import azkaban.utils.Props;
diff --git a/src/java/azkaban/sla/SLAManager.java b/src/java/azkaban/sla/SLAManager.java
index 04fbe08..1adffc6 100644
--- a/src/java/azkaban/sla/SLAManager.java
+++ b/src/java/azkaban/sla/SLAManager.java
@@ -18,11 +18,11 @@ import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 
 import azkaban.executor.ExecutableFlow;
-import azkaban.executor.ExecutableFlow.Status;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutorMailer;
 import azkaban.executor.ExecutorManager;
 import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.Status;
 import azkaban.flow.Flow;
 import azkaban.project.Project;
 import azkaban.project.ProjectManager;
diff --git a/src/java/azkaban/utils/WebUtils.java b/src/java/azkaban/utils/WebUtils.java
index 4878e34..ef63fb6 100644
--- a/src/java/azkaban/utils/WebUtils.java
+++ b/src/java/azkaban/utils/WebUtils.java
@@ -7,7 +7,7 @@ import org.joda.time.DurationFieldType;
 import org.joda.time.ReadablePeriod;
 import org.joda.time.format.DateTimeFormat;
 
-import azkaban.executor.ExecutableFlow.Status;
+import azkaban.executor.Status;
 
 public class WebUtils {
 	public static final String DATE_TIME_STRING = "YYYY-MM-dd HH:mm:ss";
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index 71eb652..97c7414 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -35,7 +35,6 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.log4j.FileAppender;
 import org.apache.log4j.Logger;
 import org.apache.velocity.app.VelocityEngine;
 import org.apache.velocity.runtime.log.Log4JLogChute;
@@ -179,11 +178,6 @@ public class AzkabanWebServer implements AzkabanServer {
 		
 		configureMBeanServer();
 	}
-	
-	private void setupLoggers() {
-		//FileAppender fileAppender = new FileAppender(loggerLayout, absolutePath, true);
-		
-	}
 
 	private void setViewerPlugins(List<ViewerPlugin> viewerPlugins) {
 		this.viewerPlugins = viewerPlugins;
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index f275119..c16f6b9 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -33,18 +33,16 @@ import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutableFlow.FailureAction;
 import azkaban.executor.ExecutorManager;
-import azkaban.executor.ExecutableFlow.Status;
 import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.Status;
 import azkaban.flow.Flow;
 import azkaban.project.Project;
 import azkaban.project.ProjectManager;
 import azkaban.scheduler.Schedule;
 import azkaban.scheduler.ScheduleManager;
 import azkaban.user.Permission;
-import azkaban.user.Role;
 import azkaban.user.User;
 import azkaban.user.Permission.Type;
-import azkaban.user.UserManager;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.webapp.AzkabanWebServer;
 import azkaban.webapp.session.Session;
@@ -54,7 +52,6 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 	private ProjectManager projectManager;
 	private ExecutorManager executorManager;
 	private ScheduleManager scheduleManager;
-	private UserManager userManager;
 	private ExecutorVMHelper vmHelper;
 
 	@Override
@@ -64,7 +61,6 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		projectManager = server.getProjectManager();
 		executorManager = server.getExecutorManager();
 		scheduleManager = server.getScheduleManager();
-		userManager = server.getUserManager();
 		vmHelper = new ExecutorVMHelper();
 	}
 
@@ -750,19 +746,4 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 			return project.getName();
 		}
 	}
-	
-	private boolean hasPermission(Project project, User user, Permission.Type type) {
-		if (project.hasPermission(user, type)) {
-			return true;
-		}
-		
-		for(String roleName: user.getRoles()) {
-			Role role = userManager.getRole(roleName);
-			if (role.getPermission().isPermissionSet(type) || role.getPermission().isPermissionSet(Permission.Type.ADMIN)) {
-				return true;
-			}
-		}
-		
-		return false;
-	}
 }
diff --git a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
index a92fb45..891e896 100644
--- a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
@@ -31,6 +31,9 @@ import javax.servlet.http.HttpServletResponse;
 import org.apache.commons.fileupload.servlet.ServletFileUpload;
 import org.apache.log4j.Logger;
 
+import azkaban.project.Project;
+import azkaban.user.Permission;
+import azkaban.user.Role;
 import azkaban.user.User;
 import azkaban.user.UserManager;
 import azkaban.user.UserManagerException;
@@ -217,6 +220,22 @@ public abstract class LoginAbstractAzkabanServlet extends AbstractAzkabanServlet
 		return session;
 	}
 	
+	protected boolean hasPermission(Project project, User user, Permission.Type type) {
+		UserManager userManager = getApplication().getUserManager();
+		if (project.hasPermission(user, type)) {
+			return true;
+		}
+		
+		for(String roleName: user.getRoles()) {
+			Role role = userManager.getRole(roleName);
+			if (role.getPermission().isPermissionSet(type) || role.getPermission().isPermissionSet(Permission.Type.ADMIN)) {
+				return true;
+			}
+		}
+		
+		return false;
+	}
+	
 	protected void handleAjaxLoginAction(HttpServletRequest req, HttpServletResponse resp, Map<String, Object> ret) throws ServletException {
 		if (hasParam(req, "username") && hasParam(req, "password")) {
 			Session session = null;
diff --git a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index 2f1403b..0e06e59 100644
--- a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -1250,21 +1250,6 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 		}
 	}
 	
-	private boolean hasPermission(Project project, User user, Permission.Type type) {
-		if (project.hasPermission(user, type)) {
-			return true;
-		}
-		
-		for(String roleName: user.getRoles()) {
-			Role role = userManager.getRole(roleName);
-			if (role.getPermission().isPermissionSet(type) || role.getPermission().isPermissionSet(Permission.Type.ADMIN)) {
-				return true;
-			}
-		}
-		
-		return false;
-	}
-	
 	private Permission getPermissionObject(Project project, User user, Permission.Type type) {
 		Permission perm = project.getCollectivePermission(user);
 		
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index 489299a..13de687 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -27,7 +27,6 @@ import javax.servlet.ServletConfig;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
-import javax.swing.text.StyledEditorKit.BoldAction;
 
 import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
@@ -42,7 +41,6 @@ import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.ExecutableFlow.FailureAction;
-import azkaban.executor.ExecutableFlow.Status;
 import azkaban.flow.Flow;
 import azkaban.flow.Node;
 import azkaban.project.Project;
@@ -585,19 +583,4 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 
 		return firstSchedTime;
 	}
-
-	private boolean hasPermission(Project project, User user, Permission.Type type) {
-		if (project.hasPermission(user, type)) {
-			return true;
-		}
-		
-		for(String roleName: user.getRoles()) {
-			Role role = userManager.getRole(roleName);
-			if (role.getPermission().isPermissionSet(type) || role.getPermission().isPermissionSet(Permission.Type.ADMIN)) {
-				return true;
-			}
-		}
-		
-		return false;
-	}
 }
diff --git a/src/java/log4j.properties b/src/java/log4j.properties
index 9df233d..52008f9 100644
--- a/src/java/log4j.properties
+++ b/src/java/log4j.properties
@@ -1,6 +1,7 @@
 log4j.rootLogger=INFO, Console
-log4j.logger.azkaban.webapp.AzkabanServer=INFO, R
+log4j.logger.azkaban.webapp=INFO, WebServer
 log4j.logger.azkaban.webapp.servlet.AbstractAzkabanServlet=INFO, R
+log4j.logger.azkaban.webapp.servlet.LoginAbstractAzkabanServlet=INFO, R
 
 log4j.appender.R=org.apache.log4j.RollingFileAppender
 log4j.appender.R.layout=org.apache.log4j.PatternLayout
@@ -9,6 +10,20 @@ log4j.appender.R.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS Z} %p [%c{1
 log4j.appender.R.MaxFileSize=102400MB
 log4j.appender.R.MaxBackupIndex=2
 
+log4j.appender.WebServer=org.apache.log4j.RollingFileAppender
+log4j.appender.WebServer.layout=org.apache.log4j.PatternLayout
+log4j.appender.WebServer.File=azkaban-webserver.log
+log4j.appender.WebServer.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS Z} %p [%c{1}] [Azkaban] %m%n
+log4j.appender.WebServer.MaxFileSize=102400MB
+log4j.appender.WebServer.MaxBackupIndex=2
+
+log4j.appender.ExecServer=org.apache.log4j.RollingFileAppender
+log4j.appender.ExecServer.layout=org.apache.log4j.PatternLayout
+log4j.appender.ExecServer.File=azkaban-execserver.log
+log4j.appender.ExecServer.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS Z} %p [%c{1}] [Azkaban] %m%n
+log4j.appender.ExecServer.MaxFileSize=102400MB
+log4j.appender.ExecServer.MaxBackupIndex=2
+
 log4j.appender.Console=org.apache.log4j.ConsoleAppender
 log4j.appender.Console.layout=org.apache.log4j.PatternLayout
 log4j.appender.Console.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS Z} %p [%c{1}] [Azkaban] %m%n
\ No newline at end of file
diff --git a/src/sql/create_execution_logs.sql b/src/sql/create_execution_logs.sql
index 07e0b7b..6e76312 100644
--- a/src/sql/create_execution_logs.sql
+++ b/src/sql/create_execution_logs.sql
@@ -6,7 +6,8 @@ CREATE TABLE execution_logs (
 	start_byte INT,
 	end_byte INT,
 	log LONGBLOB,
-	PRIMARY KEY (exec_id, name, attempt),
+	PRIMARY KEY (exec_id, name, attempt, start_byte),
+	INDEX log_attempt (exec_id, name, attempt),
 	INDEX log_index (exec_id, name),
 	INDEX byte_log_index(exec_id, name, start_byte, end_byte)
 ) ENGINE=InnoDB;
diff --git a/src/sql/create_project_event_table.sql b/src/sql/create_project_event_table.sql
index 4cf8f21..ea9802e 100644
--- a/src/sql/create_project_event_table.sql
+++ b/src/sql/create_project_event_table.sql
@@ -3,6 +3,6 @@ CREATE TABLE project_events (
 	event_type TINYINT NOT NULL,
 	event_time BIGINT NOT NULL,
 	username VARCHAR(64),
-	message VARCHAR(128),
+	message VARCHAR(512),
 	INDEX log (project_id, event_time)
 ) ENGINE=InnoDB;
diff --git a/src/sql/update_2.0_to_2.01.sql b/src/sql/update_2.0_to_2.01.sql
index 80273f4..9ab565e 100644
--- a/src/sql/update_2.0_to_2.01.sql
+++ b/src/sql/update_2.0_to_2.01.sql
@@ -8,7 +8,8 @@ ALTER TABLE execution_jobs ADD INDEX exec_job (exec_id, job_id);
 
 ALTER TABLE execution_logs ADD COLUMN attempt INT DEFAULT 0;
 ALTER TABLE execution_logs DROP PRIMARY KEY;
-ALTER TABLE execution_logs ADD PRIMARY KEY(exec_id, name, attempt);
+ALTER TABLE execution_logs ADD PRIMARY KEY(exec_id, name, attempt, start_byte);
+ALTER TABLE execution_logs ADD INDEX log_attempt (exec_id, name, attempt);
 
 ALTER TABLE schedules ADD COLUMN enc_type TINYINT;
 ALTER TABLE schedules ADD COLUMN schedule_options LONGBLOB;
diff --git a/unit/executions/exectest1/exec1.flow b/unit/executions/exectest1/exec1.flow
index 3612d58..5ca051c 100644
--- a/unit/executions/exectest1/exec1.flow
+++ b/unit/executions/exectest1/exec1.flow
@@ -30,17 +30,10 @@
   },{
     "source" : "job7",
     "target" : "job8"
-  },{
-    "source" : "job7",
-    "target" : "job9"
   },
   {
     "source" : "job8",
     "target" : "job10"
-  },
-  {
-    "source" : "job9",
-    "target" : "job10"
   }
    ],
   "failure.email" : [],
@@ -126,16 +119,6 @@
   },
   {
     "propSource" : "prop2.properties",
-    "id" : "job9",
-    "jobType" : "java",
-    "layout" : {
-      "level" : 0
-    },
-    "jobSource" : "job9.job",
-    "expectedRuntime" : 1
-  },
-  {
-    "propSource" : "prop2.properties",
     "id" : "job10",
     "jobType" : "java",
     "layout" : {
diff --git a/unit/executions/exectest1/exec1-mod.flow b/unit/executions/exectest1/exec1-mod.flow
new file mode 100644
index 0000000..3612d58
--- /dev/null
+++ b/unit/executions/exectest1/exec1-mod.flow
@@ -0,0 +1,156 @@
+{
+  "project.id":1,
+  "version":2,
+  "id" : "derived-member-data",
+  "success.email" : [],
+  "edges" : [ {
+    "source" : "job1",
+    "target" : "job2"
+  }, {
+    "source" : "job2",
+    "target" : "job3"
+  },{
+    "source" : "job2",
+    "target" : "job4"
+  }, {
+    "source" : "job3",
+    "target" : "job5"
+  },{
+    "source" : "job4",
+    "target" : "job5"
+  },{
+    "source" : "job5",
+    "target" : "job7"
+  },{
+    "source" : "job1",
+    "target" : "job6"
+  },{
+    "source" : "job6",
+    "target" : "job7"
+  },{
+    "source" : "job7",
+    "target" : "job8"
+  },{
+    "source" : "job7",
+    "target" : "job9"
+  },
+  {
+    "source" : "job8",
+    "target" : "job10"
+  },
+  {
+    "source" : "job9",
+    "target" : "job10"
+  }
+   ],
+  "failure.email" : [],
+  "nodes" : [ {
+    "propSource" : "prop2.properties",
+    "id" : "job1",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job1.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job2",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job2.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job3",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job3.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job4",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job4.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job5",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job5.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job6",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job6.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job7",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job7.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job8",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job8.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job9",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job9.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job10",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job10.job",
+    "expectedRuntime" : 1
+  }
+   ],
+  "layedout" : false,
+  "type" : "flow",
+  "props" : [ {
+    "inherits" : "prop1.properties",
+    "source" : "prop2.properties"
+  },{
+    "source" : "prop1.properties"
+  }]
+}
\ No newline at end of file
diff --git a/unit/java/azkaban/test/execapp/event/BlockingStatusTest.java b/unit/java/azkaban/test/execapp/event/BlockingStatusTest.java
new file mode 100644
index 0000000..3acd91b
--- /dev/null
+++ b/unit/java/azkaban/test/execapp/event/BlockingStatusTest.java
@@ -0,0 +1,114 @@
+package azkaban.test.execapp.event;
+
+import org.junit.Test;
+
+import junit.framework.Assert;
+import azkaban.execapp.event.BlockingStatus;
+import azkaban.executor.Status;
+
+public class BlockingStatusTest {
+
+	public class WatchingThread extends Thread {
+		private BlockingStatus status;
+		private long diff = 0;
+		public WatchingThread(BlockingStatus status) {
+			this.status = status;
+		}
+		
+		public void run() {
+			long startTime = System.currentTimeMillis();
+			status.blockOnFinishedStatus();
+			diff = System.currentTimeMillis() - startTime;
+		}
+
+		public long getDiff() {
+			return diff;
+		}
+	}
+	
+	@Test
+	public void testFinishedBlock() {
+		BlockingStatus status = new BlockingStatus(1, "test", Status.SKIPPED);
+		
+		WatchingThread thread = new WatchingThread(status);
+		thread.start();
+		try {
+			thread.join();
+		} catch (InterruptedException e) {
+			e.printStackTrace();
+		}
+		System.out.println("Diff " + thread.getDiff());
+		Assert.assertTrue(thread.getDiff() < 100);
+	}
+	
+	@Test
+	public void testUnfinishedBlock() throws InterruptedException {
+		BlockingStatus status = new BlockingStatus(1, "test", Status.QUEUED);
+		
+		WatchingThread thread = new WatchingThread(status);
+		thread.start();
+	
+		synchronized(this) {
+			wait(3000);
+		}
+	
+		status.changeStatus(Status.SUCCEEDED);
+		thread.join();
+		
+		System.out.println("Diff " + thread.getDiff());
+		Assert.assertTrue(thread.getDiff() >= 3000 && thread.getDiff() < 3100);
+	}
+	
+	@Test
+	public void testUnfinishedBlockSeveralChanges() throws InterruptedException {
+		BlockingStatus status = new BlockingStatus(1, "test", Status.QUEUED);
+		
+		WatchingThread thread = new WatchingThread(status);
+		thread.start();
+	
+		synchronized(this) {
+			wait(3000);
+		}
+	
+		status.changeStatus(Status.PAUSED);
+		
+		synchronized(this) {
+			wait(1000);
+		}
+		
+		status.changeStatus(Status.FAILED);
+		
+		thread.join(1000);
+		
+		System.out.println("Diff " + thread.getDiff());
+		Assert.assertTrue(thread.getDiff() >= 4000 && thread.getDiff() < 4100);
+	}
+	
+	@Test
+	public void testMultipleWatchers() throws InterruptedException {
+		BlockingStatus status = new BlockingStatus(1, "test", Status.QUEUED);
+		
+		WatchingThread thread1 = new WatchingThread(status);
+		thread1.start();
+
+		synchronized(this) {
+			wait(2000);
+		}
+	
+		WatchingThread thread2 = new WatchingThread(status);
+		thread2.start();
+		
+		synchronized(this) {
+			wait(2000);
+		}
+		
+		status.changeStatus(Status.FAILED);
+		thread2.join(1000);
+		thread1.join(1000);
+		
+		System.out.println("Diff thread 1 " + thread1.getDiff());
+		System.out.println("Diff thread 2 " + thread2.getDiff());
+		Assert.assertTrue(thread1.getDiff() >= 4000 && thread1.getDiff() < 4100);
+		Assert.assertTrue(thread2.getDiff() >= 2000 && thread2.getDiff() < 2100);
+	}
+}
diff --git a/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java b/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
new file mode 100644
index 0000000..df5f5a6
--- /dev/null
+++ b/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
@@ -0,0 +1,234 @@
+package azkaban.test.execapp.event;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.execapp.FlowRunner;
+import azkaban.execapp.event.FlowWatcher;
+import azkaban.execapp.event.LocalFlowWatcher;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutorLoader;
+import azkaban.executor.Status;
+import azkaban.flow.Flow;
+import azkaban.jobtype.JobTypeManager;
+import azkaban.project.ProjectLoader;
+import azkaban.test.execapp.EventCollectorListener;
+import azkaban.test.execapp.MockExecutorLoader;
+import azkaban.test.execapp.MockProjectLoader;
+import azkaban.test.executor.JavaJob;
+import azkaban.utils.JSONUtils;
+
+public class LocalFlowWatcherTest {
+	private File workingDir;
+	private JobTypeManager jobtypeManager;
+	private ProjectLoader fakeProjectLoader;
+	private int dirVal= 0;
+	
+	@Before
+	public void setUp() throws Exception {
+		jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
+		jobtypeManager.registerJobType("java", JavaJob.class);
+		fakeProjectLoader = new MockProjectLoader(workingDir);
+	}
+	
+	@After
+	public void tearDown() throws IOException {
+	}
+	
+	public File setupDirectory() throws IOException {
+		System.out.println("Create temp dir");
+		File workingDir = new File("_AzkabanTestDir_" + dirVal );
+		if (workingDir.exists()) {
+			FileUtils.deleteDirectory(workingDir);
+		}
+		workingDir.mkdirs();
+		dirVal++;
+		
+		return workingDir;
+	}
+	
+	@Test
+	public void testBasicLocalFlowWatcher() throws Exception {
+		MockExecutorLoader loader = new MockExecutorLoader();
+		
+		EventCollectorListener eventCollector = new EventCollectorListener();
+
+		File workingDir1 = setupDirectory();
+		FlowRunner runner1 = createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null, null);
+		Thread runner1Thread = new Thread(runner1);
+		
+		File workingDir2 = setupDirectory();
+		LocalFlowWatcher watcher = new LocalFlowWatcher(runner1);
+		FlowRunner runner2 = createFlowRunner(workingDir2, loader, eventCollector, "exec1", 2, watcher, 2);
+		Thread runner2Thread = new Thread(runner2);
+		
+		runner1Thread.start();
+		runner2Thread.start();
+		runner2Thread.join();
+		
+		FileUtils.deleteDirectory(workingDir1);
+		FileUtils.deleteDirectory(workingDir2);
+		
+		testPipelineLevel2(runner1.getExecutableFlow(), runner2.getExecutableFlow());
+	}
+	
+	@Test
+	public void testLevel1LocalFlowWatcher() throws Exception {
+		MockExecutorLoader loader = new MockExecutorLoader();
+		
+		EventCollectorListener eventCollector = new EventCollectorListener();
+
+		File workingDir1 = setupDirectory();
+		FlowRunner runner1 = createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null, null);
+		Thread runner1Thread = new Thread(runner1);
+		
+		File workingDir2 = setupDirectory();
+		LocalFlowWatcher watcher = new LocalFlowWatcher(runner1);
+		FlowRunner runner2 = createFlowRunner(workingDir2, loader, eventCollector, "exec1", 2, watcher, 1);
+		Thread runner2Thread = new Thread(runner2);
+		
+		runner1Thread.start();
+		runner2Thread.start();
+		runner2Thread.join();
+		
+		FileUtils.deleteDirectory(workingDir1);
+		FileUtils.deleteDirectory(workingDir2);
+		
+		testPipelineLevel1(runner1.getExecutableFlow(), runner2.getExecutableFlow());
+	}
+	
+	@Test
+	public void testLevel2DiffLocalFlowWatcher() throws Exception {
+		MockExecutorLoader loader = new MockExecutorLoader();
+		
+		EventCollectorListener eventCollector = new EventCollectorListener();
+
+		File workingDir1 = setupDirectory();
+		FlowRunner runner1 = createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null, null);
+		Thread runner1Thread = new Thread(runner1);
+		
+		File workingDir2 = setupDirectory();
+		LocalFlowWatcher watcher = new LocalFlowWatcher(runner1);
+		FlowRunner runner2 = createFlowRunner(workingDir2, loader, eventCollector, "exec1-mod", 2, watcher, 1);
+		Thread runner2Thread = new Thread(runner2);
+		
+		runner1Thread.start();
+		runner2Thread.start();
+		runner2Thread.join();
+		
+		FileUtils.deleteDirectory(workingDir1);
+		FileUtils.deleteDirectory(workingDir2);
+		
+		testPipelineLevel1(runner1.getExecutableFlow(), runner2.getExecutableFlow());
+	}
+	
+	private void testPipelineLevel1(ExecutableFlow first, ExecutableFlow second) {
+		for (ExecutableNode node: second.getExecutableNodes()) {
+			Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
+			
+			// check its start time is after the first's children.
+			ExecutableNode watchedNode = first.getExecutableNode(node.getJobId());
+			if (watchedNode == null) {
+				continue;
+			}
+			Assert.assertEquals(watchedNode.getStatus(), Status.SUCCEEDED);
+			
+			System.out.println("Node " + node.getJobId() + 
+					" start: " + node.getStartTime() + 
+					" dependent on " + watchedNode.getJobId() + 
+					" " + watchedNode.getEndTime() + 
+					" diff: " + (node.getStartTime() - watchedNode.getEndTime()));
+
+			Assert.assertTrue(node.getStartTime() >= watchedNode.getEndTime());
+			
+			long minParentDiff = 0;
+			if (node.getInNodes().size() > 0) {
+				minParentDiff = Long.MAX_VALUE;
+				for (String dependency: node.getInNodes()) {
+					ExecutableNode parent = second.getExecutableNode(dependency);
+					long diff = node.getStartTime() - parent.getEndTime();
+					minParentDiff = Math.min(minParentDiff, diff);
+				}
+			}
+			long diff = node.getStartTime() - watchedNode.getEndTime();
+			System.out.println("   minPipelineTimeDiff:" + diff + " minDependencyTimeDiff:" + minParentDiff);
+			Assert.assertTrue(minParentDiff < 100 || diff < 100);
+		}
+	}
+	
+	private void testPipelineLevel2(ExecutableFlow first, ExecutableFlow second) {
+		for (ExecutableNode node: second.getExecutableNodes()) {
+			Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
+			
+			// check its start time is after the first's children.
+			ExecutableNode watchedNode = first.getExecutableNode(node.getJobId());
+			if (watchedNode == null) {
+				continue;
+			}
+			Assert.assertEquals(watchedNode.getStatus(), Status.SUCCEEDED);
+			
+			long minDiff = Long.MAX_VALUE;
+			for (String watchedChild: watchedNode.getOutNodes()) {
+				ExecutableNode child = first.getExecutableNode(watchedChild);
+				if (child == null) {
+					continue;
+				}
+				Assert.assertEquals(child.getStatus(), Status.SUCCEEDED);
+				long diff = node.getStartTime() - child.getEndTime();
+				minDiff = Math.min(minDiff, diff);
+				System.out.println("Node " + node.getJobId() + 
+						" start: " + node.getStartTime() + 
+						" dependent on " + watchedChild + " " + child.getEndTime() +
+						" diff: " + diff);
+				Assert.assertTrue(node.getStartTime() >= child.getEndTime());
+			}
+			
+			long minParentDiff = Long.MAX_VALUE;
+			for (String dependency: node.getInNodes()) {
+				ExecutableNode parent = second.getExecutableNode(dependency);
+				long diff = node.getStartTime() - parent.getEndTime();
+				minParentDiff = Math.min(minParentDiff, diff);
+			}
+			System.out.println("   minPipelineTimeDiff:" + minDiff + " minDependencyTimeDiff:" + minParentDiff);
+			Assert.assertTrue(minParentDiff < 100 || minDiff < 100);
+		}
+	}
+	
+	private FlowRunner createFlowRunner(File workingDir, ExecutorLoader loader, EventCollectorListener eventCollector, String flowName, int execId, FlowWatcher watcher, Integer pipeline) throws Exception {
+		File testDir = new File("unit/executions/exectest1");
+		ExecutableFlow exFlow = prepareExecDir(workingDir, testDir, flowName, execId);
+		if (watcher != null) {
+			exFlow.setPipelineLevel(pipeline);
+			exFlow.setPipelineExecutionId(watcher.getExecId());
+		}
+		//MockProjectLoader projectLoader = new MockProjectLoader(new File(exFlow.getExecutionPath()));
+		
+		loader.uploadExecutableFlow(exFlow);
+		FlowRunner runner = new FlowRunner(exFlow, watcher,  loader, fakeProjectLoader, jobtypeManager);
+		runner.addListener(eventCollector);
+		
+		return runner;
+	}
+	
+	private ExecutableFlow prepareExecDir(File workingDir, File execDir, String flowName, int execId) throws IOException {
+		FileUtils.copyDirectory(execDir, workingDir);
+		
+		File jsonFlowFile = new File(workingDir, flowName + ".flow");
+		@SuppressWarnings("unchecked")
+		HashMap<String, Object> flowObj = (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
+		
+		Flow flow = Flow.flowFromObject(flowObj);
+		ExecutableFlow execFlow = new ExecutableFlow(flow);
+		execFlow.setExecutionId(execId);
+		execFlow.setExecutionPath(workingDir.getPath());
+		return execFlow;
+	}
+}
diff --git a/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java b/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
new file mode 100644
index 0000000..78aac81
--- /dev/null
+++ b/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
@@ -0,0 +1,234 @@
+package azkaban.test.execapp.event;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.execapp.FlowRunner;
+import azkaban.execapp.event.FlowWatcher;
+import azkaban.execapp.event.RemoteFlowWatcher;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutorLoader;
+import azkaban.executor.Status;
+import azkaban.flow.Flow;
+import azkaban.jobtype.JobTypeManager;
+import azkaban.project.ProjectLoader;
+import azkaban.test.execapp.EventCollectorListener;
+import azkaban.test.execapp.MockExecutorLoader;
+import azkaban.test.execapp.MockProjectLoader;
+import azkaban.test.executor.JavaJob;
+import azkaban.utils.JSONUtils;
+
+public class RemoteFlowWatcherTest {
+	private File workingDir;
+	private JobTypeManager jobtypeManager;
+	private ProjectLoader fakeProjectLoader;
+	private int dirVal= 0;
+	
+	@Before
+	public void setUp() throws Exception {
+		jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
+		jobtypeManager.registerJobType("java", JavaJob.class);
+		fakeProjectLoader = new MockProjectLoader(workingDir);
+	}
+	
+	@After
+	public void tearDown() throws IOException {
+	}
+	
+	public File setupDirectory() throws IOException {
+		System.out.println("Create temp dir");
+		File workingDir = new File("_AzkabanTestDir_" + dirVal );
+		if (workingDir.exists()) {
+			FileUtils.deleteDirectory(workingDir);
+		}
+		workingDir.mkdirs();
+		dirVal++;
+		
+		return workingDir;
+	}
+	
+	@Test
+	public void testBasicRemoteFlowWatcher() throws Exception {
+		MockExecutorLoader loader = new MockExecutorLoader();
+		
+		EventCollectorListener eventCollector = new EventCollectorListener();
+
+		File workingDir1 = setupDirectory();
+		FlowRunner runner1 = createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null, null);
+		Thread runner1Thread = new Thread(runner1);
+		
+		File workingDir2 = setupDirectory();
+		RemoteFlowWatcher watcher = new RemoteFlowWatcher(1, loader, 100);
+		FlowRunner runner2 = createFlowRunner(workingDir2, loader, eventCollector, "exec1", 2, watcher, 2);
+		Thread runner2Thread = new Thread(runner2);
+		
+		runner1Thread.start();
+		runner2Thread.start();
+		runner2Thread.join();
+		
+		FileUtils.deleteDirectory(workingDir1);
+		FileUtils.deleteDirectory(workingDir2);
+		
+		testPipelineLevel2(runner1.getExecutableFlow(), runner2.getExecutableFlow());
+	}
+	
+	@Test
+	public void testLevel1RemoteFlowWatcher() throws Exception {
+		MockExecutorLoader loader = new MockExecutorLoader();
+		
+		EventCollectorListener eventCollector = new EventCollectorListener();
+
+		File workingDir1 = setupDirectory();
+		FlowRunner runner1 = createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null, null);
+		Thread runner1Thread = new Thread(runner1);
+		
+		File workingDir2 = setupDirectory();
+		RemoteFlowWatcher watcher = new RemoteFlowWatcher(1, loader, 100);
+		FlowRunner runner2 = createFlowRunner(workingDir2, loader, eventCollector, "exec1", 2, watcher, 1);
+		Thread runner2Thread = new Thread(runner2);
+		
+		runner1Thread.start();
+		runner2Thread.start();
+		runner2Thread.join();
+		
+		FileUtils.deleteDirectory(workingDir1);
+		FileUtils.deleteDirectory(workingDir2);
+		
+		testPipelineLevel1(runner1.getExecutableFlow(), runner2.getExecutableFlow());
+	}
+	
+	@Test
+	public void testLevel2DiffRemoteFlowWatcher() throws Exception {
+		MockExecutorLoader loader = new MockExecutorLoader();
+		
+		EventCollectorListener eventCollector = new EventCollectorListener();
+
+		File workingDir1 = setupDirectory();
+		FlowRunner runner1 = createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null, null);
+		Thread runner1Thread = new Thread(runner1);
+		
+		File workingDir2 = setupDirectory();
+		
+		RemoteFlowWatcher watcher = new RemoteFlowWatcher(1, loader, 100);
+		FlowRunner runner2 = createFlowRunner(workingDir2, loader, eventCollector, "exec1-mod", 2, watcher, 1);
+		Thread runner2Thread = new Thread(runner2);
+		
+		runner1Thread.start();
+		runner2Thread.start();
+		runner2Thread.join();
+		
+		FileUtils.deleteDirectory(workingDir1);
+		FileUtils.deleteDirectory(workingDir2);
+		
+		testPipelineLevel1(runner1.getExecutableFlow(), runner2.getExecutableFlow());
+	}
+	
+	private void testPipelineLevel1(ExecutableFlow first, ExecutableFlow second) {
+		for (ExecutableNode node: second.getExecutableNodes()) {
+			Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
+			
+			// check its start time is after the first's children.
+			ExecutableNode watchedNode = first.getExecutableNode(node.getJobId());
+			if (watchedNode == null) {
+				continue;
+			}
+			Assert.assertEquals(watchedNode.getStatus(), Status.SUCCEEDED);
+			
+			System.out.println("Node " + node.getJobId() + 
+					" start: " + node.getStartTime() + 
+					" dependent on " + watchedNode.getJobId() + 
+					" " + watchedNode.getEndTime() + 
+					" diff: " + (node.getStartTime() - watchedNode.getEndTime()));
+
+			Assert.assertTrue(node.getStartTime() >= watchedNode.getEndTime());
+			
+			long minParentDiff = 0;
+			if (node.getInNodes().size() > 0) {
+				minParentDiff = Long.MAX_VALUE;
+				for (String dependency: node.getInNodes()) {
+					ExecutableNode parent = second.getExecutableNode(dependency);
+					long diff = node.getStartTime() - parent.getEndTime();
+					minParentDiff = Math.min(minParentDiff, diff);
+				}
+			}
+			long diff = node.getStartTime() - watchedNode.getEndTime();
+			Assert.assertTrue(minParentDiff < 500 || diff < 500);
+		}
+	}
+	
+	private void testPipelineLevel2(ExecutableFlow first, ExecutableFlow second) {
+		for (ExecutableNode node: second.getExecutableNodes()) {
+			Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
+			
+			// check its start time is after the first's children.
+			ExecutableNode watchedNode = first.getExecutableNode(node.getJobId());
+			if (watchedNode == null) {
+				continue;
+			}
+			Assert.assertEquals(watchedNode.getStatus(), Status.SUCCEEDED);
+			
+			long minDiff = Long.MAX_VALUE;
+			for (String watchedChild: watchedNode.getOutNodes()) {
+				ExecutableNode child = first.getExecutableNode(watchedChild);
+				if (child == null) {
+					continue;
+				}
+				Assert.assertEquals(child.getStatus(), Status.SUCCEEDED);
+				long diff = node.getStartTime() - child.getEndTime();
+				minDiff = Math.min(minDiff, diff);
+				System.out.println("Node " + node.getJobId() + 
+						" start: " + node.getStartTime() + 
+						" dependent on " + watchedChild + " " + child.getEndTime() +
+						" diff: " + diff);
+				Assert.assertTrue(node.getStartTime() >= child.getEndTime());
+			}
+			
+			long minParentDiff = Long.MAX_VALUE;
+			for (String dependency: node.getInNodes()) {
+				ExecutableNode parent = second.getExecutableNode(dependency);
+				long diff = node.getStartTime() - parent.getEndTime();
+				minParentDiff = Math.min(minParentDiff, diff);
+			}
+			System.out.println("   minPipelineTimeDiff:" + minDiff + " minDependencyTimeDiff:" + minParentDiff);
+			Assert.assertTrue(minParentDiff < 500 || minDiff < 500);
+		}
+	}
+	
+	private FlowRunner createFlowRunner(File workingDir, ExecutorLoader loader, EventCollectorListener eventCollector, String flowName, int execId, FlowWatcher watcher, Integer pipeline) throws Exception {
+		File testDir = new File("unit/executions/exectest1");
+		ExecutableFlow exFlow = prepareExecDir(workingDir, testDir, flowName, execId);
+		if (watcher != null) {
+			exFlow.setPipelineLevel(pipeline);
+			exFlow.setPipelineExecutionId(watcher.getExecId());
+		}
+		//MockProjectLoader projectLoader = new MockProjectLoader(new File(exFlow.getExecutionPath()));
+		
+		loader.uploadExecutableFlow(exFlow);
+		FlowRunner runner = new FlowRunner(exFlow, watcher,  loader, fakeProjectLoader, jobtypeManager);
+		runner.addListener(eventCollector);
+		
+		return runner;
+	}
+	
+	private ExecutableFlow prepareExecDir(File workingDir, File execDir, String flowName, int execId) throws IOException {
+		FileUtils.copyDirectory(execDir, workingDir);
+		
+		File jsonFlowFile = new File(workingDir, flowName + ".flow");
+		@SuppressWarnings("unchecked")
+		HashMap<String, Object> flowObj = (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
+		
+		Flow flow = Flow.flowFromObject(flowObj);
+		ExecutableFlow execFlow = new ExecutableFlow(flow);
+		execFlow.setExecutionId(execId);
+		execFlow.setExecutionPath(workingDir.getPath());
+		return execFlow;
+	}
+}
diff --git a/unit/java/azkaban/test/execapp/EventCollectorListener.java b/unit/java/azkaban/test/execapp/EventCollectorListener.java
index 1dde864..ba763d1 100644
--- a/unit/java/azkaban/test/execapp/EventCollectorListener.java
+++ b/unit/java/azkaban/test/execapp/EventCollectorListener.java
@@ -1,6 +1,8 @@
 package azkaban.test.execapp;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
 
 import azkaban.execapp.event.Event;
 import azkaban.execapp.event.Event.Type;
@@ -8,10 +10,17 @@ import azkaban.execapp.event.EventListener;
 
 public class EventCollectorListener implements EventListener {
 	private ArrayList<Event> eventList = new ArrayList<Event>();
+	private HashSet<Event.Type> filterOutTypes = new HashSet<Event.Type>();
+	
+	public void setEventFilterOut(Event.Type ... types) {
+		filterOutTypes.addAll(Arrays.asList(types)); 
+	}
 	
 	@Override
 	public void handleEvent(Event event) {
-		eventList.add(event);
+		if (!filterOutTypes.contains(event.getType())) {
+			eventList.add(event);
+		}
 	}
 
 	public ArrayList<Event> getEventList() {
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest.java b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
index 9a9d80c..c63fb11 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
@@ -12,12 +12,13 @@ import org.junit.Before;
 import org.junit.Test;
 
 import azkaban.execapp.FlowRunner;
+import azkaban.execapp.event.Event;
 import azkaban.execapp.event.Event.Type;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutableFlow.FailureAction;
-import azkaban.executor.ExecutableFlow.Status;
 import azkaban.executor.ExecutorLoader;
+import azkaban.executor.Status;
 
 import azkaban.flow.Flow;
 import azkaban.jobtype.JobTypeManager;
@@ -61,6 +62,7 @@ public class FlowRunnerTest {
 		//just making compile. may not work at all.
 		
 		EventCollectorListener eventCollector = new EventCollectorListener();
+		eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED, Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
 		FlowRunner runner = createFlowRunner(loader, eventCollector, "exec1");
 		
 		Assert.assertTrue(!runner.isCancelled());
@@ -94,6 +96,7 @@ public class FlowRunnerTest {
 	public void exec1Disabled() throws Exception {
 		MockExecutorLoader loader = new MockExecutorLoader();
 		EventCollectorListener eventCollector = new EventCollectorListener();
+		eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED, Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
 		File testDir = new File("unit/executions/exectest1");
 		ExecutableFlow exFlow = prepareExecDir(testDir, "exec1", 1);
 		
@@ -139,6 +142,7 @@ public class FlowRunnerTest {
 	public void exec1Failed() throws Exception {
 		MockExecutorLoader loader = new MockExecutorLoader();
 		EventCollectorListener eventCollector = new EventCollectorListener();
+		eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED, Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
 		File testDir = new File("unit/executions/exectest1");
 		ExecutableFlow flow = prepareExecDir(testDir, "exec2", 1);
 		
@@ -174,6 +178,7 @@ public class FlowRunnerTest {
 	public void exec1FailedKillAll() throws Exception {
 		MockExecutorLoader loader = new MockExecutorLoader();
 		EventCollectorListener eventCollector = new EventCollectorListener();
+		eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED, Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
 		File testDir = new File("unit/executions/exectest1");
 		ExecutableFlow flow = prepareExecDir(testDir, "exec2", 1);
 		flow.setFailureAction(FailureAction.CANCEL_ALL);
@@ -212,6 +217,7 @@ public class FlowRunnerTest {
 	public void exec1FailedFinishRest() throws Exception {
 		MockExecutorLoader loader = new MockExecutorLoader();
 		EventCollectorListener eventCollector = new EventCollectorListener();
+		eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED, Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
 		File testDir = new File("unit/executions/exectest1");
 		ExecutableFlow flow = prepareExecDir(testDir, "exec3", 1);
 		flow.setFailureAction(FailureAction.FINISH_ALL_POSSIBLE);
@@ -247,6 +253,7 @@ public class FlowRunnerTest {
 	public void execAndCancel() throws Exception {
 		MockExecutorLoader loader = new MockExecutorLoader();
 		EventCollectorListener eventCollector = new EventCollectorListener();
+		eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED, Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
 		FlowRunner runner = createFlowRunner(loader, eventCollector, "exec1");
 		
 		Assert.assertTrue(!runner.isCancelled());
@@ -354,7 +361,7 @@ public class FlowRunnerTest {
 		//MockProjectLoader projectLoader = new MockProjectLoader(new File(flow.getExecutionPath()));
 		
 		loader.uploadExecutableFlow(flow);
-		FlowRunner runner = new FlowRunner(flow, loader, fakeProjectLoader, jobtypeManager);
+		FlowRunner runner = new FlowRunner(flow, null, loader, fakeProjectLoader, jobtypeManager);
 		runner.addListener(eventCollector);
 		
 		return runner;
@@ -366,7 +373,7 @@ public class FlowRunnerTest {
 		//MockProjectLoader projectLoader = new MockProjectLoader(new File(exFlow.getExecutionPath()));
 		
 		loader.uploadExecutableFlow(exFlow);
-		FlowRunner runner = new FlowRunner(exFlow, loader, fakeProjectLoader, jobtypeManager);
+		FlowRunner runner = new FlowRunner(exFlow, null,  loader, fakeProjectLoader, jobtypeManager);
 		runner.addListener(eventCollector);
 		
 		return runner;
diff --git a/unit/java/azkaban/test/execapp/JobRunnerTest.java b/unit/java/azkaban/test/execapp/JobRunnerTest.java
index 41d44f3..e4d305c 100644
--- a/unit/java/azkaban/test/execapp/JobRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/JobRunnerTest.java
@@ -16,8 +16,8 @@ import azkaban.execapp.event.Event;
 import azkaban.execapp.event.Event.Type;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableNode;
-import azkaban.executor.ExecutableFlow.Status;
 import azkaban.executor.ExecutorLoader;
+import azkaban.executor.Status;
 import azkaban.jobExecutor.ProcessJob;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.test.executor.JavaJob;
diff --git a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
index ba673dd..56ffcc4 100644
--- a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
+++ b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
@@ -27,7 +27,7 @@ import azkaban.executor.ExecutionReference;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.JdbcExecutorLoader;
-import azkaban.executor.ExecutableFlow.Status;
+import azkaban.executor.Status;
 import azkaban.flow.Flow;
 
 import azkaban.utils.DataSourceUtils;