azkaban-aplcache

Moving reportal codebase to main Azkaban repo (#1712) Today

3/29/2018 3:20:42 PM

Changes

build.gradle 3(+3 -0)

settings.gradle 2(+2 -0)

Details

diff --git a/az-reportal/build.gradle b/az-reportal/build.gradle
new file mode 100644
index 0000000..2e3cd0a
--- /dev/null
+++ b/az-reportal/build.gradle
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+apply plugin: 'distribution'
+
+dependencies {
+    compile project(':az-core')
+    compile project(":azkaban-common")
+    compile project(":azkaban-web-server")
+    compile project(":azkaban-hadoop-security-plugin")
+
+    compileOnly deps.hadoopCommon
+    compileOnly deps.hadoopMRClientCommon
+    compileOnly deps.hadoopMRClientCore
+    compileOnly (deps.hiveCli) {
+        transitive = false
+    }
+    compileOnly deps.hiveMetastore
+    compileOnly(deps.hiveExecCore) {
+        exclude group: 'org.pentaho', module: 'pentaho-aggdesigner-algorithm'
+        exclude group: 'eigenbase', module: 'eigenbase-properties'
+    }
+    compileOnly(deps.pig) {
+        transitive = false
+    }
+}
+
+distributions {
+    main {
+        contents {
+            from(jar) {
+                into 'lib'
+            }
+        }
+    }
+}
diff --git a/az-reportal/src/main/java/azkaban/jobtype/ReportalAbstractRunner.java b/az-reportal/src/main/java/azkaban/jobtype/ReportalAbstractRunner.java
new file mode 100644
index 0000000..56e6d81
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/jobtype/ReportalAbstractRunner.java
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobtype;
+
+import static azkaban.security.commons.SecurityUtils.MAPREDUCE_JOB_CREDENTIALS_BINARY;
+import static org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+
+import azkaban.flow.CommonJobProperties;
+import azkaban.reportal.util.BoundedOutputStream;
+import azkaban.reportal.util.ReportalRunnerException;
+import azkaban.utils.Props;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.TimeZone;
+import org.apache.hadoop.conf.Configuration;
+
+public abstract class ReportalAbstractRunner {
+
+  private static final String REPORTAL_VARIABLE_PREFIX = "reportal.variable.";
+  protected Props props;
+  protected OutputStream outputStream;
+  protected String proxyUser;
+  protected String jobQuery;
+  protected String jobTitle;
+  protected String reportalTitle;
+  protected String reportalStorageUser;
+  protected int outputCapacity;
+  protected Map<String, String> variables = new HashMap<>();
+
+  public ReportalAbstractRunner(final Properties props) {
+    final Props prop = new Props();
+    prop.put(props);
+    this.props = prop;
+  }
+
+  public void run() throws Exception {
+    System.out.println("Reportal: Setting up environment");
+
+    // Check the properties file
+    if (this.props == null) {
+      throw new ReportalRunnerException("Properties file not loaded correctly.");
+    }
+
+    // Get the hadoop token
+    final Configuration conf = new Configuration();
+    if (System.getenv(HADOOP_TOKEN_FILE_LOCATION) != null) {
+      conf.set(MAPREDUCE_JOB_CREDENTIALS_BINARY,
+          System.getenv(HADOOP_TOKEN_FILE_LOCATION));
+    }
+
+    // Get properties
+    final String execId = this.props.getString(CommonJobProperties.EXEC_ID);
+    this.outputCapacity = this.props.getInt("reportal.output.capacity", 10 * 1024 * 1024);
+    this.proxyUser = this.props.getString("reportal.proxy.user");
+    this.jobQuery = this.props.getString("reportal.job.query");
+    this.jobTitle = this.props.getString("reportal.job.title");
+    this.reportalTitle = this.props.getString("reportal.title");
+    this.reportalStorageUser = this.props.getString("reportal.storage.user", "reportal");
+    final Map<String, String> reportalVariables =
+        this.props.getMapByPrefix(REPORTAL_VARIABLE_PREFIX);
+
+    // Parse variables
+    for (final Entry<String, String> entry : reportalVariables.entrySet()) {
+      if (entry.getKey().endsWith("from")) {
+        final String fromValue = entry.getValue();
+        final String toKey =
+            entry.getKey().substring(0, entry.getKey().length() - 4) + "to";
+        final String toValue = reportalVariables.get(toKey);
+        if (toValue != null) {
+          this.variables.put(fromValue, toValue);
+        }
+      }
+    }
+
+    // Built-in variables
+    this.variables.put("run_id", execId);
+    this.variables.put("sys_date", Long.toString(System.currentTimeMillis() / 1000));
+
+    final Calendar cal = Calendar.getInstance();
+    final Date date = new Date();
+    cal.setTime(date);
+
+    final String timeZone = this.props.getString("reportal.default.timezone", "UTC");
+    TimeZone.setDefault(TimeZone.getTimeZone(timeZone));
+
+    final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
+    final SimpleDateFormat hourFormat = new SimpleDateFormat("yyyy-MM-dd-HH");
+
+    this.variables.put("hive_current_hour", hourFormat.format(cal.getTime()));
+    this.variables.put("hive_current_day", dateFormat.format(cal.getTime()));
+    cal.add(Calendar.HOUR, -1);
+    this.variables.put("hive_last_hour", hourFormat.format(cal.getTime()));
+    cal.add(Calendar.HOUR, 1);
+    cal.add(Calendar.DATE, -1);
+    this.variables.put("hive_yesterday", dateFormat.format(cal.getTime()));
+    cal.add(Calendar.DATE, -6);
+    this.variables.put("hive_last_seven_days", dateFormat.format(cal.getTime()));
+    cal.add(Calendar.DATE, -1);
+    this.variables.put("hive_last_eight_days", dateFormat.format(cal.getTime()));
+    this.variables.put("owner", this.proxyUser);
+    this.variables.put("title", this.reportalTitle);
+
+    // Props debug
+    System.out.println("Reportal Variables:");
+    for (final Entry<String, String> data : this.variables.entrySet()) {
+      System.out.println(data.getKey() + " -> " + data.getValue());
+    }
+
+    if (requiresOutput()) {
+      // Get output stream to data
+      final String locationTemp =
+          ("./reportal/" + this.jobTitle + ".csv").replace("//", "/");
+      final File tempOutput = new File(locationTemp);
+      tempOutput.getParentFile().mkdirs();
+      tempOutput.createNewFile();
+      this.outputStream =
+          new BoundedOutputStream(new BufferedOutputStream(
+              new FileOutputStream(tempOutput)), this.outputCapacity);
+
+      // Run the reportal
+      runReportal();
+
+      // Cleanup the reportal
+      try {
+        this.outputStream.close();
+      } catch (final IOException e) {
+        // We can safely ignore this exception since we're just making sure the
+        // stream is closed.
+      }
+    } else {
+      runReportal();
+    }
+  }
+
+  protected abstract void runReportal() throws Exception;
+
+  protected boolean requiresOutput() {
+    return true;
+  }
+
+  protected String injectVariables(String line) {
+    for (final Entry<String, String> entry : this.variables.entrySet()) {
+      line =
+          line.replace(":" + entry.getKey(), sanitizeVariable(entry.getValue()));
+    }
+    return line;
+  }
+
+  private String sanitizeVariable(final String variable) {
+    return variable.replace("'", "\\'").replace("\"", "\\\"");
+  }
+}
diff --git a/az-reportal/src/main/java/azkaban/jobtype/ReportalDataCollector.java b/az-reportal/src/main/java/azkaban/jobtype/ReportalDataCollector.java
new file mode 100644
index 0000000..9e13d15
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/jobtype/ReportalDataCollector.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobtype;
+
+import azkaban.flow.CommonJobProperties;
+import azkaban.reportal.util.CompositeException;
+import azkaban.reportal.util.IStreamProvider;
+import azkaban.reportal.util.ReportalUtil;
+import azkaban.utils.Props;
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import org.apache.commons.io.IOUtils;
+
+public class ReportalDataCollector extends ReportalAbstractRunner {
+
+  Props prop;
+
+  public ReportalDataCollector(final String jobName, final Properties props) {
+    super(props);
+    this.prop = new Props();
+    this.prop.put(props);
+  }
+
+  @Override
+  protected void runReportal() throws Exception {
+    System.out.println("Reportal Data Collector: Initializing");
+
+    final String outputFileSystem =
+        this.props.getString("reportal.output.filesystem", "local");
+    final String outputBase = this.props.getString("reportal.output.dir", "/tmp/reportal");
+    final String execId = this.props.getString(CommonJobProperties.EXEC_ID);
+
+    final int jobNumber = this.prop.getInt("reportal.job.number");
+    final List<Exception> exceptions = new ArrayList<>();
+    for (int i = 0; i < jobNumber; i++) {
+      InputStream tempStream = null;
+      IStreamProvider outputProvider = null;
+      OutputStream persistentStream = null;
+      try {
+        final String jobTitle = this.prop.getString("reportal.job." + i);
+        System.out.println("Reportal Data Collector: Job name=" + jobTitle);
+
+        final String tempFileName = jobTitle + ".csv";
+        // We add the job index to the beginning of the job title to allow us to
+        // sort the files correctly.
+        final String persistentFileName = i + "-" + tempFileName;
+
+        final String subPath = "/" + execId + "/" + persistentFileName;
+        final String locationFull = (outputBase + subPath).replace("//", "/");
+        final String locationTemp = ("./reportal/" + tempFileName).replace("//", "/");
+        final File tempOutput = new File(locationTemp);
+        if (!tempOutput.exists()) {
+          throw new FileNotFoundException("File: "
+              + tempOutput.getAbsolutePath() + " does not exist.");
+        }
+
+        // Copy file to persistent saving location
+        System.out
+            .println("Reportal Data Collector: Saving output to persistent storage");
+        System.out.println("Reportal Data Collector: FS=" + outputFileSystem
+            + ", Location=" + locationFull);
+        // Open temp file
+        tempStream = new BufferedInputStream(new FileInputStream(tempOutput));
+        // Open file from HDFS if specified
+        outputProvider = ReportalUtil.getStreamProvider(outputFileSystem);
+        persistentStream = outputProvider.getFileOutputStream(locationFull);
+        // Copy it
+        IOUtils.copy(tempStream, persistentStream);
+
+      } catch (final Exception e) {
+        System.out.println("Reportal Data Collector: Data collection failed. "
+            + e.getMessage());
+        e.printStackTrace();
+        exceptions.add(e);
+      } finally {
+        IOUtils.closeQuietly(tempStream);
+        IOUtils.closeQuietly(persistentStream);
+
+        try {
+          outputProvider.cleanUp();
+        } catch (final IOException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+
+    if (exceptions.size() > 0) {
+      throw new CompositeException(exceptions);
+    }
+
+    System.out.println("Reportal Data Collector: Ended successfully");
+  }
+
+  @Override
+  protected boolean requiresOutput() {
+    return false;
+  }
+}
diff --git a/az-reportal/src/main/java/azkaban/jobtype/ReportalHiveRunner.java b/az-reportal/src/main/java/azkaban/jobtype/ReportalHiveRunner.java
new file mode 100644
index 0000000..e641adb
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/jobtype/ReportalHiveRunner.java
@@ -0,0 +1,223 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobtype;
+
+import static azkaban.security.commons.SecurityUtils.MAPREDUCE_JOB_CREDENTIALS_BINARY;
+import static org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+
+import azkaban.reportal.util.BoundedOutputStream;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Scanner;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.cli.CliDriver;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.cli.OptionsProcessor;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+public class ReportalHiveRunner extends ReportalAbstractRunner {
+
+  public ReportalHiveRunner(final String jobName, final Properties props) {
+    super(props);
+  }
+
+  /**
+   * Normally hive.aux.jars.path is expanded from just being a path to the full
+   * list of files in the directory by the hive shell script. Since we normally
+   * won't be running from the script, it's up to us to do that work here. We
+   * use a heuristic that if there is no occurrence of ".jar" in the original,
+   * it needs expansion. Otherwise it's already been done for us. Also, surround
+   * the files with uri niceities.
+   */
+  static String expandHiveAuxJarsPath(final String original) throws IOException {
+    if (original == null || original.endsWith(".jar")) {
+      return original;
+    }
+
+    final File[] files = new File(original).listFiles();
+
+    if (files == null || files.length == 0) {
+      return original;
+    }
+
+    return filesToURIString(files);
+
+  }
+
+  static String filesToURIString(final File[] files) throws IOException {
+    final StringBuffer sb = new StringBuffer();
+    for (int i = 0; i < files.length; i++) {
+      sb.append("file:///").append(files[i].getCanonicalPath());
+      if (i != files.length - 1) {
+        sb.append(",");
+      }
+    }
+
+    return sb.toString();
+  }
+
+  @Override
+  protected void runReportal() throws Exception {
+    System.out.println("Reportal Hive: Setting up Hive");
+    final HiveConf conf = new HiveConf(SessionState.class);
+
+    if (System.getenv(HADOOP_TOKEN_FILE_LOCATION) != null) {
+      conf.set(MAPREDUCE_JOB_CREDENTIALS_BINARY,
+          System.getenv(HADOOP_TOKEN_FILE_LOCATION));
+    }
+
+    final File tempTSVFile = new File("./temp.tsv");
+    final OutputStream tsvTempOutputStream =
+        new BoundedOutputStream(new BufferedOutputStream(new FileOutputStream(
+            tempTSVFile)), this.outputCapacity);
+    final PrintStream logOut = System.out;
+
+    final String orig = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS);
+
+    final CliSessionState sessionState = new CliSessionState(conf);
+    sessionState.in = System.in;
+    sessionState.out = new PrintStream(tsvTempOutputStream, true, "UTF-8");
+    sessionState.err = new PrintStream(logOut, true, "UTF-8");
+
+    final OptionsProcessor oproc = new OptionsProcessor();
+
+    // Feed in Hive Args
+    final String[] args = buildHiveArgs();
+    if (!oproc.process_stage1(args)) {
+      throw new Exception("unable to parse options stage 1");
+    }
+
+    if (!oproc.process_stage2(sessionState)) {
+      throw new Exception("unable to parse options stage 2");
+    }
+
+    // Set all properties specified via command line
+    for (final Map.Entry<Object, Object> item : sessionState.cmdProperties.entrySet()) {
+      conf.set((String) item.getKey(), (String) item.getValue());
+    }
+
+    SessionState.start(sessionState);
+
+    final String expanded = expandHiveAuxJarsPath(orig);
+    if (orig == null || orig.equals(expanded)) {
+      System.out.println("Hive aux jars variable not expanded");
+    } else {
+      System.out.println("Expanded aux jars variable from [" + orig + "] to ["
+          + expanded + "]");
+      HiveConf.setVar(conf, HiveConf.ConfVars.HIVEAUXJARS, expanded);
+    }
+
+    // hadoop-20 and above - we need to augment classpath using hiveconf
+    // components
+    // see also: code in ExecDriver.java
+    ClassLoader loader = conf.getClassLoader();
+    final String auxJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS);
+
+    System.out.println("Got auxJars = " + auxJars);
+
+    if (StringUtils.isNotBlank(auxJars)) {
+      loader = Utilities.addToClassPath(loader, StringUtils.split(auxJars, ","));
+    }
+    conf.setClassLoader(loader);
+    Thread.currentThread().setContextClassLoader(loader);
+
+    final CliDriver cli = new CliDriver();
+    int returnValue = 0;
+    String prefix = "";
+
+    returnValue = cli.processLine("set hive.cli.print.header=true;");
+    final String[] queries = this.jobQuery.split("\n");
+    for (String line : queries) {
+      if (!prefix.isEmpty()) {
+        prefix += '\n';
+      }
+      if (line.trim().endsWith(";") && !line.trim().endsWith("\\;")) {
+        line = prefix + line;
+        line = injectVariables(line);
+        System.out.println("Reportal Hive: Running Hive Query: " + line);
+        returnValue = cli.processLine(line);
+        prefix = "";
+      } else {
+        prefix = prefix + line;
+        continue;
+      }
+    }
+
+    tsvTempOutputStream.close();
+
+    // convert tsv to csv and write it do disk
+    System.out.println("Reportal Hive: Converting output");
+    final InputStream tsvTempInputStream =
+        new BufferedInputStream(new FileInputStream(tempTSVFile));
+    final Scanner rowScanner = new Scanner(tsvTempInputStream, StandardCharsets.UTF_8.toString());
+    final PrintStream csvOutputStream = new PrintStream(this.outputStream);
+    while (rowScanner.hasNextLine()) {
+      final String tsvLine = rowScanner.nextLine();
+      // strip all quotes, and then quote the columns
+      csvOutputStream.println("\""
+          + tsvLine.replace("\"", "").replace("\t", "\",\"") + "\"");
+    }
+    rowScanner.close();
+    csvOutputStream.close();
+
+    // Flush the temp file out
+    tempTSVFile.delete();
+
+    if (returnValue != 0) {
+      throw new Exception("Hive query finished with a non zero return code");
+    }
+
+    System.out.println("Reportal Hive: Ended successfully");
+  }
+
+  private String[] buildHiveArgs() {
+    final List<String> confBuilder = new ArrayList<>();
+
+    if (this.proxyUser != null) {
+      confBuilder.add("hive.exec.scratchdir=/tmp/hive-" + this.proxyUser);
+    }
+
+    if (this.jobTitle != null) {
+      confBuilder.add("mapred.job.name=\"Reportal: " + this.jobTitle + "\"");
+    }
+
+    confBuilder.add("mapreduce.job.complete.cancel.delegation.tokens=false");
+
+    final String[] args = new String[confBuilder.size() * 2];
+
+    for (int i = 0; i < confBuilder.size(); i++) {
+      args[i * 2] = "--hiveconf";
+      args[i * 2 + 1] = confBuilder.get(i);
+    }
+
+    return args;
+  }
+}
diff --git a/az-reportal/src/main/java/azkaban/jobtype/ReportalPigRunner.java b/az-reportal/src/main/java/azkaban/jobtype/ReportalPigRunner.java
new file mode 100644
index 0000000..f17fbeb
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/jobtype/ReportalPigRunner.java
@@ -0,0 +1,237 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobtype;
+
+import azkaban.reportal.util.BoundedOutputStream;
+import azkaban.reportal.util.ReportalRunnerException;
+import azkaban.utils.Props;
+import azkaban.utils.StringUtils;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Scanner;
+import java.util.Set;
+import org.apache.pig.PigRunner;
+import org.apache.pig.tools.pigstats.PigStats;
+
+public class ReportalPigRunner extends ReportalAbstractRunner {
+
+  public static final String PIG_PARAM_PREFIX = "param.";
+  public static final String PIG_PARAM_FILES = "paramfile";
+  public static final String PIG_SCRIPT = "reportal.pig.script";
+  public static final String UDF_IMPORT_LIST = "udf.import.list";
+  public static final String PIG_ADDITIONAL_JARS = "pig.additional.jars";
+  Props prop;
+
+  public ReportalPigRunner(final String jobName, final Properties props) {
+    super(props);
+    this.prop = new Props();
+    this.prop.put(props);
+  }
+
+  private static void handleError(final File pigLog) throws Exception {
+    System.out.println();
+    System.out.println("====Pig logfile dump====");
+    System.out.println("File: " + pigLog.getAbsolutePath());
+    System.out.println();
+    try {
+      final BufferedReader reader = new BufferedReader(
+          new InputStreamReader(
+              new FileInputStream(pigLog), StandardCharsets.UTF_8.toString()
+          ));
+
+      String line = reader.readLine();
+      while (line != null) {
+        System.out.println(line);
+        line = reader.readLine();
+      }
+      reader.close();
+      System.out.println();
+      System.out.println("====End logfile dump====");
+    } catch (final FileNotFoundException e) {
+      System.out.println("pig log file: " + pigLog + "  not found.");
+    }
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  protected void runReportal() throws Exception {
+    System.out.println("Reportal Pig: Setting up Pig");
+
+    injectAllVariables(this.prop.getString(PIG_SCRIPT));
+
+    final String[] args = getParams();
+
+    System.out.println("Reportal Pig: Running pig script");
+    final PrintStream oldOutputStream = System.out;
+
+    final File tempOutputFile = new File("./temp.out");
+    final OutputStream tempOutputStream =
+        new BoundedOutputStream(new BufferedOutputStream(new FileOutputStream(
+            tempOutputFile)), this.outputCapacity);
+    final PrintStream printStream = new PrintStream(tempOutputStream);
+    System.setOut(printStream);
+
+    final PigStats stats = PigRunner.run(args, null);
+
+    System.setOut(oldOutputStream);
+
+    printStream.close();
+
+    // convert pig output to csv and write it to disk
+    System.out.println("Reportal Pig: Converting output");
+    final InputStream tempInputStream =
+        new BufferedInputStream(new FileInputStream(tempOutputFile));
+    final Scanner rowScanner = new Scanner(tempInputStream, StandardCharsets.UTF_8.toString());
+    final PrintStream csvOutputStream = new PrintStream(this.outputStream);
+    while (rowScanner.hasNextLine()) {
+      String line = rowScanner.nextLine();
+      // strip all quotes, and then quote the columns
+      if (line.startsWith("(")) {
+        line = transformDumpLine(line);
+      } else {
+        line = transformDescriptionLine(line);
+      }
+      csvOutputStream.println(line);
+    }
+    rowScanner.close();
+    csvOutputStream.close();
+
+    // Flush the temp file out
+    tempOutputFile.delete();
+
+    if (!stats.isSuccessful()) {
+      System.out.println("Reportal Pig: Handling errors");
+
+      final File pigLogFile = new File("./");
+
+      final File[] fileList = pigLogFile.listFiles();
+
+      for (final File file : fileList) {
+        if (file.isFile() && file.getName().matches("^pig_.*\\.log$")) {
+          handleError(file);
+        }
+      }
+
+      // see jira ticket PIG-3313. Will remove these when we use pig binary with
+      // that patch.
+      // /////////////////////
+      System.out.println("Trying to do self kill, in case pig could not.");
+      final Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
+      final Thread[] threadArray = threadSet.toArray(new Thread[threadSet.size()]);
+      for (final Thread t : threadArray) {
+        if (!t.isDaemon() && !t.equals(Thread.currentThread())) {
+          System.out.println("Killing thread " + t);
+          t.interrupt();
+          t.stop();
+        }
+      }
+      System.exit(1);
+      // ////////////////////
+
+      throw new ReportalRunnerException("Pig job failed.");
+    } else {
+      System.out.println("Reportal Pig: Ended successfully");
+    }
+  }
+
+  private String[] getParams() {
+    final ArrayList<String> list = new ArrayList<>();
+
+    final Map<String, String> map = getPigParams();
+    if (map != null) {
+      for (final Map.Entry<String, String> entry : map.entrySet()) {
+        list.add("-param");
+        list.add(StringUtils.shellQuote(
+            entry.getKey() + "=" + entry.getValue(), StringUtils.SINGLE_QUOTE));
+      }
+    }
+
+    // Run in local mode if filesystem is set to local.
+    if (this.prop.getString("reportal.output.filesystem", "local").equals("local")) {
+      list.add("-x");
+      list.add("local");
+    }
+
+    // Register any additional Pig jars
+    final String additionalPigJars = this.prop.getString(PIG_ADDITIONAL_JARS, null);
+    if (additionalPigJars != null && additionalPigJars.length() > 0) {
+      list.add("-Dpig.additional.jars=" + additionalPigJars);
+    }
+
+    // Add UDF import list
+    final String udfImportList = this.prop.getString(UDF_IMPORT_LIST, null);
+    if (udfImportList != null && udfImportList.length() > 0) {
+      list.add("-Dudf.import.list=" + udfImportList);
+    }
+
+    // Add the script to execute
+    list.add(this.prop.getString(PIG_SCRIPT));
+    return list.toArray(new String[0]);
+  }
+
+  protected Map<String, String> getPigParams() {
+    return this.prop.getMapByPrefix(PIG_PARAM_PREFIX);
+  }
+
+  private String transformDescriptionLine(final String line) {
+    final int start = line.indexOf(':');
+    String cleanLine = line;
+    if (start != -1 && start + 3 < line.length()) {
+      cleanLine = line.substring(start + 3, line.length() - 1);
+    }
+    return "\"" + cleanLine.replace("\"", "").replace(",", "\",\"") + "\"";
+  }
+
+  private String transformDumpLine(final String line) {
+    final String cleanLine = line.substring(1, line.length() - 1);
+    return "\"" + cleanLine.replace("\"", "").replace(",", "\",\"") + "\"";
+  }
+
+  private void injectAllVariables(final String file) throws FileNotFoundException {
+    // Inject variables into the script
+    System.out.println("Reportal Pig: Replacing variables");
+    final File inputFile = new File(file);
+    final File outputFile = new File(file + ".bak");
+    final InputStream scriptInputStream =
+        new BufferedInputStream(new FileInputStream(inputFile));
+    final Scanner rowScanner = new Scanner(scriptInputStream, StandardCharsets.UTF_8.toString());
+    final PrintStream scriptOutputStream =
+        new PrintStream(new BufferedOutputStream(new FileOutputStream(
+            outputFile)));
+    while (rowScanner.hasNextLine()) {
+      String line = rowScanner.nextLine();
+      line = injectVariables(line);
+      scriptOutputStream.println(line);
+    }
+    rowScanner.close();
+    scriptOutputStream.close();
+    outputFile.renameTo(inputFile);
+  }
+}
diff --git a/az-reportal/src/main/java/azkaban/jobtype/ReportalTeradataRunner.java b/az-reportal/src/main/java/azkaban/jobtype/ReportalTeradataRunner.java
new file mode 100644
index 0000000..81b0e5e
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/jobtype/ReportalTeradataRunner.java
@@ -0,0 +1,262 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobtype;
+
+import azkaban.flow.CommonJobProperties;
+import azkaban.reportal.util.CompositeException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.net.URI;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import javax.sql.DataSource;
+import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.commons.lang.StringUtils;
+
+public class ReportalTeradataRunner extends ReportalAbstractRunner {
+
+  public ReportalTeradataRunner(final String jobName, final Properties props) {
+    super(props);
+  }
+
+  @Override
+  protected void runReportal() throws Exception {
+    System.out.println("Reportal Teradata: Setting up Teradata");
+    final List<Exception> exceptions = new ArrayList<>();
+
+    Class.forName("com.teradata.jdbc.TeraDriver");
+    final String connectionString =
+        this.props.getString("reportal.teradata.connection.string", null);
+
+    final String user = this.props.getString("reportal.teradata.username", null);
+    final String pass = this.props.getString("reportal.teradata.password", null);
+
+    final Map<String, String> queryBandProperties = new HashMap<>();
+    queryBandProperties.put("USER", this.proxyUser);
+    queryBandProperties
+        .put(CommonJobProperties.EXEC_ID, this.props.getString(CommonJobProperties.EXEC_ID));
+    queryBandProperties
+        .put(CommonJobProperties.PROJECT_NAME,
+            this.props.getString(CommonJobProperties.PROJECT_NAME));
+    queryBandProperties
+        .put(CommonJobProperties.FLOW_ID, this.props.getString(CommonJobProperties.FLOW_ID));
+    queryBandProperties
+        .put(CommonJobProperties.JOB_ID, this.props.getString(CommonJobProperties.JOB_ID));
+    final String attemptUrl = this.props.getString(CommonJobProperties.ATTEMPT_LINK);
+    queryBandProperties.put(CommonJobProperties.ATTEMPT_LINK, attemptUrl);
+    final URI attemptUri = new URI(attemptUrl);
+    queryBandProperties.put("azkaban.server", attemptUri.getHost());
+
+    if (user == null) {
+      System.out.println("Reportal Teradata: Configuration incomplete");
+      throw new RuntimeException(
+          "The reportal.teradata.username variable was not defined.");
+    }
+    if (pass == null) {
+      System.out.println("Reportal Teradata: Configuration incomplete");
+      throw new RuntimeException(
+          "The reportal.teradata.password variable was not defined.");
+    }
+
+    final DataSource teraDataSource =
+        new TeradataDataSource(connectionString, user, pass);
+    final Connection conn = teraDataSource.getConnection();
+
+    final String[] sqlQueries = cleanAndGetQueries(this.jobQuery, queryBandProperties);
+
+    final int numQueries = sqlQueries.length;
+
+    for (int i = 0; i < numQueries; i++) {
+      try {
+        final String queryLine = sqlQueries[i];
+
+        // Only store results from the last statement
+        if (i == numQueries - 1) {
+          final PreparedStatement stmt = prepareStatement(conn, queryLine);
+          stmt.execute();
+          final ResultSet rs = stmt.getResultSet();
+          outputQueryResult(rs, this.outputStream);
+          stmt.close();
+        } else {
+          try {
+            final PreparedStatement stmt = prepareStatement(conn, queryLine);
+            stmt.execute();
+            stmt.close();
+          } catch (final NullPointerException e) {
+            // An empty query (or comment) throws a NPE in JDBC. Yay!
+            System.err
+                .println("Caught NPE in execute call because report has a NOOP query: "
+                    + queryLine);
+          }
+        }
+      } catch (final Exception e) {
+        // Catch and continue. Delay exception throwing until we've run all
+        // queries in this task.
+        System.out.println("Reportal Teradata: SQL query failed. "
+            + e.getMessage());
+        e.printStackTrace();
+        exceptions.add(e);
+      }
+    }
+
+    if (exceptions.size() > 0) {
+      throw new CompositeException(exceptions);
+    }
+
+    System.out.println("Reportal Teradata: Ended successfully");
+  }
+
+  protected String[] cleanAndGetQueries(final String sqlQuery,
+      final Map<String, String> queryBandProperties) {
+
+    /**
+     * Teradata's SET Query_Band allows use to "proxy" to an LDAP user. This
+     * makes queries appear to admins as though it's being issued by the owner
+     * of the report, rather than the 'Reportal' user. Tables will still be
+     * "owned" by Reportal, but admins will be able to send angry emails to the
+     * proper user when a reportal query is impacting the system negatively.
+     * Best we could do.
+     */
+
+    final StringBuilder queryBandBuilder = new StringBuilder();
+    for (final Map.Entry<String, String> pair : queryBandProperties.entrySet()) {
+      queryBandBuilder.append("" + pair.getKey() + "=" + pair.getValue() + ";");
+    }
+
+    final String queryBand =
+        "SET Query_Band = '" + queryBandBuilder.toString()
+            + "' FOR SESSION;";
+    final ArrayList<String> injectedQueries = new ArrayList<>();
+
+    injectedQueries.add(queryBand);
+    final String[] queries = StringUtils.split(sqlQuery.trim(), ";");
+    for (String query : queries) {
+      query = cleanQueryLine(query);
+      if (query == null || query.isEmpty()) {
+        continue;
+      }
+      injectedQueries.add(query);
+    }
+
+    return injectedQueries.toArray(new String[]{});
+  }
+
+  private String cleanQueryLine(final String line) {
+    if (line != null) {
+      return line.trim();
+    }
+    return null;
+  }
+
+  private void outputQueryResult(final ResultSet result, final OutputStream outputStream)
+      throws SQLException {
+    final PrintStream outFile = new PrintStream(outputStream);
+    final String delim = ",";
+    boolean isHeaderPending = true;
+    if (result != null) {
+      while (result.next()) {
+        final int numColumns = result.getMetaData().getColumnCount();
+        final StringBuilder dataString = new StringBuilder();
+
+        if (isHeaderPending) {
+          final StringBuilder headerString = new StringBuilder();
+          for (int j = 1; j <= numColumns; j++) {
+            final String colName = formatValue(result.getMetaData().getColumnName(j));
+            if (j > 1) {
+              headerString.append(delim).append(colName);
+            } else {
+              headerString.append(colName);
+            }
+          }
+          isHeaderPending = false;
+          outFile.println(headerString.toString());
+        }
+
+        for (int j = 1; j <= numColumns; j++) {
+          String colVal = result.getString(j);
+
+          if (colVal == null) {
+            colVal = "\"null\"";
+          } else {
+            colVal = formatValue(colVal);
+          }
+
+          if (j > 1) {
+            dataString.append(delim).append(colVal);
+          } else {
+            dataString.append(colVal);
+          }
+        }
+
+        outFile.println(dataString.toString());
+      }
+    }
+    outFile.close();
+  }
+
+  private String formatValue(final String value) {
+    return "\"" + value.replace("\"", "") + "\"";
+  }
+
+  private PreparedStatement prepareStatement(final Connection conn, String line)
+      throws SQLException {
+    line = injectVariables(line);
+
+    // For some reason, teradata's adapter can't seem to handle this well
+    // List<String> variableReplacements = new ArrayList<String>();
+    //
+    // for (Entry<String, String> entry : variables.entrySet()) {
+    // String key = ":" + entry.getKey();
+    // int index;
+    // while ((index = line.indexOf(key)) != -1) {
+    // line = line.substring(0, index) + "?" + line.substring(index +
+    // key.length());
+    // variableReplacements.add(entry.getValue());
+    // }
+    // }
+
+    // StringBuilder sb = new StringBuilder();
+    final PreparedStatement stmt = conn.prepareStatement(line);
+    // for (int i = 0; i < variableReplacements.size(); i++) {
+    // stmt.setString(i + 1, variableReplacements.get(i));
+    // sb.append(variableReplacements.get(i)).append(",");
+    // }
+
+    System.out.println("Reportal Teradata: Teradata query: " + line);
+    // System.out.println("Reportal Teradata: Variables: " + sb.toString());
+    return stmt;
+  }
+
+  private static class TeradataDataSource extends BasicDataSource {
+
+    private TeradataDataSource(final String connectionString, final String user,
+        final String password) {
+      super();
+      setDriverClassName("com.teradata.jdbc.TeraDriver");
+      setUrl(connectionString);
+      setUsername(user);
+      setPassword(password);
+    }
+  }
+}
diff --git a/az-reportal/src/main/java/azkaban/reportal/util/BoundedOutputStream.java b/az-reportal/src/main/java/azkaban/reportal/util/BoundedOutputStream.java
new file mode 100644
index 0000000..ba29818
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/reportal/util/BoundedOutputStream.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.reportal.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class BoundedOutputStream extends OutputStream {
+
+  OutputStream out;
+  int totalCapacity;
+  int remainingCapacity;
+  boolean hasExceededSize = false;
+  boolean havePrintedErrorMessage = false;
+
+  public BoundedOutputStream(final OutputStream out, final int size) {
+    this.out = out;
+    this.totalCapacity = size;
+    this.remainingCapacity = size;
+  }
+
+  @Override
+  public void flush() throws IOException {
+    this.out.flush();
+  }
+
+  @Override
+  public void close() throws IOException {
+    this.out.close();
+  }
+
+  @Override
+  public void write(final byte[] b) throws IOException {
+    if (this.remainingCapacity <= 0) {
+      this.hasExceededSize = true;
+    } else if (this.remainingCapacity - b.length < 0) {
+      this.out.write(b, 0, this.remainingCapacity);
+      this.remainingCapacity = 0;
+      this.hasExceededSize = true;
+    } else {
+      this.out.write(b);
+      this.remainingCapacity -= b.length;
+    }
+
+    if (this.hasExceededSize && !this.havePrintedErrorMessage) {
+      System.err.println("Output has exceeded the max limit of "
+          + this.totalCapacity + " bytes. Truncating remaining output.");
+      this.havePrintedErrorMessage = true;
+    }
+  }
+
+  @Override
+  public void write(final byte[] b, final int off, final int len) throws IOException {
+    if (this.remainingCapacity <= 0) {
+      this.hasExceededSize = true;
+    } else if (this.remainingCapacity - len < 0) {
+      this.out.write(b, off, this.remainingCapacity);
+      this.remainingCapacity = 0;
+      this.hasExceededSize = true;
+    } else {
+      this.out.write(b, off, len);
+      this.remainingCapacity -= len;
+    }
+
+    if (this.hasExceededSize && !this.havePrintedErrorMessage) {
+      System.err.println("Output has exceeded the max limit of "
+          + this.totalCapacity + " bytes. Truncating remaining output.");
+      this.havePrintedErrorMessage = true;
+    }
+  }
+
+  @Override
+  public void write(final int b) throws IOException {
+    if (this.remainingCapacity <= 0) {
+      this.hasExceededSize = true;
+
+      if (!this.havePrintedErrorMessage) {
+        System.err.println("Output has exceeded the max limit of "
+            + this.totalCapacity + " bytes. Truncating remaining output.");
+        this.havePrintedErrorMessage = true;
+      }
+
+      return;
+    }
+    this.out.write(b);
+    this.remainingCapacity--;
+  }
+
+}
diff --git a/az-reportal/src/main/java/azkaban/reportal/util/CompositeException.java b/az-reportal/src/main/java/azkaban/reportal/util/CompositeException.java
new file mode 100644
index 0000000..41e6746
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/reportal/util/CompositeException.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.reportal.util;
+
+import java.util.List;
+
+public class CompositeException extends Exception {
+  private static final long serialVersionUID = 1L;
+  private final List<? extends Exception> es;
+
+  public CompositeException(final List<? extends Exception> es) {
+    this.es = es;
+  }
+
+  public List<? extends Exception> getExceptions() {
+    return this.es;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder str = new StringBuilder();
+    boolean pastFirst = false;
+
+    for (final Exception e : this.es) {
+      if (pastFirst) {
+        str.append("\n\n");
+      }
+      str.append(e.toString());
+      pastFirst = true;
+    }
+
+    return str.toString();
+  }
+}
diff --git a/az-reportal/src/main/java/azkaban/reportal/util/IStreamProvider.java b/az-reportal/src/main/java/azkaban/reportal/util/IStreamProvider.java
new file mode 100644
index 0000000..8c374a0
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/reportal/util/IStreamProvider.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.reportal.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public interface IStreamProvider {
+
+  public void setUser(String user);
+
+  public String[] getFileList(String pathString) throws Exception;
+
+  /**
+   * Returns a list of all files in a directory with a modification time less
+   * than the specified time
+   */
+  public String[] getOldFiles(String pathString, long thresholdTime)
+      throws Exception;
+
+
+  /**
+   * Deletes the file denoted by the specified path. If the file is a directory,
+   * this method recursively deletes the files in the directory and the
+   * directory itself.
+   */
+  public void deleteFile(String pathString) throws Exception;
+
+  public InputStream getFileInputStream(String pathString) throws Exception;
+
+  public OutputStream getFileOutputStream(String pathString) throws Exception;
+
+  public void cleanUp() throws IOException;
+}
diff --git a/az-reportal/src/main/java/azkaban/reportal/util/Reportal.java b/az-reportal/src/main/java/azkaban/reportal/util/Reportal.java
new file mode 100644
index 0000000..6d3df92
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/reportal/util/Reportal.java
@@ -0,0 +1,504 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.reportal.util;
+
+import azkaban.executor.ExecutionOptions;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.project.ProjectManager;
+import azkaban.scheduler.Schedule;
+import azkaban.scheduler.ScheduleManager;
+import azkaban.scheduler.ScheduleManagerException;
+import azkaban.user.Permission;
+import azkaban.user.Permission.Type;
+import azkaban.user.User;
+import azkaban.utils.Utils;
+import azkaban.viewer.reportal.ReportalMailCreator;
+import azkaban.viewer.reportal.ReportalTypeManager;
+import java.io.File;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Days;
+import org.joda.time.Hours;
+import org.joda.time.Minutes;
+import org.joda.time.Months;
+import org.joda.time.ReadablePeriod;
+import org.joda.time.Weeks;
+import org.joda.time.Years;
+import org.joda.time.format.DateTimeFormat;
+
+public class Reportal {
+
+  public static final String REPORTAL_CONFIG_PREFIX = "reportal.config.";
+  public static final String REPORTAL_CONFIG_PREFIX_REGEX =
+      "^reportal[.]config[.].+";
+  public static final String REPORTAL_CONFIG_PREFIX_NEGATION_REGEX =
+      "(?!(^reportal[.]config[.])).+";
+  public static final String ACCESS_LIST_SPLIT_REGEX =
+      "\\s*,\\s*|\\s*;\\s*|\\s+";
+  // One Schedule's default End Time: 01/01/2050, 00:00:00, UTC
+  private static final long DEFAULT_SCHEDULE_END_EPOCH_TIME = 2524608000000L;
+  private static final Logger logger = Logger.getLogger(Reportal.class);
+  public static Getter<Boolean> boolGetter = new Getter<>(false,
+      Boolean.class);
+  public static Getter<Integer> intGetter = new Getter<>(0,
+      Integer.class);
+  public static Getter<String> stringGetter = new Getter<>("",
+      String.class);
+  public String reportalUser;
+  public String ownerEmail;
+  public String title;
+  public String description;
+  public List<Query> queries;
+  public List<Variable> variables;
+  public boolean schedule;
+  public String scheduleHour;
+  public String scheduleMinute;
+  public String scheduleAmPm;
+  public String scheduleTimeZone;
+  public String scheduleDate;
+  public boolean scheduleRepeat;
+  public String scheduleIntervalQuantity;
+  public String scheduleInterval;
+  public String endSchedule;
+  public boolean renderResultsAsHtml;
+  public String accessViewer;
+  public String accessExecutor;
+  public String accessOwner;
+  public String notifications;
+  public String failureNotifications;
+  public Project project;
+
+  public static Reportal loadFromProject(final Project project) {
+    if (project == null) {
+      return null;
+    }
+
+    final Reportal reportal = new Reportal();
+    final Map<String, Object> metadata = project.getMetadata();
+
+    reportal.loadImmutableFromProject(project);
+
+    if (reportal.reportalUser == null || reportal.reportalUser.isEmpty()) {
+      return null;
+    }
+
+    reportal.title = stringGetter.get(metadata.get("title"));
+    reportal.description = project.getDescription();
+    final int queries = intGetter.get(project.getMetadata().get("queryNumber"));
+    final int variables = intGetter.get(project.getMetadata().get("variableNumber"));
+
+    reportal.schedule = boolGetter.get(project.getMetadata().get("schedule"));
+    reportal.scheduleHour =
+        stringGetter.get(project.getMetadata().get("scheduleHour"));
+    reportal.scheduleMinute =
+        stringGetter.get(project.getMetadata().get("scheduleMinute"));
+    reportal.scheduleAmPm =
+        stringGetter.get(project.getMetadata().get("scheduleAmPm"));
+    reportal.scheduleTimeZone =
+        stringGetter.get(project.getMetadata().get("scheduleTimeZone"));
+    reportal.scheduleDate =
+        stringGetter.get(project.getMetadata().get("scheduleDate"));
+    reportal.scheduleRepeat =
+        boolGetter.get(project.getMetadata().get("scheduleRepeat"));
+    reportal.scheduleIntervalQuantity =
+        stringGetter.get(project.getMetadata().get("scheduleIntervalQuantity"));
+    reportal.scheduleInterval =
+        stringGetter.get(project.getMetadata().get("scheduleInterval"));
+    reportal.endSchedule =
+        stringGetter.get(project.getMetadata().get("endSchedule"));
+
+    reportal.renderResultsAsHtml =
+        boolGetter.get(project.getMetadata().get("renderResultsAsHtml"));
+
+    reportal.accessViewer =
+        stringGetter.get(project.getMetadata().get("accessViewer"));
+    reportal.accessExecutor =
+        stringGetter.get(project.getMetadata().get("accessExecutor"));
+    reportal.accessOwner =
+        stringGetter.get(project.getMetadata().get("accessOwner"));
+
+    reportal.notifications =
+        stringGetter.get(project.getMetadata().get("notifications"));
+    reportal.failureNotifications =
+        stringGetter.get(project.getMetadata().get("failureNotifications"));
+
+    reportal.queries = new ArrayList<>();
+
+    for (int i = 0; i < queries; i++) {
+      final Query query = new Query();
+      reportal.queries.add(query);
+      query.title =
+          stringGetter.get(project.getMetadata().get("query" + i + "title"));
+      query.type =
+          stringGetter.get(project.getMetadata().get("query" + i + "type"));
+      query.script =
+          stringGetter.get(project.getMetadata().get("query" + i + "script"));
+    }
+
+    reportal.variables = new ArrayList<>();
+
+    for (int i = 0; i < variables; i++) {
+      final String title =
+          stringGetter.get(project.getMetadata().get("variable" + i + "title"));
+      final String name =
+          stringGetter.get(project.getMetadata().get("variable" + i + "name"));
+      final Variable variable = new Variable(title, name);
+      reportal.variables.add(variable);
+    }
+
+    reportal.project = project;
+
+    return reportal;
+  }
+
+  public void saveToProject(final Project project) {
+    this.project = project;
+
+    project.getMetadata().put("reportal-user", this.reportalUser);
+    project.getMetadata().put("owner-email", this.ownerEmail);
+
+    project.getMetadata().put("title", this.title);
+    project.setDescription(this.description);
+
+    project.getMetadata().put("schedule", this.schedule);
+    project.getMetadata().put("scheduleHour", this.scheduleHour);
+    project.getMetadata().put("scheduleMinute", this.scheduleMinute);
+    project.getMetadata().put("scheduleAmPm", this.scheduleAmPm);
+    project.getMetadata().put("scheduleTimeZone", this.scheduleTimeZone);
+    project.getMetadata().put("scheduleDate", this.scheduleDate);
+    project.getMetadata().put("scheduleRepeat", this.scheduleRepeat);
+    project.getMetadata().put("scheduleIntervalQuantity",
+        this.scheduleIntervalQuantity);
+    project.getMetadata().put("scheduleInterval", this.scheduleInterval);
+    project.getMetadata().put("endSchedule", this.endSchedule);
+
+    project.getMetadata().put("renderResultsAsHtml", this.renderResultsAsHtml);
+
+    project.getMetadata().put("accessViewer", this.accessViewer);
+    project.getMetadata().put("accessExecutor", this.accessExecutor);
+    project.getMetadata().put("accessOwner", this.accessOwner);
+
+    project.getMetadata().put("queryNumber", this.queries.size());
+    for (int i = 0; i < this.queries.size(); i++) {
+      final Query query = this.queries.get(i);
+      project.getMetadata().put("query" + i + "title", query.title);
+      project.getMetadata().put("query" + i + "type", query.type);
+      project.getMetadata().put("query" + i + "script", query.script);
+    }
+
+    project.getMetadata().put("variableNumber", this.variables.size());
+    for (int i = 0; i < this.variables.size(); i++) {
+      final Variable variable = this.variables.get(i);
+      project.getMetadata().put("variable" + i + "title", variable.title);
+      project.getMetadata().put("variable" + i + "name", variable.name);
+    }
+
+    project.getMetadata().put("notifications", this.notifications);
+    project.getMetadata().put("failureNotifications", this.failureNotifications);
+  }
+
+  public void removeSchedules(final ScheduleManager scheduleManager)
+      throws ScheduleManagerException {
+    final List<Flow> flows = this.project.getFlows();
+    for (final Flow flow : flows) {
+      final Schedule sched =
+          scheduleManager.getSchedule(this.project.getId(), flow.getId());
+      if (sched != null) {
+        scheduleManager.removeSchedule(sched);
+      }
+    }
+  }
+
+  public void updateSchedules(final Reportal report, final ScheduleManager scheduleManager,
+      final User user, final Flow flow) throws ScheduleManagerException {
+    // Clear previous schedules
+    removeSchedules(scheduleManager);
+    // Add new schedule
+    if (this.schedule) {
+      final int hour =
+          (Integer.parseInt(this.scheduleHour) % 12)
+              + (this.scheduleAmPm.equalsIgnoreCase("pm") ? 12 : 0);
+      final int minute = Integer.parseInt(this.scheduleMinute) % 60;
+      final DateTimeZone timeZone =
+          this.scheduleTimeZone.equalsIgnoreCase("UTC") ? DateTimeZone.UTC
+              : DateTimeZone.getDefault();
+      DateTime firstSchedTime =
+          DateTimeFormat.forPattern("MM/dd/yyyy").withZone(timeZone)
+              .parseDateTime(this.scheduleDate);
+      firstSchedTime =
+          firstSchedTime.withHourOfDay(hour).withMinuteOfHour(minute)
+              .withSecondOfMinute(0).withMillisOfSecond(0);
+
+      ReadablePeriod period = null;
+      if (this.scheduleRepeat) {
+        final int intervalQuantity = Integer.parseInt(this.scheduleIntervalQuantity);
+
+        if (this.scheduleInterval.equals("y")) {
+          period = Years.years(intervalQuantity);
+        } else if (this.scheduleInterval.equals("m")) {
+          period = Months.months(intervalQuantity);
+        } else if (this.scheduleInterval.equals("w")) {
+          period = Weeks.weeks(intervalQuantity);
+        } else if (this.scheduleInterval.equals("d")) {
+          period = Days.days(intervalQuantity);
+        } else if (this.scheduleInterval.equals("h")) {
+          period = Hours.hours(intervalQuantity);
+        } else if (this.scheduleInterval.equals("M")) {
+          period = Minutes.minutes(intervalQuantity);
+        }
+      }
+
+      final ExecutionOptions options = new ExecutionOptions();
+      options.getFlowParameters().put("reportal.execution.user",
+          user.getUserId());
+      options.getFlowParameters().put("reportal.title", report.title);
+      options.getFlowParameters().put("reportal.render.results.as.html",
+          report.renderResultsAsHtml ? "true" : "false");
+      options.setMailCreator(ReportalMailCreator.REPORTAL_MAIL_CREATOR);
+
+      final long endScheduleTime = report.endSchedule == null ?
+          DEFAULT_SCHEDULE_END_EPOCH_TIME : parseDateToEpoch(report.endSchedule);
+
+      logger.info("This report scheudle end time is " + endScheduleTime);
+
+      scheduleManager.scheduleFlow(-1, this.project.getId(), this.project.getName(),
+          flow.getId(), "ready", firstSchedTime.getMillis(), endScheduleTime,
+          firstSchedTime.getZone(), period, DateTime.now().getMillis(),
+          firstSchedTime.getMillis(), firstSchedTime.getMillis(),
+          user.getUserId(), options, null);
+    }
+  }
+
+  private long parseDateToEpoch(final String date) throws ScheduleManagerException {
+    final DateFormat dffrom = new SimpleDateFormat("MM/dd/yyyy h:mm a");
+    try {
+      // this string will be parsed according to system's timezone setting.
+      return dffrom.parse(date).getTime();
+    } catch (final Exception ex) {
+      throw new ScheduleManagerException("can not parse this date " + date);
+    }
+  }
+
+  /**
+   * Updates the project permissions in MEMORY, but does NOT update the project
+   * in the database.
+   */
+  public void updatePermissions() {
+    final String[] accessViewerList =
+        this.accessViewer.trim().split(ACCESS_LIST_SPLIT_REGEX);
+    final String[] accessExecutorList =
+        this.accessExecutor.trim().split(ACCESS_LIST_SPLIT_REGEX);
+    final String[] accessOwnerList =
+        this.accessOwner.trim().split(ACCESS_LIST_SPLIT_REGEX);
+    // Prepare permission types
+    final Permission admin = new Permission();
+    admin.addPermission(Type.READ);
+    admin.addPermission(Type.EXECUTE);
+    admin.addPermission(Type.ADMIN);
+    final Permission executor = new Permission();
+    executor.addPermission(Type.READ);
+    executor.addPermission(Type.EXECUTE);
+    final Permission viewer = new Permission();
+    viewer.addPermission(Type.READ);
+    // Sets the permissions
+    this.project.clearUserPermission();
+    for (String user : accessViewerList) {
+      user = user.trim();
+      if (!user.isEmpty()) {
+        this.project.setUserPermission(user, viewer);
+      }
+    }
+    for (String user : accessExecutorList) {
+      user = user.trim();
+      if (!user.isEmpty()) {
+        this.project.setUserPermission(user, executor);
+      }
+    }
+    for (String user : accessOwnerList) {
+      user = user.trim();
+      if (!user.isEmpty()) {
+        this.project.setUserPermission(user, admin);
+      }
+    }
+    this.project.setUserPermission(this.reportalUser, admin);
+  }
+
+  public void createZipAndUpload(final ProjectManager projectManager, final User user,
+      final String reportalStorageUser) throws Exception {
+    // Create temp folder to make the zip file for upload
+    final File tempDir = Utils.createTempDir();
+    final File dataDir = new File(tempDir, "data");
+    dataDir.mkdirs();
+
+    // Create all job files
+    String dependentJob = null;
+    final List<String> jobs = new ArrayList<>();
+    final Map<String, String> extraProps =
+        ReportalUtil.getVariableMapByPrefix(this.variables, REPORTAL_CONFIG_PREFIX);
+    for (final Query query : this.queries) {
+      // Create .job file
+      final File jobFile =
+          ReportalHelper.findAvailableFileName(dataDir,
+              ReportalHelper.sanitizeText(query.title), ".job");
+
+      final String fileName = jobFile.getName();
+      final String jobName = fileName.substring(0, fileName.length() - 4);
+      jobs.add(jobName);
+
+      // Populate the job file
+      ReportalTypeManager.createJobAndFiles(this, jobFile, jobName,
+          query.title, query.type, query.script, dependentJob, this.reportalUser,
+          extraProps);
+
+      // For dependency of next query
+      dependentJob = jobName;
+    }
+
+    // Create the data collector job
+    if (dependentJob != null) {
+      final String jobName = "data-collector";
+
+      // Create .job file
+      final File jobFile =
+          ReportalHelper.findAvailableFileName(dataDir,
+              ReportalHelper.sanitizeText(jobName), ".job");
+      final Map<String, String> extras = new HashMap<>();
+      extras.put("reportal.job.number", Integer.toString(jobs.size()));
+      for (int i = 0; i < jobs.size(); i++) {
+        extras.put("reportal.job." + i, jobs.get(i));
+      }
+      ReportalTypeManager.createJobAndFiles(this, jobFile, jobName, "",
+          ReportalTypeManager.DATA_COLLECTOR_JOB, "", dependentJob,
+          reportalStorageUser, extras);
+    }
+
+    // Zip jobs together
+    final File archiveFile = new File(tempDir, this.project.getName() + ".zip");
+    Utils.zipFolderContent(dataDir, archiveFile);
+
+    // Upload zip
+    projectManager.uploadProject(this.project, archiveFile, "zip", user, null);
+
+    // Empty temp
+    if (tempDir.exists()) {
+      FileUtils.deleteDirectory(tempDir);
+    }
+  }
+
+  public void loadImmutableFromProject(final Project project) {
+    this.reportalUser = stringGetter.get(project.getMetadata().get("reportal-user"));
+    this.ownerEmail = stringGetter.get(project.getMetadata().get("owner-email"));
+  }
+
+  /**
+   * @return A set of users explicitly granted viewer access to the report.
+   */
+  public Set<String> getAccessViewers() {
+    final Set<String> viewers = new HashSet<>();
+    for (final String user : this.accessViewer.trim().split(ACCESS_LIST_SPLIT_REGEX)) {
+      if (!user.isEmpty()) {
+        viewers.add(user);
+      }
+    }
+    return viewers;
+  }
+
+  /**
+   * @return A set of users explicitly granted executor access to the report.
+   */
+  public Set<String> getAccessExecutors() {
+    final Set<String> executors = new HashSet<>();
+    for (final String user : this.accessExecutor.trim().split(ACCESS_LIST_SPLIT_REGEX)) {
+      if (!user.isEmpty()) {
+        executors.add(user);
+
+      }
+    }
+    return executors;
+  }
+
+  public static class Getter<T> {
+
+    Class<?> cls;
+    T defaultValue;
+
+    public Getter(final T defaultValue, final Class<?> cls) {
+      this.cls = cls;
+      this.defaultValue = defaultValue;
+    }
+
+    @SuppressWarnings("unchecked")
+    public T get(final Object object) {
+      if (object == null || !(this.cls.isAssignableFrom(object.getClass()))) {
+        return this.defaultValue;
+      }
+      return (T) object;
+    }
+  }
+
+  public static class Query {
+
+    public String title;
+    public String type;
+    public String script;
+
+    public String getTitle() {
+      return this.title;
+    }
+
+    public String getType() {
+      return this.type;
+    }
+
+    public String getScript() {
+      return this.script;
+    }
+  }
+
+  public static class Variable {
+
+    public String title;
+    public String name;
+
+    public Variable() {
+    }
+
+    public Variable(final String title, final String name) {
+      this.title = title;
+      this.name = name;
+    }
+
+    public String getTitle() {
+      return this.title;
+    }
+
+    public String getName() {
+      return this.name;
+    }
+  }
+}
diff --git a/az-reportal/src/main/java/azkaban/reportal/util/ReportalHelper.java b/az-reportal/src/main/java/azkaban/reportal/util/ReportalHelper.java
new file mode 100644
index 0000000..18ceb73
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/reportal/util/ReportalHelper.java
@@ -0,0 +1,296 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.reportal.util;
+
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.project.ProjectManager;
+import azkaban.project.ProjectManagerException;
+import azkaban.user.User;
+import azkaban.webapp.AzkabanWebServer;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.mail.internet.AddressException;
+import javax.mail.internet.InternetAddress;
+import org.apache.commons.lang.StringUtils;
+
+public class ReportalHelper {
+
+  public static List<Project> getReportalProjects(final AzkabanWebServer server) {
+    final List<Project> projects = server.getProjectManager().getProjects();
+
+    final List<Project> reportalProjects = new ArrayList<>();
+
+    for (final Project project : projects) {
+      if (project.getMetadata().containsKey("reportal-user")) {
+        reportalProjects.add(project);
+      }
+    }
+
+    return reportalProjects;
+  }
+
+  public static void bookmarkProject(final AzkabanWebServer server, final Project project,
+      final User user) throws ProjectManagerException {
+    project.getMetadata().put("bookmark-" + user.getUserId(), true);
+    server.getProjectManager().updateProjectSetting(project);
+  }
+
+  public static void unBookmarkProject(final AzkabanWebServer server,
+      final Project project, final User user) throws ProjectManagerException {
+    project.getMetadata().remove("bookmark-" + user.getUserId());
+    server.getProjectManager().updateProjectSetting(project);
+  }
+
+  public static boolean isBookmarkProject(final Project project, final User user) {
+    return project.getMetadata().containsKey("bookmark-" + user.getUserId());
+  }
+
+  public static void subscribeProject(final AzkabanWebServer server, final Project project,
+      final User user, final String email) throws ProjectManagerException {
+    @SuppressWarnings("unchecked")
+    Map<String, String> subscription =
+        (Map<String, String>) project.getMetadata().get("subscription");
+    if (subscription == null) {
+      subscription = new HashMap<>();
+    }
+
+    if (email != null && !email.isEmpty()) {
+      subscription.put(user.getUserId(), email);
+    }
+
+    project.getMetadata().put("subscription", subscription);
+    updateProjectNotifications(project, server.getProjectManager());
+    server.getProjectManager().updateProjectSetting(project);
+  }
+
+  public static void unSubscribeProject(final AzkabanWebServer server,
+      final Project project, final User user) throws ProjectManagerException {
+    @SuppressWarnings("unchecked") final Map<String, String> subscription =
+        (Map<String, String>) project.getMetadata().get("subscription");
+    if (subscription == null) {
+      return;
+    }
+    subscription.remove(user.getUserId());
+    project.getMetadata().put("subscription", subscription);
+    updateProjectNotifications(project, server.getProjectManager());
+    server.getProjectManager().updateProjectSetting(project);
+  }
+
+  public static boolean isSubscribeProject(final Project project, final User user) {
+    @SuppressWarnings("unchecked") final Map<String, String> subscription =
+        (Map<String, String>) project.getMetadata().get("subscription");
+    if (subscription == null) {
+      return false;
+    }
+    return subscription.containsKey(user.getUserId());
+  }
+
+  /**
+   * Updates the email notifications saved in the project's flow.
+   */
+  public static void updateProjectNotifications(final Project project,
+      final ProjectManager pm) throws ProjectManagerException {
+    final Flow flow = project.getFlows().get(0);
+
+    // Get all success emails.
+    final ArrayList<String> successEmails = new ArrayList<>();
+    final String successNotifications =
+        (String) project.getMetadata().get("notifications");
+    final String[] successEmailSplit =
+        successNotifications.split("\\s*,\\s*|\\s*;\\s*|\\s+");
+    successEmails.addAll(Arrays.asList(successEmailSplit));
+
+    // Get all failure emails.
+    final ArrayList<String> failureEmails = new ArrayList<>();
+    final String failureNotifications =
+        (String) project.getMetadata().get("failureNotifications");
+    final String[] failureEmailSplit =
+        failureNotifications.split("\\s*,\\s*|\\s*;\\s*|\\s+");
+    failureEmails.addAll(Arrays.asList(failureEmailSplit));
+
+    // Add subscription emails to success emails list.
+    @SuppressWarnings("unchecked") final Map<String, String> subscription =
+        (Map<String, String>) project.getMetadata().get("subscription");
+    if (subscription != null) {
+      successEmails.addAll(subscription.values());
+    }
+
+    final ArrayList<String> successEmailList = new ArrayList<>();
+    for (final String email : successEmails) {
+      if (!email.trim().isEmpty()) {
+        successEmailList.add(email);
+      }
+    }
+
+    final ArrayList<String> failureEmailList = new ArrayList<>();
+    for (final String email : failureEmails) {
+      if (!email.trim().isEmpty()) {
+        failureEmailList.add(email);
+      }
+    }
+
+    // Save notifications in the flow.
+    flow.getSuccessEmails().clear();
+    flow.getFailureEmails().clear();
+    flow.addSuccessEmails(successEmailList);
+    flow.addFailureEmails(failureEmailList);
+    pm.updateFlow(project, flow);
+  }
+
+  public static boolean isScheduledProject(final Project project) {
+    final Object schedule = project.getMetadata().get("schedule");
+    if (schedule == null || !(schedule instanceof Boolean)) {
+      return false;
+    }
+    return (boolean) (Boolean) schedule;
+  }
+
+  public static boolean isScheduledRepeatingProject(final Project project) {
+    final Object schedule = project.getMetadata().get("scheduleRepeat");
+    if (schedule == null || !(schedule instanceof Boolean)) {
+      return false;
+    }
+    return (boolean) (Boolean) schedule;
+  }
+
+  public static List<Project> getUserReportalProjects(final AzkabanWebServer server,
+      final String userName) throws ProjectManagerException {
+    final ProjectManager projectManager = server.getProjectManager();
+    final List<Project> projects = projectManager.getProjects();
+    final List<Project> result = new ArrayList<>();
+
+    for (final Project project : projects) {
+      if (userName.equals(project.getMetadata().get("reportal-user"))) {
+        result.add(project);
+      }
+    }
+
+    return result;
+  }
+
+  public static Project createReportalProject(final AzkabanWebServer server,
+      final String title, final String description, final User user)
+      throws ProjectManagerException {
+    final ProjectManager projectManager = server.getProjectManager();
+    final String projectName =
+        "reportal-" + user.getUserId() + "-" + sanitizeText(title);
+    Project project = projectManager.getProject(projectName);
+    if (project != null) {
+      return null;
+    }
+    project = projectManager.createProject(projectName, description, user);
+
+    return project;
+  }
+
+  public static String sanitizeText(final String text) {
+    return text.replaceAll("[^A-Za-z0-9]", "-");
+  }
+
+  public static File findAvailableFileName(final File parent, String name,
+      final String extension) {
+    if (name.isEmpty()) {
+      name = "untitled";
+    }
+    File file = new File(parent, name + extension);
+    int i = 1;
+    while (file.exists()) {
+      file = new File(parent, name + "-" + i + extension);
+      i++;
+    }
+    return file;
+  }
+
+  public static String prepareStringForJS(final Object object) {
+    return object.toString().replace("\r", "").replace("\n", "\\n");
+  }
+
+  public static String[] filterCSVFile(final String[] files) {
+    final List<String> result = new ArrayList<>();
+    for (int i = 0; i < files.length; i++) {
+      if (StringUtils.endsWithIgnoreCase(files[i], ".csv")) {
+        result.add(files[i]);
+      }
+    }
+    return result.toArray(new String[result.size()]);
+  }
+
+  /**
+   * Given a string containing multiple emails, splits it based on the given
+   * regular expression, and returns a set containing the unique, non-empty
+   * emails.
+   */
+  public static Set<String> parseUniqueEmails(final String emailList,
+      final String splitRegex) {
+    final Set<String> uniqueEmails = new HashSet<>();
+
+    if (emailList == null) {
+      return uniqueEmails;
+    }
+
+    final String[] emails = emailList.trim().split(splitRegex);
+    for (final String email : emails) {
+      if (!email.isEmpty()) {
+        uniqueEmails.add(email);
+      }
+    }
+
+    return uniqueEmails;
+  }
+
+  /**
+   * Returns true if the given email is valid and false otherwise.
+   */
+  public static boolean isValidEmailAddress(final String email) {
+    if (email == null) {
+      return false;
+    }
+
+    boolean result = true;
+    try {
+      final InternetAddress emailAddr = new InternetAddress(email);
+      emailAddr.validate();
+    } catch (final AddressException ex) {
+      result = false;
+    }
+    return result;
+  }
+
+  /**
+   * Given an email string, returns the domain part if it exists, and null
+   * otherwise.
+   */
+  public static String getEmailDomain(final String email) {
+    if (email == null || email.isEmpty()) {
+      return null;
+    }
+
+    final int atSignIndex = email.indexOf('@');
+    if (atSignIndex != -1) {
+      return email.substring(atSignIndex + 1);
+    }
+
+    return null;
+  }
+}
diff --git a/az-reportal/src/main/java/azkaban/reportal/util/ReportalRunnerException.java b/az-reportal/src/main/java/azkaban/reportal/util/ReportalRunnerException.java
new file mode 100644
index 0000000..0fa4cb8
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/reportal/util/ReportalRunnerException.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.reportal.util;
+
+public class ReportalRunnerException extends Exception {
+
+  private static final long serialVersionUID = 1L;
+
+  public ReportalRunnerException(final String s) {
+    super(s);
+  }
+
+  public ReportalRunnerException(final Exception e) {
+    super(e);
+  }
+
+  public ReportalRunnerException(final String s, final Exception e) {
+    super(s, e);
+  }
+}
diff --git a/az-reportal/src/main/java/azkaban/reportal/util/ReportalUtil.java b/az-reportal/src/main/java/azkaban/reportal/util/ReportalUtil.java
new file mode 100644
index 0000000..104616a
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/reportal/util/ReportalUtil.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.reportal.util;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
+import azkaban.reportal.util.Reportal.Variable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class ReportalUtil {
+
+  public static IStreamProvider getStreamProvider(final String fileSystem) {
+    if (fileSystem.equalsIgnoreCase("hdfs")) {
+      return new StreamProviderHDFS();
+    }
+    return new StreamProviderLocal();
+  }
+
+  /**
+   * Returns a list of the executable nodes in the specified flow in execution
+   * order. Assumes that the flow is linear.
+   *
+   * @param nodes
+   * @return
+   */
+  public static List<ExecutableNode> sortExecutableNodes(final ExecutableFlow flow) {
+    final List<ExecutableNode> sortedNodes = new ArrayList<>();
+
+    if (flow != null) {
+      final List<String> startNodeIds = flow.getStartNodes();
+
+      String nextNodeId = startNodeIds.isEmpty() ? null : startNodeIds.get(0);
+
+      while (nextNodeId != null) {
+        final ExecutableNode node = flow.getExecutableNode(nextNodeId);
+        sortedNodes.add(node);
+
+        final Set<String> outNodes = node.getOutNodes();
+        nextNodeId = outNodes.isEmpty() ? null : outNodes.iterator().next();
+      }
+    }
+
+    return sortedNodes;
+  }
+
+  /**
+   * Get runtime variables to be set in unscheduled mode of execution.
+   * Returns empty list, if no runtime variable is found
+   *
+   * @param variables
+   * @return
+   */
+  public static List<Variable> getRunTimeVariables(
+      final Collection<Variable> variables) {
+    final List<Variable> runtimeVariables =
+      ReportalUtil.getVariablesByRegex(variables,
+        Reportal.REPORTAL_CONFIG_PREFIX_NEGATION_REGEX);
+
+    return runtimeVariables;
+  }
+
+  /**
+   * Shortlist variables which match a given regex. Returns empty empty list, if no
+   * eligible variable is found
+   *
+   * @param variables
+   * @param regex
+   * @return
+   */
+  public static List<Variable> getVariablesByRegex(
+      final Collection<Variable> variables, final String regex) {
+    final List<Variable> shortlistedVariables = new ArrayList<>();
+    if (variables != null && regex != null) {
+      for (final Variable var : variables) {
+        if (var.getTitle().matches(regex)) {
+          shortlistedVariables.add(var);
+        }
+      }
+    }
+    return shortlistedVariables;
+  }
+
+  /**
+   * Shortlist variables which match a given prefix. Returns empty map, if no
+   * eligible variable is found.
+   *
+   * @param variables
+   *          variables to be processed
+   * @param prefix
+   *          prefix to be matched
+   * @return a map with shortlisted variables and prefix removed
+   */
+  public static Map<String, String> getVariableMapByPrefix(
+      final Collection<Variable> variables, final String prefix) {
+    final Map<String, String> shortlistMap = new HashMap<>();
+    if (variables!=null && prefix != null) {
+      for (final Variable var : getVariablesByRegex(variables,
+        Reportal.REPORTAL_CONFIG_PREFIX_REGEX)) {
+        shortlistMap
+          .put(var.getTitle().replaceFirst(prefix, ""), var.getName());
+      }
+    }
+    return shortlistMap;
+  }
+}
diff --git a/az-reportal/src/main/java/azkaban/reportal/util/StreamProviderHDFS.java b/az-reportal/src/main/java/azkaban/reportal/util/StreamProviderHDFS.java
new file mode 100644
index 0000000..776e4ce
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/reportal/util/StreamProviderHDFS.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.reportal.util;
+
+import azkaban.security.commons.HadoopSecurityManager;
+import azkaban.security.commons.HadoopSecurityManagerException;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class StreamProviderHDFS implements IStreamProvider {
+
+  FileSystem hdfs;
+  HadoopSecurityManager securityManager;
+  String username;
+
+  public void setHadoopSecurityManager(final HadoopSecurityManager securityManager) {
+    this.securityManager = securityManager;
+  }
+
+  @Override
+  public void setUser(final String user) {
+    this.username = user;
+  }
+
+  @Override
+  public String[] getFileList(final String pathString)
+      throws HadoopSecurityManagerException, IOException {
+    final FileStatus[] statusList = getFileStatusList(pathString);
+    final String[] fileList = new String[statusList.length];
+
+    for (int i = 0; i < statusList.length; i++) {
+      fileList[i] = statusList[i].getPath().getName();
+    }
+
+    return fileList;
+  }
+
+  @Override
+  public String[] getOldFiles(final String pathString, final long thresholdTime)
+      throws Exception {
+    final FileStatus[] statusList = getFileStatusList(pathString);
+
+    final List<String> oldFiles = new ArrayList<>();
+
+    for (final FileStatus fs : statusList) {
+      if (fs.getModificationTime() < thresholdTime) {
+        oldFiles.add(fs.getPath().getName());
+      }
+    }
+
+    return oldFiles.toArray(new String[0]);
+  }
+
+  @Override
+  public void deleteFile(final String pathString) throws Exception {
+    ensureHdfs();
+
+    try {
+      this.hdfs.delete(new Path(pathString), true);
+    } catch (final IOException e) {
+      cleanUp();
+    }
+  }
+
+  @Override
+  public InputStream getFileInputStream(final String pathString) throws Exception {
+    ensureHdfs();
+
+    final Path path = new Path(pathString);
+
+    return new BufferedInputStream(this.hdfs.open(path));
+  }
+
+  @Override
+  public OutputStream getFileOutputStream(final String pathString) throws Exception {
+    ensureHdfs();
+
+    final Path path = new Path(pathString);
+
+    return new BufferedOutputStream(this.hdfs.create(path, true));
+  }
+
+  @Override
+  public void cleanUp() throws IOException {
+    if (this.hdfs != null) {
+      this.hdfs.close();
+      this.hdfs = null;
+    }
+  }
+
+  private void ensureHdfs() throws HadoopSecurityManagerException, IOException {
+    if (this.hdfs == null) {
+      if (this.securityManager == null) {
+        this.hdfs = FileSystem.get(new Configuration());
+      } else {
+        this.hdfs = this.securityManager.getFSAsUser(this.username);
+      }
+    }
+  }
+
+  @Override
+  protected void finalize() throws Throwable {
+    cleanUp();
+    super.finalize();
+  }
+
+  /**
+   * Returns an array of the file statuses of the files/directories in the given
+   * path if it is a directory and an empty array otherwise.
+   */
+  private FileStatus[] getFileStatusList(final String pathString)
+      throws HadoopSecurityManagerException, IOException {
+    ensureHdfs();
+
+    final Path path = new Path(pathString);
+    FileStatus pathStatus = null;
+    try {
+      pathStatus = this.hdfs.getFileStatus(path);
+    } catch (final IOException e) {
+      cleanUp();
+    }
+
+    if (pathStatus != null && pathStatus.isDir()) {
+      return this.hdfs.listStatus(path);
+    }
+
+    return new FileStatus[0];
+  }
+}
diff --git a/az-reportal/src/main/java/azkaban/reportal/util/StreamProviderLocal.java b/az-reportal/src/main/java/azkaban/reportal/util/StreamProviderLocal.java
new file mode 100644
index 0000000..5836b3d
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/reportal/util/StreamProviderLocal.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.reportal.util;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.commons.io.FileUtils;
+
+public class StreamProviderLocal implements IStreamProvider {
+
+  @Override
+  public void setUser(final String user) {
+  }
+
+  @Override
+  public String[] getFileList(final String pathString) throws IOException {
+    final File file = new File(pathString);
+
+    if (file.exists() && file.isDirectory()) {
+      return file.list();
+    }
+
+    return new String[0];
+  }
+
+  @Override
+  public String[] getOldFiles(final String pathString, final long thresholdTime)
+      throws Exception {
+    final File file = new File(pathString);
+
+    if (!file.exists() || !file.isDirectory()) {
+      return new String[0];
+    }
+
+    final File[] fileList = file.listFiles(new FileFilter() {
+      @Override
+      public boolean accept(final File file) {
+        return file.lastModified() < thresholdTime;
+      }
+    });
+
+    final String[] files = new String[fileList.length];
+    for (int i = 0; i < fileList.length; i++) {
+      files[i] = fileList[i].getName();
+    }
+
+    return files;
+  }
+
+  @Override
+  public void deleteFile(final String pathString) throws Exception {
+    FileUtils.deleteDirectory(new File(pathString));
+  }
+
+  @Override
+  public InputStream getFileInputStream(final String pathString) throws IOException {
+
+    final File inputFile = new File(pathString);
+
+    inputFile.getParentFile().mkdirs();
+    inputFile.createNewFile();
+
+    return new BufferedInputStream(new FileInputStream(inputFile));
+  }
+
+  @Override
+  public OutputStream getFileOutputStream(final String pathString) throws IOException {
+
+    final File outputFile = new File(pathString);
+
+    outputFile.getParentFile().mkdirs();
+    outputFile.createNewFile();
+
+    return new BufferedOutputStream(new FileOutputStream(outputFile));
+  }
+
+  @Override
+  public void cleanUp() throws IOException {
+  }
+}
diff --git a/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalMailCreator.java b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalMailCreator.java
new file mode 100644
index 0000000..fb09405
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalMailCreator.java
@@ -0,0 +1,362 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.viewer.reportal;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Scanner;
+import java.util.Set;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringEscapeUtils;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.mail.DefaultMailCreator;
+import azkaban.executor.mail.MailCreator;
+import azkaban.project.Project;
+import azkaban.reportal.util.IStreamProvider;
+import azkaban.reportal.util.ReportalHelper;
+import azkaban.reportal.util.ReportalUtil;
+import azkaban.reportal.util.StreamProviderHDFS;
+import azkaban.security.commons.HadoopSecurityManager;
+import azkaban.utils.EmailMessage;
+import azkaban.webapp.AzkabanWebServer;
+
+public class ReportalMailCreator implements MailCreator {
+  public static AzkabanWebServer azkaban = null;
+  public static HadoopSecurityManager hadoopSecurityManager = null;
+  public static String outputLocation = "";
+  public static String outputFileSystem = "";
+  public static String reportalStorageUser = "";
+  public static File reportalMailTempDirectory;
+  public static final String REPORTAL_MAIL_CREATOR = "ReportalMailCreator";
+  public static final int NUM_PREVIEW_ROWS = 50;
+  //Attachment that equal or larger than 10MB will be skipped in the email
+  public static final long MAX_ATTACHMENT_SIZE = 10 * 1024 * 1024L;
+
+  static {
+    DefaultMailCreator.registerCreator(REPORTAL_MAIL_CREATOR,
+        new ReportalMailCreator());
+  }
+
+  @Override
+  public boolean createFirstErrorMessage(ExecutableFlow flow,
+      EmailMessage message, String azkabanName, String scheme,
+      String clientHostname, String clientPortNumber, String... vars) {
+
+    ExecutionOptions option = flow.getExecutionOptions();
+    Set<String> emailList = new HashSet<String>(option.getFailureEmails());
+
+    return createEmail(flow, emailList, message, "Failure", azkabanName,
+        scheme, clientHostname, clientPortNumber, false);
+  }
+
+  @Override
+  public boolean createErrorEmail(ExecutableFlow flow, EmailMessage message,
+      String azkabanName, String scheme, String clientHostname,
+      String clientPortNumber, String... vars) {
+
+    ExecutionOptions option = flow.getExecutionOptions();
+    Set<String> emailList = new HashSet<String>(option.getFailureEmails());
+
+    return createEmail(flow, emailList, message, "Failure", azkabanName,
+        scheme, clientHostname, clientPortNumber, false);
+  }
+
+  @Override
+  public boolean createSuccessEmail(ExecutableFlow flow, EmailMessage message,
+      String azkabanName, String scheme, String clientHostname,
+      String clientPortNumber, String... vars) {
+
+    ExecutionOptions option = flow.getExecutionOptions();
+    Set<String> emailList = new HashSet<String>(option.getSuccessEmails());
+
+    return createEmail(flow, emailList, message, "Success", azkabanName,
+        scheme, clientHostname, clientPortNumber, false);
+  }
+
+  private boolean createEmail(ExecutableFlow flow, Set<String> emailList,
+      EmailMessage message, String status, String azkabanName, String scheme,
+      String clientHostname, String clientPortNumber, boolean printData) {
+
+    Project project =
+        azkaban.getProjectManager().getProject(flow.getProjectId());
+
+    if (emailList != null && !emailList.isEmpty()) {
+      message.addAllToAddress(emailList);
+      message.setMimeType("text/html");
+      message.setSubject("Report " + status + ": "
+          + project.getMetadata().get("title"));
+      String urlPrefix =
+          scheme + "://" + clientHostname + ":" + clientPortNumber
+              + "/reportal";
+      try {
+        return createMessage(project, flow, message, urlPrefix, printData);
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+
+    return false;
+  }
+
+  private boolean createMessage(Project project, ExecutableFlow flow,
+      EmailMessage message, String urlPrefix, boolean printData)
+      throws Exception {
+
+    // set mail content type to be "multipart/mixed" as we are customizing the main content.
+    // failed to to this may result in trouble accessing attachment when mail is viewed from IOS mail app.
+    message.enableAttachementEmbedment(false);
+
+    message.println("<html>");
+    message.println("<head></head>");
+    message
+        .println("<body style='font-family: verdana; color: #000000; background-color: #cccccc; padding: 20px;'>");
+    message
+        .println("<div style='background-color: #ffffff; border: 1px solid #aaaaaa; padding: 20px;-webkit-border-radius: 15px; -moz-border-radius: 15px; border-radius: 15px;'>");
+    // Title
+    message.println("<b>" + project.getMetadata().get("title") + "</b>");
+    message
+        .println("<div style='font-size: .8em; margin-top: .5em; margin-bottom: .5em;'>");
+    // Status
+    message.println(flow.getStatus().name());
+    // Link to View
+    message.println("(<a href='" + urlPrefix + "?view&id="
+        + flow.getProjectId() + "'>View</a>)");
+    // Link to logs
+    message.println("(<a href='" + urlPrefix + "?view&logs&id="
+        + flow.getProjectId() + "&execid=" + flow.getExecutionId()
+        + "'>Logs</a>)");
+    // Link to Data
+    message.println("(<a href='" + urlPrefix + "?view&id="
+        + flow.getProjectId() + "&execid=" + flow.getExecutionId()
+        + "'>Result data</a>)");
+    // Link to Edit
+    message.println("(<a href='" + urlPrefix + "?edit&id="
+        + flow.getProjectId() + "'>Edit</a>)");
+    message.println("</div>");
+    message.println("<div style='margin-top: .5em; margin-bottom: .5em;'>");
+    // Description
+    message.println(project.getDescription());
+    message.println("</div>");
+
+    // Print variable values, if any
+    Map<String, String> flowParameters =
+        flow.getExecutionOptions().getFlowParameters();
+    int i = 0;
+    while (flowParameters.containsKey("reportal.variable." + i + ".from")) {
+      if (i == 0) {
+        message
+            .println("<div style='margin-top: 10px; margin-bottom: 10px; border-bottom: 1px solid #ccc; padding-bottom: 5px; font-weight: bold;'>");
+        message.println("Variables");
+        message.println("</div>");
+        message
+            .println("<table border='1' cellspacing='0' cellpadding='2' style='font-size: 14px;'>");
+        message
+            .println("<thead><tr><th><b>Name</b></th><th><b>Value</b></th></tr></thead>");
+        message.println("<tbody>");
+      }
+
+      message.println("<tr>");
+      message.println("<td>"
+          + flowParameters.get("reportal.variable." + i + ".from") + "</td>");
+      message.println("<td>"
+          + flowParameters.get("reportal.variable." + i + ".to") + "</td>");
+      message.println("</tr>");
+
+      i++;
+    }
+
+    if (i > 0) { // at least one variable
+      message.println("</tbody>");
+      message.println("</table>");
+    }
+
+    long totalFileSize = 0;
+    if (printData) {
+      String locationFull =
+          (outputLocation + "/" + flow.getExecutionId()).replace("//", "/");
+
+      IStreamProvider streamProvider =
+          ReportalUtil.getStreamProvider(outputFileSystem);
+
+      if (streamProvider instanceof StreamProviderHDFS) {
+        StreamProviderHDFS hdfsStreamProvider =
+            (StreamProviderHDFS) streamProvider;
+        hdfsStreamProvider.setHadoopSecurityManager(hadoopSecurityManager);
+        hdfsStreamProvider.setUser(reportalStorageUser);
+      }
+
+      // Get file list
+      String[] fileList =
+          ReportalHelper
+              .filterCSVFile(streamProvider.getFileList(locationFull));
+
+      // Sort files in execution order.
+      // File names are in the format {EXECUTION_ORDER}-{QUERY_TITLE}.csv
+      // E.g.: 1-queryTitle.csv
+      Arrays.sort(fileList, new Comparator<String>() {
+
+        @Override
+        public int compare(String a, String b) {
+          Integer aExecutionOrder =
+              Integer.parseInt(a.substring(0, a.indexOf('-')));
+          Integer bExecutionOrder =
+              Integer.parseInt(b.substring(0, b.indexOf('-')));
+          return aExecutionOrder.compareTo(bExecutionOrder);
+        }
+      });
+
+      // Get jobs in execution order
+      List<ExecutableNode> jobs = ReportalUtil.sortExecutableNodes(flow);
+
+      File tempFolder =
+          new File(reportalMailTempDirectory + "/" + flow.getExecutionId());
+      tempFolder.mkdirs();
+
+      // Copy output files from HDFS to local disk, so you can send them as
+      // email attachments
+      for (String file : fileList) {
+        String filePath = locationFull + "/" + file;
+        InputStream csvInputStream = null;
+        OutputStream tempOutputStream = null;
+        File tempOutputFile = new File(tempFolder, file);
+        tempOutputFile.createNewFile();
+        try {
+          csvInputStream = streamProvider.getFileInputStream(filePath);
+          tempOutputStream =
+              new BufferedOutputStream(new FileOutputStream(tempOutputFile));
+
+          IOUtils.copy(csvInputStream, tempOutputStream);
+        } finally {
+          IOUtils.closeQuietly(tempOutputStream);
+          IOUtils.closeQuietly(csvInputStream);
+        }
+      }
+
+      try {
+        streamProvider.cleanUp();
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+
+      boolean emptyResults = true;
+
+      String htmlResults =
+          flowParameters.get("reportal.render.results.as.html");
+      boolean renderResultsAsHtml =
+          htmlResults != null && htmlResults.trim().equalsIgnoreCase("true");
+
+      for (i = 0; i < fileList.length; i++) {
+        String file = fileList[i];
+        ExecutableNode job = jobs.get(i);
+        job.getAttempt();
+
+        message
+            .println("<div style='margin-top: 10px; margin-bottom: 10px; border-bottom: 1px solid #ccc; padding-bottom: 5px; font-weight: bold;'>");
+        message.println(file);
+        message.println("</div>");
+        message.println("<div>");
+        message
+            .println("<table border='1' cellspacing='0' cellpadding='2' style='font-size: 14px;'>");
+        File tempOutputFile = new File(tempFolder, file);
+        InputStream csvInputStream = null;
+        try {
+          csvInputStream =
+              new BufferedInputStream(new FileInputStream(tempOutputFile));
+          Scanner rowScanner = new Scanner(csvInputStream, StandardCharsets.UTF_8.toString());
+          int lineNumber = 0;
+          while (rowScanner.hasNextLine() && lineNumber <= NUM_PREVIEW_ROWS) {
+            // For Hive jobs, the first line is the column names, so we ignore
+            // it
+            // when deciding whether the output is empty or not
+            if (!job.getType().equals(ReportalType.HiveJob.getJobTypeName())
+                || lineNumber > 0) {
+              emptyResults = false;
+            }
+
+            String csvLine = rowScanner.nextLine();
+            String[] data = csvLine.split("\",\"");
+            message.println("<tr>");
+            for (String item : data) {
+              String column = item.replace("\"", "");
+              if (!renderResultsAsHtml) {
+                column = StringEscapeUtils.escapeHtml(column);
+              }
+              message.println("<td>" + column + "</td>");
+            }
+            message.println("</tr>");
+            if (lineNumber == NUM_PREVIEW_ROWS && rowScanner.hasNextLine()) {
+              message.println("<tr>");
+              message.println("<td colspan=\"" + data.length + "\">...</td>");
+              message.println("</tr>");
+            }
+            lineNumber++;
+          }
+          rowScanner.close();
+          message.println("</table>");
+          message.println("</div>");
+        } finally {
+          IOUtils.closeQuietly(csvInputStream);
+        }
+        totalFileSize += tempOutputFile.length();
+      }
+
+      if (totalFileSize < MAX_ATTACHMENT_SIZE) {
+        for (i = 0; i < fileList.length; i++) {
+            String file = fileList[i];
+            File tempOutputFile = new File(tempFolder, file);
+            message.addAttachment(file, tempOutputFile);
+        }
+      }
+
+      // Don't send an email if there are no results, unless this is an
+      // unscheduled run.
+      String unscheduledRun = flowParameters.get("reportal.unscheduled.run");
+      boolean isUnscheduledRun =
+          unscheduledRun != null
+              && unscheduledRun.trim().equalsIgnoreCase("true");
+      if (emptyResults && !isUnscheduledRun) {
+        return false;
+      }
+    }
+
+    message.println("</div>");
+    if (totalFileSize >= MAX_ATTACHMENT_SIZE){
+      message.println("<tr>The total size of the reports (" + totalFileSize/1024/1024 + "MB) is bigger than the allowed maximum size of " +
+              MAX_ATTACHMENT_SIZE/1024/1024 + "MB. " +
+                  "It is too big to be attached in this message. Please use the link above titled Result Data to download the reports</tr>");
+    }
+    message.println("</body>").println("</html>");
+
+    return true;
+  }
+}
diff --git a/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalServlet.java b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalServlet.java
new file mode 100644
index 0000000..bdef3b2
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalServlet.java
@@ -0,0 +1,1350 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.viewer.reportal;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutorManagerAdapter;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.project.ProjectManager;
+import azkaban.project.ProjectManagerException;
+import azkaban.reportal.util.IStreamProvider;
+import azkaban.reportal.util.Reportal;
+import azkaban.reportal.util.Reportal.Query;
+import azkaban.reportal.util.Reportal.Variable;
+import azkaban.reportal.util.ReportalHelper;
+import azkaban.reportal.util.ReportalUtil;
+import azkaban.reportal.util.StreamProviderHDFS;
+import azkaban.scheduler.ScheduleManager;
+import azkaban.scheduler.ScheduleManagerException;
+import azkaban.security.commons.HadoopSecurityManager;
+import azkaban.server.session.Session;
+import azkaban.user.Permission;
+import azkaban.user.Permission.Type;
+import azkaban.user.User;
+import azkaban.user.UserManager;
+import azkaban.utils.FileIOUtils.LogData;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+import azkaban.webapp.AzkabanWebServer;
+import azkaban.webapp.servlet.LoginAbstractAzkabanServlet;
+import azkaban.webapp.servlet.Page;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Scanner;
+import java.util.Set;
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.math.NumberUtils;
+import org.apache.log4j.Logger;
+import org.apache.velocity.tools.generic.EscapeTool;
+import org.joda.time.DateTime;
+
+public class ReportalServlet extends LoginAbstractAzkabanServlet {
+
+  private static final String REPORTAL_VARIABLE_PREFIX = "reportal.variable.";
+  private static final String HADOOP_SECURITY_MANAGER_CLASS_PARAM =
+      "hadoop.security.manager.class";
+  private static final long serialVersionUID = 1L;
+  private static final Logger logger = Logger.getLogger(ReportalServlet.class);
+  private final File reportalMailTempDirectory;
+  private final Props props;
+  private final String viewerName;
+  private final String reportalStorageUser;
+  private final File webResourcesFolder;
+  private final int max_allowed_schedule_dates;
+  private final int default_schedule_dates;
+  private final boolean showNav;
+  private CleanerThread cleanerThread;
+  /**
+   * A whitelist of allowed email domains (e.g.: example.com). If null, all
+   * email domains are allowed.
+   */
+  private Set<String> allowedEmailDomains = null;
+  private AzkabanWebServer server;
+  private boolean shouldProxy;
+  private int itemsPerPage = 20;
+  private HadoopSecurityManager hadoopSecurityManager;
+
+  public ReportalServlet(final Props props) {
+    this.props = props;
+
+    this.viewerName = props.getString("viewer.name");
+    this.reportalStorageUser = props.getString("reportal.storage.user", "reportal");
+    this.itemsPerPage = props.getInt("reportal.items_per_page", 20);
+    this.showNav = props.getBoolean("reportal.show.navigation", false);
+
+    this.max_allowed_schedule_dates = props.getInt("reportal.max.allowed.schedule.dates", 180);
+    this.default_schedule_dates = props.getInt("reportal.default.schedule.dates", 30);
+
+    this.reportalMailTempDirectory =
+        new File(props.getString("reportal.mail.temp.dir", "/tmp/reportal"));
+    this.reportalMailTempDirectory.mkdirs();
+    ReportalMailCreator.reportalMailTempDirectory = this.reportalMailTempDirectory;
+
+    final List<String> allowedDomains =
+        props.getStringList("reportal.allowed.email.domains",
+            (List<String>) null);
+    if (allowedDomains != null) {
+      this.allowedEmailDomains = new HashSet<>(allowedDomains);
+    }
+
+    ReportalMailCreator.outputLocation =
+        props.getString("reportal.output.dir", "/tmp/reportal");
+    ReportalMailCreator.outputFileSystem =
+        props.getString("reportal.output.filesystem", "local");
+    ReportalMailCreator.reportalStorageUser = this.reportalStorageUser;
+
+    this.webResourcesFolder =
+        new File(new File(props.getSource()).getParentFile().getParentFile(),
+            "web");
+    this.webResourcesFolder.mkdirs();
+    setResourceDirectory(this.webResourcesFolder);
+    System.out.println("Reportal web resources: "
+        + this.webResourcesFolder.getAbsolutePath());
+  }
+
+  @Override
+  public void init(final ServletConfig config) throws ServletException {
+    super.init(config);
+    this.server = (AzkabanWebServer) getApplication();
+    ReportalMailCreator.azkaban = this.server;
+
+    this.shouldProxy = this.props.getBoolean("azkaban.should.proxy", false);
+    logger.info("Hdfs browser should proxy: " + this.shouldProxy);
+    try {
+      this.hadoopSecurityManager = loadHadoopSecurityManager(this.props, logger);
+      ReportalMailCreator.hadoopSecurityManager = this.hadoopSecurityManager;
+    } catch (final RuntimeException e) {
+      e.printStackTrace();
+      throw new RuntimeException("Failed to get hadoop security manager!"
+          + e.getCause());
+    }
+
+    this.cleanerThread = new CleanerThread();
+    this.cleanerThread.start();
+  }
+
+  private HadoopSecurityManager loadHadoopSecurityManager(final Props props,
+      final Logger logger) throws RuntimeException {
+
+    final Class<?> hadoopSecurityManagerClass =
+        props.getClass(HADOOP_SECURITY_MANAGER_CLASS_PARAM, true,
+            ReportalServlet.class.getClassLoader());
+    logger.info("Initializing hadoop security manager "
+        + hadoopSecurityManagerClass.getName());
+    HadoopSecurityManager hadoopSecurityManager = null;
+
+    try {
+      final Method getInstanceMethod =
+          hadoopSecurityManagerClass.getMethod("getInstance", Props.class);
+      hadoopSecurityManager =
+          (HadoopSecurityManager) getInstanceMethod.invoke(
+              hadoopSecurityManagerClass, props);
+    } catch (final InvocationTargetException e) {
+      logger.error("Could not instantiate Hadoop Security Manager "
+          + hadoopSecurityManagerClass.getName() + e.getCause());
+      throw new RuntimeException(e.getCause());
+    } catch (final Exception e) {
+      e.printStackTrace();
+      throw new RuntimeException(e.getCause());
+    }
+
+    return hadoopSecurityManager;
+  }
+
+  @Override
+  protected void handleGet(final HttpServletRequest req, final HttpServletResponse resp,
+      final Session session) throws ServletException, IOException {
+    if (hasParam(req, "ajax")) {
+      handleAJAXAction(req, resp, session);
+    } else {
+      if (hasParam(req, "view")) {
+        try {
+          handleViewReportal(req, resp, session);
+        } catch (final Exception e) {
+          e.printStackTrace();
+        }
+      } else if (hasParam(req, "new")) {
+        handleNewReportal(req, resp, session);
+      } else if (hasParam(req, "edit")) {
+        handleEditReportal(req, resp, session);
+      } else if (hasParam(req, "run")) {
+        handleRunReportal(req, resp, session);
+      } else {
+        handleListReportal(req, resp, session);
+      }
+    }
+  }
+
+  private void handleAJAXAction(final HttpServletRequest req,
+      final HttpServletResponse resp, final Session session) throws ServletException,
+      IOException {
+    final HashMap<String, Object> ret = new HashMap<>();
+    final String ajaxName = getParam(req, "ajax");
+    final User user = session.getUser();
+    final int id = getIntParam(req, "id");
+    final ProjectManager projectManager = this.server.getProjectManager();
+    final Project project = projectManager.getProject(id);
+    final Reportal reportal = Reportal.loadFromProject(project);
+
+    // Delete report
+    if (ajaxName.equals("delete")) {
+      if (!project.hasPermission(user, Type.ADMIN)) {
+        ret.put("error", "You do not have permissions to delete this reportal.");
+      } else {
+        try {
+          final ScheduleManager scheduleManager = this.server.getScheduleManager();
+          reportal.removeSchedules(scheduleManager);
+          projectManager.removeProject(project, user);
+        } catch (final Exception e) {
+          e.printStackTrace();
+          ret.put("error", "An exception occured while deleting this reportal.");
+        }
+        ret.put("result", "success");
+      }
+    }
+    // Bookmark report
+    else if (ajaxName.equals("bookmark")) {
+      final boolean wasBookmarked = ReportalHelper.isBookmarkProject(project, user);
+      try {
+        if (wasBookmarked) {
+          ReportalHelper.unBookmarkProject(this.server, project, user);
+          ret.put("result", "success");
+          ret.put("bookmark", false);
+        } else {
+          ReportalHelper.bookmarkProject(this.server, project, user);
+          ret.put("result", "success");
+          ret.put("bookmark", true);
+        }
+      } catch (final ProjectManagerException e) {
+        e.printStackTrace();
+        ret.put("error", "Error bookmarking reportal. " + e.getMessage());
+      }
+    }
+    // Subscribe to report
+    else if (ajaxName.equals("subscribe")) {
+      final boolean wasSubscribed = ReportalHelper.isSubscribeProject(project, user);
+      if (!wasSubscribed && reportal.getAccessViewers().size() > 0
+          && !hasPermission(project, user, Type.READ)) {
+        ret.put("error", "You do not have permissions to view this reportal.");
+      } else {
+        try {
+          if (wasSubscribed) {
+            ReportalHelper.unSubscribeProject(this.server, project, user);
+            ret.put("result", "success");
+            ret.put("subscribe", false);
+          } else {
+            ReportalHelper.subscribeProject(this.server, project, user,
+                user.getEmail());
+            ret.put("result", "success");
+            ret.put("subscribe", true);
+          }
+        } catch (final ProjectManagerException e) {
+          e.printStackTrace();
+          ret.put("error", "Error subscribing to reportal. " + e.getMessage());
+        }
+      }
+    }
+    // Get a portion of logs
+    else if (ajaxName.equals("log")) {
+      final int execId = getIntParam(req, "execId");
+      final String jobId = getParam(req, "jobId");
+      final int offset = getIntParam(req, "offset");
+      final int length = getIntParam(req, "length");
+      final ExecutableFlow exec;
+      final ExecutorManagerAdapter executorManager = this.server.getExecutorManager();
+      try {
+        exec = executorManager.getExecutableFlow(execId);
+      } catch (final Exception e) {
+        ret.put("error", "Log does not exist or isn't created yet.");
+        return;
+      }
+
+      final LogData data;
+      try {
+        data =
+            executorManager.getExecutionJobLog(exec, jobId, offset, length,
+                exec.getExecutableNode(jobId).getAttempt());
+      } catch (final Exception e) {
+        e.printStackTrace();
+        ret.put("error", "Log does not exist or isn't created yet.");
+        return;
+      }
+      if (data != null) {
+        ret.put("result", "success");
+        ret.put("log", data.getData());
+        ret.put("offset", data.getOffset());
+        ret.put("length", data.getLength());
+        ret.put("completed", exec.getEndTime() != -1);
+      } else {
+        // Return an empty result to indicate the end
+        ret.put("result", "success");
+        ret.put("log", "");
+        ret.put("offset", offset);
+        ret.put("length", 0);
+        ret.put("completed", exec.getEndTime() != -1);
+      }
+    }
+
+    if (ret != null) {
+      this.writeJSON(resp, ret);
+    }
+  }
+
+  private void handleListReportal(final HttpServletRequest req,
+      final HttpServletResponse resp, final Session session) throws ServletException,
+      IOException {
+
+    final Page page =
+        newPage(req, resp, session,
+            "azkaban/viewer/reportal/reportallistpage.vm");
+    preparePage(page, session);
+
+    final List<Project> projects = ReportalHelper.getReportalProjects(this.server);
+    page.add("ReportalHelper", ReportalHelper.class);
+    page.add("user", session.getUser());
+
+    final String startDate = DateTime.now().minusWeeks(1).toString("yyyy-MM-dd");
+    final String endDate = DateTime.now().toString("yyyy-MM-dd");
+    page.add("startDate", startDate);
+    page.add("endDate", endDate);
+
+    if (!projects.isEmpty()) {
+      page.add("projects", projects);
+    } else {
+      page.add("projects", false);
+    }
+
+    page.render();
+  }
+
+  private void handleViewReportal(final HttpServletRequest req,
+      final HttpServletResponse resp, final Session session) throws ServletException,
+      Exception {
+    final int id = getIntParam(req, "id");
+    final Page page =
+        newPage(req, resp, session,
+            "azkaban/viewer/reportal/reportaldatapage.vm");
+    preparePage(page, session);
+
+    final ProjectManager projectManager = this.server.getProjectManager();
+    final ExecutorManagerAdapter executorManager = this.server.getExecutorManager();
+
+    final Project project = projectManager.getProject(id);
+    final Reportal reportal = Reportal.loadFromProject(project);
+
+    if (reportal == null) {
+      page.add("errorMsg", "Report not found.");
+      page.render();
+      return;
+    }
+
+    if (reportal.getAccessViewers().size() > 0
+        && !hasPermission(project, session.getUser(), Type.READ)) {
+      page.add("errorMsg", "You are not allowed to view this report.");
+      page.render();
+      return;
+    }
+
+    page.add("project", project);
+    page.add("title", project.getMetadata().get("title"));
+
+    if (hasParam(req, "execid")) {
+      final int execId = getIntParam(req, "execid");
+      page.add("execid", execId);
+      // Show logs
+      if (hasParam(req, "logs")) {
+        final ExecutableFlow exec;
+        try {
+          exec = executorManager.getExecutableFlow(execId);
+        } catch (final ExecutorManagerException e) {
+          e.printStackTrace();
+          page.add("errorMsg", "ExecutableFlow not found. " + e.getMessage());
+          page.render();
+          return;
+        }
+        // View single log
+        if (hasParam(req, "log")) {
+          page.add("view-log", true);
+          final String jobId = getParam(req, "log");
+          page.add("execid", execId);
+          page.add("jobId", jobId);
+        }
+        // List files
+        else {
+          page.add("view-logs", true);
+          final List<ExecutableNode> jobLogs = ReportalUtil.sortExecutableNodes(exec);
+
+          final boolean showDataCollector = hasParam(req, "debug");
+          if (!showDataCollector) {
+            jobLogs.remove(jobLogs.size() - 1);
+          }
+
+          if (jobLogs.size() == 1) {
+            resp.sendRedirect("/reportal?view&logs&id=" + project.getId()
+                + "&execid=" + execId + "&log=" + jobLogs.get(0).getId());
+          }
+          page.add("logs", jobLogs);
+        }
+      }
+      // Show data files
+      else {
+        final String outputFileSystem = ReportalMailCreator.outputFileSystem;
+        final String outputBase = ReportalMailCreator.outputLocation;
+
+        final String locationFull = (outputBase + "/" + execId).replace("//", "/");
+
+        final IStreamProvider streamProvider =
+            ReportalUtil.getStreamProvider(outputFileSystem);
+
+        if (streamProvider instanceof StreamProviderHDFS) {
+          final StreamProviderHDFS hdfsStreamProvider =
+              (StreamProviderHDFS) streamProvider;
+          hdfsStreamProvider.setHadoopSecurityManager(this.hadoopSecurityManager);
+          hdfsStreamProvider.setUser(this.reportalStorageUser);
+        }
+
+        try {
+          if (hasParam(req, "download")) {
+            final String fileName = getParam(req, "download");
+            final String filePath = locationFull + "/" + fileName;
+            InputStream csvInputStream = null;
+            OutputStream out = null;
+            try {
+              csvInputStream = streamProvider.getFileInputStream(filePath);
+              resp.setContentType("application/octet-stream");
+
+              out = resp.getOutputStream();
+              IOUtils.copy(csvInputStream, out);
+            } finally {
+              IOUtils.closeQuietly(out);
+              IOUtils.closeQuietly(csvInputStream);
+            }
+            return;
+          }
+          // Show file previews
+          else {
+            page.add("view-preview", true);
+
+            try {
+              String[] fileList = streamProvider.getFileList(locationFull);
+              fileList = ReportalHelper.filterCSVFile(fileList);
+              Arrays.sort(fileList);
+
+              final List<Object> files =
+                  getFilePreviews(fileList, locationFull, streamProvider,
+                      reportal.renderResultsAsHtml);
+
+              page.add("files", files);
+            } catch (final Exception e) {
+              logger.error("Error encountered while processing files in "
+                  + locationFull, e);
+            }
+          }
+        } finally {
+          try {
+            streamProvider.cleanUp();
+          } catch (final IOException e) {
+            e.printStackTrace();
+          }
+        }
+      }
+    }
+    // List executions and their data
+    else {
+      page.add("view-executions", true);
+      final ArrayList<ExecutableFlow> exFlows = new ArrayList<>();
+
+      int pageNumber = 0;
+      boolean hasNextPage = false;
+      if (hasParam(req, "page")) {
+        pageNumber = getIntParam(req, "page") - 1;
+      }
+      if (pageNumber < 0) {
+        pageNumber = 0;
+      }
+      try {
+        final Flow flow = project.getFlows().get(0);
+        executorManager.getExecutableFlows(project.getId(), flow.getId(),
+            pageNumber * this.itemsPerPage, this.itemsPerPage, exFlows);
+        final ArrayList<ExecutableFlow> tmp = new ArrayList<>();
+        executorManager.getExecutableFlows(project.getId(), flow.getId(),
+            (pageNumber + 1) * this.itemsPerPage, 1, tmp);
+        if (!tmp.isEmpty()) {
+          hasNextPage = true;
+        }
+      } catch (final ExecutorManagerException e) {
+        page.add("error", "Error retrieving executable flows");
+      }
+
+      if (!exFlows.isEmpty()) {
+        final ArrayList<Object> history = new ArrayList<>();
+        for (final ExecutableFlow exFlow : exFlows) {
+          final HashMap<String, Object> flowInfo = new HashMap<>();
+          flowInfo.put("execId", exFlow.getExecutionId());
+          flowInfo.put("status", exFlow.getStatus().toString());
+          flowInfo.put("startTime", exFlow.getStartTime());
+
+          history.add(flowInfo);
+        }
+        page.add("executions", history);
+      }
+      if (pageNumber > 0) {
+        page.add("pagePrev", pageNumber);
+      }
+      page.add("page", pageNumber + 1);
+      if (hasNextPage) {
+        page.add("pageNext", pageNumber + 2);
+      }
+    }
+
+    page.render();
+  }
+
+  /**
+   * Returns a list of file Objects that contain a "name" property with the file
+   * name, a "content" property with the lines in the file, and a "hasMore"
+   * property if the file contains more than NUM_PREVIEW_ROWS lines.
+   */
+  private List<Object> getFilePreviews(final String[] fileList, final String locationFull,
+      final IStreamProvider streamProvider, final boolean renderResultsAsHtml) {
+    final List<Object> files = new ArrayList<>();
+    InputStream csvInputStream = null;
+
+    try {
+      for (final String fileName : fileList) {
+        final Map<String, Object> file = new HashMap<>();
+        file.put("name", fileName);
+
+        final String filePath = locationFull + "/" + fileName;
+        csvInputStream = streamProvider.getFileInputStream(filePath);
+        final Scanner rowScanner = new Scanner(csvInputStream, StandardCharsets.UTF_8.toString());
+
+        final List<Object> lines = new ArrayList<>();
+        int lineNumber = 0;
+        while (rowScanner.hasNextLine()
+            && lineNumber < ReportalMailCreator.NUM_PREVIEW_ROWS) {
+          final String csvLine = rowScanner.nextLine();
+          final String[] data = csvLine.split("\",\"");
+          final List<String> line = new ArrayList<>();
+          for (final String item : data) {
+            String column = item.replace("\"", "");
+            if (!renderResultsAsHtml) {
+              column = StringEscapeUtils.escapeHtml(column);
+            }
+            line.add(column);
+          }
+          lines.add(line);
+          lineNumber++;
+        }
+
+        file.put("content", lines);
+
+        if (rowScanner.hasNextLine()) {
+          file.put("hasMore", true);
+        }
+
+        files.add(file);
+        rowScanner.close();
+      }
+    } catch (final Exception e) {
+      logger.debug("Error encountered while processing files in "
+          + locationFull, e);
+    } finally {
+      IOUtils.closeQuietly(csvInputStream);
+    }
+
+    return files;
+  }
+
+  private void handleRunReportal(final HttpServletRequest req,
+      final HttpServletResponse resp, final Session session) throws ServletException,
+      IOException {
+    final int id = getIntParam(req, "id");
+    final ProjectManager projectManager = this.server.getProjectManager();
+    final Page page =
+        newPage(req, resp, session,
+            "azkaban/viewer/reportal/reportalrunpage.vm");
+    preparePage(page, session);
+
+    final Project project = projectManager.getProject(id);
+    final Reportal reportal = Reportal.loadFromProject(project);
+
+    if (reportal == null) {
+      page.add("errorMsg", "Report not found");
+      page.render();
+      return;
+    }
+
+    if (reportal.getAccessExecutors().size() > 0
+        && !hasPermission(project, session.getUser(), Type.EXECUTE)) {
+      page.add("errorMsg", "You are not allowed to run this report.");
+      page.render();
+      return;
+    }
+
+    page.add("projectId", id);
+    page.add("title", reportal.title);
+    page.add("description", reportal.description);
+
+    final List<Variable> runtimeVariables =
+        ReportalUtil.getRunTimeVariables(reportal.variables);
+    if (runtimeVariables.size() > 0) {
+      page.add("variableNumber", runtimeVariables.size());
+      page.add("variables", runtimeVariables);
+    }
+
+    page.render();
+  }
+
+  private void handleNewReportal(final HttpServletRequest req,
+      final HttpServletResponse resp, final Session session) throws ServletException,
+      IOException {
+
+    final Page page =
+        newPage(req, resp, session,
+            "azkaban/viewer/reportal/reportaleditpage.vm");
+    preparePage(page, session);
+
+    page.add("title", "");
+    page.add("description", "");
+
+    page.add("queryNumber", 1);
+
+    final List<Map<String, Object>> queryList = new ArrayList<>();
+    page.add("queries", queryList);
+
+    final Map<String, Object> query = new HashMap<>();
+    queryList.add(query);
+    query.put("title", "");
+    query.put("type", "");
+    query.put("script", "");
+
+    page.add("accessViewer", "");
+    page.add("accessExecutor", "");
+    page.add("accessOwner", "");
+    page.add("notifications", "");
+    page.add("failureNotifications", "");
+
+    page.add("max_allowed_schedule_dates", this.max_allowed_schedule_dates);
+    page.add("default_schedule_dates", this.default_schedule_dates);
+
+    page.render();
+  }
+
+  private void handleEditReportal(final HttpServletRequest req,
+      final HttpServletResponse resp, final Session session) throws ServletException,
+      IOException {
+    final int id = getIntParam(req, "id");
+    final ProjectManager projectManager = this.server.getProjectManager();
+
+    final Page page =
+        newPage(req, resp, session,
+            "azkaban/viewer/reportal/reportaleditpage.vm");
+    preparePage(page, session);
+    page.add("ReportalHelper", ReportalHelper.class);
+
+    final Project project = projectManager.getProject(id);
+    final Reportal reportal = Reportal.loadFromProject(project);
+
+    final List<String> errors = new ArrayList<>();
+
+    if (reportal == null) {
+      errors.add("Report not found");
+      page.add("errorMsgs", errors);
+      page.render();
+      return;
+    }
+
+    if (!hasPermission(project, session.getUser(), Type.ADMIN)) {
+      errors.add("You are not allowed to edit this report.");
+      page.add("errorMsgs", errors);
+      page.render();
+      return;
+    }
+
+    page.add("projectId", id);
+    page.add("title", reportal.title);
+    page.add("description", reportal.description);
+    page.add("queryNumber", reportal.queries.size());
+    page.add("queries", reportal.queries);
+    page.add("variableNumber", reportal.variables.size());
+    page.add("variables", reportal.variables);
+    page.add("schedule", reportal.schedule);
+    page.add("scheduleHour", reportal.scheduleHour);
+    page.add("scheduleMinute", reportal.scheduleMinute);
+    page.add("scheduleAmPm", reportal.scheduleAmPm);
+    page.add("scheduleTimeZone", reportal.scheduleTimeZone);
+    page.add("scheduleDate", reportal.scheduleDate);
+    page.add("endScheduleDate", reportal.endSchedule);
+    page.add("scheduleRepeat", reportal.scheduleRepeat);
+    page.add("scheduleIntervalQuantity", reportal.scheduleIntervalQuantity);
+    page.add("scheduleInterval", reportal.scheduleInterval);
+    page.add("renderResultsAsHtml", reportal.renderResultsAsHtml);
+    page.add("notifications", reportal.notifications);
+    page.add("failureNotifications", reportal.failureNotifications);
+    page.add("accessViewer", reportal.accessViewer);
+    page.add("accessExecutor", reportal.accessExecutor);
+    page.add("accessOwner", reportal.accessOwner);
+
+    page.add("max_allowed_schedule_dates", this.max_allowed_schedule_dates);
+    page.add("default_schedule_dates", this.default_schedule_dates);
+    page.render();
+  }
+
+  @Override
+  protected void handlePost(final HttpServletRequest req, final HttpServletResponse resp,
+      final Session session) throws ServletException, IOException {
+    if (hasParam(req, "ajax")) {
+      final HashMap<String, Object> ret = new HashMap<>();
+
+      handleRunReportalWithVariables(req, ret, session);
+
+      if (ret != null) {
+        this.writeJSON(resp, ret);
+      }
+    } else {
+      handleSaveReportal(req, resp, session);
+    }
+  }
+
+  private void handleSaveReportal(final HttpServletRequest req,
+      final HttpServletResponse resp, final Session session) throws ServletException,
+      IOException {
+    final String projectId = validateAndSaveReport(req, resp, session);
+
+    if (projectId != null) {
+      this.setSuccessMessageInCookie(resp, "Report Saved.");
+
+      final String submitType = getParam(req, "submit");
+      if (submitType.equals("Save")) {
+        resp.sendRedirect(req.getRequestURI() + "?edit&id=" + projectId);
+      } else {
+        resp.sendRedirect(req.getRequestURI() + "?run&id=" + projectId);
+      }
+    }
+  }
+
+  /**
+   * Validates and saves a report, returning the project id of the saved report
+   * if successful, and null otherwise.
+   *
+   * @return The project id of the saved report if successful, and null
+   * otherwise
+   */
+  private String validateAndSaveReport(final HttpServletRequest req,
+      final HttpServletResponse resp, final Session session) throws ServletException,
+      IOException {
+
+    final ProjectManager projectManager = this.server.getProjectManager();
+    final User user = session.getUser();
+
+    final Page page =
+        newPage(req, resp, session,
+            "azkaban/viewer/reportal/reportaleditpage.vm");
+    preparePage(page, session);
+    page.add("ReportalHelper", ReportalHelper.class);
+
+    final boolean isEdit = hasParam(req, "id");
+    if (isEdit) {
+      page.add("projectId", getIntParam(req, "id"));
+    }
+
+    Project project = null;
+    final Reportal report = new Reportal();
+
+    report.title = getParam(req, "title");
+    report.description = getParam(req, "description");
+    page.add("title", report.title);
+    page.add("description", report.description);
+
+    report.schedule = hasParam(req, "schedule");
+    report.scheduleHour = getParam(req, "schedule-hour");
+    report.scheduleMinute = getParam(req, "schedule-minute");
+    report.scheduleAmPm = getParam(req, "schedule-am_pm");
+    report.scheduleTimeZone = getParam(req, "schedule-timezone");
+    report.scheduleDate = getParam(req, "schedule-date");
+    report.scheduleRepeat = hasParam(req, "schedule-repeat");
+    report.scheduleIntervalQuantity =
+        getParam(req, "schedule-interval-quantity");
+    report.scheduleInterval = getParam(req, "schedule-interval");
+    report.renderResultsAsHtml = hasParam(req, "render-results-as-html");
+
+    final boolean isEndSchedule = hasParam(req, "end-schedule-date");
+    if (isEndSchedule) {
+      report.endSchedule = getParam(req, "end-schedule-date");
+    }
+
+    page.add("schedule", report.schedule);
+    page.add("scheduleHour", report.scheduleHour);
+    page.add("scheduleMinute", report.scheduleMinute);
+    page.add("scheduleAmPm", report.scheduleAmPm);
+    page.add("scheduleTimeZone", report.scheduleTimeZone);
+    page.add("scheduleDate", report.scheduleDate);
+    page.add("scheduleRepeat", report.scheduleRepeat);
+    page.add("scheduleIntervalQuantity", report.scheduleIntervalQuantity);
+    page.add("scheduleInterval", report.scheduleInterval);
+    page.add("renderResultsAsHtml", report.renderResultsAsHtml);
+    page.add("endSchedule", report.endSchedule);
+    page.add("max_allowed_schedule_dates", this.max_allowed_schedule_dates);
+    page.add("default_schedule_dates", this.default_schedule_dates);
+
+    report.accessViewer = getParam(req, "access-viewer");
+    report.accessExecutor = getParam(req, "access-executor");
+    report.accessOwner = getParam(req, "access-owner");
+    page.add("accessViewer", report.accessViewer);
+    page.add("accessExecutor", report.accessExecutor);
+
+    // Adding report creator as explicit owner, if not present already
+    if (report.accessOwner == null || report.accessOwner.isEmpty()) {
+      report.accessOwner = user.getUserId();
+    } else {
+      final String[] splittedOwners = report.accessOwner.toLowerCase()
+          .split(Reportal.ACCESS_LIST_SPLIT_REGEX);
+      if (!Arrays.asList(splittedOwners).contains(user.getUserId())) {
+        report.accessOwner = String.format("%s,%s", user.getUserId(),
+            StringUtils.join(splittedOwners, ','));
+      } else {
+        report.accessOwner = StringUtils.join(splittedOwners, ',');
+      }
+    }
+
+    page.add("accessOwner", report.accessOwner);
+
+    report.notifications = getParam(req, "notifications");
+    report.failureNotifications = getParam(req, "failure-notifications");
+    page.add("notifications", report.notifications);
+    page.add("failureNotifications", report.failureNotifications);
+
+    final int numQueries = getIntParam(req, "queryNumber");
+    page.add("queryNumber", numQueries);
+    final List<Query> queryList = new ArrayList<>(numQueries);
+    page.add("queries", queryList);
+    report.queries = queryList;
+
+    final List<String> errors = new ArrayList<>();
+    for (int i = 0; i < numQueries; i++) {
+      final Query query = new Query();
+
+      query.title = getParam(req, "query" + i + "title");
+      query.type = getParam(req, "query" + i + "type");
+      query.script = getParam(req, "query" + i + "script");
+
+      // Type check
+      final ReportalType type = ReportalType.getTypeByName(query.type);
+      if (type == null) {
+        errors.add("Type " + query.type + " is invalid.");
+      }
+
+      if (!type.checkPermission(user) && report.schedule) {
+        errors.add("You do not have permission to schedule Type " + query.type + ".");
+      }
+
+      queryList.add(query);
+    }
+
+    final int variables = getIntParam(req, "variableNumber");
+    page.add("variableNumber", variables);
+    final List<Variable> variableList = new ArrayList<>(variables);
+    page.add("variables", variableList);
+    report.variables = variableList;
+
+    String proxyUser = null;
+
+    for (int i = 0; i < variables; i++) {
+      final Variable variable =
+          new Variable(getParam(req, "variable" + i + "title"), getParam(req,
+              "variable" + i + "name"));
+
+      if (variable.title.isEmpty() || variable.name.isEmpty()) {
+        errors.add("Variable title and name cannot be empty.");
+      }
+
+      if (variable.title.equals("reportal.config.reportal.execution.user")) {
+        proxyUser = variable.name;
+      }
+
+      variableList.add(variable);
+    }
+
+    // Make sure title isn't empty
+    if (report.title.isEmpty()) {
+      errors.add("Title must not be empty.");
+    }
+
+    // Make sure description isn't empty
+    if (report.description.isEmpty()) {
+      errors.add("Description must not be empty.");
+    }
+
+    // Verify schedule and repeat
+    if (report.schedule) {
+      // Verify schedule time
+      if (!NumberUtils.isDigits(report.scheduleHour)
+          || !NumberUtils.isDigits(report.scheduleMinute)) {
+        errors.add("Schedule time is invalid.");
+      }
+
+      // Verify schedule date is not empty
+      if (report.scheduleDate.isEmpty()) {
+        errors.add("Schedule date must not be empty.");
+      }
+
+      if (report.scheduleRepeat) {
+        // Verify repeat interval
+        if (!NumberUtils.isDigits(report.scheduleIntervalQuantity)) {
+          errors.add("Repeat interval quantity is invalid.");
+        }
+      }
+    }
+
+    // Empty query check
+    if (numQueries <= 0) {
+      errors.add("There needs to have at least one query.");
+    }
+
+    // Validate access users
+    final UserManager userManager = getApplication().getUserManager();
+    final String[] accessLists =
+        new String[]{report.accessViewer, report.accessExecutor,
+            report.accessOwner};
+    for (String accessList : accessLists) {
+      if (accessList == null) {
+        continue;
+      }
+
+      accessList = accessList.trim();
+      if (!accessList.isEmpty()) {
+        final String[] users = accessList.split(Reportal.ACCESS_LIST_SPLIT_REGEX);
+        for (final String accessUser : users) {
+          if (!userManager.validateUser(accessUser)) {
+            errors.add("User " + accessUser + " in access list is invalid.");
+          }
+        }
+      }
+    }
+
+    // Validate proxy user
+    if (proxyUser != null) {
+      if (!userManager.validateProxyUser(proxyUser, user)) {
+        errors.add("User " + user.getUserId() + " has no permission to add " + proxyUser
+            + " as proxy user.");
+      }
+      proxyUser = null;
+    }
+
+    // Validate email addresses
+    final Set<String> emails =
+        ReportalHelper.parseUniqueEmails(report.notifications + ","
+            + report.failureNotifications, Reportal.ACCESS_LIST_SPLIT_REGEX);
+    for (final String email : emails) {
+      if (!ReportalHelper.isValidEmailAddress(email)) {
+        errors.add("Invalid email address: " + email);
+        continue;
+      }
+
+      final String domain = ReportalHelper.getEmailDomain(email);
+      if (this.allowedEmailDomains != null && !this.allowedEmailDomains.contains(domain)) {
+        errors.add("Email address '" + email + "' has an invalid domain '"
+            + domain + "'. " + "Valid domains are: " + this.allowedEmailDomains);
+      }
+    }
+
+    if (errors.size() > 0) {
+      page.add("errorMsgs", errors);
+      page.render();
+      return null;
+    }
+
+    // Attempt to get a project object
+    if (isEdit) {
+      // Editing mode, load project
+      final int projectId = getIntParam(req, "id");
+      project = projectManager.getProject(projectId);
+      report.loadImmutableFromProject(project);
+    } else {
+      // Creation mode, create project
+      try {
+        project =
+            ReportalHelper.createReportalProject(this.server, report.title,
+                report.description, user);
+        report.reportalUser = user.getUserId();
+        report.ownerEmail = user.getEmail();
+      } catch (final Exception e) {
+        e.printStackTrace();
+        errors.add("Error while creating report. " + e.getMessage());
+        page.add("errorMsgs", errors);
+        page.render();
+        return null;
+      }
+
+      // Project already exists
+      if (project == null) {
+        errors.add("A Report with the same name already exists.");
+        page.add("errorMsgs", errors);
+        page.render();
+        return null;
+      }
+    }
+
+    if (project == null) {
+      errors.add("Internal Error: Report not found");
+      page.add("errorMsgs", errors);
+      page.render();
+      return null;
+    }
+
+    report.project = project;
+    page.add("projectId", project.getId());
+
+    try {
+      report.createZipAndUpload(projectManager, user, this.reportalStorageUser);
+    } catch (final Exception e) {
+      e.printStackTrace();
+      errors.add("Error while creating Azkaban jobs. " + e.getMessage());
+      page.add("errorMsgs", errors);
+      page.render();
+      if (!isEdit) {
+        try {
+          projectManager.removeProject(project, user);
+        } catch (final ProjectManagerException e1) {
+          e1.printStackTrace();
+        }
+      }
+      return null;
+    }
+
+    // Prepare flow
+    final Flow flow = project.getFlows().get(0);
+    project.getMetadata().put("flowName", flow.getId());
+
+    // Set Reportal mailer
+    flow.setMailCreator(ReportalMailCreator.REPORTAL_MAIL_CREATOR);
+
+    // Create/Save schedule
+    final ScheduleManager scheduleManager = this.server.getScheduleManager();
+    try {
+      report.updateSchedules(report, scheduleManager, user, flow);
+    } catch (final ScheduleManagerException e) {
+      e.printStackTrace();
+      errors.add(e.getMessage());
+      page.add("errorMsgs", errors);
+      page.render();
+      return null;
+    }
+
+    report.saveToProject(project);
+
+    try {
+      ReportalHelper.updateProjectNotifications(project, projectManager);
+      projectManager.updateProjectSetting(project);
+      projectManager
+          .updateProjectDescription(project, report.description, user);
+      updateProjectPermissions(project, projectManager, report, user);
+      projectManager.updateFlow(project, flow);
+    } catch (final ProjectManagerException e) {
+      e.printStackTrace();
+      errors.add("Error while updating report. " + e.getMessage());
+      page.add("errorMsgs", errors);
+      page.render();
+      if (!isEdit) {
+        try {
+          projectManager.removeProject(project, user);
+        } catch (final ProjectManagerException e1) {
+          e1.printStackTrace();
+        }
+      }
+      return null;
+    }
+
+    return Integer.toString(project.getId());
+  }
+
+  private void updateProjectPermissions(final Project project,
+      final ProjectManager projectManager, final Reportal report, final User currentUser)
+      throws ProjectManagerException {
+    // Old permissions and users
+    final List<Pair<String, Permission>> oldPermissions =
+        project.getUserPermissions();
+    final Set<String> oldUsers = new HashSet<>();
+    for (final Pair<String, Permission> userPermission : oldPermissions) {
+      oldUsers.add(userPermission.getFirst());
+    }
+
+    // Update permissions
+    report.updatePermissions();
+
+    // New permissions and users
+    final List<Pair<String, Permission>> newPermissions =
+        project.getUserPermissions();
+    final Set<String> newUsers = new HashSet<>();
+    for (final Pair<String, Permission> userPermission : newPermissions) {
+      newUsers.add(userPermission.getFirst());
+    }
+
+    // Save all new permissions
+    for (final Pair<String, Permission> userPermission : newPermissions) {
+      if (!oldPermissions.contains(userPermission)) {
+        projectManager.updateProjectPermission(project,
+            userPermission.getFirst(), userPermission.getSecond(), false,
+            currentUser);
+      }
+    }
+
+    // Remove permissions for any old users no longer in the new users
+    for (final String oldUser : oldUsers) {
+      if (!newUsers.contains(oldUser)) {
+        projectManager.removeProjectPermission(project, oldUser, false,
+            currentUser);
+      }
+    }
+  }
+
+  private void handleRunReportalWithVariables(final HttpServletRequest req,
+      final HashMap<String, Object> ret, final Session session) throws ServletException,
+      IOException {
+    final boolean isTestRun = hasParam(req, "testRun");
+
+    final int id = getIntParam(req, "id");
+    final ProjectManager projectManager = this.server.getProjectManager();
+    final Project project = projectManager.getProject(id);
+    final Reportal report = Reportal.loadFromProject(project);
+    final User user = session.getUser();
+
+    if (report.getAccessExecutors().size() > 0
+        && !hasPermission(project, user, Type.EXECUTE)) {
+      ret.put("error", "You are not allowed to run this report.");
+      return;
+    }
+
+    for (final Query query : report.queries) {
+      final String jobType = query.type;
+      final ReportalType type = ReportalType.getTypeByName(jobType);
+      if (!type.checkPermission(user)) {
+        ret.put(
+            "error",
+            "You are not allowed to run this report as you don't have permission to run job type "
+                + type.toString() + ".");
+        return;
+      }
+    }
+
+    final Flow flow = project.getFlows().get(0);
+
+    final ExecutableFlow exflow = new ExecutableFlow(project, flow);
+    exflow.setSubmitUser(user.getUserId());
+    exflow.addAllProxyUsers(project.getProxyUsers());
+
+    final ExecutionOptions options = exflow.getExecutionOptions();
+
+    int i = 0;
+    for (final Variable variable : ReportalUtil.getRunTimeVariables(report.variables)) {
+      options.getFlowParameters().put(REPORTAL_VARIABLE_PREFIX + i + ".from",
+          variable.name);
+      options.getFlowParameters().put(REPORTAL_VARIABLE_PREFIX + i + ".to",
+          getParam(req, "variable" + i));
+      i++;
+    }
+
+    options.getFlowParameters()
+        .put("reportal.execution.user", user.getUserId());
+
+    // Add the execution user's email to the list of success and failure emails.
+    final String email = user.getEmail();
+
+    if (email != null && !email.isEmpty()) {
+      if (isTestRun) { // Only email the executor
+        final List<String> emails = new ArrayList<>();
+        emails.add(email);
+        options.setSuccessEmails(emails);
+        options.setFailureEmails(emails);
+      } else {
+        options.getSuccessEmails().add(email);
+        options.getFailureEmails().add(email);
+      }
+    }
+
+    options.getFlowParameters().put("reportal.title", report.title);
+    options.getFlowParameters().put("reportal.render.results.as.html",
+        report.renderResultsAsHtml ? "true" : "false");
+    options.getFlowParameters().put("reportal.unscheduled.run", "true");
+
+    try {
+      final String message =
+          this.server.getExecutorManager().submitExecutableFlow(exflow,
+              session.getUser().getUserId())
+              + ".";
+      ret.put("message", message);
+      ret.put("result", "success");
+      ret.put("redirect", "/reportal?view&logs&id=" + project.getId()
+          + "&execid=" + exflow.getExecutionId());
+    } catch (final ExecutorManagerException e) {
+      e.printStackTrace();
+      ret.put("error",
+          "Error running report " + report.title + ". " + e.getMessage());
+    }
+  }
+
+  private void preparePage(final Page page, final Session session) {
+    page.add("viewerName", this.viewerName);
+    page.add("hideNavigation", !this.showNav);
+    page.add("userid", session.getUser().getUserId());
+    page.add("esc", new EscapeTool());
+  }
+
+  private class CleanerThread extends Thread {
+
+    private static final long DEFAULT_CLEAN_INTERVAL_MS = 24 * 60 * 60 * 1000;
+    private static final long DEFAULT_OUTPUT_DIR_RETENTION_MS = 7 * 24 * 60
+        * 60 * 1000;
+    private static final long DEFAULT_MAIL_TEMP_DIR_RETENTION_MS =
+        24 * 60 * 60 * 1000;
+    // The frequency, in milliseconds, that the Reportal output
+    // and mail temp directories should be cleaned
+    private final long CLEAN_INTERVAL_MS;
+    // The duration, in milliseconds, that Reportal output should be retained
+    // for
+    private final long OUTPUT_DIR_RETENTION_MS;
+    // The duration, in milliseconds, that Reportal mail temp files should be
+    // retained for
+    private final long MAIL_TEMP_DIR_RETENTION_MS;
+    private boolean shutdown = false;
+
+    public CleanerThread() {
+      this.setName("Reportal-Cleaner-Thread");
+      this.CLEAN_INTERVAL_MS =
+          ReportalServlet.this.props
+              .getLong("reportal.clean.interval.ms", DEFAULT_CLEAN_INTERVAL_MS);
+      this.OUTPUT_DIR_RETENTION_MS =
+          ReportalServlet.this.props.getLong("reportal.output.dir.retention.ms",
+              DEFAULT_OUTPUT_DIR_RETENTION_MS);
+      this.MAIL_TEMP_DIR_RETENTION_MS =
+          ReportalServlet.this.props.getLong("reportal.mail.temp.dir.retention.ms",
+              DEFAULT_MAIL_TEMP_DIR_RETENTION_MS);
+    }
+
+    @SuppressWarnings("unused")
+    public void shutdown() {
+      this.shutdown = true;
+      this.interrupt();
+    }
+
+    @Override
+    public void run() {
+      while (!this.shutdown) {
+        synchronized (this) {
+          logger.info("Cleaning old execution output dirs");
+          cleanOldReportalOutputDirs();
+
+          logger.info("Cleaning Reportal mail temp directory");
+          cleanReportalMailTempDir();
+        }
+
+        try {
+          Thread.sleep(this.CLEAN_INTERVAL_MS);
+        } catch (final InterruptedException e) {
+          logger.error("CleanerThread's sleep was interrupted.", e);
+        }
+      }
+    }
+
+    private void cleanOldReportalOutputDirs() {
+      final IStreamProvider streamProvider =
+          ReportalUtil.getStreamProvider(ReportalMailCreator.outputFileSystem);
+
+      if (streamProvider instanceof StreamProviderHDFS) {
+        final StreamProviderHDFS hdfsStreamProvider =
+            (StreamProviderHDFS) streamProvider;
+        hdfsStreamProvider.setHadoopSecurityManager(ReportalServlet.this.hadoopSecurityManager);
+        hdfsStreamProvider.setUser(ReportalServlet.this.reportalStorageUser);
+      }
+
+      final long pastTimeThreshold =
+          System.currentTimeMillis() - this.OUTPUT_DIR_RETENTION_MS;
+
+      String[] oldFiles = null;
+      try {
+        oldFiles =
+            streamProvider.getOldFiles(ReportalMailCreator.outputLocation,
+                pastTimeThreshold);
+      } catch (final Exception e) {
+        logger.error("Error getting old files from "
+            + ReportalMailCreator.outputLocation + " on "
+            + ReportalMailCreator.outputFileSystem + " file system.", e);
+      }
+
+      if (oldFiles != null) {
+        for (final String file : oldFiles) {
+          final String filePath = ReportalMailCreator.outputLocation + "/" + file;
+          try {
+            streamProvider.deleteFile(filePath);
+          } catch (final Exception e) {
+            logger.error("Error deleting file " + filePath + " from "
+                + ReportalMailCreator.outputFileSystem + " file system.", e);
+          }
+        }
+      }
+    }
+
+    private void cleanReportalMailTempDir() {
+      final File dir = ReportalServlet.this.reportalMailTempDirectory;
+      final long pastTimeThreshold =
+          System.currentTimeMillis() - this.MAIL_TEMP_DIR_RETENTION_MS;
+
+      final File[] oldMailTempDirs = dir.listFiles(new FileFilter() {
+        @Override
+        public boolean accept(final File path) {
+          if (path.isDirectory() && path.lastModified() < pastTimeThreshold) {
+            return true;
+          }
+          return false;
+        }
+      });
+
+      for (final File tempDir : oldMailTempDirs) {
+        try {
+          FileUtils.deleteDirectory(tempDir);
+        } catch (final IOException e) {
+          logger.error(
+              "Error cleaning Reportal mail temp dir " + tempDir.getPath(), e);
+        }
+      }
+    }
+  }
+}
diff --git a/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalType.java b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalType.java
new file mode 100644
index 0000000..1dd845a
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalType.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.viewer.reportal;
+
+import azkaban.reportal.util.Reportal;
+import azkaban.user.User;
+import azkaban.utils.Props;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+
+public enum ReportalType {
+
+  PigJob("ReportalPig", "reportalpig", "hadoop") {
+    @Override
+    public void buildJobFiles(final Reportal reportal, final Props propertiesFile,
+        final File jobFile, final String jobName, final String queryScript,
+        final String proxyUser) {
+      final File resFolder = new File(jobFile.getParentFile(), "res");
+      resFolder.mkdirs();
+      final File scriptFile = new File(resFolder, jobName + ".pig");
+
+      OutputStream fileOutput = null;
+      try {
+        scriptFile.createNewFile();
+        fileOutput = new BufferedOutputStream(new FileOutputStream(scriptFile));
+        fileOutput.write(queryScript.getBytes(Charset.forName("UTF-8")));
+      } catch (final IOException e) {
+        e.printStackTrace();
+      } finally {
+        if (fileOutput != null) {
+          try {
+            fileOutput.close();
+          } catch (final IOException e) {
+            e.printStackTrace();
+          }
+        }
+      }
+      propertiesFile.put("reportal.pig.script", "res/" + jobName + ".pig");
+    }
+  },
+  HiveJob("ReportalHive", "reportalhive", "hadoop"), TeraDataJob(
+      "ReportalTeraData", "reportalteradata", "teradata"), DataCollectorJob(
+      ReportalTypeManager.DATA_COLLECTOR_JOB,
+      ReportalTypeManager.DATA_COLLECTOR_JOB_TYPE, "") {
+    @Override
+    public void buildJobFiles(final Reportal reportal, final Props propertiesFile,
+        final File jobFile, final String jobName, final String queryScript,
+        final String proxyUser) {
+      propertiesFile.put("user.to.proxy", proxyUser);
+    }
+  };
+
+  private static final HashMap<String, ReportalType> reportalTypes =
+      new HashMap<>();
+
+  static {
+    for (final ReportalType type : ReportalType.values()) {
+      reportalTypes.put(type.typeName, type);
+    }
+  }
+
+  private final String typeName;
+  private final String jobTypeName;
+  private final String permissionName;
+
+  private ReportalType(final String typeName, final String jobTypeName,
+      final String permissionName) {
+    this.typeName = typeName;
+    this.jobTypeName = jobTypeName;
+    this.permissionName = permissionName;
+  }
+
+  public static ReportalType getTypeByName(final String typeName) {
+    return reportalTypes.get(typeName);
+  }
+
+  public void buildJobFiles(final Reportal reportal, final Props propertiesFile,
+      final File jobFile, final String jobName, final String queryScript, final String proxyUser) {
+
+  }
+
+  public String getJobTypeName() {
+    return this.jobTypeName;
+  }
+
+  public boolean checkPermission(final User user) {
+    return user.hasPermission(this.permissionName);
+  }
+
+  @Override
+  public String toString() {
+    return this.typeName;
+  }
+}
diff --git a/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalTypeManager.java b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalTypeManager.java
new file mode 100644
index 0000000..b0e1a56
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalTypeManager.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.viewer.reportal;
+
+import azkaban.flow.CommonJobProperties;
+import azkaban.jobExecutor.JavaProcessJob;
+import azkaban.reportal.util.Reportal;
+import azkaban.utils.Props;
+import java.io.File;
+import java.util.Map;
+
+public class ReportalTypeManager {
+  public static final String DATA_COLLECTOR_JOB = "ReportalDataCollector";
+  public static final String DATA_COLLECTOR_JOB_TYPE = "reportaldatacollector";
+
+  public static void createJobAndFiles(final Reportal reportal, final File jobFile,
+      final String jobName, final String queryTitle, final String queryType,
+      final String queryScript,
+      final String dependentJob, final String userName, final Map<String, String> extras)
+      throws Exception {
+
+    // Create props for the job
+    final Props propertiesFile = new Props();
+    propertiesFile.put("title", queryTitle);
+
+    final ReportalType type = ReportalType.getTypeByName(queryType);
+
+    if (type == null) {
+      throw new Exception("Type " + queryType + " is invalid.");
+    }
+
+    propertiesFile.put("reportal.title", reportal.title);
+    propertiesFile.put("reportal.job.title", jobName);
+    propertiesFile.put("reportal.job.query", queryScript);
+    propertiesFile.put("user.to.proxy", "${reportal.execution.user}");
+    propertiesFile.put("reportal.proxy.user", "${reportal.execution.user}");
+
+    type.buildJobFiles(reportal, propertiesFile, jobFile, jobName, queryScript,
+        userName);
+
+    propertiesFile.put(CommonJobProperties.JOB_TYPE, type.getJobTypeName());
+    propertiesFile.put(JavaProcessJob.JVM_PARAMS, "-Dreportal.user.name=${reportal.execution.user}"
+        + " -Dreportal.execid=${azkaban.flow.execid}");
+
+    // Order dependency
+    if (dependentJob != null) {
+      propertiesFile.put(CommonJobProperties.DEPENDENCIES, dependentJob);
+    }
+
+    if (extras != null) {
+      propertiesFile.putAll(extras);
+    }
+
+    propertiesFile.storeLocal(jobFile);
+  }
+}
diff --git a/az-reportal/src/main/resources/azkaban/viewer/reportal/reportaldatapage.vm b/az-reportal/src/main/resources/azkaban/viewer/reportal/reportaldatapage.vm
new file mode 100644
index 0000000..49ec7a3
--- /dev/null
+++ b/az-reportal/src/main/resources/azkaban/viewer/reportal/reportaldatapage.vm
@@ -0,0 +1,195 @@
+#*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+*#
+
+<!DOCTYPE html>
+<html>
+  <head>
+    <title>Reportal</title>
+    <link href="${context}/reportal/css/bootstrap.min.css" rel="stylesheet">
+    <script type="text/javascript" src="${context}/js/jquery/jquery-1.9.1.js"></script>
+    <script type="text/javascript" src="${context}/js/jqueryui/jquery-ui-1.10.1.custom.js"></script>
+    <script type="text/javascript" src="${context}/reportal/js/bootstrap.min.js"></script>
+
+    <script type="text/javascript">
+      var contextURL = "${context}";
+      var currentTime = ${currentTime};
+      var timezone = "${timezone}";
+      var homeDir = "${homedir}";
+#if($project)
+      var projectId = ${project.id};
+#end
+    </script>
+    <script type="text/javascript" src="${context}/reportal/js/reportal.js"></script>
+    <script type="text/javascript" src="${context}/reportal/js/reportal-data.js"></script>
+    <link href="${context}/reportal/css/reportal.css" rel="stylesheet">
+  </head>
+  <body>
+#set($current_page="$viewerName")
+#if(!$hideNavigation)
+  #parse("azkaban/webapp/servlet/velocity/nav.vm")
+#end
+
+#parse("azkaban/viewer/reportal/reportalnavigation.vm")
+
+    <div class="content" style="margin-top: 41px;">
+      <div id="box-error">
+  #if($errorMsg)
+        <div class="box-error-message">$errorMsg</div>
+  #else
+    #if($error_message != "null")
+        <div class="box-error-message">$error_message</div>
+    #elseif($success_message != "null")
+        <div class="box-success-message">$success_message</div>
+    #end
+  #end
+      </div>
+      <div>&nbsp;</div>
+#if($project)
+      <div class="container">
+        <h2 style="font-size: 31.5px;">${project.metadata.title}</h2>
+        <div class="well">${project.description}</div>
+      </div>
+#end
+      <div style="text-align: center;">
+        <a class="btn btn-info" href="${context}/reportal">Reportal Home</a>
+#if($project)
+        <a class="btn btn-primary" href="${context}/reportal?view&amp;id=${project.id}">Report History</a>
+#if($execid)
+        <a class="btn btn-primary" href="${context}/reportal?view&amp;id=${project.id}&amp;execid=${execid}">Data</a>
+        <a class="btn btn-primary" href="${context}/reportal?view&amp;logs&amp;id=${project.id}&amp;execid=${execid}">Logs</a>
+#end
+        <a class="btn btn-warning" href="${context}/reportal?edit&amp;id=${project.id}">Edit</a>
+        <a class="btn btn-success button-run" href="${context}/reportal?run&amp;id=${project.id}">Run</a>
+#end
+      </div>
+      <div>&nbsp;</div>
+      <div class="container">
+        <div>
+#if($view-executions)
+          <div style="text-align: center;">
+  #if($pagePrev)
+            <a class="btn btn-primary" href="${context}/reportal?view&amp;id=${project.id}&amp;page=${pagePrev}">Previous page</a>
+  #end
+            <a class="btn btn-inverse" href="${context}/reportal?view&amp;id=${project.id}&amp;page=${page}">Page ${page}</a>
+  #if($pageNext)
+            <a class="btn btn-primary" href="${context}/reportal?view&amp;id=${project.id}&amp;page=${pageNext}">Next page</a>
+  #end
+          </div>
+          <div>&nbsp;</div>
+          <table id="report-results" border="0" cellspacing="0" cellpadding="0" class="table table-bordered table-striped">
+            <thead>
+              <tr>
+                <th>Time</th>
+                <th>Status</th>
+                <th>View Data</th>
+                <th>View Logs</th>
+              </tr>
+            </thead>
+            <tbody>
+  #if($executions)
+    #foreach($execution in $executions)
+              <tr>
+                <td>$utils.formatDateTime($execution.startTime)</td>
+                <td>${execution.status}</td>
+                <td><a href="${context}/reportal?view&amp;id=${project.id}&amp;execid=${execution.execId}">Data</a></td>
+                <td><a href="${context}/reportal?view&amp;logs&amp;id=${project.id}&amp;execid=${execution.execId}">Logs</a></td>
+              </tr>
+    #end
+  #else
+              <tr>
+                <td colspan="4" style="text-align:center;">#if($page > 1)There are no reports in this page. #else This report has never been run.#end</td>
+              </tr>
+  #end
+            </tbody>
+          </table>
+#end
+#if($view-logs)
+          <table id="report-results" border="0" cellspacing="0" cellpadding="0" class="table table-bordered table-striped">
+            <thead>
+              <tr>
+                <th>Title</th>
+                <th>Status</th>
+                <th>View Log</th>
+              </tr>
+            </thead>
+            <tbody>
+  #if($logs)
+    #foreach($log in $logs)
+              <tr>
+                <td>${log.id}</td>
+                <td>${log.status}</td>
+                <td><a href="${context}/reportal?view&amp;logs&amp;id=${project.id}&amp;execid=${execid}&amp;log=${log.id}">Log</a></td>
+              </tr>
+    #end
+  #else
+              <tr>
+                <td colspan="2" style="text-align:center;">No log available.</td>
+              </tr>
+  #end
+            </tbody>
+          </table>
+#end
+#if($view-log)
+          <script>
+          var execId = ${execid};
+          var jobId = "${jobId}";
+          var projectId = ${project.id};
+          </script>
+          <script type="text/javascript" src="${context}/reportal/js/reportal-data-log.js"></script>
+          <div id="jobLogView" class="logView" style="top:245px;">
+            <div style="text-align: center;">
+              <a class="btn btn-inverse" id="updateLogBtn" href="#">Refresh</a>
+              <a class="btn btn-inverse" id="toggleLineWrap" href="#">Toggle Line wrapping</a>
+            </div>
+            <div class="logViewer">
+              <pre id="logSection" class="log" style="background-color:#FFFFFF;">Loading log...</pre>
+            </div>
+          </div>
+#end
+#if($view-preview)
+  #if($files)
+    #foreach($file in $files)
+      #set($fileName = $file.get("name"))
+          <div>
+            <a download="$fileName" href="$context/reportal?view&amp;id=${project.id}&amp;execid=$execid&amp;download=$fileName"><b>$fileName</b></a>
+          </div>
+
+          <table id="report-results" border="0" cellspacing="0" cellpadding="0" class="table table-bordered table-striped">
+      #foreach($line in $file.get("content"))
+            <tr>
+        #foreach($item in $line)
+              <td>$esc.html($item)</td>
+        #end
+            </tr>
+      #end
+
+      #if($file.get("hasMore"))
+        #set($numColumns = $file.get("content").get(0).size())
+            <tr>
+              <td colspan="$numColumns">...</td>
+            </tr>
+      #end
+          </table>
+    #end
+  #else
+          <div style="text-align:center;">No data available.</div>
+  #end
+#end
+        </div>
+      </div>
+    </div>
+  </body>
+</html>
diff --git a/az-reportal/src/main/resources/azkaban/viewer/reportal/reportaleditpage.vm b/az-reportal/src/main/resources/azkaban/viewer/reportal/reportaleditpage.vm
new file mode 100644
index 0000000..4993a78
--- /dev/null
+++ b/az-reportal/src/main/resources/azkaban/viewer/reportal/reportaleditpage.vm
@@ -0,0 +1,316 @@
+#*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+*#
+
+<!DOCTYPE html>
+<html>
+  <head>
+    <title>Reportal</title>
+
+    <link href="${context}/reportal/css/bootstrap.min.css" rel="stylesheet">
+
+    <link href="${context}/css/jquery-ui.css" rel="stylesheet">
+    <script type="text/javascript" src="${context}/js/jquery/jquery-1.9.1.js"></script>
+    <script type="text/javascript" src="${context}/js/jqueryui/jquery-ui-1.10.1.custom.js"></script>
+
+    <script type="text/javascript" src="${context}/js/bootstrap.min.js"></script>
+    <link href="${context}/css/bootstrap-datetimepicker.css" rel="stylesheet">
+    <script type="text/javascript" src="${context}/js/moment.min.js"></script>
+    <script type="text/javascript" src="${context}/js/bootstrap-datetimepicker.min.js"></script>
+
+    <link href="${context}/reportal/css/codemirror.css" rel="stylesheet">
+    <link href="${context}/reportal/css/solarized.css" rel="stylesheet">
+    <script type="text/javascript" src="${context}/reportal/js/codemirror.js"></script>
+    <script type="text/javascript" src="${context}/reportal/js/mode/sql/sql.js"></script>
+    <script type="text/javascript" src="${context}/reportal/js/mode/pig/pig.js"></script>
+
+    <script type="text/javascript">
+      var contextURL = "${context}";
+      var startQueries = [
+        #if($queries)
+          #foreach($query in $queries)
+            #set($num = $velocityCount - 1)
+
+            ## Escape the query title and script strings as JavaScript strings before loading them.
+            #set($queryTitle = $esc.javascript(${query.title}))
+            #set($queryScript = $esc.javascript(${query.script}))
+            {
+              "num" : "${num}",
+              "title" : #if($queryTitle) "$queryTitle" #else "" #end,
+              "type" : "${query.type}",
+              "script" : #if($queryScript) "$queryScript" #else "" #end
+            },
+          #end
+        #end
+      ];
+      var startVariables = [
+        #if($variables)
+          #foreach($variable in $variables)
+            #set($num = $velocityCount - 1)
+                  {
+                    "num" : "${num}",
+                    "title" : "${variable.title}",
+                    "name" : "${variable.name}",
+                  },
+          #end
+        #end
+      ];
+    </script>
+    <script type="text/javascript" src="${context}/reportal/js/reportal.js"></script>
+    <script type="text/javascript" src="${context}/reportal/js/reportal-edit.js"></script>
+    <link href="${context}/reportal/css/reportal.css" rel="stylesheet">
+  </head>
+  <body>
+#set($current_page="$viewerName")
+#if(!$hideNavigation)
+  #parse("azkaban/webapp/servlet/velocity/nav.vm")
+#end
+
+#parse("azkaban/viewer/reportal/reportalnavigation.vm")
+
+    <div class="content" style="margin-top: 41px;">
+      <div id="box-error">
+  #if($errorMsgs)
+        <div class="box-error-message">
+    #foreach($errorMsg in $errorMsgs)
+      #set($num = $velocityCount - 1)
+
+      #if($num > 0)
+                    <br/>
+      #end
+
+                    $esc.html($errorMsg)
+    #end
+        </div>
+  #else
+    #if($error_message != "null")
+        <div class="box-error-message">$esc.html($error_message)</div>
+    #elseif($success_message != "null")
+        <div class="box-success-message">$success_message</div>
+    #end
+  #end
+      </div>
+      <div>&nbsp;</div>
+      <form method="post" class="form-horizontal">
+        <div style="text-align: center;">
+          <a class="btn btn-primary" href="${context}/reportal">Reportal Home</a>
+#if($projectId)
+          <a class="btn btn-info" href="${context}/reportal?view&amp;id=${projectId}">Report History</a>
+          <input type="submit" class="btn btn-primary" name="submit" value="Save"/>
+          <input type="submit" class="btn btn-success" name="submit" value="Save and Run"/>
+#end
+        </div>
+#set($title = $esc.html($title))
+        <div>&nbsp;</div>
+        <div class="container">
+          <input id="queryNumber" type="hidden" name="queryNumber" value="#if($queryNumber)${queryNumber}#{else}1#end">
+          <input id="variableNumber" type="hidden" name="variableNumber" value="#if($variableNumber)${variableNumber}#{else}1#end">
+          <fieldset>
+            <legend>Report</legend>
+            <div class="control-group required">
+              <label class="control-label">Title<abbr title="Required" class="required-mark">*</abbr></label>
+              <div class="controls"><input type="text" name="title"#if($title) value="$title"#end></div>
+            </div>
+            <div class="control-group required">
+              <label class="control-label">Description<abbr title="Required" class="required-mark">*</abbr></label>
+              <div class="controls"><textarea class="span8" name="description">${description}</textarea></div>
+            </div>
+            <div class="control-group">
+              <div class="controls">
+                <label class="checkbox">
+                  <input name="render-results-as-html" type="checkbox"#if($renderResultsAsHtml) checked#end />Render results as HTML
+                </label>
+              </div>
+            </div>
+            <div class="control-group">
+              <div class="controls">
+                <label class="checkbox">
+                  <input id="schedule-options" name="schedule" type="checkbox"#if($schedule) checked#end />Schedule
+                </label>
+              </div>
+            </div>
+            <div id="schedule-fields">
+              <div class="control-group">
+                <label for="schedule-time" class="control-label">Time</label>
+                <div id="schedule-time" class="controls">
+                  #set($TWELVE=12)
+                  #set($ZERO_ZERO="00")
+                  <input name="schedule-hour" type="text" maxlength="2" value="#if($scheduleHour && $scheduleHour != "")${scheduleHour}#else$TWELVE#end" style="width:20px;"/>:<input name="schedule-minute" type="text" size="2" maxlength="2" value="#if($scheduleMinute && $scheduleMinute != "")${scheduleMinute}#else$ZERO_ZERO#end" style="width:20px;"/>
+                  <select name="schedule-am_pm" style="width:60px;">
+                    <option#if($scheduleAmPm=="am") selected#end>am</option>
+                    <option#if($scheduleAmPm=="pm") selected#end>pm</option>
+                  </select>
+                  <select name="schedule-timezone" style="width:70px;">
+                    <option#if($scheduleTimeZone==$timezone) selected#end>${timezone}</option>
+                    <option#if($scheduleTimeZone=="UTC") selected#end>UTC</option>
+                  </select>
+                </div>
+              </div>
+              <div class="control-group">
+                <label for="date" class="control-label">Date</label>
+                <div id="date" class="controls" style="position: relative">
+                  <input type="text" id="schedule-date" name="schedule-date"#if($scheduleDate && $scheduleDate != "") value="$scheduleDate"#end/>
+                </div>
+              </div>
+              <div class="control-group">
+                <div class="controls">
+                  <label class="checkbox">
+                    <input id="schedule-repeat" name="schedule-repeat" type="checkbox"#if($scheduleRepeat) checked#end/>Repeat
+                  </label>
+                </div>
+              </div>
+              <div id="schedule-repeat-fields">
+                <div class="control-group">
+                  <label for="interval" class="control-label">Every</label>
+                  <div id="interval" class="controls">
+                    #set($ONE=1)
+                    <input name="schedule-interval-quantity" type="text" maxlength="2" value="#if($scheduleIntervalQuantity)${scheduleIntervalQuantity}#else$ONE#end" style="width:20px;"/>
+                    <select name="schedule-interval" style="width:100px;">
+                      <option value="M"#if($scheduleInterval=="M") selected#end>Minute(s)</option>
+                      <option value="h"#if($scheduleInterval=="h") selected#end>Hour(s)</option>
+                      <option value="d"#if($scheduleInterval=="d") selected#end>Day(s)</option>
+                      <option value="w"#if($scheduleInterval=="w") selected#end>Week(s)</option>
+                      <option value="m"#if($scheduleInterval=="m") selected#end>Month(s)</option>
+                      <option value="y"#if($scheduleInterval=="y") selected#end>Year(s)</option>
+                    </select>
+                  </div>
+                </div>
+
+                <div class="control-group" id='endScheduleId' data-max="$max_allowed_schedule_dates" data-default="$default_schedule_dates">
+                  <label for="endSchedule" class="control-label"> <span style="color:#ff1059">End Schedule Time</span> </label>
+                  <div id="interval_2" class="controls" style="position: relative">
+                    <input type="text" id="end-schedule-date" name="end-schedule-date"#if($endScheduleDate && $endScheduleDate != "") value="$endScheduleDate"#end/>
+                  </div>
+                  <div id="expireNote" class="controls" data-end="$endScheduleDate">
+                  </div>
+                </div>
+              </div>
+            </div>
+          </fieldset>
+          <fieldset>
+            <div id="variable-fields">
+              <legend>Variables</legend>
+              <ol id="variable-list" class="reportal-list">
+              </ol>
+              <ol id="variable-template" style="display:none;">
+                <li class="reportal-list-item">
+                  <div class="query-actions" style="float: right;">
+                    <a class="btn btn-icon btn-left bump-up" title="Move the variable up."><span class="icon icon-arrow-up icon-gray-light icon-medium"></span></a>
+                    <a class="btn btn-icon btn-right bump-down disabled" title="Move the variable down."><span class="icon icon-arrow-down icon-gray-light icon-medium"></span></a>
+                    <a class="btn btn-danger btn-icon delete" style="float: right; margin-left:5px;" title="Remove">Remove</a>
+                  </div>
+                  <div class="control-group required">
+                    <label class="control-label">Title<abbr title="Required" class="required-mark">*</abbr></label>
+                    <div class="controls"><input type="text" class="variabletitle" nametemplate="variable#title" value=""></div>
+                  </div>
+                  <div class="control-group required">
+                    <label class="control-label">Name<abbr title="Required" class="required-mark">*</abbr></label>
+                    <div class="controls"><input type="text" class="variablename" nametemplate="variable#name" value=""></div>
+                  </div>
+                </li>
+              </ol>
+              <div class="control-group">
+                <label class="control-label"></label>
+                <div class="controls"><button id="buttonAddVariable" type="button" class="btn btn-success" value="Add another Variable"><span class="icon icon-plus-alt icon-white icon-small"></span>Add Another Variable</button></div>
+              </div>
+            </div>
+          </fieldset>
+          <legend>Queries</legend>
+          <fieldset>
+            <ol id="query-list" class="reportal-list">
+            </ol>
+            <ol id="query-template" style="display:none;">
+              <li class="reportal-list-item">
+                <div class="query-actions" style="float: right;">
+                  <a class="btn btn-icon btn-left bump-up" title="Move the query up in execution order."><span class="icon icon-arrow-up icon-gray-light icon-medium"></span></a>
+                  <a class="btn btn-icon btn-right bump-down disabled" title="Move the query down in execution order."><span class="icon icon-arrow-down icon-gray-light icon-medium"></span></a>
+                  <a class="btn btn-danger btn-icon delete" style="float: right; margin-left:5px;" title="Remove">Remove</a>
+                </div>
+                <div class="control-group">
+                  <label class="control-label">Title</label>
+                  <div class="controls"><input type="text" class="querytitle" nametemplate="query#title" value="" maxlength="249"></div>
+                </div>
+                <div class="control-group required">
+                  <label class="control-label">Type<abbr title="Required" class="required-mark">*</abbr></label>
+                  <div class="controls">
+                    <select class="querytype" nametemplate="query#type">
+                      <option value="ReportalHive">Hive</option>
+                      <option value="ReportalTeraData">Teradata</option>
+                      <option value="ReportalPig">Pig</option>
+                    </select>
+                  </div>
+                </div>
+                <div class="control-group">
+                  <label class="control-label">Script</label>
+                  <div class="controls"><textarea class="span8 queryscript" nametemplate="query#script"></textarea></div>
+                </div>
+              </li>
+            </ol>
+            <div class="control-group">
+              <label class="control-label"></label>
+              <div class="controls"><button id="buttonAddQuery" type="button" class="btn btn-success" value="Add another Query"><span class="icon icon-plus-alt icon-white icon-small"></span>Add Another Query</button></div>
+            </div>
+          </fieldset>
+          <fieldset>
+            <legend>Access</legend>
+            <div class="control-group">
+              <label class="control-label">Viewers</label>
+              <div class="controls">
+                <textarea class="span8" name="access-viewer">$accessViewer</textarea>
+                <br/>Separate usernames (e.g.: jdoe) by commas, spaces, or semicolons. If left blank, the runs of this report are viewable by everyone.
+              </div>
+            </div>
+            <div class="control-group">
+              <label class="control-label">Executors</label>
+              <div class="controls">
+                <textarea class="span8" name="access-executor">$accessExecutor</textarea>
+                <br/>Separate usernames (e.g.: jdoe) by commas, spaces, or semicolons. If left blank, anyone can execute this report.
+              </div>
+            </div>
+            <div class="control-group">
+              <label class="control-label">Owners</label>
+              <div class="controls">
+                <textarea class="span8" name="access-owner">$accessOwner</textarea>
+                <br/>Separate usernames (e.g.: jdoe) by commas, spaces, or semicolons.
+              </div>
+            </div>
+          </fieldset>
+          <fieldset>
+            <legend>Notification</legend>
+            <div class="control-group">
+              <label class="control-label">Success Emails</label>
+              <div class="controls">
+                <textarea class="span8" name="notifications">$notifications</textarea>
+                <br/>Separate emails (e.g.: jdoe@example.com) by commas, spaces, or semicolons. Note: If the report returns no results, no email is sent, except for unscheduled runs.
+              </div>
+            </div>
+            <div class="control-group">
+              <label class="control-label">Failure Emails</label>
+              <div class="controls">
+                <textarea class="span8" name="failure-notifications">$failureNotifications</textarea>
+                <br/>Separate emails (e.g.: jdoe@example.com) by commas, spaces, or semicolons.
+              </div>
+            </div>
+          </fieldset>
+          <div class="form-actions">
+            <input type="submit" class="btn btn-primary" name="submit" value="Save">
+            <input type="submit" class="btn btn-success" name="submit" value="Save and Run"/>
+            <a href="${context}/reportal" class="btn">Cancel</a>
+          </div>
+        </div>
+      </form>
+    </div>
+  </body>
+</html>
diff --git a/az-reportal/src/main/resources/azkaban/viewer/reportal/reportallistpage.vm b/az-reportal/src/main/resources/azkaban/viewer/reportal/reportallistpage.vm
new file mode 100644
index 0000000..0e0704e
--- /dev/null
+++ b/az-reportal/src/main/resources/azkaban/viewer/reportal/reportallistpage.vm
@@ -0,0 +1,147 @@
+#*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+*#
+
+<!DOCTYPE html>
+<html>
+  <head>
+    <title>Reportal</title>
+    <link href="${context}/reportal/css/bootstrap.min.css" rel="stylesheet">
+    <script type="text/javascript" src="${context}/js/jquery/jquery-1.9.1.js"></script>
+    <script type="text/javascript" src="${context}/js/jqueryui/jquery-ui-1.10.1.custom.js"></script>
+    <script type="text/javascript" src="${context}/reportal/js/bootstrap.min.js"></script>
+
+    <script type="text/javascript">
+      var contextURL = "${context}";
+      var currentTime = ${currentTime};
+      var timezone = "${timezone}";
+      var homeDir = "${homedir}";
+    </script>
+    <script type="text/javascript">
+      var reportals = [
+#if($projects)
+  #foreach($project in $projects)
+    #set($title = $esc.javascript(${project.getMetadata().title}))
+          {
+            "id" : "${project.id}",
+            "title" : "$title",
+            "time" : $project.createTimestamp,
+            "timeText" : "$utils.formatDateTime($project.createTimestamp)",
+            "user" : "$project.getMetadata().get("accessOwner")",
+            "scheduled" : $ReportalHelper.isScheduledProject($project),
+            "scheduledRepeating" : $ReportalHelper.isScheduledRepeatingProject($project),
+            "bookmark" : $ReportalHelper.isBookmarkProject($project, $user),
+            "subscribe" : $ReportalHelper.isSubscribeProject($project, $user),
+            "shown" : true
+          },
+  #end
+#end
+      ];
+    </script>
+    <script type="text/javascript" src="${context}/reportal/js/reportal.js"></script>
+    <script type="text/javascript" src="${context}/reportal/js/reportal-list.js"></script>
+    <script type="text/javascript" src="${context}/reportal/js/jquery.tablesorter.min.js"></script>
+    <link href="${context}/reportal/css/reportal.css" rel="stylesheet">
+  </head>
+  <body>
+#set($current_page="$viewerName")
+#if(!$hideNavigation)
+  #parse("azkaban/webapp/servlet/velocity/nav.vm")
+#end
+
+#parse("azkaban/viewer/reportal/reportalnavigation.vm")
+
+    <div class="content" style="margin-top: 41px;">
+      <div id="box-error">
+  #if($errorMsg)
+        <div class="box-error-message">$errorMsg</div>
+  #else
+    #if($error_message != "null")
+        <div class="box-error-message">$error_message</div>
+    #elseif($success_message != "null")
+        <div class="box-success-message">$success_message</div>
+    #end
+  #end
+      </div>
+      <div>&nbsp;</div>
+      <div class="container-fluid" style"font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif;">
+        <div class="row-fluid">
+          <div class="span2">
+            <div class="well">
+              <p style="margin-bottom:20px;"><a href="${context}/reportal?new" class="btn btn-success">Create Report</a></p>
+              <form method="get" accept-charset="utf-8" class="form-inline" id="search-facet-form">
+                <label class="input-list-label">Filter Reports</label>
+                <div class="control-group">
+                  <div class="controls">
+                    <label class="checkbox"><input id="facet_bookmarked" type="checkbox">Bookmarked</label>
+                  </div>
+                </div>
+                <div class="control-group">
+                  <div class="controls">
+                    <label class="checkbox"><input id="facet_subscribed" type="checkbox">Subscribed</label>
+                  </div>
+                </div>
+                <div class="control-group">
+                  <div class="controls">
+                    <label class="checkbox"><input id="facet_date_created" type="checkbox">Date Created</label>
+                  </div>
+                </div>
+                <div class="facet-input">
+                  <input class="span2 hasDatepicker" id="date_created_from" name="date_created_from" type="text" value="${startDate}" style="width:130px;">
+                  to
+                  <input class="span2 hasDatepicker" id="date_created_to" name="date_created_to" type="text" value="${endDate}" style="width:130px;">
+                </div>
+                <div class="control-group">
+                  <div class="controls">
+                    <label class="checkbox"><input id="facet_owner" type="checkbox" checked="checked">Owner</label>
+                  </div>
+                </div>
+                <div class="facet-input">
+                  <input class="span2" id="owner" type="text" value="$userid" style="width:130px;">
+                </div>
+              </form>
+            </div>
+          </div>
+          <div class="span10">
+            <table id="reportalTable" class="table table-bordered table-striped">
+              <thead>
+                <tr>
+                  <th>Title</th>
+                  <th>Date Created</th>
+                  <th>Owner</th>
+                </tr>
+              </thead>
+              <tbody>
+              </tbody>
+            </table>
+          </div>
+          <div id="action-template" style="display:none;">
+            <div class="report-actions">
+              <a href="" title="Toggle bookmarked" class="btn btn-icon btn-left button-bookmark"><span class="icon icon-bookmark icon-gray-light icon-small"></span></a><a href="" title="Toggle subscription" class="btn btn-icon btn-middle button-subscribe"><span class="icon icon-mail icon-gray-light icon-small"></span></a><div class="btn-dropdown"><a class="btn btn-icon btn-right dropdown-toggle"><span class="icon icon-cog icon-gray-light icon-small"></span></a>
+                <ul class="dropdown-menu">
+                  <li><a class="button-view" href="">View</a></li>
+                  <li><a class="button-edit" href="">Edit</a></li>
+                  <li class="divider"></li>
+                  <li><a class="button-run" href="">Run</a></li>
+                  <li><a class="button-delete" href="">Delete</a></li>
+                </ul>
+              </div>
+            </div>
+          </div>
+        </div>
+      </div>
+    </div>
+  </body>
+</html>
diff --git a/az-reportal/src/main/resources/azkaban/viewer/reportal/reportalnavigation.vm b/az-reportal/src/main/resources/azkaban/viewer/reportal/reportalnavigation.vm
new file mode 100644
index 0000000..e22a53d
--- /dev/null
+++ b/az-reportal/src/main/resources/azkaban/viewer/reportal/reportalnavigation.vm
@@ -0,0 +1,30 @@
+#*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+*#
+
+<div class="topbar navbar navbar-fixed-top navbar-inverse">
+  <div class="navbar-inner">
+    <div class="container-fluid">
+      <div class="topbar-content">
+        <a class="brand" href="${context}/reportal">Reportal</a>
+        <div class="login pull-right">
+          <p class="navbar-text">
+            Logged in as $userid | <a href="${context}/reportal?logout">Sign out</a>
+          </p>
+        </div>
+      </div>
+    </div>
+  </div>
+</div>
diff --git a/az-reportal/src/main/resources/azkaban/viewer/reportal/reportalrunpage.vm b/az-reportal/src/main/resources/azkaban/viewer/reportal/reportalrunpage.vm
new file mode 100644
index 0000000..73c250c
--- /dev/null
+++ b/az-reportal/src/main/resources/azkaban/viewer/reportal/reportalrunpage.vm
@@ -0,0 +1,99 @@
+#*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+*#
+
+<!DOCTYPE html>
+<html>
+  <head>
+    <title>Reportal</title>
+    <link href="${context}/reportal/css/bootstrap.min.css" rel="stylesheet">
+    <script type="text/javascript" src="${context}/js/jquery/jquery-1.9.1.js"></script>
+    <script type="text/javascript" src="${context}/js/jqueryui/jquery-ui-1.10.1.custom.js"></script>
+    <script type="text/javascript">
+      var contextURL = "${context}";
+    </script>
+    <script type="text/javascript" src="${context}/reportal/js/reportal.js"></script>
+    <script type="text/javascript" src="${context}/reportal/js/reportal-run.js"></script>
+    <link href="${context}/reportal/css/reportal.css" rel="stylesheet">
+  </head>
+  <body>
+#set($current_page="$viewerName")
+#if(!$hideNavigation)
+  #parse("azkaban/webapp/servlet/velocity/nav.vm")
+#end
+
+#parse("azkaban/viewer/reportal/reportalnavigation.vm")
+
+    <div class="content" style="margin-top: 41px;">
+      <div id="box-error">
+  #if($errorMsg)
+        <div class="box-error-message">$errorMsg</div>
+  #else
+    #if($error_message != "null")
+        <div class="box-error-message">$error_message</div>
+    #elseif($success_message != "null")
+        <div class="box-success-message">$success_message</div>
+    #end
+  #end
+      </div>
+      <div>&nbsp;</div>
+      <div style="text-align: center;">
+        <a class="btn btn-info" href="${context}/reportal">Reportal Home</a>
+#if($projectId)
+        <a class="btn btn-primary" href="${context}/reportal?view&amp;id=$projectId">Report History</a>
+        <a class="btn btn-warning" href="${context}/reportal?edit&amp;id=$projectId">Edit</a>
+#end
+      </div>
+#if($projectId)
+      <div>&nbsp;</div>
+      <div class="container">
+        <div>
+          <form method="post" class="form-horizontal">
+            <input type="hidden" name="id" value="$projectId">
+            <fieldset>
+              <legend>Report</legend>
+              <div class="control-group">
+                <label class="control-label">Title</label>
+                <div class="controls" style="padding-top:5px;">${title}</div>
+              </div>
+              <div class="control-group">
+                <label class="control-label">Description</label>
+                <div class="controls" style="padding-top:5px;">${description}</div>
+              </div>
+            </fieldset>
+  #if($variables)
+            <legend>Variables</legend>
+            <fieldset>
+    #foreach($variable in $variables)
+      #set($num = $velocityCount - 1)
+              <div class="control-group">
+                <label class="control-label">${variable.title}</label>
+                <div class="controls"><input type="text" name="variable${num}" value=""></div>
+              </div>
+    #end
+            </fieldset>
+  #end
+            <div class="form-actions">
+              <input id="run-button" type="submit" class="btn btn-primary" value="Run">
+              <input id="test-run-button" type="submit" class="btn btn-info" value="Test Run (only email executor)">
+              <a href="${context}/reportal" class="btn">Cancel</a>
+            </div>
+          <form>
+        </div>
+      </div>
+#end
+    </div>
+  </body>
+</html>

build.gradle 3(+3 -0)

diff --git a/build.gradle b/build.gradle
index 60eea52..9f33f3f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -54,6 +54,7 @@ allprojects {
 ext.versions = [
         hadoop: '2.6.1',
         hive  : '1.1.0',
+        pig   : '0.15.0',
         restli: '1.15.7',
         slf4j : '1.7.18',
 ]
@@ -78,6 +79,7 @@ ext.deps = [
         hadoopHdfs          : "org.apache.hadoop:hadoop-hdfs:" + versions.hadoop,
         hadoopMRClientCommon: "org.apache.hadoop:hadoop-mapreduce-client-common:" + versions.hadoop,
         hadoopMRClientCore  : "org.apache.hadoop:hadoop-mapreduce-client-core:" + versions.hadoop,
+        hiveCli             : "org.apache.hive:hive-cli:" + versions.hive,
         hiveExecCore        : "org.apache.hive:hive-exec:" + versions.hive + ":core",
         hiveMetastore       : "org.apache.hive:hive-metastore:" + versions.hive,
         httpclient          : 'org.apache.httpcomponents:httpclient:4.5.3',
@@ -99,6 +101,7 @@ ext.deps = [
         metricsJvm          : 'io.dropwizard.metrics:metrics-jvm:3.1.0',
         mockito             : 'org.mockito:mockito-core:2.10.0',
         mysqlConnector      : 'mysql:mysql-connector-java:5.1.28',
+        pig                 : 'org.apache.pig:pig:' + versions.pig,
         quartz              : 'org.quartz-scheduler:quartz:2.2.1',
         restliGenerator     : 'com.linkedin.pegasus:generator:' + versions.restli,
         restliServer        : 'com.linkedin.pegasus:restli-server:' + versions.restli,

settings.gradle 2(+2 -0)

diff --git a/settings.gradle b/settings.gradle
index dae347c..def0782 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -27,3 +27,5 @@ include 'azkaban-solo-server'
 include 'azkaban-web-server'
 include 'az-flow-trigger-dependency-plugin'
 include 'test'
+include 'az-reportal'
+