ExecutableNode.java

377 lines | 9.085 kB Blame History Raw Download
/*
 * Copyright 2012 LinkedIn Corp.
 * 
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 * 
 * http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.executor;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import azkaban.flow.Node;
import azkaban.utils.JSONUtils;
import azkaban.utils.Props;

/**
 * Execution-time state of a single job node inside an {@link ExecutableFlow}.
 *
 * <p>Holds the node's identity (job id, type, property sources), its position
 * in the graph (in/out node ids, level), its current run state (status,
 * start/end/update times, paused flag) and the history of earlier attempts.
 * Instances can be converted to and from a plain {@code Map}/{@code List}
 * structure via {@link #toObject()} and
 * {@link #createNodeFromObject(Object, ExecutableFlow)}.
 *
 * <p>Only access to the past-attempt list is guarded by this object's
 * monitor; all other fields are read and written without synchronization.
 */
public class ExecutableNode {
	private String jobId;
	private int executionId;
	private String type;
	private String jobPropsSource;
	private String inheritPropsSource;
	private Status status = Status.READY;
	// Timestamps use -1 to mean "not set".
	private long startTime = -1;
	private long endTime = -1;
	private long updateTime = -1;
	private int level = 0;
	private ExecutableFlow flow;
	private Props outputProps;
	// Index of the current attempt; incremented by resetForRetry().
	private int attempt = 0;
	private boolean paused = false;

	// Presumably milliseconds (the setter's parameter is named delayMs).
	private long delayExecution = 0;

	private Set<String> inNodes = new HashSet<String>();
	private Set<String> outNodes = new HashSet<String>();

	// Used if proxy node
	private Integer externalExecutionId;
	// Lazily created; stays null until the first retry or until attempts are
	// loaded. Guarded by synchronized (this).
	private ArrayList<Attempt> pastAttempts = null;

	/**
	 * Creates an executable node from a flow definition node.
	 *
	 * @param node the definition node supplying id, type, sources and level
	 * @param flow the owning flow; also supplies the execution id
	 */
	public ExecutableNode(Node node, ExecutableFlow flow) {
		jobId = node.getId();
		executionId = flow.getExecutionId();
		type = node.getType();
		jobPropsSource = node.getJobSource();
		inheritPropsSource = node.getPropsSource();
		status = Status.READY;
		level = node.getLevel();
		this.flow = flow;
	}

	/** Creates an empty node with default field values. */
	public ExecutableNode() {
	}

	/**
	 * Archives the current attempt (number, times, status) into the
	 * past-attempt list, increments the attempt counter and resets the node
	 * to {@code READY} with cleared start/end times. The update time is set
	 * to the current wall-clock time.
	 */
	public void resetForRetry() {
		Attempt pastAttempt = new Attempt(attempt, startTime, endTime, status);
		attempt++;

		synchronized (this) {
			if (pastAttempts == null) {
				pastAttempts = new ArrayList<Attempt>();
			}

			pastAttempts.add(pastAttempt);
		}
		startTime = -1;
		endTime = -1;
		updateTime = System.currentTimeMillis();
		status = Status.READY;
	}

	public void setExecutableFlow(ExecutableFlow flow) {
		this.flow = flow;
	}

	public void setExecutionId(int id) {
		executionId = id;
	}

	public int getExecutionId() {
		return executionId;
	}

	public String getJobId() {
		return jobId;
	}

	public void setJobId(String id) {
		this.jobId = id;
	}

	/** Registers the id of a node that has an edge into this node. */
	public void addInNode(String exNode) {
		inNodes.add(exNode);
	}

	/** Registers the id of a node that this node has an edge to. */
	public void addOutNode(String exNode) {
		outNodes.add(exNode);
	}

	/** @return the live (mutable) set of outgoing node ids */
	public Set<String> getOutNodes() {
		return outNodes;
	}

	/** @return the live (mutable) set of incoming node ids */
	public Set<String> getInNodes() {
		return inNodes;
	}

	public Status getStatus() {
		return status;
	}

	public void setStatus(Status status) {
		this.status = status;
	}

	public long getDelayedExecution() {
		return delayExecution;
	}

	public void setDelayedExecution(long delayMs) {
		delayExecution = delayMs;
	}

