FileProjectManager.java

501 lines | 14.994 kB Blame History Raw Download
package azkaban.project;

import java.io.File;
import java.io.FileFilter;
import java.io.FileWriter;
import java.io.IOException;
import java.security.AccessControlException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import net.sf.ehcache.Cache;
import net.sf.ehcache.CacheManager;
import net.sf.ehcache.Element;
import net.sf.ehcache.config.CacheConfiguration;
import net.sf.ehcache.store.MemoryStoreEvictionPolicy;

import org.apache.log4j.Logger;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

import azkaban.flow.Flow;
import azkaban.flow.FlowProps;
import azkaban.flow.Node;
import azkaban.user.Permission;
import azkaban.user.Permission.Type;
import azkaban.user.User;
import azkaban.utils.DirectoryFlowLoader;
import azkaban.utils.JSONUtils;
import azkaban.utils.Props;

/**
 * A project loader that stores everything on local file system. The following
 * global parameters should be set - file.project.loader.path - The project
 * install path where projects will be loaded installed to.
 */
public class FileProjectManager implements ProjectManager {
	public static final String DIRECTORY_PARAM = "file.project.loader.path";
	private static final DateTimeFormatter FILE_DATE_FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd-HH:mm.ss.SSS");
	private static final String PROPERTIES_FILENAME = "project.json";
	private static final String PROJECT_DIRECTORY = "src";
	private static final String FLOW_EXTENSION = ".flow";
	private static final Logger logger = Logger.getLogger(FileProjectManager.class);
	private static final int IDLE_SECONDS = 120;
	
	private ConcurrentHashMap<String, Project> projects = new ConcurrentHashMap<String, Project>();
	private CacheManager manager = CacheManager.create();
	private Cache sourceCache;

	private File projectDirectory;

	public FileProjectManager(Props props) {
		setupDirectories(props);
		loadAllProjects();
		setupCache();
	}

	private void setupCache() {
		CacheConfiguration cacheConfig = new CacheConfiguration("propsCache",2000)
				.memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU)
				.overflowToDisk(false)
				.eternal(false)
				.timeToIdleSeconds(IDLE_SECONDS)
				.diskPersistent(false)
				.diskExpiryThreadIntervalSeconds(0);

