/*
 * Copyright 2012 LinkedIn Corp.
 * 
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 * 
 * http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.execapp;

import java.io.File;
import java.io.FileFilter;
import java.io.FilenameFilter;
import java.io.IOException;
import java.lang.Thread.State;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;

import azkaban.project.ProjectLoader;
import azkaban.execapp.event.Event;
import azkaban.execapp.event.EventListener;
import azkaban.execapp.event.FlowWatcher;
import azkaban.execapp.event.LocalFlowWatcher;
import azkaban.execapp.event.RemoteFlowWatcher;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
import azkaban.jobtype.JobTypeManager;

import azkaban.utils.FileIOUtils;
import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;

/**
 * Execution manager for server-side flow execution. Accepts flow submissions,
 * runs each flow on a shared thread pool via {@link FlowRunner}, tracks running
 * and recently finished executions, and periodically cleans up old execution
 * and project directories.
 */
public class FlowRunnerManager implements EventListener {
	private static Logger logger = Logger.getLogger(FlowRunnerManager.class);
	private File executionDirectory;
	private File projectDirectory;

	private static final long RECENTLY_FINISHED_TIME_TO_LIVE = 60*1000; // keep recently finished flows around for 1 minute before cleanup
	
	private static final int DEFAULT_NUM_EXECUTING_FLOWS = 30;
	private Map<Pair<Integer,Integer>, ProjectVersion> installedProjects = new ConcurrentHashMap<Pair<Integer,Integer>, ProjectVersion>();
	private Map<Integer, FlowRunner> runningFlows = new ConcurrentHashMap<Integer, FlowRunner>();
	private Map<Integer, ExecutableFlow> recentlyFinishedFlows = new ConcurrentHashMap<Integer, ExecutableFlow>();
	private LinkedBlockingQueue<FlowRunner> flowQueue = new LinkedBlockingQueue<FlowRunner>();
	private int numThreads = DEFAULT_NUM_EXECUTING_FLOWS;

	private ExecutorService executorService;
	private SubmitterThread submitterThread;
	private CleanerThread cleanerThread;
	private int numJobThreadPerFlow = 10;
	
	private ExecutorLoader executorLoader;
	private ProjectLoader projectLoader;
	
	private JobTypeManager jobtypeManager;
	
	private Props globalProps;
	
	private final Props azkabanProps;
	
	private long lastSubmitterThreadCheckTime = -1;
	private long lastCleanerThreadCheckTime = -1;
	private long executionDirRetention = 1*24*60*60*1000; // 1 day
	
	// We want to limit the log sizes to about 20 megs
	private String jobLogChunkSize = "5MB";
	private int jobLogNumFiles = 4;
	
	// If true, jobs will validate proxy user against a list of valid proxy users.
	private boolean validateProxyUser = false;
	
	private Object executionDirDeletionSync = new Object();
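	/**
	 * Creates the manager: resolves the execution and project directories from
	 * props (creating them if missing), reloads project versions already on disk,
	 * builds the flow executor thread pool, and starts the submitter and cleaner
	 * background threads.
	 *
	 * <p>A minimal construction sketch; the props and loader instances are assumed
	 * to be supplied by the surrounding server (e.g. AzkabanExecutorServer):
	 * <pre>{@code
	 * // props typically come from azkaban.properties
	 * FlowRunnerManager manager = new FlowRunnerManager(
	 *     props, executorLoader, projectLoader,
	 *     FlowRunnerManager.class.getClassLoader());
	 * }</pre>
	 */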
		
	public FlowRunnerManager(Props props, ExecutorLoader executorLoader, ProjectLoader projectLoader, ClassLoader parentClassLoader) throws IOException {
		executionDirectory = new File(props.getString("azkaban.execution.dir", "executions"));
		projectDirectory = new File(props.getString("azkaban.project.dir", "projects"));
		
		azkabanProps = props;
		
		//JobWrappingFactory.init(props, getClass().getClassLoader());
		executionDirRetention = props.getLong("execution.dir.retention", executionDirRetention);
		logger.info("Execution dir retention set to " + executionDirRetention + " ms");
		
		if (!executionDirectory.exists()) {
			executionDirectory.mkdirs();
		}
		if (!projectDirectory.exists()) {
			projectDirectory.mkdirs();
		}

		installedProjects = loadExistingProjects();
		
		//azkaban.temp.dir
		numThreads = props.getInt("executor.flow.threads", DEFAULT_NUM_EXECUTING_FLOWS);
		numJobThreadPerFlow = props.getInt("flow.num.job.threads", numJobThreadPerFlow);
		executorService = Executors.newFixedThreadPool(numThreads);
		
		this.executorLoader = executorLoader;
		this.projectLoader = projectLoader;
		
		this.jobLogChunkSize = azkabanProps.getString("job.log.chunk.size", "5MB");
		this.jobLogNumFiles = azkabanProps.getInt("job.log.backup.index", 4);
		
		this.validateProxyUser = azkabanProps.getBoolean("proxy.user.lock.down", false);
		
		submitterThread = new SubmitterThread(flowQueue);
		submitterThread.start();
		
		cleanerThread = new CleanerThread();
		cleanerThread.start();
		
		jobtypeManager = new JobTypeManager(props.getString(AzkabanExecutorServer.JOBTYPE_PLUGIN_DIR, JobTypeManager.DEFAULT_JOBTYPEPLUGINDIR), parentClassLoader);
	}
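	/**
	 * Scans the project directory for folders named {@code <projectId>.<version>}
	 * (e.g. "12.3") and rebuilds the installed-project map from them. Folders
	 * whose names fail to parse are skipped.
	 */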

	private Map<Pair<Integer, Integer>, ProjectVersion> loadExistingProjects() {
		Map<Pair<Integer, Integer>, ProjectVersion> allProjects = new HashMap<Pair<Integer,Integer>, ProjectVersion>();
		for(File project : projectDirectory.listFiles(new FilenameFilter() {
			
			String pattern = "[0-9]+\\.[0-9]+";
			@Override
			public boolean accept(File dir, String name) {
				return name.matches(pattern);
			}
		})) {
			if(project.isDirectory()) {
				try {
					String fileName = new File(project.getAbsolutePath()).getName();
					int projectId = Integer.parseInt(fileName.split("\\.")[0]);
					int versionNum = Integer.parseInt(fileName.split("\\.")[1]);
					ProjectVersion version = new ProjectVersion(projectId, versionNum, project);
					allProjects.put(new Pair<Integer, Integer>(projectId, versionNum), version);
				}
				catch (Exception e) {
					logger.error("Error loading project directory " + project.getName(), e);
				}
			}
		}
		return allProjects;
	}

	public Props getGlobalProps() {
		return globalProps;
	}
	
	public void setGlobalProps(Props globalProps) {
		this.globalProps = globalProps;
	}
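	/**
	 * Background thread that drains the flow queue and submits each FlowRunner
	 * to the executor service. It blocks on the queue, so shutdown interrupts it.
	 */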
	
	private class SubmitterThread extends Thread {
		private BlockingQueue<FlowRunner> queue;
		private boolean shutdown = false;
		
		public SubmitterThread(BlockingQueue<FlowRunner> queue) {
			this.setName("FlowRunnerManager-Submitter-Thread");
			this.queue = queue;
		}

		@SuppressWarnings("unused")
		public void shutdown() {
			shutdown = true;
			this.interrupt();
		}

		public void run() {
			while (!shutdown) {
				try {
					lastSubmitterThreadCheckTime = System.currentTimeMillis();
					FlowRunner flowRunner = queue.take();
					executorService.submit(flowRunner);
				} catch (InterruptedException e) {
					logger.info("Interrupted. Probably to shut down.");
				}
			}
		}
	}
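	/**
	 * Background thread that periodically trims the recently finished flow list,
	 * deletes old unused installed project versions, and removes expired
	 * execution directories, each on its own interval.
	 */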
	
	private class CleanerThread extends Thread {
		// Every hour, clean execution dir.
		private static final long EXECUTION_DIR_CLEAN_INTERVAL_MS = 60*60*1000;
		// Every 5 mins clean the old project dir
		private static final long OLD_PROJECT_DIR_INTERVAL_MS = 5*60*1000;
		// Every 2 mins clean the recently finished list
		private static final long RECENTLY_FINISHED_INTERVAL_MS = 2*60*1000;
		
		private boolean shutdown = false;
		private long lastExecutionDirCleanTime = -1;
		private long lastOldProjectCleanTime = -1;
		private long lastRecentlyFinishedCleanTime = -1;
		
		public CleanerThread() {
			this.setName("FlowRunnerManager-Cleaner-Thread");
		}
		
		@SuppressWarnings("unused")
		public void shutdown() {
			shutdown = true;
			this.interrupt();
		}
		
		public void run() {
			while (!shutdown) {
				synchronized (this) {
					try {
						lastCleanerThreadCheckTime = System.currentTimeMillis();
						
						// Cleanup old stuff.
						long currentTime = System.currentTimeMillis();
						if (currentTime - RECENTLY_FINISHED_INTERVAL_MS > lastRecentlyFinishedCleanTime) {
							logger.info("Cleaning recently finished");
							cleanRecentlyFinished();
							lastRecentlyFinishedCleanTime = currentTime;
						}
						
						if (currentTime - OLD_PROJECT_DIR_INTERVAL_MS > lastOldProjectCleanTime) {
							logger.info("Cleaning old projects");
							cleanOlderProjects();
							
							lastOldProjectCleanTime = currentTime;
						}
						
						if (currentTime - EXECUTION_DIR_CLEAN_INTERVAL_MS > lastExecutionDirCleanTime) {
							logger.info("Cleaning old execution dirs");
							cleanOlderExecutionDirs();
							lastExecutionDirCleanTime = currentTime;
						}
						
						wait(RECENTLY_FINISHED_TIME_TO_LIVE);
					} catch (InterruptedException e) {
						logger.info("Interrupted. Probably to shut down.");
					}
				}
			}
		}
	
		private void cleanOlderExecutionDirs() {
			File dir = executionDirectory;
			
			final long pastTimeThreshold = System.currentTimeMillis() - executionDirRetention;
			File[] executionDirs = dir.listFiles(new FileFilter() {
				@Override
				public boolean accept(File path) {
					if (path.isDirectory() && path.lastModified() < pastTimeThreshold) {
						return true;
					}
					return false;
				}
			});
			
			for (File exDir : executionDirs) {
				try {
					int execId = Integer.valueOf(exDir.getName());
					if (runningFlows.containsKey(execId) || recentlyFinishedFlows.containsKey(execId)) {
						continue;
					}
				}
				catch (NumberFormatException e) {
					logger.error("Can't delete exec dir " + exDir.getName() + " it is not a number");
					continue;
				}
			
				synchronized(executionDirDeletionSync) {
					try {
						FileUtils.deleteDirectory(exDir);
					} catch (IOException e) {
						logger.error("Error cleaning execution dir " + exDir.getPath(), e);
					}
				}
			}
		}
		
		private void cleanRecentlyFinished() {
			long cleanupThreshold = System.currentTimeMillis() - RECENTLY_FINISHED_TIME_TO_LIVE;
			ArrayList<Integer> executionToKill = new ArrayList<Integer>();
			for (ExecutableFlow flow : recentlyFinishedFlows.values()) {
				if (flow.getEndTime() < cleanupThreshold) {
					executionToKill.add(flow.getExecutionId());
				}
			}
			
			for (Integer id: executionToKill) {
				logger.info("Cleaning execution " + id + " from recently finished flows list.");
				recentlyFinishedFlows.remove(id);
			}
		}
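		/**
		 * Groups the installed project directories by project id, keeps the newest
		 * version of each project, and deletes older versions that no running flow
		 * is currently using.
		 */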
		
		private void cleanOlderProjects() {
			Map<Integer, ArrayList<ProjectVersion>> projectVersions = new HashMap<Integer, ArrayList<ProjectVersion>>();
			for (ProjectVersion version : installedProjects.values() ) {
				ArrayList<ProjectVersion> versionList = projectVersions.get(version.getProjectId());
				if (versionList == null) {
					versionList = new ArrayList<ProjectVersion>();
					projectVersions.put(version.getProjectId(), versionList);
				}
				versionList.add(version);
			}
			
			HashSet<Pair<Integer,Integer>> activeProjectVersions = new HashSet<Pair<Integer,Integer>>();
			for(FlowRunner runner: runningFlows.values()) {
				ExecutableFlow flow = runner.getExecutableFlow();
				activeProjectVersions.add(new Pair<Integer,Integer>(flow.getProjectId(), flow.getVersion()));
			}
			
			for (Map.Entry<Integer, ArrayList<ProjectVersion>> entry: projectVersions.entrySet()) {
				//Integer projectId = entry.getKey();
				ArrayList<ProjectVersion> installedVersions = entry.getValue();
				
				// Keep one version of the project around.
				if (installedVersions.size() == 1) {
					continue;
				}
				
				Collections.sort(installedVersions);
				for (int i = 0; i < installedVersions.size() - 1; ++i) {
					ProjectVersion version = installedVersions.get(i);
					Pair<Integer,Integer> versionKey = new Pair<Integer,Integer>(version.getProjectId(), version.getVersion());
					if (!activeProjectVersions.contains(versionKey)) {
						try {
							logger.info("Removing old unused installed project " + version.getProjectId() + ":" + version.getVersion());
							version.deleteDirectory();
							installedProjects.remove(new Pair<Integer, Integer>(version.getProjectId(), version.getVersion()));
						} catch (IOException e) {
							logger.error("Error deleting installed project " + version.getProjectId() + ":" + version.getVersion(), e);
						}
						
					}
				}
			}
		}
	}
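	/**
	 * Loads the executable flow for the given execution id, prepares its execution
	 * directory and project files, wires up an optional pipeline watcher (local if
	 * the pipelined execution runs on this executor, remote otherwise), and queues
	 * the FlowRunner for execution.
	 *
	 * <p>The flow parameter "flow.num.job.threads" may lower the per-flow job
	 * thread count, but never raise it above the configured default.
	 */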
	
	
	public void submitFlow(int execId) throws ExecutorManagerException {
		// Load file and submit
		if (runningFlows.containsKey(execId)) {
			throw new ExecutorManagerException("Execution " + execId + " is already running.");
		}
		
		ExecutableFlow flow = executorLoader.fetchExecutableFlow(execId);
		if (flow == null) {
			throw new ExecutorManagerException("Error loading flow with exec " + execId);
		}
		
		// Sets up the project files and execution directory.
		setupFlow(flow);
		
		// Setup flow runner
		FlowWatcher watcher = null;
		ExecutionOptions options = flow.getExecutionOptions();
		if (options.getPipelineExecutionId() != null) {
			Integer pipelineExecId = options.getPipelineExecutionId();
			FlowRunner runner = runningFlows.get(pipelineExecId);
			
			if (runner != null) {
				watcher = new LocalFlowWatcher(runner);
			}
			else {
				watcher = new RemoteFlowWatcher(pipelineExecId, executorLoader);
			}
		}

		int numJobThreads = numJobThreadPerFlow;
		if(options.getFlowParameters().containsKey("flow.num.job.threads")) {
			try{
				int numJobs = Integer.valueOf(options.getFlowParameters().get("flow.num.job.threads"));
				if(numJobs > 0 && numJobs <= numJobThreads) {
					numJobThreads = numJobs;
				}
			} catch (Exception e) {
				throw new ExecutorManagerException("Failed to set the number of job threads " + options.getFlowParameters().get("flow.num.job.threads") + " for flow " + execId, e);
			}
		}
		
		FlowRunner runner = new FlowRunner(flow, executorLoader, projectLoader, jobtypeManager);
		runner.setFlowWatcher(watcher)
			.setJobLogSettings(jobLogChunkSize, jobLogNumFiles)
			.setValidateProxyUser(validateProxyUser)
			.setGlobalProps(globalProps)
			.setNumJobThreads(numJobThreads)
			.addListener(this);
		
		// Check again in case the flow was submitted concurrently.
		if (runningFlows.containsKey(execId)) {
			throw new ExecutorManagerException("Execution " + execId + " is already running.");
		}
		
		// Finally, queue the sucker.
		runningFlows.put(execId, runner);
		flowQueue.add(runner);
	}
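	/**
	 * Creates the execution directory for the flow and populates it from the
	 * installed project version (set up via the project loader on first use),
	 * copying/symlinking the project files into the execution directory. On
	 * failure the half-built execution directory is deleted.
	 */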
	
	private void setupFlow(ExecutableFlow flow) throws ExecutorManagerException {
		int execId = flow.getExecutionId();
		File execPath = new File(executionDirectory, String.valueOf(execId));
		flow.setExecutionPath(execPath.getPath());
		logger.info("Flow " + execId + " submitted with path " + execPath.getPath());
		execPath.mkdirs();
		
		// Set up the installed project files. The first execution of a project version may take a while.
		Pair<Integer, Integer> projectVersionKey = new Pair<Integer,Integer>(flow.getProjectId(), flow.getVersion());
		
		// Look up or register the project version entry for this flow.
		ProjectVersion projectVersion = null;
		synchronized(installedProjects) {
			projectVersion = installedProjects.get(projectVersionKey);
			if (projectVersion == null) {
				projectVersion = new ProjectVersion(flow.getProjectId(), flow.getVersion());
				installedProjects.put(projectVersionKey, projectVersion);
			}
		}

		try {
			projectVersion.setupProjectFiles(projectLoader, projectDirectory, logger);
			projectVersion.copyCreateSymlinkDirectory(execPath);
		} catch (Exception e) {
			logger.error("Error setting up project files for execution " + execId, e);
			if (execPath.exists()) {
				try {
					FileUtils.deleteDirectory(execPath);
				}
				catch (IOException e1) {
					logger.error("Error deleting execution dir " + execPath.getPath(), e1);
				}
			}
			throw new ExecutorManagerException(e);
		}
	}
	
	public void cancelFlow(int execId, String user) throws ExecutorManagerException {
		FlowRunner runner = runningFlows.get(execId);
		
		if (runner == null) {
			throw new ExecutorManagerException("Execution " + execId + " is not running.");
		}
		
		runner.kill(user);
	}
	
	public void pauseFlow(int execId, String user) throws ExecutorManagerException {
		FlowRunner runner = runningFlows.get(execId);
		
		if (runner == null) {
			throw new ExecutorManagerException("Execution " + execId + " is not running.");
		}
		
		runner.pause(user);
	}
	
	public void resumeFlow(int execId, String user) throws ExecutorManagerException {
		FlowRunner runner = runningFlows.get(execId);
		
		if (runner == null) {
			throw new ExecutorManagerException("Execution " + execId + " is not running.");
		}
		
		runner.resume(user);
	}
	
	public void retryFailures(int execId, String user) throws ExecutorManagerException {
		FlowRunner runner = runningFlows.get(execId);
		
		if (runner == null) {
			throw new ExecutorManagerException("Execution " + execId + " is not running.");
		}
		
		runner.retryFailures(user);
	}
	
	public ExecutableFlow getExecutableFlow(int execId) {
		FlowRunner runner = runningFlows.get(execId);
		if (runner == null) {
			return recentlyFinishedFlows.get(execId);
		}
		return runner.getExecutableFlow();
	}
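	/**
	 * Listens for FLOW_FINISHED events from the FlowRunners: moves the finished
	 * flow from the running map into the recently finished map, where the cleaner
	 * thread expires it after RECENTLY_FINISHED_TIME_TO_LIVE.
	 */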

	@Override
	public void handleEvent(Event event) {
		if (event.getType() == Event.Type.FLOW_FINISHED) {
			
			FlowRunner flowRunner = (FlowRunner)event.getRunner();
			ExecutableFlow flow = flowRunner.getExecutableFlow();

			recentlyFinishedFlows.put(flow.getExecutionId(), flow);
			logger.info("Flow " + flow.getExecutionId() + " is finished. Adding it to recently finished flows list.");
			runningFlows.remove(flow.getExecutionId());
		}
	}
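	/**
	 * Reads a chunk of the flow log for a running execution, starting at
	 * startByte and up to length bytes. Synchronizes with the cleaner so the
	 * execution directory isn't deleted mid-read.
	 */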
	
	public LogData readFlowLogs(int execId, int startByte, int length) throws ExecutorManagerException {
		FlowRunner runner = runningFlows.get(execId);
		if (runner == null) {
			throw new ExecutorManagerException("Running flow " + execId + " not found.");
		}
		
		File dir = runner.getExecutionDir();
		if (dir != null && dir.exists()) {
			try {
				synchronized(executionDirDeletionSync) {
					if (!dir.exists()) {
						throw new ExecutorManagerException("Execution dir file doesn't exist. Probably has beend deleted");
					}
					
					File logFile = runner.getFlowLogFile();
					if (logFile != null && logFile.exists()) {
						return FileIOUtils.readUtf8File(logFile, startByte, length);
					}
					else {
						throw new ExecutorManagerException("Flow log file doesn't exist.");
					}
				}
			} catch (IOException e) {
				throw new ExecutorManagerException(e);
			}
		}
		
		throw new ExecutorManagerException("Error reading file. Log directory doesn't exist.");
	}
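	/**
	 * Reads a chunk of the log of a single job attempt within a running
	 * execution, using the same locking as {@link #readFlowLogs}.
	 */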
	
	public LogData readJobLogs(int execId, String jobId, int attempt, int startByte, int length) throws ExecutorManagerException {
		FlowRunner runner = runningFlows.get(execId);
		if (runner == null) {
			throw new ExecutorManagerException("Running flow " + execId + " not found.");
		}
		
		File dir = runner.getExecutionDir();
		if (dir != null && dir.exists()) {
			try {
				synchronized (executionDirDeletionSync) {
					if (!dir.exists()) {
						throw new ExecutorManagerException("Execution dir file doesn't exist. Probably has beend deleted");
					}
					File logFile = runner.getJobLogFile(jobId, attempt);
					if (logFile != null && logFile.exists()) {
						return FileIOUtils.readUtf8File(logFile, startByte, length);
					}
					else {
						throw new ExecutorManagerException("Job log file doesn't exist.");
					}
				}
			} catch (IOException e) {
				throw new ExecutorManagerException(e);
			}
		}
		
		throw new ExecutorManagerException("Error reading file. Log directory doesn't exist.");
	}
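	/**
	 * Loads the attachments JSON written by a job attempt and returns it as a
	 * list, or null if the job produced no attachment file.
	 */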

	public List<Object> readJobAttachments(int execId, String jobId, int attempt)
			throws ExecutorManagerException {
		FlowRunner runner = runningFlows.get(execId);
		if (runner == null) {
			throw new ExecutorManagerException(
					"Running flow " + execId + " not found.");
		}

		File dir = runner.getExecutionDir();
		if (dir == null || !dir.exists()) {
			throw new ExecutorManagerException(
					"Error reading file. Execution dir doesn't exist.");
		}

		try {
			synchronized (executionDirDeletionSync) {
				if (!dir.exists()) {
					throw new ExecutorManagerException(
							"Execution dir doesn't exist. It was probably deleted.");
				}

				File attachmentFile = runner.getJobAttachmentFile(jobId, attempt);
				if (attachmentFile == null || !attachmentFile.exists()) {
					return null;
				}
				
				@SuppressWarnings("unchecked")
        List<Object> jobAttachments = (ArrayList<Object>) JSONUtils.parseJSONFromFile(attachmentFile); 
				
				return jobAttachments;
			}
		}
		catch (IOException e) {
			throw new ExecutorManagerException(e);
		}
	}
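	/**
	 * Reads a chunk of the metadata file of a job attempt within a running
	 * execution, using the same locking as {@link #readFlowLogs}.
	 */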
	
	public JobMetaData readJobMetaData(int execId, String jobId, int attempt, int startByte, int length) throws ExecutorManagerException {
		FlowRunner runner = runningFlows.get(execId);
		if (runner == null) {
			throw new ExecutorManagerException("Running flow " + execId + " not found.");
		}
		
		File dir = runner.getExecutionDir();
		if (dir != null && dir.exists()) {
			try {
				synchronized(executionDirDeletionSync) {
					if (!dir.exists()) {
						throw new ExecutorManagerException("Execution dir file doesn't exist. Probably has beend deleted");
					}
					File metaDataFile = runner.getJobMetaDataFile(jobId, attempt);
					if (metaDataFile != null && metaDataFile.exists()) {
						return FileIOUtils.readUtf8MetaDataFile(metaDataFile, startByte, length);
					}
					else {
						throw new ExecutorManagerException("Job log file doesn't exist.");
					}
				}
			} catch (IOException e) {
				throw new ExecutorManagerException(e);
			}
		}
		
		throw new ExecutorManagerException("Error reading file. Log directory doesn't exist.");
	}

	public long getLastCleanerThreadCheckTime() {
		return lastCleanerThreadCheckTime;
	}

	public long getLastSubmitterThreadCheckTime() {
		return lastSubmitterThreadCheckTime;
	}

	public boolean isSubmitterThreadActive() {
		return this.submitterThread.isAlive();
	}

	public boolean isCleanerThreadActive() {
		return this.cleanerThread.isAlive();
	}
	
	public State getSubmitterThreadState() {
		return this.submitterThread.getState();
	}

	public State getCleanerThreadState() {
		return this.cleanerThread.getState();
	}
	
	public boolean isExecutorThreadPoolShutdown() {
		return executorService.isShutdown();
	}
	
	public int getNumExecutingFlows() {
		return runningFlows.size();
	}
	
	public String getRunningFlowIds() {
		ArrayList<Integer> ids = new ArrayList<Integer>(runningFlows.keySet());
		Collections.sort(ids);
		return ids.toString();
	}

	public int getNumExecutingJobs() {
		int jobCount = 0;
		for (FlowRunner runner: runningFlows.values()) {
			jobCount += runner.getNumRunningJobs();
		}
		
		return jobCount;
	}

	
	
}