azkaban-aplcache
Changes
src/java/azkaban/executor/event/Event.java 41(+41 -0)
src/java/azkaban/executor/FlowRunner.java 18(+17 -1)
Details
src/java/azkaban/executor/event/Event.java 41(+41 -0)
diff --git a/src/java/azkaban/executor/event/Event.java b/src/java/azkaban/executor/event/Event.java
new file mode 100644
index 0000000..d208dc4
--- /dev/null
+++ b/src/java/azkaban/executor/event/Event.java
@@ -0,0 +1,41 @@
+package azkaban.executor.event;
+
+import azkaban.executor.FlowRunner;
+
+public class Event {
+ public enum Type {
+ FLOW_STARTED,
+ FLOW_FINISHED,
+ JOB_STARTED,
+ JOB_COMPLETE,
+ JOB_FAILED
+ }
+
+ private final FlowRunner runner;
+ private final Type type;
+ private final Object eventData;
+ private final long time;
+
+ public Event(FlowRunner runner, Type type, Object eventData) {
+ this.runner = runner;
+ this.type = type;
+ this.eventData = eventData;
+ this.time = System.currentTimeMillis();
+ }
+
+ public FlowRunner getFlowRunner() {
+ return runner;
+ }
+
+ public Type getType() {
+ return type;
+ }
+
+ public long getTime() {
+ return time;
+ }
+
+ public Object getData() {
+ return eventData;
+ }
+}
src/java/azkaban/executor/FlowRunner.java 18(+17 -1)
diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index f63adc5..e5e03e8 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -1,16 +1,32 @@
package azkaban.executor;
-public class FlowRunner implements Runnable {
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import azkaban.executor.event.Event;
+import azkaban.executor.event.Event.Type;
+import azkaban.executor.event.EventHandler;
+
+public class FlowRunner extends EventHandler implements Runnable {
+ public static final int NUM_CONCURRENT_THREADS = 10;
+
private ExecutableFlow flow;
private FlowRunnerManager manager;
+ private ExecutorService executorService;
+ private int numThreads = NUM_CONCURRENT_THREADS;
public FlowRunner(ExecutableFlow flow) {
this.flow = flow;
this.manager = manager;
+ this.executorService = Executors.newFixedThreadPool(numThreads);
}
@Override
public void run() {
+ this.fireEventListeners(new Event(this, Type.FLOW_STARTED, null));
+
+
+ this.fireEventListeners(new Event(this, Type.FLOW_FINISHED, null));
}
}
\ No newline at end of file
diff --git a/src/java/azkaban/executor/FlowRunnerManager.java b/src/java/azkaban/executor/FlowRunnerManager.java
index f8731d6..375348b 100644
--- a/src/java/azkaban/executor/FlowRunnerManager.java
+++ b/src/java/azkaban/executor/FlowRunnerManager.java
@@ -9,6 +9,8 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.apache.log4j.Logger;
+import azkaban.executor.event.Event;
+import azkaban.executor.event.EventListener;
import azkaban.utils.ExecutableFlowLoader;
import azkaban.utils.Props;
@@ -27,11 +29,13 @@ public class FlowRunnerManager {
private ExecutorService executorService;
private SubmitterThread submitterThread;
+ private FlowRunnerEventListener eventListener;
public FlowRunnerManager(Props props) {
basePath = new File(props.getString("execution.directory"));
numThreads = props.getInt("executor.flow.threads", DEFAULT_NUM_EXECUTING_FLOWS);
executorService = Executors.newFixedThreadPool(numThreads);
+ eventListener = new FlowRunnerEventListener(this);
submitterThread = new SubmitterThread(queue);
submitterThread.start();
@@ -41,7 +45,10 @@ public class FlowRunnerManager {
// Load file and submit
File dir = new File(path);
ExecutableFlow flow = ExecutableFlowLoader.loadExecutableFlowFromDir(dir);
+
FlowRunner runner = new FlowRunner(flow);
+ runner.addListener(eventListener);
+ executorService.submit(runner);
}
//
@@ -70,4 +77,17 @@ public class FlowRunnerManager {
}
}
}
+
+ private class FlowRunnerEventListener implements EventListener {
+ private FlowRunnerManager manager;
+
+ public FlowRunnerEventListener(FlowRunnerManager manager) {
+ this.manager = manager;
+ }
+
+ @Override
+ public synchronized void handleEvent(Event event) {
+
+ }
+ }
}