FlowRunner.java

306 lines | 8.664 kB Blame History Raw Download
package azkaban.executor;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import azkaban.executor.ExecutableFlow.ExecutableNode;
import azkaban.executor.ExecutableFlow.Status;
import azkaban.executor.event.Event;
import azkaban.executor.event.Event.Type;
import azkaban.executor.event.EventHandler;
import azkaban.executor.event.EventListener;
import azkaban.flow.FlowProps;
import azkaban.utils.ExecutableFlowLoader;
import azkaban.utils.Props;

/**
 * Executes an {@link ExecutableFlow} by submitting its jobs to a fixed-size thread pool.
 *
 * <p>The thread calling {@link #run()} (the "flow thread") takes ready {@link JobRunner}s from
 * a queue and submits them to the pool. Job events are delivered to {@link JobRunnerEventListener}
 * (presumably from the pool threads that run the jobs — confirm in JobRunner). The listener queues
 * each dependent once all of its in-nodes have SUCCEEDED or are DISABLED.
 *
 * <p>The flow stops submitting once every end node has been submitted, or earlier on a job
 * failure or a {@link #cancel()}. It then waits for all submitted jobs to terminate, records the
 * final status, persists the flow and fires {@code FLOW_FINISHED}.
 */
public class FlowRunner extends EventHandler implements Runnable {
	public static final int NUM_CONCURRENT_THREADS = 10;

	/** How long the flow thread waits for a ready job before re-checking its stop conditions. */
	private static final long QUEUE_POLL_TIMEOUT_SECONDS = 1;

	private ExecutableFlow flow;
	private ExecutorService executorService;
	private BlockingQueue<JobRunner> jobsToRun = new LinkedBlockingQueue<JobRunner>();
	private int numThreads = NUM_CONCURRENT_THREADS;
	// Written by cancel() from any thread and read by the flow thread.
	private volatile boolean cancelled = false;

	// Concurrent because cancel() iterates it while listener callbacks may be adding runners.
	private Map<String, JobRunner> jobRunnersMap;
	private JobRunnerEventListener listener;
	private Map<String, Props> sharedProps = new HashMap<String, Props>();
	private Map<String, Props> outputProps = new HashMap<String, Props>();
	private File basePath;
	private AtomicInteger commitCount = new AtomicInteger(0);
	// End nodes not yet submitted. Cleared from other threads on failure or cancel, hence concurrent.
	private Set<String> finalNodes = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

	/** What to do with the rest of the flow once a job fails. */
	public enum FailedFlowOptions {
		/** Stop submitting new jobs, but let the jobs already submitted run to completion. */
		FINISH_RUNNING_JOBS,
		/** Cancel the whole flow, including the jobs that are currently waiting or running. */
		KILL_ALL
	}

	private FailedFlowOptions failedOptions = FailedFlowOptions.FINISH_RUNNING_JOBS;

	/**
	 * Creates a runner for {@code flow}. Job property files are resolved relative to the flow's
	 * execution path.
	 *
	 * @param flow the flow to execute
	 */
	public FlowRunner(ExecutableFlow flow) {
		this.flow = flow;
		this.basePath = new File(flow.getExecutionPath());
		this.executorService = Executors.newFixedThreadPool(numThreads);
		this.jobRunnersMap = new ConcurrentHashMap<String, JobRunner>();
		this.listener = new JobRunnerEventListener(this);
	}

	/** Returns the flow being executed. */
	public ExecutableFlow getFlow() {
		return flow;
	}

	/**
	 * Cancels the flow. It stops submitting new jobs, calls {@code shutdownNow()} on the pool, and
	 * cancels every job runner that is still WAITING or RUNNING. The flow thread notices the
	 * cancellation the next time it polls the job queue. Nothing is woken with notify(): the flow
	 * thread never waits on this monitor, and calling notify() without holding it would throw.
	 */
	public void cancel() {
		// Set the flag before clearing finalNodes, so a flow thread that exits because
		// finalNodes is empty already sees the cancellation when it computes the final status.
		cancelled = true;
		finalNodes.clear();

		executorService.shutdownNow();

		for (JobRunner runner : jobRunnersMap.values()) {
			Status status = runner.getStatus();
			if (status == Status.WAITING || status == Status.RUNNING) {
				runner.cancel();
			}
		}
	}

