ExecutorManagerRemoteAdapter.java

762 lines | 26.045 kB Blame History Raw Download
/*
 * Copyright 2012 LinkedIn, Inc
 * 
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 * 
 * http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.executor;

import java.io.IOException;
import java.lang.Thread.State;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang.StringUtils;
import org.apache.http.client.HttpClient;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.BasicResponseHandler;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.log4j.Logger;

import azkaban.project.Project;
import azkaban.scheduler.ScheduleStatisticManager;
import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;

/**
 * Executor manager used to manage the client side job.
 *
 */
public class ExecutorManagerRemoteAdapter implements ExecutorManagerAdapter {
	private static Logger logger = Logger.getLogger(ExecutorManagerRemoteAdapter.class);
	private ExecutorLoader executorLoader;
	private String executorHost;
	private int executorPort;
	private String executorManagerHost;
	private int executorManagerPort;
	
	private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows = new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
	private ConcurrentHashMap<Integer, ExecutableFlow> recentlyFinished = new ConcurrentHashMap<Integer, ExecutableFlow>();

	private UpdaterThread updater;
	
	private long lastThreadCheckTime = -1;
	
	public ExecutorManagerRemoteAdapter(Props props, ExecutorLoader loader) throws ExecutorManagerException {
		this.executorLoader = loader;
		this.loadRunningFlows();
		
		executorHost = props.getString("executor.host", "localhost");
		executorPort = props.getInt("executor.port");
		
		executorManagerHost = props.getString(REMOTE_EXECUTOR_MANAGER_HOST);
		executorManagerPort = props.getInt(REMOTE_EXECUTOR_MANAGER_PORT);
		
		updater = new UpdaterThread();
		updater.start();

	}
	
	@Override
	public State getExecutorManagerThreadState() {
		List<Pair<String, String>> params = new ArrayList<Pair<String,String>>();
		params.add(new Pair<String, String>(INFO_JMX_TYPE, "getExecutorManagerThreadState"));
		Map<String, Object> response;
		try {
			response = callRemoteExecutorManager(ACTION_GET_JMX, "azkaban", params);
			return (State) response.get(INFO_JMX_DATA);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
			return null;
		}
	}
	
	@Override
	public boolean isExecutorManagerThreadActive() {
		List<Pair<String, String>> params = new ArrayList<Pair<String,String>>();
		params.add(new Pair<String, String>(INFO_JMX_TYPE, "isExecutorManagerThreadActive"));
		Map<String, Object> response;
		try {
			response = callRemoteExecutorManager(ACTION_GET_JMX, "azkaban", params);
			return (Boolean) response.get(INFO_JMX_DATA);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
			return false;
		}		
	}
	
	@Override
	public long getLastExecutorManagerThreadCheckTime() {
		List<Pair<String, String>> params = new ArrayList<Pair<String,String>>();
		params.add(new Pair<String, String>(INFO_JMX_TYPE, "getLastExecutorManagerThreadCheckTime"));
		Map<String, Object> response;
		try {
			response = callRemoteExecutorManager(ACTION_GET_JMX, "azkaban", params);
			return (Long) response.get(INFO_JMX_DATA);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
			return -1;
		}	
	}
	
	@Override
	public Set<String> getPrimaryServerHosts() {
		// Only one for now. More probably later.
		HashSet<String> ports = new HashSet<String>();
		ports.add(executorHost + ":" + executorPort);
		return ports;
	}
	
	@Override
	public Set<String> getAllActiveExecutorServerHosts() {
		// Includes non primary server/hosts
		HashSet<String> ports = new HashSet<String>();
		ports.add(executorHost + ":" + executorPort);
		for(Pair<ExecutionReference, ExecutableFlow> running: runningFlows.values()) {
			ExecutionReference ref = running.getFirst();
			ports.add(ref.getHost() + ":" + ref.getPort());
		}
		
		return ports;
	}
	
//	private ExecutableFlow fetchExecutableFlow(int execId) throws ExecutorManagerException {
//		ExecutableFlow exflow = executorLoader.fetchExecutableFlow(execId);
//		return exflow;
//	}
	
	private void loadRunningFlows() throws ExecutorManagerException {
		runningFlows.putAll(executorLoader.fetchActiveFlows());
	}
	
