package azkaban.webapp.servlet;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Writer;
import java.security.AccessControlException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.zip.ZipFile;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.fileupload.FileItem;
import org.apache.commons.fileupload.servlet.ServletFileUpload;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
import azkaban.flow.Edge;
import azkaban.flow.Flow;
import azkaban.flow.Node;
import azkaban.project.Project;
import azkaban.project.ProjectManager;
import azkaban.project.ProjectManagerException;
import azkaban.user.Permission.Type;
import azkaban.user.User;
import azkaban.utils.Utils;
import azkaban.webapp.session.Session;
import azkaban.webapp.servlet.MultipartParser;
public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
private static final long serialVersionUID = 1;
private static final Logger logger = Logger.getLogger(ProjectManagerServlet.class);

/** Value (20 * 1024 * 1024) passed to the {@link MultipartParser} constructor. */
private static final int DEFAULT_UPLOAD_DISK_SPOOL_SIZE = 20 * 1024 * 1024;

/** Shared comparator that orders flow nodes by their level. */
private static final NodeLevelComparator NODE_LEVEL_COMPARATOR = new NodeLevelComparator();

// The three fields below are assigned in init(ServletConfig).
private ProjectManager manager;
private MultipartParser multipartParser;
private File tempDir;

/**
 * Shared comparator that orders flows by their id. Declared final so the
 * shared instance cannot be reassigned after class initialization.
 */
private static final Comparator<Flow> FLOW_ID_COMPARATOR = new Comparator<Flow>() {
    @Override
    public int compare(Flow f1, Flow f2) {
        return f1.getId().compareTo(f2.getId());
    }
};
/**
 * Initializes the servlet: delegates to the superclass, then obtains the
 * project manager and temp directory from the application and creates the
 * multipart parser used for uploads.
 *
 * @param config the servlet configuration, forwarded to the superclass
 * @throws ServletException if superclass initialization fails
 */
@Override
public void init(ServletConfig config) throws ServletException {
    super.init(config);

    manager = getApplication().getProjectManager();
    tempDir = getApplication().getTempDirectory();
    multipartParser = new MultipartParser(DEFAULT_UPLOAD_DISK_SPOOL_SIZE);
}
/**
 * Routes GET requests. When a "project" parameter is present the request
 * goes to the JSON handler ("json" parameter present), the flow page
 * ("flow" parameter present) or the project page (neither). Without a
 * "project" parameter the project page template is rendered with an error.
 */
@Override
protected void handleGet(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
    // Guard: nothing can be shown without a project.
    if (!hasParam(req, "project")) {
        Page errorPage = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/projectpage.vm");
        errorPage.add("errorMsg", "No project set.");
        errorPage.render();
        return;
    }

    if (hasParam(req, "json")) {
        handleJSONAction(req, resp, session);
        return;
    }
    if (hasParam(req, "flow")) {
        handleFlowPage(req, resp, session);
        return;
    }
    handleProjectPage(req, resp, session);
}
/**
 * Routes POST requests. Multipart requests are parsed and, when their
 * "action" field equals "upload", passed to the upload handler. Other
 * requests whose "action" parameter equals "create" are passed to the
 * create handler. Every other request is ignored.
 */
@Override
protected void handlePost(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
    if (!ServletFileUpload.isMultipartContent(req)) {
        // Plain form post: only the "create" action is recognized.
        if (hasParam(req, "action") && getParam(req, "action").equals("create")) {
            handleCreate(req, resp, session);
        }
        return;
    }

    logger.info("Post is multipart");
    Map<String, Object> multipartFields = multipartParser.parseMultipart(req);
    if (!multipartFields.containsKey("action")) {
        return;
    }

    String requestedAction = (String) multipartFields.get("action");
    if (requestedAction.equals("upload")) {
        handleUpload(req, resp, multipartFields, session);
    }
}
/**
 * Answers a JSON request for a project. The response map always contains
 * the requested "project" name. For the "expandflow" request it also
 * contains the "flowId" and a "nodes" list, sorted by node level, where each
 * entry holds the node "id", "level", its "dependencies" (sources of its
 * in-edges) and its "dependents" (targets of its out-edges).
 *
 * If the project lookup throws, or the project or flow does not exist, an
 * "error" entry is written instead of failing with a NullPointerException.
 *
 * @param req     the request carrying the "project", "json" and "flow" parameters
 * @param resp    the response the JSON is written to
 * @param session the session providing the requesting user
 */
