azkaban-developers
Changes
az-reportal/build.gradle 24(+20 -4)
Details
az-reportal/build.gradle 24(+20 -4)
diff --git a/az-reportal/build.gradle b/az-reportal/build.gradle
index 2e3cd0a..f06f857 100644
--- a/az-reportal/build.gradle
+++ b/az-reportal/build.gradle
@@ -16,16 +16,19 @@
apply plugin: 'distribution'
+
dependencies {
compile project(':az-core')
- compile project(":azkaban-common")
- compile project(":azkaban-web-server")
- compile project(":azkaban-hadoop-security-plugin")
+ compileOnly project(":azkaban-web-server")
+ compileOnly project(":azkaban-common")
+ compileOnly project(":azkaban-hadoop-security-plugin")
+ compile project(':az-crypto')
+ compileOnly deps.bcprov
compileOnly deps.hadoopCommon
compileOnly deps.hadoopMRClientCommon
compileOnly deps.hadoopMRClientCore
- compileOnly (deps.hiveCli) {
+ compileOnly(deps.hiveCli) {
transitive = false
}
compileOnly deps.hiveMetastore
@@ -47,3 +50,16 @@ distributions {
}
}
}
+
+distributions {
+ main {
+ contents {
+ from(configurations.runtime) {
+ into 'lib'
+ }
+ from(jar) {
+ into 'lib'
+ }
+ }
+ }
+}
diff --git a/az-reportal/src/main/java/azkaban/jobtype/ReportalPrestoRunner.java b/az-reportal/src/main/java/azkaban/jobtype/ReportalPrestoRunner.java
new file mode 100644
index 0000000..32579ce
--- /dev/null
+++ b/az-reportal/src/main/java/azkaban/jobtype/ReportalPrestoRunner.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobtype;
+
+import azkaban.crypto.Decryptions;
+import azkaban.reportal.util.ReportalUtil;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.net.URI;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+public class ReportalPrestoRunner extends ReportalAbstractRunner {
+
+ public static final String JDBC_DRIVER_KEY = "presto.driver";
+ public static final String PRESTO_USER = "presto.driver.user";
+ public static final String DRIVER_URL = "presto.driver.jdbc.url";
+ private static final String PRESTO_DRIVER_PROP_PREFIX = "presto.driver.";
+ private static final String IMPERSONATED_USER_KEY = "presto.execute.user";
+
+ public ReportalPrestoRunner(final String jobName, final Properties props) {
+ super(props);
+
+ Preconditions.checkArgument(props.containsKey(JDBC_DRIVER_KEY), "missing " + JDBC_DRIVER_KEY);
+ Preconditions.checkArgument(props.containsKey(PRESTO_USER), "missing " + PRESTO_USER);
+ Preconditions.checkArgument(props.containsKey(DRIVER_URL), "missing " + DRIVER_URL);
+ }
+
+ private String decrypt(final String encrypted, final String keyPath) throws IOException {
+ final FileSystem fs = FileSystem.get(URI.create("file:///"), new Configuration());
+ return new Decryptions()
+ .decrypt(encrypted, keyPath, fs);
+ }
+
+ private Properties getProperties() throws IOException {
+ final Properties connProperties = new Properties();
+ final Map<String, String> prestoProps = this.props.getMapByPrefix(PRESTO_DRIVER_PROP_PREFIX);
+
+ for (final Entry<String, String> entry : prestoProps.entrySet()) {
+ final String key = entry.getKey();
+ final String value = entry.getValue();
+ if (key.contains("encrypted.")) {
+ // if props contains "encrypted." then decrypted it with key file and put the decrypted
+ // value into jdbc connection props
+ //"encrypted.password" => "password"
+ final String jdbcProp = key.replaceFirst("encrypted.", "");
+ connProperties.put(jdbcProp, decrypt(value, prestoProps.get("jdbc.crypto.key.path")));
+ } else if (!key.equals("jdbc.url") && !key.equals("jdbc.crypto.key.path")) {
+ connProperties.put(key, value);
+ }
+ }
+
+ return connProperties;
+ }
+
+ private Connection getConnection(final String jdbcUrl, final String userToProxy) {
+ try {
+ Class.forName(this.props.get(JDBC_DRIVER_KEY));
+ final Properties connProperties = getProperties();
+ connProperties.put(IMPERSONATED_USER_KEY, userToProxy);
+ final Connection conn = DriverManager.getConnection(jdbcUrl, connProperties);
+ return conn;
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ protected void runReportal() throws Exception {
+ final Connection conn = getConnection(this.props.get(PRESTO_DRIVER_PROP_PREFIX + "jdbc.url"),
+ this.proxyUser);
+ final Statement statement = conn.createStatement();
+ try {
+ statement.execute(this.jobQuery);
+ ReportalUtil.outputQueryResult(statement.getResultSet(), this.outputStream);
+ } finally {
+ statement.close();
+ conn.close();
+ }
+ }
+}
diff --git a/az-reportal/src/main/java/azkaban/jobtype/ReportalTeradataRunner.java b/az-reportal/src/main/java/azkaban/jobtype/ReportalTeradataRunner.java
index 81b0e5e..174d9c4 100644
--- a/az-reportal/src/main/java/azkaban/jobtype/ReportalTeradataRunner.java
+++ b/az-reportal/src/main/java/azkaban/jobtype/ReportalTeradataRunner.java
@@ -18,8 +18,7 @@ package azkaban.jobtype;
import azkaban.flow.CommonJobProperties;
import azkaban.reportal.util.CompositeException;
-import java.io.OutputStream;
-import java.io.PrintStream;
+import azkaban.reportal.util.ReportalUtil;
import java.net.URI;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -96,7 +95,7 @@ public class ReportalTeradataRunner extends ReportalAbstractRunner {
final PreparedStatement stmt = prepareStatement(conn, queryLine);
stmt.execute();
final ResultSet rs = stmt.getResultSet();
- outputQueryResult(rs, this.outputStream);
+ ReportalUtil.outputQueryResult(rs, this.outputStream);
stmt.close();
} else {
try {
@@ -169,56 +168,6 @@ public class ReportalTeradataRunner extends ReportalAbstractRunner {
return null;
}
- private void outputQueryResult(final ResultSet result, final OutputStream outputStream)
- throws SQLException {
- final PrintStream outFile = new PrintStream(outputStream);
- final String delim = ",";
- boolean isHeaderPending = true;
- if (result != null) {
- while (result.next()) {
- final int numColumns = result.getMetaData().getColumnCount();
- final StringBuilder dataString = new StringBuilder();
-
- if (isHeaderPending) {
- final StringBuilder headerString = new StringBuilder();
- for (int j = 1; j <= numColumns; j++) {
- final String colName = formatValue(result.getMetaData().getColumnName(j));
- if (j > 1) {
- headerString.append(delim).append(colName);
- } else {
- headerString.append(colName);
- }
- }
- isHeaderPending = false;
- outFile.println(headerString.toString());
- }
-
- for (int j = 1; j <= numColumns; j++) {
- String colVal = result.getString(j);
-
- if (colVal == null) {
- colVal = "\"null\"";
- } else {
- colVal = formatValue(colVal);
- }
-
- if (j > 1) {
- dataString.append(delim).append(colVal);
- } else {
- dataString.append(colVal);
- }
- }
-
- outFile.println(dataString.toString());
- }
- }
- outFile.close();
- }
-
- private String formatValue(final String value) {
- return "\"" + value.replace("\"", "") + "\"";
- }
-
private PreparedStatement prepareStatement(final Connection conn, String line)
throws SQLException {
line = injectVariables(line);
diff --git a/az-reportal/src/main/java/azkaban/reportal/util/ReportalUtil.java b/az-reportal/src/main/java/azkaban/reportal/util/ReportalUtil.java
index 104616a..b2c26b3 100644
--- a/az-reportal/src/main/java/azkaban/reportal/util/ReportalUtil.java
+++ b/az-reportal/src/main/java/azkaban/reportal/util/ReportalUtil.java
@@ -19,6 +19,10 @@ package azkaban.reportal.util;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
import azkaban.reportal.util.Reportal.Variable;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.sql.ResultSet;
+import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -38,9 +42,6 @@ public class ReportalUtil {
/**
* Returns a list of the executable nodes in the specified flow in execution
* order. Assumes that the flow is linear.
- *
- * @param nodes
- * @return
*/
public static List<ExecutableNode> sortExecutableNodes(final ExecutableFlow flow) {
final List<ExecutableNode> sortedNodes = new ArrayList<>();
@@ -65,15 +66,12 @@ public class ReportalUtil {
/**
* Get runtime variables to be set in unscheduled mode of execution.
* Returns empty list, if no runtime variable is found
- *
- * @param variables
- * @return
*/
public static List<Variable> getRunTimeVariables(
final Collection<Variable> variables) {
final List<Variable> runtimeVariables =
- ReportalUtil.getVariablesByRegex(variables,
- Reportal.REPORTAL_CONFIG_PREFIX_NEGATION_REGEX);
+ ReportalUtil.getVariablesByRegex(variables,
+ Reportal.REPORTAL_CONFIG_PREFIX_NEGATION_REGEX);
return runtimeVariables;
}
@@ -81,10 +79,6 @@ public class ReportalUtil {
/**
* Shortlist variables which match a given regex. Returns empty empty list, if no
* eligible variable is found
- *
- * @param variables
- * @param regex
- * @return
*/
public static List<Variable> getVariablesByRegex(
final Collection<Variable> variables, final String regex) {
@@ -103,22 +97,69 @@ public class ReportalUtil {
* Shortlist variables which match a given prefix. Returns empty map, if no
* eligible variable is found.
*
- * @param variables
- * variables to be processed
- * @param prefix
- * prefix to be matched
+ * @param variables variables to be processed
+ * @param prefix prefix to be matched
* @return a map with shortlisted variables and prefix removed
*/
public static Map<String, String> getVariableMapByPrefix(
final Collection<Variable> variables, final String prefix) {
final Map<String, String> shortlistMap = new HashMap<>();
- if (variables!=null && prefix != null) {
+ if (variables != null && prefix != null) {
for (final Variable var : getVariablesByRegex(variables,
- Reportal.REPORTAL_CONFIG_PREFIX_REGEX)) {
+ Reportal.REPORTAL_CONFIG_PREFIX_REGEX)) {
shortlistMap
- .put(var.getTitle().replaceFirst(prefix, ""), var.getName());
+ .put(var.getTitle().replaceFirst(prefix, ""), var.getName());
}
}
return shortlistMap;
}
+
+ private static String formatValue(final String value) {
+ return "\"" + value.replace("\"", "") + "\"";
+ }
+
+ public static void outputQueryResult(final ResultSet result, final OutputStream outputStream)
+ throws SQLException {
+ final PrintStream outFile = new PrintStream(outputStream);
+ final String delim = ",";
+ boolean isHeaderPending = true;
+ if (result != null) {
+ while (result.next()) {
+ final int numColumns = result.getMetaData().getColumnCount();
+ final StringBuilder dataString = new StringBuilder();
+
+ if (isHeaderPending) {
+ final StringBuilder headerString = new StringBuilder();
+ for (int j = 1; j <= numColumns; j++) {
+ final String colName = formatValue(result.getMetaData().getColumnName(j));
+ if (j > 1) {
+ headerString.append(delim).append(colName);
+ } else {
+ headerString.append(colName);
+ }
+ }
+ isHeaderPending = false;
+ outFile.println(headerString.toString());
+ }
+
+ for (int j = 1; j <= numColumns; j++) {
+ String colVal = result.getString(j);
+ if (colVal == null) {
+ colVal = "\"null\"";
+ } else {
+ colVal = formatValue(colVal);
+ }
+
+ if (j > 1) {
+ dataString.append(delim).append(colVal);
+ } else {
+ dataString.append(colVal);
+ }
+ }
+
+ outFile.println(dataString.toString());
+ }
+ }
+ outFile.close();
+ }
}
diff --git a/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalServlet.java b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalServlet.java
index a579a78..b01e9d4 100644
--- a/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalServlet.java
+++ b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalServlet.java
@@ -903,7 +903,7 @@ public class ReportalServlet extends LoginAbstractAzkabanServlet {
// Validate if the session user (who interact with UI) is part of specified user.to.proxy
// user. If not, reportal can not be saved and warn users.
if (variable.title.equals("reportal.config.user.to.proxy")) {
- String userToProxy = variable.name;
+ final String userToProxy = variable.name;
final UserManager userManager = getApplication().getUserManager();
if (!userManager.validateProxyUser(userToProxy, user)) {
errors.add("User " + user.getUserId() + " has no permission to add " + userToProxy
diff --git a/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalType.java b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalType.java
index 1fe7e35..3555c59 100644
--- a/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalType.java
+++ b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalType.java
@@ -57,10 +57,12 @@ public enum ReportalType {
propertiesFile.put("reportal.pig.script", "res/" + jobName + ".pig");
}
},
+
+
HiveJob("ReportalHive", "reportalhive", "hadoop"), TeraDataJob(
"ReportalTeraData", "reportalteradata", "teradata"),
TableauJob("ReportalTableau", "reportaltableau", "hadoop"),
- DataCollectorJob(
+ PrestoJob("ReportalPresto", "reportalpresto", "hadoop"), DataCollectorJob(
ReportalTypeManager.DATA_COLLECTOR_JOB, ReportalTypeManager.DATA_COLLECTOR_JOB_TYPE, "") {
@Override
public void buildJobFiles(final Reportal reportal, final Props propertiesFile,
@@ -96,7 +98,6 @@ public enum ReportalType {
public void buildJobFiles(final Reportal reportal, final Props propertiesFile,
final File jobFile, final String jobName, final String queryScript, final String proxyUser) {
-
}
public String getJobTypeName() {
diff --git a/az-reportal/src/main/resources/azkaban/viewer/reportal/reportaleditpage.vm b/az-reportal/src/main/resources/azkaban/viewer/reportal/reportaleditpage.vm
index a40c484..cc76afd 100644
--- a/az-reportal/src/main/resources/azkaban/viewer/reportal/reportaleditpage.vm
+++ b/az-reportal/src/main/resources/azkaban/viewer/reportal/reportaleditpage.vm
@@ -300,6 +300,7 @@
<option id="ReportalHive" value="ReportalHive">Hive</option>
<option value="ReportalTeraData">Teradata</option>
<option value="ReportalPig">Pig</option>
+ <option value="ReportalPresto">Presto</option>
<option value="ReportalTableau">Tableau</option>
</select>
</div>
@@ -378,4 +379,4 @@
</form>
</div>
</body>
-</html>
+</html>
\ No newline at end of file
diff --git a/az-reportal/src/test/java/azkaban/reportal/ReportalPrestoRunnerTest.java b/az-reportal/src/test/java/azkaban/reportal/ReportalPrestoRunnerTest.java
new file mode 100644
index 0000000..e6f1b4c
--- /dev/null
+++ b/az-reportal/src/test/java/azkaban/reportal/ReportalPrestoRunnerTest.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.reportal;
+
+import static org.assertj.core.api.Java6Assertions.assertThatThrownBy;
+
+import azkaban.jobtype.ReportalPrestoRunner;
+import java.util.Properties;
+import org.junit.Test;
+
+public class ReportalPrestoRunnerTest {
+
+ @Test
+ public void testNoDriverURL() throws InterruptedException {
+ final Properties props = new Properties();
+ props.put(ReportalPrestoRunner.DRIVER_URL, "test");
+ assertThatThrownBy(() -> {
+ new ReportalPrestoRunner("presto", props);
+ }).isInstanceOf
+ (IllegalArgumentException.class);
+ }
+
+ @Test
+ public void testNoPrestoUser() throws InterruptedException {
+ final Properties props = new Properties();
+ props.put(ReportalPrestoRunner.PRESTO_USER, "test");
+ assertThatThrownBy(() -> {
+ new ReportalPrestoRunner("presto", props);
+ }).isInstanceOf
+ (IllegalArgumentException.class);
+ }
+
+ @Test
+ public void testNoJdbcDriverKey() throws InterruptedException {
+ final Properties props = new Properties();
+ props.put(ReportalPrestoRunner.JDBC_DRIVER_KEY, "test");
+ assertThatThrownBy(() -> {
+ new ReportalPrestoRunner("presto", props);
+ }).isInstanceOf
+ (IllegalArgumentException.class);
+ }
+}