// ProjectManager.java
/*
 * Copyright 2012 LinkedIn Corp.
 * 
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 * 
 * http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.project;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import java.util.zip.ZipFile;

import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;

import azkaban.flow.Flow;
import azkaban.project.ProjectLogEvent.EventType;
import azkaban.user.Permission;
import azkaban.user.User;
import azkaban.user.Permission.Type;
import azkaban.utils.DirectoryFlowLoader;
import azkaban.utils.Props;
import azkaban.utils.Utils;

public class ProjectManager {
	/** Logger shared by every ProjectManager instance. */
	private static final Logger logger = Logger.getLogger(ProjectManager.class);
	
	/** In-memory lookup of loaded projects, keyed by project id. */
	private ConcurrentHashMap<Integer, Project> projectsById = new ConcurrentHashMap<Integer, Project>();
	/** In-memory lookup of loaded projects, keyed by project name. */
	private ConcurrentHashMap<String, Project> projectsByName = new ConcurrentHashMap<String, Project>();
	/** Store through which projects, flows, properties and events are read and written. */
	private final ProjectLoader projectLoader;
	/** Configuration handed to the constructor. */
	private final Props props;
	/** Directory named by "project.temp.dir" (default "temp"); uploaded archives are unzipped beneath it. */
	private final File tempDir;
	/** Value of "project.version.retention" (default 3); subtracted from the current project version when old versions are cleaned after an upload. */
	private final int projectVersionRetention;
	/** Value of "creator.default.proxy" (default true); when true, a new project's creator is given ADMIN permission and added as a proxy user. */
	private final boolean creatorDefaultPermissions;
	
	/**
	 * Creates a manager on top of the given loader and immediately loads all
	 * active projects, with their flows, into memory.
	 * <p>
	 * Settings read from {@code props}:
	 * <ul>
	 * <li>{@code project.temp.dir} (default {@code "temp"}) - directory used for unzipping uploads</li>
	 * <li>{@code project.version.retention} (default {@code 3})</li>
	 * <li>{@code creator.default.proxy} (default {@code true})</li>
	 * </ul>
	 *
	 * @param loader store used for every persistence operation
	 * @param props configuration to read the settings above from
	 * @throws RuntimeException if the projects or their flows cannot be loaded
	 */
	public ProjectManager(ProjectLoader loader, Props props) {
		this.projectLoader = loader;
		this.props = props;
		this.tempDir = new File(this.props.getString("project.temp.dir", "temp"));
		this.projectVersionRetention = (props.getInt("project.version.retention", 3));
		logger.info("Project version retention is set to " + projectVersionRetention);
		
		this.creatorDefaultPermissions = props.getBoolean("creator.default.proxy", true);
		
		// File.mkdirs() reports failure only through its return value. Log it
		// instead of silently continuing; construction still proceeds so that
		// read-only use of the manager keeps working. isDirectory() is re-checked
		// because mkdirs() also returns false when the directory appeared in the
		// meantime.
		if (!tempDir.exists() && !tempDir.mkdirs() && !tempDir.isDirectory()) {
			logger.error("Could not create project temp directory " + tempDir.getAbsolutePath());
		}
		
		loadAllProjects();
	}

	/**
	 * Fetches every active project from the loader, registers each one in the
	 * name and id lookups, and then loads the flows of each project.
	 *
	 * @throws RuntimeException wrapping the ProjectManagerException raised
	 *         when the projects cannot be fetched
	 */
	private void loadAllProjects() {
		final List<Project> activeProjects;
		try {
			activeProjects = projectLoader.fetchAllActiveProjects();
		}
		catch (ProjectManagerException cause) {
			throw new RuntimeException("Could not load projects from store.", cause);
		}

		// First pass: make every project reachable by name and by id.
		for (Project active : activeProjects) {
			projectsByName.put(active.getName(), active);
			projectsById.put(active.getId(), active);
		}

		// Second pass: attach the stored flows to each registered project.
		for (Project active : activeProjects) {
			loadAllProjectFlows(active);
		}
	}
	
	/**
	 * Fetches the flows stored for {@code project}, indexes them by flow id and
	 * installs the resulting map on the project.
	 *
	 * @param project project whose flows are loaded
	 * @throws RuntimeException wrapping the ProjectManagerException raised
	 *         while loading
	 */
	private void loadAllProjectFlows(Project project) {
		try {
			Map<String, Flow> flowsById = new HashMap<String, Flow>();
			List<Flow> storedFlows = projectLoader.fetchAllProjectFlows(project);
			for (Flow storedFlow : storedFlows) {
				String flowId = storedFlow.getId();
				flowsById.put(flowId, storedFlow);
			}

			project.setFlows(flowsById);
		}
		catch (ProjectManagerException cause) {
			throw new RuntimeException("Could not load projects flows from store.", cause);
		}
	}
	
	/**
	 * Returns the names of all projects currently held in memory.
	 *
	 * @return a new list holding a copy of the known project names
	 */
	public List<String> getProjectNames() {
		List<String> names = new ArrayList<String>();
		names.addAll(projectsByName.keySet());
		return names;
	}

	/**
	 * Collects the projects on which {@code user} has a permission with
	 * either ADMIN or READ set.
	 *
	 * @param user user whose permissions are checked
	 * @return a new list of matching projects; empty when there are none
	 */
	public List<Project> getUserProjects(User user) {
		ArrayList<Project> visible = new ArrayList<Project>();
		for (Project candidate : projectsById.values()) {
			Permission granted = candidate.getUserPermission(user);
			if (granted == null) {
				// No permission entry for this user on this project.
				continue;
			}

			boolean canSee = granted.isPermissionSet(Type.ADMIN) || granted.isPermissionSet(Type.READ);
			if (canSee) {
				visible.add(candidate);
			}
		}
		return visible;
	}

  /**
   * Collects the projects for which {@code hasGroupPermission(user, READ)}
   * holds.
   *
   * @param user user whose group permissions are checked
   * @return a new list of matching projects; empty when there are none
   */
  public List<Project> getGroupProjects(User user) {
    List<Project> readable = new ArrayList<Project>();
    for (Project candidate : projectsById.values()) {
      if (!candidate.hasGroupPermission(user, Type.READ)) {
        continue;
      }
      readable.add(candidate);
    }
    return readable;
  }

	/**
	 * Collects the projects on which {@code user} has ADMIN or READ set and
	 * whose name contains a case-insensitive match of {@code regexPattern}.
	 *
	 * @param user user whose permissions are checked
	 * @param regexPattern regular expression searched for in project names
	 * @return a new list of matching projects; empty when nothing matches or
	 *         when the pattern has invalid syntax (which is logged)
	 */
	public List<Project> getUserProjectsByRegex(User user, String regexPattern) {
		List<Project> matches = new ArrayList<Project>();

		final Pattern namePattern;
		try {
			namePattern = Pattern.compile(regexPattern, Pattern.CASE_INSENSITIVE);
		}
		catch (PatternSyntaxException e) {
			logger.error("Bad regex pattern " + regexPattern);
			return matches;
		}

		for (Project candidate : projectsById.values()) {
			Permission granted = candidate.getUserPermission(user);
			if (granted == null) {
				continue;
			}
			if (!granted.isPermissionSet(Type.ADMIN) && !granted.isPermissionSet(Type.READ)) {
				continue;
			}

			// find() rather than matches(): a partial hit inside the name counts.
			if (namePattern.matcher(candidate.getName()).find()) {
				matches.add(candidate);
			}
		}
		return matches;
	}

	/**
	 * Returns all projects currently held in memory.
	 *
	 * @return a new list holding a copy of the known projects
	 */
	public List<Project> getProjects() {
		List<Project> snapshot = new ArrayList<Project>(projectsById.values());
		return snapshot;
	}

	/**
	 * Collects every project whose name contains a case-insensitive match of
	 * {@code regexPattern}, regardless of permissions.
	 *
	 * @param regexPattern regular expression searched for in project names
	 * @return a new list of matching projects; empty when nothing matches or
	 *         when the pattern has invalid syntax (which is logged)
	 */
	public List<Project> getProjectsByRegex(String regexPattern) {
		List<Project> matches = new ArrayList<Project>();

		final Pattern namePattern;
		try {
			namePattern = Pattern.compile(regexPattern, Pattern.CASE_INSENSITIVE);
		}
		catch (PatternSyntaxException e) {
			logger.error("Bad regex pattern " + regexPattern);
			return matches;
		}

		for (Project candidate : getProjects()) {
			boolean nameHit = namePattern.matcher(candidate.getName()).find();
			if (nameHit) {
				matches.add(candidate);
			}
		}
		return matches;
	}

	/**
	 * Looks up a loaded project by name.
	 *
	 * @param name project name used as the lookup key
	 * @return the project registered under that name, or {@code null} if none is
	 */
	public Project getProject(String name) {
		Project found = projectsByName.get(name);
		return found;
	}

	/**
	 * Looks up a loaded project by id.
	 *
	 * @param id project id used as the lookup key
	 * @return the project registered under that id, or {@code null} if none is
	 */
	public Project getProject(int id) {
		Project found = projectsById.get(Integer.valueOf(id));
		return found;
	}

	/**
	 * Validates the arguments, creates a new project through the loader and
	 * registers it in the in-memory lookups. When creator default permissions
	 * are enabled, the creator is also granted ADMIN permission and added as a
	 * proxy user. A CREATED event is posted at the end.
	 *
	 * @param projectName name of the new project; must be non-blank and match
	 *        the naming pattern checked below
	 * @param description description of the new project; must be non-blank
	 * @param creator user creating the project; must not be {@code null}
	 * @return the newly created project
	 * @throws ProjectManagerException if an argument is invalid, a project with
	 *         that name is already registered, or a loader call fails
	 */
	public Project createProject(String projectName, String description, User creator) throws ProjectManagerException {
		if (projectName == null || projectName.trim().isEmpty()) {
			throw new ProjectManagerException("Project name cannot be empty.");
		}
		if (description == null || description.trim().isEmpty()) {
			throw new ProjectManagerException("Description cannot be empty.");
		}
		if (creator == null) {
			throw new ProjectManagerException("Valid creator user must be set.");
		}
		// NOTE(review): the character class also accepts a literal '|', which the
		// error message does not mention. Left unchanged so that names accepted
		// before are still accepted - confirm whether '|' is intended.
		if (!projectName.matches("[a-zA-Z][a-zA-Z_0-9|-]*")) {
			throw new ProjectManagerException("Project names must start with a letter, followed by any number of letters, digits, '-' or '_'.");
		}

		// containsKey, not contains: ConcurrentHashMap.contains(Object) is the
		// legacy Hashtable-style method that searches the map's VALUES, so a
		// name lookup through it can never detect an existing project.
		if (projectsByName.containsKey(projectName)) {
			throw new ProjectManagerException("Project already exists.");
		}
		
		logger.info("Trying to create " + projectName + " by user " + creator.getUserId());
		Project newProject = projectLoader.createNewProject(projectName, description, creator);
		projectsByName.put(newProject.getName(), newProject);
		projectsById.put(newProject.getId(), newProject);
		
		if (creatorDefaultPermissions) {
			// Add permission to project
			projectLoader.updatePermission(newProject, creator.getUserId(), new Permission(Permission.Type.ADMIN), false);
			
			// Add proxy user
			newProject.addProxyUser(creator.getUserId());
			try {
				updateProjectSetting(newProject);
			} catch (ProjectManagerException e) {
				// Record the failure in the application log before propagating it.
				logger.error("Failed to update settings for new project " + projectName, e);
				throw e;
			}
		}
		
		projectLoader.postEvent(newProject, EventType.CREATED, creator.getUserId(), null);
		
		return newProject;
	}

	/**
	 * Removes a project through the loader, posts a DELETED event and then
	 * drops the project from the in-memory lookups.
	 *
	 * @param project project to remove
	 * @param deleter user performing the removal
	 * @return the project that was passed in
	 * @throws ProjectManagerException if the loader rejects the removal
	 */
	public synchronized Project removeProject(Project project, User deleter) throws ProjectManagerException {
		final String deleterId = deleter.getUserId();

		// Store first; the lookups are only touched once the store succeeded.
		projectLoader.removeProject(project, deleterId);
		projectLoader.postEvent(project, EventType.DELETED, deleterId, null);

		projectsByName.remove(project.getName());
		projectsById.remove(project.getId());

		return project;
	}

	/**
	 * Updates a project's description through the loader and posts a
	 * DESCRIPTION event recording the new text.
	 *
	 * @param project project to update
	 * @param description new description
	 * @param modifier user making the change
	 * @throws ProjectManagerException if the loader fails to update
	 */
	public void updateProjectDescription(Project project, String description, User modifier) throws ProjectManagerException {
		final String modifierId = modifier.getUserId();
		projectLoader.updateDescription(project, description, modifierId);

		String eventMessage = "Description changed to " + description;
		projectLoader.postEvent(project, EventType.DESCRIPTION, modifierId, eventMessage);
	}

	/**
	 * Fetches event log entries of a project from the loader.
	 *
	 * @param project project whose events are requested
	 * @param results forwarded to the loader as its {@code results} argument
	 * @param skip forwarded to the loader as its {@code skip} argument
	 * @return the events returned by the loader
	 * @throws ProjectManagerException if the loader fails
	 */
	public List<ProjectLogEvent> getProjectEventLogs(Project project, int results, int skip) throws ProjectManagerException {
		List<ProjectLogEvent> events = projectLoader.getProjectEvents(project, results, skip);
		return events;
	}
	
	/**
	 * Fetches the properties stored for a project under the given source name.
	 *
	 * @param project project owning the properties
	 * @param source source name identifying the property set
	 * @return whatever the loader returns for that project and source
	 * @throws ProjectManagerException if the loader fails
	 */
	public Props getProperties(Project project, String source) throws ProjectManagerException {
		Props stored = projectLoader.fetchProjectProperty(project, source);
		return stored;
	}
	
	/**
	 * Fetches the override properties of a job, which are stored under the
	 * source name {@code jobName + ".jor"}.
	 *
	 * @param project project owning the job
	 * @param jobName name of the job
	 * @return whatever the loader returns for that source
	 * @throws ProjectManagerException if the loader fails
	 */
	public Props getJobOverrideProperty(Project project, String jobName) throws ProjectManagerException {
		String overrideSource = jobName+".jor";
		return projectLoader.fetchProjectProperty(project, overrideSource);
	}
	
	/**
	 * Stores override properties for a job. The source of {@code prop} is set
	 * to {@code jobName + ".jor"}; the properties are then updated if an entry
	 * for that source already exists and uploaded otherwise.
	 *
	 * @param project project owning the job
	 * @param prop properties to store; its source is overwritten
	 * @param jobName name of the job
	 * @throws ProjectManagerException if the loader fails
	 */
	public void setJobOverrideProperty(Project project, Props prop, String jobName) throws ProjectManagerException {
		prop.setSource(jobName+".jor");

		boolean alreadyStored = projectLoader.fetchProjectProperty(project, prop.getSource()) != null;
		if (alreadyStored) {
			projectLoader.updateProjectProperty(project, prop);
		}
		else {
			projectLoader.uploadProjectProperty(project, prop);
		}
	}

	/**
	 * Hands the project to the loader's {@code updateProjectSettings}.
	 *
	 * @param project project whose settings are passed to the loader
	 * @throws ProjectManagerException if the loader fails
	 */
	public void updateProjectSetting(Project project) throws ProjectManagerException {
		this.projectLoader.updateProjectSettings(project);
	}
	
	/**
	 * Adds a proxy user to a project, posts a PROXY_USER event and then
	 * updates the project settings through the loader.
	 *
	 * @param project project to modify
	 * @param proxyName name of the proxy user to add
	 * @param modifier user making the change
	 * @throws ProjectManagerException if updating the settings fails
	 */
	public void addProjectProxyUser(Project project, String proxyName, User modifier) throws ProjectManagerException {
		final String modifierId = modifier.getUserId();
		logger.info("User " + modifierId + " adding proxy user " + proxyName + " to project " + project.getName());

		project.addProxyUser(proxyName);

		String eventMessage = "Proxy user " + proxyName + " is added to project.";
		projectLoader.postEvent(project, EventType.PROXY_USER, modifierId, eventMessage);
		updateProjectSetting(project);
	}
	
	/**
	 * Removes a proxy user from a project, posts a PROXY_USER event and then
	 * updates the project settings through the loader.
	 *
	 * @param project project to modify
	 * @param proxyName name of the proxy user to remove
	 * @param modifier user making the change
	 * @throws ProjectManagerException if updating the settings fails
	 */
	public void removeProjectProxyUser(Project project, String proxyName, User modifier) throws ProjectManagerException {
		logger.info("User " + modifier.getUserId() + " removing proxy user " + proxyName + " from project " + project.getName());
		project.removeProxyUser(proxyName);
		
		// Event text previously read "removed form the project" (typo).
		projectLoader.postEvent(project, EventType.PROXY_USER, modifier.getUserId(), "Proxy user " + proxyName + " has been removed from the project.");
		updateProjectSetting(project);
	}
	
	/**
	 * Sets the permission of a user or group on a project through the loader
	 * and posts the matching GROUP_PERMISSION or USER_PERMISSION event.
	 *
	 * @param project project to modify
	 * @param name name of the user or group receiving the permission
	 * @param perm permission to set
	 * @param group {@code true} if {@code name} is a group, {@code false} if a user
	 * @param modifier user making the change
	 * @throws ProjectManagerException if the loader fails to update
	 */
	public void updateProjectPermission(Project project, String name, Permission perm, boolean group, User modifier) throws ProjectManagerException {
		logger.info("User " + modifier.getUserId() + " updating permissions for project " + project.getName() + " for " + name + " " + perm.toString());
		projectLoader.updatePermission(project, name, perm, group);

		// Pick event type and text once, then post a single event.
		final EventType eventType;
		final String eventMessage;
		if (group) {
			eventType = EventType.GROUP_PERMISSION;
			eventMessage = "Permission for group " + name + " set to " + perm.toString();
		}
		else {
			eventType = EventType.USER_PERMISSION;
			eventMessage = "Permission for user " + name + " set to " + perm.toString();
		}
		projectLoader.postEvent(project, eventType, modifier.getUserId(), eventMessage);
	}
	
	/**
	 * Removes the permission of a user or group from a project through the
	 * loader and posts the matching GROUP_PERMISSION or USER_PERMISSION event.
	 *
	 * @param project project to modify
	 * @param name name of the user or group losing the permission
	 * @param group {@code true} if {@code name} is a group, {@code false} if a user
	 * @param modifier user making the change
	 * @throws ProjectManagerException if the loader fails to remove
	 */
	public void removeProjectPermission(Project project, String name, boolean group, User modifier) throws ProjectManagerException {
		logger.info("User " + modifier.getUserId() + " removing permissions for project " + project.getName() + " for " + name);
		projectLoader.removePermission(project, name, group);

		// Pick event type and text once, then post a single event.
		final EventType eventType;
		final String eventMessage;
		if (group) {
			eventType = EventType.GROUP_PERMISSION;
			eventMessage = "Permission for group " + name + " removed.";
		}
		else {
			eventType = EventType.USER_PERMISSION;
			eventMessage = "Permission for user " + name + " removed.";
		}
		projectLoader.postEvent(project, eventType, modifier.getUserId(), eventMessage);
	}
	
	/**
	 * Unzips an uploaded archive, validates the flows it contains and, when
	 * they are valid, stores the archive, its flows and its properties as the
	 * next version of the project. Afterwards an UPLOADED event is posted, the
	 * unzipped temp directory is removed and project versions older than the
	 * retention window are cleaned.
	 * <p>
	 * The unzipped temp directory is removed on every exit path, including
	 * validation errors and loader failures.
	 *
	 * @param project project receiving the upload
	 * @param archive uploaded archive file
	 * @param fileType archive type; only {@code "zip"} is supported
	 * @param uploader user performing the upload
	 * @throws ProjectManagerException if the file type is missing or
	 *         unsupported, unzipping fails, flow validation reports errors, or
	 *         a loader call fails
	 */
	public void uploadProject(Project project, File archive, String fileType, User uploader) throws ProjectManagerException {
		logger.info("Uploading files to " + project.getName());

		// Reject unusable file types before anything is written to disk.
		if (fileType == null) {
			throw new ProjectManagerException("Unknown file type for " + archive.getName());
		}
		if (!"zip".equals(fileType)) {
			throw new ProjectManagerException("Unsupported archive type for file " + archive.getName());
		}

		// Unzip.
		File file;
		try {
			file = unzipFile(archive);
		} catch (IOException e) {
			throw new ProjectManagerException("Error unzipping file.", e);
		}

		try {
			logger.info("Validating Flow for upload " + archive.getName());
			DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
			loader.loadProjectFlow(file);
			if (!loader.getErrors().isEmpty()) {
				logger.error("Error found in upload to " + project.getName() + ". Cleaning up.");

				StringBuilder errorMessage = new StringBuilder();
				errorMessage.append("Error found in upload. Cannot upload.\n");
				for (String error : loader.getErrors()) {
					errorMessage.append(error);
					errorMessage.append('\n');
				}

				throw new ProjectManagerException(errorMessage.toString());
			}

			Map<String, Props> jobProps = loader.getJobProps();
			List<Props> propProps = loader.getProps();

			// Version allocation and the writes that depend on it are done while
			// holding the project's monitor, as in the original implementation.
			synchronized (project) {
				int newVersion = projectLoader.getLatestProjectVersion(project) + 1;
				Map<String, Flow> flows = loader.getFlowMap();
				for (Flow flow : flows.values()) {
					flow.setProjectId(project.getId());
					flow.setVersion(newVersion);
				}

				logger.info("Uploading file to db " + archive.getName());
				projectLoader.uploadProjectFile(project, newVersion, fileType, archive.getName(), archive, uploader.getUserId());
				logger.info("Uploading flow to db " + archive.getName());
				projectLoader.uploadFlows(project, newVersion, flows.values());
				logger.info("Changing project versions " + archive.getName());
				projectLoader.changeProjectVersion(project, newVersion, uploader.getUserId());
				project.setFlows(flows);
				logger.info("Uploading Job properties");
				projectLoader.uploadProjectProperties(project, new ArrayList<Props>(jobProps.values()));
				logger.info("Uploading Props properties");
				projectLoader.uploadProjectProperties(project, propProps);
			}

			//TODO: find something else to load triggers

			logger.info("Uploaded project files. Cleaning up temp files.");
			projectLoader.postEvent(project, EventType.UPLOADED, uploader.getUserId(), "Uploaded project files zip " + archive.getName());
		} finally {
			// Runs on success and on every failure above, so the unzipped copy
			// never outlives the upload attempt.
			deleteTempDirectory(file);
		}

		logger.info("Cleaning up old install files older than " + (project.getVersion() - projectVersionRetention));
		projectLoader.cleanOlderProjectVersion(project.getId(), project.getVersion() - projectVersionRetention);
	}

	/**
	 * Best-effort removal of an unzipped upload directory. A failure is logged
	 * and the directory is registered for deletion at JVM exit; it is never
	 * propagated to the caller.
	 *
	 * @param dir directory to delete
	 */
	private void deleteTempDirectory(File dir) {
		try {
			FileUtils.deleteDirectory(dir);
		} catch (IOException e) {
			dir.deleteOnExit();
			logger.warn("Could not delete temp directory " + dir, e);
		}
	}
	
	/**
	 * Passes a flow to the loader's {@code updateFlow}, using the version the
	 * flow itself reports.
	 *
	 * @param project project owning the flow
	 * @param flow flow to update
	 * @throws ProjectManagerException if the loader fails
	 */
	public void updateFlow(Project project, Flow flow) throws ProjectManagerException {
		int flowVersion = flow.getVersion();
		projectLoader.updateFlow(project, flowVersion, flow);
	}
	
	/**
	 * Extracts an archive into a freshly created directory under the
	 * configured temp directory.
	 *
	 * @param archiveFile zip archive to extract
	 * @return the directory holding the extracted content
	 * @throws IOException if the archive cannot be opened, extracted or closed
	 */
	private File unzipFile(File archiveFile) throws IOException {
		ZipFile zipfile = new ZipFile(archiveFile);
		try {
			File unzipped = Utils.createTempDir(tempDir);
			Utils.unzip(zipfile, unzipped);

			return unzipped;
		}
		finally {
			// Always release the archive's file handle, even when extraction
			// fails; previously the ZipFile was never closed.
			zipfile.close();
		}
	}

	/**
	 * Posts an event for a project through the loader.
	 *
	 * @param project project the event belongs to
	 * @param type kind of event
	 * @param user passed to the loader as the event's user
	 * @param message passed to the loader as the event's message
	 */
	public void postProjectEvent(Project project, EventType type, String user, String message) {
		this.projectLoader.postEvent(project, type, user, message);
	}


}