azkaban-aplcache
Changes
src/java/azkaban/executor/event/Event.java 22(+15 -7)
src/java/azkaban/executor/FlowRunner.java 227(+223 -4)
src/java/azkaban/executor/JobRunner.java 61(+61 -0)
src/java/azkaban/utils/Props.java 8(+8 -0)
Details
src/java/azkaban/executor/event/Event.java 22(+15 -7)
diff --git a/src/java/azkaban/executor/event/Event.java b/src/java/azkaban/executor/event/Event.java
index d208dc4..21b1ec7 100644
--- a/src/java/azkaban/executor/event/Event.java
+++ b/src/java/azkaban/executor/event/Event.java
@@ -1,29 +1,29 @@
package azkaban.executor.event;
-import azkaban.executor.FlowRunner;
-
public class Event {
public enum Type {
FLOW_STARTED,
FLOW_FINISHED,
+ FLOW_FAILED_FINISHING,
JOB_STARTED,
- JOB_COMPLETE,
- JOB_FAILED
+ JOB_SUCCEEDED,
+ JOB_FAILED,
+ JOB_KILLED
}
- private final FlowRunner runner;
+ private final Object runner;
private final Type type;
private final Object eventData;
private final long time;
- public Event(FlowRunner runner, Type type, Object eventData) {
+ private Event(Object runner, Type type, Object eventData) {
this.runner = runner;
this.type = type;
this.eventData = eventData;
this.time = System.currentTimeMillis();
}
- public FlowRunner getFlowRunner() {
+ public Object getRunner() {
return runner;
}
@@ -38,4 +38,12 @@ public class Event {
public Object getData() {
return eventData;
}
+
+ public static Event create(Object runner, Type type) {
+ return new Event(runner, type, null);
+ }
+
+ public static Event create(Object runner, Type type, Object eventData) {
+ return new Event(runner, type, eventData);
+ }
}
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 9e37355..997d5fc 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -1,6 +1,7 @@
package azkaban.executor;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -11,13 +12,13 @@ import azkaban.flow.Edge;
import azkaban.flow.Flow;
import azkaban.flow.FlowProps;
import azkaban.flow.Node;
-import azkaban.utils.Props;
public class ExecutableFlow {
private String executionId;
private String flowId;
private String projectId;
private String executionPath;
+
private HashMap<String, FlowProps> flowProps = new HashMap<String, FlowProps>();
private HashMap<String, ExecutableNode> executableNodes = new HashMap<String, ExecutableNode>();;
private ArrayList<String> startNodes = new ArrayList<String>();
@@ -31,7 +32,7 @@ public class ExecutableFlow {
private String submitUser;
public enum Status {
- FAILED, SUCCEEDED, RUNNING, WAITING, IGNORED, READY, UNKNOWN
+ FAILED, SUCCEEDED, RUNNING, WAITING, KILLED, IGNORED, READY, UNKNOWN
}
public ExecutableFlow(String id, Flow flow) {
@@ -49,6 +50,14 @@ public class ExecutableFlow {
return new ArrayList<ExecutableNode>(executableNodes.values());
}
+ public ExecutableNode getExecutableNode(String id) {
+ return executableNodes.get(id);
+ }
+
+ public Collection<FlowProps> getFlowProps() {
+ return flowProps.values();
+ }
+
private void setFlow(Flow flow) {
for (Node node: flow.getNodes()) {
String id = node.getId();
@@ -72,10 +81,6 @@ public class ExecutableFlow {
flowProps.putAll(flow.getAllFlowProps());
}
-
- public void run() {
-
- }
public void setStatus(String nodeId, Status status) {
ExecutableNode exNode = executableNodes.get(nodeId);
@@ -146,6 +151,10 @@ public class ExecutableFlow {
this.flowStatus = flowStatus;
}
+ public List<String> getStartNodes() {
+ return new ArrayList<String>(startNodes);
+ }
+
public Map<String,Object> toObject() {
HashMap<String, Object> flowObj = new HashMap<String, Object>();
flowObj.put("type", "executableflow");
@@ -158,6 +167,21 @@ public class ExecutableFlow {
flowObj.put("endTime", endTime);
flowObj.put("status", flowStatus.toString());
flowObj.put("submitUser", submitUser);
+ flowObj.put("startNodes", startNodes);
+
+ ArrayList<Object> props = new ArrayList<Object>();
+ for (FlowProps fprop: flowProps.values()) {
+ HashMap<String, Object> propObj = new HashMap<String, Object>();
+ String source = fprop.getSource();
+ String inheritedSource = fprop.getInheritedSource();
+
+ propObj.put("source", source);
+ if (inheritedSource != null) {
+ propObj.put("inherited", inheritedSource);
+ }
+ props.add(propObj);
+ }
+ flowObj.put("properties", props);
ArrayList<Object> nodes = new ArrayList<Object>();
for (ExecutableNode node: executableNodes.values()) {
@@ -188,6 +212,18 @@ public class ExecutableFlow {
ExecutableNode node = ExecutableNode.createNodeFromObject(nodeObj);
exFlow.executableNodes.put(node.getId(), node);
}
+
+ List<Object> properties = (List<Object>)flowObj.get("properties");
+ for (Object propNode : properties) {
+ HashMap<String, Object> fprop = (HashMap<String, Object>)propNode;
+ String source = (String)fprop.get("source");
+ String inheritedSource = (String)fprop.get("inherited");
+
+ FlowProps flowProps = new FlowProps(inheritedSource, source);
+ exFlow.flowProps.put(source, flowProps);
+ }
+
+ exFlow.startNodes.addAll((List<String>)flowObj.get("startNodes"));
return exFlow;
}
@@ -248,6 +284,7 @@ public class ExecutableFlow {
private String type;
private String jobPropsSource;
private String inheritPropsSource;
+ private String outputPropsSource;
private Status status = Status.READY;
private long startTime = -1;
private long endTime = -1;
@@ -313,6 +350,11 @@ public class ExecutableFlow {
objMap.put("startTime", startTime);
objMap.put("endTime", endTime);
objMap.put("level", level);
+
+ if (outputPropsSource != null) {
+ objMap.put("outputSource", outputPropsSource);
+ }
+
return objMap;
}
@@ -324,6 +366,7 @@ public class ExecutableFlow {
exNode.id = (String)objMap.get("id");
exNode.jobPropsSource = (String)objMap.get("jobSource");
exNode.inheritPropsSource = (String)objMap.get("propSource");
+ exNode.outputPropsSource = (String)objMap.get("outputSource");
exNode.type = (String)objMap.get("jobType");
exNode.status = Status.valueOf((String)objMap.get("status"));
@@ -337,7 +380,7 @@ public class ExecutableFlow {
return exNode;
}
- @SuppressWarnings("unused")
+ @SuppressWarnings("unchecked")
public void updateNodeFromObject(Object obj) {
HashMap<String, Object> objMap = (HashMap<String,Object>)obj;
status = Status.valueOf((String)objMap.get("status"));
src/java/azkaban/executor/FlowRunner.java 227(+223 -4)
diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index e5e03e8..26a9ff2 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -1,32 +1,251 @@
package azkaban.executor;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import azkaban.executor.ExecutableFlow.ExecutableNode;
+import azkaban.executor.ExecutableFlow.Status;
import azkaban.executor.event.Event;
import azkaban.executor.event.Event.Type;
import azkaban.executor.event.EventHandler;
+import azkaban.executor.event.EventListener;
+import azkaban.flow.FlowProps;
+import azkaban.utils.Props;
public class FlowRunner extends EventHandler implements Runnable {
public static final int NUM_CONCURRENT_THREADS = 10;
private ExecutableFlow flow;
- private FlowRunnerManager manager;
private ExecutorService executorService;
+ private BlockingQueue<JobRunner> jobsToRun = new LinkedBlockingQueue<JobRunner>();
private int numThreads = NUM_CONCURRENT_THREADS;
+ private boolean cancelled = true;
+ private boolean done = false;
+
+ private Map<String, JobRunner> jobRunnersMap;
+ private JobRunnerEventListener listener;
+ private Map<String, Props> sharedProps = new HashMap<String, Props>();
+ private Map<String, Props> outputProps = new HashMap<String, Props>();
+ private File basePath;
+
+ public enum FailedFlowOptions {
+ FINISH_RUNNING_JOBS,
+ COMPLETE_ALL_DEPENDENCIES,
+ CANCEL_ALL
+ }
+
+ private FailedFlowOptions failedOptions = FailedFlowOptions.FINISH_RUNNING_JOBS;
public FlowRunner(ExecutableFlow flow) {
this.flow = flow;
- this.manager = manager;
+ this.basePath = new File(flow.getExecutionPath());
this.executorService = Executors.newFixedThreadPool(numThreads);
+ this.jobRunnersMap = new HashMap<String, JobRunner>();
+ this.listener = new JobRunnerEventListener(this);
+ }
+
+ public ExecutableFlow getFlow() {
+ return flow;
+ }
+
+ public void cancel() {
+ done = true;
+ cancelled = true;
+
+ executorService.shutdownNow();
+
+ // Loop through job runners
+ for (JobRunner runner: jobRunnersMap.values()) {
+ if (runner.getStatus() == Status.WAITING || runner.getStatus() == Status.RUNNING) {
+ runner.cancel();
+ }
+ }
+
+ this.notify();
+ }
+
+ public boolean isCancelled() {
+ return cancelled;
}
@Override
public void run() {
- this.fireEventListeners(new Event(this, Type.FLOW_STARTED, null));
+ this.fireEventListeners(Event.create(this, Type.FLOW_STARTED));
+ // Load all shared props
+ try {
+ loadAllProperties(flow);
+ }
+ catch (IOException e) {
+ flow.setStatus(Status.FAILED);
+ System.err.println("Failed due to " + e.getMessage());
+ this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED));
+ return;
+ }
+
+ // Set up starting nodes
+ try {
+ for (String startNode: flow.getStartNodes()) {
+ ExecutableNode node = flow.getExecutableNode(startNode);
+ JobRunner jobRunner = createJobRunner(node, null);
+ jobsToRun.add(jobRunner);
+ }
+ } catch (IOException e) {
+ System.err.println("Failed due to " + e.getMessage());
+ flow.setStatus(Status.FAILED);
+ jobsToRun.clear();
+ this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED));
+ return;
+ }
+ while(!done) {
+ JobRunner runner = null;
+ try {
+ runner = jobsToRun.take();
+ } catch (InterruptedException e) {
+ }
+
+ if (!done && runner != null) {
+ executorService.submit(runner);
+ }
+ }
- this.fireEventListeners(new Event(this, Type.FLOW_FINISHED, null));
+ this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED));
+ }
+
+ private JobRunner createJobRunner(ExecutableNode node, Props previousOutput) throws IOException {
+ String source = node.getJobPropsSource();
+ String propsSource = node.getPropsSource();
+
+ Props parentProps = propsSource == null ? null : sharedProps.get(propsSource);
+
+ // We add the previous job output and put into this props.
+ if (previousOutput != null) {
+ Props earliestParent = previousOutput.getEarliestAncestor();
+ earliestParent.setParent(parentProps);
+
+ parentProps = earliestParent;
+ }
+
+ File propsFile = new File(basePath, source);
+ Props jobProps = new Props(parentProps, propsFile);
+
+ JobRunner jobRunner = new JobRunner(node, jobProps);
+ jobRunner.addListener(listener);
+ jobRunnersMap.put(node.getId(), jobRunner);
+
+ return jobRunner;
+ }
+
+ private void loadAllProperties(ExecutableFlow flow) throws IOException {
+ // First load all the properties
+ for (FlowProps fprops: flow.getFlowProps()) {
+ String source = fprops.getSource();
+ File propsFile = new File(basePath, source);
+
+ Props props = new Props(null, propsFile);
+ sharedProps.put(source, props);
+ }
+
+ // Resolve parents
+ for (FlowProps fprops: flow.getFlowProps()) {
+ if (fprops.getInheritedSource() != null) {
+ String source = fprops.getSource();
+ String inherit = fprops.getInheritedSource();
+
+ Props props = sharedProps.get(source);
+ Props inherits = sharedProps.get(inherit);
+
+ props.setParent(inherits);
+ }
+ }
+ }
+
+ private void handleSucceededJob(ExecutableNode node) {
+ for(String dependent: node.getOutNodes()) {
+ ExecutableNode dependentNode = flow.getExecutableNode(dependent);
+
+ // Check all dependencies
+ boolean ready = true;
+ for (String dependency: dependentNode.getInNodes()) {
+ ExecutableNode dependencyNode = flow.getExecutableNode(dependency);
+ if (dependencyNode.getStatus() != Status.SUCCEEDED &&
+ dependencyNode.getStatus() != Status.IGNORED) {
+ ready = false;
+ break;
+ }
+ }
+
+ if (ready) {
+ Props previousOutput = null;
+ // Iterate the in nodes again and create the dependencies
+ for (String dependency: node.getInNodes()) {
+ Props output = outputProps.get(dependency);
+ if (output != null) {
+ output = Props.clone(output);
+
+ output.setParent(previousOutput);
+ previousOutput = output;
+ }
+ }
+
+ JobRunner runner = null;
+ try {
+ runner = this.createJobRunner(dependentNode, previousOutput);
+ } catch (IOException e) {
+ System.err.println("Failed due to " + e.getMessage());
+ dependentNode.setStatus(Status.FAILED);
+ handleFailedJob(dependentNode);
+ return;
+ }
+
+ jobsToRun.add(runner);
+ }
+ }
+ }
+
+ private void handleFailedJob(ExecutableNode node) {
+ System.err.println("Job " + node.getId() + " failed.");
+ this.fireEventListeners(Event.create(this, Type.FLOW_FAILED_FINISHING));
+ if (failedOptions == FailedFlowOptions.FINISH_RUNNING_JOBS) {
+ done = true;
+ }
+ else if (failedOptions == FailedFlowOptions.CANCEL_ALL) {
+ this.cancel();
+ }
+ else if (failedOptions == FailedFlowOptions.COMPLETE_ALL_DEPENDENCIES) {
+ }
+ }
+
+ private class JobRunnerEventListener implements EventListener {
+ private FlowRunner flowRunner;
+
+ public JobRunnerEventListener(FlowRunner flowRunner) {
+ this.flowRunner = flowRunner;
+ }
+
+ @Override
+ public synchronized void handleEvent(Event event) {
+ JobRunner runner = (JobRunner)event.getRunner();
+ String jobID = runner.getNode().getId();
+ System.out.println("Event " + jobID + " " + event.getType().toString());
+
+ if (event.getType() == Type.JOB_SUCCEEDED) {
+ // Continue adding items.
+ Props props = runner.getOutputProps();
+ outputProps.put(jobID, props);
+
+ flowRunner.handleSucceededJob(runner.getNode());
+ }
+ else if (event.getType() == Type.JOB_FAILED) {
+ flowRunner.handleFailedJob(runner.getNode());
+ }
+ }
}
}
\ No newline at end of file
diff --git a/src/java/azkaban/executor/FlowRunnerManager.java b/src/java/azkaban/executor/FlowRunnerManager.java
index 375348b..f07af9d 100644
--- a/src/java/azkaban/executor/FlowRunnerManager.java
+++ b/src/java/azkaban/executor/FlowRunnerManager.java
@@ -43,15 +43,28 @@ public class FlowRunnerManager {
public void submitFlow(String id, String path) throws ExecutorManagerException {
// Load file and submit
+ logger.info("Flow " + id + " submitted with path " + path);
+
File dir = new File(path);
ExecutableFlow flow = ExecutableFlowLoader.loadExecutableFlowFromDir(dir);
-
+ flow.setExecutionPath(path);
+
FlowRunner runner = new FlowRunner(flow);
runner.addListener(eventListener);
executorService.submit(runner);
}
- //
+ public void cancelFlow(String id) throws ExecutorManagerException {
+ FlowRunner runner = runningFlows.get(id);
+ if (runner != null) {
+ runner.cancel();
+ }
+ }
+
+ public FlowRunner getFlowRunner(String id) {
+ return runningFlows.get(id);
+ }
+
private class SubmitterThread extends Thread {
private BlockingQueue<FlowRunner> queue;
private boolean shutdown = false;
@@ -87,7 +100,9 @@ public class FlowRunnerManager {
@Override
public synchronized void handleEvent(Event event) {
-
+ FlowRunner runner = (FlowRunner)event.getRunner();
+ ExecutableFlow flow = runner.getFlow();
+ System.out.println("Event " + flow.getExecutionId() + " " + flow.getFlowId() + " " + event.getType());
}
}
}
src/java/azkaban/executor/JobRunner.java 61(+61 -0)
diff --git a/src/java/azkaban/executor/JobRunner.java b/src/java/azkaban/executor/JobRunner.java
new file mode 100644
index 0000000..7e2d45a
--- /dev/null
+++ b/src/java/azkaban/executor/JobRunner.java
@@ -0,0 +1,61 @@
+package azkaban.executor;
+
+import azkaban.executor.ExecutableFlow.ExecutableNode;
+import azkaban.executor.ExecutableFlow.Status;
+import azkaban.executor.event.Event;
+import azkaban.executor.event.Event.Type;
+import azkaban.executor.event.EventHandler;
+import azkaban.utils.Props;
+
+public class JobRunner extends EventHandler implements Runnable {
+ private Props props;
+ private Props outputProps;
+ private ExecutableNode node;
+
+ public JobRunner(ExecutableNode node, Props props) {
+ this.props = props;
+ this.node = node;
+ this.node.setStatus(Status.WAITING);
+ }
+
+ public ExecutableNode getNode() {
+ return node;
+ }
+
+ @Override
+ public void run() {
+ if (node.getStatus() == Status.KILLED) {
+ this.fireEventListeners(Event.create(this, Type.JOB_KILLED));
+ return;
+ }
+
+ this.fireEventListeners(Event.create(this, Type.JOB_STARTED));
+ node.setStatus(Status.RUNNING);
+
+ // Run Job
+ boolean succeeded = true;
+
+ if (succeeded) {
+ node.setStatus(Status.SUCCEEDED);
+ this.fireEventListeners(Event.create(this, Type.JOB_SUCCEEDED));
+ }
+ else {
+ node.setStatus(Status.FAILED);
+ this.fireEventListeners(Event.create(this, Type.JOB_FAILED));
+ }
+ }
+
+ public void cancel() {
+ // Cancel code here
+
+ node.setStatus(Status.KILLED);
+ }
+
+ public Status getStatus() {
+ return node.getStatus();
+ }
+
+ public Props getOutputProps() {
+ return outputProps;
+ }
+}
diff --git a/src/java/azkaban/utils/ExecutableFlowLoader.java b/src/java/azkaban/utils/ExecutableFlowLoader.java
index 8afd7a6..613f3d4 100644
--- a/src/java/azkaban/utils/ExecutableFlowLoader.java
+++ b/src/java/azkaban/utils/ExecutableFlowLoader.java
@@ -34,6 +34,7 @@ public class ExecutableFlowLoader {
}
ExecutableFlow flow = ExecutableFlow.createExecutableFlowFromObject(exFlowObj);
+ flow.setExecutionPath(exDir.getPath());
return flow;
}
src/java/azkaban/utils/Props.java 8(+8 -0)
diff --git a/src/java/azkaban/utils/Props.java b/src/java/azkaban/utils/Props.java
index 6dd0a5e..4b16b9c 100644
--- a/src/java/azkaban/utils/Props.java
+++ b/src/java/azkaban/utils/Props.java
@@ -157,6 +157,14 @@ public class Props {
}
}
+ public Props getEarliestAncestor() {
+ if (_parent == null) {
+ return this;
+ }
+
+ return _parent.getEarliestAncestor();
+ }
+
/**
* Create a Props with a null parent from a list of key value pairing. i.e.
* [key1, value1, key2, value2 ...]