package azkaban.project;
import java.io.File;
import java.io.FileFilter;
import java.io.FileWriter;
import java.io.IOException;
import java.security.AccessControlException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import net.sf.ehcache.Cache;
import net.sf.ehcache.CacheManager;
import net.sf.ehcache.Element;
import net.sf.ehcache.config.CacheConfiguration;
import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
import org.apache.log4j.Logger;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import azkaban.flow.Flow;
import azkaban.flow.FlowProps;
import azkaban.flow.Node;
import azkaban.user.Permission;
import azkaban.user.Permission.Type;
import azkaban.user.User;
import azkaban.utils.DirectoryFlowLoader;
import azkaban.utils.JSONUtils;
import azkaban.utils.Props;
public class FileProjectManager implements ProjectManager {
public static final String DIRECTORY_PARAM = "file.project.loader.path";
private static final DateTimeFormatter FILE_DATE_FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd-HH:mm.ss.SSS");
private static final String PROPERTIES_FILENAME = "project.json";
private static final String PROJECT_DIRECTORY = "src";
private static final String FLOW_EXTENSION = ".flow";
private static final Logger logger = Logger.getLogger(FileProjectManager.class);
private static final int IDLE_SECONDS = 120;
private ConcurrentHashMap<String, Project> projects = new ConcurrentHashMap<String, Project>();
private CacheManager manager = CacheManager.create();
private Cache sourceCache;
private File projectDirectory;
public FileProjectManager(Props props) {
setupDirectories(props);
loadAllProjects();
setupCache();
}
private void setupCache() {
CacheConfiguration cacheConfig = new CacheConfiguration("propsCache",2000)
.memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU)
.overflowToDisk(false)
.eternal(false)
.timeToIdleSeconds(IDLE_SECONDS)
.diskPersistent(false)
.diskExpiryThreadIntervalSeconds(0);
sourceCache = new Cache(cacheConfig);
manager.addCache(sourceCache);
}
private void setupDirectories(Props props) {
String projectDir = props.getString(DIRECTORY_PARAM);
logger.info("Using directory " + projectDir + " as the project directory.");
projectDirectory = new File(projectDir);
if (!projectDirectory.exists()) {
logger.info("Directory " + projectDir + " doesn't exist. Creating.");
if (projectDirectory.mkdirs()) {
logger.info("Directory creation was successful.");
}
else {
throw new RuntimeException(
"FileProjectLoader cannot create directory " + projectDirectory);
}
}
else if (projectDirectory.isFile()) {
throw new RuntimeException("FileProjectManager directory " + projectDirectory + " is really a file.");
}
}
private void loadAllProjects() {
File[] directories = projectDirectory.listFiles();
for (File dir : directories) {
if (!dir.isDirectory()) {
logger.error("ERROR loading project from " + dir.getPath() + ". Not a directory.");
}
else {
File propertiesFile = new File(dir, PROPERTIES_FILENAME);
if (!propertiesFile.exists()) {
logger.error("ERROR loading project from " + dir.getPath()
+ ". Project file " + PROPERTIES_FILENAME
+ " not found.");
}
else {
Object obj = null;
try {
obj = JSONUtils.parseJSONFromFile(propertiesFile);
}
catch (IOException e) {
logger.error( "ERROR loading project from " + dir.getPath()
+ ". Project file " + PROPERTIES_FILENAME + " couldn't be read.", e);
continue;
}
Project project = Project.projectFromObject(obj);
logger.info("Loading project " + project.getName());
projects.put(project.getName(), project);
String source = project.getSource();
if (source == null) {
logger.info(project.getName() + ": No flows uploaded");
continue;
}
File projectDir = new File(dir, source);
if (!projectDir.exists()) {
logger.error("ERROR project source dir " + projectDir + " doesn't exist.");
}
else if (!projectDir.isDirectory()) {
logger.error("ERROR project source dir " + projectDir + " is not a directory.");
}
else {
File[] flowFiles = projectDir.listFiles(new SuffixFilter(FLOW_EXTENSION));
Map<String, Flow> flowMap = new LinkedHashMap<String, Flow>();
for (File flowFile : flowFiles) {
Object objectizedFlow = null;
try {
objectizedFlow = JSONUtils.parseJSONFromFile(flowFile);
}
catch (IOException e) {
logger.error("Error parsing flow file " + flowFile.toString(), e);
continue;
}
Flow flow = null;
try {
flow = Flow.flowFromObject(objectizedFlow);
}
catch (Exception e) {
logger.error(
"Error loading flow "
+ flowFile.getName()
+ " in project "
+ project.getName(), e);
continue;
}
logger.debug("Loaded flow " + project.getName() + ": " + flow.getId());
flow.initialize();
flowMap.put(flow.getId(), flow);
}
synchronized (project) {
project.setFlows(flowMap);
}
}
}
}
}
}
public List<String> getProjectNames() {
return new ArrayList<String>(projects.keySet());
}
public List<Project> getProjects(User user) {
ArrayList<Project> array = new ArrayList<Project>();
for (Project project : projects.values()) {
Permission perm = project.getUserPermission(user);
if (perm != null && (perm.isPermissionSet(Type.ADMIN) || perm.isPermissionSet(Type.READ))) {
array.add(project);
}
}
return array;
}
public Project getProject(String name, User user) {
Project project = projects.get(name);
if (project != null) {
Permission perm = project.getUserPermission(user);
if (perm.isPermissionSet(Type.ADMIN) || perm.isPermissionSet(Type.READ)) {
return project;
} else {
throw new AccessControlException( "Permission denied. Do not have read access.");
}
}
return project;
}
public void uploadProject(String projectName, File dir, User uploader, boolean force) throws ProjectManagerException {
logger.info("Uploading files to " + projectName);
Project project = projects.get(projectName);
if (project == null) {
throw new ProjectManagerException("Project not found.");
}
if (!project.hasPermission(uploader, Type.WRITE)) {
throw new AccessControlException("Permission denied. Do not have write access.");
}
List<String> errors = new ArrayList<String>();
DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
loader.loadProjectFlow(dir);
Map<String, Flow> flows = loader.getFlowMap();
File projectPath = new File(projectDirectory, projectName);
File installDir = new File(projectPath, FILE_DATE_FORMAT.print(System.currentTimeMillis()));
if (!installDir.mkdir()) {
throw new ProjectManagerException("Cannot create directory in " + projectDirectory);
}
for (Flow flow : flows.values()) {
try {
if (flow.getErrors() != null) {
errors.addAll(flow.getErrors());
}
flow.initialize();
writeFlowFile(installDir, flow);
}
catch (IOException e) {
throw new ProjectManagerException("Project directory "
+ projectName + " cannot be created in "
+ projectDirectory, e);
}
}
File destDirectory = new File(installDir, PROJECT_DIRECTORY);
dir.renameTo(destDirectory);
if (force || errors.isEmpty()) {
synchronized (project) {
logger.info("Uploading files to " + projectName);
project.setSource(installDir.getName());
project.setLastModifiedTimestamp(System.currentTimeMillis());
project.setLastModifiedUser(uploader.getUserId());
project.setFlows(flows);
}
try {
writeProjectFile(projectPath, project);
}
catch (IOException e) {
throw new ProjectManagerException("Project directory "
+ projectName + " cannot be created in "
+ projectDirectory, e);
}
}
else {
logger.info("Errors found loading project " + projectName);
StringBuffer bufferErrors = new StringBuffer();
for (String error : errors) {
bufferErrors.append(error);
bufferErrors.append("\n");
}
throw new ProjectManagerException(bufferErrors.toString());
}
}
@Override
public synchronized Project createProject(String projectName,
String description, User creator) throws ProjectManagerException {
if (projectName == null || projectName.trim().isEmpty()) {
throw new ProjectManagerException("Project name cannot be empty.");
}
else if (description == null || description.trim().isEmpty()) {
throw new ProjectManagerException("Description cannot be empty.");
}
else if (creator == null) {
throw new ProjectManagerException("Valid creator user must be set.");
}
else if (!projectName.matches("[a-zA-Z][a-zA-Z_0-9|-]*")) {
throw new ProjectManagerException("Project names must start with a letter, followed by any number of letters, digits, '-' or '_'.");
}
if (projects.contains(projectName)) {
throw new ProjectManagerException("Project already exists.");
}
File projectPath = new File(projectDirectory, projectName);
if (projectPath.exists()) {
throw new ProjectManagerException("Project already exists.");
}
if (!projectPath.mkdirs()) {
throw new ProjectManagerException("Project directory " + projectName + " cannot be created in " + projectDirectory);
}
Permission perm = new Permission(Type.ADMIN);
long time = System.currentTimeMillis();
Project project = new Project(projectName);
project.setUserPermission(creator.getUserId(), perm);
project.setDescription(description);
project.setCreateTimestamp(time);
project.setLastModifiedTimestamp(time);
project.setLastModifiedUser(creator.getUserId());
logger.info("Trying to create " + project.getName() + " by user " + creator.getUserId());
try {
writeProjectFile(projectPath, project);
}
catch (IOException e) {
throw new ProjectManagerException("Project directory " + projectName + " cannot be created in " + projectDirectory, e);
}
projects.put(projectName, project);
return project;
}
private synchronized void writeProjectFile(File directory, Project project) throws IOException {
Object object = project.toObject();
File tmpFile = File.createTempFile("project-", ".json", directory);
if (tmpFile.exists()) {
tmpFile.delete();
}
logger.info("Writing project file " + tmpFile);
String output = JSONUtils.toJSON(object, true);
FileWriter writer = new FileWriter(tmpFile);
try {
writer.write(output);
}
catch (IOException e) {
if (writer != null) {
writer.close();
}
throw e;
}
writer.close();
File projectFile = new File(directory, PROPERTIES_FILENAME);
File swapFile = new File(directory, PROPERTIES_FILENAME + "_old");
projectFile.renameTo(swapFile);
tmpFile.renameTo(projectFile);
swapFile.delete();
}
private void writeFlowFile(File directory, Flow flow) throws IOException {
Object object = flow.toObject();
String filename = flow.getId() + FLOW_EXTENSION;
File outputFile = new File(directory, filename);
File oldOutputFile = new File(directory, filename + ".old");
if (outputFile.exists()) {
outputFile.renameTo(oldOutputFile);
}
logger.info("Writing flow file " + outputFile);
String output = JSONUtils.toJSON(object, true);
FileWriter writer = new FileWriter(outputFile);
try {
writer.write(output);
}
catch (IOException e) {
if (writer != null) {
writer.close();
}
throw e;
}
writer.close();
if (oldOutputFile.exists()) {
oldOutputFile.delete();
}
}
@Override
public Props getProperties(String projectName, String source, User user)
throws ProjectManagerException {
Project project = projects.get(projectName);
if (project == null) {
throw new ProjectManagerException("Project " + project + " cannot be found.");
}
return getProperties(project, source, user);
}
@Override
public Props getProperties(Project project, String source, User user)
throws ProjectManagerException {
if (!project.hasPermission(user, Type.READ)) {
throw new AccessControlException(
"Permission denied. Do not have read access.");
}
String mySource = project.getName() + File.separatorChar
+ project.getSource() + File.separatorChar + "src"
+ File.separatorChar + source;
Element sourceElement = sourceCache.get(mySource);
if (sourceElement != null) {
return Props.clone((Props) sourceElement.getObjectValue());
}
File file = new File(projectDirectory, mySource);
if (!file.exists()) {
throw new ProjectManagerException("Source file " + file.getAbsolutePath() + " doesn't exist.");
}
try {
Props props = new Props((Props) null, file);
return props;
}
catch (IOException e) {
throw new ProjectManagerException("Error loading file " + file.getPath(), e);
}
}
@Override
public synchronized Project removeProject(String projectName, User user) {
return null;
}
private static class SuffixFilter implements FileFilter {
private String suffix;
public SuffixFilter(String suffix) {
this.suffix = suffix;
}
@Override
public boolean accept(File pathname) {
String name = pathname.getName();
return pathname.isFile() && !pathname.isHidden() && name.length() > suffix.length() && name.endsWith(suffix);
}
}
@Override
public void commitProject(String projectName) throws ProjectManagerException {
Project project = projects.get(projectName);
if (project == null) {
throw new ProjectManagerException("Project " + projectName + " doesn't exist.");
}
File projectPath = new File(projectDirectory, projectName);
try {
writeProjectFile(projectPath, project);
}
catch (IOException e) {
throw new ProjectManagerException("Error committing project " + projectName, e);
}
}
@Override
public HashMap<String, Props> getAllFlowProperties(Project project, String flowId, User user) throws ProjectManagerException {
Flow flow = project.getFlow(flowId);
if (flow == null) {
throw new ProjectManagerException("Flow " + flowId + " doesn't exist in " + project.getName());
}
HashMap<String, Props> sourceMap = new HashMap<String, Props>();
for (Node node : flow.getNodes()) {
String source = node.getJobSource();
Props props = getProperties(project, node.getJobSource(), user);
sourceMap.put(source, props);
}
for(FlowProps flowProps: flow.getAllFlowProps().values()) {
String source = flowProps.getSource();
Props props = getProperties(project, source, user);
sourceMap.put(source, props);
}
return sourceMap;
}
}