azkaban-developers

Reportal presto jobtype (#1929) This PR introduces presto

8/30/2018 7:43:58 PM
3.54.0

Details

diff --git a/az-reportal/build.gradle b/az-reportal/build.gradle
index 2e3cd0a..f06f857 100644
--- a/az-reportal/build.gradle
+++ b/az-reportal/build.gradle
@@ -16,16 +16,19 @@
 
 apply plugin: 'distribution'
 
+
 dependencies {
     compile project(':az-core')
-    compile project(":azkaban-common")
-    compile project(":azkaban-web-server")
-    compile project(":azkaban-hadoop-security-plugin")
+    compileOnly project(":azkaban-web-server")
+    compileOnly project(":azkaban-common")
+    compileOnly project(":azkaban-hadoop-security-plugin")
+    compile project(':az-crypto')
 
+    compileOnly deps.bcprov
     compileOnly deps.hadoopCommon
     compileOnly deps.hadoopMRClientCommon
     compileOnly deps.hadoopMRClientCore
-    compileOnly (deps.hiveCli) {
+    compileOnly(deps.hiveCli) {
         transitive = false
     }
     compileOnly deps.hiveMetastore
@@ -47,3 +50,16 @@ distributions {
         }
     }
 }
+
+distributions {
+    main {
+        contents {
+            from(configurations.runtime) {
+                into 'lib'
+            }
+            from(jar) {
+                into 'lib'
+            }
+        }
+    }
+}
diff --git a/az-reportal/src/main/java/azkaban/jobtype/ReportalPrestoRunner.java b/az-reportal/src/main/java/azkaban/jobtype/ReportalPrestoRunner.java
new file mode 100644
index 0000000..32579ce
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/jobtype/ReportalPrestoRunner.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobtype;
+
+import azkaban.crypto.Decryptions;
+import azkaban.reportal.util.ReportalUtil;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.net.URI;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+public class ReportalPrestoRunner extends ReportalAbstractRunner {
+
+  public static final String JDBC_DRIVER_KEY = "presto.driver";
+  public static final String PRESTO_USER = "presto.driver.user";
+  public static final String DRIVER_URL = "presto.driver.jdbc.url";
+  private static final String PRESTO_DRIVER_PROP_PREFIX = "presto.driver.";
+  private static final String IMPERSONATED_USER_KEY = "presto.execute.user";
+
+  public ReportalPrestoRunner(final String jobName, final Properties props) {
+    super(props);
+
+    Preconditions.checkArgument(props.containsKey(JDBC_DRIVER_KEY), "missing " + JDBC_DRIVER_KEY);
+    Preconditions.checkArgument(props.containsKey(PRESTO_USER), "missing " + PRESTO_USER);
+    Preconditions.checkArgument(props.containsKey(DRIVER_URL), "missing " + DRIVER_URL);
+  }
+
+  private String decrypt(final String encrypted, final String keyPath) throws IOException {
+    final FileSystem fs = FileSystem.get(URI.create("file:///"), new Configuration());
+    return new Decryptions()
+        .decrypt(encrypted, keyPath, fs);
+  }
+
+  private Properties getProperties() throws IOException {
+    final Properties connProperties = new Properties();
+    final Map<String, String> prestoProps = this.props.getMapByPrefix(PRESTO_DRIVER_PROP_PREFIX);
+
+    for (final Entry<String, String> entry : prestoProps.entrySet()) {
+      final String key = entry.getKey();
+      final String value = entry.getValue();
+      if (key.contains("encrypted.")) {
+        // if props contains "encrypted." then decrypted it with key file and put the decrypted
+        // value into jdbc connection props
+        //"encrypted.password" => "password"
+        final String jdbcProp = key.replaceFirst("encrypted.", "");
+        connProperties.put(jdbcProp, decrypt(value, prestoProps.get("jdbc.crypto.key.path")));
+      } else if (!key.equals("jdbc.url") && !key.equals("jdbc.crypto.key.path")) {
+        connProperties.put(key, value);
+      }
+    }
+
+    return connProperties;
+  }
+
+  private Connection getConnection(final String jdbcUrl, final String userToProxy) {
+    try {
+      Class.forName(this.props.get(JDBC_DRIVER_KEY));
+      final Properties connProperties = getProperties();
+      connProperties.put(IMPERSONATED_USER_KEY, userToProxy);
+      final Connection conn = DriverManager.getConnection(jdbcUrl, connProperties);
+      return conn;
+    } catch (final Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  protected void runReportal() throws Exception {
+    final Connection conn = getConnection(this.props.get(PRESTO_DRIVER_PROP_PREFIX + "jdbc.url"),
+        this.proxyUser);
+    final Statement statement = conn.createStatement();
+    try {
+      statement.execute(this.jobQuery);
+      ReportalUtil.outputQueryResult(statement.getResultSet(), this.outputStream);
+    } finally {
+      statement.close();
+      conn.close();
+    }
+  }
+}
diff --git a/az-reportal/src/main/java/azkaban/jobtype/ReportalTeradataRunner.java b/az-reportal/src/main/java/azkaban/jobtype/ReportalTeradataRunner.java
index 81b0e5e..174d9c4 100644
--- a/az-reportal/src/main/java/azkaban/jobtype/ReportalTeradataRunner.java
+++ b/az-reportal/src/main/java/azkaban/jobtype/ReportalTeradataRunner.java
@@ -18,8 +18,7 @@ package azkaban.jobtype;
 
 import azkaban.flow.CommonJobProperties;
 import azkaban.reportal.util.CompositeException;
-import java.io.OutputStream;
-import java.io.PrintStream;
+import azkaban.reportal.util.ReportalUtil;
 import java.net.URI;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -96,7 +95,7 @@ public class ReportalTeradataRunner extends ReportalAbstractRunner {
           final PreparedStatement stmt = prepareStatement(conn, queryLine);
           stmt.execute();
           final ResultSet rs = stmt.getResultSet();
-          outputQueryResult(rs, this.outputStream);
+          ReportalUtil.outputQueryResult(rs, this.outputStream);
           stmt.close();
         } else {
           try {
@@ -169,56 +168,6 @@ public class ReportalTeradataRunner extends ReportalAbstractRunner {
     return null;
   }
 
-  private void outputQueryResult(final ResultSet result, final OutputStream outputStream)
-      throws SQLException {
-    final PrintStream outFile = new PrintStream(outputStream);
-    final String delim = ",";
-    boolean isHeaderPending = true;
-    if (result != null) {
-      while (result.next()) {
-        final int numColumns = result.getMetaData().getColumnCount();
-        final StringBuilder dataString = new StringBuilder();
-
-        if (isHeaderPending) {
-          final StringBuilder headerString = new StringBuilder();
-          for (int j = 1; j <= numColumns; j++) {
-            final String colName = formatValue(result.getMetaData().getColumnName(j));
-            if (j > 1) {
-              headerString.append(delim).append(colName);
-            } else {
-              headerString.append(colName);
-            }
-          }
-          isHeaderPending = false;
-          outFile.println(headerString.toString());
-        }
-
-        for (int j = 1; j <= numColumns; j++) {
-          String colVal = result.getString(j);
-
-          if (colVal == null) {
-            colVal = "\"null\"";
-          } else {
-            colVal = formatValue(colVal);
-          }
-
-          if (j > 1) {
-            dataString.append(delim).append(colVal);
-          } else {
-            dataString.append(colVal);
-          }
-        }
-
-        outFile.println(dataString.toString());
-      }
-    }
-    outFile.close();
-  }
-
-  private String formatValue(final String value) {
-    return "\"" + value.replace("\"", "") + "\"";
-  }
-
   private PreparedStatement prepareStatement(final Connection conn, String line)
       throws SQLException {
     line = injectVariables(line);
diff --git a/az-reportal/src/main/java/azkaban/reportal/util/ReportalUtil.java b/az-reportal/src/main/java/azkaban/reportal/util/ReportalUtil.java
index 104616a..b2c26b3 100644
--- a/az-reportal/src/main/java/azkaban/reportal/util/ReportalUtil.java
+++ b/az-reportal/src/main/java/azkaban/reportal/util/ReportalUtil.java
@@ -19,6 +19,10 @@ package azkaban.reportal.util;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableNode;
 import azkaban.reportal.util.Reportal.Variable;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -38,9 +42,6 @@ public class ReportalUtil {
   /**
    * Returns a list of the executable nodes in the specified flow in execution
    * order. Assumes that the flow is linear.
-   *
-   * @param nodes
-   * @return
    */
   public static List<ExecutableNode> sortExecutableNodes(final ExecutableFlow flow) {
     final List<ExecutableNode> sortedNodes = new ArrayList<>();
@@ -65,15 +66,12 @@ public class ReportalUtil {
   /**
    * Get runtime variables to be set in unscheduled mode of execution.
    * Returns empty list, if no runtime variable is found
-   *
-   * @param variables
-   * @return
    */
   public static List<Variable> getRunTimeVariables(
       final Collection<Variable> variables) {
     final List<Variable> runtimeVariables =
-      ReportalUtil.getVariablesByRegex(variables,
-        Reportal.REPORTAL_CONFIG_PREFIX_NEGATION_REGEX);
+        ReportalUtil.getVariablesByRegex(variables,
+            Reportal.REPORTAL_CONFIG_PREFIX_NEGATION_REGEX);
 
     return runtimeVariables;
   }
@@ -81,10 +79,6 @@ public class ReportalUtil {
   /**
    * Shortlist variables which match a given regex. Returns empty empty list, if no
    * eligible variable is found
-   *
-   * @param variables
-   * @param regex
-   * @return
    */
   public static List<Variable> getVariablesByRegex(
       final Collection<Variable> variables, final String regex) {
@@ -103,22 +97,69 @@ public class ReportalUtil {
    * Shortlist variables which match a given prefix. Returns empty map, if no
    * eligible variable is found.
    *
-   * @param variables
-   *          variables to be processed
-   * @param prefix
-   *          prefix to be matched
+   * @param variables variables to be processed
+   * @param prefix prefix to be matched
    * @return a map with shortlisted variables and prefix removed
    */
   public static Map<String, String> getVariableMapByPrefix(
       final Collection<Variable> variables, final String prefix) {
     final Map<String, String> shortlistMap = new HashMap<>();
-    if (variables!=null && prefix != null) {
+    if (variables != null && prefix != null) {
       for (final Variable var : getVariablesByRegex(variables,
-        Reportal.REPORTAL_CONFIG_PREFIX_REGEX)) {
+          Reportal.REPORTAL_CONFIG_PREFIX_REGEX)) {
         shortlistMap
-          .put(var.getTitle().replaceFirst(prefix, ""), var.getName());
+            .put(var.getTitle().replaceFirst(prefix, ""), var.getName());
       }
     }
     return shortlistMap;
   }
+
+  private static String formatValue(final String value) {
+    return "\"" + value.replace("\"", "") + "\"";
+  }
+
+  public static void outputQueryResult(final ResultSet result, final OutputStream outputStream)
+      throws SQLException {
+    final PrintStream outFile = new PrintStream(outputStream);
+    final String delim = ",";
+    boolean isHeaderPending = true;
+    if (result != null) {
+      while (result.next()) {
+        final int numColumns = result.getMetaData().getColumnCount();
+        final StringBuilder dataString = new StringBuilder();
+
+        if (isHeaderPending) {
+          final StringBuilder headerString = new StringBuilder();
+          for (int j = 1; j <= numColumns; j++) {
+            final String colName = formatValue(result.getMetaData().getColumnName(j));
+            if (j > 1) {
+              headerString.append(delim).append(colName);
+            } else {
+              headerString.append(colName);
+            }
+          }
+          isHeaderPending = false;
+          outFile.println(headerString.toString());
+        }
+
+        for (int j = 1; j <= numColumns; j++) {
+          String colVal = result.getString(j);
+          if (colVal == null) {
+            colVal = "\"null\"";
+          } else {
+            colVal = formatValue(colVal);
+          }
+
+          if (j > 1) {
+            dataString.append(delim).append(colVal);
+          } else {
+            dataString.append(colVal);
+          }
+        }
+
+        outFile.println(dataString.toString());
+      }
+    }
+    outFile.close();
+  }
 }
diff --git a/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalServlet.java b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalServlet.java
index a579a78..b01e9d4 100644
--- a/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalServlet.java
+++ b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalServlet.java
@@ -903,7 +903,7 @@ public class ReportalServlet extends LoginAbstractAzkabanServlet {
       // Validate if the session user (who interact with UI) is part of specified user.to.proxy
       // user. If not, reportal can not be saved and warn users.
       if (variable.title.equals("reportal.config.user.to.proxy")) {
-        String userToProxy = variable.name;
+        final String userToProxy = variable.name;
         final UserManager userManager = getApplication().getUserManager();
         if (!userManager.validateProxyUser(userToProxy, user)) {
           errors.add("User " + user.getUserId() + " has no permission to add " + userToProxy
diff --git a/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalType.java b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalType.java
index 1fe7e35..3555c59 100644
--- a/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalType.java
+++ b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalType.java
@@ -57,10 +57,12 @@ public enum ReportalType {
       propertiesFile.put("reportal.pig.script", "res/" + jobName + ".pig");
     }
   },
+
+
   HiveJob("ReportalHive", "reportalhive", "hadoop"), TeraDataJob(
       "ReportalTeraData", "reportalteradata", "teradata"),
   TableauJob("ReportalTableau", "reportaltableau", "hadoop"),
-  DataCollectorJob(
+  PrestoJob("ReportalPresto", "reportalpresto", "hadoop"), DataCollectorJob(
       ReportalTypeManager.DATA_COLLECTOR_JOB, ReportalTypeManager.DATA_COLLECTOR_JOB_TYPE, "") {
     @Override
     public void buildJobFiles(final Reportal reportal, final Props propertiesFile,
@@ -96,7 +98,6 @@ public enum ReportalType {
 
   public void buildJobFiles(final Reportal reportal, final Props propertiesFile,
       final File jobFile, final String jobName, final String queryScript, final String proxyUser) {
-
   }
 
   public String getJobTypeName() {
diff --git a/az-reportal/src/main/resources/azkaban/viewer/reportal/reportaleditpage.vm b/az-reportal/src/main/resources/azkaban/viewer/reportal/reportaleditpage.vm
index a40c484..cc76afd 100644
--- a/az-reportal/src/main/resources/azkaban/viewer/reportal/reportaleditpage.vm
+++ b/az-reportal/src/main/resources/azkaban/viewer/reportal/reportaleditpage.vm
@@ -300,6 +300,7 @@
                   <option id="ReportalHive" value="ReportalHive">Hive</option>
                   <option value="ReportalTeraData">Teradata</option>
                   <option value="ReportalPig">Pig</option>
+                  <option value="ReportalPresto">Presto</option>
                   <option value="ReportalTableau">Tableau</option>
                 </select>
               </div>
@@ -378,4 +379,4 @@
   </form>
 </div>
 </body>
-</html>
+</html>
\ No newline at end of file
diff --git a/az-reportal/src/test/java/azkaban/reportal/ReportalPrestoRunnerTest.java b/az-reportal/src/test/java/azkaban/reportal/ReportalPrestoRunnerTest.java
new file mode 100644
index 0000000..e6f1b4c
--- /dev/null
+++ b/az-reportal/src/test/java/azkaban/reportal/ReportalPrestoRunnerTest.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.reportal;
+
+import static org.assertj.core.api.Java6Assertions.assertThatThrownBy;
+
+import azkaban.jobtype.ReportalPrestoRunner;
+import java.util.Properties;
+import org.junit.Test;
+
+public class ReportalPrestoRunnerTest {
+
+  @Test
+  public void testNoDriverURL() throws InterruptedException {
+    final Properties props = new Properties();
+    props.put(ReportalPrestoRunner.DRIVER_URL, "test");
+    assertThatThrownBy(() -> {
+      new ReportalPrestoRunner("presto", props);
+    }).isInstanceOf
+        (IllegalArgumentException.class);
+  }
+
+  @Test
+  public void testNoPrestoUser() throws InterruptedException {
+    final Properties props = new Properties();
+    props.put(ReportalPrestoRunner.PRESTO_USER, "test");
+    assertThatThrownBy(() -> {
+      new ReportalPrestoRunner("presto", props);
+    }).isInstanceOf
+        (IllegalArgumentException.class);
+  }
+
+  @Test
+  public void testNoJdbcDriverKey() throws InterruptedException {
+    final Properties props = new Properties();
+    props.put(ReportalPrestoRunner.JDBC_DRIVER_KEY, "test");
+    assertThatThrownBy(() -> {
+      new ReportalPrestoRunner("presto", props);
+    }).isInstanceOf
+        (IllegalArgumentException.class);
+  }
+}