azkaban-aplcache

Azkaban event handler move.

8/15/2012 7:20:31 PM

Details

diff --git a/src/java/azkaban/executor/event/Event.java b/src/java/azkaban/executor/event/Event.java
new file mode 100644
index 0000000..d208dc4
--- /dev/null
+++ b/src/java/azkaban/executor/event/Event.java
@@ -0,0 +1,41 @@
+package azkaban.executor.event;
+
+import azkaban.executor.FlowRunner;
+
+public class Event {
+	public enum Type {
+		FLOW_STARTED,
+		FLOW_FINISHED,
+		JOB_STARTED,
+		JOB_COMPLETE,
+		JOB_FAILED
+	}
+	
+	private final FlowRunner runner;
+	private final Type type;
+	private final Object eventData;
+	private final long time;
+	
+	public Event(FlowRunner runner, Type type, Object eventData) {
+		this.runner = runner;
+		this.type = type;
+		this.eventData = eventData;
+		this.time = System.currentTimeMillis();
+	}
+	
+	public FlowRunner getFlowRunner() {
+		return runner;
+	}
+	
+	public Type getType() {
+		return type;
+	}
+	
+	public long getTime() {
+		return time;
+	}
+	
+	public Object getData() {
+		return eventData;
+	}
+}
diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index f63adc5..e5e03e8 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -1,16 +1,32 @@
 package azkaban.executor;
 
-public class FlowRunner implements Runnable {
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import azkaban.executor.event.Event;
+import azkaban.executor.event.Event.Type;
+import azkaban.executor.event.EventHandler;
+
+public class FlowRunner extends EventHandler implements Runnable {
+	public static final int NUM_CONCURRENT_THREADS = 10;
+
 	private ExecutableFlow flow;
 	private FlowRunnerManager manager;
+	private ExecutorService executorService;
+	private int numThreads = NUM_CONCURRENT_THREADS;
 	
 	public FlowRunner(ExecutableFlow flow) {
 		this.flow = flow;
 		this.manager = manager;
+		this.executorService = Executors.newFixedThreadPool(numThreads);
 	}
 	
 	@Override
 	public void run() {
+		this.fireEventListeners(new Event(this, Type.FLOW_STARTED, null));
+		
+		
 		
+		this.fireEventListeners(new Event(this, Type.FLOW_FINISHED, null));
 	}
 }
\ No newline at end of file
diff --git a/src/java/azkaban/executor/FlowRunnerManager.java b/src/java/azkaban/executor/FlowRunnerManager.java
index f8731d6..375348b 100644
--- a/src/java/azkaban/executor/FlowRunnerManager.java
+++ b/src/java/azkaban/executor/FlowRunnerManager.java
@@ -9,6 +9,8 @@ import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.log4j.Logger;
 
+import azkaban.executor.event.Event;
+import azkaban.executor.event.EventListener;
 import azkaban.utils.ExecutableFlowLoader;
 import azkaban.utils.Props;
 
@@ -27,11 +29,13 @@ public class FlowRunnerManager {
 
 	private ExecutorService executorService;
 	private SubmitterThread submitterThread;
+	private FlowRunnerEventListener eventListener;
 	
 	public FlowRunnerManager(Props props) {
 		basePath = new File(props.getString("execution.directory"));
 		numThreads = props.getInt("executor.flow.threads", DEFAULT_NUM_EXECUTING_FLOWS);
 		executorService = Executors.newFixedThreadPool(numThreads);
+		eventListener = new FlowRunnerEventListener(this);
 		
 		submitterThread = new SubmitterThread(queue);
 		submitterThread.start();
@@ -41,7 +45,10 @@ public class FlowRunnerManager {
 		// Load file and submit
 		File dir = new File(path);
 		ExecutableFlow flow = ExecutableFlowLoader.loadExecutableFlowFromDir(dir);
+		
 		FlowRunner runner = new FlowRunner(flow);
+		runner.addListener(eventListener);
+		executorService.submit(runner);
 	}
 	
 	//
@@ -70,4 +77,17 @@ public class FlowRunnerManager {
 			}
 		}
 	}
+	
+	private class FlowRunnerEventListener implements EventListener {
+		private FlowRunnerManager manager;
+		
+		public FlowRunnerEventListener(FlowRunnerManager manager) {
+			this.manager = manager;
+		}
+
+		@Override
+		public synchronized void handleEvent(Event event) {
+			
+		}
+	}
 }