	/**
	 * Converts this node into a map of plain values.
	 *
	 * <p>The {@code "pastAttempts"} key is only present when at least one
	 * attempt list has been created. The current attempt counter, output
	 * props and delay are not part of the map.
	 *
	 * @return a {@code HashMap<String, Object>} describing this node
	 */
	public Object toObject() {
		HashMap<String, Object> objMap = new HashMap<String, Object>();
		objMap.put("id", jobId);
		objMap.put("jobSource", jobPropsSource);
		objMap.put("propSource", inheritPropsSource);
		objMap.put("jobType", type);
		objMap.put("status", status.toString());
		objMap.put("inNodes", new ArrayList<String>(inNodes));
		objMap.put("outNodes", new ArrayList<String>(outNodes));
		objMap.put("startTime", startTime);
		objMap.put("endTime", endTime);
		objMap.put("updateTime", updateTime);
		objMap.put("level", level);
		objMap.put("externalExecutionId", externalExecutionId);
		objMap.put("paused", paused);

		// Writers of pastAttempts hold this monitor, so iterate under it too;
		// otherwise a concurrent retry could invalidate the iterator.
		synchronized (this) {
			if (pastAttempts != null) {
				ArrayList<Object> attemptsList = new ArrayList<Object>(pastAttempts.size());
				for (Attempt pastAttempt : pastAttempts) {
					attemptsList.add(pastAttempt.toObject());
				}
				objMap.put("pastAttempts", attemptsList);
			}
		}

		return objMap;
	}

	/**
	 * Rebuilds a node from the map structure produced by {@link #toObject()}.
	 *
	 * <p>The keys {@code "inNodes"}, {@code "outNodes"}, {@code "level"},
	 * {@code "externalExecutionId"}, {@code "paused"} and
	 * {@code "pastAttempts"} are optional; when absent the corresponding
	 * field keeps its default. Numeric values may be any {@link Number}.
	 *
	 * @param obj  a {@code Map<String, Object>} describing the node
	 * @param flow the owning flow, or {@code null}; when {@code null} the
	 *             execution id is set to 0
	 * @return the reconstructed node
	 * @throws ClassCastException if {@code obj} or one of its values has an
	 *                            unexpected type
	 */
	@SuppressWarnings("unchecked")
	public static ExecutableNode createNodeFromObject(Object obj, ExecutableFlow flow) {
		ExecutableNode exNode = new ExecutableNode();

		// Cast to the interface so any Map implementation is accepted.
		Map<String, Object> objMap = (Map<String, Object>)obj;
		exNode.executionId = flow == null ? 0 : flow.getExecutionId();
		exNode.jobId = (String)objMap.get("id");
		exNode.jobPropsSource = (String)objMap.get("jobSource");
		exNode.inheritPropsSource = (String)objMap.get("propSource");
		exNode.type = (String)objMap.get("jobType");
		exNode.status = Status.valueOf((String)objMap.get("status"));

		List<String> inNodeIds = (List<String>)objMap.get("inNodes");
		if (inNodeIds != null) {
			exNode.inNodes.addAll(inNodeIds);
		}
		List<String> outNodeIds = (List<String>)objMap.get("outNodes");
		if (outNodeIds != null) {
			exNode.outNodes.addAll(outNodeIds);
		}

		exNode.startTime = JSONUtils.getLongFromObject(objMap.get("startTime"));
		exNode.endTime = JSONUtils.getLongFromObject(objMap.get("endTime"));
		exNode.updateTime = JSONUtils.getLongFromObject(objMap.get("updateTime"));

		// Avoid unboxing a missing value; keep the field default instead.
		Number levelValue = (Number)objMap.get("level");
		if (levelValue != null) {
			exNode.level = levelValue.intValue();
		}

		Number externalId = (Number)objMap.get("externalExecutionId");
		exNode.externalExecutionId =
				externalId == null ? null : Integer.valueOf(externalId.intValue());

		exNode.flow = flow;
		Boolean paused = (Boolean)objMap.get("paused");
		if (paused != null) {
			exNode.paused = paused;
		}

		List<Object> pastAttempts = (List<Object>)objMap.get("pastAttempts");
		if (pastAttempts != null) {
			ArrayList<Attempt> attempts = new ArrayList<Attempt>(pastAttempts.size());
			for (Object attemptObj : pastAttempts) {
				attempts.add(Attempt.fromObject(attemptObj));
			}

			exNode.pastAttempts = attempts;
		}

		return exNode;
	}

	/**
	 * Refreshes status, start time and end time from a map structure. No
	 * other field is touched.
	 *
	 * @param obj a {@code Map<String, Object>} containing {@code "status"},
	 *            {@code "startTime"} and {@code "endTime"}
	 */
	@SuppressWarnings("unchecked")
	public void updateNodeFromObject(Object obj) {
		Map<String, Object> objMap = (Map<String, Object>)obj;
		status = Status.valueOf((String)objMap.get("status"));

		startTime = JSONUtils.getLongFromObject(objMap.get("startTime"));
		endTime = JSONUtils.getLongFromObject(objMap.get("endTime"));
	}

	public long getStartTime() {
		return startTime;
	}

	public void setStartTime(long startTime) {
		this.startTime = startTime;
	}

	public long getEndTime() {
		return endTime;
	}

