azkaban-aplcache
Changes
az-core/src/main/java/azkaban/Constants.java 11(+11 -0)
Details
az-core/src/main/java/azkaban/Constants.java 11(+11 -0)
diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 9497ffe..3ee3af4 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -105,6 +105,17 @@ public class Constants {
// Designates one of the external link topics to correspond to a job log viewer
public static final String AZKABAN_SERVER_EXTERNAL_LOGVIEWER_TOPIC = "azkaban.server.external.logviewer.topic";
public static final String AZKABAN_SERVER_EXTERNAL_LOGVIEWER_LABEL = "azkaban.server.external.logviewer.label";
+
+ /*
+ * Hadoop/Spark user job link.
+ * Example:
+ * a) azkaban.server.external.resource_manager_job_url=http://***rm***:8088/cluster/app/application_${application.id}
+ * b) azkaban.server.external.history_server_job_url=http://***jh***:19888/jobhistory/job/job_${application.id}
+ * c) azkaban.server.external.spark_history_server_job_url=http://***sh***:18080/history/application_${application.id}/1/jobs
+ * */
+ public static final String RESOURCE_MANAGER_JOB_URL = "azkaban.server.external.resource_manager_job_url";
+ public static final String HISTORY_SERVER_JOB_URL = "azkaban.server.external.history_server_job_url";
+ public static final String SPARK_HISTORY_SERVER_JOB_URL = "azkaban.server.external.spark_history_server_job_url";
// Configures the Kafka appender for logging user jobs, specified for the exec server
public static final String AZKABAN_SERVER_LOGGING_KAFKA_BROKERLIST = "azkaban.server.logging.kafka.brokerList";
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 6cf0fa8..f000db8 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -16,7 +16,10 @@
package azkaban.executor;
+import static java.util.Objects.requireNonNull;
+
import azkaban.Constants;
+import azkaban.Constants.ConfigurationKeys;
import azkaban.alert.Alerter;
import azkaban.event.EventHandler;
import azkaban.executor.selector.ExecutorComparator;
@@ -26,15 +29,21 @@ import azkaban.flow.FlowUtils;
import azkaban.metrics.CommonMetrics;
import azkaban.project.Project;
import azkaban.project.ProjectWhitelist;
+import azkaban.utils.AuthenticationUtils;
import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
import com.google.common.collect.Lists;
+import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
+import java.io.InputStreamReader;
import java.lang.Thread.State;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@@ -52,6 +61,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.lang.StringUtils;
@@ -65,6 +76,16 @@ import org.joda.time.DateTime;
public class ExecutorManager extends EventHandler implements
ExecutorManagerAdapter {
+ private static final String SPARK_JOB_TYPE = "spark";
+ private static final String APPLICATION_ID = "${application.id}";
+ // The regex to look for while fetching application ID from the Hadoop/Spark job log
+ private static final Pattern APPLICATION_ID_PATTERN = Pattern
+ .compile("application_\\d+_\\d+");
+ // The regex to look for while validating the content from RM job link
+ private static final Pattern FAILED_TO_READ_APPLICATION_PATTERN = Pattern
+ .compile("Failed to read the application");
+ private static final Pattern INVALID_APPLICATION_ID_PATTERN = Pattern
+ .compile("Invalid Application ID");
private static final int DEFAULT_MAX_ONCURRENT_RUNS_ONEFLOW = 30;
// 12 weeks
private static final long DEFAULT_EXECUTION_LOGS_RETENTION_MS = 3 * 4 * 7
@@ -135,6 +156,16 @@ public class ExecutorManager extends EventHandler implements
}
+ private String findApplicationIdFromLog(final String logData) {
+ final Matcher matcher = APPLICATION_ID_PATTERN.matcher(logData);
+ String appId = null;
+ if (matcher.find()) {
+ appId = matcher.group().substring(12);
+ }
+ this.logger.info("Application ID is " + appId);
+ return appId;
+ }
+
private void setupMultiExecutorMode() {
// initliatize hard filters for executor selector from azkaban.properties
final String filters = this.azkProps
@@ -744,6 +775,99 @@ public class ExecutorManager extends EventHandler implements
}
@Override
+ public String getJobLinkUrl(final ExecutableFlow exFlow, final String jobId, final int attempt) {
+ if (!this.azkProps.containsKey(ConfigurationKeys.RESOURCE_MANAGER_JOB_URL) || !this.azkProps
+ .containsKey(ConfigurationKeys.HISTORY_SERVER_JOB_URL) || !this.azkProps
+ .containsKey(ConfigurationKeys.SPARK_HISTORY_SERVER_JOB_URL)) {
+ return null;
+ }
+
+ final String applicationId = getApplicationId(exFlow, jobId, attempt);
+ if (applicationId == null) {
+ return null;
+ }
+
+ final URL url;
+ final String jobLinkUrl;
+ boolean isRMJobLinkValid = true;
+
+ try {
+ url = new URL(this.azkProps.getString(ConfigurationKeys.RESOURCE_MANAGER_JOB_URL)
+ .replace(APPLICATION_ID, applicationId));
+ final String keytabPrincipal = requireNonNull(
+ this.azkProps.getString(ConfigurationKeys.AZKABAN_KERBEROS_PRINCIPAL));
+ final String keytabPath = requireNonNull(this.azkProps.getString(ConfigurationKeys
+ .AZKABAN_KEYTAB_PATH));
+ final HttpURLConnection connection = AuthenticationUtils.loginAuthenticatedURL(url,
+ keytabPrincipal, keytabPath);
+
+ try (final BufferedReader in = new BufferedReader(
+ new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
+ String inputLine;
+ while ((inputLine = in.readLine()) != null) {
+ if (FAILED_TO_READ_APPLICATION_PATTERN.matcher(inputLine).find()
+ || INVALID_APPLICATION_ID_PATTERN.matcher(inputLine).find()) {
+ this.logger.info(
+ "RM job link is invalid or has expired for application_" + applicationId);
+ isRMJobLinkValid = false;
+ break;
+ }
+ }
+ }
+ } catch (final Exception e) {
+ this.logger.error("Failed to get job link for application_" + applicationId, e);
+ return null;
+ }
+
+ if (isRMJobLinkValid) {
+ jobLinkUrl = url.toString();
+ } else {
+ // If RM job link is invalid or has expired, fetch the job link from JHS or SHS.
+ if (exFlow.getExecutableNode(jobId).getType().equals(SPARK_JOB_TYPE)) {
+ jobLinkUrl =
+ this.azkProps.get(ConfigurationKeys.SPARK_HISTORY_SERVER_JOB_URL).replace
+ (APPLICATION_ID, applicationId);
+ } else {
+ jobLinkUrl =
+ this.azkProps.get(ConfigurationKeys.HISTORY_SERVER_JOB_URL).replace(APPLICATION_ID,
+ applicationId);
+ }
+ }
+
+ this.logger.info(
+ "Job link url is " + jobLinkUrl + " for execution " + exFlow.getExecutionId() + ", job "
+ + jobId);
+ return jobLinkUrl;
+ }
+
+ private String getApplicationId(final ExecutableFlow exFlow, final String jobId,
+ final int attempt) {
+ String applicationId;
+ boolean finished = false;
+ int offset = 0;
+ try {
+ while (!finished) {
+ final LogData data = getExecutionJobLog(exFlow, jobId, offset, 50000, attempt);
+ if (data != null) {
+ applicationId = findApplicationIdFromLog(data.getData());
+ if (applicationId != null) {
+ return applicationId;
+ }
+ offset = data.getOffset() + data.getLength();
+ this.logger.info("Get application ID for execution " + exFlow.getExecutionId() + ", job"
+ + " " + jobId + ", attempt " + attempt + ", data offset " + offset);
+ } else {
+ finished = true;
+ }
+ }
+ } catch (final ExecutorManagerException e) {
+ this.logger.error("Failed to get application ID for execution " + exFlow.getExecutionId() +
+ ", job " + jobId + ", attempt " + attempt + ", data offset " + offset, e);
+ }
+ return null;
+ }
+
+ @Override
public JobMetaData getExecutionJobMetaData(final ExecutableFlow exFlow,
final String jobId, final int offset, final int length, final int attempt)
throws ExecutorManagerException {
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
index 6ab7993..c939d6f 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
@@ -87,6 +87,8 @@ public interface ExecutorManagerAdapter {
public List<Object> getExecutionJobStats(ExecutableFlow exflow, String jobId,
int attempt) throws ExecutorManagerException;
+ public String getJobLinkUrl(ExecutableFlow exFlow, String jobId, int attempt);
+
public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow,
String jobId, int offset, int length, int attempt)
throws ExecutorManagerException;
diff --git a/azkaban-common/src/main/java/azkaban/utils/AuthenticationUtils.java b/azkaban-common/src/main/java/azkaban/utils/AuthenticationUtils.java
new file mode 100644
index 0000000..c643f4c
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/utils/AuthenticationUtils.java
@@ -0,0 +1,62 @@
+/*
+* Copyright 2018 LinkedIn Corp.
+*
+* Licensed under the Apache License, Version 2.0 (the “License”); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an “AS IS” BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+package azkaban.utils;
+
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
+import org.apache.hadoop.security.authentication.client.AuthenticatedURL.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The util class for hadoop authentication.
+ */
+public class AuthenticationUtils {
+
+ private static final Logger logger = LoggerFactory.getLogger(AuthenticationUtils.class);
+
+ public static HttpURLConnection loginAuthenticatedURL(final URL url, final String keytabPrincipal,
+ final String keytabPath) throws Exception {
+ final List<URL> resources = new ArrayList<>();
+ resources.add(url);
+
+ final URLClassLoader ucl = new URLClassLoader(resources.toArray(new URL[resources.size()]));
+ final Configuration conf = new Configuration();
+ conf.setClassLoader(ucl);
+ UserGroupInformation.setConfiguration(conf);
+
+ logger.info(
+ "Logging in URL: " + url.toString() + " using Principal: " + keytabPrincipal + ", Keytab: "
+ + keytabPath);
+
+ UserGroupInformation.loginUserFromKeytab(keytabPrincipal, keytabPath);
+
+ final HttpURLConnection connection = UserGroupInformation.getLoginUser().doAs(
+ (PrivilegedExceptionAction<HttpURLConnection>) () -> {
+ final Token token = new Token();
+ return new AuthenticatedURL().openConnection(url, token);
+ });
+
+ return connection;
+ }
+}
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index 764e32b..7f2bb6b 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -291,6 +291,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
ExecutableFlow flow = null;
ExecutableNode node = null;
+ final String jobLinkUrl;
try {
flow = this.executorManager.getExecutableFlow(execId);
if (flow == null) {
@@ -307,6 +308,8 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
return;
}
+ jobLinkUrl = this.executorManager.getJobLinkUrl(flow, jobId, attempt);
+
final List<ViewerPlugin> jobViewerPlugins =
PluginRegistry.getRegistry().getViewerPluginsForJobType(
node.getType());
@@ -329,6 +332,14 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
page.add("flowid", flow.getId());
page.add("parentflowid", node.getParentFlow().getFlowId());
page.add("jobname", node.getId());
+ page.add("jobLinkUrl", jobLinkUrl);
+ page.add("jobType", node.getType());
+
+ if (node.getStatus() == Status.FAILED || node.getStatus() == Status.KILLED) {
+ page.add("jobFailed", true);
+ } else {
+ page.add("jobFailed", false);
+ }
page.render();
}
diff --git a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/jobdetailsheader.vm b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/jobdetailsheader.vm
index 801f013..a655d08 100644
--- a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/jobdetailsheader.vm
+++ b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/jobdetailsheader.vm
@@ -26,6 +26,15 @@
</div>
<div class="header-control">
<div class="pull-right header-form">
+ #if ($jobLinkUrl)
+ #if ($jobType == "spark")
+ <a href="$jobLinkUrl" target="_blank" class="btn btn-primary btn-sm"
+ id="jobLinkUrl">Spark Job Log</a>
+ #else
+ <a href="$jobLinkUrl" target="_blank" class="btn btn-primary btn-sm"
+ id="jobLinkUrl">Hadoop Job Log</a>
+ #end
+ #end
<a href="${context}/manager?project=${projectName}&flow=${parentflowid}&job=$jobname"
class="btn btn-info btn-sm">Job Properties</a>
</div>
diff --git a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/jobdetailspage.vm b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/jobdetailspage.vm
index 59cfc41..6ad9432 100644
--- a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/jobdetailspage.vm
+++ b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/jobdetailspage.vm
@@ -37,6 +37,13 @@
var jobId = "${jobid}";
var attempt = ${attempt};
</script>
+ <script type="text/javascript">
+ $(document).ready(function () {
+ #if ($jobFailed == "true")
+ $("#jobLinkUrl").removeClass("btn-primary").addClass("btn-danger");
+ #end
+ });
+ </script>
</head>
<body>