FileProjectManager.java

356 lines | 12.018 kB Blame History Raw Download
package azkaban.project;

import java.io.File;
import java.io.FileFilter;
import java.io.FileWriter;
import java.io.IOException;
import java.security.AccessControlException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.log4j.Logger;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

import azkaban.flow.Flow;
import azkaban.user.Permission;
import azkaban.user.Permission.Type;
import azkaban.user.User;
import azkaban.utils.FlowUtils;
import azkaban.utils.JSONUtils;
import azkaban.utils.Props;

/**
 * A project loader that stores everything on local file system.
 * The following global parameters should be set -
 * file.project.loader.path - The project install path where projects will be loaded installed to.
 */
public class FileProjectManager implements ProjectManager {
	public static final String DIRECTORY_PARAM = "file.project.loader.path";
	private static final DateTimeFormatter FILE_DATE_FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd-HH:mm.ss.SSS");
	private static final String PROPERTIES_FILENAME = "project.json";
	private static final String PROJECT_DIRECTORY = "src";
	private static final String FLOW_EXTENSION = ".flow";
    private static final Logger logger = Logger.getLogger(FileProjectManager.class);
    private ConcurrentHashMap<String, Project> projects = new ConcurrentHashMap<String, Project>();
    
    // We store the flows for projects in the ProjectManager instead of the Project so we can employ different 
    // loading/caching techniques.
    private HashMap<String, Map<String, Flow>> projectFlows = new HashMap<String, Map<String, Flow>>();

	private File projectDirectory;
	
    public FileProjectManager(Props props) {
    	setupDirectories(props);
    	loadAllProjects();
    }

    private void setupDirectories(Props props) {
    	String projectDir = props.getString(DIRECTORY_PARAM);
		logger.info("Using directory " + projectDir + " as the project directory.");
    	projectDirectory = new File(projectDir);
    	if (!projectDirectory.exists()) {
    		logger.info("Directory " + projectDir + " doesn't exist. Creating.");
    		if (projectDirectory.mkdirs()) {
    			logger.info("Directory creation was successful.");
    		}
    		else {
    			throw new RuntimeException("FileProjectLoader cannot create directory " + projectDirectory);
    		}
    	}
    	else if (projectDirectory.isFile()) {
			throw new RuntimeException("FileProjectManager directory " + projectDirectory + " is really a file.");
    	}
    }
    
    private void loadAllProjects() {
    	File[] directories = projectDirectory.listFiles();
    	
    	for (File dir: directories) {
    		if (!dir.isDirectory()) {
    			logger.error("ERROR loading project from " + dir.getPath() + ". Not a directory." );
    		}
    		else {
    			File propertiesFile = new File(dir, PROPERTIES_FILENAME);
    			if (!propertiesFile.exists()) {
        			logger.error("ERROR loading project from " + dir.getPath() + ". Project file " + PROPERTIES_FILENAME + " not found." );
    			}
    			else {
    				Object obj = null;
    				try {
    					obj = JSONUtils.parseJSONFromFile(propertiesFile);
					} catch (IOException e) {
						logger.error("ERROR loading project from " + dir.getPath() + ". Project file " + PROPERTIES_FILENAME + " couldn't be read.", e );
						continue;
					}
    				
    				Project project = Project.projectFromObject(obj);
    				logger.info("Loading project " + project.getName());
    				projects.put(project.getName(), project);
    			
    				String source = project.getSource();
    				if (source == null) {
    					logger.info(project.getName() + ": No flows uploaded");
    					return;
    				}
    				
    				File projectDir = new File(dir, source);
    				if (!projectDir.exists()) {
    					logger.error("ERROR project source dir " + projectDir + " doesn't exist.");
    				}
    				else if (!projectDir.isDirectory()) {
    					logger.error("ERROR project source dir " + projectDir + " is not a directory.");
    				}
    				else {
    					File projectSourceDir = new File(projectDir, PROJECT_DIRECTORY);
    					FileResourceLoader loader = new FileResourceLoader(projectSourceDir);
    					File[] flowFiles = projectDir.listFiles(new SuffixFilter(FLOW_EXTENSION));
    					Map<String, Flow> flowMap = new LinkedHashMap<String, Flow>();
    					for (File flowFile: flowFiles) {
							Object objectizedFlow = null;
    						try {
    							objectizedFlow = JSONUtils.parseJSONFromFile(flowFile);
							} catch (IOException e) {
								logger.error("Error parsing flow file " + flowFile.toString());
							}
    						
    						//Recreate Flow
    						Flow flow = Flow.flowFromObject(objectizedFlow, loader);
    						logger.debug("Loaded flow " + project.getName() + ": " + flow.getId());
    						flowMap.put(flow.getId(), flow);
    					}
    					
    					projectFlows.put(project.getName(), flowMap);
    				}
    			}
    		}
    	}
    }
    
