azkaban-aplcache

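Flow execution stuff: adds JobRunner, a job-dispatch loop and cancellation in FlowRunner, new job/flow event types, and serialization of flow properties and start nodes in ExecutableFlow. Sweet.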

8/16/2012 1:56:39 AM

Details

diff --git a/src/java/azkaban/executor/event/Event.java b/src/java/azkaban/executor/event/Event.java
index d208dc4..21b1ec7 100644
--- a/src/java/azkaban/executor/event/Event.java
+++ b/src/java/azkaban/executor/event/Event.java
@@ -1,29 +1,29 @@
 package azkaban.executor.event;
 
-import azkaban.executor.FlowRunner;
-
 public class Event {
 	public enum Type {
 		FLOW_STARTED,
 		FLOW_FINISHED,
+		FLOW_FAILED_FINISHING,
 		JOB_STARTED,
-		JOB_COMPLETE,
-		JOB_FAILED
+		JOB_SUCCEEDED,
+		JOB_FAILED,
+		JOB_KILLED
 	}
 	
-	private final FlowRunner runner;
+	private final Object runner;
 	private final Type type;
 	private final Object eventData;
 	private final long time;
 	
-	public Event(FlowRunner runner, Type type, Object eventData) {
+	private Event(Object runner, Type type, Object eventData) {
 		this.runner = runner;
 		this.type = type;
 		this.eventData = eventData;
 		this.time = System.currentTimeMillis();
 	}
 	
-	public FlowRunner getFlowRunner() {
+	public Object getRunner() {
 		return runner;
 	}
 	
@@ -38,4 +38,12 @@ public class Event {
 	public Object getData() {
 		return eventData;
 	}
+	
+	public static Event create(Object runner, Type type) {
+		return new Event(runner, type, null);
+	}
+	
+	public static Event create(Object runner, Type type, Object eventData) {
+		return new Event(runner, type, eventData);
+	}
 }
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 9e37355..997d5fc 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -1,6 +1,7 @@
 package azkaban.executor;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -11,13 +12,13 @@ import azkaban.flow.Edge;
 import azkaban.flow.Flow;
 import azkaban.flow.FlowProps;
 import azkaban.flow.Node;
-import azkaban.utils.Props;
 
 public class ExecutableFlow {
 	private String executionId;
 	private String flowId;
 	private String projectId;
 	private String executionPath;
+
 	private HashMap<String, FlowProps> flowProps = new HashMap<String, FlowProps>();
 	private HashMap<String, ExecutableNode> executableNodes = new HashMap<String, ExecutableNode>();;
 	private ArrayList<String> startNodes = new ArrayList<String>();
@@ -31,7 +32,7 @@ public class ExecutableFlow {
 	private String submitUser;
 	
 	public enum Status {
-		FAILED, SUCCEEDED, RUNNING, WAITING, IGNORED, READY, UNKNOWN
+		FAILED, SUCCEEDED, RUNNING, WAITING, KILLED, IGNORED, READY, UNKNOWN
 	}
 	
 	public ExecutableFlow(String id, Flow flow) {
@@ -49,6 +50,14 @@ public class ExecutableFlow {
 		return new ArrayList<ExecutableNode>(executableNodes.values());
 	}
 	
+	public ExecutableNode getExecutableNode(String id) {
+		return executableNodes.get(id);
+	}
+	
+	public Collection<FlowProps> getFlowProps() {
+		return flowProps.values();
+	}
+	
 	private void setFlow(Flow flow) {
 		for (Node node: flow.getNodes()) {
 			String id = node.getId();
@@ -72,10 +81,6 @@ public class ExecutableFlow {
 		
 		flowProps.putAll(flow.getAllFlowProps());
 	}
-	
-	public void run() {
-		
-	}
 
 	public void setStatus(String nodeId, Status status) {
 		ExecutableNode exNode = executableNodes.get(nodeId);
@@ -146,6 +151,10 @@ public class ExecutableFlow {
 		this.flowStatus = flowStatus;
 	}
 	
+	public List<String> getStartNodes() {
+		return new ArrayList<String>(startNodes);
+	}
+	
 	public Map<String,Object> toObject() {
 		HashMap<String, Object> flowObj = new HashMap<String, Object>();
 		flowObj.put("type", "executableflow");
@@ -158,6 +167,21 @@ public class ExecutableFlow {
 		flowObj.put("endTime", endTime);
 		flowObj.put("status", flowStatus.toString());
 		flowObj.put("submitUser", submitUser);
+		flowObj.put("startNodes", startNodes);
+		
+		ArrayList<Object> props = new ArrayList<Object>();
+		for (FlowProps fprop: flowProps.values()) {
+			HashMap<String, Object> propObj = new HashMap<String, Object>();
+			String source = fprop.getSource();
+			String inheritedSource = fprop.getInheritedSource();
+			
+			propObj.put("source", source);
+			if (inheritedSource != null) {
+				propObj.put("inherited", inheritedSource);
+			}
+			props.add(propObj);
+		}
+		flowObj.put("properties", props);
 		
 		ArrayList<Object> nodes = new ArrayList<Object>();
 		for (ExecutableNode node: executableNodes.values()) {
@@ -188,6 +212,18 @@ public class ExecutableFlow {
 			ExecutableNode node = ExecutableNode.createNodeFromObject(nodeObj);
 			exFlow.executableNodes.put(node.getId(), node);
 		}
+
+		List<Object> properties = (List<Object>)flowObj.get("properties");
+		for (Object propNode : properties) {
+			HashMap<String, Object> fprop = (HashMap<String, Object>)propNode;
+			String source = (String)fprop.get("source");
+			String inheritedSource = (String)fprop.get("inherited");
+			
+			FlowProps flowProps = new FlowProps(inheritedSource, source);
+			exFlow.flowProps.put(source, flowProps);
+		}
+
+		exFlow.startNodes.addAll((List<String>)flowObj.get("startNodes"));
 		
 		return exFlow;
 	}
@@ -248,6 +284,7 @@ public class ExecutableFlow {
 		private String type;
 		private String jobPropsSource;
 		private String inheritPropsSource;
+		private String outputPropsSource;
 		private Status status = Status.READY;
 		private long startTime = -1;
 		private long endTime = -1;
@@ -313,6 +350,11 @@ public class ExecutableFlow {
 			objMap.put("startTime", startTime);
 			objMap.put("endTime", endTime);
 			objMap.put("level", level);
+			
+			if (outputPropsSource != null) {
+				objMap.put("outputSource", outputPropsSource);
+			}
+			
 			return objMap;
 		}
 
@@ -324,6 +366,7 @@ public class ExecutableFlow {
 			exNode.id = (String)objMap.get("id");
 			exNode.jobPropsSource = (String)objMap.get("jobSource");
 			exNode.inheritPropsSource = (String)objMap.get("propSource");
+			exNode.outputPropsSource = (String)objMap.get("outputSource");
 			exNode.type = (String)objMap.get("jobType");
 			exNode.status = Status.valueOf((String)objMap.get("status"));
 			
@@ -337,7 +380,7 @@ public class ExecutableFlow {
 			return exNode;
 		}
 		
-		@SuppressWarnings("unused")
+		@SuppressWarnings("unchecked")
 		public void updateNodeFromObject(Object obj) {
 			HashMap<String, Object> objMap = (HashMap<String,Object>)obj;
 			status = Status.valueOf((String)objMap.get("status"));
diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index e5e03e8..26a9ff2 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -1,32 +1,251 @@
 package azkaban.executor;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 
+import azkaban.executor.ExecutableFlow.ExecutableNode;
+import azkaban.executor.ExecutableFlow.Status;
 import azkaban.executor.event.Event;
 import azkaban.executor.event.Event.Type;
 import azkaban.executor.event.EventHandler;
+import azkaban.executor.event.EventListener;
+import azkaban.flow.FlowProps;
+import azkaban.utils.Props;
 
 public class FlowRunner extends EventHandler implements Runnable {
 	public static final int NUM_CONCURRENT_THREADS = 10;
 
 	private ExecutableFlow flow;
-	private FlowRunnerManager manager;
 	private ExecutorService executorService;
+	private BlockingQueue<JobRunner> jobsToRun = new LinkedBlockingQueue<JobRunner>();
 	private int numThreads = NUM_CONCURRENT_THREADS;
+	private boolean cancelled = true;
+	private boolean done = false;
+	
+	private Map<String, JobRunner> jobRunnersMap;
+	private JobRunnerEventListener listener;
+	private Map<String, Props> sharedProps = new HashMap<String, Props>();
+	private Map<String, Props> outputProps = new HashMap<String, Props>();
+	private File basePath;
+	
+	public enum FailedFlowOptions {
+		FINISH_RUNNING_JOBS,
+		COMPLETE_ALL_DEPENDENCIES,
+		CANCEL_ALL
+	}
+	
+	private FailedFlowOptions failedOptions = FailedFlowOptions.FINISH_RUNNING_JOBS;
 	
 	public FlowRunner(ExecutableFlow flow) {
 		this.flow = flow;
-		this.manager = manager;
+		this.basePath = new File(flow.getExecutionPath());
 		this.executorService = Executors.newFixedThreadPool(numThreads);
+		this.jobRunnersMap = new HashMap<String, JobRunner>();
+		this.listener = new JobRunnerEventListener(this);
+	}
+	
+	public ExecutableFlow getFlow() {
+		return flow;
+	}
+	
+	public void cancel() {
+		done = true;
+		cancelled = true;
+		
+		executorService.shutdownNow();
+		
+		// Loop through job runners
+		for (JobRunner runner: jobRunnersMap.values()) {
+			if (runner.getStatus() == Status.WAITING || runner.getStatus() == Status.RUNNING) {
+				runner.cancel();
+			}
+		}
+		
+		this.notify();
+	}
+	
+	public boolean isCancelled() {
+		return cancelled;
 	}
 	
 	@Override
 	public void run() {
-		this.fireEventListeners(new Event(this, Type.FLOW_STARTED, null));
+		this.fireEventListeners(Event.create(this, Type.FLOW_STARTED));
 		
+		// Load all shared props
+		try {
+			loadAllProperties(flow);
+		}
+		catch (IOException e) {
+			flow.setStatus(Status.FAILED);
+			System.err.println("Failed due to " + e.getMessage());
+			this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED));
+			return;
+		}
+
+		// Set up starting nodes
+		try {
+			for (String startNode: flow.getStartNodes()) {
+				ExecutableNode node = flow.getExecutableNode(startNode);
+				JobRunner jobRunner = createJobRunner(node, null);
+				jobsToRun.add(jobRunner);
+			}
+		} catch (IOException e) {
+			System.err.println("Failed due to " + e.getMessage());
+			flow.setStatus(Status.FAILED);
+			jobsToRun.clear();
+			this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED));
+			return;
+		}
 		
+		while(!done) {
+			JobRunner runner = null;
+			try {
+				runner = jobsToRun.take();
+			} catch (InterruptedException e) {
+			}
+			
+			if (!done && runner != null) {
+				executorService.submit(runner);
+			}
+		}
 		
-		this.fireEventListeners(new Event(this, Type.FLOW_FINISHED, null));
+		this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED));
+	}
+	
+	private JobRunner createJobRunner(ExecutableNode node, Props previousOutput) throws IOException {
+		String source = node.getJobPropsSource();
+		String propsSource = node.getPropsSource();
+
+		Props parentProps = propsSource == null ? null : sharedProps.get(propsSource);
+		
+		// We add the previous job output and put into this props.
+		if (previousOutput != null) {
+			Props earliestParent = previousOutput.getEarliestAncestor();
+			earliestParent.setParent(parentProps);
+			
+			parentProps = earliestParent;
+		}
+		
+		File propsFile = new File(basePath, source);
+		Props jobProps = new Props(parentProps, propsFile);
+		
+		JobRunner jobRunner = new JobRunner(node, jobProps);
+		jobRunner.addListener(listener);
+		jobRunnersMap.put(node.getId(), jobRunner);
+		
+		return jobRunner;
+	}
+	
+	private void loadAllProperties(ExecutableFlow flow) throws IOException {
+		// First load all the properties
+		for (FlowProps fprops: flow.getFlowProps()) {
+			String source = fprops.getSource();
+			File propsFile = new File(basePath, source);
+			
+			Props props = new Props(null, propsFile);
+			sharedProps.put(source, props);
+		}
+
+		// Resolve parents
+		for (FlowProps fprops: flow.getFlowProps()) {
+			if (fprops.getInheritedSource() != null) {
+				String source = fprops.getSource();
+				String inherit = fprops.getInheritedSource();
+				
+				Props props = sharedProps.get(source);
+				Props inherits = sharedProps.get(inherit);
+				
+				props.setParent(inherits);
+			}
+		}
+	}
+	
+	private void handleSucceededJob(ExecutableNode node) {
+		for(String dependent: node.getOutNodes()) {
+			ExecutableNode dependentNode = flow.getExecutableNode(dependent);
+			
+			// Check all dependencies
+			boolean ready = true;
+			for (String dependency: dependentNode.getInNodes()) {
+				ExecutableNode dependencyNode = flow.getExecutableNode(dependency); 
+				if (dependencyNode.getStatus() != Status.SUCCEEDED &&
+					dependencyNode.getStatus() != Status.IGNORED) {
+					ready = false;
+					break;
+				}
+			}
+			
+			if (ready) {
+				Props previousOutput = null;
+				// Iterate the in nodes again and create the dependencies
+				for (String dependency: node.getInNodes()) {
+					Props output = outputProps.get(dependency);
+					if (output != null) {
+						output = Props.clone(output);
+						
+						output.setParent(previousOutput);
+						previousOutput = output;
+					}
+				}
+				
+				JobRunner runner = null;
+				try {
+					runner = this.createJobRunner(dependentNode, previousOutput);
+				} catch (IOException e) {
+					System.err.println("Failed due to " + e.getMessage());
+					dependentNode.setStatus(Status.FAILED);
+					handleFailedJob(dependentNode);
+					return;
+				}
+				
+				jobsToRun.add(runner);
+			}
+		}
+	}
+	
+	private void handleFailedJob(ExecutableNode node) {
+		System.err.println("Job " + node.getId() + " failed.");
+		this.fireEventListeners(Event.create(this, Type.FLOW_FAILED_FINISHING));
+		if (failedOptions == FailedFlowOptions.FINISH_RUNNING_JOBS) {
+			done = true;
+		}
+		else if (failedOptions == FailedFlowOptions.CANCEL_ALL) {
+			this.cancel();
+		}
+		else if (failedOptions == FailedFlowOptions.COMPLETE_ALL_DEPENDENCIES) {
+		}
+	}
+	
+	private class JobRunnerEventListener implements EventListener {
+		private FlowRunner flowRunner;
+		
+		public JobRunnerEventListener(FlowRunner flowRunner) {
+			this.flowRunner = flowRunner;
+		}
+
+		@Override
+		public synchronized void handleEvent(Event event) {
+			JobRunner runner = (JobRunner)event.getRunner();
+			String jobID = runner.getNode().getId();
+			System.out.println("Event " + jobID + " " + event.getType().toString());
+
+			if (event.getType() == Type.JOB_SUCCEEDED) {
+				// Continue adding items.
+				Props props = runner.getOutputProps();
+				outputProps.put(jobID, props);
+				
+				flowRunner.handleSucceededJob(runner.getNode());
+			}
+			else if (event.getType() == Type.JOB_FAILED) {
+				flowRunner.handleFailedJob(runner.getNode());
+			}
+		}
 	}
 }
\ No newline at end of file
diff --git a/src/java/azkaban/executor/FlowRunnerManager.java b/src/java/azkaban/executor/FlowRunnerManager.java
index 375348b..f07af9d 100644
--- a/src/java/azkaban/executor/FlowRunnerManager.java
+++ b/src/java/azkaban/executor/FlowRunnerManager.java
@@ -43,15 +43,28 @@ public class FlowRunnerManager {
 	
 	public void submitFlow(String id, String path) throws ExecutorManagerException {
 		// Load file and submit
+		logger.info("Flow " + id + " submitted with path " + path);
+		
 		File dir = new File(path);
 		ExecutableFlow flow = ExecutableFlowLoader.loadExecutableFlowFromDir(dir);
-		
+		flow.setExecutionPath(path);
+
 		FlowRunner runner = new FlowRunner(flow);
 		runner.addListener(eventListener);
 		executorService.submit(runner);
 	}
 	
-	//
+	public void cancelFlow(String id) throws ExecutorManagerException {
+		FlowRunner runner = runningFlows.get(id);
+		if (runner != null) {
+			runner.cancel();
+		}
+	}
+	
+	public FlowRunner getFlowRunner(String id) {
+		return runningFlows.get(id);
+	}
+	
 	private class SubmitterThread extends Thread {
 		private BlockingQueue<FlowRunner> queue;
 		private boolean shutdown = false;
@@ -87,7 +100,9 @@ public class FlowRunnerManager {
 
 		@Override
 		public synchronized void handleEvent(Event event) {
-			
+			FlowRunner runner = (FlowRunner)event.getRunner();
+			ExecutableFlow flow = runner.getFlow();
+			System.out.println("Event " + flow.getExecutionId() + " " + flow.getFlowId() + " " + event.getType());
 		}
 	}
 }
diff --git a/src/java/azkaban/executor/JobRunner.java b/src/java/azkaban/executor/JobRunner.java
new file mode 100644
index 0000000..7e2d45a
--- /dev/null
+++ b/src/java/azkaban/executor/JobRunner.java
@@ -0,0 +1,61 @@
+package azkaban.executor;
+
+import azkaban.executor.ExecutableFlow.ExecutableNode;
+import azkaban.executor.ExecutableFlow.Status;
+import azkaban.executor.event.Event;
+import azkaban.executor.event.Event.Type;
+import azkaban.executor.event.EventHandler;
+import azkaban.utils.Props;
+
+public class JobRunner  extends EventHandler implements Runnable {
+	private Props props;
+	private Props outputProps;
+	private ExecutableNode node;
+	
+	public JobRunner(ExecutableNode node, Props props) {
+		this.props = props;
+		this.node = node;
+		this.node.setStatus(Status.WAITING);
+	}
+	
+	public ExecutableNode getNode() {
+		return node;
+	}
+	
+	@Override
+	public void run() {
+		if (node.getStatus() == Status.KILLED) {
+			this.fireEventListeners(Event.create(this, Type.JOB_KILLED));
+			return;
+		}
+		
+		this.fireEventListeners(Event.create(this, Type.JOB_STARTED));
+		node.setStatus(Status.RUNNING);
+		
+		// Run Job
+		boolean succeeded = true;
+		
+		if (succeeded) {
+			node.setStatus(Status.SUCCEEDED);
+			this.fireEventListeners(Event.create(this, Type.JOB_SUCCEEDED));
+		}
+		else {
+			node.setStatus(Status.FAILED);
+			this.fireEventListeners(Event.create(this, Type.JOB_FAILED));
+		}
+	}
+
+	public void cancel() {
+		// Cancel code here
+		
+		node.setStatus(Status.KILLED);
+	}
+	
+	public Status getStatus() {
+		return node.getStatus();
+	}
+	
+	public Props getOutputProps() {
+		return outputProps;
+	}
+}
diff --git a/src/java/azkaban/utils/ExecutableFlowLoader.java b/src/java/azkaban/utils/ExecutableFlowLoader.java
index 8afd7a6..613f3d4 100644
--- a/src/java/azkaban/utils/ExecutableFlowLoader.java
+++ b/src/java/azkaban/utils/ExecutableFlowLoader.java
@@ -34,6 +34,7 @@ public class ExecutableFlowLoader {
 		}
 		
 		ExecutableFlow flow = ExecutableFlow.createExecutableFlowFromObject(exFlowObj);
+		flow.setExecutionPath(exDir.getPath());
 		return flow;
 	}
 	
diff --git a/src/java/azkaban/utils/Props.java b/src/java/azkaban/utils/Props.java
index 6dd0a5e..4b16b9c 100644
--- a/src/java/azkaban/utils/Props.java
+++ b/src/java/azkaban/utils/Props.java
@@ -157,6 +157,14 @@ public class Props {
 		}
 	}
 
+	public Props getEarliestAncestor() {
+		if (_parent == null) {
+			return this;
+		}
+		
+		return _parent.getEarliestAncestor();
+	}
+	
 	/**
 	 * Create a Props with a null parent from a list of key value pairing. i.e.
 	 * [key1, value1, key2, value2 ...]