		sourceCache = new Cache(cacheConfig);
		manager.addCache(sourceCache);
	}

	private void setupDirectories(Props props) {
		String projectDir = props.getString(DIRECTORY_PARAM);
		logger.info("Using directory " + projectDir + " as the project directory.");
		projectDirectory = new File(projectDir);
		
		if (!projectDirectory.exists()) {
			logger.info("Directory " + projectDir + " doesn't exist. Creating.");
			if (projectDirectory.mkdirs()) {
				logger.info("Directory creation was successful.");
			} 
			else {
				throw new RuntimeException(
						"FileProjectLoader cannot create directory " + projectDirectory);
			}
		} 
		else if (projectDirectory.isFile()) {
			throw new RuntimeException("FileProjectManager directory " + projectDirectory + " is really a file.");
		}
	}

	private void loadAllProjects() {
		File[] directories = projectDirectory.listFiles();

		for (File dir : directories) {
			if (!dir.isDirectory()) {
				logger.error("ERROR loading project from " + dir.getPath() + ". Not a directory.");
			} 
			else {
				File propertiesFile = new File(dir, PROPERTIES_FILENAME);
				if (!propertiesFile.exists()) {
					logger.error("ERROR loading project from " + dir.getPath()
							+ ". Project file " + PROPERTIES_FILENAME
							+ " not found.");
				} 
				else {
					Object obj = null;
					try {
						obj = JSONUtils.parseJSONFromFile(propertiesFile);
					} 
					catch (IOException e) {
						logger.error( "ERROR loading project from " + dir.getPath() 
								+ ". Project file " + PROPERTIES_FILENAME + " couldn't be read.", e);
						continue;
					}

					Project project = Project.projectFromObject(obj);
					logger.info("Loading project " + project.getName());
					projects.put(project.getName(), project);

					String source = project.getSource();
					if (source == null) {
						logger.info(project.getName() + ": No flows uploaded");
						continue;
					}

					File projectDir = new File(dir, source);
					if (!projectDir.exists()) {
						logger.error("ERROR project source dir " + projectDir + " doesn't exist.");
					} 
					else if (!projectDir.isDirectory()) {
						logger.error("ERROR project source dir " + projectDir + " is not a directory.");
					} 
					else {
						File[] flowFiles = projectDir.listFiles(new SuffixFilter(FLOW_EXTENSION));
						
						Map<String, Flow> flowMap = new LinkedHashMap<String, Flow>();
						for (File flowFile : flowFiles) {
							Object objectizedFlow = null;
							try {
								objectizedFlow = JSONUtils.parseJSONFromFile(flowFile);
							} 
							catch (IOException e) {
								logger.error("Error parsing flow file " + flowFile.toString(), e);
								continue;
							}

							// Recreate Flow
							Flow flow = null;

							try {
								flow = Flow.flowFromObject(objectizedFlow);
							} 
							catch (Exception e) {
								logger.error(
										"Error loading flow "
												+ flowFile.getName()
												+ " in project "
												+ project.getName(), e);
								continue;
							}
							logger.debug("Loaded flow " + project.getName() + ": " + flow.getId());
							flow.initialize();

							flowMap.put(flow.getId(), flow);
						}

						synchronized (project) {
							project.setFlows(flowMap);
						}
					}
				}
			}
		}
	}

	public List<String> getProjectNames() {
		return new ArrayList<String>(projects.keySet());
	}

	public List<Project> getProjects(User user) {
		ArrayList<Project> array = new ArrayList<Project>();
		for (Project project : projects.values()) {
			Permission perm = project.getUserPermission(user);

			if (perm != null && (perm.isPermissionSet(Type.ADMIN) || perm.isPermissionSet(Type.READ))) {
				array.add(project);
			}
		}
		return array;
	}

	public Project getProject(String name, User user) {
		Project project = projects.get(name);
		if (project != null) {
			Permission perm = project.getUserPermission(user);
			if (perm.isPermissionSet(Type.ADMIN) || perm.isPermissionSet(Type.READ)) {
				return project;
			} else {
				throw new AccessControlException( "Permission denied. Do not have read access.");
			}
		}
		return project;
	}

	public void uploadProject(String projectName, File dir, User uploader, boolean force) throws ProjectManagerException {
		logger.info("Uploading files to " + projectName);
		Project project = projects.get(projectName);

		if (project == null) {
			throw new ProjectManagerException("Project not found.");
		}
		if (!project.hasPermission(uploader, Type.WRITE)) {
			throw new AccessControlException("Permission denied. Do not have write access.");
		}

		List<String> errors = new ArrayList<String>();
		DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
		loader.loadProjectFlow(dir);
		Map<String, Flow> flows = loader.getFlowMap();

		File projectPath = new File(projectDirectory, projectName);
		File installDir = new File(projectPath, FILE_DATE_FORMAT.print(System.currentTimeMillis()));
		
		if (!installDir.mkdir()) {
			throw new ProjectManagerException("Cannot create directory in " + projectDirectory);
		}

		for (Flow flow : flows.values()) {
			try {
				if (flow.getErrors() != null) {
					errors.addAll(flow.getErrors());
				}
				flow.initialize();

				writeFlowFile(installDir, flow);
			}
			catch (IOException e) {
				throw new ProjectManagerException("Project directory "
						+ projectName + " cannot be created in "
						+ projectDirectory, e);
			}
		}

		File destDirectory = new File(installDir, PROJECT_DIRECTORY);
		dir.renameTo(destDirectory);

		// We install only if the project is not forced install or has no errors
		if (force || errors.isEmpty()) {
			// We synchronize on project so that we don't collide when
			// uploading.
			synchronized (project) {
				logger.info("Uploading files to " + projectName);
				project.setSource(installDir.getName());
				project.setLastModifiedTimestamp(System.currentTimeMillis());
				project.setLastModifiedUser(uploader.getUserId());
				project.setFlows(flows);
			}

			try {
				writeProjectFile(projectPath, project);
			} 
			catch (IOException e) {
				throw new ProjectManagerException("Project directory "
						+ projectName + " cannot be created in "
						+ projectDirectory, e);
			}
		} 
		else {
			logger.info("Errors found loading project " + projectName);
			StringBuffer bufferErrors = new StringBuffer();
			for (String error : errors) {
				bufferErrors.append(error);
				bufferErrors.append("\n");
			}

			throw new ProjectManagerException(bufferErrors.toString());
		}
	}

	@Override
	public synchronized Project createProject(String projectName,
			String description, User creator) throws ProjectManagerException {
		if (projectName == null || projectName.trim().isEmpty()) {
			throw new ProjectManagerException("Project name cannot be empty.");
		} 
		else if (description == null || description.trim().isEmpty()) {
			throw new ProjectManagerException("Description cannot be empty.");
		} 
		else if (creator == null) {
			throw new ProjectManagerException("Valid creator user must be set.");
		} 
		else if (!projectName.matches("[a-zA-Z][a-zA-Z_0-9|-]*")) {
			throw new ProjectManagerException("Project names must start with a letter, followed by any number of letters, digits, '-' or '_'.");
		}

		if (projects.contains(projectName)) {
			throw new ProjectManagerException("Project already exists.");
		}

		File projectPath = new File(projectDirectory, projectName);
		if (projectPath.exists()) {
			throw new ProjectManagerException("Project already exists.");
		}

		if (!projectPath.mkdirs()) {
			throw new ProjectManagerException("Project directory " + projectName + " cannot be created in " + projectDirectory);
		}

		Permission perm = new Permission(Type.ADMIN);
		long time = System.currentTimeMillis();

		Project project = new Project(projectName);
		project.setUserPermission(creator.getUserId(), perm);
		project.setDescription(description);
		project.setCreateTimestamp(time);
		project.setLastModifiedTimestamp(time);
		project.setLastModifiedUser(creator.getUserId());

		logger.info("Trying to create " + project.getName() + " by user " + creator.getUserId());
		try {
			writeProjectFile(projectPath, project);
		} 
		catch (IOException e) {
			throw new ProjectManagerException("Project directory " + projectName + " cannot be created in " + projectDirectory, e);
		}
		projects.put(projectName, project);
		
		return project;
	}

	private synchronized void writeProjectFile(File directory, Project project) throws IOException {
		Object object = project.toObject();
		File tmpFile = File.createTempFile("project-", ".json", directory);

		if (tmpFile.exists()) {
			tmpFile.delete();
		}

		logger.info("Writing project file " + tmpFile);
		String output = JSONUtils.toJSON(object, true);

		FileWriter writer = new FileWriter(tmpFile);
		try {
			writer.write(output);
		} 
		catch (IOException e) {
			if (writer != null) {
				writer.close();
			}

			throw e;
		}
		writer.close();

		File projectFile = new File(directory, PROPERTIES_FILENAME);
		File swapFile = new File(directory, PROPERTIES_FILENAME + "_old");

		projectFile.renameTo(swapFile);
		tmpFile.renameTo(projectFile);
		swapFile.delete();

	}

	private void writeFlowFile(File directory, Flow flow) throws IOException {
		Object object = flow.toObject();
		String filename = flow.getId() + FLOW_EXTENSION;
		File outputFile = new File(directory, filename);
		File oldOutputFile = new File(directory, filename + ".old");

		if (outputFile.exists()) {
			outputFile.renameTo(oldOutputFile);
		}

		logger.info("Writing flow file " + outputFile);
		String output = JSONUtils.toJSON(object, true);

		FileWriter writer = new FileWriter(outputFile);
		try {
			writer.write(output);
		} 
		catch (IOException e) {
			if (writer != null) {
				writer.close();
			}

			throw e;
		}
		writer.close();

		if (oldOutputFile.exists()) {
			oldOutputFile.delete();
		}
	}

	@Override
	public Props getProperties(String projectName, String source, User user)
			throws ProjectManagerException {
		Project project = projects.get(projectName);
		if (project == null) {
			throw new ProjectManagerException("Project " + project + " cannot be found.");
		}

		return getProperties(project, source, user);
	}

	@Override
	public Props getProperties(Project project, String source, User user)
			throws ProjectManagerException {
		if (!project.hasPermission(user, Type.READ)) {
			throw new AccessControlException(
					"Permission denied. Do not have read access.");
		}

		String mySource = project.getName() + File.separatorChar
				+ project.getSource() + File.separatorChar + "src"
				+ File.separatorChar + source;
		Element sourceElement = sourceCache.get(mySource);

		if (sourceElement != null) {
			return Props.clone((Props) sourceElement.getObjectValue());
		}

		File file = new File(projectDirectory, mySource);
		if (!file.exists()) {
			throw new ProjectManagerException("Source file " + file.getAbsolutePath() + " doesn't exist.");
		}

		try {
			Props props = new Props((Props) null, file);
			return props;
		} 
		catch (IOException e) {
			throw new ProjectManagerException("Error loading file " + file.getPath(), e);
		}
	}

	@Override
	public synchronized Project removeProject(String projectName, User user) {
		return null;
	}

	private static class SuffixFilter implements FileFilter {
		private String suffix;

		public SuffixFilter(String suffix) {
			this.suffix = suffix;
		}

		@Override
		public boolean accept(File pathname) {
			String name = pathname.getName();

			return pathname.isFile() && !pathname.isHidden() && name.length() > suffix.length() && name.endsWith(suffix);
		}
	}

	@Override
	public void commitProject(String projectName) throws ProjectManagerException {
		Project project = projects.get(projectName);
		if (project == null) {
			throw new ProjectManagerException("Project " + projectName + " doesn't exist.");
		}

		File projectPath = new File(projectDirectory, projectName);
		try {
			writeProjectFile(projectPath, project);
		} 
		catch (IOException e) {
			throw new ProjectManagerException("Error committing project " + projectName, e);
		}
	}

	@Override
	public HashMap<String, Props> getAllFlowProperties(Project project, String flowId, User user) throws ProjectManagerException {
		Flow flow = project.getFlow(flowId);
		if (flow == null) {
			throw new ProjectManagerException("Flow " + flowId + " doesn't exist in " + project.getName());
		}

		// Resolve all the node probs
		HashMap<String, Props> sourceMap = new HashMap<String, Props>();
		for (Node node : flow.getNodes()) {
			String source = node.getJobSource();
			Props props = getProperties(project, node.getJobSource(), user);
			sourceMap.put(source, props);
		}

		// Resolve all the shared props.
		for(FlowProps flowProps: flow.getAllFlowProps().values()) {
			String source = flowProps.getSource();
			Props props = getProperties(project, source, user);
			sourceMap.put(source, props);
		}
		
		return sourceMap;
	}

}