	public void setEndTime(long endTime) {
		this.endTime = endTime;
	}

	public String getJobPropsSource() {
		return jobPropsSource;
	}

	public String getPropsSource() {
		return inheritPropsSource;
	}

	public int getLevel() {
		return level;
	}

	public ExecutableFlow getFlow() {
		return flow;
	}

	public long getUpdateTime() {
		return updateTime;
	}

	public void setUpdateTime(long updateTime) {
		this.updateTime = updateTime;
	}

	public void setOutputProps(Props output) {
		this.outputProps = output;
	}

	public Props getOutputProps() {
		return outputProps;
	}

	public Integer getExternalExecutionId() {
		return externalExecutionId;
	}

	public void setExternalExecutionId(Integer externalExecutionId) {
		this.externalExecutionId = externalExecutionId;
	}

	/**
	 * @return the internal past-attempt list, or {@code null} if no attempt
	 *         has been archived or loaded yet
	 */
	public List<Attempt> getPastAttemptList() {
		return pastAttempts;
	}

	public int getAttempt() {
		return attempt;
	}

	public void setAttempt(int attempt) {
		this.attempt = attempt;
	}

	public boolean isPaused() {
		return paused;
	}

	public void setPaused(boolean paused) {
		this.paused = paused;
	}

	/**
	 * Converts every past attempt to its map form.
	 *
	 * @return a new list of attempt maps; empty (never {@code null}) when the
	 *         node has no past attempts
	 */
	public List<Object> getAttemptObjects() {
		ArrayList<Object> array = new ArrayList<Object>();

		synchronized (this) {
			// pastAttempts is created lazily, so it is null for a node that
			// has never been retried.
			if (pastAttempts != null) {
				for (Attempt pastAttempt : pastAttempts) {
					array.add(pastAttempt.toObject());
				}
			}
		}

		return array;
	}

	/**
	 * Appends attempts from {@code pastAttemptsList} that this node does not
	 * have yet. Only the sizes are compared: entries at indices below the
	 * current size are assumed to be identical and are skipped.
	 *
	 * @param pastAttemptsList attempt maps in attempt order; {@code null} is
	 *                         ignored
	 */
	public void updatePastAttempts(List<Object> pastAttemptsList) {
		if (pastAttemptsList == null) {
			return;
		}

		synchronized (this) {
			if (this.pastAttempts == null) {
				this.pastAttempts = new ArrayList<Attempt>();
			}

			// We just check size because past attempts don't change
			if (pastAttemptsList.size() <= this.pastAttempts.size()) {
				return;
			}

			Object[] pastAttemptArray = pastAttemptsList.toArray();
			for (int i = this.pastAttempts.size(); i < pastAttemptArray.length; ++i) {
				Attempt attempt = Attempt.fromObject(pastAttemptArray[i]);
				this.pastAttempts.add(attempt);
			}
		}

	}

	/**
	 * Snapshot of one finished attempt of a node: its attempt number, start
	 * and end times, and final status.
	 */
	public static class Attempt {
		private int attempt = 0;
		private long startTime = -1;
		private long endTime = -1;
		private Status status;

		public Attempt(int attempt, long startTime, long endTime, Status status) {
			this.attempt = attempt;
			this.startTime = startTime;
			this.endTime = endTime;
			this.status = status;
		}

		public long getStartTime() {
			return startTime;
		}

		public long getEndTime() {
			return endTime;
		}

		public Status getStatus() {
			return status;
		}

		public int getAttempt() {
			return attempt;
		}

		/**
		 * Rebuilds an attempt from the map produced by {@link #toObject()}.
		 *
		 * @param obj a {@code Map<String, Object>} with the keys
		 *            {@code "attempt"}, {@code "startTime"},
		 *            {@code "endTime"} and {@code "status"}
		 * @return the reconstructed attempt
		 */
		public static Attempt fromObject(Object obj) {
			@SuppressWarnings("unchecked")
			Map<String, Object> map = (Map<String, Object>)obj;
			// Read through Number so both Integer and Long values are accepted.
			int attempt = ((Number)map.get("attempt")).intValue();
			long startTime = JSONUtils.getLongFromObject(map.get("startTime"));
			long endTime = JSONUtils.getLongFromObject(map.get("endTime"));
			Status status = Status.valueOf((String)map.get("status"));

			return new Attempt(attempt, startTime, endTime, status);
		}

		/** @return a new map holding this attempt's four fields */
		public Map<String, Object> toObject() {
			HashMap<String,Object> attempts = new HashMap<String,Object>();
			attempts.put("attempt", attempt);
			attempts.put("startTime", startTime);
			attempts.put("endTime", endTime);
			attempts.put("status", status.toString());
			return attempts;
		}
	}
}