ExecutableFlowBase.java

423 lines | 12.223 kB Blame History Raw Download
/*
 * Copyright 2012 LinkedIn, Inc
 * 
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 * 
 * http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package azkaban.executor;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import azkaban.flow.Edge;
import azkaban.flow.Flow;
import azkaban.flow.FlowProps;
import azkaban.flow.Node;
import azkaban.flow.SpecialJobTypes;
import azkaban.project.Project;
import azkaban.utils.TypedMapWrapper;

public class ExecutableFlowBase extends ExecutableNode {
	public static final String FLOW_ID_PARAM = "flowId";
	public static final String NODES_PARAM = "nodes";
	public static final String PROPERTIES_PARAM = "properties";
	public static final String SOURCE_PARAM = "source";
	public static final String INHERITED_PARAM = "inherited";
	
	private HashMap<String, ExecutableNode> executableNodes = new HashMap<String, ExecutableNode>();
	private ArrayList<String> startNodes;
	private ArrayList<String> endNodes;
	
	private HashMap<String, FlowProps> flowProps = new HashMap<String, FlowProps>();
	private String flowId;
	
	public ExecutableFlowBase(Project project, Node node, Flow flow, ExecutableFlowBase parent) {
		super(node, parent);

		setFlow(project, flow);
	}
	
	public ExecutableFlowBase() {
	}
	
	public int getExecutionId() {
		if (this.getParentFlow() != null) {
			return this.getParentFlow().getExecutionId();
		}
		
		return -1;
 	}
	
	public int getProjectId() {
		if (this.getParentFlow() != null) {
			return this.getParentFlow().getProjectId();
		}
		
		return -1;
	}
	
	public int getVersion() {
		if (this.getParentFlow() != null) {
			return this.getParentFlow().getVersion();
		}
		
		return -1;
	}
	
	public Collection<FlowProps> getFlowProps() {
		return flowProps.values();
	}
	
	public String getFlowId() {
		return flowId;
	}
	
	protected void setFlow(Project project, Flow flow) {
		this.flowId = flow.getId();
		flowProps.putAll(flow.getAllFlowProps());
		
		for (Node node: flow.getNodes()) {
			String id = node.getId();
			if (node.getType().equals(SpecialJobTypes.EMBEDDED_FLOW_TYPE)) {
				String embeddedFlowId = node.getEmbeddedFlowId();
				Flow subFlow = project.getFlow(embeddedFlowId);
				
				ExecutableFlowBase embeddedFlow = new ExecutableFlowBase(project, node, subFlow, this);
				executableNodes.put(id, embeddedFlow);
			}
			else {
				ExecutableNode exNode = new ExecutableNode(node, this);
				executableNodes.put(id, exNode);
			}
		}
		
		for (Edge edge: flow.getEdges()) {
			ExecutableNode sourceNode = executableNodes.get(edge.getSourceId());
			ExecutableNode targetNode = executableNodes.get(edge.getTargetId());
			
			if (sourceNode == null) {
				System.out.println("Source node " + edge.getSourceId() + " doesn't exist");
			}
			sourceNode.addOutNode(edge.getTargetId());
			targetNode.addInNode(edge.getSourceId());
		}
	}
	
	public List<ExecutableNode> getExecutableNodes() {
		return new ArrayList<ExecutableNode>(executableNodes.values());
	}
	
	public ExecutableNode getExecutableNode(String id) {
		return executableNodes.get(id);
	}
	
	public ExecutableNode getExecutableNodePath(String ids) {
		String[] split = ids.split(":");
		return getExecutableNodePath(split);
	}
	
	public ExecutableNode getExecutableNodePath(String ... ids) {
		return getExecutableNodePath(this, ids, 0);
	}
	
	private ExecutableNode getExecutableNodePath(ExecutableFlowBase flow, String[] ids, int currentIdIdx) {
		ExecutableNode node = flow.getExecutableNode(ids[currentIdIdx]);
		currentIdIdx++;
		
		if (node == null) {
			return null;
		}
		
		if (ids.length == currentIdIdx) {
			return node;
		}
		else if (node instanceof ExecutableFlowBase) {
			return getExecutableNodePath((ExecutableFlowBase)node, ids, currentIdIdx);
		}
		else {
			return null;
		}
		
	}
	
	public List<String> getStartNodes() {
		if (startNodes == null) {
			startNodes = new ArrayList<String>();
			for (ExecutableNode node: executableNodes.values()) {
				if (node.getInNodes().isEmpty()) {
					startNodes.add(node.getId());
				}
			}
		}
		
		return startNodes;
	}
	
	public List<String> getEndNodes() {
		if (endNodes == null) {
			endNodes = new ArrayList<String>();
			for (ExecutableNode node: executableNodes.values()) {
				if (node.getOutNodes().isEmpty()) {
					endNodes.add(node.getId());
				}
			}
		}
		
		return endNodes;
	}
	
	public Map<String,Object> toObject() {
		Map<String,Object> mapObj = new HashMap<String,Object>();
		fillMapFromExecutable(mapObj);
		
		return mapObj;
	}
	
	protected void fillMapFromExecutable(Map<String,Object> flowObjMap) {
		super.fillMapFromExecutable(flowObjMap);
		
		flowObjMap.put(FLOW_ID_PARAM, flowId);
		
		ArrayList<Object> nodes = new ArrayList<Object>();
		for (ExecutableNode node: executableNodes.values()) {
			nodes.add(node.toObject());
		}
		flowObjMap.put(NODES_PARAM, nodes);
		
		// Flow properties
		ArrayList<Object> props = new ArrayList<Object>();
		for (FlowProps fprop: flowProps.values()) {
			HashMap<String, Object> propObj = new HashMap<String, Object>();
			String source = fprop.getSource();
			String inheritedSource = fprop.getInheritedSource();
			
			propObj.put(SOURCE_PARAM, source);
			if (inheritedSource != null) {
				propObj.put(INHERITED_PARAM, inheritedSource);
			}
			props.add(propObj);
		}
		flowObjMap.put(PROPERTIES_PARAM, props);
	}

	@Override
	public void fillExecutableFromMapObject(TypedMapWrapper<String,Object> flowObjMap) {
		super.fillExecutableFromMapObject(flowObjMap);
		
		this.flowId = flowObjMap.getString(FLOW_ID_PARAM);
		List<Object> nodes = flowObjMap.<Object>getList(NODES_PARAM);
		
		if (nodes != null) {
			for (Object nodeObj: nodes) {
				@SuppressWarnings("unchecked")
				Map<String,Object> nodeObjMap = (Map<String,Object>)nodeObj;
				TypedMapWrapper<String,Object> wrapper = new TypedMapWrapper<String,Object>(nodeObjMap);
				
				String type = wrapper.getString(TYPE_PARAM);
				if (type != null && type.equals(SpecialJobTypes.EMBEDDED_FLOW_TYPE)) {
					ExecutableFlowBase exFlow = new ExecutableFlowBase();
					exFlow.fillExecutableFromMapObject(wrapper);
					exFlow.setParentFlow(this);
					
					executableNodes.put(exFlow.getId(), exFlow);
				}
				else {
					ExecutableNode exJob = new ExecutableNode();
					exJob.fillExecutableFromMapObject(nodeObjMap);
					exJob.setParentFlow(this);
					
					executableNodes.put(exJob.getId(), exJob);
				}
			}
		}
		
		List<Object> properties = flowObjMap.<Object>getList(PROPERTIES_PARAM);
		for (Object propNode : properties) {
			@SuppressWarnings("unchecked")
			HashMap<String, Object> fprop = (HashMap<String, Object>)propNode;
			String source = (String)fprop.get("source");
			String inheritedSource = (String)fprop.get("inherited");
			
			FlowProps flowProps = new FlowProps(inheritedSource, source);
			this.flowProps.put(source, flowProps);
		}
	}
	
	public Map<String, Object> toUpdateObject(long lastUpdateTime) {
		Map<String, Object> updateData = super.toUpdateObject();
		
		List<Map<String,Object>> updatedNodes = new ArrayList<Map<String,Object>>();
		for (ExecutableNode node: executableNodes.values()) {
			if (node instanceof ExecutableFlowBase) {
				Map<String, Object> updatedNodeMap = ((ExecutableFlowBase)node).toUpdateObject(lastUpdateTime);
				// We add only flows to the list which either have a good update time, or has updated descendants.
				if (node.getUpdateTime() > lastUpdateTime || updatedNodeMap.containsKey(NODES_PARAM)) {
					updatedNodes.add(updatedNodeMap);
				}
			} 
			else {
				if (node.getUpdateTime() > lastUpdateTime) {
					Map<String, Object> updatedNodeMap = node.toUpdateObject();
					updatedNodes.add(updatedNodeMap);
				}
			}
		}
		
		// if there are no updated nodes, we just won't add it to the list. This is good
		// since if this is a nested flow, the parent is given the option to include or
		// discard these subflows.
		if (!updatedNodes.isEmpty()) {
			updateData.put(NODES_PARAM, updatedNodes);
		}
		return updateData;
	}
	
	public void applyUpdateObject(TypedMapWrapper<String, Object> updateData, List<ExecutableNode> updatedNodes) {
		super.applyUpdateObject(updateData);
		
		if (updatedNodes != null) {
			updatedNodes.add(this);
		}

		List<Map<String,Object>> nodes = (List<Map<String,Object>>)updateData.<Map<String,Object>>getList(NODES_PARAM);
		if (nodes != null) {
			for (Map<String,Object> node: nodes) {
				TypedMapWrapper<String,Object> nodeWrapper = new TypedMapWrapper<String,Object>(node);
				String id = nodeWrapper.getString(ID_PARAM);
				if (id == null) {
					// Legacy case
					id = nodeWrapper.getString("jobId");				
				}
	
				ExecutableNode exNode = executableNodes.get(id);
				if (updatedNodes != null) {
					updatedNodes.add(exNode);
				}
				
				if (exNode instanceof ExecutableFlowBase) {
					((ExecutableFlowBase)exNode).applyUpdateObject(nodeWrapper, updatedNodes);
				}
				else {
					exNode.applyUpdateObject(nodeWrapper);
				}
			}
		}
	}
	
	public void applyUpdateObject(Map<String, Object> updateData, List<ExecutableNode> updatedNodes) {
		TypedMapWrapper<String, Object> typedMapWrapper = new TypedMapWrapper<String,Object>(updateData);
		applyUpdateObject(typedMapWrapper, updatedNodes);
	}
	
	@Override
	public void applyUpdateObject(Map<String, Object> updateData) {
		TypedMapWrapper<String, Object> typedMapWrapper = new TypedMapWrapper<String,Object>(updateData);
		applyUpdateObject(typedMapWrapper, null);
	}
	
	public void reEnableDependents(ExecutableNode ... nodes) {
		for(ExecutableNode node: nodes) {
			for(String dependent: node.getOutNodes()) {
				ExecutableNode dependentNode = getExecutableNode(dependent);
				
				if (dependentNode.getStatus() == Status.KILLED) {
					dependentNode.setStatus(Status.READY);
					dependentNode.setUpdateTime(System.currentTimeMillis());
					reEnableDependents(dependentNode);
	
					if (dependentNode instanceof ExecutableFlowBase) {
						
						((ExecutableFlowBase)dependentNode).reEnableDependents();
					}
				}
				else if (dependentNode.getStatus() == Status.SKIPPED) {
					dependentNode.setStatus(Status.DISABLED);
					dependentNode.setUpdateTime(System.currentTimeMillis());
					reEnableDependents(dependentNode);
				}
			}
		}
	}
	
	/**
	 * Only returns true if the status of all finished nodes is true.
	 * @return
	 */
	public boolean isFlowFinished() {
		for (String end: getEndNodes()) {
			ExecutableNode node = getExecutableNode(end);
			if (!Status.isStatusFinished(node.getStatus()) ) {
				return false;
			}
		}
		
		return true;
	}
	
	/**
	 * Finds all jobs which are ready to run. This occurs when all of its 
	 * dependency nodes are finished running.
	 * 
	 * It will also return any subflow that has been completed such that the
	 * FlowRunner can properly handle them.
	 * 
	 * @param flow
	 * @return
	 */
	public List<ExecutableNode> findNextJobsToRun() {
		ArrayList<ExecutableNode> jobsToRun = new ArrayList<ExecutableNode>();
		
		if (isFlowFinished() && !Status.isStatusFinished(getStatus())) {
			jobsToRun.add(this);
		}
		else {
			nodeloop:
			for (ExecutableNode node: executableNodes.values()) {
				if(Status.isStatusFinished(node.getStatus())) {
					continue;
				}
	
				if ((node instanceof ExecutableFlowBase) && Status.isStatusRunning(node.getStatus())) {
					// If the flow is still running, we traverse into the flow
					jobsToRun.addAll(((ExecutableFlowBase)node).findNextJobsToRun());
				}
				else if (Status.isStatusRunning(node.getStatus())) {
					continue;
				}
				else {
					for (String dependency: node.getInNodes()) {
						// We find that the outer-loop is unfinished.
						if (!Status.isStatusFinished(getExecutableNode(dependency).getStatus())) {
							continue nodeloop;
						}
					}
	
					jobsToRun.add(node);
				}
			}
		}
		
		return jobsToRun;
	}
	
	public String getFlowPath() {
		if (this.getParentFlow() == null) {
			return this.getFlowId();
		}
		else {
			return this.getParentFlow().getFlowPath() + "," + this.getId() + ":"+ this.getFlowId();
		}
	}
}