    public List<String> getProjectNames() {
        return new ArrayList<String>(projects.keySet());
    }
    
    public List<Project> getProjects(User user) {
    	ArrayList<Project> array = new ArrayList<Project>();
    	for(Project project : projects.values()) {
    		Permission perm = project.getUserPermission(user);
    		if (perm.isPermissionSet(Type.ADMIN) || perm.isPermissionSet(Type.READ)) {
    			array.add(project);
    		}
    	}
    	return array;
    }
    
    public Project getProject(String name, User user) {
    	Project project = projects.get(name);
    	if (project != null) {
    		Permission perm = project.getUserPermission(user);
    		if (perm.isPermissionSet(Type.ADMIN) || perm.isPermissionSet(Type.READ)) {
    			return project;
    		}
    		else {
    			throw new AccessControlException("Permission denied. Do not have read access.");
    		}
    	}
    	return null;
    }
    
    public void uploadProject(String projectName, File dir, User uploader, boolean force) throws ProjectManagerException {
		logger.info("Uploading files to " + projectName);
    	Project project = projects.get(projectName);
    	
    	if (project == null) {
    		throw new ProjectManagerException("Project not found.");
    	}
		if (!project.hasPermission(uploader, Type.WRITE)) {
			throw new AccessControlException("Permission denied. Do not have write access.");
		}
    	
		Map<String, Flow> flows = new HashMap<String,Flow>();
		List<String> errors = new ArrayList<String>();
		FlowUtils.loadProjectFlows(dir, flows, errors);
		
    	File projectPath = new File(projectDirectory, projectName);
		File installDir = new File(projectPath, FILE_DATE_FORMAT.print(System.currentTimeMillis()));
		if (!installDir.mkdir()) {
			throw new ProjectManagerException("Cannot create directory in " + projectDirectory);
		}	
		
		for (Flow flow: flows.values()) {
	    	try {
				writeFlowFile(installDir, flow);
			} catch (IOException e) {
	    		throw new ProjectManagerException(
	    				"Project directory " + projectName + 
	    				" cannot be created in " + projectDirectory, e);
			}
		}
		
    	File destDirectory = new File(installDir, PROJECT_DIRECTORY);
		dir.renameTo(destDirectory);
		
		// We install only if the project is not forced install or has no errors
		if (force || errors.isEmpty()) {
	    	// We synchronize on project so that we don't collide when uploading.
	    	synchronized (project) {
	    		logger.info("Uploading files to " + projectName);
	    		project.setSource(installDir.getName());
	    		project.setLastModifiedTimestamp(System.currentTimeMillis());
	    		project.setLastModifiedUser(uploader.getUserId());
	    		projectFlows.put(projectName, flows);
	    	}
	    	
	    	try {
				writeProjectFile(projectPath, project);
			} catch (IOException e) {
	    		throw new ProjectManagerException(
	    				"Project directory " + projectName + 
	    				" cannot be created in " + projectDirectory, e);
			}
		}
		else {
			logger.info("Errors found loading project " + projectName);
			StringBuffer bufferErrors = new StringBuffer();
			for(String error : errors) {
				bufferErrors.append(error);
				bufferErrors.append("\n");
			}
			
			throw new ProjectManagerException(bufferErrors.toString());
		}
		
    }
    
