/*
* Copyright 2012 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package azkaban.project;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import java.util.zip.ZipFile;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
import azkaban.flow.Flow;
import azkaban.project.DirectoryFlowLoader;
import azkaban.project.ProjectLogEvent.EventType;
import azkaban.project.ProjectWhitelist.WhitelistType;
import azkaban.project.validator.ValidationReport;
import azkaban.project.validator.ValidationStatus;
import azkaban.project.validator.ValidatorConfigs;
import azkaban.project.validator.ValidatorManager;
import azkaban.project.validator.XmlValidatorManager;
import azkaban.user.Permission;
import azkaban.user.Permission.Type;
import azkaban.user.User;
import azkaban.utils.Props;
import azkaban.utils.Utils;
public class ProjectManager {
private static final Logger logger = Logger.getLogger(ProjectManager.class);
private ConcurrentHashMap<Integer, Project> projectsById =
new ConcurrentHashMap<Integer, Project>();
private ConcurrentHashMap<String, Project> projectsByName =
new ConcurrentHashMap<String, Project>();
private final ProjectLoader projectLoader;
private final Props props;
private final File tempDir;
private final int projectVersionRetention;
private final boolean creatorDefaultPermissions;
public ProjectManager(ProjectLoader loader, Props props) {
this.projectLoader = loader;
this.props = props;
this.tempDir = new File(this.props.getString("project.temp.dir", "temp"));
this.projectVersionRetention =
(props.getInt("project.version.retention", 3));
logger.info("Project version retention is set to "
+ projectVersionRetention);
this.creatorDefaultPermissions =
props.getBoolean("creator.default.proxy", true);
if (!tempDir.exists()) {
tempDir.mkdirs();
}
// The prop passed to XmlValidatorManager is used to initialize all the
// validators
// Each validator will take certain key/value pairs from the prop to
// initialize itself.
Props prop = new Props(props);
prop.put(ValidatorConfigs.PROJECT_ARCHIVE_FILE_PATH, "initialize");
// By instantiating an object of XmlValidatorManager, this will verify the
// config files for the validators.
new XmlValidatorManager(prop);
loadAllProjects();
loadProjectWhiteList();
}
private void loadAllProjects() {
List<Project> projects;
try {
projects = projectLoader.fetchAllActiveProjects();
} catch (ProjectManagerException e) {
throw new RuntimeException("Could not load projects from store.", e);
}
for (Project proj : projects) {
projectsByName.put(proj.getName(), proj);
projectsById.put(proj.getId(), proj);
}
for (Project proj : projects) {
loadAllProjectFlows(proj);
}
}
private void loadAllProjectFlows(Project project) {
try {
List<Flow> flows = projectLoader.fetchAllProjectFlows(project);
Map<String, Flow> flowMap = new HashMap<String, Flow>();
for (Flow flow : flows) {
flowMap.put(flow.getId(), flow);
}
project.setFlows(flowMap);
} catch (ProjectManagerException e) {
throw new RuntimeException("Could not load projects flows from store.", e);
}
}
public List<String> getProjectNames() {
return new ArrayList<String>(projectsByName.keySet());
}
public Props getProps() {
return props;
}
public List<Project> getUserProjects(User user) {
ArrayList<Project> array = new ArrayList<Project>();
for (Project project : projectsById.values()) {
Permission perm = project.getUserPermission(user);
if (perm != null
&& (perm.isPermissionSet(Type.ADMIN) || perm
.isPermissionSet(Type.READ))) {
array.add(project);
}
}
return array;
}
public List<Project> getGroupProjects(User user) {
List<Project> array = new ArrayList<Project>();
for (Project project : projectsById.values()) {
if (project.hasGroupPermission(user, Type.READ)) {
array.add(project);
}
}
return array;
}
public List<Project> getUserProjectsByRegex(User user, String regexPattern) {
List<Project> array = new ArrayList<Project>();
Pattern pattern;
try {
pattern = Pattern.compile(regexPattern, Pattern.CASE_INSENSITIVE);
} catch (PatternSyntaxException e) {
logger.error("Bad regex pattern " + regexPattern);
return array;
}
for (Project project : projectsById.values()) {
Permission perm = project.getUserPermission(user);
if (perm != null
&& (perm.isPermissionSet(Type.ADMIN) || perm
.isPermissionSet(Type.READ))) {
if (pattern.matcher(project.getName()).find()) {
array.add(project);
}
}
}
return array;
}
public List<Project> getProjects() {
return new ArrayList<Project>(projectsById.values());
}
public List<Project> getProjectsByRegex(String regexPattern) {
List<Project> allProjects = new ArrayList<Project>();
Pattern pattern;
try {
pattern = Pattern.compile(regexPattern, Pattern.CASE_INSENSITIVE);
} catch (PatternSyntaxException e) {
logger.error("Bad regex pattern " + regexPattern);
return allProjects;
}
for (Project project : getProjects()) {
if (pattern.matcher(project.getName()).find()) {
allProjects.add(project);
}
}
return allProjects;
}
/**
* Checks if a project is active using project_name
*
* @param name
*/
public Boolean isActiveProject(String name) {
return projectsByName.containsKey(name);
}
/**
* Checks if a project is active using project_id
*
* @param name
*/
public Boolean isActiveProject(int id) {
return projectsById.containsKey(id);
}
/**
* fetch active project from cache and inactive projects from db by
* project_name
*
* @param name
* @return
*/
public Project getProject(String name) {
Project fetchedProject = null;
if (isActiveProject(name)) {
fetchedProject = projectsByName.get(name);
} else {
try {
fetchedProject = projectLoader.fetchProjectByName(name);
} catch (ProjectManagerException e) {
logger.error("Could not load project from store.", e);
}
}
return fetchedProject;
}
/**
* fetch active project from cache and inactive projects from db by
* project_id
*
* @param id
* @return
*/
public Project getProject(int id) {
Project fetchedProject = null;
if (isActiveProject(id)) {
fetchedProject = projectsById.get(id);
} else {
try {
fetchedProject = projectLoader.fetchProjectById(id);
} catch (ProjectManagerException e) {
logger.error("Could not load project from store.", e);
}
}
return fetchedProject;
}
public Project createProject(String projectName, String description,
User creator) throws ProjectManagerException {
if (projectName == null || projectName.trim().isEmpty()) {
throw new ProjectManagerException("Project name cannot be empty.");
} else if (description == null || description.trim().isEmpty()) {
throw new ProjectManagerException("Description cannot be empty.");
} else if (creator == null) {
throw new ProjectManagerException("Valid creator user must be set.");
} else if (!projectName.matches("[a-zA-Z][a-zA-Z_0-9|-]*")) {
throw new ProjectManagerException(
"Project names must start with a letter, followed by any number of letters, digits, '-' or '_'.");
}
if (projectsByName.containsKey(projectName)) {
throw new ProjectManagerException("Project already exists.");
}
logger.info("Trying to create " + projectName + " by user "
+ creator.getUserId());
Project newProject =
projectLoader.createNewProject(projectName, description, creator);
projectsByName.put(newProject.getName(), newProject);
projectsById.put(newProject.getId(), newProject);
if (creatorDefaultPermissions) {
// Add permission to project
projectLoader.updatePermission(newProject, creator.getUserId(),
new Permission(Permission.Type.ADMIN), false);
// Add proxy user
newProject.addProxyUser(creator.getUserId());
try {
updateProjectSetting(newProject);
} catch (ProjectManagerException e) {
e.printStackTrace();
throw e;
}
}
projectLoader.postEvent(newProject, EventType.CREATED, creator.getUserId(),
null);
return newProject;
}
/**
* Permanently delete all project files and properties data for all versions
* of a project and log event in project_events table
*
* @param project
* @param deleter
* @return
* @throws ProjectManagerException
*/
public synchronized Project purgeProject(Project project, User deleter)
throws ProjectManagerException {
projectLoader.cleanOlderProjectVersion(project.getId(),
project.getVersion() + 1);
projectLoader
.postEvent(project, EventType.PURGE, deleter.getUserId(), String
.format("Purged versions before %d", project.getVersion() + 1));
return project;
}
public synchronized Project removeProject(Project project, User deleter)
throws ProjectManagerException {
projectLoader.removeProject(project, deleter.getUserId());
projectLoader.postEvent(project, EventType.DELETED, deleter.getUserId(),
null);
projectsByName.remove(project.getName());
projectsById.remove(project.getId());
return project;
}
public void updateProjectDescription(Project project, String description,
User modifier) throws ProjectManagerException {
projectLoader.updateDescription(project, description, modifier.getUserId());
projectLoader.postEvent(project, EventType.DESCRIPTION,
modifier.getUserId(), "Description changed to " + description);
}
public List<ProjectLogEvent> getProjectEventLogs(Project project,
int results, int skip) throws ProjectManagerException {
return projectLoader.getProjectEvents(project, results, skip);
}
public Props getProperties(Project project, String source)
throws ProjectManagerException {
return projectLoader.fetchProjectProperty(project, source);
}
public Props getJobOverrideProperty(Project project, String jobName)
throws ProjectManagerException {
return projectLoader.fetchProjectProperty(project, jobName + ".jor");
}
public void setJobOverrideProperty(Project project, Props prop, String jobName)
throws ProjectManagerException {
prop.setSource(jobName + ".jor");
Props oldProps =
projectLoader.fetchProjectProperty(project, prop.getSource());
if (oldProps == null) {
projectLoader.uploadProjectProperty(project, prop);
} else {
projectLoader.updateProjectProperty(project, prop);
}
return;
}
public void updateProjectSetting(Project project)
throws ProjectManagerException {
projectLoader.updateProjectSettings(project);
}
public void addProjectProxyUser(Project project, String proxyName,
User modifier) throws ProjectManagerException {
logger.info("User " + modifier.getUserId() + " adding proxy user "
+ proxyName + " to project " + project.getName());
project.addProxyUser(proxyName);
projectLoader.postEvent(project, EventType.PROXY_USER,
modifier.getUserId(), "Proxy user " + proxyName
+ " is added to project.");
updateProjectSetting(project);
}
public void removeProjectProxyUser(Project project, String proxyName,
User modifier) throws ProjectManagerException {
logger.info("User " + modifier.getUserId() + " removing proxy user "
+ proxyName + " from project " + project.getName());
project.removeProxyUser(proxyName);
projectLoader.postEvent(project, EventType.PROXY_USER,
modifier.getUserId(), "Proxy user " + proxyName
+ " has been removed form the project.");
updateProjectSetting(project);
}
public void updateProjectPermission(Project project, String name,
Permission perm, boolean group, User modifier)
throws ProjectManagerException {
logger.info("User " + modifier.getUserId()
+ " updating permissions for project " + project.getName() + " for "
+ name + " " + perm.toString());
projectLoader.updatePermission(project, name, perm, group);
if (group) {
projectLoader.postEvent(project, EventType.GROUP_PERMISSION,
modifier.getUserId(), "Permission for group " + name + " set to "
+ perm.toString());
} else {
projectLoader.postEvent(project, EventType.USER_PERMISSION,
modifier.getUserId(), "Permission for user " + name + " set to "
+ perm.toString());
}
}
public void removeProjectPermission(Project project, String name,
boolean group, User modifier) throws ProjectManagerException {
logger.info("User " + modifier.getUserId()
+ " removing permissions for project " + project.getName() + " for "
+ name);
projectLoader.removePermission(project, name, group);
if (group) {
projectLoader.postEvent(project, EventType.GROUP_PERMISSION,
modifier.getUserId(), "Permission for group " + name + " removed.");
} else {
projectLoader.postEvent(project, EventType.USER_PERMISSION,
modifier.getUserId(), "Permission for user " + name + " removed.");
}
}
/**
* This method retrieves the uploaded project zip file from DB. A temporary
* file is created to hold the content of the uploaded zip file. This
* temporary file is provided in the ProjectFileHandler instance and the
* caller of this method should call method
* {@ProjectFileHandler.deleteLocalFile}
* to delete the temporary file.
*
* @param project
* @param version - latest version is used if value is -1
* @return ProjectFileHandler - null if can't find project zip file based on
* project name and version
* @throws ProjectManagerException
*/
public ProjectFileHandler getProjectFileHandler(Project project, int version)
throws ProjectManagerException {
if (version == -1) {
version = projectLoader.getLatestProjectVersion(project);
}
return projectLoader.getUploadedFile(project, version);
}
public Map<String, ValidationReport> uploadProject(Project project,
File archive, String fileType, User uploader, Props additionalProps)
throws ProjectManagerException {
logger.info("Uploading files to " + project.getName());
// Unzip.
File file = null;
try {
if (fileType == null) {
throw new ProjectManagerException("Unknown file type for "
+ archive.getName());
} else if ("zip".equals(fileType)) {
file = unzipFile(archive);
} else {
throw new ProjectManagerException("Unsupported archive type for file "
+ archive.getName());
}
} catch (IOException e) {
throw new ProjectManagerException("Error unzipping file.", e);
}
// Since props is an instance variable of ProjectManager, and each
// invocation to the uploadProject manager needs to pass a different
// value for the PROJECT_ARCHIVE_FILE_PATH key, it is necessary to
// create a new instance of Props to make sure these different values
// are isolated from each other.
Props prop = new Props(props);
prop.putAll(additionalProps);
prop.put(ValidatorConfigs.PROJECT_ARCHIVE_FILE_PATH,
archive.getAbsolutePath());
// Basically, we want to make sure that for different invocations to the
// uploadProject method,
// the validators are using different values for the
// PROJECT_ARCHIVE_FILE_PATH configuration key.
// In addition, we want to reload the validator objects for each upload, so
// that we can change the validator configuration files without having to
// restart Azkaban web server. If the XmlValidatorManager is an instance
// variable, 2 consecutive invocations to the uploadProject
// method might cause the second one to overwrite the
// PROJECT_ARCHIVE_FILE_PATH configuration parameter
// of the first, thus causing a wrong archive file path to be passed to the
// validators. Creating a separate XmlValidatorManager object for each
// upload will prevent this issue without having to add
// synchronization between uploads. Since we're already reloading the XML
// config file and creating validator objects for each upload, this does
// not add too much additional overhead.
ValidatorManager validatorManager = new XmlValidatorManager(prop);
logger.info("Validating project " + archive.getName()
+ " using the registered validators "
+ validatorManager.getValidatorsInfo().toString());
Map<String, ValidationReport> reports = validatorManager.validate(project, file);
ValidationStatus status = ValidationStatus.PASS;
for (Entry<String, ValidationReport> report : reports.entrySet()) {
if (report.getValue().getStatus().compareTo(status) > 0) {
status = report.getValue().getStatus();
}
}
if (status == ValidationStatus.ERROR) {
logger.error("Error found in upload to " + project.getName()
+ ". Cleaning up.");
try {
FileUtils.deleteDirectory(file);
} catch (IOException e) {
file.deleteOnExit();
e.printStackTrace();
}
return reports;
}
DirectoryFlowLoader loader =
(DirectoryFlowLoader) validatorManager.getDefaultValidator();
Map<String, Props> jobProps = loader.getJobProps();
List<Props> propProps = loader.getProps();
synchronized (project) {
int newVersion = projectLoader.getLatestProjectVersion(project) + 1;
Map<String, Flow> flows = loader.getFlowMap();
for (Flow flow : flows.values()) {
flow.setProjectId(project.getId());
flow.setVersion(newVersion);
}
logger.info("Uploading file to db " + archive.getName());
projectLoader.uploadProjectFile(project, newVersion, fileType,
archive.getName(), archive, uploader.getUserId());
logger.info("Uploading flow to db " + archive.getName());
projectLoader.uploadFlows(project, newVersion, flows.values());
logger.info("Changing project versions " + archive.getName());
projectLoader.changeProjectVersion(project, newVersion,
uploader.getUserId());
project.setFlows(flows);
logger.info("Uploading Job properties");
projectLoader.uploadProjectProperties(project, new ArrayList<Props>(
jobProps.values()));
logger.info("Uploading Props properties");
projectLoader.uploadProjectProperties(project, propProps);
}
logger.info("Uploaded project files. Cleaning up temp files.");
projectLoader.postEvent(project, EventType.UPLOADED, uploader.getUserId(),
"Uploaded project files zip " + archive.getName());
try {
FileUtils.deleteDirectory(file);
} catch (IOException e) {
file.deleteOnExit();
e.printStackTrace();
}
logger.info("Cleaning up old install files older than "
+ (project.getVersion() - projectVersionRetention));
projectLoader.cleanOlderProjectVersion(project.getId(),
project.getVersion() - projectVersionRetention);
return reports;
}
public void updateFlow(Project project, Flow flow)
throws ProjectManagerException {
projectLoader.updateFlow(project, flow.getVersion(), flow);
}
private File unzipFile(File archiveFile) throws IOException {
ZipFile zipfile = new ZipFile(archiveFile);
File unzipped = Utils.createTempDir(tempDir);
Utils.unzip(zipfile, unzipped);
zipfile.close();
return unzipped;
}
public void postProjectEvent(Project project, EventType type, String user,
String message) {
projectLoader.postEvent(project, type, user, message);
}
public boolean loadProjectWhiteList() {
if (props.containsKey(ProjectWhitelist.XML_FILE_PARAM)) {
ProjectWhitelist.load(props);
return true;
}
return false;
}
}