FlowRunnerHelper.java

98 lines | 2.397 kB Blame History Raw Download
package azkaban.execapp;

import java.util.ArrayList;
import java.util.List;

import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.ExecutableNode;
import azkaban.executor.Status;

public class FlowRunnerHelper {
	public static boolean isFlowFinished(ExecutableFlow flow) {
		for (String end: flow.getEndNodes()) {
			ExecutableNode node = flow.getExecutableNode(end);
			if (!Status.isStatusFinished(node.getStatus()) ) {
				return false;
			}
		}
		
		return true;
	}
	
	public static List<ExecutableNode> findReadyJobsToRun(ExecutableFlowBase flow) {
		ArrayList<ExecutableNode> jobsToRun = new ArrayList<ExecutableNode>();
		for (ExecutableNode node : flow.getExecutableNodes()) {
			if (Status.isStatusFinished(node.getStatus())) {
				continue;
			}
			else {
				// Check the dependencies to see if execution conditions are met,
				// and what the status should be set to.
				Status impliedStatus = getImpliedStatus(node);
				if (impliedStatus != null) {
					node.setStatus(impliedStatus);
					jobsToRun.add(node);
				}
				else if (node instanceof ExecutableFlowBase && node.getStatus() == Status.RUNNING) {
					// We want to seek into a running flow
				}
			}
		}
		
		return jobsToRun;
	}
	
	public static Status getImpliedStatus(ExecutableNode node) {
		switch(node.getStatus()) {
			case FAILED:
			case KILLED:
			case SKIPPED:
			case SUCCEEDED:
			case FAILED_SUCCEEDED:
			case QUEUED:
			case RUNNING:
				return null;
			default:
				break;
		}
		
		ExecutableFlowBase flow = node.getParentFlow();
		
		boolean shouldKill = false;
		for (String dependency : node.getInNodes()) {
			ExecutableNode dependencyNode = flow.getExecutableNode(dependency);
			
			Status depStatus = dependencyNode.getStatus();
			switch (depStatus) {
			case FAILED:
			case KILLED:
				shouldKill = true;
			case SKIPPED:
			case SUCCEEDED:
			case FAILED_SUCCEEDED:
				continue;
			case RUNNING:
			case QUEUED:
			case DISABLED:
				return null;
			default:
				// Return null means it's not ready to run.
				return null;
			}
		}
		
		if (shouldKill || flow.getStatus() == Status.KILLED || flow.getStatus() == Status.FAILED) {
			return Status.KILLED;
		}
		
		// If it's disabled but ready to run, we want to make sure it continues being disabled.
		if (node.getStatus() == Status.DISABLED) {
			return Status.DISABLED;
		}
		
		// All good to go, ready to run.
		return Status.READY;
	}
}