azkaban-aplcache
Details
az-reportal/build.gradle 49(+49 -0)
diff --git a/az-reportal/build.gradle b/az-reportal/build.gradle
new file mode 100644
index 0000000..2e3cd0a
--- /dev/null
+++ b/az-reportal/build.gradle
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+apply plugin: 'distribution'
+
+dependencies {
+ compile project(':az-core')
+ compile project(":azkaban-common")
+ compile project(":azkaban-web-server")
+ compile project(":azkaban-hadoop-security-plugin")
+
+ compileOnly deps.hadoopCommon
+ compileOnly deps.hadoopMRClientCommon
+ compileOnly deps.hadoopMRClientCore
+ compileOnly (deps.hiveCli) {
+ transitive = false
+ }
+ compileOnly deps.hiveMetastore
+ compileOnly(deps.hiveExecCore) {
+ exclude group: 'org.pentaho', module: 'pentaho-aggdesigner-algorithm'
+ exclude group: 'eigenbase', module: 'eigenbase-properties'
+ }
+ compileOnly(deps.pig) {
+ transitive = false
+ }
+}
+
+distributions {
+ main {
+ contents {
+ from(jar) {
+ into 'lib'
+ }
+ }
+ }
+}
diff --git a/az-reportal/src/main/java/azkaban/jobtype/ReportalAbstractRunner.java b/az-reportal/src/main/java/azkaban/jobtype/ReportalAbstractRunner.java
new file mode 100644
index 0000000..56e6d81
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/jobtype/ReportalAbstractRunner.java
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobtype;
+
+import static azkaban.security.commons.SecurityUtils.MAPREDUCE_JOB_CREDENTIALS_BINARY;
+import static org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+
+import azkaban.flow.CommonJobProperties;
+import azkaban.reportal.util.BoundedOutputStream;
+import azkaban.reportal.util.ReportalRunnerException;
+import azkaban.utils.Props;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.TimeZone;
+import org.apache.hadoop.conf.Configuration;
+
+public abstract class ReportalAbstractRunner {
+
+ private static final String REPORTAL_VARIABLE_PREFIX = "reportal.variable.";
+ protected Props props;
+ protected OutputStream outputStream;
+ protected String proxyUser;
+ protected String jobQuery;
+ protected String jobTitle;
+ protected String reportalTitle;
+ protected String reportalStorageUser;
+ protected int outputCapacity;
+ protected Map<String, String> variables = new HashMap<>();
+
+ public ReportalAbstractRunner(final Properties props) {
+ final Props prop = new Props();
+ prop.put(props);
+ this.props = prop;
+ }
+
+ public void run() throws Exception {
+ System.out.println("Reportal: Setting up environment");
+
+ // Check the properties file
+ if (this.props == null) {
+ throw new ReportalRunnerException("Properties file not loaded correctly.");
+ }
+
+ // Get the hadoop token
+ final Configuration conf = new Configuration();
+ if (System.getenv(HADOOP_TOKEN_FILE_LOCATION) != null) {
+ conf.set(MAPREDUCE_JOB_CREDENTIALS_BINARY,
+ System.getenv(HADOOP_TOKEN_FILE_LOCATION));
+ }
+
+ // Get properties
+ final String execId = this.props.getString(CommonJobProperties.EXEC_ID);
+ this.outputCapacity = this.props.getInt("reportal.output.capacity", 10 * 1024 * 1024);
+ this.proxyUser = this.props.getString("reportal.proxy.user");
+ this.jobQuery = this.props.getString("reportal.job.query");
+ this.jobTitle = this.props.getString("reportal.job.title");
+ this.reportalTitle = this.props.getString("reportal.title");
+ this.reportalStorageUser = this.props.getString("reportal.storage.user", "reportal");
+ final Map<String, String> reportalVariables =
+ this.props.getMapByPrefix(REPORTAL_VARIABLE_PREFIX);
+
+ // Parse variables
+ for (final Entry<String, String> entry : reportalVariables.entrySet()) {
+ if (entry.getKey().endsWith("from")) {
+ final String fromValue = entry.getValue();
+ final String toKey =
+ entry.getKey().substring(0, entry.getKey().length() - 4) + "to";
+ final String toValue = reportalVariables.get(toKey);
+ if (toValue != null) {
+ this.variables.put(fromValue, toValue);
+ }
+ }
+ }
+
+ // Built-in variables
+ this.variables.put("run_id", execId);
+ this.variables.put("sys_date", Long.toString(System.currentTimeMillis() / 1000));
+
+ final Calendar cal = Calendar.getInstance();
+ final Date date = new Date();
+ cal.setTime(date);
+
+ final String timeZone = this.props.getString("reportal.default.timezone", "UTC");
+ TimeZone.setDefault(TimeZone.getTimeZone(timeZone));
+
+ final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
+ final SimpleDateFormat hourFormat = new SimpleDateFormat("yyyy-MM-dd-HH");
+
+ this.variables.put("hive_current_hour", hourFormat.format(cal.getTime()));
+ this.variables.put("hive_current_day", dateFormat.format(cal.getTime()));
+ cal.add(Calendar.HOUR, -1);
+ this.variables.put("hive_last_hour", hourFormat.format(cal.getTime()));
+ cal.add(Calendar.HOUR, 1);
+ cal.add(Calendar.DATE, -1);
+ this.variables.put("hive_yesterday", dateFormat.format(cal.getTime()));
+ cal.add(Calendar.DATE, -6);
+ this.variables.put("hive_last_seven_days", dateFormat.format(cal.getTime()));
+ cal.add(Calendar.DATE, -1);
+ this.variables.put("hive_last_eight_days", dateFormat.format(cal.getTime()));
+ this.variables.put("owner", this.proxyUser);
+ this.variables.put("title", this.reportalTitle);
+
+ // Props debug
+ System.out.println("Reportal Variables:");
+ for (final Entry<String, String> data : this.variables.entrySet()) {
+ System.out.println(data.getKey() + " -> " + data.getValue());
+ }
+
+ if (requiresOutput()) {
+ // Get output stream to data
+ final String locationTemp =
+ ("./reportal/" + this.jobTitle + ".csv").replace("//", "/");
+ final File tempOutput = new File(locationTemp);
+ tempOutput.getParentFile().mkdirs();
+ tempOutput.createNewFile();
+ this.outputStream =
+ new BoundedOutputStream(new BufferedOutputStream(
+ new FileOutputStream(tempOutput)), this.outputCapacity);
+
+ // Run the reportal
+ runReportal();
+
+ // Cleanup the reportal
+ try {
+ this.outputStream.close();
+ } catch (final IOException e) {
+ // We can safely ignore this exception since we're just making sure the
+ // stream is closed.
+ }
+ } else {
+ runReportal();
+ }
+ }
+
+ protected abstract void runReportal() throws Exception;
+
+ protected boolean requiresOutput() {
+ return true;
+ }
+
+ protected String injectVariables(String line) {
+ for (final Entry<String, String> entry : this.variables.entrySet()) {
+ line =
+ line.replace(":" + entry.getKey(), sanitizeVariable(entry.getValue()));
+ }
+ return line;
+ }
+
+ private String sanitizeVariable(final String variable) {
+ return variable.replace("'", "\\'").replace("\"", "\\\"");
+ }
+}
diff --git a/az-reportal/src/main/java/azkaban/jobtype/ReportalDataCollector.java b/az-reportal/src/main/java/azkaban/jobtype/ReportalDataCollector.java
new file mode 100644
index 0000000..9e13d15
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/jobtype/ReportalDataCollector.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobtype;
+
+import azkaban.flow.CommonJobProperties;
+import azkaban.reportal.util.CompositeException;
+import azkaban.reportal.util.IStreamProvider;
+import azkaban.reportal.util.ReportalUtil;
+import azkaban.utils.Props;
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import org.apache.commons.io.IOUtils;
+
+public class ReportalDataCollector extends ReportalAbstractRunner {
+
+ Props prop;
+
+ public ReportalDataCollector(final String jobName, final Properties props) {
+ super(props);
+ this.prop = new Props();
+ this.prop.put(props);
+ }
+
+ @Override
+ protected void runReportal() throws Exception {
+ System.out.println("Reportal Data Collector: Initializing");
+
+ final String outputFileSystem =
+ this.props.getString("reportal.output.filesystem", "local");
+ final String outputBase = this.props.getString("reportal.output.dir", "/tmp/reportal");
+ final String execId = this.props.getString(CommonJobProperties.EXEC_ID);
+
+ final int jobNumber = this.prop.getInt("reportal.job.number");
+ final List<Exception> exceptions = new ArrayList<>();
+ for (int i = 0; i < jobNumber; i++) {
+ InputStream tempStream = null;
+ IStreamProvider outputProvider = null;
+ OutputStream persistentStream = null;
+ try {
+ final String jobTitle = this.prop.getString("reportal.job." + i);
+ System.out.println("Reportal Data Collector: Job name=" + jobTitle);
+
+ final String tempFileName = jobTitle + ".csv";
+ // We add the job index to the beginning of the job title to allow us to
+ // sort the files correctly.
+ final String persistentFileName = i + "-" + tempFileName;
+
+ final String subPath = "/" + execId + "/" + persistentFileName;
+ final String locationFull = (outputBase + subPath).replace("//", "/");
+ final String locationTemp = ("./reportal/" + tempFileName).replace("//", "/");
+ final File tempOutput = new File(locationTemp);
+ if (!tempOutput.exists()) {
+ throw new FileNotFoundException("File: "
+ + tempOutput.getAbsolutePath() + " does not exist.");
+ }
+
+ // Copy file to persistent saving location
+ System.out
+ .println("Reportal Data Collector: Saving output to persistent storage");
+ System.out.println("Reportal Data Collector: FS=" + outputFileSystem
+ + ", Location=" + locationFull);
+ // Open temp file
+ tempStream = new BufferedInputStream(new FileInputStream(tempOutput));
+ // Open file from HDFS if specified
+ outputProvider = ReportalUtil.getStreamProvider(outputFileSystem);
+ persistentStream = outputProvider.getFileOutputStream(locationFull);
+ // Copy it
+ IOUtils.copy(tempStream, persistentStream);
+
+ } catch (final Exception e) {
+ System.out.println("Reportal Data Collector: Data collection failed. "
+ + e.getMessage());
+ e.printStackTrace();
+ exceptions.add(e);
+ } finally {
+ IOUtils.closeQuietly(tempStream);
+ IOUtils.closeQuietly(persistentStream);
+
+ try {
+ outputProvider.cleanUp();
+ } catch (final IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ if (exceptions.size() > 0) {
+ throw new CompositeException(exceptions);
+ }
+
+ System.out.println("Reportal Data Collector: Ended successfully");
+ }
+
+ @Override
+ protected boolean requiresOutput() {
+ return false;
+ }
+}
diff --git a/az-reportal/src/main/java/azkaban/jobtype/ReportalHiveRunner.java b/az-reportal/src/main/java/azkaban/jobtype/ReportalHiveRunner.java
new file mode 100644
index 0000000..e641adb
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/jobtype/ReportalHiveRunner.java
@@ -0,0 +1,223 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobtype;
+
+import static azkaban.security.commons.SecurityUtils.MAPREDUCE_JOB_CREDENTIALS_BINARY;
+import static org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+
+import azkaban.reportal.util.BoundedOutputStream;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Scanner;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.cli.CliDriver;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.cli.OptionsProcessor;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+public class ReportalHiveRunner extends ReportalAbstractRunner {
+
+ public ReportalHiveRunner(final String jobName, final Properties props) {
+ super(props);
+ }
+
+ /**
+ * Normally hive.aux.jars.path is expanded from just being a path to the full
+ * list of files in the directory by the hive shell script. Since we normally
+ * won't be running from the script, it's up to us to do that work here. We
+ * use a heuristic that if there is no occurrence of ".jar" in the original,
+ * it needs expansion. Otherwise it's already been done for us. Also, surround
+ * the files with uri niceities.
+ */
+ static String expandHiveAuxJarsPath(final String original) throws IOException {
+ if (original == null || original.endsWith(".jar")) {
+ return original;
+ }
+
+ final File[] files = new File(original).listFiles();
+
+ if (files == null || files.length == 0) {
+ return original;
+ }
+
+ return filesToURIString(files);
+
+ }
+
+ static String filesToURIString(final File[] files) throws IOException {
+ final StringBuffer sb = new StringBuffer();
+ for (int i = 0; i < files.length; i++) {
+ sb.append("file:///").append(files[i].getCanonicalPath());
+ if (i != files.length - 1) {
+ sb.append(",");
+ }
+ }
+
+ return sb.toString();
+ }
+
+ @Override
+ protected void runReportal() throws Exception {
+ System.out.println("Reportal Hive: Setting up Hive");
+ final HiveConf conf = new HiveConf(SessionState.class);
+
+ if (System.getenv(HADOOP_TOKEN_FILE_LOCATION) != null) {
+ conf.set(MAPREDUCE_JOB_CREDENTIALS_BINARY,
+ System.getenv(HADOOP_TOKEN_FILE_LOCATION));
+ }
+
+ final File tempTSVFile = new File("./temp.tsv");
+ final OutputStream tsvTempOutputStream =
+ new BoundedOutputStream(new BufferedOutputStream(new FileOutputStream(
+ tempTSVFile)), this.outputCapacity);
+ final PrintStream logOut = System.out;
+
+ final String orig = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS);
+
+ final CliSessionState sessionState = new CliSessionState(conf);
+ sessionState.in = System.in;
+ sessionState.out = new PrintStream(tsvTempOutputStream, true, "UTF-8");
+ sessionState.err = new PrintStream(logOut, true, "UTF-8");
+
+ final OptionsProcessor oproc = new OptionsProcessor();
+
+ // Feed in Hive Args
+ final String[] args = buildHiveArgs();
+ if (!oproc.process_stage1(args)) {
+ throw new Exception("unable to parse options stage 1");
+ }
+
+ if (!oproc.process_stage2(sessionState)) {
+ throw new Exception("unable to parse options stage 2");
+ }
+
+ // Set all properties specified via command line
+ for (final Map.Entry<Object, Object> item : sessionState.cmdProperties.entrySet()) {
+ conf.set((String) item.getKey(), (String) item.getValue());
+ }
+
+ SessionState.start(sessionState);
+
+ final String expanded = expandHiveAuxJarsPath(orig);
+ if (orig == null || orig.equals(expanded)) {
+ System.out.println("Hive aux jars variable not expanded");
+ } else {
+ System.out.println("Expanded aux jars variable from [" + orig + "] to ["
+ + expanded + "]");
+ HiveConf.setVar(conf, HiveConf.ConfVars.HIVEAUXJARS, expanded);
+ }
+
+ // hadoop-20 and above - we need to augment classpath using hiveconf
+ // components
+ // see also: code in ExecDriver.java
+ ClassLoader loader = conf.getClassLoader();
+ final String auxJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS);
+
+ System.out.println("Got auxJars = " + auxJars);
+
+ if (StringUtils.isNotBlank(auxJars)) {
+ loader = Utilities.addToClassPath(loader, StringUtils.split(auxJars, ","));
+ }
+ conf.setClassLoader(loader);
+ Thread.currentThread().setContextClassLoader(loader);
+
+ final CliDriver cli = new CliDriver();
+ int returnValue = 0;
+ String prefix = "";
+
+ returnValue = cli.processLine("set hive.cli.print.header=true;");
+ final String[] queries = this.jobQuery.split("\n");
+ for (String line : queries) {
+ if (!prefix.isEmpty()) {
+ prefix += '\n';
+ }
+ if (line.trim().endsWith(";") && !line.trim().endsWith("\\;")) {
+ line = prefix + line;
+ line = injectVariables(line);
+ System.out.println("Reportal Hive: Running Hive Query: " + line);
+ returnValue = cli.processLine(line);
+ prefix = "";
+ } else {
+ prefix = prefix + line;
+ continue;
+ }
+ }
+
+ tsvTempOutputStream.close();
+
+ // convert tsv to csv and write it do disk
+ System.out.println("Reportal Hive: Converting output");
+ final InputStream tsvTempInputStream =
+ new BufferedInputStream(new FileInputStream(tempTSVFile));
+ final Scanner rowScanner = new Scanner(tsvTempInputStream, StandardCharsets.UTF_8.toString());
+ final PrintStream csvOutputStream = new PrintStream(this.outputStream);
+ while (rowScanner.hasNextLine()) {
+ final String tsvLine = rowScanner.nextLine();
+ // strip all quotes, and then quote the columns
+ csvOutputStream.println("\""
+ + tsvLine.replace("\"", "").replace("\t", "\",\"") + "\"");
+ }
+ rowScanner.close();
+ csvOutputStream.close();
+
+ // Flush the temp file out
+ tempTSVFile.delete();
+
+ if (returnValue != 0) {
+ throw new Exception("Hive query finished with a non zero return code");
+ }
+
+ System.out.println("Reportal Hive: Ended successfully");
+ }
+
+ private String[] buildHiveArgs() {
+ final List<String> confBuilder = new ArrayList<>();
+
+ if (this.proxyUser != null) {
+ confBuilder.add("hive.exec.scratchdir=/tmp/hive-" + this.proxyUser);
+ }
+
+ if (this.jobTitle != null) {
+ confBuilder.add("mapred.job.name=\"Reportal: " + this.jobTitle + "\"");
+ }
+
+ confBuilder.add("mapreduce.job.complete.cancel.delegation.tokens=false");
+
+ final String[] args = new String[confBuilder.size() * 2];
+
+ for (int i = 0; i < confBuilder.size(); i++) {
+ args[i * 2] = "--hiveconf";
+ args[i * 2 + 1] = confBuilder.get(i);
+ }
+
+ return args;
+ }
+}
diff --git a/az-reportal/src/main/java/azkaban/jobtype/ReportalPigRunner.java b/az-reportal/src/main/java/azkaban/jobtype/ReportalPigRunner.java
new file mode 100644
index 0000000..f17fbeb
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/jobtype/ReportalPigRunner.java
@@ -0,0 +1,237 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobtype;
+
+import azkaban.reportal.util.BoundedOutputStream;
+import azkaban.reportal.util.ReportalRunnerException;
+import azkaban.utils.Props;
+import azkaban.utils.StringUtils;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Scanner;
+import java.util.Set;
+import org.apache.pig.PigRunner;
+import org.apache.pig.tools.pigstats.PigStats;
+
+public class ReportalPigRunner extends ReportalAbstractRunner {
+
+ public static final String PIG_PARAM_PREFIX = "param.";
+ public static final String PIG_PARAM_FILES = "paramfile";
+ public static final String PIG_SCRIPT = "reportal.pig.script";
+ public static final String UDF_IMPORT_LIST = "udf.import.list";
+ public static final String PIG_ADDITIONAL_JARS = "pig.additional.jars";
+ Props prop;
+
+ public ReportalPigRunner(final String jobName, final Properties props) {
+ super(props);
+ this.prop = new Props();
+ this.prop.put(props);
+ }
+
+ private static void handleError(final File pigLog) throws Exception {
+ System.out.println();
+ System.out.println("====Pig logfile dump====");
+ System.out.println("File: " + pigLog.getAbsolutePath());
+ System.out.println();
+ try {
+ final BufferedReader reader = new BufferedReader(
+ new InputStreamReader(
+ new FileInputStream(pigLog), StandardCharsets.UTF_8.toString()
+ ));
+
+ String line = reader.readLine();
+ while (line != null) {
+ System.out.println(line);
+ line = reader.readLine();
+ }
+ reader.close();
+ System.out.println();
+ System.out.println("====End logfile dump====");
+ } catch (final FileNotFoundException e) {
+ System.out.println("pig log file: " + pigLog + " not found.");
+ }
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ protected void runReportal() throws Exception {
+ System.out.println("Reportal Pig: Setting up Pig");
+
+ injectAllVariables(this.prop.getString(PIG_SCRIPT));
+
+ final String[] args = getParams();
+
+ System.out.println("Reportal Pig: Running pig script");
+ final PrintStream oldOutputStream = System.out;
+
+ final File tempOutputFile = new File("./temp.out");
+ final OutputStream tempOutputStream =
+ new BoundedOutputStream(new BufferedOutputStream(new FileOutputStream(
+ tempOutputFile)), this.outputCapacity);
+ final PrintStream printStream = new PrintStream(tempOutputStream);
+ System.setOut(printStream);
+
+ final PigStats stats = PigRunner.run(args, null);
+
+ System.setOut(oldOutputStream);
+
+ printStream.close();
+
+ // convert pig output to csv and write it to disk
+ System.out.println("Reportal Pig: Converting output");
+ final InputStream tempInputStream =
+ new BufferedInputStream(new FileInputStream(tempOutputFile));
+ final Scanner rowScanner = new Scanner(tempInputStream, StandardCharsets.UTF_8.toString());
+ final PrintStream csvOutputStream = new PrintStream(this.outputStream);
+ while (rowScanner.hasNextLine()) {
+ String line = rowScanner.nextLine();
+ // strip all quotes, and then quote the columns
+ if (line.startsWith("(")) {
+ line = transformDumpLine(line);
+ } else {
+ line = transformDescriptionLine(line);
+ }
+ csvOutputStream.println(line);
+ }
+ rowScanner.close();
+ csvOutputStream.close();
+
+ // Flush the temp file out
+ tempOutputFile.delete();
+
+ if (!stats.isSuccessful()) {
+ System.out.println("Reportal Pig: Handling errors");
+
+ final File pigLogFile = new File("./");
+
+ final File[] fileList = pigLogFile.listFiles();
+
+ for (final File file : fileList) {
+ if (file.isFile() && file.getName().matches("^pig_.*\\.log$")) {
+ handleError(file);
+ }
+ }
+
+ // see jira ticket PIG-3313. Will remove these when we use pig binary with
+ // that patch.
+ // /////////////////////
+ System.out.println("Trying to do self kill, in case pig could not.");
+ final Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
+ final Thread[] threadArray = threadSet.toArray(new Thread[threadSet.size()]);
+ for (final Thread t : threadArray) {
+ if (!t.isDaemon() && !t.equals(Thread.currentThread())) {
+ System.out.println("Killing thread " + t);
+ t.interrupt();
+ t.stop();
+ }
+ }
+ System.exit(1);
+ // ////////////////////
+
+ throw new ReportalRunnerException("Pig job failed.");
+ } else {
+ System.out.println("Reportal Pig: Ended successfully");
+ }
+ }
+
+ private String[] getParams() {
+ final ArrayList<String> list = new ArrayList<>();
+
+ final Map<String, String> map = getPigParams();
+ if (map != null) {
+ for (final Map.Entry<String, String> entry : map.entrySet()) {
+ list.add("-param");
+ list.add(StringUtils.shellQuote(
+ entry.getKey() + "=" + entry.getValue(), StringUtils.SINGLE_QUOTE));
+ }
+ }
+
+ // Run in local mode if filesystem is set to local.
+ if (this.prop.getString("reportal.output.filesystem", "local").equals("local")) {
+ list.add("-x");
+ list.add("local");
+ }
+
+ // Register any additional Pig jars
+ final String additionalPigJars = this.prop.getString(PIG_ADDITIONAL_JARS, null);
+ if (additionalPigJars != null && additionalPigJars.length() > 0) {
+ list.add("-Dpig.additional.jars=" + additionalPigJars);
+ }
+
+ // Add UDF import list
+ final String udfImportList = this.prop.getString(UDF_IMPORT_LIST, null);
+ if (udfImportList != null && udfImportList.length() > 0) {
+ list.add("-Dudf.import.list=" + udfImportList);
+ }
+
+ // Add the script to execute
+ list.add(this.prop.getString(PIG_SCRIPT));
+ return list.toArray(new String[0]);
+ }
+
+ protected Map<String, String> getPigParams() {
+ return this.prop.getMapByPrefix(PIG_PARAM_PREFIX);
+ }
+
+ private String transformDescriptionLine(final String line) {
+ final int start = line.indexOf(':');
+ String cleanLine = line;
+ if (start != -1 && start + 3 < line.length()) {
+ cleanLine = line.substring(start + 3, line.length() - 1);
+ }
+ return "\"" + cleanLine.replace("\"", "").replace(",", "\",\"") + "\"";
+ }
+
+ private String transformDumpLine(final String line) {
+ final String cleanLine = line.substring(1, line.length() - 1);
+ return "\"" + cleanLine.replace("\"", "").replace(",", "\",\"") + "\"";
+ }
+
+ private void injectAllVariables(final String file) throws FileNotFoundException {
+ // Inject variables into the script
+ System.out.println("Reportal Pig: Replacing variables");
+ final File inputFile = new File(file);
+ final File outputFile = new File(file + ".bak");
+ final InputStream scriptInputStream =
+ new BufferedInputStream(new FileInputStream(inputFile));
+ final Scanner rowScanner = new Scanner(scriptInputStream, StandardCharsets.UTF_8.toString());
+ final PrintStream scriptOutputStream =
+ new PrintStream(new BufferedOutputStream(new FileOutputStream(
+ outputFile)));
+ while (rowScanner.hasNextLine()) {
+ String line = rowScanner.nextLine();
+ line = injectVariables(line);
+ scriptOutputStream.println(line);
+ }
+ rowScanner.close();
+ scriptOutputStream.close();
+ outputFile.renameTo(inputFile);
+ }
+}
diff --git a/az-reportal/src/main/java/azkaban/jobtype/ReportalTeradataRunner.java b/az-reportal/src/main/java/azkaban/jobtype/ReportalTeradataRunner.java
new file mode 100644
index 0000000..81b0e5e
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/jobtype/ReportalTeradataRunner.java
@@ -0,0 +1,262 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobtype;
+
+import azkaban.flow.CommonJobProperties;
+import azkaban.reportal.util.CompositeException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.net.URI;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import javax.sql.DataSource;
+import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.commons.lang.StringUtils;
+
+public class ReportalTeradataRunner extends ReportalAbstractRunner {
+
+ public ReportalTeradataRunner(final String jobName, final Properties props) {
+ super(props);
+ }
+
+ @Override
+ protected void runReportal() throws Exception {
+ System.out.println("Reportal Teradata: Setting up Teradata");
+ final List<Exception> exceptions = new ArrayList<>();
+
+ Class.forName("com.teradata.jdbc.TeraDriver");
+ final String connectionString =
+ this.props.getString("reportal.teradata.connection.string", null);
+
+ final String user = this.props.getString("reportal.teradata.username", null);
+ final String pass = this.props.getString("reportal.teradata.password", null);
+
+ final Map<String, String> queryBandProperties = new HashMap<>();
+ queryBandProperties.put("USER", this.proxyUser);
+ queryBandProperties
+ .put(CommonJobProperties.EXEC_ID, this.props.getString(CommonJobProperties.EXEC_ID));
+ queryBandProperties
+ .put(CommonJobProperties.PROJECT_NAME,
+ this.props.getString(CommonJobProperties.PROJECT_NAME));
+ queryBandProperties
+ .put(CommonJobProperties.FLOW_ID, this.props.getString(CommonJobProperties.FLOW_ID));
+ queryBandProperties
+ .put(CommonJobProperties.JOB_ID, this.props.getString(CommonJobProperties.JOB_ID));
+ final String attemptUrl = this.props.getString(CommonJobProperties.ATTEMPT_LINK);
+ queryBandProperties.put(CommonJobProperties.ATTEMPT_LINK, attemptUrl);
+ final URI attemptUri = new URI(attemptUrl);
+ queryBandProperties.put("azkaban.server", attemptUri.getHost());
+
+ if (user == null) {
+ System.out.println("Reportal Teradata: Configuration incomplete");
+ throw new RuntimeException(
+ "The reportal.teradata.username variable was not defined.");
+ }
+ if (pass == null) {
+ System.out.println("Reportal Teradata: Configuration incomplete");
+ throw new RuntimeException(
+ "The reportal.teradata.password variable was not defined.");
+ }
+
+ final DataSource teraDataSource =
+ new TeradataDataSource(connectionString, user, pass);
+ final Connection conn = teraDataSource.getConnection();
+
+ final String[] sqlQueries = cleanAndGetQueries(this.jobQuery, queryBandProperties);
+
+ final int numQueries = sqlQueries.length;
+
+ for (int i = 0; i < numQueries; i++) {
+ try {
+ final String queryLine = sqlQueries[i];
+
+ // Only store results from the last statement
+ if (i == numQueries - 1) {
+ final PreparedStatement stmt = prepareStatement(conn, queryLine);
+ stmt.execute();
+ final ResultSet rs = stmt.getResultSet();
+ outputQueryResult(rs, this.outputStream);
+ stmt.close();
+ } else {
+ try {
+ final PreparedStatement stmt = prepareStatement(conn, queryLine);
+ stmt.execute();
+ stmt.close();
+ } catch (final NullPointerException e) {
+ // An empty query (or comment) throws a NPE in JDBC. Yay!
+ System.err
+ .println("Caught NPE in execute call because report has a NOOP query: "
+ + queryLine);
+ }
+ }
+ } catch (final Exception e) {
+ // Catch and continue. Delay exception throwing until we've run all
+ // queries in this task.
+ System.out.println("Reportal Teradata: SQL query failed. "
+ + e.getMessage());
+ e.printStackTrace();
+ exceptions.add(e);
+ }
+ }
+
+ if (exceptions.size() > 0) {
+ throw new CompositeException(exceptions);
+ }
+
+ System.out.println("Reportal Teradata: Ended successfully");
+ }
+
+ protected String[] cleanAndGetQueries(final String sqlQuery,
+ final Map<String, String> queryBandProperties) {
+
+ /**
+ * Teradata's SET Query_Band allows use to "proxy" to an LDAP user. This
+ * makes queries appear to admins as though it's being issued by the owner
+ * of the report, rather than the 'Reportal' user. Tables will still be
+ * "owned" by Reportal, but admins will be able to send angry emails to the
+ * proper user when a reportal query is impacting the system negatively.
+ * Best we could do.
+ */
+
+ final StringBuilder queryBandBuilder = new StringBuilder();
+ for (final Map.Entry<String, String> pair : queryBandProperties.entrySet()) {
+ queryBandBuilder.append("" + pair.getKey() + "=" + pair.getValue() + ";");
+ }
+
+ final String queryBand =
+ "SET Query_Band = '" + queryBandBuilder.toString()
+ + "' FOR SESSION;";
+ final ArrayList<String> injectedQueries = new ArrayList<>();
+
+ injectedQueries.add(queryBand);
+ final String[] queries = StringUtils.split(sqlQuery.trim(), ";");
+ for (String query : queries) {
+ query = cleanQueryLine(query);
+ if (query == null || query.isEmpty()) {
+ continue;
+ }
+ injectedQueries.add(query);
+ }
+
+ return injectedQueries.toArray(new String[]{});
+ }
+
+ private String cleanQueryLine(final String line) {
+ if (line != null) {
+ return line.trim();
+ }
+ return null;
+ }
+
+ private void outputQueryResult(final ResultSet result, final OutputStream outputStream)
+ throws SQLException {
+ final PrintStream outFile = new PrintStream(outputStream);
+ final String delim = ",";
+ boolean isHeaderPending = true;
+ if (result != null) {
+ while (result.next()) {
+ final int numColumns = result.getMetaData().getColumnCount();
+ final StringBuilder dataString = new StringBuilder();
+
+ if (isHeaderPending) {
+ final StringBuilder headerString = new StringBuilder();
+ for (int j = 1; j <= numColumns; j++) {
+ final String colName = formatValue(result.getMetaData().getColumnName(j));
+ if (j > 1) {
+ headerString.append(delim).append(colName);
+ } else {
+ headerString.append(colName);
+ }
+ }
+ isHeaderPending = false;
+ outFile.println(headerString.toString());
+ }
+
+ for (int j = 1; j <= numColumns; j++) {
+ String colVal = result.getString(j);
+
+ if (colVal == null) {
+ colVal = "\"null\"";
+ } else {
+ colVal = formatValue(colVal);
+ }
+
+ if (j > 1) {
+ dataString.append(delim).append(colVal);
+ } else {
+ dataString.append(colVal);
+ }
+ }
+
+ outFile.println(dataString.toString());
+ }
+ }
+ outFile.close();
+ }
+
+ private String formatValue(final String value) {
+ return "\"" + value.replace("\"", "") + "\"";
+ }
+
+ private PreparedStatement prepareStatement(final Connection conn, String line)
+ throws SQLException {
+ line = injectVariables(line);
+
+ // For some reason, teradata's adapter can't seem to handle this well
+ // List<String> variableReplacements = new ArrayList<String>();
+ //
+ // for (Entry<String, String> entry : variables.entrySet()) {
+ // String key = ":" + entry.getKey();
+ // int index;
+ // while ((index = line.indexOf(key)) != -1) {
+ // line = line.substring(0, index) + "?" + line.substring(index +
+ // key.length());
+ // variableReplacements.add(entry.getValue());
+ // }
+ // }
+
+ // StringBuilder sb = new StringBuilder();
+ final PreparedStatement stmt = conn.prepareStatement(line);
+ // for (int i = 0; i < variableReplacements.size(); i++) {
+ // stmt.setString(i + 1, variableReplacements.get(i));
+ // sb.append(variableReplacements.get(i)).append(",");
+ // }
+
+ System.out.println("Reportal Teradata: Teradata query: " + line);
+ // System.out.println("Reportal Teradata: Variables: " + sb.toString());
+ return stmt;
+ }
+
+ private static class TeradataDataSource extends BasicDataSource {
+
+ private TeradataDataSource(final String connectionString, final String user,
+ final String password) {
+ super();
+ setDriverClassName("com.teradata.jdbc.TeraDriver");
+ setUrl(connectionString);
+ setUsername(user);
+ setPassword(password);
+ }
+ }
+}
diff --git a/az-reportal/src/main/java/azkaban/reportal/util/BoundedOutputStream.java b/az-reportal/src/main/java/azkaban/reportal/util/BoundedOutputStream.java
new file mode 100644
index 0000000..ba29818
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/reportal/util/BoundedOutputStream.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.reportal.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class BoundedOutputStream extends OutputStream {
+
+ OutputStream out;
+ int totalCapacity;
+ int remainingCapacity;
+ boolean hasExceededSize = false;
+ boolean havePrintedErrorMessage = false;
+
+ public BoundedOutputStream(final OutputStream out, final int size) {
+ this.out = out;
+ this.totalCapacity = size;
+ this.remainingCapacity = size;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ this.out.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.out.close();
+ }
+
+ @Override
+ public void write(final byte[] b) throws IOException {
+ if (this.remainingCapacity <= 0) {
+ this.hasExceededSize = true;
+ } else if (this.remainingCapacity - b.length < 0) {
+ this.out.write(b, 0, this.remainingCapacity);
+ this.remainingCapacity = 0;
+ this.hasExceededSize = true;
+ } else {
+ this.out.write(b);
+ this.remainingCapacity -= b.length;
+ }
+
+ if (this.hasExceededSize && !this.havePrintedErrorMessage) {
+ System.err.println("Output has exceeded the max limit of "
+ + this.totalCapacity + " bytes. Truncating remaining output.");
+ this.havePrintedErrorMessage = true;
+ }
+ }
+
+ @Override
+ public void write(final byte[] b, final int off, final int len) throws IOException {
+ if (this.remainingCapacity <= 0) {
+ this.hasExceededSize = true;
+ } else if (this.remainingCapacity - len < 0) {
+ this.out.write(b, off, this.remainingCapacity);
+ this.remainingCapacity = 0;
+ this.hasExceededSize = true;
+ } else {
+ this.out.write(b, off, len);
+ this.remainingCapacity -= len;
+ }
+
+ if (this.hasExceededSize && !this.havePrintedErrorMessage) {
+ System.err.println("Output has exceeded the max limit of "
+ + this.totalCapacity + " bytes. Truncating remaining output.");
+ this.havePrintedErrorMessage = true;
+ }
+ }
+
+ @Override
+ public void write(final int b) throws IOException {
+ if (this.remainingCapacity <= 0) {
+ this.hasExceededSize = true;
+
+ if (!this.havePrintedErrorMessage) {
+ System.err.println("Output has exceeded the max limit of "
+ + this.totalCapacity + " bytes. Truncating remaining output.");
+ this.havePrintedErrorMessage = true;
+ }
+
+ return;
+ }
+ this.out.write(b);
+ this.remainingCapacity--;
+ }
+
+}
diff --git a/az-reportal/src/main/java/azkaban/reportal/util/CompositeException.java b/az-reportal/src/main/java/azkaban/reportal/util/CompositeException.java
new file mode 100644
index 0000000..41e6746
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/reportal/util/CompositeException.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.reportal.util;
+
+import java.util.List;
+
+public class CompositeException extends Exception {
+ private static final long serialVersionUID = 1L;
+ private final List<? extends Exception> es;
+
+ public CompositeException(final List<? extends Exception> es) {
+ this.es = es;
+ }
+
+ public List<? extends Exception> getExceptions() {
+ return this.es;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder str = new StringBuilder();
+ boolean pastFirst = false;
+
+ for (final Exception e : this.es) {
+ if (pastFirst) {
+ str.append("\n\n");
+ }
+ str.append(e.toString());
+ pastFirst = true;
+ }
+
+ return str.toString();
+ }
+}
diff --git a/az-reportal/src/main/java/azkaban/reportal/util/IStreamProvider.java b/az-reportal/src/main/java/azkaban/reportal/util/IStreamProvider.java
new file mode 100644
index 0000000..8c374a0
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/reportal/util/IStreamProvider.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.reportal.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public interface IStreamProvider {
+
+ public void setUser(String user);
+
+ public String[] getFileList(String pathString) throws Exception;
+
+ /**
+ * Returns a list of all files in a directory with a modification time less
+ * than the specified time
+ */
+ public String[] getOldFiles(String pathString, long thresholdTime)
+ throws Exception;
+
+
+ /**
+ * Deletes the file denoted by the specified path. If the file is a directory,
+ * this method recursively deletes the files in the directory and the
+ * directory itself.
+ */
+ public void deleteFile(String pathString) throws Exception;
+
+ public InputStream getFileInputStream(String pathString) throws Exception;
+
+ public OutputStream getFileOutputStream(String pathString) throws Exception;
+
+ public void cleanUp() throws IOException;
+}
diff --git a/az-reportal/src/main/java/azkaban/reportal/util/Reportal.java b/az-reportal/src/main/java/azkaban/reportal/util/Reportal.java
new file mode 100644
index 0000000..6d3df92
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/reportal/util/Reportal.java
@@ -0,0 +1,504 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.reportal.util;
+
+import azkaban.executor.ExecutionOptions;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.project.ProjectManager;
+import azkaban.scheduler.Schedule;
+import azkaban.scheduler.ScheduleManager;
+import azkaban.scheduler.ScheduleManagerException;
+import azkaban.user.Permission;
+import azkaban.user.Permission.Type;
+import azkaban.user.User;
+import azkaban.utils.Utils;
+import azkaban.viewer.reportal.ReportalMailCreator;
+import azkaban.viewer.reportal.ReportalTypeManager;
+import java.io.File;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Days;
+import org.joda.time.Hours;
+import org.joda.time.Minutes;
+import org.joda.time.Months;
+import org.joda.time.ReadablePeriod;
+import org.joda.time.Weeks;
+import org.joda.time.Years;
+import org.joda.time.format.DateTimeFormat;
+
+public class Reportal {
+
+ public static final String REPORTAL_CONFIG_PREFIX = "reportal.config.";
+ public static final String REPORTAL_CONFIG_PREFIX_REGEX =
+ "^reportal[.]config[.].+";
+ public static final String REPORTAL_CONFIG_PREFIX_NEGATION_REGEX =
+ "(?!(^reportal[.]config[.])).+";
+ public static final String ACCESS_LIST_SPLIT_REGEX =
+ "\\s*,\\s*|\\s*;\\s*|\\s+";
+ // One Schedule's default End Time: 01/01/2050, 00:00:00, UTC
+ private static final long DEFAULT_SCHEDULE_END_EPOCH_TIME = 2524608000000L;
+ private static final Logger logger = Logger.getLogger(Reportal.class);
+ public static Getter<Boolean> boolGetter = new Getter<>(false,
+ Boolean.class);
+ public static Getter<Integer> intGetter = new Getter<>(0,
+ Integer.class);
+ public static Getter<String> stringGetter = new Getter<>("",
+ String.class);
+ public String reportalUser;
+ public String ownerEmail;
+ public String title;
+ public String description;
+ public List<Query> queries;
+ public List<Variable> variables;
+ public boolean schedule;
+ public String scheduleHour;
+ public String scheduleMinute;
+ public String scheduleAmPm;
+ public String scheduleTimeZone;
+ public String scheduleDate;
+ public boolean scheduleRepeat;
+ public String scheduleIntervalQuantity;
+ public String scheduleInterval;
+ public String endSchedule;
+ public boolean renderResultsAsHtml;
+ public String accessViewer;
+ public String accessExecutor;
+ public String accessOwner;
+ public String notifications;
+ public String failureNotifications;
+ public Project project;
+
+ public static Reportal loadFromProject(final Project project) {
+ if (project == null) {
+ return null;
+ }
+
+ final Reportal reportal = new Reportal();
+ final Map<String, Object> metadata = project.getMetadata();
+
+ reportal.loadImmutableFromProject(project);
+
+ if (reportal.reportalUser == null || reportal.reportalUser.isEmpty()) {
+ return null;
+ }
+
+ reportal.title = stringGetter.get(metadata.get("title"));
+ reportal.description = project.getDescription();
+ final int queries = intGetter.get(project.getMetadata().get("queryNumber"));
+ final int variables = intGetter.get(project.getMetadata().get("variableNumber"));
+
+ reportal.schedule = boolGetter.get(project.getMetadata().get("schedule"));
+ reportal.scheduleHour =
+ stringGetter.get(project.getMetadata().get("scheduleHour"));
+ reportal.scheduleMinute =
+ stringGetter.get(project.getMetadata().get("scheduleMinute"));
+ reportal.scheduleAmPm =
+ stringGetter.get(project.getMetadata().get("scheduleAmPm"));
+ reportal.scheduleTimeZone =
+ stringGetter.get(project.getMetadata().get("scheduleTimeZone"));
+ reportal.scheduleDate =
+ stringGetter.get(project.getMetadata().get("scheduleDate"));
+ reportal.scheduleRepeat =
+ boolGetter.get(project.getMetadata().get("scheduleRepeat"));
+ reportal.scheduleIntervalQuantity =
+ stringGetter.get(project.getMetadata().get("scheduleIntervalQuantity"));
+ reportal.scheduleInterval =
+ stringGetter.get(project.getMetadata().get("scheduleInterval"));
+ reportal.endSchedule =
+ stringGetter.get(project.getMetadata().get("endSchedule"));
+
+ reportal.renderResultsAsHtml =
+ boolGetter.get(project.getMetadata().get("renderResultsAsHtml"));
+
+ reportal.accessViewer =
+ stringGetter.get(project.getMetadata().get("accessViewer"));
+ reportal.accessExecutor =
+ stringGetter.get(project.getMetadata().get("accessExecutor"));
+ reportal.accessOwner =
+ stringGetter.get(project.getMetadata().get("accessOwner"));
+
+ reportal.notifications =
+ stringGetter.get(project.getMetadata().get("notifications"));
+ reportal.failureNotifications =
+ stringGetter.get(project.getMetadata().get("failureNotifications"));
+
+ reportal.queries = new ArrayList<>();
+
+ for (int i = 0; i < queries; i++) {
+ final Query query = new Query();
+ reportal.queries.add(query);
+ query.title =
+ stringGetter.get(project.getMetadata().get("query" + i + "title"));
+ query.type =
+ stringGetter.get(project.getMetadata().get("query" + i + "type"));
+ query.script =
+ stringGetter.get(project.getMetadata().get("query" + i + "script"));
+ }
+
+ reportal.variables = new ArrayList<>();
+
+ for (int i = 0; i < variables; i++) {
+ final String title =
+ stringGetter.get(project.getMetadata().get("variable" + i + "title"));
+ final String name =
+ stringGetter.get(project.getMetadata().get("variable" + i + "name"));
+ final Variable variable = new Variable(title, name);
+ reportal.variables.add(variable);
+ }
+
+ reportal.project = project;
+
+ return reportal;
+ }
+
+ public void saveToProject(final Project project) {
+ this.project = project;
+
+ project.getMetadata().put("reportal-user", this.reportalUser);
+ project.getMetadata().put("owner-email", this.ownerEmail);
+
+ project.getMetadata().put("title", this.title);
+ project.setDescription(this.description);
+
+ project.getMetadata().put("schedule", this.schedule);
+ project.getMetadata().put("scheduleHour", this.scheduleHour);
+ project.getMetadata().put("scheduleMinute", this.scheduleMinute);
+ project.getMetadata().put("scheduleAmPm", this.scheduleAmPm);
+ project.getMetadata().put("scheduleTimeZone", this.scheduleTimeZone);
+ project.getMetadata().put("scheduleDate", this.scheduleDate);
+ project.getMetadata().put("scheduleRepeat", this.scheduleRepeat);
+ project.getMetadata().put("scheduleIntervalQuantity",
+ this.scheduleIntervalQuantity);
+ project.getMetadata().put("scheduleInterval", this.scheduleInterval);
+ project.getMetadata().put("endSchedule", this.endSchedule);
+
+ project.getMetadata().put("renderResultsAsHtml", this.renderResultsAsHtml);
+
+ project.getMetadata().put("accessViewer", this.accessViewer);
+ project.getMetadata().put("accessExecutor", this.accessExecutor);
+ project.getMetadata().put("accessOwner", this.accessOwner);
+
+ project.getMetadata().put("queryNumber", this.queries.size());
+ for (int i = 0; i < this.queries.size(); i++) {
+ final Query query = this.queries.get(i);
+ project.getMetadata().put("query" + i + "title", query.title);
+ project.getMetadata().put("query" + i + "type", query.type);
+ project.getMetadata().put("query" + i + "script", query.script);
+ }
+
+ project.getMetadata().put("variableNumber", this.variables.size());
+ for (int i = 0; i < this.variables.size(); i++) {
+ final Variable variable = this.variables.get(i);
+ project.getMetadata().put("variable" + i + "title", variable.title);
+ project.getMetadata().put("variable" + i + "name", variable.name);
+ }
+
+ project.getMetadata().put("notifications", this.notifications);
+ project.getMetadata().put("failureNotifications", this.failureNotifications);
+ }
+
+ public void removeSchedules(final ScheduleManager scheduleManager)
+ throws ScheduleManagerException {
+ final List<Flow> flows = this.project.getFlows();
+ for (final Flow flow : flows) {
+ final Schedule sched =
+ scheduleManager.getSchedule(this.project.getId(), flow.getId());
+ if (sched != null) {
+ scheduleManager.removeSchedule(sched);
+ }
+ }
+ }
+
+ public void updateSchedules(final Reportal report, final ScheduleManager scheduleManager,
+ final User user, final Flow flow) throws ScheduleManagerException {
+ // Clear previous schedules
+ removeSchedules(scheduleManager);
+ // Add new schedule
+ if (this.schedule) {
+ final int hour =
+ (Integer.parseInt(this.scheduleHour) % 12)
+ + (this.scheduleAmPm.equalsIgnoreCase("pm") ? 12 : 0);
+ final int minute = Integer.parseInt(this.scheduleMinute) % 60;
+ final DateTimeZone timeZone =
+ this.scheduleTimeZone.equalsIgnoreCase("UTC") ? DateTimeZone.UTC
+ : DateTimeZone.getDefault();
+ DateTime firstSchedTime =
+ DateTimeFormat.forPattern("MM/dd/yyyy").withZone(timeZone)
+ .parseDateTime(this.scheduleDate);
+ firstSchedTime =
+ firstSchedTime.withHourOfDay(hour).withMinuteOfHour(minute)
+ .withSecondOfMinute(0).withMillisOfSecond(0);
+
+ ReadablePeriod period = null;
+ if (this.scheduleRepeat) {
+ final int intervalQuantity = Integer.parseInt(this.scheduleIntervalQuantity);
+
+ if (this.scheduleInterval.equals("y")) {
+ period = Years.years(intervalQuantity);
+ } else if (this.scheduleInterval.equals("m")) {
+ period = Months.months(intervalQuantity);
+ } else if (this.scheduleInterval.equals("w")) {
+ period = Weeks.weeks(intervalQuantity);
+ } else if (this.scheduleInterval.equals("d")) {
+ period = Days.days(intervalQuantity);
+ } else if (this.scheduleInterval.equals("h")) {
+ period = Hours.hours(intervalQuantity);
+ } else if (this.scheduleInterval.equals("M")) {
+ period = Minutes.minutes(intervalQuantity);
+ }
+ }
+
+ final ExecutionOptions options = new ExecutionOptions();
+ options.getFlowParameters().put("reportal.execution.user",
+ user.getUserId());
+ options.getFlowParameters().put("reportal.title", report.title);
+ options.getFlowParameters().put("reportal.render.results.as.html",
+ report.renderResultsAsHtml ? "true" : "false");
+ options.setMailCreator(ReportalMailCreator.REPORTAL_MAIL_CREATOR);
+
+ final long endScheduleTime = report.endSchedule == null ?
+ DEFAULT_SCHEDULE_END_EPOCH_TIME : parseDateToEpoch(report.endSchedule);
+
+ logger.info("This report scheudle end time is " + endScheduleTime);
+
+ scheduleManager.scheduleFlow(-1, this.project.getId(), this.project.getName(),
+ flow.getId(), "ready", firstSchedTime.getMillis(), endScheduleTime,
+ firstSchedTime.getZone(), period, DateTime.now().getMillis(),
+ firstSchedTime.getMillis(), firstSchedTime.getMillis(),
+ user.getUserId(), options, null);
+ }
+ }
+
+ private long parseDateToEpoch(final String date) throws ScheduleManagerException {
+ final DateFormat dffrom = new SimpleDateFormat("MM/dd/yyyy h:mm a");
+ try {
+ // this string will be parsed according to system's timezone setting.
+ return dffrom.parse(date).getTime();
+ } catch (final Exception ex) {
+ throw new ScheduleManagerException("can not parse this date " + date);
+ }
+ }
+
+ /**
+ * Updates the project permissions in MEMORY, but does NOT update the project
+ * in the database.
+ */
+ public void updatePermissions() {
+ final String[] accessViewerList =
+ this.accessViewer.trim().split(ACCESS_LIST_SPLIT_REGEX);
+ final String[] accessExecutorList =
+ this.accessExecutor.trim().split(ACCESS_LIST_SPLIT_REGEX);
+ final String[] accessOwnerList =
+ this.accessOwner.trim().split(ACCESS_LIST_SPLIT_REGEX);
+ // Prepare permission types
+ final Permission admin = new Permission();
+ admin.addPermission(Type.READ);
+ admin.addPermission(Type.EXECUTE);
+ admin.addPermission(Type.ADMIN);
+ final Permission executor = new Permission();
+ executor.addPermission(Type.READ);
+ executor.addPermission(Type.EXECUTE);
+ final Permission viewer = new Permission();
+ viewer.addPermission(Type.READ);
+ // Sets the permissions
+ this.project.clearUserPermission();
+ for (String user : accessViewerList) {
+ user = user.trim();
+ if (!user.isEmpty()) {
+ this.project.setUserPermission(user, viewer);
+ }
+ }
+ for (String user : accessExecutorList) {
+ user = user.trim();
+ if (!user.isEmpty()) {
+ this.project.setUserPermission(user, executor);
+ }
+ }
+ for (String user : accessOwnerList) {
+ user = user.trim();
+ if (!user.isEmpty()) {
+ this.project.setUserPermission(user, admin);
+ }
+ }
+ this.project.setUserPermission(this.reportalUser, admin);
+ }
+
+ public void createZipAndUpload(final ProjectManager projectManager, final User user,
+ final String reportalStorageUser) throws Exception {
+ // Create temp folder to make the zip file for upload
+ final File tempDir = Utils.createTempDir();
+ final File dataDir = new File(tempDir, "data");
+ dataDir.mkdirs();
+
+ // Create all job files
+ String dependentJob = null;
+ final List<String> jobs = new ArrayList<>();
+ final Map<String, String> extraProps =
+ ReportalUtil.getVariableMapByPrefix(this.variables, REPORTAL_CONFIG_PREFIX);
+ for (final Query query : this.queries) {
+ // Create .job file
+ final File jobFile =
+ ReportalHelper.findAvailableFileName(dataDir,
+ ReportalHelper.sanitizeText(query.title), ".job");
+
+ final String fileName = jobFile.getName();
+ final String jobName = fileName.substring(0, fileName.length() - 4);
+ jobs.add(jobName);
+
+ // Populate the job file
+ ReportalTypeManager.createJobAndFiles(this, jobFile, jobName,
+ query.title, query.type, query.script, dependentJob, this.reportalUser,
+ extraProps);
+
+ // For dependency of next query
+ dependentJob = jobName;
+ }
+
+ // Create the data collector job
+ if (dependentJob != null) {
+ final String jobName = "data-collector";
+
+ // Create .job file
+ final File jobFile =
+ ReportalHelper.findAvailableFileName(dataDir,
+ ReportalHelper.sanitizeText(jobName), ".job");
+ final Map<String, String> extras = new HashMap<>();
+ extras.put("reportal.job.number", Integer.toString(jobs.size()));
+ for (int i = 0; i < jobs.size(); i++) {
+ extras.put("reportal.job." + i, jobs.get(i));
+ }
+ ReportalTypeManager.createJobAndFiles(this, jobFile, jobName, "",
+ ReportalTypeManager.DATA_COLLECTOR_JOB, "", dependentJob,
+ reportalStorageUser, extras);
+ }
+
+ // Zip jobs together
+ final File archiveFile = new File(tempDir, this.project.getName() + ".zip");
+ Utils.zipFolderContent(dataDir, archiveFile);
+
+ // Upload zip
+ projectManager.uploadProject(this.project, archiveFile, "zip", user, null);
+
+ // Empty temp
+ if (tempDir.exists()) {
+ FileUtils.deleteDirectory(tempDir);
+ }
+ }
+
+ public void loadImmutableFromProject(final Project project) {
+ this.reportalUser = stringGetter.get(project.getMetadata().get("reportal-user"));
+ this.ownerEmail = stringGetter.get(project.getMetadata().get("owner-email"));
+ }
+
+ /**
+ * @return A set of users explicitly granted viewer access to the report.
+ */
+ public Set<String> getAccessViewers() {
+ final Set<String> viewers = new HashSet<>();
+ for (final String user : this.accessViewer.trim().split(ACCESS_LIST_SPLIT_REGEX)) {
+ if (!user.isEmpty()) {
+ viewers.add(user);
+ }
+ }
+ return viewers;
+ }
+
+ /**
+ * @return A set of users explicitly granted executor access to the report.
+ */
+ public Set<String> getAccessExecutors() {
+ final Set<String> executors = new HashSet<>();
+ for (final String user : this.accessExecutor.trim().split(ACCESS_LIST_SPLIT_REGEX)) {
+ if (!user.isEmpty()) {
+ executors.add(user);
+
+ }
+ }
+ return executors;
+ }
+
+ public static class Getter<T> {
+
+ Class<?> cls;
+ T defaultValue;
+
+ public Getter(final T defaultValue, final Class<?> cls) {
+ this.cls = cls;
+ this.defaultValue = defaultValue;
+ }
+
+ @SuppressWarnings("unchecked")
+ public T get(final Object object) {
+ if (object == null || !(this.cls.isAssignableFrom(object.getClass()))) {
+ return this.defaultValue;
+ }
+ return (T) object;
+ }
+ }
+
+ public static class Query {
+
+ public String title;
+ public String type;
+ public String script;
+
+ public String getTitle() {
+ return this.title;
+ }
+
+ public String getType() {
+ return this.type;
+ }
+
+ public String getScript() {
+ return this.script;
+ }
+ }
+
+ public static class Variable {
+
+ public String title;
+ public String name;
+
+ public Variable() {
+ }
+
+ public Variable(final String title, final String name) {
+ this.title = title;
+ this.name = name;
+ }
+
+ public String getTitle() {
+ return this.title;
+ }
+
+ public String getName() {
+ return this.name;
+ }
+ }
+}
diff --git a/az-reportal/src/main/java/azkaban/reportal/util/ReportalHelper.java b/az-reportal/src/main/java/azkaban/reportal/util/ReportalHelper.java
new file mode 100644
index 0000000..18ceb73
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/reportal/util/ReportalHelper.java
@@ -0,0 +1,296 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.reportal.util;
+
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.project.ProjectManager;
+import azkaban.project.ProjectManagerException;
+import azkaban.user.User;
+import azkaban.webapp.AzkabanWebServer;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.mail.internet.AddressException;
+import javax.mail.internet.InternetAddress;
+import org.apache.commons.lang.StringUtils;
+
+public class ReportalHelper {
+
+ public static List<Project> getReportalProjects(final AzkabanWebServer server) {
+ final List<Project> projects = server.getProjectManager().getProjects();
+
+ final List<Project> reportalProjects = new ArrayList<>();
+
+ for (final Project project : projects) {
+ if (project.getMetadata().containsKey("reportal-user")) {
+ reportalProjects.add(project);
+ }
+ }
+
+ return reportalProjects;
+ }
+
+ public static void bookmarkProject(final AzkabanWebServer server, final Project project,
+ final User user) throws ProjectManagerException {
+ project.getMetadata().put("bookmark-" + user.getUserId(), true);
+ server.getProjectManager().updateProjectSetting(project);
+ }
+
+ public static void unBookmarkProject(final AzkabanWebServer server,
+ final Project project, final User user) throws ProjectManagerException {
+ project.getMetadata().remove("bookmark-" + user.getUserId());
+ server.getProjectManager().updateProjectSetting(project);
+ }
+
+ public static boolean isBookmarkProject(final Project project, final User user) {
+ return project.getMetadata().containsKey("bookmark-" + user.getUserId());
+ }
+
+ public static void subscribeProject(final AzkabanWebServer server, final Project project,
+ final User user, final String email) throws ProjectManagerException {
+ @SuppressWarnings("unchecked")
+ Map<String, String> subscription =
+ (Map<String, String>) project.getMetadata().get("subscription");
+ if (subscription == null) {
+ subscription = new HashMap<>();
+ }
+
+ if (email != null && !email.isEmpty()) {
+ subscription.put(user.getUserId(), email);
+ }
+
+ project.getMetadata().put("subscription", subscription);
+ updateProjectNotifications(project, server.getProjectManager());
+ server.getProjectManager().updateProjectSetting(project);
+ }
+
+ public static void unSubscribeProject(final AzkabanWebServer server,
+ final Project project, final User user) throws ProjectManagerException {
+ @SuppressWarnings("unchecked") final Map<String, String> subscription =
+ (Map<String, String>) project.getMetadata().get("subscription");
+ if (subscription == null) {
+ return;
+ }
+ subscription.remove(user.getUserId());
+ project.getMetadata().put("subscription", subscription);
+ updateProjectNotifications(project, server.getProjectManager());
+ server.getProjectManager().updateProjectSetting(project);
+ }
+
+ public static boolean isSubscribeProject(final Project project, final User user) {
+ @SuppressWarnings("unchecked") final Map<String, String> subscription =
+ (Map<String, String>) project.getMetadata().get("subscription");
+ if (subscription == null) {
+ return false;
+ }
+ return subscription.containsKey(user.getUserId());
+ }
+
+ /**
+ * Updates the email notifications saved in the project's flow.
+ */
+ public static void updateProjectNotifications(final Project project,
+ final ProjectManager pm) throws ProjectManagerException {
+ final Flow flow = project.getFlows().get(0);
+
+ // Get all success emails.
+ final ArrayList<String> successEmails = new ArrayList<>();
+ final String successNotifications =
+ (String) project.getMetadata().get("notifications");
+ final String[] successEmailSplit =
+ successNotifications.split("\\s*,\\s*|\\s*;\\s*|\\s+");
+ successEmails.addAll(Arrays.asList(successEmailSplit));
+
+ // Get all failure emails.
+ final ArrayList<String> failureEmails = new ArrayList<>();
+ final String failureNotifications =
+ (String) project.getMetadata().get("failureNotifications");
+ final String[] failureEmailSplit =
+ failureNotifications.split("\\s*,\\s*|\\s*;\\s*|\\s+");
+ failureEmails.addAll(Arrays.asList(failureEmailSplit));
+
+ // Add subscription emails to success emails list.
+ @SuppressWarnings("unchecked") final Map<String, String> subscription =
+ (Map<String, String>) project.getMetadata().get("subscription");
+ if (subscription != null) {
+ successEmails.addAll(subscription.values());
+ }
+
+ final ArrayList<String> successEmailList = new ArrayList<>();
+ for (final String email : successEmails) {
+ if (!email.trim().isEmpty()) {
+ successEmailList.add(email);
+ }
+ }
+
+ final ArrayList<String> failureEmailList = new ArrayList<>();
+ for (final String email : failureEmails) {
+ if (!email.trim().isEmpty()) {
+ failureEmailList.add(email);
+ }
+ }
+
+ // Save notifications in the flow.
+ flow.getSuccessEmails().clear();
+ flow.getFailureEmails().clear();
+ flow.addSuccessEmails(successEmailList);
+ flow.addFailureEmails(failureEmailList);
+ pm.updateFlow(project, flow);
+ }
+
+ public static boolean isScheduledProject(final Project project) {
+ final Object schedule = project.getMetadata().get("schedule");
+ if (schedule == null || !(schedule instanceof Boolean)) {
+ return false;
+ }
+ return (boolean) (Boolean) schedule;
+ }
+
+ public static boolean isScheduledRepeatingProject(final Project project) {
+ final Object schedule = project.getMetadata().get("scheduleRepeat");
+ if (schedule == null || !(schedule instanceof Boolean)) {
+ return false;
+ }
+ return (boolean) (Boolean) schedule;
+ }
+
+ public static List<Project> getUserReportalProjects(final AzkabanWebServer server,
+ final String userName) throws ProjectManagerException {
+ final ProjectManager projectManager = server.getProjectManager();
+ final List<Project> projects = projectManager.getProjects();
+ final List<Project> result = new ArrayList<>();
+
+ for (final Project project : projects) {
+ if (userName.equals(project.getMetadata().get("reportal-user"))) {
+ result.add(project);
+ }
+ }
+
+ return result;
+ }
+
+ public static Project createReportalProject(final AzkabanWebServer server,
+ final String title, final String description, final User user)
+ throws ProjectManagerException {
+ final ProjectManager projectManager = server.getProjectManager();
+ final String projectName =
+ "reportal-" + user.getUserId() + "-" + sanitizeText(title);
+ Project project = projectManager.getProject(projectName);
+ if (project != null) {
+ return null;
+ }
+ project = projectManager.createProject(projectName, description, user);
+
+ return project;
+ }
+
+ public static String sanitizeText(final String text) {
+ return text.replaceAll("[^A-Za-z0-9]", "-");
+ }
+
+ public static File findAvailableFileName(final File parent, String name,
+ final String extension) {
+ if (name.isEmpty()) {
+ name = "untitled";
+ }
+ File file = new File(parent, name + extension);
+ int i = 1;
+ while (file.exists()) {
+ file = new File(parent, name + "-" + i + extension);
+ i++;
+ }
+ return file;
+ }
+
+ public static String prepareStringForJS(final Object object) {
+ return object.toString().replace("\r", "").replace("\n", "\\n");
+ }
+
+ public static String[] filterCSVFile(final String[] files) {
+ final List<String> result = new ArrayList<>();
+ for (int i = 0; i < files.length; i++) {
+ if (StringUtils.endsWithIgnoreCase(files[i], ".csv")) {
+ result.add(files[i]);
+ }
+ }
+ return result.toArray(new String[result.size()]);
+ }
+
+ /**
+ * Given a string containing multiple emails, splits it based on the given
+ * regular expression, and returns a set containing the unique, non-empty
+ * emails.
+ */
+ public static Set<String> parseUniqueEmails(final String emailList,
+ final String splitRegex) {
+ final Set<String> uniqueEmails = new HashSet<>();
+
+ if (emailList == null) {
+ return uniqueEmails;
+ }
+
+ final String[] emails = emailList.trim().split(splitRegex);
+ for (final String email : emails) {
+ if (!email.isEmpty()) {
+ uniqueEmails.add(email);
+ }
+ }
+
+ return uniqueEmails;
+ }
+
+ /**
+ * Returns true if the given email is valid and false otherwise.
+ */
+ public static boolean isValidEmailAddress(final String email) {
+ if (email == null) {
+ return false;
+ }
+
+ boolean result = true;
+ try {
+ final InternetAddress emailAddr = new InternetAddress(email);
+ emailAddr.validate();
+ } catch (final AddressException ex) {
+ result = false;
+ }
+ return result;
+ }
+
+ /**
+ * Given an email string, returns the domain part if it exists, and null
+ * otherwise.
+ */
+ public static String getEmailDomain(final String email) {
+ if (email == null || email.isEmpty()) {
+ return null;
+ }
+
+ final int atSignIndex = email.indexOf('@');
+ if (atSignIndex != -1) {
+ return email.substring(atSignIndex + 1);
+ }
+
+ return null;
+ }
+}
diff --git a/az-reportal/src/main/java/azkaban/reportal/util/ReportalRunnerException.java b/az-reportal/src/main/java/azkaban/reportal/util/ReportalRunnerException.java
new file mode 100644
index 0000000..0fa4cb8
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/reportal/util/ReportalRunnerException.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.reportal.util;
+
+public class ReportalRunnerException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+
+ public ReportalRunnerException(final String s) {
+ super(s);
+ }
+
+ public ReportalRunnerException(final Exception e) {
+ super(e);
+ }
+
+ public ReportalRunnerException(final String s, final Exception e) {
+ super(s, e);
+ }
+}
diff --git a/az-reportal/src/main/java/azkaban/reportal/util/ReportalUtil.java b/az-reportal/src/main/java/azkaban/reportal/util/ReportalUtil.java
new file mode 100644
index 0000000..104616a
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/reportal/util/ReportalUtil.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.reportal.util;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
+import azkaban.reportal.util.Reportal.Variable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class ReportalUtil {
+
+ public static IStreamProvider getStreamProvider(final String fileSystem) {
+ if (fileSystem.equalsIgnoreCase("hdfs")) {
+ return new StreamProviderHDFS();
+ }
+ return new StreamProviderLocal();
+ }
+
+ /**
+ * Returns a list of the executable nodes in the specified flow in execution
+ * order. Assumes that the flow is linear.
+ *
+ * @param nodes
+ * @return
+ */
+ public static List<ExecutableNode> sortExecutableNodes(final ExecutableFlow flow) {
+ final List<ExecutableNode> sortedNodes = new ArrayList<>();
+
+ if (flow != null) {
+ final List<String> startNodeIds = flow.getStartNodes();
+
+ String nextNodeId = startNodeIds.isEmpty() ? null : startNodeIds.get(0);
+
+ while (nextNodeId != null) {
+ final ExecutableNode node = flow.getExecutableNode(nextNodeId);
+ sortedNodes.add(node);
+
+ final Set<String> outNodes = node.getOutNodes();
+ nextNodeId = outNodes.isEmpty() ? null : outNodes.iterator().next();
+ }
+ }
+
+ return sortedNodes;
+ }
+
+ /**
+ * Get runtime variables to be set in unscheduled mode of execution.
+ * Returns empty list, if no runtime variable is found
+ *
+ * @param variables
+ * @return
+ */
+ public static List<Variable> getRunTimeVariables(
+ final Collection<Variable> variables) {
+ final List<Variable> runtimeVariables =
+ ReportalUtil.getVariablesByRegex(variables,
+ Reportal.REPORTAL_CONFIG_PREFIX_NEGATION_REGEX);
+
+ return runtimeVariables;
+ }
+
+ /**
+ * Shortlist variables which match a given regex. Returns empty empty list, if no
+ * eligible variable is found
+ *
+ * @param variables
+ * @param regex
+ * @return
+ */
+ public static List<Variable> getVariablesByRegex(
+ final Collection<Variable> variables, final String regex) {
+ final List<Variable> shortlistedVariables = new ArrayList<>();
+ if (variables != null && regex != null) {
+ for (final Variable var : variables) {
+ if (var.getTitle().matches(regex)) {
+ shortlistedVariables.add(var);
+ }
+ }
+ }
+ return shortlistedVariables;
+ }
+
+ /**
+ * Shortlist variables which match a given prefix. Returns empty map, if no
+ * eligible variable is found.
+ *
+ * @param variables
+ * variables to be processed
+ * @param prefix
+ * prefix to be matched
+ * @return a map with shortlisted variables and prefix removed
+ */
+ public static Map<String, String> getVariableMapByPrefix(
+ final Collection<Variable> variables, final String prefix) {
+ final Map<String, String> shortlistMap = new HashMap<>();
+ if (variables!=null && prefix != null) {
+ for (final Variable var : getVariablesByRegex(variables,
+ Reportal.REPORTAL_CONFIG_PREFIX_REGEX)) {
+ shortlistMap
+ .put(var.getTitle().replaceFirst(prefix, ""), var.getName());
+ }
+ }
+ return shortlistMap;
+ }
+}
diff --git a/az-reportal/src/main/java/azkaban/reportal/util/StreamProviderHDFS.java b/az-reportal/src/main/java/azkaban/reportal/util/StreamProviderHDFS.java
new file mode 100644
index 0000000..776e4ce
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/reportal/util/StreamProviderHDFS.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.reportal.util;
+
+import azkaban.security.commons.HadoopSecurityManager;
+import azkaban.security.commons.HadoopSecurityManagerException;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class StreamProviderHDFS implements IStreamProvider {
+
+ FileSystem hdfs;
+ HadoopSecurityManager securityManager;
+ String username;
+
+ public void setHadoopSecurityManager(final HadoopSecurityManager securityManager) {
+ this.securityManager = securityManager;
+ }
+
+ @Override
+ public void setUser(final String user) {
+ this.username = user;
+ }
+
+ @Override
+ public String[] getFileList(final String pathString)
+ throws HadoopSecurityManagerException, IOException {
+ final FileStatus[] statusList = getFileStatusList(pathString);
+ final String[] fileList = new String[statusList.length];
+
+ for (int i = 0; i < statusList.length; i++) {
+ fileList[i] = statusList[i].getPath().getName();
+ }
+
+ return fileList;
+ }
+
+ @Override
+ public String[] getOldFiles(final String pathString, final long thresholdTime)
+ throws Exception {
+ final FileStatus[] statusList = getFileStatusList(pathString);
+
+ final List<String> oldFiles = new ArrayList<>();
+
+ for (final FileStatus fs : statusList) {
+ if (fs.getModificationTime() < thresholdTime) {
+ oldFiles.add(fs.getPath().getName());
+ }
+ }
+
+ return oldFiles.toArray(new String[0]);
+ }
+
+ @Override
+ public void deleteFile(final String pathString) throws Exception {
+ ensureHdfs();
+
+ try {
+ this.hdfs.delete(new Path(pathString), true);
+ } catch (final IOException e) {
+ cleanUp();
+ }
+ }
+
+ @Override
+ public InputStream getFileInputStream(final String pathString) throws Exception {
+ ensureHdfs();
+
+ final Path path = new Path(pathString);
+
+ return new BufferedInputStream(this.hdfs.open(path));
+ }
+
+ @Override
+ public OutputStream getFileOutputStream(final String pathString) throws Exception {
+ ensureHdfs();
+
+ final Path path = new Path(pathString);
+
+ return new BufferedOutputStream(this.hdfs.create(path, true));
+ }
+
+ @Override
+ public void cleanUp() throws IOException {
+ if (this.hdfs != null) {
+ this.hdfs.close();
+ this.hdfs = null;
+ }
+ }
+
+ private void ensureHdfs() throws HadoopSecurityManagerException, IOException {
+ if (this.hdfs == null) {
+ if (this.securityManager == null) {
+ this.hdfs = FileSystem.get(new Configuration());
+ } else {
+ this.hdfs = this.securityManager.getFSAsUser(this.username);
+ }
+ }
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ cleanUp();
+ super.finalize();
+ }
+
+ /**
+ * Returns an array of the file statuses of the files/directories in the given
+ * path if it is a directory and an empty array otherwise.
+ */
+ private FileStatus[] getFileStatusList(final String pathString)
+ throws HadoopSecurityManagerException, IOException {
+ ensureHdfs();
+
+ final Path path = new Path(pathString);
+ FileStatus pathStatus = null;
+ try {
+ pathStatus = this.hdfs.getFileStatus(path);
+ } catch (final IOException e) {
+ cleanUp();
+ }
+
+ if (pathStatus != null && pathStatus.isDir()) {
+ return this.hdfs.listStatus(path);
+ }
+
+ return new FileStatus[0];
+ }
+}
diff --git a/az-reportal/src/main/java/azkaban/reportal/util/StreamProviderLocal.java b/az-reportal/src/main/java/azkaban/reportal/util/StreamProviderLocal.java
new file mode 100644
index 0000000..5836b3d
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/reportal/util/StreamProviderLocal.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.reportal.util;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.commons.io.FileUtils;
+
+public class StreamProviderLocal implements IStreamProvider {
+
+ @Override
+ public void setUser(final String user) {
+ }
+
+ @Override
+ public String[] getFileList(final String pathString) throws IOException {
+ final File file = new File(pathString);
+
+ if (file.exists() && file.isDirectory()) {
+ return file.list();
+ }
+
+ return new String[0];
+ }
+
+ @Override
+ public String[] getOldFiles(final String pathString, final long thresholdTime)
+ throws Exception {
+ final File file = new File(pathString);
+
+ if (!file.exists() || !file.isDirectory()) {
+ return new String[0];
+ }
+
+ final File[] fileList = file.listFiles(new FileFilter() {
+ @Override
+ public boolean accept(final File file) {
+ return file.lastModified() < thresholdTime;
+ }
+ });
+
+ final String[] files = new String[fileList.length];
+ for (int i = 0; i < fileList.length; i++) {
+ files[i] = fileList[i].getName();
+ }
+
+ return files;
+ }
+
+ @Override
+ public void deleteFile(final String pathString) throws Exception {
+ FileUtils.deleteDirectory(new File(pathString));
+ }
+
+ @Override
+ public InputStream getFileInputStream(final String pathString) throws IOException {
+
+ final File inputFile = new File(pathString);
+
+ inputFile.getParentFile().mkdirs();
+ inputFile.createNewFile();
+
+ return new BufferedInputStream(new FileInputStream(inputFile));
+ }
+
+ @Override
+ public OutputStream getFileOutputStream(final String pathString) throws IOException {
+
+ final File outputFile = new File(pathString);
+
+ outputFile.getParentFile().mkdirs();
+ outputFile.createNewFile();
+
+ return new BufferedOutputStream(new FileOutputStream(outputFile));
+ }
+
+ @Override
+ public void cleanUp() throws IOException {
+ }
+}
diff --git a/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalMailCreator.java b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalMailCreator.java
new file mode 100644
index 0000000..fb09405
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalMailCreator.java
@@ -0,0 +1,362 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.viewer.reportal;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Scanner;
+import java.util.Set;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringEscapeUtils;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.mail.DefaultMailCreator;
+import azkaban.executor.mail.MailCreator;
+import azkaban.project.Project;
+import azkaban.reportal.util.IStreamProvider;
+import azkaban.reportal.util.ReportalHelper;
+import azkaban.reportal.util.ReportalUtil;
+import azkaban.reportal.util.StreamProviderHDFS;
+import azkaban.security.commons.HadoopSecurityManager;
+import azkaban.utils.EmailMessage;
+import azkaban.webapp.AzkabanWebServer;
+
+public class ReportalMailCreator implements MailCreator {
+ public static AzkabanWebServer azkaban = null;
+ public static HadoopSecurityManager hadoopSecurityManager = null;
+ public static String outputLocation = "";
+ public static String outputFileSystem = "";
+ public static String reportalStorageUser = "";
+ public static File reportalMailTempDirectory;
+ public static final String REPORTAL_MAIL_CREATOR = "ReportalMailCreator";
+ public static final int NUM_PREVIEW_ROWS = 50;
+ //Attachment that equal or larger than 10MB will be skipped in the email
+ public static final long MAX_ATTACHMENT_SIZE = 10 * 1024 * 1024L;
+
+ static {
+ DefaultMailCreator.registerCreator(REPORTAL_MAIL_CREATOR,
+ new ReportalMailCreator());
+ }
+
+ @Override
+ public boolean createFirstErrorMessage(ExecutableFlow flow,
+ EmailMessage message, String azkabanName, String scheme,
+ String clientHostname, String clientPortNumber, String... vars) {
+
+ ExecutionOptions option = flow.getExecutionOptions();
+ Set<String> emailList = new HashSet<String>(option.getFailureEmails());
+
+ return createEmail(flow, emailList, message, "Failure", azkabanName,
+ scheme, clientHostname, clientPortNumber, false);
+ }
+
+ @Override
+ public boolean createErrorEmail(ExecutableFlow flow, EmailMessage message,
+ String azkabanName, String scheme, String clientHostname,
+ String clientPortNumber, String... vars) {
+
+ ExecutionOptions option = flow.getExecutionOptions();
+ Set<String> emailList = new HashSet<String>(option.getFailureEmails());
+
+ return createEmail(flow, emailList, message, "Failure", azkabanName,
+ scheme, clientHostname, clientPortNumber, false);
+ }
+
+ @Override
+ public boolean createSuccessEmail(ExecutableFlow flow, EmailMessage message,
+ String azkabanName, String scheme, String clientHostname,
+ String clientPortNumber, String... vars) {
+
+ ExecutionOptions option = flow.getExecutionOptions();
+ Set<String> emailList = new HashSet<String>(option.getSuccessEmails());
+
+ return createEmail(flow, emailList, message, "Success", azkabanName,
+ scheme, clientHostname, clientPortNumber, false);
+ }
+
+ private boolean createEmail(ExecutableFlow flow, Set<String> emailList,
+ EmailMessage message, String status, String azkabanName, String scheme,
+ String clientHostname, String clientPortNumber, boolean printData) {
+
+ Project project =
+ azkaban.getProjectManager().getProject(flow.getProjectId());
+
+ if (emailList != null && !emailList.isEmpty()) {
+ message.addAllToAddress(emailList);
+ message.setMimeType("text/html");
+ message.setSubject("Report " + status + ": "
+ + project.getMetadata().get("title"));
+ String urlPrefix =
+ scheme + "://" + clientHostname + ":" + clientPortNumber
+ + "/reportal";
+ try {
+ return createMessage(project, flow, message, urlPrefix, printData);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ return false;
+ }
+
+ private boolean createMessage(Project project, ExecutableFlow flow,
+ EmailMessage message, String urlPrefix, boolean printData)
+ throws Exception {
+
+ // set mail content type to be "multipart/mixed" as we are customizing the main content.
+ // failed to to this may result in trouble accessing attachment when mail is viewed from IOS mail app.
+ message.enableAttachementEmbedment(false);
+
+ message.println("<html>");
+ message.println("<head></head>");
+ message
+ .println("<body style='font-family: verdana; color: #000000; background-color: #cccccc; padding: 20px;'>");
+ message
+ .println("<div style='background-color: #ffffff; border: 1px solid #aaaaaa; padding: 20px;-webkit-border-radius: 15px; -moz-border-radius: 15px; border-radius: 15px;'>");
+ // Title
+ message.println("<b>" + project.getMetadata().get("title") + "</b>");
+ message
+ .println("<div style='font-size: .8em; margin-top: .5em; margin-bottom: .5em;'>");
+ // Status
+ message.println(flow.getStatus().name());
+ // Link to View
+ message.println("(<a href='" + urlPrefix + "?view&id="
+ + flow.getProjectId() + "'>View</a>)");
+ // Link to logs
+ message.println("(<a href='" + urlPrefix + "?view&logs&id="
+ + flow.getProjectId() + "&execid=" + flow.getExecutionId()
+ + "'>Logs</a>)");
+ // Link to Data
+ message.println("(<a href='" + urlPrefix + "?view&id="
+ + flow.getProjectId() + "&execid=" + flow.getExecutionId()
+ + "'>Result data</a>)");
+ // Link to Edit
+ message.println("(<a href='" + urlPrefix + "?edit&id="
+ + flow.getProjectId() + "'>Edit</a>)");
+ message.println("</div>");
+ message.println("<div style='margin-top: .5em; margin-bottom: .5em;'>");
+ // Description
+ message.println(project.getDescription());
+ message.println("</div>");
+
+ // Print variable values, if any
+ Map<String, String> flowParameters =
+ flow.getExecutionOptions().getFlowParameters();
+ int i = 0;
+ while (flowParameters.containsKey("reportal.variable." + i + ".from")) {
+ if (i == 0) {
+ message
+ .println("<div style='margin-top: 10px; margin-bottom: 10px; border-bottom: 1px solid #ccc; padding-bottom: 5px; font-weight: bold;'>");
+ message.println("Variables");
+ message.println("</div>");
+ message
+ .println("<table border='1' cellspacing='0' cellpadding='2' style='font-size: 14px;'>");
+ message
+ .println("<thead><tr><th><b>Name</b></th><th><b>Value</b></th></tr></thead>");
+ message.println("<tbody>");
+ }
+
+ message.println("<tr>");
+ message.println("<td>"
+ + flowParameters.get("reportal.variable." + i + ".from") + "</td>");
+ message.println("<td>"
+ + flowParameters.get("reportal.variable." + i + ".to") + "</td>");
+ message.println("</tr>");
+
+ i++;
+ }
+
+ if (i > 0) { // at least one variable
+ message.println("</tbody>");
+ message.println("</table>");
+ }
+
+ long totalFileSize = 0;
+ if (printData) {
+ String locationFull =
+ (outputLocation + "/" + flow.getExecutionId()).replace("//", "/");
+
+ IStreamProvider streamProvider =
+ ReportalUtil.getStreamProvider(outputFileSystem);
+
+ if (streamProvider instanceof StreamProviderHDFS) {
+ StreamProviderHDFS hdfsStreamProvider =
+ (StreamProviderHDFS) streamProvider;
+ hdfsStreamProvider.setHadoopSecurityManager(hadoopSecurityManager);
+ hdfsStreamProvider.setUser(reportalStorageUser);
+ }
+
+ // Get file list
+ String[] fileList =
+ ReportalHelper
+ .filterCSVFile(streamProvider.getFileList(locationFull));
+
+ // Sort files in execution order.
+ // File names are in the format {EXECUTION_ORDER}-{QUERY_TITLE}.csv
+ // E.g.: 1-queryTitle.csv
+ Arrays.sort(fileList, new Comparator<String>() {
+
+ @Override
+ public int compare(String a, String b) {
+ Integer aExecutionOrder =
+ Integer.parseInt(a.substring(0, a.indexOf('-')));
+ Integer bExecutionOrder =
+ Integer.parseInt(b.substring(0, b.indexOf('-')));
+ return aExecutionOrder.compareTo(bExecutionOrder);
+ }
+ });
+
+ // Get jobs in execution order
+ List<ExecutableNode> jobs = ReportalUtil.sortExecutableNodes(flow);
+
+ File tempFolder =
+ new File(reportalMailTempDirectory + "/" + flow.getExecutionId());
+ tempFolder.mkdirs();
+
+ // Copy output files from HDFS to local disk, so you can send them as
+ // email attachments
+ for (String file : fileList) {
+ String filePath = locationFull + "/" + file;
+ InputStream csvInputStream = null;
+ OutputStream tempOutputStream = null;
+ File tempOutputFile = new File(tempFolder, file);
+ tempOutputFile.createNewFile();
+ try {
+ csvInputStream = streamProvider.getFileInputStream(filePath);
+ tempOutputStream =
+ new BufferedOutputStream(new FileOutputStream(tempOutputFile));
+
+ IOUtils.copy(csvInputStream, tempOutputStream);
+ } finally {
+ IOUtils.closeQuietly(tempOutputStream);
+ IOUtils.closeQuietly(csvInputStream);
+ }
+ }
+
+ try {
+ streamProvider.cleanUp();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ boolean emptyResults = true;
+
+ String htmlResults =
+ flowParameters.get("reportal.render.results.as.html");
+ boolean renderResultsAsHtml =
+ htmlResults != null && htmlResults.trim().equalsIgnoreCase("true");
+
+ for (i = 0; i < fileList.length; i++) {
+ String file = fileList[i];
+ ExecutableNode job = jobs.get(i);
+ job.getAttempt();
+
+ message
+ .println("<div style='margin-top: 10px; margin-bottom: 10px; border-bottom: 1px solid #ccc; padding-bottom: 5px; font-weight: bold;'>");
+ message.println(file);
+ message.println("</div>");
+ message.println("<div>");
+ message
+ .println("<table border='1' cellspacing='0' cellpadding='2' style='font-size: 14px;'>");
+ File tempOutputFile = new File(tempFolder, file);
+ InputStream csvInputStream = null;
+ try {
+ csvInputStream =
+ new BufferedInputStream(new FileInputStream(tempOutputFile));
+ Scanner rowScanner = new Scanner(csvInputStream, StandardCharsets.UTF_8.toString());
+ int lineNumber = 0;
+ while (rowScanner.hasNextLine() && lineNumber <= NUM_PREVIEW_ROWS) {
+ // For Hive jobs, the first line is the column names, so we ignore
+ // it
+ // when deciding whether the output is empty or not
+ if (!job.getType().equals(ReportalType.HiveJob.getJobTypeName())
+ || lineNumber > 0) {
+ emptyResults = false;
+ }
+
+ String csvLine = rowScanner.nextLine();
+ String[] data = csvLine.split("\",\"");
+ message.println("<tr>");
+ for (String item : data) {
+ String column = item.replace("\"", "");
+ if (!renderResultsAsHtml) {
+ column = StringEscapeUtils.escapeHtml(column);
+ }
+ message.println("<td>" + column + "</td>");
+ }
+ message.println("</tr>");
+ if (lineNumber == NUM_PREVIEW_ROWS && rowScanner.hasNextLine()) {
+ message.println("<tr>");
+ message.println("<td colspan=\"" + data.length + "\">...</td>");
+ message.println("</tr>");
+ }
+ lineNumber++;
+ }
+ rowScanner.close();
+ message.println("</table>");
+ message.println("</div>");
+ } finally {
+ IOUtils.closeQuietly(csvInputStream);
+ }
+ totalFileSize += tempOutputFile.length();
+ }
+
+ if (totalFileSize < MAX_ATTACHMENT_SIZE) {
+ for (i = 0; i < fileList.length; i++) {
+ String file = fileList[i];
+ File tempOutputFile = new File(tempFolder, file);
+ message.addAttachment(file, tempOutputFile);
+ }
+ }
+
+ // Don't send an email if there are no results, unless this is an
+ // unscheduled run.
+ String unscheduledRun = flowParameters.get("reportal.unscheduled.run");
+ boolean isUnscheduledRun =
+ unscheduledRun != null
+ && unscheduledRun.trim().equalsIgnoreCase("true");
+ if (emptyResults && !isUnscheduledRun) {
+ return false;
+ }
+ }
+
+ message.println("</div>");
+ if (totalFileSize >= MAX_ATTACHMENT_SIZE){
+ message.println("<tr>The total size of the reports (" + totalFileSize/1024/1024 + "MB) is bigger than the allowed maximum size of " +
+ MAX_ATTACHMENT_SIZE/1024/1024 + "MB. " +
+ "It is too big to be attached in this message. Please use the link above titled Result Data to download the reports</tr>");
+ }
+ message.println("</body>").println("</html>");
+
+ return true;
+ }
+}
diff --git a/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalServlet.java b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalServlet.java
new file mode 100644
index 0000000..bdef3b2
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalServlet.java
@@ -0,0 +1,1350 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.viewer.reportal;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutorManagerAdapter;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.project.ProjectManager;
+import azkaban.project.ProjectManagerException;
+import azkaban.reportal.util.IStreamProvider;
+import azkaban.reportal.util.Reportal;
+import azkaban.reportal.util.Reportal.Query;
+import azkaban.reportal.util.Reportal.Variable;
+import azkaban.reportal.util.ReportalHelper;
+import azkaban.reportal.util.ReportalUtil;
+import azkaban.reportal.util.StreamProviderHDFS;
+import azkaban.scheduler.ScheduleManager;
+import azkaban.scheduler.ScheduleManagerException;
+import azkaban.security.commons.HadoopSecurityManager;
+import azkaban.server.session.Session;
+import azkaban.user.Permission;
+import azkaban.user.Permission.Type;
+import azkaban.user.User;
+import azkaban.user.UserManager;
+import azkaban.utils.FileIOUtils.LogData;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+import azkaban.webapp.AzkabanWebServer;
+import azkaban.webapp.servlet.LoginAbstractAzkabanServlet;
+import azkaban.webapp.servlet.Page;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Scanner;
+import java.util.Set;
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.math.NumberUtils;
+import org.apache.log4j.Logger;
+import org.apache.velocity.tools.generic.EscapeTool;
+import org.joda.time.DateTime;
+
+public class ReportalServlet extends LoginAbstractAzkabanServlet {
+
+ private static final String REPORTAL_VARIABLE_PREFIX = "reportal.variable.";
+ private static final String HADOOP_SECURITY_MANAGER_CLASS_PARAM =
+ "hadoop.security.manager.class";
+ private static final long serialVersionUID = 1L;
+ private static final Logger logger = Logger.getLogger(ReportalServlet.class);
+ private final File reportalMailTempDirectory;
+ private final Props props;
+ private final String viewerName;
+ private final String reportalStorageUser;
+ private final File webResourcesFolder;
+ private final int max_allowed_schedule_dates;
+ private final int default_schedule_dates;
+ private final boolean showNav;
+ private CleanerThread cleanerThread;
+ /**
+ * A whitelist of allowed email domains (e.g.: example.com). If null, all
+ * email domains are allowed.
+ */
+ private Set<String> allowedEmailDomains = null;
+ private AzkabanWebServer server;
+ private boolean shouldProxy;
+ private int itemsPerPage = 20;
+ private HadoopSecurityManager hadoopSecurityManager;
+
+ public ReportalServlet(final Props props) {
+ this.props = props;
+
+ this.viewerName = props.getString("viewer.name");
+ this.reportalStorageUser = props.getString("reportal.storage.user", "reportal");
+ this.itemsPerPage = props.getInt("reportal.items_per_page", 20);
+ this.showNav = props.getBoolean("reportal.show.navigation", false);
+
+ this.max_allowed_schedule_dates = props.getInt("reportal.max.allowed.schedule.dates", 180);
+ this.default_schedule_dates = props.getInt("reportal.default.schedule.dates", 30);
+
+ this.reportalMailTempDirectory =
+ new File(props.getString("reportal.mail.temp.dir", "/tmp/reportal"));
+ this.reportalMailTempDirectory.mkdirs();
+ ReportalMailCreator.reportalMailTempDirectory = this.reportalMailTempDirectory;
+
+ final List<String> allowedDomains =
+ props.getStringList("reportal.allowed.email.domains",
+ (List<String>) null);
+ if (allowedDomains != null) {
+ this.allowedEmailDomains = new HashSet<>(allowedDomains);
+ }
+
+ ReportalMailCreator.outputLocation =
+ props.getString("reportal.output.dir", "/tmp/reportal");
+ ReportalMailCreator.outputFileSystem =
+ props.getString("reportal.output.filesystem", "local");
+ ReportalMailCreator.reportalStorageUser = this.reportalStorageUser;
+
+ this.webResourcesFolder =
+ new File(new File(props.getSource()).getParentFile().getParentFile(),
+ "web");
+ this.webResourcesFolder.mkdirs();
+ setResourceDirectory(this.webResourcesFolder);
+ System.out.println("Reportal web resources: "
+ + this.webResourcesFolder.getAbsolutePath());
+ }
+
+ @Override
+ public void init(final ServletConfig config) throws ServletException {
+ super.init(config);
+ this.server = (AzkabanWebServer) getApplication();
+ ReportalMailCreator.azkaban = this.server;
+
+ this.shouldProxy = this.props.getBoolean("azkaban.should.proxy", false);
+ logger.info("Hdfs browser should proxy: " + this.shouldProxy);
+ try {
+ this.hadoopSecurityManager = loadHadoopSecurityManager(this.props, logger);
+ ReportalMailCreator.hadoopSecurityManager = this.hadoopSecurityManager;
+ } catch (final RuntimeException e) {
+ e.printStackTrace();
+ throw new RuntimeException("Failed to get hadoop security manager!"
+ + e.getCause());
+ }
+
+ this.cleanerThread = new CleanerThread();
+ this.cleanerThread.start();
+ }
+
+ private HadoopSecurityManager loadHadoopSecurityManager(final Props props,
+ final Logger logger) throws RuntimeException {
+
+ final Class<?> hadoopSecurityManagerClass =
+ props.getClass(HADOOP_SECURITY_MANAGER_CLASS_PARAM, true,
+ ReportalServlet.class.getClassLoader());
+ logger.info("Initializing hadoop security manager "
+ + hadoopSecurityManagerClass.getName());
+ HadoopSecurityManager hadoopSecurityManager = null;
+
+ try {
+ final Method getInstanceMethod =
+ hadoopSecurityManagerClass.getMethod("getInstance", Props.class);
+ hadoopSecurityManager =
+ (HadoopSecurityManager) getInstanceMethod.invoke(
+ hadoopSecurityManagerClass, props);
+ } catch (final InvocationTargetException e) {
+ logger.error("Could not instantiate Hadoop Security Manager "
+ + hadoopSecurityManagerClass.getName() + e.getCause());
+ throw new RuntimeException(e.getCause());
+ } catch (final Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e.getCause());
+ }
+
+ return hadoopSecurityManager;
+ }
+
+ @Override
+ protected void handleGet(final HttpServletRequest req, final HttpServletResponse resp,
+ final Session session) throws ServletException, IOException {
+ if (hasParam(req, "ajax")) {
+ handleAJAXAction(req, resp, session);
+ } else {
+ if (hasParam(req, "view")) {
+ try {
+ handleViewReportal(req, resp, session);
+ } catch (final Exception e) {
+ e.printStackTrace();
+ }
+ } else if (hasParam(req, "new")) {
+ handleNewReportal(req, resp, session);
+ } else if (hasParam(req, "edit")) {
+ handleEditReportal(req, resp, session);
+ } else if (hasParam(req, "run")) {
+ handleRunReportal(req, resp, session);
+ } else {
+ handleListReportal(req, resp, session);
+ }
+ }
+ }
+
+ private void handleAJAXAction(final HttpServletRequest req,
+ final HttpServletResponse resp, final Session session) throws ServletException,
+ IOException {
+ final HashMap<String, Object> ret = new HashMap<>();
+ final String ajaxName = getParam(req, "ajax");
+ final User user = session.getUser();
+ final int id = getIntParam(req, "id");
+ final ProjectManager projectManager = this.server.getProjectManager();
+ final Project project = projectManager.getProject(id);
+ final Reportal reportal = Reportal.loadFromProject(project);
+
+ // Delete report
+ if (ajaxName.equals("delete")) {
+ if (!project.hasPermission(user, Type.ADMIN)) {
+ ret.put("error", "You do not have permissions to delete this reportal.");
+ } else {
+ try {
+ final ScheduleManager scheduleManager = this.server.getScheduleManager();
+ reportal.removeSchedules(scheduleManager);
+ projectManager.removeProject(project, user);
+ } catch (final Exception e) {
+ e.printStackTrace();
+ ret.put("error", "An exception occured while deleting this reportal.");
+ }
+ ret.put("result", "success");
+ }
+ }
+ // Bookmark report
+ else if (ajaxName.equals("bookmark")) {
+ final boolean wasBookmarked = ReportalHelper.isBookmarkProject(project, user);
+ try {
+ if (wasBookmarked) {
+ ReportalHelper.unBookmarkProject(this.server, project, user);
+ ret.put("result", "success");
+ ret.put("bookmark", false);
+ } else {
+ ReportalHelper.bookmarkProject(this.server, project, user);
+ ret.put("result", "success");
+ ret.put("bookmark", true);
+ }
+ } catch (final ProjectManagerException e) {
+ e.printStackTrace();
+ ret.put("error", "Error bookmarking reportal. " + e.getMessage());
+ }
+ }
+ // Subscribe to report
+ else if (ajaxName.equals("subscribe")) {
+ final boolean wasSubscribed = ReportalHelper.isSubscribeProject(project, user);
+ if (!wasSubscribed && reportal.getAccessViewers().size() > 0
+ && !hasPermission(project, user, Type.READ)) {
+ ret.put("error", "You do not have permissions to view this reportal.");
+ } else {
+ try {
+ if (wasSubscribed) {
+ ReportalHelper.unSubscribeProject(this.server, project, user);
+ ret.put("result", "success");
+ ret.put("subscribe", false);
+ } else {
+ ReportalHelper.subscribeProject(this.server, project, user,
+ user.getEmail());
+ ret.put("result", "success");
+ ret.put("subscribe", true);
+ }
+ } catch (final ProjectManagerException e) {
+ e.printStackTrace();
+ ret.put("error", "Error subscribing to reportal. " + e.getMessage());
+ }
+ }
+ }
+ // Get a portion of logs
+ else if (ajaxName.equals("log")) {
+ final int execId = getIntParam(req, "execId");
+ final String jobId = getParam(req, "jobId");
+ final int offset = getIntParam(req, "offset");
+ final int length = getIntParam(req, "length");
+ final ExecutableFlow exec;
+ final ExecutorManagerAdapter executorManager = this.server.getExecutorManager();
+ try {
+ exec = executorManager.getExecutableFlow(execId);
+ } catch (final Exception e) {
+ ret.put("error", "Log does not exist or isn't created yet.");
+ return;
+ }
+
+ final LogData data;
+ try {
+ data =
+ executorManager.getExecutionJobLog(exec, jobId, offset, length,
+ exec.getExecutableNode(jobId).getAttempt());
+ } catch (final Exception e) {
+ e.printStackTrace();
+ ret.put("error", "Log does not exist or isn't created yet.");
+ return;
+ }
+ if (data != null) {
+ ret.put("result", "success");
+ ret.put("log", data.getData());
+ ret.put("offset", data.getOffset());
+ ret.put("length", data.getLength());
+ ret.put("completed", exec.getEndTime() != -1);
+ } else {
+ // Return an empty result to indicate the end
+ ret.put("result", "success");
+ ret.put("log", "");
+ ret.put("offset", offset);
+ ret.put("length", 0);
+ ret.put("completed", exec.getEndTime() != -1);
+ }
+ }
+
+ if (ret != null) {
+ this.writeJSON(resp, ret);
+ }
+ }
+
+ private void handleListReportal(final HttpServletRequest req,
+ final HttpServletResponse resp, final Session session) throws ServletException,
+ IOException {
+
+ final Page page =
+ newPage(req, resp, session,
+ "azkaban/viewer/reportal/reportallistpage.vm");
+ preparePage(page, session);
+
+ final List<Project> projects = ReportalHelper.getReportalProjects(this.server);
+ page.add("ReportalHelper", ReportalHelper.class);
+ page.add("user", session.getUser());
+
+ final String startDate = DateTime.now().minusWeeks(1).toString("yyyy-MM-dd");
+ final String endDate = DateTime.now().toString("yyyy-MM-dd");
+ page.add("startDate", startDate);
+ page.add("endDate", endDate);
+
+ if (!projects.isEmpty()) {
+ page.add("projects", projects);
+ } else {
+ page.add("projects", false);
+ }
+
+ page.render();
+ }
+
+ private void handleViewReportal(final HttpServletRequest req,
+ final HttpServletResponse resp, final Session session) throws ServletException,
+ Exception {
+ final int id = getIntParam(req, "id");
+ final Page page =
+ newPage(req, resp, session,
+ "azkaban/viewer/reportal/reportaldatapage.vm");
+ preparePage(page, session);
+
+ final ProjectManager projectManager = this.server.getProjectManager();
+ final ExecutorManagerAdapter executorManager = this.server.getExecutorManager();
+
+ final Project project = projectManager.getProject(id);
+ final Reportal reportal = Reportal.loadFromProject(project);
+
+ if (reportal == null) {
+ page.add("errorMsg", "Report not found.");
+ page.render();
+ return;
+ }
+
+ if (reportal.getAccessViewers().size() > 0
+ && !hasPermission(project, session.getUser(), Type.READ)) {
+ page.add("errorMsg", "You are not allowed to view this report.");
+ page.render();
+ return;
+ }
+
+ page.add("project", project);
+ page.add("title", project.getMetadata().get("title"));
+
+ if (hasParam(req, "execid")) {
+ final int execId = getIntParam(req, "execid");
+ page.add("execid", execId);
+ // Show logs
+ if (hasParam(req, "logs")) {
+ final ExecutableFlow exec;
+ try {
+ exec = executorManager.getExecutableFlow(execId);
+ } catch (final ExecutorManagerException e) {
+ e.printStackTrace();
+ page.add("errorMsg", "ExecutableFlow not found. " + e.getMessage());
+ page.render();
+ return;
+ }
+ // View single log
+ if (hasParam(req, "log")) {
+ page.add("view-log", true);
+ final String jobId = getParam(req, "log");
+ page.add("execid", execId);
+ page.add("jobId", jobId);
+ }
+ // List files
+ else {
+ page.add("view-logs", true);
+ final List<ExecutableNode> jobLogs = ReportalUtil.sortExecutableNodes(exec);
+
+ final boolean showDataCollector = hasParam(req, "debug");
+ if (!showDataCollector) {
+ jobLogs.remove(jobLogs.size() - 1);
+ }
+
+ if (jobLogs.size() == 1) {
+ resp.sendRedirect("/reportal?view&logs&id=" + project.getId()
+ + "&execid=" + execId + "&log=" + jobLogs.get(0).getId());
+ }
+ page.add("logs", jobLogs);
+ }
+ }
+ // Show data files
+ else {
+ final String outputFileSystem = ReportalMailCreator.outputFileSystem;
+ final String outputBase = ReportalMailCreator.outputLocation;
+
+ final String locationFull = (outputBase + "/" + execId).replace("//", "/");
+
+ final IStreamProvider streamProvider =
+ ReportalUtil.getStreamProvider(outputFileSystem);
+
+ if (streamProvider instanceof StreamProviderHDFS) {
+ final StreamProviderHDFS hdfsStreamProvider =
+ (StreamProviderHDFS) streamProvider;
+ hdfsStreamProvider.setHadoopSecurityManager(this.hadoopSecurityManager);
+ hdfsStreamProvider.setUser(this.reportalStorageUser);
+ }
+
+ try {
+ if (hasParam(req, "download")) {
+ final String fileName = getParam(req, "download");
+ final String filePath = locationFull + "/" + fileName;
+ InputStream csvInputStream = null;
+ OutputStream out = null;
+ try {
+ csvInputStream = streamProvider.getFileInputStream(filePath);
+ resp.setContentType("application/octet-stream");
+
+ out = resp.getOutputStream();
+ IOUtils.copy(csvInputStream, out);
+ } finally {
+ IOUtils.closeQuietly(out);
+ IOUtils.closeQuietly(csvInputStream);
+ }
+ return;
+ }
+ // Show file previews
+ else {
+ page.add("view-preview", true);
+
+ try {
+ String[] fileList = streamProvider.getFileList(locationFull);
+ fileList = ReportalHelper.filterCSVFile(fileList);
+ Arrays.sort(fileList);
+
+ final List<Object> files =
+ getFilePreviews(fileList, locationFull, streamProvider,
+ reportal.renderResultsAsHtml);
+
+ page.add("files", files);
+ } catch (final Exception e) {
+ logger.error("Error encountered while processing files in "
+ + locationFull, e);
+ }
+ }
+ } finally {
+ try {
+ streamProvider.cleanUp();
+ } catch (final IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+ // List executions and their data
+ else {
+ page.add("view-executions", true);
+ final ArrayList<ExecutableFlow> exFlows = new ArrayList<>();
+
+ int pageNumber = 0;
+ boolean hasNextPage = false;
+ if (hasParam(req, "page")) {
+ pageNumber = getIntParam(req, "page") - 1;
+ }
+ if (pageNumber < 0) {
+ pageNumber = 0;
+ }
+ try {
+ final Flow flow = project.getFlows().get(0);
+ executorManager.getExecutableFlows(project.getId(), flow.getId(),
+ pageNumber * this.itemsPerPage, this.itemsPerPage, exFlows);
+ final ArrayList<ExecutableFlow> tmp = new ArrayList<>();
+ executorManager.getExecutableFlows(project.getId(), flow.getId(),
+ (pageNumber + 1) * this.itemsPerPage, 1, tmp);
+ if (!tmp.isEmpty()) {
+ hasNextPage = true;
+ }
+ } catch (final ExecutorManagerException e) {
+ page.add("error", "Error retrieving executable flows");
+ }
+
+ if (!exFlows.isEmpty()) {
+ final ArrayList<Object> history = new ArrayList<>();
+ for (final ExecutableFlow exFlow : exFlows) {
+ final HashMap<String, Object> flowInfo = new HashMap<>();
+ flowInfo.put("execId", exFlow.getExecutionId());
+ flowInfo.put("status", exFlow.getStatus().toString());
+ flowInfo.put("startTime", exFlow.getStartTime());
+
+ history.add(flowInfo);
+ }
+ page.add("executions", history);
+ }
+ if (pageNumber > 0) {
+ page.add("pagePrev", pageNumber);
+ }
+ page.add("page", pageNumber + 1);
+ if (hasNextPage) {
+ page.add("pageNext", pageNumber + 2);
+ }
+ }
+
+ page.render();
+ }
+
+ /**
+ * Returns a list of file Objects that contain a "name" property with the file
+ * name, a "content" property with the lines in the file, and a "hasMore"
+ * property if the file contains more than NUM_PREVIEW_ROWS lines.
+ */
+ private List<Object> getFilePreviews(final String[] fileList, final String locationFull,
+ final IStreamProvider streamProvider, final boolean renderResultsAsHtml) {
+ final List<Object> files = new ArrayList<>();
+ InputStream csvInputStream = null;
+
+ try {
+ for (final String fileName : fileList) {
+ final Map<String, Object> file = new HashMap<>();
+ file.put("name", fileName);
+
+ final String filePath = locationFull + "/" + fileName;
+ csvInputStream = streamProvider.getFileInputStream(filePath);
+ final Scanner rowScanner = new Scanner(csvInputStream, StandardCharsets.UTF_8.toString());
+
+ final List<Object> lines = new ArrayList<>();
+ int lineNumber = 0;
+ while (rowScanner.hasNextLine()
+ && lineNumber < ReportalMailCreator.NUM_PREVIEW_ROWS) {
+ final String csvLine = rowScanner.nextLine();
+ final String[] data = csvLine.split("\",\"");
+ final List<String> line = new ArrayList<>();
+ for (final String item : data) {
+ String column = item.replace("\"", "");
+ if (!renderResultsAsHtml) {
+ column = StringEscapeUtils.escapeHtml(column);
+ }
+ line.add(column);
+ }
+ lines.add(line);
+ lineNumber++;
+ }
+
+ file.put("content", lines);
+
+ if (rowScanner.hasNextLine()) {
+ file.put("hasMore", true);
+ }
+
+ files.add(file);
+ rowScanner.close();
+ }
+ } catch (final Exception e) {
+ logger.debug("Error encountered while processing files in "
+ + locationFull, e);
+ } finally {
+ IOUtils.closeQuietly(csvInputStream);
+ }
+
+ return files;
+ }
+
+ private void handleRunReportal(final HttpServletRequest req,
+ final HttpServletResponse resp, final Session session) throws ServletException,
+ IOException {
+ final int id = getIntParam(req, "id");
+ final ProjectManager projectManager = this.server.getProjectManager();
+ final Page page =
+ newPage(req, resp, session,
+ "azkaban/viewer/reportal/reportalrunpage.vm");
+ preparePage(page, session);
+
+ final Project project = projectManager.getProject(id);
+ final Reportal reportal = Reportal.loadFromProject(project);
+
+ if (reportal == null) {
+ page.add("errorMsg", "Report not found");
+ page.render();
+ return;
+ }
+
+ if (reportal.getAccessExecutors().size() > 0
+ && !hasPermission(project, session.getUser(), Type.EXECUTE)) {
+ page.add("errorMsg", "You are not allowed to run this report.");
+ page.render();
+ return;
+ }
+
+ page.add("projectId", id);
+ page.add("title", reportal.title);
+ page.add("description", reportal.description);
+
+ final List<Variable> runtimeVariables =
+ ReportalUtil.getRunTimeVariables(reportal.variables);
+ if (runtimeVariables.size() > 0) {
+ page.add("variableNumber", runtimeVariables.size());
+ page.add("variables", runtimeVariables);
+ }
+
+ page.render();
+ }
+
+ private void handleNewReportal(final HttpServletRequest req,
+ final HttpServletResponse resp, final Session session) throws ServletException,
+ IOException {
+
+ final Page page =
+ newPage(req, resp, session,
+ "azkaban/viewer/reportal/reportaleditpage.vm");
+ preparePage(page, session);
+
+ page.add("title", "");
+ page.add("description", "");
+
+ page.add("queryNumber", 1);
+
+ final List<Map<String, Object>> queryList = new ArrayList<>();
+ page.add("queries", queryList);
+
+ final Map<String, Object> query = new HashMap<>();
+ queryList.add(query);
+ query.put("title", "");
+ query.put("type", "");
+ query.put("script", "");
+
+ page.add("accessViewer", "");
+ page.add("accessExecutor", "");
+ page.add("accessOwner", "");
+ page.add("notifications", "");
+ page.add("failureNotifications", "");
+
+ page.add("max_allowed_schedule_dates", this.max_allowed_schedule_dates);
+ page.add("default_schedule_dates", this.default_schedule_dates);
+
+ page.render();
+ }
+
+ private void handleEditReportal(final HttpServletRequest req,
+ final HttpServletResponse resp, final Session session) throws ServletException,
+ IOException {
+ final int id = getIntParam(req, "id");
+ final ProjectManager projectManager = this.server.getProjectManager();
+
+ final Page page =
+ newPage(req, resp, session,
+ "azkaban/viewer/reportal/reportaleditpage.vm");
+ preparePage(page, session);
+ page.add("ReportalHelper", ReportalHelper.class);
+
+ final Project project = projectManager.getProject(id);
+ final Reportal reportal = Reportal.loadFromProject(project);
+
+ final List<String> errors = new ArrayList<>();
+
+ if (reportal == null) {
+ errors.add("Report not found");
+ page.add("errorMsgs", errors);
+ page.render();
+ return;
+ }
+
+ if (!hasPermission(project, session.getUser(), Type.ADMIN)) {
+ errors.add("You are not allowed to edit this report.");
+ page.add("errorMsgs", errors);
+ page.render();
+ return;
+ }
+
+ page.add("projectId", id);
+ page.add("title", reportal.title);
+ page.add("description", reportal.description);
+ page.add("queryNumber", reportal.queries.size());
+ page.add("queries", reportal.queries);
+ page.add("variableNumber", reportal.variables.size());
+ page.add("variables", reportal.variables);
+ page.add("schedule", reportal.schedule);
+ page.add("scheduleHour", reportal.scheduleHour);
+ page.add("scheduleMinute", reportal.scheduleMinute);
+ page.add("scheduleAmPm", reportal.scheduleAmPm);
+ page.add("scheduleTimeZone", reportal.scheduleTimeZone);
+ page.add("scheduleDate", reportal.scheduleDate);
+ page.add("endScheduleDate", reportal.endSchedule);
+ page.add("scheduleRepeat", reportal.scheduleRepeat);
+ page.add("scheduleIntervalQuantity", reportal.scheduleIntervalQuantity);
+ page.add("scheduleInterval", reportal.scheduleInterval);
+ page.add("renderResultsAsHtml", reportal.renderResultsAsHtml);
+ page.add("notifications", reportal.notifications);
+ page.add("failureNotifications", reportal.failureNotifications);
+ page.add("accessViewer", reportal.accessViewer);
+ page.add("accessExecutor", reportal.accessExecutor);
+ page.add("accessOwner", reportal.accessOwner);
+
+ page.add("max_allowed_schedule_dates", this.max_allowed_schedule_dates);
+ page.add("default_schedule_dates", this.default_schedule_dates);
+ page.render();
+ }
+
+ @Override
+ protected void handlePost(final HttpServletRequest req, final HttpServletResponse resp,
+ final Session session) throws ServletException, IOException {
+ if (hasParam(req, "ajax")) {
+ final HashMap<String, Object> ret = new HashMap<>();
+
+ handleRunReportalWithVariables(req, ret, session);
+
+ if (ret != null) {
+ this.writeJSON(resp, ret);
+ }
+ } else {
+ handleSaveReportal(req, resp, session);
+ }
+ }
+
+ private void handleSaveReportal(final HttpServletRequest req,
+ final HttpServletResponse resp, final Session session) throws ServletException,
+ IOException {
+ final String projectId = validateAndSaveReport(req, resp, session);
+
+ if (projectId != null) {
+ this.setSuccessMessageInCookie(resp, "Report Saved.");
+
+ final String submitType = getParam(req, "submit");
+ if (submitType.equals("Save")) {
+ resp.sendRedirect(req.getRequestURI() + "?edit&id=" + projectId);
+ } else {
+ resp.sendRedirect(req.getRequestURI() + "?run&id=" + projectId);
+ }
+ }
+ }
+
+ /**
+ * Validates and saves a report, returning the project id of the saved report
+ * if successful, and null otherwise.
+ *
+ * @return The project id of the saved report if successful, and null
+ * otherwise
+ */
+ private String validateAndSaveReport(final HttpServletRequest req,
+ final HttpServletResponse resp, final Session session) throws ServletException,
+ IOException {
+
+ final ProjectManager projectManager = this.server.getProjectManager();
+ final User user = session.getUser();
+
+ final Page page =
+ newPage(req, resp, session,
+ "azkaban/viewer/reportal/reportaleditpage.vm");
+ preparePage(page, session);
+ page.add("ReportalHelper", ReportalHelper.class);
+
+ final boolean isEdit = hasParam(req, "id");
+ if (isEdit) {
+ page.add("projectId", getIntParam(req, "id"));
+ }
+
+ Project project = null;
+ final Reportal report = new Reportal();
+
+ report.title = getParam(req, "title");
+ report.description = getParam(req, "description");
+ page.add("title", report.title);
+ page.add("description", report.description);
+
+ report.schedule = hasParam(req, "schedule");
+ report.scheduleHour = getParam(req, "schedule-hour");
+ report.scheduleMinute = getParam(req, "schedule-minute");
+ report.scheduleAmPm = getParam(req, "schedule-am_pm");
+ report.scheduleTimeZone = getParam(req, "schedule-timezone");
+ report.scheduleDate = getParam(req, "schedule-date");
+ report.scheduleRepeat = hasParam(req, "schedule-repeat");
+ report.scheduleIntervalQuantity =
+ getParam(req, "schedule-interval-quantity");
+ report.scheduleInterval = getParam(req, "schedule-interval");
+ report.renderResultsAsHtml = hasParam(req, "render-results-as-html");
+
+ final boolean isEndSchedule = hasParam(req, "end-schedule-date");
+ if (isEndSchedule) {
+ report.endSchedule = getParam(req, "end-schedule-date");
+ }
+
+ page.add("schedule", report.schedule);
+ page.add("scheduleHour", report.scheduleHour);
+ page.add("scheduleMinute", report.scheduleMinute);
+ page.add("scheduleAmPm", report.scheduleAmPm);
+ page.add("scheduleTimeZone", report.scheduleTimeZone);
+ page.add("scheduleDate", report.scheduleDate);
+ page.add("scheduleRepeat", report.scheduleRepeat);
+ page.add("scheduleIntervalQuantity", report.scheduleIntervalQuantity);
+ page.add("scheduleInterval", report.scheduleInterval);
+ page.add("renderResultsAsHtml", report.renderResultsAsHtml);
+ page.add("endSchedule", report.endSchedule);
+ page.add("max_allowed_schedule_dates", this.max_allowed_schedule_dates);
+ page.add("default_schedule_dates", this.default_schedule_dates);
+
+ report.accessViewer = getParam(req, "access-viewer");
+ report.accessExecutor = getParam(req, "access-executor");
+ report.accessOwner = getParam(req, "access-owner");
+ page.add("accessViewer", report.accessViewer);
+ page.add("accessExecutor", report.accessExecutor);
+
+ // Adding report creator as explicit owner, if not present already
+ if (report.accessOwner == null || report.accessOwner.isEmpty()) {
+ report.accessOwner = user.getUserId();
+ } else {
+ final String[] splittedOwners = report.accessOwner.toLowerCase()
+ .split(Reportal.ACCESS_LIST_SPLIT_REGEX);
+ if (!Arrays.asList(splittedOwners).contains(user.getUserId())) {
+ report.accessOwner = String.format("%s,%s", user.getUserId(),
+ StringUtils.join(splittedOwners, ','));
+ } else {
+ report.accessOwner = StringUtils.join(splittedOwners, ',');
+ }
+ }
+
+ page.add("accessOwner", report.accessOwner);
+
+ report.notifications = getParam(req, "notifications");
+ report.failureNotifications = getParam(req, "failure-notifications");
+ page.add("notifications", report.notifications);
+ page.add("failureNotifications", report.failureNotifications);
+
+ final int numQueries = getIntParam(req, "queryNumber");
+ page.add("queryNumber", numQueries);
+ final List<Query> queryList = new ArrayList<>(numQueries);
+ page.add("queries", queryList);
+ report.queries = queryList;
+
+ final List<String> errors = new ArrayList<>();
+ for (int i = 0; i < numQueries; i++) {
+ final Query query = new Query();
+
+ query.title = getParam(req, "query" + i + "title");
+ query.type = getParam(req, "query" + i + "type");
+ query.script = getParam(req, "query" + i + "script");
+
+ // Type check
+ final ReportalType type = ReportalType.getTypeByName(query.type);
+ if (type == null) {
+ errors.add("Type " + query.type + " is invalid.");
+ }
+
+ if (!type.checkPermission(user) && report.schedule) {
+ errors.add("You do not have permission to schedule Type " + query.type + ".");
+ }
+
+ queryList.add(query);
+ }
+
+ final int variables = getIntParam(req, "variableNumber");
+ page.add("variableNumber", variables);
+ final List<Variable> variableList = new ArrayList<>(variables);
+ page.add("variables", variableList);
+ report.variables = variableList;
+
+ String proxyUser = null;
+
+ for (int i = 0; i < variables; i++) {
+ final Variable variable =
+ new Variable(getParam(req, "variable" + i + "title"), getParam(req,
+ "variable" + i + "name"));
+
+ if (variable.title.isEmpty() || variable.name.isEmpty()) {
+ errors.add("Variable title and name cannot be empty.");
+ }
+
+ if (variable.title.equals("reportal.config.reportal.execution.user")) {
+ proxyUser = variable.name;
+ }
+
+ variableList.add(variable);
+ }
+
+ // Make sure title isn't empty
+ if (report.title.isEmpty()) {
+ errors.add("Title must not be empty.");
+ }
+
+ // Make sure description isn't empty
+ if (report.description.isEmpty()) {
+ errors.add("Description must not be empty.");
+ }
+
+ // Verify schedule and repeat
+ if (report.schedule) {
+ // Verify schedule time
+ if (!NumberUtils.isDigits(report.scheduleHour)
+ || !NumberUtils.isDigits(report.scheduleMinute)) {
+ errors.add("Schedule time is invalid.");
+ }
+
+ // Verify schedule date is not empty
+ if (report.scheduleDate.isEmpty()) {
+ errors.add("Schedule date must not be empty.");
+ }
+
+ if (report.scheduleRepeat) {
+ // Verify repeat interval
+ if (!NumberUtils.isDigits(report.scheduleIntervalQuantity)) {
+ errors.add("Repeat interval quantity is invalid.");
+ }
+ }
+ }
+
+ // Empty query check
+ if (numQueries <= 0) {
+ errors.add("There needs to have at least one query.");
+ }
+
+ // Validate access users
+ final UserManager userManager = getApplication().getUserManager();
+ final String[] accessLists =
+ new String[]{report.accessViewer, report.accessExecutor,
+ report.accessOwner};
+ for (String accessList : accessLists) {
+ if (accessList == null) {
+ continue;
+ }
+
+ accessList = accessList.trim();
+ if (!accessList.isEmpty()) {
+ final String[] users = accessList.split(Reportal.ACCESS_LIST_SPLIT_REGEX);
+ for (final String accessUser : users) {
+ if (!userManager.validateUser(accessUser)) {
+ errors.add("User " + accessUser + " in access list is invalid.");
+ }
+ }
+ }
+ }
+
+ // Validate proxy user
+ if (proxyUser != null) {
+ if (!userManager.validateProxyUser(proxyUser, user)) {
+ errors.add("User " + user.getUserId() + " has no permission to add " + proxyUser
+ + " as proxy user.");
+ }
+ proxyUser = null;
+ }
+
+ // Validate email addresses
+ final Set<String> emails =
+ ReportalHelper.parseUniqueEmails(report.notifications + ","
+ + report.failureNotifications, Reportal.ACCESS_LIST_SPLIT_REGEX);
+ for (final String email : emails) {
+ if (!ReportalHelper.isValidEmailAddress(email)) {
+ errors.add("Invalid email address: " + email);
+ continue;
+ }
+
+ final String domain = ReportalHelper.getEmailDomain(email);
+ if (this.allowedEmailDomains != null && !this.allowedEmailDomains.contains(domain)) {
+ errors.add("Email address '" + email + "' has an invalid domain '"
+ + domain + "'. " + "Valid domains are: " + this.allowedEmailDomains);
+ }
+ }
+
+ if (errors.size() > 0) {
+ page.add("errorMsgs", errors);
+ page.render();
+ return null;
+ }
+
+ // Attempt to get a project object
+ if (isEdit) {
+ // Editing mode, load project
+ final int projectId = getIntParam(req, "id");
+ project = projectManager.getProject(projectId);
+ report.loadImmutableFromProject(project);
+ } else {
+ // Creation mode, create project
+ try {
+ project =
+ ReportalHelper.createReportalProject(this.server, report.title,
+ report.description, user);
+ report.reportalUser = user.getUserId();
+ report.ownerEmail = user.getEmail();
+ } catch (final Exception e) {
+ e.printStackTrace();
+ errors.add("Error while creating report. " + e.getMessage());
+ page.add("errorMsgs", errors);
+ page.render();
+ return null;
+ }
+
+ // Project already exists
+ if (project == null) {
+ errors.add("A Report with the same name already exists.");
+ page.add("errorMsgs", errors);
+ page.render();
+ return null;
+ }
+ }
+
+ if (project == null) {
+ errors.add("Internal Error: Report not found");
+ page.add("errorMsgs", errors);
+ page.render();
+ return null;
+ }
+
+ report.project = project;
+ page.add("projectId", project.getId());
+
+ try {
+ report.createZipAndUpload(projectManager, user, this.reportalStorageUser);
+ } catch (final Exception e) {
+ e.printStackTrace();
+ errors.add("Error while creating Azkaban jobs. " + e.getMessage());
+ page.add("errorMsgs", errors);
+ page.render();
+ if (!isEdit) {
+ try {
+ projectManager.removeProject(project, user);
+ } catch (final ProjectManagerException e1) {
+ e1.printStackTrace();
+ }
+ }
+ return null;
+ }
+
+ // Prepare flow
+ final Flow flow = project.getFlows().get(0);
+ project.getMetadata().put("flowName", flow.getId());
+
+ // Set Reportal mailer
+ flow.setMailCreator(ReportalMailCreator.REPORTAL_MAIL_CREATOR);
+
+ // Create/Save schedule
+ final ScheduleManager scheduleManager = this.server.getScheduleManager();
+ try {
+ report.updateSchedules(report, scheduleManager, user, flow);
+ } catch (final ScheduleManagerException e) {
+ e.printStackTrace();
+ errors.add(e.getMessage());
+ page.add("errorMsgs", errors);
+ page.render();
+ return null;
+ }
+
+ report.saveToProject(project);
+
+ try {
+ ReportalHelper.updateProjectNotifications(project, projectManager);
+ projectManager.updateProjectSetting(project);
+ projectManager
+ .updateProjectDescription(project, report.description, user);
+ updateProjectPermissions(project, projectManager, report, user);
+ projectManager.updateFlow(project, flow);
+ } catch (final ProjectManagerException e) {
+ e.printStackTrace();
+ errors.add("Error while updating report. " + e.getMessage());
+ page.add("errorMsgs", errors);
+ page.render();
+ if (!isEdit) {
+ try {
+ projectManager.removeProject(project, user);
+ } catch (final ProjectManagerException e1) {
+ e1.printStackTrace();
+ }
+ }
+ return null;
+ }
+
+ return Integer.toString(project.getId());
+ }
+
+ private void updateProjectPermissions(final Project project,
+ final ProjectManager projectManager, final Reportal report, final User currentUser)
+ throws ProjectManagerException {
+ // Old permissions and users
+ final List<Pair<String, Permission>> oldPermissions =
+ project.getUserPermissions();
+ final Set<String> oldUsers = new HashSet<>();
+ for (final Pair<String, Permission> userPermission : oldPermissions) {
+ oldUsers.add(userPermission.getFirst());
+ }
+
+ // Update permissions
+ report.updatePermissions();
+
+ // New permissions and users
+ final List<Pair<String, Permission>> newPermissions =
+ project.getUserPermissions();
+ final Set<String> newUsers = new HashSet<>();
+ for (final Pair<String, Permission> userPermission : newPermissions) {
+ newUsers.add(userPermission.getFirst());
+ }
+
+ // Save all new permissions
+ for (final Pair<String, Permission> userPermission : newPermissions) {
+ if (!oldPermissions.contains(userPermission)) {
+ projectManager.updateProjectPermission(project,
+ userPermission.getFirst(), userPermission.getSecond(), false,
+ currentUser);
+ }
+ }
+
+ // Remove permissions for any old users no longer in the new users
+ for (final String oldUser : oldUsers) {
+ if (!newUsers.contains(oldUser)) {
+ projectManager.removeProjectPermission(project, oldUser, false,
+ currentUser);
+ }
+ }
+ }
+
+ private void handleRunReportalWithVariables(final HttpServletRequest req,
+ final HashMap<String, Object> ret, final Session session) throws ServletException,
+ IOException {
+ final boolean isTestRun = hasParam(req, "testRun");
+
+ final int id = getIntParam(req, "id");
+ final ProjectManager projectManager = this.server.getProjectManager();
+ final Project project = projectManager.getProject(id);
+ final Reportal report = Reportal.loadFromProject(project);
+ final User user = session.getUser();
+
+ if (report.getAccessExecutors().size() > 0
+ && !hasPermission(project, user, Type.EXECUTE)) {
+ ret.put("error", "You are not allowed to run this report.");
+ return;
+ }
+
+ for (final Query query : report.queries) {
+ final String jobType = query.type;
+ final ReportalType type = ReportalType.getTypeByName(jobType);
+ if (!type.checkPermission(user)) {
+ ret.put(
+ "error",
+ "You are not allowed to run this report as you don't have permission to run job type "
+ + type.toString() + ".");
+ return;
+ }
+ }
+
+ final Flow flow = project.getFlows().get(0);
+
+ final ExecutableFlow exflow = new ExecutableFlow(project, flow);
+ exflow.setSubmitUser(user.getUserId());
+ exflow.addAllProxyUsers(project.getProxyUsers());
+
+ final ExecutionOptions options = exflow.getExecutionOptions();
+
+ int i = 0;
+ for (final Variable variable : ReportalUtil.getRunTimeVariables(report.variables)) {
+ options.getFlowParameters().put(REPORTAL_VARIABLE_PREFIX + i + ".from",
+ variable.name);
+ options.getFlowParameters().put(REPORTAL_VARIABLE_PREFIX + i + ".to",
+ getParam(req, "variable" + i));
+ i++;
+ }
+
+ options.getFlowParameters()
+ .put("reportal.execution.user", user.getUserId());
+
+ // Add the execution user's email to the list of success and failure emails.
+ final String email = user.getEmail();
+
+ if (email != null && !email.isEmpty()) {
+ if (isTestRun) { // Only email the executor
+ final List<String> emails = new ArrayList<>();
+ emails.add(email);
+ options.setSuccessEmails(emails);
+ options.setFailureEmails(emails);
+ } else {
+ options.getSuccessEmails().add(email);
+ options.getFailureEmails().add(email);
+ }
+ }
+
+ options.getFlowParameters().put("reportal.title", report.title);
+ options.getFlowParameters().put("reportal.render.results.as.html",
+ report.renderResultsAsHtml ? "true" : "false");
+ options.getFlowParameters().put("reportal.unscheduled.run", "true");
+
+ try {
+ final String message =
+ this.server.getExecutorManager().submitExecutableFlow(exflow,
+ session.getUser().getUserId())
+ + ".";
+ ret.put("message", message);
+ ret.put("result", "success");
+ ret.put("redirect", "/reportal?view&logs&id=" + project.getId()
+ + "&execid=" + exflow.getExecutionId());
+ } catch (final ExecutorManagerException e) {
+ e.printStackTrace();
+ ret.put("error",
+ "Error running report " + report.title + ". " + e.getMessage());
+ }
+ }
+
+ private void preparePage(final Page page, final Session session) {
+ page.add("viewerName", this.viewerName);
+ page.add("hideNavigation", !this.showNav);
+ page.add("userid", session.getUser().getUserId());
+ page.add("esc", new EscapeTool());
+ }
+
+ private class CleanerThread extends Thread {
+
+ private static final long DEFAULT_CLEAN_INTERVAL_MS = 24 * 60 * 60 * 1000;
+ private static final long DEFAULT_OUTPUT_DIR_RETENTION_MS = 7 * 24 * 60
+ * 60 * 1000;
+ private static final long DEFAULT_MAIL_TEMP_DIR_RETENTION_MS =
+ 24 * 60 * 60 * 1000;
+ // The frequency, in milliseconds, that the Reportal output
+ // and mail temp directories should be cleaned
+ private final long CLEAN_INTERVAL_MS;
+ // The duration, in milliseconds, that Reportal output should be retained
+ // for
+ private final long OUTPUT_DIR_RETENTION_MS;
+ // The duration, in milliseconds, that Reportal mail temp files should be
+ // retained for
+ private final long MAIL_TEMP_DIR_RETENTION_MS;
+ private boolean shutdown = false;
+
+ public CleanerThread() {
+ this.setName("Reportal-Cleaner-Thread");
+ this.CLEAN_INTERVAL_MS =
+ ReportalServlet.this.props
+ .getLong("reportal.clean.interval.ms", DEFAULT_CLEAN_INTERVAL_MS);
+ this.OUTPUT_DIR_RETENTION_MS =
+ ReportalServlet.this.props.getLong("reportal.output.dir.retention.ms",
+ DEFAULT_OUTPUT_DIR_RETENTION_MS);
+ this.MAIL_TEMP_DIR_RETENTION_MS =
+ ReportalServlet.this.props.getLong("reportal.mail.temp.dir.retention.ms",
+ DEFAULT_MAIL_TEMP_DIR_RETENTION_MS);
+ }
+
+ @SuppressWarnings("unused")
+ public void shutdown() {
+ this.shutdown = true;
+ this.interrupt();
+ }
+
+ @Override
+ public void run() {
+ while (!this.shutdown) {
+ synchronized (this) {
+ logger.info("Cleaning old execution output dirs");
+ cleanOldReportalOutputDirs();
+
+ logger.info("Cleaning Reportal mail temp directory");
+ cleanReportalMailTempDir();
+ }
+
+ try {
+ Thread.sleep(this.CLEAN_INTERVAL_MS);
+ } catch (final InterruptedException e) {
+ logger.error("CleanerThread's sleep was interrupted.", e);
+ }
+ }
+ }
+
+ private void cleanOldReportalOutputDirs() {
+ final IStreamProvider streamProvider =
+ ReportalUtil.getStreamProvider(ReportalMailCreator.outputFileSystem);
+
+ if (streamProvider instanceof StreamProviderHDFS) {
+ final StreamProviderHDFS hdfsStreamProvider =
+ (StreamProviderHDFS) streamProvider;
+ hdfsStreamProvider.setHadoopSecurityManager(ReportalServlet.this.hadoopSecurityManager);
+ hdfsStreamProvider.setUser(ReportalServlet.this.reportalStorageUser);
+ }
+
+ final long pastTimeThreshold =
+ System.currentTimeMillis() - this.OUTPUT_DIR_RETENTION_MS;
+
+ String[] oldFiles = null;
+ try {
+ oldFiles =
+ streamProvider.getOldFiles(ReportalMailCreator.outputLocation,
+ pastTimeThreshold);
+ } catch (final Exception e) {
+ logger.error("Error getting old files from "
+ + ReportalMailCreator.outputLocation + " on "
+ + ReportalMailCreator.outputFileSystem + " file system.", e);
+ }
+
+ if (oldFiles != null) {
+ for (final String file : oldFiles) {
+ final String filePath = ReportalMailCreator.outputLocation + "/" + file;
+ try {
+ streamProvider.deleteFile(filePath);
+ } catch (final Exception e) {
+ logger.error("Error deleting file " + filePath + " from "
+ + ReportalMailCreator.outputFileSystem + " file system.", e);
+ }
+ }
+ }
+ }
+
+ private void cleanReportalMailTempDir() {
+ final File dir = ReportalServlet.this.reportalMailTempDirectory;
+ final long pastTimeThreshold =
+ System.currentTimeMillis() - this.MAIL_TEMP_DIR_RETENTION_MS;
+
+ final File[] oldMailTempDirs = dir.listFiles(new FileFilter() {
+ @Override
+ public boolean accept(final File path) {
+ if (path.isDirectory() && path.lastModified() < pastTimeThreshold) {
+ return true;
+ }
+ return false;
+ }
+ });
+
+ for (final File tempDir : oldMailTempDirs) {
+ try {
+ FileUtils.deleteDirectory(tempDir);
+ } catch (final IOException e) {
+ logger.error(
+ "Error cleaning Reportal mail temp dir " + tempDir.getPath(), e);
+ }
+ }
+ }
+ }
+}
diff --git a/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalType.java b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalType.java
new file mode 100644
index 0000000..1dd845a
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalType.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.viewer.reportal;
+
+import azkaban.reportal.util.Reportal;
+import azkaban.user.User;
+import azkaban.utils.Props;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+
+public enum ReportalType {
+
+ PigJob("ReportalPig", "reportalpig", "hadoop") {
+ @Override
+ public void buildJobFiles(final Reportal reportal, final Props propertiesFile,
+ final File jobFile, final String jobName, final String queryScript,
+ final String proxyUser) {
+ final File resFolder = new File(jobFile.getParentFile(), "res");
+ resFolder.mkdirs();
+ final File scriptFile = new File(resFolder, jobName + ".pig");
+
+ OutputStream fileOutput = null;
+ try {
+ scriptFile.createNewFile();
+ fileOutput = new BufferedOutputStream(new FileOutputStream(scriptFile));
+ fileOutput.write(queryScript.getBytes(Charset.forName("UTF-8")));
+ } catch (final IOException e) {
+ e.printStackTrace();
+ } finally {
+ if (fileOutput != null) {
+ try {
+ fileOutput.close();
+ } catch (final IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ propertiesFile.put("reportal.pig.script", "res/" + jobName + ".pig");
+ }
+ },
+ HiveJob("ReportalHive", "reportalhive", "hadoop"), TeraDataJob(
+ "ReportalTeraData", "reportalteradata", "teradata"), DataCollectorJob(
+ ReportalTypeManager.DATA_COLLECTOR_JOB,
+ ReportalTypeManager.DATA_COLLECTOR_JOB_TYPE, "") {
+ @Override
+ public void buildJobFiles(final Reportal reportal, final Props propertiesFile,
+ final File jobFile, final String jobName, final String queryScript,
+ final String proxyUser) {
+ propertiesFile.put("user.to.proxy", proxyUser);
+ }
+ };
+
+ private static final HashMap<String, ReportalType> reportalTypes =
+ new HashMap<>();
+
+ static {
+ for (final ReportalType type : ReportalType.values()) {
+ reportalTypes.put(type.typeName, type);
+ }
+ }
+
+ private final String typeName;
+ private final String jobTypeName;
+ private final String permissionName;
+
+ private ReportalType(final String typeName, final String jobTypeName,
+ final String permissionName) {
+ this.typeName = typeName;
+ this.jobTypeName = jobTypeName;
+ this.permissionName = permissionName;
+ }
+
+ public static ReportalType getTypeByName(final String typeName) {
+ return reportalTypes.get(typeName);
+ }
+
+ public void buildJobFiles(final Reportal reportal, final Props propertiesFile,
+ final File jobFile, final String jobName, final String queryScript, final String proxyUser) {
+
+ }
+
+ public String getJobTypeName() {
+ return this.jobTypeName;
+ }
+
+ public boolean checkPermission(final User user) {
+ return user.hasPermission(this.permissionName);
+ }
+
+ @Override
+ public String toString() {
+ return this.typeName;
+ }
+}
diff --git a/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalTypeManager.java b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalTypeManager.java
new file mode 100644
index 0000000..b0e1a56
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalTypeManager.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.viewer.reportal;
+
+import azkaban.flow.CommonJobProperties;
+import azkaban.jobExecutor.JavaProcessJob;
+import azkaban.reportal.util.Reportal;
+import azkaban.utils.Props;
+import java.io.File;
+import java.util.Map;
+
+public class ReportalTypeManager {
+ public static final String DATA_COLLECTOR_JOB = "ReportalDataCollector";
+ public static final String DATA_COLLECTOR_JOB_TYPE = "reportaldatacollector";
+
+ public static void createJobAndFiles(final Reportal reportal, final File jobFile,
+ final String jobName, final String queryTitle, final String queryType,
+ final String queryScript,
+ final String dependentJob, final String userName, final Map<String, String> extras)
+ throws Exception {
+
+ // Create props for the job
+ final Props propertiesFile = new Props();
+ propertiesFile.put("title", queryTitle);
+
+ final ReportalType type = ReportalType.getTypeByName(queryType);
+
+ if (type == null) {
+ throw new Exception("Type " + queryType + " is invalid.");
+ }
+
+ propertiesFile.put("reportal.title", reportal.title);
+ propertiesFile.put("reportal.job.title", jobName);
+ propertiesFile.put("reportal.job.query", queryScript);
+ propertiesFile.put("user.to.proxy", "${reportal.execution.user}");
+ propertiesFile.put("reportal.proxy.user", "${reportal.execution.user}");
+
+ type.buildJobFiles(reportal, propertiesFile, jobFile, jobName, queryScript,
+ userName);
+
+ propertiesFile.put(CommonJobProperties.JOB_TYPE, type.getJobTypeName());
+ propertiesFile.put(JavaProcessJob.JVM_PARAMS, "-Dreportal.user.name=${reportal.execution.user}"
+ + " -Dreportal.execid=${azkaban.flow.execid}");
+
+ // Order dependency
+ if (dependentJob != null) {
+ propertiesFile.put(CommonJobProperties.DEPENDENCIES, dependentJob);
+ }
+
+ if (extras != null) {
+ propertiesFile.putAll(extras);
+ }
+
+ propertiesFile.storeLocal(jobFile);
+ }
+}
diff --git a/az-reportal/src/main/resources/azkaban/viewer/reportal/reportaldatapage.vm b/az-reportal/src/main/resources/azkaban/viewer/reportal/reportaldatapage.vm
new file mode 100644
index 0000000..49ec7a3
--- /dev/null
+++ b/az-reportal/src/main/resources/azkaban/viewer/reportal/reportaldatapage.vm
@@ -0,0 +1,195 @@
+#*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+*#
+
+<!DOCTYPE html>
+<html>
+ <head>
+ <title>Reportal</title>
+ <link href="${context}/reportal/css/bootstrap.min.css" rel="stylesheet">
+ <script type="text/javascript" src="${context}/js/jquery/jquery-1.9.1.js"></script>
+ <script type="text/javascript" src="${context}/js/jqueryui/jquery-ui-1.10.1.custom.js"></script>
+ <script type="text/javascript" src="${context}/reportal/js/bootstrap.min.js"></script>
+
+ <script type="text/javascript">
+ var contextURL = "${context}";
+ var currentTime = ${currentTime};
+ var timezone = "${timezone}";
+ var homeDir = "${homedir}";
+#if($project)
+ var projectId = ${project.id};
+#end
+ </script>
+ <script type="text/javascript" src="${context}/reportal/js/reportal.js"></script>
+ <script type="text/javascript" src="${context}/reportal/js/reportal-data.js"></script>
+ <link href="${context}/reportal/css/reportal.css" rel="stylesheet">
+ </head>
+ <body>
+#set($current_page="$viewerName")
+#if(!$hideNavigation)
+ #parse("azkaban/webapp/servlet/velocity/nav.vm")
+#end
+
+#parse("azkaban/viewer/reportal/reportalnavigation.vm")
+
+ <div class="content" style="margin-top: 41px;">
+ <div id="box-error">
+ #if($errorMsg)
+ <div class="box-error-message">$errorMsg</div>
+ #else
+ #if($error_message != "null")
+ <div class="box-error-message">$error_message</div>
+ #elseif($success_message != "null")
+ <div class="box-success-message">$success_message</div>
+ #end
+ #end
+ </div>
+ <div> </div>
+#if($project)
+ <div class="container">
+ <h2 style="font-size: 31.5px;">${project.metadata.title}</h2>
+ <div class="well">${project.description}</div>
+ </div>
+#end
+ <div style="text-align: center;">
+ <a class="btn btn-info" href="${context}/reportal">Reportal Home</a>
+#if($project)
+ <a class="btn btn-primary" href="${context}/reportal?view&id=${project.id}">Report History</a>
+#if($execid)
+ <a class="btn btn-primary" href="${context}/reportal?view&id=${project.id}&execid=${execid}">Data</a>
+ <a class="btn btn-primary" href="${context}/reportal?view&logs&id=${project.id}&execid=${execid}">Logs</a>
+#end
+ <a class="btn btn-warning" href="${context}/reportal?edit&id=${project.id}">Edit</a>
+ <a class="btn btn-success button-run" href="${context}/reportal?run&id=${project.id}">Run</a>
+#end
+ </div>
+ <div> </div>
+ <div class="container">
+ <div>
+#if($view-executions)
+ <div style="text-align: center;">
+ #if($pagePrev)
+ <a class="btn btn-primary" href="${context}/reportal?view&id=${project.id}&page=${pagePrev}">Previous page</a>
+ #end
+ <a class="btn btn-inverse" href="${context}/reportal?view&id=${project.id}&page=${page}">Page ${page}</a>
+ #if($pageNext)
+ <a class="btn btn-primary" href="${context}/reportal?view&id=${project.id}&page=${pageNext}">Next page</a>
+ #end
+ </div>
+ <div> </div>
+ <table id="report-results" border="0" cellspacing="0" cellpadding="0" class="table table-bordered table-striped">
+ <thead>
+ <tr>
+ <th>Time</th>
+ <th>Status</th>
+ <th>View Data</th>
+ <th>View Logs</th>
+ </tr>
+ </thead>
+ <tbody>
+ #if($executions)
+ #foreach($execution in $executions)
+ <tr>
+ <td>$utils.formatDateTime($execution.startTime)</td>
+ <td>${execution.status}</td>
+ <td><a href="${context}/reportal?view&id=${project.id}&execid=${execution.execId}">Data</a></td>
+ <td><a href="${context}/reportal?view&logs&id=${project.id}&execid=${execution.execId}">Logs</a></td>
+ </tr>
+ #end
+ #else
+ <tr>
+ <td colspan="4" style="text-align:center;">#if($page > 1)There are no reports in this page. #else This report has never been run.#end</td>
+ </tr>
+ #end
+ </tbody>
+ </table>
+#end
+#if($view-logs)
+ <table id="report-results" border="0" cellspacing="0" cellpadding="0" class="table table-bordered table-striped">
+ <thead>
+ <tr>
+ <th>Title</th>
+ <th>Status</th>
+ <th>View Log</th>
+ </tr>
+ </thead>
+ <tbody>
+ #if($logs)
+ #foreach($log in $logs)
+ <tr>
+ <td>${log.id}</td>
+ <td>${log.status}</td>
+ <td><a href="${context}/reportal?view&logs&id=${project.id}&execid=${execid}&log=${log.id}">Log</a></td>
+ </tr>
+ #end
+ #else
+ <tr>
+ <td colspan="2" style="text-align:center;">No log available.</td>
+ </tr>
+ #end
+ </tbody>
+ </table>
+#end
+#if($view-log)
+ <script>
+ var execId = ${execid};
+ var jobId = "${jobId}";
+ var projectId = ${project.id};
+ </script>
+ <script type="text/javascript" src="${context}/reportal/js/reportal-data-log.js"></script>
+ <div id="jobLogView" class="logView" style="top:245px;">
+ <div style="text-align: center;">
+ <a class="btn btn-inverse" id="updateLogBtn" href="#">Refresh</a>
+ <a class="btn btn-inverse" id="toggleLineWrap" href="#">Toggle Line wrapping</a>
+ </div>
+ <div class="logViewer">
+ <pre id="logSection" class="log" style="background-color:#FFFFFF;">Loading log...</pre>
+ </div>
+ </div>
+#end
+#if($view-preview)
+ #if($files)
+ #foreach($file in $files)
+ #set($fileName = $file.get("name"))
+ <div>
+ <a download="$fileName" href="$context/reportal?view&id=${project.id}&execid=$execid&download=$fileName"><b>$fileName</b></a>
+ </div>
+
+ <table id="report-results" border="0" cellspacing="0" cellpadding="0" class="table table-bordered table-striped">
+ #foreach($line in $file.get("content"))
+ <tr>
+ #foreach($item in $line)
+ <td>$esc.html($item)</td>
+ #end
+ </tr>
+ #end
+
+ #if($file.get("hasMore"))
+ #set($numColumns = $file.get("content").get(0).size())
+ <tr>
+ <td colspan="$numColumns">...</td>
+ </tr>
+ #end
+ </table>
+ #end
+ #else
+ <div style="text-align:center;">No data available.</div>
+ #end
+#end
+ </div>
+ </div>
+ </div>
+ </body>
+</html>
diff --git a/az-reportal/src/main/resources/azkaban/viewer/reportal/reportaleditpage.vm b/az-reportal/src/main/resources/azkaban/viewer/reportal/reportaleditpage.vm
new file mode 100644
index 0000000..4993a78
--- /dev/null
+++ b/az-reportal/src/main/resources/azkaban/viewer/reportal/reportaleditpage.vm
@@ -0,0 +1,316 @@
+#*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+*#
+
+<!DOCTYPE html>
+<html>
+ <head>
+ <title>Reportal</title>
+
+ <link href="${context}/reportal/css/bootstrap.min.css" rel="stylesheet">
+
+ <link href="${context}/css/jquery-ui.css" rel="stylesheet">
+ <script type="text/javascript" src="${context}/js/jquery/jquery-1.9.1.js"></script>
+ <script type="text/javascript" src="${context}/js/jqueryui/jquery-ui-1.10.1.custom.js"></script>
+
+ <script type="text/javascript" src="${context}/js/bootstrap.min.js"></script>
+ <link href="${context}/css/bootstrap-datetimepicker.css" rel="stylesheet">
+ <script type="text/javascript" src="${context}/js/moment.min.js"></script>
+ <script type="text/javascript" src="${context}/js/bootstrap-datetimepicker.min.js"></script>
+
+ <link href="${context}/reportal/css/codemirror.css" rel="stylesheet">
+ <link href="${context}/reportal/css/solarized.css" rel="stylesheet">
+ <script type="text/javascript" src="${context}/reportal/js/codemirror.js"></script>
+ <script type="text/javascript" src="${context}/reportal/js/mode/sql/sql.js"></script>
+ <script type="text/javascript" src="${context}/reportal/js/mode/pig/pig.js"></script>
+
+ <script type="text/javascript">
+ var contextURL = "${context}";
+ var startQueries = [
+ #if($queries)
+ #foreach($query in $queries)
+ #set($num = $velocityCount - 1)
+
+ ## Escape the query title and script strings as JavaScript strings before loading them.
+ #set($queryTitle = $esc.javascript(${query.title}))
+ #set($queryScript = $esc.javascript(${query.script}))
+ {
+ "num" : "${num}",
+ "title" : #if($queryTitle) "$queryTitle" #else "" #end,
+ "type" : "${query.type}",
+ "script" : #if($queryScript) "$queryScript" #else "" #end
+ },
+ #end
+ #end
+ ];
+ var startVariables = [
+ #if($variables)
+ #foreach($variable in $variables)
+ #set($num = $velocityCount - 1)
+ {
+ "num" : "${num}",
+ "title" : "${variable.title}",
+ "name" : "${variable.name}",
+ },
+ #end
+ #end
+ ];
+ </script>
+ <script type="text/javascript" src="${context}/reportal/js/reportal.js"></script>
+ <script type="text/javascript" src="${context}/reportal/js/reportal-edit.js"></script>
+ <link href="${context}/reportal/css/reportal.css" rel="stylesheet">
+ </head>
+ <body>
+#set($current_page="$viewerName")
+#if(!$hideNavigation)
+ #parse("azkaban/webapp/servlet/velocity/nav.vm")
+#end
+
+#parse("azkaban/viewer/reportal/reportalnavigation.vm")
+
+ <div class="content" style="margin-top: 41px;">
+ <div id="box-error">
+ #if($errorMsgs)
+ <div class="box-error-message">
+ #foreach($errorMsg in $errorMsgs)
+ #set($num = $velocityCount - 1)
+
+ #if($num > 0)
+ <br/>
+ #end
+
+ $esc.html($errorMsg)
+ #end
+ </div>
+ #else
+ #if($error_message != "null")
+ <div class="box-error-message">$esc.html($error_message)</div>
+ #elseif($success_message != "null")
+ <div class="box-success-message">$success_message</div>
+ #end
+ #end
+ </div>
+ <div> </div>
+ <form method="post" class="form-horizontal">
+ <div style="text-align: center;">
+ <a class="btn btn-primary" href="${context}/reportal">Reportal Home</a>
+#if($projectId)
+ <a class="btn btn-info" href="${context}/reportal?view&id=${projectId}">Report History</a>
+ <input type="submit" class="btn btn-primary" name="submit" value="Save"/>
+ <input type="submit" class="btn btn-success" name="submit" value="Save and Run"/>
+#end
+ </div>
+#set($title = $esc.html($title))
+ <div> </div>
+ <div class="container">
+ <input id="queryNumber" type="hidden" name="queryNumber" value="#if($queryNumber)${queryNumber}#{else}1#end">
+ <input id="variableNumber" type="hidden" name="variableNumber" value="#if($variableNumber)${variableNumber}#{else}1#end">
+ <fieldset>
+ <legend>Report</legend>
+ <div class="control-group required">
+ <label class="control-label">Title<abbr title="Required" class="required-mark">*</abbr></label>
+ <div class="controls"><input type="text" name="title"#if($title) value="$title"#end></div>
+ </div>
+ <div class="control-group required">
+ <label class="control-label">Description<abbr title="Required" class="required-mark">*</abbr></label>
+ <div class="controls"><textarea class="span8" name="description">${description}</textarea></div>
+ </div>
+ <div class="control-group">
+ <div class="controls">
+ <label class="checkbox">
+ <input name="render-results-as-html" type="checkbox"#if($renderResultsAsHtml) checked#end />Render results as HTML
+ </label>
+ </div>
+ </div>
+ <div class="control-group">
+ <div class="controls">
+ <label class="checkbox">
+ <input id="schedule-options" name="schedule" type="checkbox"#if($schedule) checked#end />Schedule
+ </label>
+ </div>
+ </div>
+ <div id="schedule-fields">
+ <div class="control-group">
+ <label for="schedule-time" class="control-label">Time</label>
+ <div id="schedule-time" class="controls">
+ #set($TWELVE=12)
+ #set($ZERO_ZERO="00")
+ <input name="schedule-hour" type="text" maxlength="2" value="#if($scheduleHour && $scheduleHour != "")${scheduleHour}#else$TWELVE#end" style="width:20px;"/>:<input name="schedule-minute" type="text" size="2" maxlength="2" value="#if($scheduleMinute && $scheduleMinute != "")${scheduleMinute}#else$ZERO_ZERO#end" style="width:20px;"/>
+ <select name="schedule-am_pm" style="width:60px;">
+ <option#if($scheduleAmPm=="am") selected#end>am</option>
+ <option#if($scheduleAmPm=="pm") selected#end>pm</option>
+ </select>
+ <select name="schedule-timezone" style="width:70px;">
+ <option#if($scheduleTimeZone==$timezone) selected#end>${timezone}</option>
+ <option#if($scheduleTimeZone=="UTC") selected#end>UTC</option>
+ </select>
+ </div>
+ </div>
+ <div class="control-group">
+ <label for="date" class="control-label">Date</label>
+ <div id="date" class="controls" style="position: relative">
+ <input type="text" id="schedule-date" name="schedule-date"#if($scheduleDate && $scheduleDate != "") value="$scheduleDate"#end/>
+ </div>
+ </div>
+ <div class="control-group">
+ <div class="controls">
+ <label class="checkbox">
+ <input id="schedule-repeat" name="schedule-repeat" type="checkbox"#if($scheduleRepeat) checked#end/>Repeat
+ </label>
+ </div>
+ </div>
+ <div id="schedule-repeat-fields">
+ <div class="control-group">
+ <label for="interval" class="control-label">Every</label>
+ <div id="interval" class="controls">
+ #set($ONE=1)
+ <input name="schedule-interval-quantity" type="text" maxlength="2" value="#if($scheduleIntervalQuantity)${scheduleIntervalQuantity}#else$ONE#end" style="width:20px;"/>
+ <select name="schedule-interval" style="width:100px;">
+ <option value="M"#if($scheduleInterval=="M") selected#end>Minute(s)</option>
+ <option value="h"#if($scheduleInterval=="h") selected#end>Hour(s)</option>
+ <option value="d"#if($scheduleInterval=="d") selected#end>Day(s)</option>
+ <option value="w"#if($scheduleInterval=="w") selected#end>Week(s)</option>
+ <option value="m"#if($scheduleInterval=="m") selected#end>Month(s)</option>
+ <option value="y"#if($scheduleInterval=="y") selected#end>Year(s)</option>
+ </select>
+ </div>
+ </div>
+
+ <div class="control-group" id='endScheduleId' data-max="$max_allowed_schedule_dates" data-default="$default_schedule_dates">
+ <label for="endSchedule" class="control-label"> <span style="color:#ff1059">End Schedule Time</span> </label>
+ <div id="interval_2" class="controls" style="position: relative">
+ <input type="text" id="end-schedule-date" name="end-schedule-date"#if($endScheduleDate && $endScheduleDate != "") value="$endScheduleDate"#end/>
+ </div>
+ <div id="expireNote" class="controls" data-end="$endScheduleDate">
+ </div>
+ </div>
+ </div>
+ </div>
+ </fieldset>
+ <fieldset>
+ <div id="variable-fields">
+ <legend>Variables</legend>
+ <ol id="variable-list" class="reportal-list">
+ </ol>
+ <ol id="variable-template" style="display:none;">
+ <li class="reportal-list-item">
+ <div class="query-actions" style="float: right;">
+ <a class="btn btn-icon btn-left bump-up" title="Move the variable up."><span class="icon icon-arrow-up icon-gray-light icon-medium"></span></a>
+ <a class="btn btn-icon btn-right bump-down disabled" title="Move the variable down."><span class="icon icon-arrow-down icon-gray-light icon-medium"></span></a>
+ <a class="btn btn-danger btn-icon delete" style="float: right; margin-left:5px;" title="Remove">Remove</a>
+ </div>
+ <div class="control-group required">
+ <label class="control-label">Title<abbr title="Required" class="required-mark">*</abbr></label>
+ <div class="controls"><input type="text" class="variabletitle" nametemplate="variable#title" value=""></div>
+ </div>
+ <div class="control-group required">
+ <label class="control-label">Name<abbr title="Required" class="required-mark">*</abbr></label>
+ <div class="controls"><input type="text" class="variablename" nametemplate="variable#name" value=""></div>
+ </div>
+ </li>
+ </ol>
+ <div class="control-group">
+ <label class="control-label"></label>
+ <div class="controls"><button id="buttonAddVariable" type="button" class="btn btn-success" value="Add another Variable"><span class="icon icon-plus-alt icon-white icon-small"></span>Add Another Variable</button></div>
+ </div>
+ </div>
+ </fieldset>
+ <legend>Queries</legend>
+ <fieldset>
+ <ol id="query-list" class="reportal-list">
+ </ol>
+ <ol id="query-template" style="display:none;">
+ <li class="reportal-list-item">
+ <div class="query-actions" style="float: right;">
+ <a class="btn btn-icon btn-left bump-up" title="Move the query up in execution order."><span class="icon icon-arrow-up icon-gray-light icon-medium"></span></a>
+ <a class="btn btn-icon btn-right bump-down disabled" title="Move the query down in execution order."><span class="icon icon-arrow-down icon-gray-light icon-medium"></span></a>
+ <a class="btn btn-danger btn-icon delete" style="float: right; margin-left:5px;" title="Remove">Remove</a>
+ </div>
+ <div class="control-group">
+ <label class="control-label">Title</label>
+ <div class="controls"><input type="text" class="querytitle" nametemplate="query#title" value="" maxlength="249"></div>
+ </div>
+ <div class="control-group required">
+ <label class="control-label">Type<abbr title="Required" class="required-mark">*</abbr></label>
+ <div class="controls">
+ <select class="querytype" nametemplate="query#type">
+ <option value="ReportalHive">Hive</option>
+ <option value="ReportalTeraData">Teradata</option>
+ <option value="ReportalPig">Pig</option>
+ </select>
+ </div>
+ </div>
+ <div class="control-group">
+ <label class="control-label">Script</label>
+ <div class="controls"><textarea class="span8 queryscript" nametemplate="query#script"></textarea></div>
+ </div>
+ </li>
+ </ol>
+ <div class="control-group">
+ <label class="control-label"></label>
+ <div class="controls"><button id="buttonAddQuery" type="button" class="btn btn-success" value="Add another Query"><span class="icon icon-plus-alt icon-white icon-small"></span>Add Another Query</button></div>
+ </div>
+ </fieldset>
+ <fieldset>
+ <legend>Access</legend>
+ <div class="control-group">
+ <label class="control-label">Viewers</label>
+ <div class="controls">
+ <textarea class="span8" name="access-viewer">$accessViewer</textarea>
+ <br/>Separate usernames (e.g.: jdoe) by commas, spaces, or semicolons. If left blank, the runs of this report are viewable by everyone.
+ </div>
+ </div>
+ <div class="control-group">
+ <label class="control-label">Executors</label>
+ <div class="controls">
+ <textarea class="span8" name="access-executor">$accessExecutor</textarea>
+ <br/>Separate usernames (e.g.: jdoe) by commas, spaces, or semicolons. If left blank, anyone can execute this report.
+ </div>
+ </div>
+ <div class="control-group">
+ <label class="control-label">Owners</label>
+ <div class="controls">
+ <textarea class="span8" name="access-owner">$accessOwner</textarea>
+ <br/>Separate usernames (e.g.: jdoe) by commas, spaces, or semicolons.
+ </div>
+ </div>
+ </fieldset>
+ <fieldset>
+ <legend>Notification</legend>
+ <div class="control-group">
+ <label class="control-label">Success Emails</label>
+ <div class="controls">
+ <textarea class="span8" name="notifications">$notifications</textarea>
+ <br/>Separate emails (e.g.: jdoe@example.com) by commas, spaces, or semicolons. Note: If the report returns no results, no email is sent, except for unscheduled runs.
+ </div>
+ </div>
+ <div class="control-group">
+ <label class="control-label">Failure Emails</label>
+ <div class="controls">
+ <textarea class="span8" name="failure-notifications">$failureNotifications</textarea>
+ <br/>Separate emails (e.g.: jdoe@example.com) by commas, spaces, or semicolons.
+ </div>
+ </div>
+ </fieldset>
+ <div class="form-actions">
+ <input type="submit" class="btn btn-primary" name="submit" value="Save">
+ <input type="submit" class="btn btn-success" name="submit" value="Save and Run"/>
+ <a href="${context}/reportal" class="btn">Cancel</a>
+ </div>
+ </div>
+ </form>
+ </div>
+ </body>
+</html>
diff --git a/az-reportal/src/main/resources/azkaban/viewer/reportal/reportallistpage.vm b/az-reportal/src/main/resources/azkaban/viewer/reportal/reportallistpage.vm
new file mode 100644
index 0000000..0e0704e
--- /dev/null
+++ b/az-reportal/src/main/resources/azkaban/viewer/reportal/reportallistpage.vm
@@ -0,0 +1,147 @@
+#*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+*#
+
+<!DOCTYPE html>
+<html>
+ <head>
+ <title>Reportal</title>
+ <link href="${context}/reportal/css/bootstrap.min.css" rel="stylesheet">
+ <script type="text/javascript" src="${context}/js/jquery/jquery-1.9.1.js"></script>
+ <script type="text/javascript" src="${context}/js/jqueryui/jquery-ui-1.10.1.custom.js"></script>
+ <script type="text/javascript" src="${context}/reportal/js/bootstrap.min.js"></script>
+
+ <script type="text/javascript">
+ var contextURL = "${context}";
+ var currentTime = ${currentTime};
+ var timezone = "${timezone}";
+ var homeDir = "${homedir}";
+ </script>
+ <script type="text/javascript">
+ var reportals = [
+#if($projects)
+ #foreach($project in $projects)
+ #set($title = $esc.javascript(${project.getMetadata().title}))
+ {
+ "id" : "${project.id}",
+ "title" : "$title",
+ "time" : $project.createTimestamp,
+ "timeText" : "$utils.formatDateTime($project.createTimestamp)",
+ "user" : "$project.getMetadata().get("accessOwner")",
+ "scheduled" : $ReportalHelper.isScheduledProject($project),
+ "scheduledRepeating" : $ReportalHelper.isScheduledRepeatingProject($project),
+ "bookmark" : $ReportalHelper.isBookmarkProject($project, $user),
+ "subscribe" : $ReportalHelper.isSubscribeProject($project, $user),
+ "shown" : true
+ },
+ #end
+#end
+ ];
+ </script>
+ <script type="text/javascript" src="${context}/reportal/js/reportal.js"></script>
+ <script type="text/javascript" src="${context}/reportal/js/reportal-list.js"></script>
+ <script type="text/javascript" src="${context}/reportal/js/jquery.tablesorter.min.js"></script>
+ <link href="${context}/reportal/css/reportal.css" rel="stylesheet">
+ </head>
+ <body>
+#set($current_page="$viewerName")
+#if(!$hideNavigation)
+ #parse("azkaban/webapp/servlet/velocity/nav.vm")
+#end
+
+#parse("azkaban/viewer/reportal/reportalnavigation.vm")
+
+ <div class="content" style="margin-top: 41px;">
+ <div id="box-error">
+ #if($errorMsg)
+ <div class="box-error-message">$errorMsg</div>
+ #else
+ #if($error_message != "null")
+ <div class="box-error-message">$error_message</div>
+ #elseif($success_message != "null")
+ <div class="box-success-message">$success_message</div>
+ #end
+ #end
+ </div>
+ <div> </div>
+ <div class="container-fluid" style"font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif;">
+ <div class="row-fluid">
+ <div class="span2">
+ <div class="well">
+ <p style="margin-bottom:20px;"><a href="${context}/reportal?new" class="btn btn-success">Create Report</a></p>
+ <form method="get" accept-charset="utf-8" class="form-inline" id="search-facet-form">
+ <label class="input-list-label">Filter Reports</label>
+ <div class="control-group">
+ <div class="controls">
+ <label class="checkbox"><input id="facet_bookmarked" type="checkbox">Bookmarked</label>
+ </div>
+ </div>
+ <div class="control-group">
+ <div class="controls">
+ <label class="checkbox"><input id="facet_subscribed" type="checkbox">Subscribed</label>
+ </div>
+ </div>
+ <div class="control-group">
+ <div class="controls">
+ <label class="checkbox"><input id="facet_date_created" type="checkbox">Date Created</label>
+ </div>
+ </div>
+ <div class="facet-input">
+ <input class="span2 hasDatepicker" id="date_created_from" name="date_created_from" type="text" value="${startDate}" style="width:130px;">
+ to
+ <input class="span2 hasDatepicker" id="date_created_to" name="date_created_to" type="text" value="${endDate}" style="width:130px;">
+ </div>
+ <div class="control-group">
+ <div class="controls">
+ <label class="checkbox"><input id="facet_owner" type="checkbox" checked="checked">Owner</label>
+ </div>
+ </div>
+ <div class="facet-input">
+ <input class="span2" id="owner" type="text" value="$userid" style="width:130px;">
+ </div>
+ </form>
+ </div>
+ </div>
+ <div class="span10">
+ <table id="reportalTable" class="table table-bordered table-striped">
+ <thead>
+ <tr>
+ <th>Title</th>
+ <th>Date Created</th>
+ <th>Owner</th>
+ </tr>
+ </thead>
+ <tbody>
+ </tbody>
+ </table>
+ </div>
+ <div id="action-template" style="display:none;">
+ <div class="report-actions">
+ <a href="" title="Toggle bookmarked" class="btn btn-icon btn-left button-bookmark"><span class="icon icon-bookmark icon-gray-light icon-small"></span></a><a href="" title="Toggle subscription" class="btn btn-icon btn-middle button-subscribe"><span class="icon icon-mail icon-gray-light icon-small"></span></a><div class="btn-dropdown"><a class="btn btn-icon btn-right dropdown-toggle"><span class="icon icon-cog icon-gray-light icon-small"></span></a>
+ <ul class="dropdown-menu">
+ <li><a class="button-view" href="">View</a></li>
+ <li><a class="button-edit" href="">Edit</a></li>
+ <li class="divider"></li>
+ <li><a class="button-run" href="">Run</a></li>
+ <li><a class="button-delete" href="">Delete</a></li>
+ </ul>
+ </div>
+ </div>
+ </div>
+ </div>
+ </div>
+ </div>
+ </body>
+</html>
diff --git a/az-reportal/src/main/resources/azkaban/viewer/reportal/reportalnavigation.vm b/az-reportal/src/main/resources/azkaban/viewer/reportal/reportalnavigation.vm
new file mode 100644
index 0000000..e22a53d
--- /dev/null
+++ b/az-reportal/src/main/resources/azkaban/viewer/reportal/reportalnavigation.vm
@@ -0,0 +1,30 @@
+#*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+*#
+
+<div class="topbar navbar navbar-fixed-top navbar-inverse">
+ <div class="navbar-inner">
+ <div class="container-fluid">
+ <div class="topbar-content">
+ <a class="brand" href="${context}/reportal">Reportal</a>
+ <div class="login pull-right">
+ <p class="navbar-text">
+ Logged in as $userid | <a href="${context}/reportal?logout">Sign out</a>
+ </p>
+ </div>
+ </div>
+ </div>
+ </div>
+</div>
diff --git a/az-reportal/src/main/resources/azkaban/viewer/reportal/reportalrunpage.vm b/az-reportal/src/main/resources/azkaban/viewer/reportal/reportalrunpage.vm
new file mode 100644
index 0000000..73c250c
--- /dev/null
+++ b/az-reportal/src/main/resources/azkaban/viewer/reportal/reportalrunpage.vm
@@ -0,0 +1,99 @@
+#*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+*#
+
+<!DOCTYPE html>
+<html>
+ <head>
+ <title>Reportal</title>
+ <link href="${context}/reportal/css/bootstrap.min.css" rel="stylesheet">
+ <script type="text/javascript" src="${context}/js/jquery/jquery-1.9.1.js"></script>
+ <script type="text/javascript" src="${context}/js/jqueryui/jquery-ui-1.10.1.custom.js"></script>
+ <script type="text/javascript">
+ var contextURL = "${context}";
+ </script>
+ <script type="text/javascript" src="${context}/reportal/js/reportal.js"></script>
+ <script type="text/javascript" src="${context}/reportal/js/reportal-run.js"></script>
+ <link href="${context}/reportal/css/reportal.css" rel="stylesheet">
+ </head>
+ <body>
+#set($current_page="$viewerName")
+#if(!$hideNavigation)
+ #parse("azkaban/webapp/servlet/velocity/nav.vm")
+#end
+
+#parse("azkaban/viewer/reportal/reportalnavigation.vm")
+
+ <div class="content" style="margin-top: 41px;">
+ <div id="box-error">
+ #if($errorMsg)
+ <div class="box-error-message">$errorMsg</div>
+ #else
+ #if($error_message != "null")
+ <div class="box-error-message">$error_message</div>
+ #elseif($success_message != "null")
+ <div class="box-success-message">$success_message</div>
+ #end
+ #end
+ </div>
+ <div> </div>
+ <div style="text-align: center;">
+ <a class="btn btn-info" href="${context}/reportal">Reportal Home</a>
+#if($projectId)
+ <a class="btn btn-primary" href="${context}/reportal?view&id=$projectId">Report History</a>
+ <a class="btn btn-warning" href="${context}/reportal?edit&id=$projectId">Edit</a>
+#end
+ </div>
+#if($projectId)
+ <div> </div>
+ <div class="container">
+ <div>
+ <form method="post" class="form-horizontal">
+ <input type="hidden" name="id" value="$projectId">
+ <fieldset>
+ <legend>Report</legend>
+ <div class="control-group">
+ <label class="control-label">Title</label>
+ <div class="controls" style="padding-top:5px;">${title}</div>
+ </div>
+ <div class="control-group">
+ <label class="control-label">Description</label>
+ <div class="controls" style="padding-top:5px;">${description}</div>
+ </div>
+ </fieldset>
+ #if($variables)
+ <legend>Variables</legend>
+ <fieldset>
+ #foreach($variable in $variables)
+ #set($num = $velocityCount - 1)
+ <div class="control-group">
+ <label class="control-label">${variable.title}</label>
+ <div class="controls"><input type="text" name="variable${num}" value=""></div>
+ </div>
+ #end
+ </fieldset>
+ #end
+ <div class="form-actions">
+ <input id="run-button" type="submit" class="btn btn-primary" value="Run">
+ <input id="test-run-button" type="submit" class="btn btn-info" value="Test Run (only email executor)">
+ <a href="${context}/reportal" class="btn">Cancel</a>
+ </div>
+ <form>
+ </div>
+ </div>
+#end
+ </div>
+ </body>
+</html>
build.gradle 3(+3 -0)
diff --git a/build.gradle b/build.gradle
index 60eea52..9f33f3f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -54,6 +54,7 @@ allprojects {
ext.versions = [
hadoop: '2.6.1',
hive : '1.1.0',
+ pig : '0.15.0',
restli: '1.15.7',
slf4j : '1.7.18',
]
@@ -78,6 +79,7 @@ ext.deps = [
hadoopHdfs : "org.apache.hadoop:hadoop-hdfs:" + versions.hadoop,
hadoopMRClientCommon: "org.apache.hadoop:hadoop-mapreduce-client-common:" + versions.hadoop,
hadoopMRClientCore : "org.apache.hadoop:hadoop-mapreduce-client-core:" + versions.hadoop,
+ hiveCli : "org.apache.hive:hive-cli:" + versions.hive,
hiveExecCore : "org.apache.hive:hive-exec:" + versions.hive + ":core",
hiveMetastore : "org.apache.hive:hive-metastore:" + versions.hive,
httpclient : 'org.apache.httpcomponents:httpclient:4.5.3',
@@ -99,6 +101,7 @@ ext.deps = [
metricsJvm : 'io.dropwizard.metrics:metrics-jvm:3.1.0',
mockito : 'org.mockito:mockito-core:2.10.0',
mysqlConnector : 'mysql:mysql-connector-java:5.1.28',
+ pig : 'org.apache.pig:pig:' + versions.pig,
quartz : 'org.quartz-scheduler:quartz:2.2.1',
restliGenerator : 'com.linkedin.pegasus:generator:' + versions.restli,
restliServer : 'com.linkedin.pegasus:restli-server:' + versions.restli,
settings.gradle 2(+2 -0)
diff --git a/settings.gradle b/settings.gradle
index dae347c..def0782 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -27,3 +27,5 @@ include 'azkaban-solo-server'
include 'azkaban-web-server'
include 'az-flow-trigger-dependency-plugin'
include 'test'
+include 'az-reportal'
+