	@Override
	public List<Integer> getRunningFlows(int projectId, String flowId) {
		ArrayList<Integer> executionIds = new ArrayList<Integer>();
		for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
			if (ref.getSecond().getFlowId().equals(flowId)) {
				executionIds.add(ref.getFirst().getExecId());
			}
		}
		return executionIds;
	}
	
	@Override
	public boolean isFlowRunning(int projectId, String flowId) {
		for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
			if (ref.getSecond().getProjectId() == projectId && ref.getSecond().getFlowId().equals(flowId)) {
				return true;
			}
		}
		return false;
	}
	
	@Override
	public ExecutableFlow getExecutableFlow(int execId) throws ExecutorManagerException {
		Pair<ExecutionReference, ExecutableFlow> active = runningFlows.get(execId);
		if (active == null) {
			return executorLoader.fetchExecutableFlow(execId);
		}
		return active.getSecond();
	}
	
	@Override
	public List<ExecutableFlow> getRunningFlows() {
		ArrayList<ExecutableFlow> flows = new ArrayList<ExecutableFlow>();
		for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
			flows.add(ref.getSecond());
		}
		return flows;
	}
	
	@Override
	public List<ExecutableFlow> getRecentlyFinishedFlows() {
		return new ArrayList<ExecutableFlow>(recentlyFinished.values());
	}
	
	@Override
	public List<ExecutableFlow> getExecutableFlows(Project project, String flowId, int skip, int size) throws ExecutorManagerException {
		List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(project.getId(), flowId, skip, size);
		return flows;
	}
	
	@Override
	public List<ExecutableFlow> getExecutableFlows(int skip, int size) throws ExecutorManagerException {
		List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(skip, size);
		return flows;
	}
	
	@Override
	public List<ExecutableFlow> getExecutableFlows(String flowIdContains, int skip, int size) throws ExecutorManagerException {
		List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(null, '%'+flowIdContains+'%', null, 0, -1, -1 , skip, size);
		return flows;
	}

	@Override
	public List<ExecutableFlow> getExecutableFlows(String projContain, String flowContain, String userContain, int status, long begin, long end, int skip, int size) throws ExecutorManagerException {
		List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(projContain, flowContain, userContain, status, begin, end , skip, size);
		return flows;
	}
	
	@Override
	public List<ExecutableJobInfo> getExecutableJobs(Project project, String jobId, int skip, int size) throws ExecutorManagerException {
		List<ExecutableJobInfo> nodes = executorLoader.fetchJobHistory(project.getId(), jobId, skip, size);
		return nodes;
	}
	
	@Override
	public int getNumberOfJobExecutions(Project project, String jobId) throws ExecutorManagerException{
		return executorLoader.fetchNumExecutableNodes(project.getId(), jobId);
	}
	
	@Override
	public int getNumberOfExecutions(Project project, String flowId) throws ExecutorManagerException{
		return executorLoader.fetchNumExecutableFlows(project.getId(), flowId);
	}
	
	@Override
	public LogData getExecutableFlowLog(ExecutableFlow exFlow, int offset, int length) throws ExecutorManagerException {
		Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
		if (pair != null) {
			Pair<String,String> typeParam = new Pair<String,String>("type", "flow");
			Pair<String,String> offsetParam = new Pair<String,String>("offset", String.valueOf(offset));
			Pair<String,String> lengthParam = new Pair<String,String>("length", String.valueOf(length));
			
			@SuppressWarnings("unchecked")
			Map<String, Object> result = callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION, typeParam, offsetParam, lengthParam);
			return LogData.createLogDataFromObject(result);
		}
		else {
			LogData value = executorLoader.fetchLogs(exFlow.getExecutionId(), "", 0, offset, length);
			return value;
		}
	}
	
	@Override
	public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException {
		Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
		if (pair != null) {
			Pair<String,String> typeParam = new Pair<String,String>("type", "job");
			Pair<String,String> jobIdParam = new Pair<String,String>("jobId", jobId);
			Pair<String,String> offsetParam = new Pair<String,String>("offset", String.valueOf(offset));
			Pair<String,String> lengthParam = new Pair<String,String>("length", String.valueOf(length));
			Pair<String,String> attemptParam = new Pair<String,String>("attempt", String.valueOf(attempt));
			
			@SuppressWarnings("unchecked")
			Map<String, Object> result = callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION, typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
			return LogData.createLogDataFromObject(result);
		}
		else {
			LogData value = executorLoader.fetchLogs(exFlow.getExecutionId(), jobId, attempt, offset, length);
			return value;
		}
	}
	
	@Override
	public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException {
		Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
		if (pair != null) {

			Pair<String,String> typeParam = new Pair<String,String>("type", "job");
			Pair<String,String> jobIdParam = new Pair<String,String>("jobId", jobId);
			Pair<String,String> offsetParam = new Pair<String,String>("offset", String.valueOf(offset));
			Pair<String,String> lengthParam = new Pair<String,String>("length", String.valueOf(length));
			Pair<String,String> attemptParam = new Pair<String,String>("attempt", String.valueOf(attempt));
			
			@SuppressWarnings("unchecked")
			Map<String, Object> result = callExecutorServer(pair.getFirst(), ConnectorParams.METADATA_ACTION, typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
			return JobMetaData.createJobMetaDataFromObject(result);
		}
		else {
			return null;
		}
	}
	
	@Override
	public void cancelFlow(ExecutableFlow exFlow, String userId) throws ExecutorManagerException {
		List<Pair<String, String>> params = new ArrayList<Pair<String,String>>();
		params.add(new Pair<String, String>(INFO_EXEC_ID, String.valueOf(exFlow.getExecutionId())));
		Map<String, Object> response;
		try {
			response = callRemoteExecutorManager(ACTION_CANCEL_FLOW, userId, params);
			if(response.containsKey(INFO_ERROR)) {
				throw new ExecutorManagerException((String)response.get(INFO_ERROR));
			}
		} catch (Exception e) {
			throw new ExecutorManagerException(e);
		}	
	}
	
	@Override
	public void resumeFlow(ExecutableFlow exFlow, String userId) throws ExecutorManagerException {
		List<Pair<String, String>> params = new ArrayList<Pair<String,String>>();
		params.add(new Pair<String, String>(INFO_EXEC_ID, String.valueOf(exFlow.getExecutionId())));
		Map<String, Object> response;
		try {
			response = callRemoteExecutorManager(ACTION_RESUME_FLOW, userId, params);
			if(response.containsKey(INFO_ERROR)) {
				throw new ExecutorManagerException((String)response.get(INFO_ERROR));
			}
		} catch (Exception e) {
			throw new ExecutorManagerException(e);
		}	
	}
	
	@Override
	public void pauseFlow(ExecutableFlow exFlow, String userId) throws ExecutorManagerException {
		List<Pair<String, String>> params = new ArrayList<Pair<String,String>>();
		params.add(new Pair<String, String>(INFO_EXEC_ID, String.valueOf(exFlow.getExecutionId())));
		Map<String, Object> response;
		try {
			response = callRemoteExecutorManager(ACTION_PAUSE_FLOW, userId, params);
			if(response.containsKey(INFO_ERROR)) {
				throw new ExecutorManagerException((String)response.get(INFO_ERROR));
			}
		} catch (Exception e) {
			throw new ExecutorManagerException(e);
		}	
	}
	
	@Override
	public void pauseExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
		modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_PAUSE_JOBS, userId, jobIds);
	}
	
	@Override
	public void resumeExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
		modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RESUME_JOBS, userId, jobIds);
	}
	
	@Override
	public void retryFailures(ExecutableFlow exFlow, String userId) throws ExecutorManagerException {
		modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RETRY_FAILURES, userId);
	}
	
	@Override
	public void retryExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
		modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RETRY_JOBS, userId, jobIds);
	}
	
	@Override
	public void disableExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
		modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_DISABLE_JOBS, userId, jobIds);
	}
	
	@Override
	public void enableExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
		modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_ENABLE_JOBS, userId, jobIds);
	}
	
	@Override
	public void cancelExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
		modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_CANCEL_JOBS, userId, jobIds);
	}
	
	@SuppressWarnings("unchecked")
	private Map<String, Object> modifyExecutingJobs(ExecutableFlow exFlow, String command, String userId, String ... jobIds) throws ExecutorManagerException {
		synchronized(exFlow) {
			Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
			if (pair == null) {
				throw new ExecutorManagerException("Execution " + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId() + " isn't running.");
			}
			
			Map<String, Object> response = null;
			if (jobIds != null && jobIds.length > 0) {
				for (String jobId: jobIds) {
					if (!jobId.isEmpty()) {
						ExecutableNode node = exFlow.getExecutableNode(jobId);
						if (node == null) {
							throw new ExecutorManagerException("Job " + jobId + " doesn't exist in execution " + exFlow.getExecutionId() + ".");
						}
					}
				}
				String ids = StringUtils.join(jobIds, ',');
				response = callExecutorServer(
						pair.getFirst(), 
						ConnectorParams.MODIFY_EXECUTION_ACTION, 
						userId, 
						new Pair<String,String>(ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command), 
						new Pair<String,String>(ConnectorParams.MODIFY_JOBS_LIST, ids));
			}
			else {
				response = callExecutorServer(
						pair.getFirst(), 
						ConnectorParams.MODIFY_EXECUTION_ACTION, 
						userId, 
						new Pair<String,String>(ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command));
			}
			
			return response;
		}
	}
	
	@Override
	public String submitExecutableFlow(ExecutableFlow exflow, String userId) throws ExecutorManagerException {
		List<Pair<String, String>> params = new ArrayList<Pair<String,String>>();
		params.add(new Pair<String, String>(INFO_EXEC_FLOW_JSON, JSONUtils.toJSON(exflow.toObject())));
		Map<String, Object> response;
		try {
			response = callRemoteExecutorManager(ACTION_SUBMIT_FLOW, userId, params);
			if(response.containsKey(INFO_ERROR)) {
				throw new ExecutorManagerException((String)response.get(INFO_ERROR));
			}
			String message = (String) response.get(INFO_MESSAGE);
			return message;
		} catch (Exception e) {
			throw new ExecutorManagerException(e);
		}	
	}
	
	private Map<String, Object> callExecutorServer(ExecutionReference ref, String action, Pair<String,String> ... params) throws ExecutorManagerException {
		try {
			return callExecutorServer(ref.getHost(), ref.getPort(), action, ref.getExecId(), null, params);
		} catch (IOException e) {
			throw new ExecutorManagerException(e);
		}
	}
	
	private Map<String, Object> callExecutorServer(ExecutionReference ref, String action, String user, Pair<String,String> ... params) throws ExecutorManagerException {
		try {
			return callExecutorServer(ref.getHost(), ref.getPort(), action, ref.getExecId(), user, params);
		} catch (IOException e) {
			throw new ExecutorManagerException(e);
		}
	}
	
	private Map<String, Object> callExecutorServer(String host, int port, String action, Integer executionId, String user, Pair<String,String> ... params) throws IOException {
		URIBuilder builder = new URIBuilder();
		builder.setScheme("http")
			.setHost(host)
			.setPort(port)
			.setPath("/executor");

		builder.setParameter(ConnectorParams.ACTION_PARAM, action);
		
		if (executionId != null) {
			builder.setParameter(ConnectorParams.EXECID_PARAM,String.valueOf(executionId));
		}
		
		if (user != null) {
			builder.setParameter(ConnectorParams.USER_PARAM, user);
		}
		
		if (params != null) {
			for (Pair<String, String> pair: params) {
				builder.setParameter(pair.getFirst(), pair.getSecond());
			}
		}

		URI uri = null;
		try {
			uri = builder.build();
		} catch (URISyntaxException e) {
			throw new IOException(e);
		}
		
		ResponseHandler<String> responseHandler = new BasicResponseHandler();
		
		HttpClient httpclient = new DefaultHttpClient();
		HttpGet httpget = new HttpGet(uri);
		String response = null;
		try {
			response = httpclient.execute(httpget, responseHandler);
		} catch (IOException e) {
			throw e;
		}
		finally {
			httpclient.getConnectionManager().shutdown();
		}
		
		@SuppressWarnings("unchecked")
		Map<String, Object> jsonResponse = (Map<String, Object>)JSONUtils.parseJSONFromString(response);
		String error = (String)jsonResponse.get(ConnectorParams.RESPONSE_ERROR);
		if (error != null) {
			throw new IOException(error);
		}
		
		return jsonResponse;
	}
	
	@Override
	public Map<String, Object> callExecutorJMX(String hostPort, String action, String mBean) throws IOException {
		URIBuilder builder = new URIBuilder();
		
		String[] hostPortSplit = hostPort.split(":");
		builder.setScheme("http")
			.setHost(hostPortSplit[0])
			.setPort(Integer.parseInt(hostPortSplit[1]))
			.setPath("/jmx");

		builder.setParameter(action, "");
		if (mBean != null) {
			builder.setParameter(ConnectorParams.JMX_MBEAN, mBean);
		}

		URI uri = null;
		try {
			uri = builder.build();
		} catch (URISyntaxException e) {
			throw new IOException(e);
		}
		
		ResponseHandler<String> responseHandler = new BasicResponseHandler();
		
		HttpClient httpclient = new DefaultHttpClient();
		HttpGet httpget = new HttpGet(uri);
		String response = null;
		try {
			response = httpclient.execute(httpget, responseHandler);
		} catch (IOException e) {
			throw e;
		}
		finally {
			httpclient.getConnectionManager().shutdown();
		}
		
		@SuppressWarnings("unchecked")
		Map<String, Object> jsonResponse = (Map<String, Object>)JSONUtils.parseJSONFromString(response);
		String error = (String)jsonResponse.get(ConnectorParams.RESPONSE_ERROR);
		if (error != null) {
			throw new IOException(error);
		}
		return jsonResponse;
	}
	
	private Map<String, Object> callRemoteExecutorManager(String action, String user, List<Pair<String,String>> params) throws IOException {
		URIBuilder builder = new URIBuilder();
		builder.setScheme("http")
			.setHost(executorManagerHost)
			.setPort(executorManagerPort)
			.setPath(ExecutorManagerServlet.URL);

		builder.setParameter(ConnectorParams.ACTION_PARAM, action);
		
		if (user != null) {
			builder.setParameter(ConnectorParams.USER_PARAM, user);
		}
		
		if (params != null) {
			for (Pair<String, String> pair: params) {
				builder.setParameter(pair.getFirst(), pair.getSecond());
			}
		}

		URI uri = null;
		try {
			uri = builder.build();
		} catch (URISyntaxException e) {
			throw new IOException(e);
		}
		
		ResponseHandler<String> responseHandler = new BasicResponseHandler();
		
		HttpClient httpclient = new DefaultHttpClient();
		HttpGet httpget = new HttpGet(uri);
		String response = null;
		try {
			response = httpclient.execute(httpget, responseHandler);
		} catch (IOException e) {
			throw e;
		}
		finally {
			httpclient.getConnectionManager().shutdown();
		}
		
		@SuppressWarnings("unchecked")
		Map<String, Object> jsonResponse = (Map<String, Object>)JSONUtils.parseJSONFromString(response);
		String error = (String)jsonResponse.get(ConnectorParams.RESPONSE_ERROR);
		if (error != null) {
			throw new IOException(error);
		}
		
		return jsonResponse;
	}
	
	@Override
	public void shutdown() {
		updater.shutdown();
	}
	
	private class UpdaterThread extends Thread {
		private boolean shutdown = false;

		public UpdaterThread() {
			this.setName("ExecutorManagerRemoteUpdaterThread");
		}
		
		// 10 mins recently finished threshold.
		private long recentlyFinishedLifetimeMs = 600000;
		private int waitTimeIdleMs = 2000;
		private int waitTimeMs = 500;
		
		private void shutdown() {
			shutdown = true;
		}
		
		@SuppressWarnings("unchecked")
		public void run() {
			while(!shutdown) {
				try {
					lastThreadCheckTime = System.currentTimeMillis();
					
//					loadRunningFlows();
					List<Pair<String, String>> params = new ArrayList<Pair<String,String>>();
					ArrayList<ExecutableFlow> finishedFlows = new ArrayList<ExecutableFlow>();
					
					List<Long> updateTimesList = new ArrayList<Long>();
					List<Integer> executionIdsList = new ArrayList<Integer>();
				
					// We pack the parameters of the same host together before we query.
					fillUpdateTimeAndExecId(executionIdsList, updateTimesList);
					
					params.add(new Pair<String, String>(INFO_UPDATE_TIME_LIST, JSONUtils.toJSON(updateTimesList)));
					params.add(new Pair<String, String>(INFO_EXEC_ID_LIST, JSONUtils.toJSON(executionIdsList)));
					
					Map<String, Object> results = null;
					try {
						results = callRemoteExecutorManager(ACTION_UPDATE, "azkaban", params);
					} catch (IOException e) {
						logger.error(e);
					}
					
					// We gets results
					if (results != null) {
						List<Map<String,Object>> executionUpdates = (List<Map<String,Object>>)results.get(INFO_UPDATES);
						for (Map<String,Object> updateMap: executionUpdates) {
							try {
								ExecutableFlow flow = updateExecution(updateMap);
								if (isFinished(flow)) {
									finishedFlows.add(flow);
								}
							} catch (ExecutorManagerException e) {
								logger.error(e);
							}
						}

						evictOldRecentlyFinished(recentlyFinishedLifetimeMs);
						// Add new finished
						for (ExecutableFlow flow: finishedFlows) {
							if(flow.getScheduleId() >= 0 && flow.getStatus() == Status.SUCCEEDED){
								ScheduleStatisticManager.invalidateCache(flow.getScheduleId());
							}
							recentlyFinished.put(flow.getExecutionId(), flow);
						}
						

					}
					
					synchronized(this) {
						try {
							if (runningFlows.size() > 0) {
								this.wait(waitTimeMs);
							}
							else {
								this.wait(waitTimeIdleMs);
							}
						} catch (InterruptedException e) {
						}
					}
				}
				catch (Exception e) {
					logger.error(e);
				}
			}
		}
	}
	
	private void evictOldRecentlyFinished(long ageMs) {
		ArrayList<Integer> recentlyFinishedKeys = new ArrayList<Integer>(recentlyFinished.keySet());
		long oldAgeThreshold = System.currentTimeMillis() - ageMs;
		for (Integer key: recentlyFinishedKeys) {
			ExecutableFlow flow = recentlyFinished.get(key);
			
			if (flow.getEndTime() < oldAgeThreshold) {
				// Evict
				recentlyFinished.remove(key);
			}
		}
	}
	
	private ExecutableFlow updateExecution(Map<String,Object> updateData) throws ExecutorManagerException {
		
		Integer execId = (Integer)updateData.get(ConnectorParams.UPDATE_MAP_EXEC_ID);
		if (execId == null) {
			throw new ExecutorManagerException("Response is malformed. Need exec id to update.");
		}
		
		Pair<ExecutionReference, ExecutableFlow> refPair = this.runningFlows.get(execId);
		if (refPair == null) {
			throw new ExecutorManagerException("No running flow found with the execution id.");
		}
		
		ExecutionReference ref = refPair.getFirst();
		ExecutableFlow flow = refPair.getSecond();
		if (updateData.containsKey("error")) {
			// The flow should be finished here.
			throw new ExecutorManagerException((String)updateData.get("error"), flow);
		}
		
		// Reset errors.
		ref.setNextCheckTime(0);
		ref.setNumErrors(0);
		flow.applyUpdateObject(updateData);
	
		return flow;
	}
	
	public boolean isFinished(ExecutableFlow flow) {
		switch(flow.getStatus()) {
		case SUCCEEDED:
		case FAILED:
		case KILLED:
			return true;
		default:
			return false;
		}
	}
	
	private void fillUpdateTimeAndExecId(List<Integer> executionIds, List<Long> updateTimes) {
		for (Pair<ExecutionReference, ExecutableFlow> flow: runningFlows.values()) {
			executionIds.add(flow.getSecond().getExecutionId());
			updateTimes.add(flow.getSecond().getUpdateTime());
		}
	}
	
	@Override
	public int getExecutableFlows(int projectId, String flowId, int from, int length, List<ExecutableFlow> outputList) throws ExecutorManagerException {
		List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(projectId, flowId, from, length);
		outputList.addAll(flows);
		return executorLoader.fetchNumExecutableFlows(projectId, flowId);
	}

	@Override
	public List<ExecutableFlow> getExecutableFlows(int projectId, String flowId, int from, int length, Status status) throws ExecutorManagerException {
		return executorLoader.fetchFlowHistory(projectId, flowId, from, length, status);
	}


}