private void handleJSONAction(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
    String projectName = getParam(req, "project");
    User user = session.getUser();

    HashMap<String, Object> ret = new HashMap<String, Object>();
    ret.put("project", projectName);

    Project project = null;
    try {
        project = manager.getProject(projectName, user);
    } catch (Exception e) {
        ret.put("error", e.getMessage());
        this.writeJSON(resp, ret);
        return;
    }

    // The page handlers treat a null project as "not found"; do the same here.
    if (project == null) {
        ret.put("error", "Project " + projectName + " not found.");
        this.writeJSON(resp, ret);
        return;
    }

    String jsonName = getParam(req, "json");
    if (jsonName.equals("expandflow")) {
        String flowId = getParam(req, "flow");
        Flow flow = project.getFlow(flowId);
        if (flow == null) {
            ret.put("error", "Flow " + flowId + " not found.");
            this.writeJSON(resp, ret);
            return;
        }

        // Sort a copy so the flow's own node collection is left untouched.
        ArrayList<Node> flowNodes = new ArrayList<Node>(flow.getNodes());
        Collections.sort(flowNodes, NODE_LEVEL_COMPARATOR);

        ArrayList<Object> nodeList = new ArrayList<Object>();
        for (Node node : flowNodes) {
            HashMap<String, Object> nodeObj = new HashMap<String, Object>();
            nodeObj.put("id", node.getId());

            // Either edge collection may be null; that yields an empty list.
            ArrayList<String> dependencies = new ArrayList<String>();
            Collection<Edge> inEdges = flow.getInEdges(node.getId());
            if (inEdges != null) {
                for (Edge edge : inEdges) {
                    dependencies.add(edge.getSourceId());
                }
            }

            ArrayList<String> dependents = new ArrayList<String>();
            Collection<Edge> outEdges = flow.getOutEdges(node.getId());
            if (outEdges != null) {
                for (Edge edge : outEdges) {
                    dependents.add(edge.getTargetId());
                }
            }

            nodeObj.put("dependencies", dependencies);
            nodeObj.put("dependents", dependents);
            nodeObj.put("level", node.getLevel());
            nodeList.add(nodeObj);
        }

        ret.put("flowId", flowId);
        ret.put("nodes", nodeList);
    }

    this.writeJSON(resp, ret);
}
/**
 * Renders the flow page for the "project" and "flow" request parameters.
 * The page receives the "project" and "flowid" values when both are found.
 * Otherwise it receives an "errorMsg" saying the project or flow was not
 * found, or carrying the access-control failure message.
 *
 * @param req     the request carrying the "project" and "flow" parameters
 * @param resp    the response the page is rendered to
 * @param session the session providing the requesting user
 */
