ReportalServlet.java

1362 lines | 47.947 kB Blame History Raw Download
/*
 * Copyright 2018 LinkedIn Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.viewer.reportal;

import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutorManagerAdapter;
import azkaban.executor.ExecutorManagerException;
import azkaban.flow.Flow;
import azkaban.project.Project;
import azkaban.project.ProjectManager;
import azkaban.project.ProjectManagerException;
import azkaban.reportal.util.IStreamProvider;
import azkaban.reportal.util.Reportal;
import azkaban.reportal.util.Reportal.Query;
import azkaban.reportal.util.Reportal.Variable;
import azkaban.reportal.util.ReportalHelper;
import azkaban.reportal.util.ReportalUtil;
import azkaban.reportal.util.StreamProviderHDFS;
import azkaban.scheduler.ScheduleManager;
import azkaban.scheduler.ScheduleManagerException;
import azkaban.security.commons.HadoopSecurityManager;
import azkaban.server.session.Session;
import azkaban.user.Permission;
import azkaban.user.Permission.Type;
import azkaban.user.User;
import azkaban.user.UserManager;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.Pair;
import azkaban.utils.Props;
import azkaban.webapp.AzkabanWebServer;
import azkaban.webapp.servlet.LoginAbstractAzkabanServlet;
import azkaban.webapp.servlet.Page;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.Set;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.log4j.Logger;
import org.apache.velocity.tools.generic.EscapeTool;
import org.joda.time.DateTime;

public class ReportalServlet extends LoginAbstractAzkabanServlet {

  // Presumably the property-key prefix for report variables; its use is not
  // visible in this part of the file -- TODO confirm.
  private static final String REPORTAL_VARIABLE_PREFIX = "reportal.variable.";
  // Props key naming the HadoopSecurityManager implementation class that
  // loadHadoopSecurityManager() resolves and instantiates reflectively.
  private static final String HADOOP_SECURITY_MANAGER_CLASS_PARAM =
      "hadoop.security.manager.class";
  private static final long serialVersionUID = 1L;
  private static final Logger logger = Logger.getLogger(ReportalServlet.class);
  // Directory read from "reportal.mail.temp.dir" (default "/tmp/reportal");
  // created by the constructor and also handed to ReportalMailCreator.
  private final File reportalMailTempDirectory;
  // Plugin properties passed to the constructor.
  private final Props props;
  // Value of the required "viewer.name" property.
  private final String viewerName;
  // Value of "reportal.storage.user" (default "reportal"); set as the user on
  // the HDFS stream provider when report output is read.
  private final String reportalStorageUser;
  // "web" folder resolved relative to the props source file; registered as
  // the servlet's resource directory by the constructor.
  private final File webResourcesFolder;
  // Values of "reportal.max.allowed.schedule.dates" (default 180) and
  // "reportal.default.schedule.dates" (default 30); exposed to the edit page.
  private final int max_allowed_schedule_dates;
  private final int default_schedule_dates;
  // Value of "reportal.show.navigation" (default false).
  private final boolean showNav;
  // Background thread created and started in init().
  private CleanerThread cleanerThread;
  /**
   * A whitelist of allowed email domains (e.g.: example.com). If null, all
   * email domains are allowed.
   */
  private Set<String> allowedEmailDomains = null;
  // Owning web server; assigned in init() from getApplication().
  private AzkabanWebServer server;
  // Value of "azkaban.should.proxy" (default false); read in init().
  private boolean shouldProxy;
  // Page size for the execution history list; overridden by
  // "reportal.items_per_page" in the constructor.
  private int itemsPerPage = 20;
  // Loaded reflectively in init() and shared with ReportalMailCreator.
  private HadoopSecurityManager hadoopSecurityManager;

  /**
   * Creates the servlet from the viewer plugin properties.
   *
   * <p>Besides populating the instance fields, this constructor creates the
   * mail temp directory and the web resources directory if they are missing
   * and publishes the mail/output settings to the static configuration fields
   * of {@code ReportalMailCreator}.
   *
   * @param props plugin properties; {@code viewer.name} is required, every
   *        other key read here has a default
   */
  public ReportalServlet(final Props props) {
    this.props = props;

    this.viewerName = props.getString("viewer.name");
    this.reportalStorageUser = props.getString("reportal.storage.user", "reportal");
    this.itemsPerPage = props.getInt("reportal.items_per_page", 20);
    this.showNav = props.getBoolean("reportal.show.navigation", false);

    this.max_allowed_schedule_dates = props.getInt("reportal.max.allowed.schedule.dates", 180);
    this.default_schedule_dates = props.getInt("reportal.default.schedule.dates", 30);

    this.reportalMailTempDirectory =
        new File(props.getString("reportal.mail.temp.dir", "/tmp/reportal"));
    this.reportalMailTempDirectory.mkdirs();
    ReportalMailCreator.reportalMailTempDirectory = this.reportalMailTempDirectory;

    // A missing property leaves allowedEmailDomains null, which means
    // "no restriction".
    final List<String> allowedDomains =
        props.getStringList("reportal.allowed.email.domains",
            (List<String>) null);
    if (allowedDomains != null) {
      this.allowedEmailDomains = new HashSet<>(allowedDomains);
    }

    ReportalMailCreator.outputLocation =
        props.getString("reportal.output.dir", "/tmp/reportal");
    ReportalMailCreator.outputFileSystem =
        props.getString("reportal.output.filesystem", "local");
    ReportalMailCreator.reportalStorageUser = this.reportalStorageUser;

    // The web resources live in a "web" folder two levels above the props
    // source file.
    this.webResourcesFolder =
        new File(new File(props.getSource()).getParentFile().getParentFile(),
            "web");
    this.webResourcesFolder.mkdirs();
    setResourceDirectory(this.webResourcesFolder);
    // Report through the class logger rather than stdout so the message ends
    // up in the server log with the rest of the servlet's output.
    logger.info("Reportal web resources: "
        + this.webResourcesFolder.getAbsolutePath());
  }

  /**
   * Completes initialization once the servlet container is available: binds
   * the web server, reads the proxy flag, loads the Hadoop security manager
   * (sharing it with {@code ReportalMailCreator}) and starts the cleaner
   * thread.
   *
   * @param config servlet configuration forwarded to the superclass
   * @throws ServletException if the superclass initialization fails
   * @throws RuntimeException if the Hadoop security manager cannot be loaded;
   *         the original failure is attached as the cause
   */
  @Override
  public void init(final ServletConfig config) throws ServletException {
    super.init(config);
    this.server = (AzkabanWebServer) getApplication();
    ReportalMailCreator.azkaban = this.server;

    this.shouldProxy = this.props.getBoolean("azkaban.should.proxy", false);
    logger.info("Hdfs browser should proxy: " + this.shouldProxy);
    try {
      this.hadoopSecurityManager = loadHadoopSecurityManager(this.props, logger);
      ReportalMailCreator.hadoopSecurityManager = this.hadoopSecurityManager;
    } catch (final RuntimeException e) {
      logger.error("Failed to get hadoop security manager!", e);
      // Keep the historical message but chain the exception so the full
      // stack trace is not lost to callers.
      throw new RuntimeException("Failed to get hadoop security manager!"
          + e.getCause(), e);
    }

    this.cleanerThread = new CleanerThread();
    this.cleanerThread.start();
  }

  /**
   * Resolves the class named by the {@code hadoop.security.manager.class}
   * property and obtains an instance through its static
   * {@code getInstance(Props)} factory method.
   *
   * @param props properties holding the class name; also passed to the factory
   * @param logger logger used to report progress and failures
   * @return the security manager returned by the factory method
   * @throws RuntimeException if the factory method is missing, inaccessible,
   *         returns an incompatible type, or itself throws; the underlying
   *         exception is attached as the cause
   */
  private HadoopSecurityManager loadHadoopSecurityManager(final Props props,
      final Logger logger) throws RuntimeException {

    final Class<?> hadoopSecurityManagerClass =
        props.getClass(HADOOP_SECURITY_MANAGER_CLASS_PARAM, true,
            ReportalServlet.class.getClassLoader());
    logger.info("Initializing hadoop security manager "
        + hadoopSecurityManagerClass.getName());

    try {
      final Method getInstanceMethod =
          hadoopSecurityManagerClass.getMethod("getInstance", Props.class);
      // getInstance is expected to be static, so the receiver argument is
      // ignored by reflection.
      return (HadoopSecurityManager) getInstanceMethod.invoke(
          hadoopSecurityManagerClass, props);
    } catch (final InvocationTargetException e) {
      // The factory itself threw: surface the real failure, not the
      // reflection wrapper.
      logger.error("Could not instantiate Hadoop Security Manager "
          + hadoopSecurityManagerClass.getName() + e.getCause());
      throw new RuntimeException(e.getCause());
    } catch (final Exception e) {
      // NoSuchMethodException, IllegalAccessException, ClassCastException
      // etc. usually have no cause of their own, so wrap the exception itself;
      // wrapping e.getCause() would discard all diagnostic information.
      logger.error("Could not load Hadoop Security Manager "
          + hadoopSecurityManagerClass.getName(), e);
      throw new RuntimeException(e);
    }
  }

  /**
   * Dispatches a GET request to the matching handler based on which marker
   * parameter is present: {@code ajax}, {@code view}, {@code new},
   * {@code edit} or {@code run}. Requests without any of them show the report
   * list.
   *
   * @param req the incoming request
   * @param resp the response to write to
   * @param session the authenticated user session
   * @throws ServletException if a handler rejects the request
   * @throws IOException if writing the response fails
   */
  @Override
  protected void handleGet(final HttpServletRequest req, final HttpServletResponse resp,
      final Session session) throws ServletException, IOException {
    if (hasParam(req, "ajax")) {
      handleAJAXAction(req, resp, session);
    } else if (hasParam(req, "view")) {
      try {
        handleViewReportal(req, resp, session);
      } catch (final Exception e) {
        // Failures while viewing are deliberately not propagated, but they
        // must reach the server log instead of stderr.
        logger.error("Error while handling reportal view request", e);
      }
    } else if (hasParam(req, "new")) {
      handleNewReportal(req, resp, session);
    } else if (hasParam(req, "edit")) {
      handleEditReportal(req, resp, session);
    } else if (hasParam(req, "run")) {
      handleRunReportal(req, resp, session);
    } else {
      handleListReportal(req, resp, session);
    }
  }

  /**
   * Handles the AJAX GET actions selected by the {@code ajax} parameter
   * ({@code delete}, {@code bookmark}, {@code subscribe}, {@code log}) for the
   * project given by {@code id}, and always answers with a JSON object.
   *
   * <p>The JSON contains either {@code "result": "success"} plus
   * action-specific fields, or an {@code "error"} message. Unknown actions
   * produce an empty object.
   *
   * @param req the incoming request; must carry {@code ajax} and {@code id}
   * @param resp the response the JSON is written to
   * @param session the authenticated user session
   * @throws ServletException if a required parameter is missing or malformed
   * @throws IOException if writing the response fails
   */
  private void handleAJAXAction(final HttpServletRequest req,
      final HttpServletResponse resp, final Session session) throws ServletException,
      IOException {
    final HashMap<String, Object> ret = new HashMap<>();
    final String ajaxName = getParam(req, "ajax");
    final User user = session.getUser();
    final int id = getIntParam(req, "id");
    final ProjectManager projectManager = this.server.getProjectManager();
    final Project project = projectManager.getProject(id);
    final Reportal reportal = Reportal.loadFromProject(project);

    if (ajaxName.equals("delete")) {
      // Delete report
      if (!project.hasPermission(user, Type.ADMIN)) {
        ret.put("error", "You do not have permissions to delete this reportal.");
      } else {
        try {
          final ScheduleManager scheduleManager = this.server.getScheduleManager();
          reportal.removeSchedules(scheduleManager);
          projectManager.removeProject(project, user);
          // Only report success when both steps completed; otherwise the
          // response would carry "error" and "result: success" together.
          ret.put("result", "success");
        } catch (final Exception e) {
          logger.error("Failed to delete reportal project " + id, e);
          ret.put("error", "An exception occured while deleting this reportal.");
        }
      }
    } else if (ajaxName.equals("bookmark")) {
      // Bookmark report (toggles the current state)
      final boolean wasBookmarked = ReportalHelper.isBookmarkProject(project, user);
      try {
        if (wasBookmarked) {
          ReportalHelper.unBookmarkProject(this.server, project, user);
        } else {
          ReportalHelper.bookmarkProject(this.server, project, user);
        }
        ret.put("result", "success");
        ret.put("bookmark", !wasBookmarked);
      } catch (final ProjectManagerException e) {
        logger.error("Failed to toggle bookmark on reportal project " + id, e);
        ret.put("error", "Error bookmarking reportal. " + e.getMessage());
      }
    } else if (ajaxName.equals("subscribe")) {
      // Subscribe to report (toggles the current state)
      final boolean wasSubscribed = ReportalHelper.isSubscribeProject(project, user);
      if (!wasSubscribed && reportal == null) {
        // loadFromProject can yield null (the page handlers check for it);
        // without this guard the permission test below would throw.
        ret.put("error", "Report not found.");
      } else if (!wasSubscribed && reportal.getAccessViewers().size() > 0
          && !hasPermission(project, user, Type.READ)) {
        ret.put("error", "You do not have permissions to view this reportal.");
      } else {
        try {
          if (wasSubscribed) {
            ReportalHelper.unSubscribeProject(this.server, project, user);
          } else {
            ReportalHelper.subscribeProject(this.server, project, user,
                user.getEmail());
          }
          ret.put("result", "success");
          ret.put("subscribe", !wasSubscribed);
        } catch (final ProjectManagerException e) {
          logger.error("Failed to toggle subscription on reportal project " + id, e);
          ret.put("error", "Error subscribing to reportal. " + e.getMessage());
        }
      }
    } else if (ajaxName.equals("log")) {
      // Get a portion of logs
      final int execId = getIntParam(req, "execId");
      final String jobId = getParam(req, "jobId");
      final int offset = getIntParam(req, "offset");
      final int length = getIntParam(req, "length");
      final ExecutorManagerAdapter executorManagerAdapter = this.server.getExecutorManager();

      ExecutableFlow exec = null;
      LogData data = null;
      boolean fetched = false;
      try {
        exec = executorManagerAdapter.getExecutableFlow(execId);
        data =
            executorManagerAdapter.getExecutionJobLog(exec, jobId, offset, length,
                exec.getExecutableNode(jobId).getAttempt());
        fetched = true;
      } catch (final Exception e) {
        // Do not return here: the error must still be written to the client
        // by the writeJSON call at the end of the method.
        logger.warn("Could not read log of job " + jobId + " in execution " + execId, e);
        ret.put("error", "Log does not exist or isn't created yet.");
      }

      if (fetched) {
        ret.put("result", "success");
        if (data != null) {
          ret.put("log", data.getData());
          ret.put("offset", data.getOffset());
          ret.put("length", data.getLength());
        } else {
          // Return an empty result to indicate the end
          ret.put("log", "");
          ret.put("offset", offset);
          ret.put("length", 0);
        }
        ret.put("completed", exec.getEndTime() != -1);
      }
    }

    this.writeJSON(resp, ret);
  }

  /**
   * Renders the report list page with all reportal projects, the current
   * user, and a default date range covering the last week.
   *
   * @param req the incoming request
   * @param resp the response the page is rendered to
   * @param session the authenticated user session
   * @throws ServletException declared for handler symmetry
   * @throws IOException declared for handler symmetry
   */
  private void handleListReportal(final HttpServletRequest req,
      final HttpServletResponse resp, final Session session) throws ServletException,
      IOException {

    final Page listPage =
        newPage(req, resp, session, "azkaban/viewer/reportal/reportallistpage.vm");
    preparePage(listPage, session);

    final List<Project> reportalProjects = ReportalHelper.getReportalProjects(this.server);
    listPage.add("ReportalHelper", ReportalHelper.class);
    listPage.add("user", session.getUser());

    // Default date range: one week back up to today.
    listPage.add("startDate", DateTime.now().minusWeeks(1).toString("yyyy-MM-dd"));
    listPage.add("endDate", DateTime.now().toString("yyyy-MM-dd"));

    // The template receives the literal false instead of an empty list.
    if (reportalProjects.isEmpty()) {
      listPage.add("projects", false);
    } else {
      listPage.add("projects", reportalProjects);
    }

    listPage.render();
  }

  /**
   * Renders the report data page for the project given by {@code id}.
   *
   * <p>Depending on the request parameters this shows:
   * <ul>
   * <li>{@code execid} + {@code logs} + {@code log}: a single job log;</li>
   * <li>{@code execid} + {@code logs}: the list of job logs (redirecting
   * straight to the log when there is exactly one);</li>
   * <li>{@code execid} + {@code download}: streams one output file;</li>
   * <li>{@code execid} alone: previews of the output files;</li>
   * <li>otherwise: the paged execution history.</li>
   * </ul>
   *
   * @param req the incoming request; must carry {@code id}
   * @param resp the response to render, redirect or stream to
   * @param session the authenticated user session
   * @throws ServletException if a required parameter is missing or malformed
   * @throws Exception if reading output files or rendering fails
   */
  private void handleViewReportal(final HttpServletRequest req,
      final HttpServletResponse resp, final Session session) throws ServletException,
      Exception {
    final int id = getIntParam(req, "id");
    final Page page =
        newPage(req, resp, session,
            "azkaban/viewer/reportal/reportaldatapage.vm");
    preparePage(page, session);

    final ProjectManager projectManager = this.server.getProjectManager();
    final ExecutorManagerAdapter executorManagerAdapter = this.server.getExecutorManager();

    final Project project = projectManager.getProject(id);
    final Reportal reportal = Reportal.loadFromProject(project);

    if (reportal == null) {
      page.add("errorMsg", "Report not found.");
      page.render();
      return;
    }

    // An empty viewer list means the report is readable by everyone.
    if (reportal.getAccessViewers().size() > 0
        && !hasPermission(project, session.getUser(), Type.READ)) {
      page.add("errorMsg", "You are not allowed to view this report.");
      page.render();
      return;
    }

    page.add("project", project);
    page.add("title", project.getMetadata().get("title"));

    if (hasParam(req, "execid")) {
      final int execId = getIntParam(req, "execid");
      page.add("execid", execId);

      if (hasParam(req, "logs")) {
        // Show logs
        final ExecutableFlow exec;
        try {
          exec = executorManagerAdapter.getExecutableFlow(execId);
        } catch (final ExecutorManagerException e) {
          logger.error("Could not load executable flow " + execId, e);
          page.add("errorMsg", "ExecutableFlow not found. " + e.getMessage());
          page.render();
          return;
        }

        if (hasParam(req, "log")) {
          // View single log
          page.add("view-log", true);
          final String jobId = getParam(req, "log");
          page.add("execid", execId);
          page.add("jobId", jobId);
        } else {
          // List files
          page.add("view-logs", true);
          final List<ExecutableNode> jobLogs = ReportalUtil.sortExecutableNodes(exec);

          // Unless "debug" is set, the last node is hidden from the list.
          // Guard against an empty list so remove() cannot throw.
          final boolean showDataCollector = hasParam(req, "debug");
          if (!showDataCollector && !jobLogs.isEmpty()) {
            jobLogs.remove(jobLogs.size() - 1);
          }

          if (jobLogs.size() == 1) {
            resp.sendRedirect("/reportal?view&logs&id=" + project.getId()
                + "&execid=" + execId + "&log=" + jobLogs.get(0).getId());
            // The response is committed by the redirect; rendering the page
            // afterwards would write to an already committed response.
            return;
          }
          page.add("logs", jobLogs);
        }
      } else {
        // Show data files
        final String outputFileSystem = ReportalMailCreator.outputFileSystem;
        final String outputBase = ReportalMailCreator.outputLocation;

        final String locationFull = (outputBase + "/" + execId).replace("//", "/");

        final IStreamProvider streamProvider =
            ReportalUtil.getStreamProvider(outputFileSystem);

        if (streamProvider instanceof StreamProviderHDFS) {
          final StreamProviderHDFS hdfsStreamProvider =
              (StreamProviderHDFS) streamProvider;
          hdfsStreamProvider.setHadoopSecurityManager(this.hadoopSecurityManager);
          hdfsStreamProvider.setUser(this.reportalStorageUser);
        }

        try {
          if (hasParam(req, "download")) {
            final String fileName = getParam(req, "download");
            // The file name comes straight from the request and is appended
            // to the execution's output directory, so it must not be able to
            // step outside of it.
            if (fileName.equals("..") || fileName.contains("/")
                || fileName.contains("\\")) {
              logger.warn("Rejected download with invalid file name for execution "
                  + execId);
              page.add("errorMsg", "Invalid file name.");
              page.render();
              return;
            }

            final String filePath = locationFull + "/" + fileName;
            InputStream csvInputStream = null;
            OutputStream out = null;
            try {
              csvInputStream = streamProvider.getFileInputStream(filePath);
              resp.setContentType("application/octet-stream");

              out = resp.getOutputStream();
              IOUtils.copy(csvInputStream, out);
            } finally {
              IOUtils.closeQuietly(out);
              IOUtils.closeQuietly(csvInputStream);
            }
            // The file content is the whole response; no page is rendered.
            return;
          } else {
            // Show file previews
            page.add("view-preview", true);

            try {
              String[] fileList = streamProvider.getFileList(locationFull);
              fileList = ReportalHelper.filterCSVFile(fileList);
              Arrays.sort(fileList);

              final List<Object> files =
                  getFilePreviews(fileList, locationFull, streamProvider,
                      reportal.renderResultsAsHtml);

              page.add("files", files);
            } catch (final Exception e) {
              logger.error("Error encountered while processing files in "
                  + locationFull, e);
            }
          }
        } finally {
          try {
            streamProvider.cleanUp();
          } catch (final IOException e) {
            logger.error("Error cleaning up stream provider for " + locationFull, e);
          }
        }
      }
    } else {
      // List executions and their data
      page.add("view-executions", true);
      final ArrayList<ExecutableFlow> exFlows = new ArrayList<>();

      // The "page" parameter is 1-based; internally pages are 0-based.
      int pageNumber = 0;
      boolean hasNextPage = false;
      if (hasParam(req, "page")) {
        pageNumber = getIntParam(req, "page") - 1;
      }
      if (pageNumber < 0) {
        pageNumber = 0;
      }
      try {
        final Flow flow = project.getFlows().get(0);
        executorManagerAdapter.getExecutableFlows(project.getId(), flow.getId(),
            pageNumber * this.itemsPerPage, this.itemsPerPage, exFlows);
        // Probe for a single entry past the current page to learn whether a
        // "next" link is needed.
        final ArrayList<ExecutableFlow> tmp = new ArrayList<>();
        executorManagerAdapter.getExecutableFlows(project.getId(), flow.getId(),
            (pageNumber + 1) * this.itemsPerPage, 1, tmp);
        if (!tmp.isEmpty()) {
          hasNextPage = true;
        }
      } catch (final ExecutorManagerException e) {
        logger.error("Error retrieving executable flows for project "
            + project.getId(), e);
        page.add("error", "Error retrieving executable flows");
      }

      if (!exFlows.isEmpty()) {
        final ArrayList<Object> history = new ArrayList<>();
        for (final ExecutableFlow exFlow : exFlows) {
          final HashMap<String, Object> flowInfo = new HashMap<>();
          flowInfo.put("execId", exFlow.getExecutionId());
          flowInfo.put("status", exFlow.getStatus().toString());
          flowInfo.put("startTime", exFlow.getStartTime());

          history.add(flowInfo);
        }
        page.add("executions", history);
      }
      if (pageNumber > 0) {
        page.add("pagePrev", pageNumber);
      }
      page.add("page", pageNumber + 1);
      if (hasNextPage) {
        page.add("pageNext", pageNumber + 2);
      }
    }

    page.render();
  }

  /**
   * Builds one preview entry per file. Each entry is a map holding the file
   * name under "name", the parsed rows (at most
   * {@code ReportalMailCreator.NUM_PREVIEW_ROWS} of them) under "content",
   * and "hasMore" set to true when the file has further rows beyond that.
   *
   * <p>Cell values have all double quotes removed and are HTML-escaped unless
   * {@code renderResultsAsHtml} is set. If reading any file fails, the error
   * is logged at debug level and the previews collected so far are returned.
   *
   * @param fileList names of the files to preview, relative to locationFull
   * @param locationFull directory containing the files
   * @param streamProvider source of the file input streams
   * @param renderResultsAsHtml when true, cell values are not HTML-escaped
   * @return the list of preview maps, possibly shorter than fileList
   */
  private List<Object> getFilePreviews(final String[] fileList, final String locationFull,
      final IStreamProvider streamProvider, final boolean renderResultsAsHtml) {
    final List<Object> previews = new ArrayList<>();
    InputStream currentStream = null;

    try {
      for (final String fileName : fileList) {
        final Map<String, Object> preview = new HashMap<>();
        preview.put("name", fileName);

        currentStream = streamProvider.getFileInputStream(locationFull + "/" + fileName);
        try (Scanner rows = new Scanner(currentStream, StandardCharsets.UTF_8.toString())) {
          final List<Object> parsedRows = new ArrayList<>();
          for (int rowCount = 0;
              rows.hasNextLine() && rowCount < ReportalMailCreator.NUM_PREVIEW_ROWS;
              rowCount++) {
            final List<String> cells = new ArrayList<>();
            // Cells are separated by the three-character sequence ","
            for (final String rawCell : rows.nextLine().split("\",\"")) {
              final String unquoted = rawCell.replace("\"", "");
              cells.add(renderResultsAsHtml
                  ? unquoted
                  : StringEscapeUtils.escapeHtml(unquoted));
            }
            parsedRows.add(cells);
          }
          preview.put("content", parsedRows);

          if (rows.hasNextLine()) {
            preview.put("hasMore", true);
          }

          previews.add(preview);
        }
      }
    } catch (final Exception e) {
      logger.debug("Error encountered while processing files in "
          + locationFull, e);
    } finally {
      IOUtils.closeQuietly(currentStream);
    }

    return previews;
  }

  /**
   * Renders the run page for the report given by {@code id}, listing the
   * runtime variables the user has to supply. Shows an error message instead
   * when the report does not exist or the user may not execute it.
   *
   * @param req the incoming request; must carry {@code id}
   * @param resp the response the page is rendered to
   * @param session the authenticated user session
   * @throws ServletException if {@code id} is missing or malformed
   * @throws IOException declared for handler symmetry
   */
  private void handleRunReportal(final HttpServletRequest req,
      final HttpServletResponse resp, final Session session) throws ServletException,
      IOException {
    final int id = getIntParam(req, "id");
    final ProjectManager projectManager = this.server.getProjectManager();
    final Page runPage =
        newPage(req, resp, session, "azkaban/viewer/reportal/reportalrunpage.vm");
    preparePage(runPage, session);

    final Project project = projectManager.getProject(id);
    final Reportal reportal = Reportal.loadFromProject(project);

    // Work out whether the request has to be refused, and why.
    final String refusal;
    if (reportal == null) {
      refusal = "Report not found";
    } else if (reportal.getAccessExecutors().size() > 0
        && !hasPermission(project, session.getUser(), Type.EXECUTE)) {
      // An empty executor list means anyone may run the report.
      refusal = "You are not allowed to run this report.";
    } else {
      refusal = null;
    }

    if (refusal != null) {
      runPage.add("errorMsg", refusal);
      runPage.render();
      return;
    }

    runPage.add("projectId", id);
    runPage.add("title", reportal.title);
    runPage.add("description", reportal.description);

    final List<Variable> runtimeVars = ReportalUtil.getRunTimeVariables(reportal.variables);
    if (!runtimeVars.isEmpty()) {
      runPage.add("variableNumber", runtimeVars.size());
      runPage.add("variables", runtimeVars);
    }

    runPage.render();
  }

  /**
   * Renders the edit page for a brand-new report: all text fields blank, a
   * single empty query, and the configured schedule date limits.
   *
   * @param req the incoming request
   * @param resp the response the page is rendered to
   * @param session the authenticated user session
   * @throws ServletException declared for handler symmetry
   * @throws IOException declared for handler symmetry
   */
  private void handleNewReportal(final HttpServletRequest req,
      final HttpServletResponse resp, final Session session) throws ServletException,
      IOException {

    final Page editPage =
        newPage(req, resp, session, "azkaban/viewer/reportal/reportaleditpage.vm");
    preparePage(editPage, session);

    editPage.add("title", "");
    editPage.add("description", "");

    // A new report starts out with exactly one blank query.
    final Map<String, Object> blankQuery = new HashMap<>();
    for (final String field : new String[]{"title", "type", "script"}) {
      blankQuery.put(field, "");
    }
    final List<Map<String, Object>> queries = new ArrayList<>();
    queries.add(blankQuery);

    editPage.add("queryNumber", 1);
    editPage.add("queries", queries);

    // Access lists and notification recipients are empty until filled in.
    for (final String blankField : new String[]{"accessViewer", "accessExecutor",
        "accessOwner", "notifications", "failureNotifications"}) {
      editPage.add(blankField, "");
    }

    editPage.add("max_allowed_schedule_dates", this.max_allowed_schedule_dates);
    editPage.add("default_schedule_dates", this.default_schedule_dates);

    editPage.render();
  }

  /**
   * Renders the edit page pre-filled with the stored settings of the report
   * given by {@code id}. Shows an error list instead when the report does not
   * exist or the user lacks ADMIN permission on it.
   *
   * @param req the incoming request; must carry {@code id}
   * @param resp the response the page is rendered to
   * @param session the authenticated user session
   * @throws ServletException if {@code id} is missing or malformed
   * @throws IOException declared for handler symmetry
   */
  private void handleEditReportal(final HttpServletRequest req,
      final HttpServletResponse resp, final Session session) throws ServletException,
      IOException {
    final int id = getIntParam(req, "id");
    final ProjectManager projectManager = this.server.getProjectManager();

    final Page editPage =
        newPage(req, resp, session, "azkaban/viewer/reportal/reportaleditpage.vm");
    preparePage(editPage, session);
    editPage.add("ReportalHelper", ReportalHelper.class);

    final Project project = projectManager.getProject(id);
    final Reportal reportal = Reportal.loadFromProject(project);

    // Decide whether editing has to be refused, and with which message.
    final String refusal;
    if (reportal == null) {
      refusal = "Report not found";
    } else if (!hasPermission(project, session.getUser(), Type.ADMIN)) {
      refusal = "You are not allowed to edit this report.";
    } else {
      refusal = null;
    }

    if (refusal != null) {
      final List<String> errors = new ArrayList<>();
      errors.add(refusal);
      editPage.add("errorMsgs", errors);
      editPage.render();
      return;
    }

    // General information, queries and variables.
    editPage.add("projectId", id);
    editPage.add("title", reportal.title);
    editPage.add("description", reportal.description);
    editPage.add("queries", reportal.queries);
    editPage.add("queryNumber", reportal.queries.size());
    editPage.add("variables", reportal.variables);
    editPage.add("variableNumber", reportal.variables.size());

    // Schedule settings.
    editPage.add("schedule", reportal.schedule);
    editPage.add("scheduleHour", reportal.scheduleHour);
    editPage.add("scheduleMinute", reportal.scheduleMinute);
    editPage.add("scheduleAmPm", reportal.scheduleAmPm);
    editPage.add("scheduleTimeZone", reportal.scheduleTimeZone);
    editPage.add("scheduleDate", reportal.scheduleDate);
    editPage.add("endScheduleDate", reportal.endSchedule);
    editPage.add("scheduleRepeat", reportal.scheduleRepeat);
    editPage.add("scheduleIntervalQuantity", reportal.scheduleIntervalQuantity);
    editPage.add("scheduleInterval", reportal.scheduleInterval);
    editPage.add("max_allowed_schedule_dates", this.max_allowed_schedule_dates);
    editPage.add("default_schedule_dates", this.default_schedule_dates);

    // Rendering, notification and access settings.
    editPage.add("renderResultsAsHtml", reportal.renderResultsAsHtml);
    editPage.add("notifications", reportal.notifications);
    editPage.add("failureNotifications", reportal.failureNotifications);
    editPage.add("accessViewer", reportal.accessViewer);
    editPage.add("accessExecutor", reportal.accessExecutor);
    editPage.add("accessOwner", reportal.accessOwner);

    editPage.render();
  }

  /**
   * Dispatches a POST request: requests carrying the {@code ajax} parameter
   * run a report with the submitted variables and answer with JSON, all
   * others save the submitted report form.
   *
   * @param req the incoming request
   * @param resp the response to write to
   * @param session the authenticated user session
   * @throws ServletException if a handler rejects the request
   * @throws IOException if writing the response fails
   */
  @Override
  protected void handlePost(final HttpServletRequest req, final HttpServletResponse resp,
      final Session session) throws ServletException, IOException {
    if (!hasParam(req, "ajax")) {
      handleSaveReportal(req, resp, session);
      return;
    }

    // The run handler fills the map, which is then returned as JSON.
    final HashMap<String, Object> jsonResult = new HashMap<>();
    handleRunReportalWithVariables(req, jsonResult, session);
    this.writeJSON(resp, jsonResult);
  }

  /**
   * Validates and saves the submitted report, then redirects to the edit page
   * (when the form was submitted with "Save") or to the run page (otherwise).
   * Nothing is sent from here when validateAndSaveReport returns null.
   *
   * @param req the incoming request holding the report form
   * @param resp the response used for the success cookie and the redirect
   * @param session the authenticated user session
   * @throws ServletException if a required parameter is missing
   * @throws IOException if the redirect cannot be sent
   */
  private void handleSaveReportal(final HttpServletRequest req,
      final HttpServletResponse resp, final Session session) throws ServletException,
      IOException {
    final String savedProjectId = validateAndSaveReport(req, resp, session);
    if (savedProjectId == null) {
      return;
    }

    this.setSuccessMessageInCookie(resp, "Report Saved.");

    // "Save" returns to the editor; any other submit button goes to the run page.
    final String targetQuery =
        getParam(req, "submit").equals("Save") ? "?edit&id=" : "?run&id=";
    resp.sendRedirect(req.getRequestURI() + targetQuery + savedProjectId);
  }

  /**
   * Validates and saves a report, returning the project id of the saved report
   * if successful, and null otherwise.
   *
   * @return The project id of the saved report if successful, and null
   * otherwise
   */
  private String validateAndSaveReport(final HttpServletRequest req,
      final HttpServletResponse resp, final Session session) throws ServletException,
      IOException {

    final ProjectManager projectManager = this.server.getProjectManager();
    final User user = session.getUser();

    final Page page =
        newPage(req, resp, session,
            "azkaban/viewer/reportal/reportaleditpage.vm");
    preparePage(page, session);
    page.add("ReportalHelper", ReportalHelper.class);

    final boolean isEdit = hasParam(req, "id");
    if (isEdit) {
      page.add("projectId", getIntParam(req, "id"));
    }

    Project project = null;
    final Reportal report = new Reportal();

    report.title = getParam(req, "title");
    report.description = getParam(req, "description");
    page.add("title", report.title);
    page.add("description", report.description);

    report.schedule = hasParam(req, "schedule");
    report.scheduleHour = getParam(req, "schedule-hour");
    report.scheduleMinute = getParam(req, "schedule-minute");
    report.scheduleAmPm = getParam(req, "schedule-am_pm");
    report.scheduleTimeZone = getParam(req, "schedule-timezone");
    report.scheduleDate = getParam(req, "schedule-date");
    report.scheduleRepeat = hasParam(req, "schedule-repeat");
    report.scheduleIntervalQuantity =
        getParam(req, "schedule-interval-quantity");
    report.scheduleInterval = getParam(req, "schedule-interval");
    report.renderResultsAsHtml = hasParam(req, "render-results-as-html");

    final boolean isEndSchedule = hasParam(req, "end-schedule-date");
    if (isEndSchedule) {
      report.endSchedule = getParam(req, "end-schedule-date");
    }

    page.add("schedule", report.schedule);
    page.add("scheduleHour", report.scheduleHour);
    page.add("scheduleMinute", report.scheduleMinute);
    page.add("scheduleAmPm", report.scheduleAmPm);
    page.add("scheduleTimeZone", report.scheduleTimeZone);
    page.add("scheduleDate", report.scheduleDate);
    page.add("scheduleRepeat", report.scheduleRepeat);
    page.add("scheduleIntervalQuantity", report.scheduleIntervalQuantity);
    page.add("scheduleInterval", report.scheduleInterval);
    page.add("renderResultsAsHtml", report.renderResultsAsHtml);
    page.add("endSchedule", report.endSchedule);
    page.add("max_allowed_schedule_dates", this.max_allowed_schedule_dates);
    page.add("default_schedule_dates", this.default_schedule_dates);

    report.accessViewer = getParam(req, "access-viewer");
    report.accessExecutor = getParam(req, "access-executor");
    report.accessOwner = getParam(req, "access-owner");
    page.add("accessViewer", report.accessViewer);
    page.add("accessExecutor", report.accessExecutor);

    // Adding report creator as explicit owner, if not present already
    if (report.accessOwner == null || report.accessOwner.isEmpty()) {
      report.accessOwner = user.getUserId();
    } else {
      final String[] splittedOwners = report.accessOwner.toLowerCase()
          .split(Reportal.ACCESS_LIST_SPLIT_REGEX);
      if (!Arrays.asList(splittedOwners).contains(user.getUserId())) {
        report.accessOwner = String.format("%s,%s", user.getUserId(),
            StringUtils.join(splittedOwners, ','));
      } else {
        report.accessOwner = StringUtils.join(splittedOwners, ',');
      }
    }

    page.add("accessOwner", report.accessOwner);

    report.notifications = getParam(req, "notifications");
    report.failureNotifications = getParam(req, "failure-notifications");
    page.add("notifications", report.notifications);
    page.add("failureNotifications", report.failureNotifications);

    final int numQueries = getIntParam(req, "queryNumber");
    page.add("queryNumber", numQueries);
    final List<Query> queryList = new ArrayList<>(numQueries);
    page.add("queries", queryList);
    report.queries = queryList;

    final List<String> errors = new ArrayList<>();
    for (int i = 0; i < numQueries; i++) {
      final Query query = new Query();

      query.title = getParam(req, "query" + i + "title");
      query.type = getParam(req, "query" + i + "type");
      query.script = getParam(req, "query" + i + "script");

      // Type check
      final ReportalType type = ReportalType.getTypeByName(query.type);
      if (type == null) {
        errors.add("Type " + query.type + " is invalid.");
      }

      if (!type.checkPermission(user) && report.schedule) {
        errors.add("You do not have permission to schedule Type " + query.type + ".");
      }

      queryList.add(query);
    }

    final int variables = getIntParam(req, "variableNumber");
    page.add("variableNumber", variables);
    final List<Variable> variableList = new ArrayList<>(variables);
    page.add("variables", variableList);
    report.variables = variableList;

    String proxyUser = null;

    for (int i = 0; i < variables; i++) {
      final Variable variable =
          new Variable(getParam(req, "variable" + i + "title"), getParam(req,
              "variable" + i + "name"));

      if (variable.title.isEmpty() || variable.name.isEmpty()) {
        errors.add("Variable title and name cannot be empty.");
      }

      if (variable.title.equals("reportal.config.reportal.execution.user")) {
        proxyUser = variable.name;
      }

      // Validate if the session user (who interact with UI) is part of specified user.to.proxy
      // user. If not, reportal can not be saved and warn users.
      if (variable.title.equals("reportal.config.user.to.proxy")) {
        final String userToProxy = variable.name;
        final UserManager userManager = getApplication().getUserManager();
        if (!userManager.validateProxyUser(userToProxy, user)) {
          errors.add("User " + user.getUserId() + " has no permission to add " + userToProxy
              + " as proxy user.");
        }
      }

      variableList.add(variable);
    }

    // Make sure title isn't empty
    if (report.title.isEmpty()) {
      errors.add("Title must not be empty.");
    }

    // Make sure description isn't empty
    if (report.description.isEmpty()) {
      errors.add("Description must not be empty.");
    }

    // Verify schedule and repeat
    if (report.schedule) {
      // Verify schedule time
      if (!NumberUtils.isDigits(report.scheduleHour)
          || !NumberUtils.isDigits(report.scheduleMinute)) {
        errors.add("Schedule time is invalid.");
      }

      // Verify schedule date is not empty
      if (report.scheduleDate.isEmpty()) {
        errors.add("Schedule date must not be empty.");
      }

      if (report.scheduleRepeat) {
        // Verify repeat interval
        if (!NumberUtils.isDigits(report.scheduleIntervalQuantity)) {
          errors.add("Repeat interval quantity is invalid.");
        }
      }
    }

    // Empty query check
    if (numQueries <= 0) {
      errors.add("There needs to have at least one query.");
    }

    // Validate access users
    final UserManager userManager = getApplication().getUserManager();
    final String[] accessLists =
        new String[]{report.accessViewer, report.accessExecutor,
            report.accessOwner};
    for (String accessList : accessLists) {
      if (accessList == null) {
        continue;
      }

      accessList = accessList.trim();
      if (!accessList.isEmpty()) {
        final String[] users = accessList.split(Reportal.ACCESS_LIST_SPLIT_REGEX);
        for (final String accessUser : users) {
          if (!userManager.validateUser(accessUser)) {
            errors.add("User " + accessUser + " in access list is invalid.");
          }
        }
      }
    }

    // Validate proxy user
    if (proxyUser != null) {
      if (!userManager.validateProxyUser(proxyUser, user)) {
        errors.add("User " + user.getUserId() + " has no permission to add " + proxyUser
            + " as proxy user.");
      }
      proxyUser = null;
    }

    // Validate email addresses
    final Set<String> emails =
        ReportalHelper.parseUniqueEmails(report.notifications + ","
            + report.failureNotifications, Reportal.ACCESS_LIST_SPLIT_REGEX);
    for (final String email : emails) {
      if (!ReportalHelper.isValidEmailAddress(email)) {
        errors.add("Invalid email address: " + email);
        continue;
      }

      final String domain = ReportalHelper.getEmailDomain(email);
      if (this.allowedEmailDomains != null && !this.allowedEmailDomains.contains(domain)) {
        errors.add("Email address '" + email + "' has an invalid domain '"
            + domain + "'. " + "Valid domains are: " + this.allowedEmailDomains);
      }
    }

    if (errors.size() > 0) {
      page.add("errorMsgs", errors);
      page.render();
      return null;
    }

    // Attempt to get a project object
    if (isEdit) {
      // Editing mode, load project
      final int projectId = getIntParam(req, "id");
      project = projectManager.getProject(projectId);
      report.loadImmutableFromProject(project);
    } else {
      // Creation mode, create project
      try {
        project =
            ReportalHelper.createReportalProject(this.server, report.title,
                report.description, user);
        report.reportalUser = user.getUserId();
        report.ownerEmail = user.getEmail();
      } catch (final Exception e) {
        e.printStackTrace();
        errors.add("Error while creating report. " + e.getMessage());
        page.add("errorMsgs", errors);
        page.render();
        return null;
      }

      // Project already exists
      if (project == null) {
        errors.add("A Report with the same name already exists.");
        page.add("errorMsgs", errors);
        page.render();
        return null;
      }
    }

    if (project == null) {
      errors.add("Internal Error: Report not found");
      page.add("errorMsgs", errors);
      page.render();
      return null;
    }

    report.project = project;
    page.add("projectId", project.getId());

    try {
      report.createZipAndUpload(projectManager, user, this.reportalStorageUser);
    } catch (final Exception e) {
      e.printStackTrace();
      errors.add("Error while creating Azkaban jobs. " + e.getMessage());
      page.add("errorMsgs", errors);
      page.render();
      if (!isEdit) {
        try {
          projectManager.removeProject(project, user);
        } catch (final ProjectManagerException e1) {
          e1.printStackTrace();
        }
      }
      return null;
    }

    // Prepare flow
    final Flow flow = project.getFlows().get(0);
    project.getMetadata().put("flowName", flow.getId());

    // Set Reportal mailer
    flow.setMailCreator(ReportalMailCreator.REPORTAL_MAIL_CREATOR);

    // Create/Save schedule
    final ScheduleManager scheduleManager = this.server.getScheduleManager();
    try {
      report.updateSchedules(report, scheduleManager, user, flow);
    } catch (final ScheduleManagerException e) {
      e.printStackTrace();
      errors.add(e.getMessage());
      page.add("errorMsgs", errors);
      page.render();
      return null;
    }

    report.saveToProject(project);

    try {
      ReportalHelper.updateProjectNotifications(project, projectManager);
      projectManager.updateProjectSetting(project);
      projectManager
          .updateProjectDescription(project, report.description, user);
      updateProjectPermissions(project, projectManager, report, user);
      projectManager.updateFlow(project, flow);
    } catch (final ProjectManagerException e) {
      e.printStackTrace();
      errors.add("Error while updating report. " + e.getMessage());
      page.add("errorMsgs", errors);
      page.render();
      if (!isEdit) {
        try {
          projectManager.removeProject(project, user);
        } catch (final ProjectManagerException e1) {
          e1.printStackTrace();
        }
      }
      return null;
    }

    return Integer.toString(project.getId());
  }

  /**
   * Brings the persisted user permissions of a project in line with the permissions the
   * report currently defines.
   *
   * <p>The project's user permissions are captured before and after
   * {@code report.updatePermissions()} is invoked. Every permission entry that is present
   * afterwards but was not present before is saved through the project manager, and every
   * user that had an entry before but has none afterwards gets its permission removed.
   *
   * @param project the project whose permissions are being synchronized
   * @param projectManager the manager used to save and remove permissions
   * @param report the report whose {@code updatePermissions()} is applied
   * @param currentUser the user on whose behalf the changes are made
   * @throws ProjectManagerException if saving or removing a permission fails
   */
  private void updateProjectPermissions(final Project project,
      final ProjectManager projectManager, final Reportal report, final User currentUser)
      throws ProjectManagerException {
    // Snapshot of permissions (and the users holding them) before the update
    final List<Pair<String, Permission>> permissionsBefore = project.getUserPermissions();
    final Set<String> usersBefore = new HashSet<>();
    for (final Pair<String, Permission> entry : permissionsBefore) {
      usersBefore.add(entry.getFirst());
    }

    // NOTE(review): presumably rewrites the project's permissions from the report's
    // access lists -- confirm against Reportal.updatePermissions()
    report.updatePermissions();

    // Snapshot of permissions (and the users holding them) after the update
    final List<Pair<String, Permission>> permissionsAfter = project.getUserPermissions();
    final Set<String> usersAfter = new HashSet<>();
    for (final Pair<String, Permission> entry : permissionsAfter) {
      usersAfter.add(entry.getFirst());
    }

    // Persist only the entries that did not exist before
    for (final Pair<String, Permission> entry : permissionsAfter) {
      if (permissionsBefore.contains(entry)) {
        continue;
      }
      projectManager.updateProjectPermission(project, entry.getFirst(),
          entry.getSecond(), false, currentUser);
    }

    // Drop the permission of every user that disappeared from the permission list
    for (final String previousUser : usersBefore) {
      if (usersAfter.contains(previousUser)) {
        continue;
      }
      projectManager.removeProjectPermission(project, previousUser, false, currentUser);
    }
  }

  /**
   * Submits an unscheduled execution of a report, substituting the run-time variable
   * values supplied in the request.
   *
   * <p>The outcome is reported through {@code ret}: on failure an {@code "error"} entry is
   * set; on success {@code "message"}, {@code "result"} and {@code "redirect"} are set.
   *
   * <p>The request is rejected with an {@code "error"} entry when the project or report
   * cannot be loaded, when the user lacks execute permission on a report that has an
   * executor list, when a query uses an unknown job type or one the user may not run, or
   * when the project has no flow to execute.
   *
   * @param req the request carrying {@code id}, the optional {@code testRun} flag and one
   *     {@code variable<i>} parameter per run-time variable
   * @param ret the map that receives the result entries described above
   * @param session the session of the user triggering the run
   * @throws ServletException propagated from the request parameter helpers
   * @throws IOException declared for the servlet handler contract
   */
  private void handleRunReportalWithVariables(final HttpServletRequest req,
      final HashMap<String, Object> ret, final Session session) throws ServletException,
      IOException {
    final boolean isTestRun = hasParam(req, "testRun");

    final int id = getIntParam(req, "id");
    final ProjectManager projectManager = this.server.getProjectManager();
    final Project project = projectManager.getProject(id);
    // An unknown id must produce an error response instead of a NullPointerException
    if (project == null) {
      ret.put("error", "Report with id " + id + " was not found.");
      return;
    }

    final Reportal report = Reportal.loadFromProject(project);
    if (report == null) {
      ret.put("error", "Report with id " + id + " could not be loaded.");
      return;
    }
    final User user = session.getUser();

    if (report.getAccessExecutors().size() > 0
        && !hasPermission(project, user, Type.EXECUTE)) {
      ret.put("error", "You are not allowed to run this report.");
      return;
    }

    for (final Query query : report.queries) {
      final String jobType = query.type;
      final ReportalType type = ReportalType.getTypeByName(jobType);
      // getTypeByName returns null for an unrecognized type name
      if (type == null) {
        ret.put("error", "Type " + jobType + " is invalid.");
        return;
      }
      if (!type.checkPermission(user)) {
        ret.put(
            "error",
            "You are not allowed to run this report as you don't have permission to run job type "
                + type.toString() + ".");
        return;
      }
    }

    // A project without any flow cannot be executed
    if (project.getFlows().isEmpty()) {
      ret.put("error",
          "Error running report " + report.title + ". The report has no flow to execute.");
      return;
    }
    final Flow flow = project.getFlows().get(0);

    final ExecutableFlow exflow = new ExecutableFlow(project, flow);
    exflow.setSubmitUser(user.getUserId());
    exflow.addAllProxyUsers(project.getProxyUsers());

    final ExecutionOptions options = exflow.getExecutionOptions();

    // Pass each run-time variable as a from/to pair; the "to" value comes from the request
    int i = 0;
    for (final Variable variable : ReportalUtil.getRunTimeVariables(report.variables)) {
      options.getFlowParameters().put(REPORTAL_VARIABLE_PREFIX + i + ".from",
          variable.name);
      options.getFlowParameters().put(REPORTAL_VARIABLE_PREFIX + i + ".to",
          getParam(req, "variable" + i));
      i++;
    }

    options.getFlowParameters()
        .put("reportal.execution.user", user.getUserId());

    // Add the execution user's email to the list of success and failure emails.
    final String email = user.getEmail();

    if (email != null && !email.isEmpty()) {
      if (isTestRun) { // Only email the executor
        final List<String> emails = new ArrayList<>();
        emails.add(email);
        options.setSuccessEmails(emails);
        options.setFailureEmails(emails);
      } else {
        options.getSuccessEmails().add(email);
        options.getFailureEmails().add(email);
      }
    }

    options.getFlowParameters().put("reportal.title", report.title);
    options.getFlowParameters().put("reportal.render.results.as.html",
        report.renderResultsAsHtml ? "true" : "false");
    options.getFlowParameters().put("reportal.unscheduled.run", "true");

    try {
      final String message =
          this.server.getExecutorManager().submitExecutableFlow(exflow,
              user.getUserId())
              + ".";
      ret.put("message", message);
      ret.put("result", "success");
      ret.put("redirect", "/reportal?view&logs&id=" + project.getId()
          + "&execid=" + exflow.getExecutionId());
    } catch (final ExecutorManagerException e) {
      logger.error("Error running report " + report.title + ".", e);
      ret.put("error",
          "Error running report " + report.title + ". " + e.getMessage());
    }
  }

  /**
   * Populates a page with the entries added for every Reportal view: the viewer name, the
   * navigation visibility flag, the id of the session's user and an {@link EscapeTool}
   * instance (under the key {@code "esc"}).
   *
   * @param page the page to populate
   * @param session the current session, used to look up the user id
   */
  private void preparePage(final Page page, final Session session) {
    page.add("viewerName", this.viewerName);

    // The template expects the inverse of the "show navigation" setting
    final boolean navigationHidden = !this.showNav;
    page.add("hideNavigation", navigationHidden);

    final User currentUser = session.getUser();
    page.add("userid", currentUser.getUserId());

    page.add("esc", new EscapeTool());
  }

  /**
   * Background thread that repeatedly removes expired Reportal output files and expired
   * Reportal mail temp directories, sleeping for a configurable interval between passes.
   *
   * <p>The interval and both retention periods are read from the servlet's props when the
   * thread is constructed:
   * <ul>
   *   <li>{@code reportal.clean.interval.ms} (default: one day)</li>
   *   <li>{@code reportal.output.dir.retention.ms} (default: seven days)</li>
   *   <li>{@code reportal.mail.temp.dir.retention.ms} (default: one day)</li>
   * </ul>
   */
  private class CleanerThread extends Thread {

    // Long arithmetic so the defaults can never silently overflow int
    private static final long DEFAULT_CLEAN_INTERVAL_MS = 24L * 60 * 60 * 1000;
    private static final long DEFAULT_OUTPUT_DIR_RETENTION_MS = 7L * 24 * 60
        * 60 * 1000;
    private static final long DEFAULT_MAIL_TEMP_DIR_RETENTION_MS =
        24L * 60 * 60 * 1000;
    // The frequency, in milliseconds, that the Reportal output
    // and mail temp directories should be cleaned
    private final long CLEAN_INTERVAL_MS;
    // The duration, in milliseconds, that Reportal output should be retained
    // for
    private final long OUTPUT_DIR_RETENTION_MS;
    // The duration, in milliseconds, that Reportal mail temp files should be
    // retained for
    private final long MAIL_TEMP_DIR_RETENTION_MS;
    // volatile: set by shutdown() from another thread and polled by run()
    private volatile boolean shutdown = false;

    /**
     * Names the thread and loads the cleaning interval and retention periods from the
     * enclosing servlet's props, falling back to the defaults above.
     */
    public CleanerThread() {
      this.setName("Reportal-Cleaner-Thread");
      this.CLEAN_INTERVAL_MS =
          ReportalServlet.this.props
              .getLong("reportal.clean.interval.ms", DEFAULT_CLEAN_INTERVAL_MS);
      this.OUTPUT_DIR_RETENTION_MS =
          ReportalServlet.this.props.getLong("reportal.output.dir.retention.ms",
              DEFAULT_OUTPUT_DIR_RETENTION_MS);
      this.MAIL_TEMP_DIR_RETENTION_MS =
          ReportalServlet.this.props.getLong("reportal.mail.temp.dir.retention.ms",
              DEFAULT_MAIL_TEMP_DIR_RETENTION_MS);
    }

    /**
     * Requests termination: sets the shutdown flag and interrupts the thread so that a
     * pending sleep ends immediately.
     */
    @SuppressWarnings("unused")
    public void shutdown() {
      this.shutdown = true;
      this.interrupt();
    }

    /**
     * Runs cleaning passes until {@link #shutdown()} is called, sleeping for the
     * configured interval after each pass.
     */
    @Override
    public void run() {
      while (!this.shutdown) {
        synchronized (this) {
          logger.info("Cleaning old execution output dirs");
          cleanOldReportalOutputDirs();

          logger.info("Cleaning Reportal mail temp directory");
          cleanReportalMailTempDir();
        }

        try {
          Thread.sleep(this.CLEAN_INTERVAL_MS);
        } catch (final InterruptedException e) {
          if (this.shutdown) {
            // Expected: shutdown() interrupts the sleep; the loop condition ends the thread
            logger.info("CleanerThread's sleep was interrupted for shutdown.");
          } else {
            logger.error("CleanerThread's sleep was interrupted.", e);
          }
        }
      }
    }

    /**
     * Deletes every file under the Reportal output location that the stream provider
     * reports as older than the output retention period. Failures are logged and do not
     * stop the remaining deletions.
     */
    private void cleanOldReportalOutputDirs() {
      final IStreamProvider streamProvider =
          ReportalUtil.getStreamProvider(ReportalMailCreator.outputFileSystem);

      // An HDFS provider additionally needs the security manager and storage user
      if (streamProvider instanceof StreamProviderHDFS) {
        final StreamProviderHDFS hdfsStreamProvider =
            (StreamProviderHDFS) streamProvider;
        hdfsStreamProvider.setHadoopSecurityManager(ReportalServlet.this.hadoopSecurityManager);
        hdfsStreamProvider.setUser(ReportalServlet.this.reportalStorageUser);
      }

      final long pastTimeThreshold =
          System.currentTimeMillis() - this.OUTPUT_DIR_RETENTION_MS;

      String[] oldFiles = null;
      try {
        oldFiles =
            streamProvider.getOldFiles(ReportalMailCreator.outputLocation,
                pastTimeThreshold);
      } catch (final Exception e) {
        logger.error("Error getting old files from "
            + ReportalMailCreator.outputLocation + " on "
            + ReportalMailCreator.outputFileSystem + " file system.", e);
      }

      if (oldFiles == null) {
        return;
      }

      for (final String file : oldFiles) {
        final String filePath = ReportalMailCreator.outputLocation + "/" + file;
        try {
          streamProvider.deleteFile(filePath);
        } catch (final Exception e) {
          logger.error("Error deleting file " + filePath + " from "
              + ReportalMailCreator.outputFileSystem + " file system.", e);
        }
      }
    }

    /**
     * Deletes every sub-directory of the Reportal mail temp directory whose last-modified
     * time is older than the mail temp retention period. Does nothing (beyond logging)
     * when the temp directory is unset, missing or cannot be listed.
     */
    private void cleanReportalMailTempDir() {
      final File dir = ReportalServlet.this.reportalMailTempDirectory;
      if (dir == null) {
        logger.warn("Reportal mail temp directory is not set; skipping cleanup.");
        return;
      }

      final long pastTimeThreshold =
          System.currentTimeMillis() - this.MAIL_TEMP_DIR_RETENTION_MS;

      final File[] oldMailTempDirs = dir.listFiles(new FileFilter() {
        @Override
        public boolean accept(final File path) {
          return path.isDirectory() && path.lastModified() < pastTimeThreshold;
        }
      });

      // listFiles returns null when dir is not a directory or an I/O error occurs;
      // iterating it unguarded would kill this thread with a NullPointerException
      if (oldMailTempDirs == null) {
        logger.warn("Unable to list Reportal mail temp dir " + dir.getPath()
            + "; skipping cleanup.");
        return;
      }

      for (final File tempDir : oldMailTempDirs) {
        try {
          FileUtils.deleteDirectory(tempDir);
        } catch (final IOException e) {
          logger.error(
              "Error cleaning Reportal mail temp dir " + tempDir.getPath(), e);
        }
      }
    }
  }
}