    @Override
    public synchronized Project createProject(String projectName, String description, User creator) throws ProjectManagerException {
    	if (projectName == null || projectName.trim().isEmpty()) {
    		throw new ProjectManagerException("Project name cannot be empty.");
    	}
    	else if (description == null || description.trim().isEmpty()) {
    		throw new ProjectManagerException("Description cannot be empty.");
    	}
    	else if (creator == null) {
    		throw new ProjectManagerException("Valid creator user must be set.");
    	}

    	if (projects.contains(projectName)) {
    		throw new ProjectManagerException("Project already exists.");
    	}
    	
    	File projectPath = new File(projectDirectory, projectName);
    	if (projectPath.exists()) {
    		throw new ProjectManagerException("Project already exists.");
    	}
    	
    	if(!projectPath.mkdirs()) {
    		throw new ProjectManagerException(
    				"Project directory " + projectName + 
    				" cannot be created in " + projectDirectory);
    	}
    	
    	Permission perm = new Permission(Type.ADMIN);
    	long time = System.currentTimeMillis();
    	
    	Project project = new Project(projectName);
    	project.setUserPermission(creator.getUserId(), perm);
    	project.setDescription(description);
    	project.setCreateTimestamp(time);
    	project.setLastModifiedTimestamp(time);
    	project.setLastModifiedUser(creator.getUserId());
    	
    	logger.info("Trying to create " + project.getName() + " by user " + creator.getUserId());
    	try {
			writeProjectFile(projectPath, project);
		} catch (IOException e) {
    		throw new ProjectManagerException(
    				"Project directory " + projectName + 
    				" cannot be created in " + projectDirectory, e);
		}
    	projects.put(projectName, project);
    	return project;
    }
    
    private synchronized void writeProjectFile(File directory, Project project) throws IOException {
    	Object object = project.toObject();
    	File tmpFile = File.createTempFile("project-",".json", directory);

    	if (tmpFile.exists()) {
    		tmpFile.delete();
    	}

    	logger.info("Writing project file " + tmpFile);
    	String output = JSONUtils.toJSON(object, true);
    	
    	FileWriter writer = new FileWriter(tmpFile);
    	try {
    		writer.write(output);
    	} catch (IOException e) {
    		if (writer != null) {
    			writer.close();
    		}
    		
    		throw e;
    	}
    	writer.close();

    	File projectFile = new File(directory, PROPERTIES_FILENAME);
    	File swapFile = new File(directory, PROPERTIES_FILENAME + "_old");
    	
    	projectFile.renameTo(swapFile);
    	tmpFile.renameTo(projectFile);
    	swapFile.delete();

    }
    
    private void writeFlowFile(File directory, Flow flow) throws IOException {
		Object object = flow.toObject();
    	String filename = flow.getId() + FLOW_EXTENSION;
    	File outputFile = new File(directory, filename);
    	logger.info("Writing flow file " + outputFile);
    	String output = JSONUtils.toJSON(object, true);
    	
    	FileWriter writer = new FileWriter(outputFile);
    	try {
    		writer.write(output);
    	} catch (IOException e) {
    		if (writer != null) {
    			writer.close();
    		}
    		
    		throw e;
    	}
    	writer.close();
    }

	@Override
	public synchronized Project removeProject(String projectName, User user) {
		return null;
	}

	@Override
	public List<Flow> getProjectFlows(String projectName, User user) throws ProjectManagerException {
		
		
		return null;
	}

	private static class SuffixFilter implements FileFilter {
		private String suffix;
		
		public SuffixFilter(String suffix) {
			this.suffix = suffix;
		}

		@Override
		public boolean accept(File pathname) {
			String name = pathname.getName();
			
			return pathname.isFile() && !pathname.isHidden() && name.length() > suffix.length() && name.endsWith(suffix);
		}
	}
}