private void handleFlowPage(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException {
    Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/flowpage.vm");
    String projectName = getParam(req, "project");
    String flowName = getParam(req, "flow");

    User user = session.getUser();
    try {
        Project project = manager.getProject(projectName, user);
        if (project == null) {
            page.add("errorMsg", "Project " + projectName + " not found.");
        }
        else {
            page.add("project", project);

            Flow flow = project.getFlow(flowName);
            if (flow == null) {
                page.add("errorMsg", "Flow " + flowName + " not found.");
            }
            else {
                // Only dereference the flow once it is known to exist.
                page.add("flowid", flow.getId());
            }
        }
    }
    catch (AccessControlException e) {
        page.add("errorMsg", e.getMessage());
    }

    page.render();
}
/**
 * Renders the project page for the "project" request parameter. When the
 * project is found the page receives the "project", a comma-joined "admins"
 * string, the requesting user's "permissions" and, if there are any, the
 * "flows" sorted by id. Otherwise it receives an "errorMsg".
 *
 * @param req     the request carrying the "project" parameter
 * @param resp    the response the page is rendered to
 * @param session the session providing the requesting user
 */
private void handleProjectPage(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException {
    Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/projectpage.vm");
    String projectName = getParam(req, "project");

    User user = session.getUser();
    try {
        Project project = manager.getProject(projectName, user);
        if (project == null) {
            page.add("errorMsg", "Project " + projectName + " not found.");
        }
        else {
            page.add("project", project);
            page.add("admins", Utils.flattenToString(project.getUsersWithPermission(Type.ADMIN), ","));
            page.add("permissions", project.getUserPermission(user));

            // Sort a copy rather than the list handed out by the project,
            // the same way the JSON handler copies a flow's nodes before
            // sorting them.
            List<Flow> flows = new ArrayList<Flow>(project.getFlows());
            if (!flows.isEmpty()) {
                Collections.sort(flows, FLOW_ID_COMPARATOR);
                page.add("flows", flows);
            }
        }
    }
    catch (AccessControlException e) {
        page.add("errorMsg", e.getMessage());
    }

    page.render();
}
/**
 * Creates a project from the optional "name" and "description" request
 * parameters and writes the string built by createJsonResponse to the
 * response. On success the status is "success" with a "redirect" action
 * whose "path" points at the new project. On a ProjectManagerException the
 * status is "error" and the exception message is included.
 *
 * @param req     the request carrying the optional "name" and "description"
 * @param resp    the response the result is written to
 * @param session the session providing the creating user
 */
private void handleCreate(HttpServletRequest req, HttpServletResponse resp,
        Session session) throws ServletException {

    String projectName = hasParam(req, "name") ? getParam(req, "name") : null;
    String projectDescription = hasParam(req, "description") ? getParam(req, "description") : null;
    logger.info("Create project " + projectName);

    User user = session.getUser();

    String status = null;
    String action = null;
    String message = null;
    HashMap<String, Object> params = null;
    try {
        manager.createProject(projectName, projectDescription, user);
        status = "success";
        action = "redirect";
        String redirect = "manager?project=" + projectName;
        params = new HashMap<String, Object>();
        params.put("path", redirect);
    } catch (ProjectManagerException e) {
        message = e.getMessage();
        status = "error";
    }

    String response = createJsonResponse(status, message, action, params);
    try {
        Writer write = resp.getWriter();
        write.append(response);
        write.flush();
    } catch (IOException e) {
        // This method does not declare IOException, so record the failure
        // through the servlet logger instead of printing to stderr.
        logger.error("Failed to write create response for project " + projectName, e);
    }
}
/**
 * Handles a multipart project upload. Reads the "project", "file" and
 * optional "force" fields, extracts the uploaded archive into a temporary
 * directory, passes that directory to the project manager and redirects
 * back to the project. The outcome is reported through the success and
 * error message cookies.
 *
 * When the project name or file is missing, only the error cookie is set
 * and no redirect is sent.
 *
 * @param req       the upload request
 * @param resp      the response that receives the cookie and redirect
 * @param multipart the parsed multipart fields
 * @param session   the session providing the uploading user
 * @throws IOException if the redirect cannot be sent
 */
private void handleUpload(HttpServletRequest req, HttpServletResponse resp, Map<String, Object> multipart,
        Session session) throws ServletException, IOException {

    User user = session.getUser();
    String projectName = (String) multipart.get("project");
    FileItem item = (FileItem) multipart.get("file");
    // Boolean.parseBoolean(null) is false, so an absent field means no force.
    boolean force = Boolean.parseBoolean((String) multipart.get("force"));

    if (projectName == null || projectName.isEmpty()) {
        setErrorMessageInCookie(resp, "No project name found.");
        return;
    }
    if (item == null) {
        setErrorMessageInCookie(resp, "No file found.");
        return;
    }

    File projectDir = null;
    try {
        projectDir = extractFile(item);
        manager.uploadProject(projectName, projectDir, user, force);
        setSuccessMessageInCookie(resp, "Project Uploaded");
    }
    catch (Exception e) {
        logger.info("Installation Failed.", e);
        setErrorMessageInCookie(resp, "Installation Failed.\n" + e.getMessage());
    }
    finally {
        // Always remove the extracted directory. A cleanup failure is only
        // logged, so it cannot prevent the redirect once the upload result
        // has been recorded.
        if (projectDir != null && projectDir.exists()) {
            try {
                FileUtils.deleteDirectory(projectDir);
            }
            catch (IOException e) {
                logger.warn("Unable to delete temporary project directory " + projectDir, e);
            }
        }
    }

    resp.sendRedirect(req.getRequestURI() + "?project=" + projectName);
}
/**
 * Extracts an uploaded archive into a temporary directory. Only content
 * types starting with "application/zip" are accepted.
 *
 * @param item the uploaded file
 * @return the directory the archive was extracted into
 * @throws ServletException if the content type is missing or unsupported
 * @throws IOException      if extraction fails
 */
private File extractFile(FileItem item) throws IOException, ServletException {
    final String contentType = item.getContentType();
    // A part without a content type yields null; report it as unsupported
    // rather than failing with a NullPointerException.
    if (contentType != null && contentType.startsWith("application/zip")) {
        return unzipFile(item);
    }

    throw new ServletException(String.format("Unsupported file type[%s].", contentType));
}
/**
 * Copies the uploaded item into a temporary zip file and unzips it into a
 * new temporary directory created under {@code tempDir}. The streams and
 * the zip file are closed, and the temporary zip file is deleted, whether
 * or not extraction succeeds.
 *
 * @param item the uploaded zip file
 * @return the directory containing the unzipped contents
 * @throws IOException if copying or unzipping fails
 */
private File unzipFile(FileItem item) throws ServletException, IOException {
    File temp = File.createTempFile("job-temp", ".zip");
    temp.deleteOnExit();

    try {
        // Spool the upload to disk so it can be opened as a ZipFile.
        OutputStream out = new BufferedOutputStream(new FileOutputStream(temp));
        try {
            java.io.InputStream in = item.getInputStream();
            try {
                IOUtils.copy(in, out);
            } finally {
                IOUtils.closeQuietly(in);
            }
            // Close explicitly on the success path so a failed flush is reported.
            out.close();
        } finally {
            // No-op after a successful close; releases the handle on failure.
            IOUtils.closeQuietly(out);
        }

        ZipFile zipfile = new ZipFile(temp);
        try {
            File unzipped = Utils.createTempDir(tempDir);
            Utils.unzip(zipfile, unzipped);
            return unzipped;
        } finally {
            // Release the file handle so the temp file can be deleted below.
            zipfile.close();
        }
    } finally {
        temp.delete();
    }
}
/**
 * Orders nodes by ascending level. Levels are compared explicitly rather
 * than subtracted, so extreme values cannot overflow and invert the order.
 */
private static class NodeLevelComparator implements Comparator<Node> {
    @Override
    public int compare(Node node1, Node node2) {
        int level1 = node1.getLevel();
        int level2 = node2.getLevel();
        if (level1 < level2) {
            return -1;
        }
        return level1 > level2 ? 1 : 0;
    }
}
}