	/** Returns true once {@link #cancel()} has been called. */
	public boolean isCancelled() {
		return cancelled;
	}

	/**
	 * Writes a numbered snapshot of the flow to {@code basePath}. Each call uses the next value of
	 * {@code commitCount}. Failures are reported and otherwise ignored.
	 */
	private synchronized void commitFlow() {
		int count = commitCount.getAndIncrement();

		try {
			ExecutableFlowLoader.writeExecutableFlowFile(this.basePath, flow, count);
		} catch (ExecutorManagerException e) {
			// Persisting a snapshot is best-effort; report it and let the flow continue.
			System.err.println("Failed to commit flow snapshot " + count + ": " + e.getMessage());
			e.printStackTrace();
		}
	}

	/**
	 * Runs the flow to completion on the calling thread. The steps are:
	 * <ol>
	 * <li>load the shared props;</li>
	 * <li>queue the start nodes;</li>
	 * <li>submit ready jobs until every end node has been submitted, or until a failure or cancel;</li>
	 * <li>wait for the submitted jobs to terminate;</li>
	 * <li>record the final status.</li>
	 * </ol>
	 * The flow is SUCCEEDED only if its status is still RUNNING and it was not cancelled.
	 */
	@Override
	public void run() {
		flow.setStatus(Status.RUNNING);
		flow.setStartTime(System.currentTimeMillis());
		this.fireEventListeners(Event.create(this, Type.FLOW_STARTED));

		// Load all shared props
		try {
			loadAllProperties(flow);
		} catch (IOException e) {
			failBeforeStart(e);
			return;
		}

		// Set up starting nodes
		try {
			for (String startNode : flow.getStartNodes()) {
				ExecutableNode node = flow.getExecutableNode(startNode);
				jobsToRun.add(createJobRunner(node, null));
			}
		} catch (IOException e) {
			jobsToRun.clear();
			failBeforeStart(e);
			return;
		}

		// Submission is finished once every end node has been handed to the executor.
		finalNodes.addAll(flow.getEndNodes());

		boolean interrupted = submitReadyJobs();
		if (awaitRunningJobs()) {
			interrupted = true;
		}

		if (flow.getStatus() == Status.RUNNING && !cancelled) {
			flow.setStatus(Status.SUCCEEDED);
		} else {
			flow.setStatus(Status.FAILED);
		}
		flow.setEndTime(System.currentTimeMillis());
		commitFlow();
		this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED));

		// Restore the interrupt only after the final commit, so that I/O is not disturbed by it.
		if (interrupted) {
			Thread.currentThread().interrupt();
		}
	}

	/**
	 * Fails the flow when setup throws before any job was submitted. It also shuts down the
	 * unused executor, records the end time, persists the flow and fires FLOW_FINISHED.
	 */
	private void failBeforeStart(IOException e) {
		System.err.println("Failed due to " + e.getMessage());
		flow.setStatus(Status.FAILED);
		executorService.shutdown();
		flow.setEndTime(System.currentTimeMillis());
		commitFlow();
		this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED));
	}

	/**
	 * Main loop: submits queued runners until every end node has been submitted, or until the
	 * flow fails or is cancelled. The queue is polled with a timeout rather than take(), so a
	 * failure or cancel raised on another thread is noticed even when no new job arrives.
	 *
	 * @return true if the flow thread was interrupted (treated as a cancellation)
	 */
	private boolean submitReadyJobs() {
		while (!finalNodes.isEmpty() && !cancelled) {
			JobRunner runner;
			try {
				runner = jobsToRun.poll(QUEUE_POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
			} catch (InterruptedException e) {
				// Interrupting the flow thread is treated as a request to cancel the flow.
				cancel();
				return true;
			}

			// A null runner means the poll timed out; loop again to re-check the stop conditions.
			if (runner == null || finalNodes.isEmpty()) {
				continue;
			}

			ExecutableNode node = runner.getNode();
			node.setStatus(Status.WAITING);
			try {
				executorService.submit(runner);
			} catch (RejectedExecutionException e) {
				// The executor rejects work once a failure or cancel has shut it down.
				return false;
			}
			finalNodes.remove(node.getId());
		}
		return false;
	}

	/**
	 * Shuts down the executor and blocks until every submitted job has terminated. If the wait
	 * is interrupted, the flow is cancelled (when not already) and the wait continues.
	 *
	 * @return true if the wait was interrupted
	 */
	private boolean awaitRunningJobs() {
		executorService.shutdown();
		boolean interrupted = false;
		while (!executorService.isTerminated()) {
			try {
				executorService.awaitTermination(1, TimeUnit.SECONDS);
			} catch (InterruptedException e) {
				interrupted = true;
				if (!cancelled) {
					cancel();
				}
			}
		}
		return interrupted;
	}

	/**
	 * Builds and registers the runner for {@code node}. The job's props file is
	 * {@code basePath/node.getJobPropsSource()}. Its parent chain is {@code previousOutput}
	 * (when given), followed by the shared props named by {@code node.getPropsSource()}.
	 *
	 * @param node the node to run
	 * @param previousOutput cloned outputs of the node's dependencies, or null
	 * @return the newly registered runner
	 * @throws IOException if the job's props file cannot be loaded
	 */
	private JobRunner createJobRunner(ExecutableNode node, Props previousOutput) throws IOException {
		String source = node.getJobPropsSource();
		String propsSource = node.getPropsSource();

		Props parentProps = propsSource == null ? null : sharedProps.get(propsSource);

		// Splice the dependency outputs between the job props and the shared props.
		if (previousOutput != null) {
			Props earliestParent = previousOutput.getEarliestAncestor();
			earliestParent.setParent(parentProps);

			parentProps = earliestParent;
		}

		File propsFile = new File(basePath, source);
		Props jobProps = new Props(parentProps, propsFile);

		JobRunner jobRunner = new JobRunner(node, jobProps, new File(flow.getExecutionPath()));
		jobRunner.addListener(listener);
		jobRunnersMap.put(node.getId(), jobRunner);

		return jobRunner;
	}

	/**
	 * Loads every flow-level props file into {@code sharedProps}, keyed by its source path. It
	 * then links each file to the props it inherits from. If an inherited source was not loaded,
	 * the lookup returns null and that null is set as the parent.
	 */
	private void loadAllProperties(ExecutableFlow flow) throws IOException {
		// First load all the properties
		for (FlowProps fprops : flow.getFlowProps()) {
			String source = fprops.getSource();
			File propsFile = new File(basePath, source);

			Props props = new Props(null, propsFile);
			sharedProps.put(source, props);
		}

		// Resolve parents
		for (FlowProps fprops : flow.getFlowProps()) {
			if (fprops.getInheritedSource() != null) {
				String source = fprops.getSource();
				String inherit = fprops.getInheritedSource();

				Props props = sharedProps.get(source);
				Props inherits = sharedProps.get(inherit);

				props.setParent(inherits);
			}
		}
	}

	/**
	 * After {@code node} succeeds, queues each dependent whose in-nodes are all SUCCEEDED or
	 * DISABLED and that has not already been scheduled. A dependent whose runner cannot be created
	 * is marked FAILED and handled as a job failure.
	 */
	private void handleSucceededJob(ExecutableNode node) {
		for (String dependent : node.getOutNodes()) {
			ExecutableNode dependentNode = flow.getExecutableNode(dependent);

			// With several parents, another parent's event may already have scheduled this node.
			if (jobRunnersMap.containsKey(dependentNode.getId()) || !allDependenciesDone(dependentNode)) {
				continue;
			}

			JobRunner runner;
			try {
				// The outputs come from the dependent's in-nodes, which include the job that just succeeded.
				runner = this.createJobRunner(dependentNode, chainDependencyOutputs(dependentNode));
			} catch (IOException e) {
				System.err.println("Failed due to " + e.getMessage());
				dependentNode.setStatus(Status.FAILED);
				handleFailedJob(dependentNode);
				return;
			}

			jobsToRun.add(runner);
		}
	}

	/** Returns true when every in-node of {@code node} is SUCCEEDED or DISABLED. */
	private boolean allDependenciesDone(ExecutableNode node) {
		for (String dependency : node.getInNodes()) {
			Status status = flow.getExecutableNode(dependency).getStatus();
			if (status != Status.SUCCEEDED && status != Status.DISABLED) {
				return false;
			}
		}
		return true;
	}

	/**
	 * Clones the recorded output props of each in-node of {@code node} and chains the clones.
	 * Each clone's parent is the previous clone, in in-node order. The method returns the clone
	 * for the last in-node that has output, or null if no in-node has output. (Assumes Props looks
	 * keys up child-first, so later in-nodes win — confirm.)
	 */
	private Props chainDependencyOutputs(ExecutableNode node) {
		Props previousOutput = null;
		for (String dependency : node.getInNodes()) {
			Props output = outputProps.get(dependency);
			if (output != null) {
				output = Props.clone(output);
				output.setParent(previousOutput);
				previousOutput = output;
			}
		}
		return previousOutput;
	}

	/**
	 * Marks the flow FAILED and fires FLOW_FAILED_FINISHING. It then applies
	 * {@code failedOptions}: FINISH_RUNNING_JOBS stops new submissions, and KILL_ALL cancels the
	 * flow. The flow thread notices either outcome on its next queue poll.
	 */
	private void handleFailedJob(ExecutableNode node) {
		System.err.println("Job " + node.getId() + " failed.");
		// Without this, run() would report SUCCEEDED because the flow status stays RUNNING.
		flow.setStatus(Status.FAILED);
		this.fireEventListeners(Event.create(this, Type.FLOW_FAILED_FINISHING));

		switch (failedOptions) {
			// We finish running current jobs and then fail. Do not accept new jobs.
			case FINISH_RUNNING_JOBS:
				finalNodes.clear();
				executorService.shutdown();
				break;
			// We kill all running jobs and fail immediately
			case KILL_ALL:
				this.cancel();
				break;
		}
	}

	/**
	 * Receives events from this flow's job runners. On JOB_SUCCEEDED it records the job's output
	 * props and schedules the ready dependents. On JOB_FAILED it applies the failure policy. After
	 * every event it commits a flow snapshot. handleEvent is synchronized on this listener, and one
	 * listener instance is shared by all runners, so events are handled one at a time.
	 */
	private class JobRunnerEventListener implements EventListener {
		private FlowRunner flowRunner;

		public JobRunnerEventListener(FlowRunner flowRunner) {
			this.flowRunner = flowRunner;
		}

		@Override
		public synchronized void handleEvent(Event event) {
			JobRunner runner = (JobRunner)event.getRunner();
			String jobID = runner.getNode().getId();
			System.out.println("Event " + jobID + " " + event.getType().toString());

			// On Job success, we add the output props and then set up the next run.
			if (event.getType() == Type.JOB_SUCCEEDED) {
				Props props = runner.getOutputProps();
				outputProps.put(jobID, props);
				flowRunner.handleSucceededJob(runner.getNode());
			}
			else if (event.getType() == Type.JOB_FAILED) {
				flowRunner.handleFailedJob(runner.getNode());
			}

			flowRunner.commitFlow();
		}
	}
}