azkaban-aplcache

Dynamically generate job link URL. (#1695)

3/21/2018 8:54:17 PM

diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 9497ffe..3ee3af4 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -105,6 +105,17 @@ public class Constants {
     // Designates one of the external link topics to correspond to a job log viewer
     public static final String AZKABAN_SERVER_EXTERNAL_LOGVIEWER_TOPIC = "azkaban.server.external.logviewer.topic";
     public static final String AZKABAN_SERVER_EXTERNAL_LOGVIEWER_LABEL = "azkaban.server.external.logviewer.label";
+    
+    /*
+     * Hadoop/Spark user job link.
+     * Example:
+     * a) azkaban.server.external.resource_manager_job_url=http://***rm***:8088/cluster/app/application_${application.id}
+     * b) azkaban.server.external.history_server_job_url=http://***jh***:19888/jobhistory/job/job_${application.id}
+     * c) azkaban.server.external.spark_history_server_job_url=http://***sh***:18080/history/application_${application.id}/1/jobs
+     * */
+    public static final String RESOURCE_MANAGER_JOB_URL = "azkaban.server.external.resource_manager_job_url";
+    public static final String HISTORY_SERVER_JOB_URL = "azkaban.server.external.history_server_job_url";
+    public static final String SPARK_HISTORY_SERVER_JOB_URL = "azkaban.server.external.spark_history_server_job_url";
 
     // Configures the Kafka appender for logging user jobs, specified for the exec server
     public static final String AZKABAN_SERVER_LOGGING_KAFKA_BROKERLIST = "azkaban.server.logging.kafka.brokerList";
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 6cf0fa8..f000db8 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -16,7 +16,10 @@
 
 package azkaban.executor;
 
+import static java.util.Objects.requireNonNull;
+
 import azkaban.Constants;
+import azkaban.Constants.ConfigurationKeys;
 import azkaban.alert.Alerter;
 import azkaban.event.EventHandler;
 import azkaban.executor.selector.ExecutorComparator;
@@ -26,15 +29,21 @@ import azkaban.flow.FlowUtils;
 import azkaban.metrics.CommonMetrics;
 import azkaban.project.Project;
 import azkaban.project.ProjectWhitelist;
+import azkaban.utils.AuthenticationUtils;
 import azkaban.utils.FileIOUtils.JobMetaData;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
 import com.google.common.collect.Lists;
+import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.lang.Thread.State;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -52,6 +61,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import org.apache.commons.lang.StringUtils;
@@ -65,6 +76,16 @@ import org.joda.time.DateTime;
 public class ExecutorManager extends EventHandler implements
     ExecutorManagerAdapter {
 
+  private static final String SPARK_JOB_TYPE = "spark";
+  private static final String APPLICATION_ID = "${application.id}";
+  // The regex to look for while fetching application ID from the Hadoop/Spark job log
+  private static final Pattern APPLICATION_ID_PATTERN = Pattern
+      .compile("application_\\d+_\\d+");
+  // The regex to look for while validating the content from RM job link
+  private static final Pattern FAILED_TO_READ_APPLICATION_PATTERN = Pattern
+      .compile("Failed to read the application");
+  private static final Pattern INVALID_APPLICATION_ID_PATTERN = Pattern
+      .compile("Invalid Application ID");
   private static final int DEFAULT_MAX_ONCURRENT_RUNS_ONEFLOW = 30;
   // 12 weeks
   private static final long DEFAULT_EXECUTION_LOGS_RETENTION_MS = 3 * 4 * 7
@@ -135,6 +156,16 @@ public class ExecutorManager extends EventHandler implements
 
   }
 
+  private String findApplicationIdFromLog(final String logData) {
+    final Matcher matcher = APPLICATION_ID_PATTERN.matcher(logData);
+    String appId = null;
+    if (matcher.find()) {
+      appId = matcher.group().substring(12);
+    }
+    this.logger.info("Application ID is " + appId);
+    return appId;
+  }
+
   private void setupMultiExecutorMode() {
     // initialize hard filters for executor selector from azkaban.properties
     final String filters = this.azkProps
@@ -744,6 +775,99 @@ public class ExecutorManager extends EventHandler implements
   }
 
   @Override
+  public String getJobLinkUrl(final ExecutableFlow exFlow, final String jobId, final int attempt) {
+    if (!this.azkProps.containsKey(ConfigurationKeys.RESOURCE_MANAGER_JOB_URL) || !this.azkProps
+        .containsKey(ConfigurationKeys.HISTORY_SERVER_JOB_URL) || !this.azkProps
+        .containsKey(ConfigurationKeys.SPARK_HISTORY_SERVER_JOB_URL)) {
+      return null;
+    }
+
+    final String applicationId = getApplicationId(exFlow, jobId, attempt);
+    if (applicationId == null) {
+      return null;
+    }
+
+    final URL url;
+    final String jobLinkUrl;
+    boolean isRMJobLinkValid = true;
+
+    try {
+      url = new URL(this.azkProps.getString(ConfigurationKeys.RESOURCE_MANAGER_JOB_URL)
+          .replace(APPLICATION_ID, applicationId));
+      final String keytabPrincipal = requireNonNull(
+          this.azkProps.getString(ConfigurationKeys.AZKABAN_KERBEROS_PRINCIPAL));
+      final String keytabPath = requireNonNull(this.azkProps.getString(ConfigurationKeys
+          .AZKABAN_KEYTAB_PATH));
+      final HttpURLConnection connection = AuthenticationUtils.loginAuthenticatedURL(url,
+          keytabPrincipal, keytabPath);
+
+      try (final BufferedReader in = new BufferedReader(
+          new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
+        String inputLine;
+        while ((inputLine = in.readLine()) != null) {
+          if (FAILED_TO_READ_APPLICATION_PATTERN.matcher(inputLine).find()
+              || INVALID_APPLICATION_ID_PATTERN.matcher(inputLine).find()) {
+            this.logger.info(
+                "RM job link is invalid or has expired for application_" + applicationId);
+            isRMJobLinkValid = false;
+            break;
+          }
+        }
+      }
+    } catch (final Exception e) {
+      this.logger.error("Failed to get job link for application_" + applicationId, e);
+      return null;
+    }
+
+    if (isRMJobLinkValid) {
+      jobLinkUrl = url.toString();
+    } else {
+      // If RM job link is invalid or has expired, fetch the job link from JHS or SHS.
+      if (exFlow.getExecutableNode(jobId).getType().equals(SPARK_JOB_TYPE)) {
+        jobLinkUrl =
+            this.azkProps.get(ConfigurationKeys.SPARK_HISTORY_SERVER_JOB_URL).replace
+                (APPLICATION_ID, applicationId);
+      } else {
+        jobLinkUrl =
+            this.azkProps.get(ConfigurationKeys.HISTORY_SERVER_JOB_URL).replace(APPLICATION_ID,
+                applicationId);
+      }
+    }
+
+    this.logger.info(
+        "Job link url is " + jobLinkUrl + " for execution " + exFlow.getExecutionId() + ", job "
+            + jobId);
+    return jobLinkUrl;
+  }
+
+  private String getApplicationId(final ExecutableFlow exFlow, final String jobId,
+      final int attempt) {
+    String applicationId;
+    boolean finished = false;
+    int offset = 0;
+    try {
+      while (!finished) {
+        final LogData data = getExecutionJobLog(exFlow, jobId, offset, 50000, attempt);
+        if (data != null) {
+          applicationId = findApplicationIdFromLog(data.getData());
+          if (applicationId != null) {
+            return applicationId;
+          }
+          offset = data.getOffset() + data.getLength();
+          this.logger.info("Get application ID for execution " + exFlow.getExecutionId() + ", job"
+              + " " + jobId + ", attempt " + attempt + ", data offset " + offset);
+        } else {
+          finished = true;
+        }
+      }
+    } catch (final ExecutorManagerException e) {
+      this.logger.error("Failed to get application ID for execution " + exFlow.getExecutionId() +
+          ", job " + jobId + ", attempt " + attempt + ", data offset " + offset, e);
+    }
+    return null;
+  }
+
+  @Override
   public JobMetaData getExecutionJobMetaData(final ExecutableFlow exFlow,
       final String jobId, final int offset, final int length, final int attempt)
       throws ExecutorManagerException {
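
As a quick illustration of the extraction logic above: getApplicationId() pages through the job log in 50,000-byte chunks, and findApplicationIdFromLog() matches the first application_<clusterTimestamp>_<sequence> token and strips the 12-character "application_" prefix. A self-contained sketch of that extraction (class name and sample log line are illustrative only):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    // Illustrative sketch of the application-ID extraction performed by ExecutorManager.
    public class AppIdExtractionSketch {

      private static final Pattern APPLICATION_ID_PATTERN =
          Pattern.compile("application_\\d+_\\d+");

      // Returns "<clusterTimestamp>_<sequence>" from the first match, or null if none is found.
      static String findApplicationIdFromLog(final String logData) {
        final Matcher matcher = APPLICATION_ID_PATTERN.matcher(logData);
        // "application_".length() == 12, so substring(12) keeps only the numeric part.
        return matcher.find() ? matcher.group().substring(12) : null;
      }

      public static void main(final String[] args) {
        final String log = "Submitted application application_1521612345678_0042 to ResourceManager";
        System.out.println(findApplicationIdFromLog(log)); // prints 1521612345678_0042
      }
    }
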
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
index 6ab7993..c939d6f 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
@@ -87,6 +87,8 @@ public interface ExecutorManagerAdapter {
   public List<Object> getExecutionJobStats(ExecutableFlow exflow, String jobId,
       int attempt) throws ExecutorManagerException;
 
+  public String getJobLinkUrl(ExecutableFlow exFlow, String jobId, int attempt);
+
   public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow,
       String jobId, int offset, int length, int attempt)
       throws ExecutorManagerException;
diff --git a/azkaban-common/src/main/java/azkaban/utils/AuthenticationUtils.java b/azkaban-common/src/main/java/azkaban/utils/AuthenticationUtils.java
new file mode 100644
index 0000000..c643f4c
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/utils/AuthenticationUtils.java
@@ -0,0 +1,62 @@
+/*
+* Copyright 2018 LinkedIn Corp.
+*
+* Licensed under the Apache License, Version 2.0 (the “License”); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an “AS IS” BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+package azkaban.utils;
+
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
+import org.apache.hadoop.security.authentication.client.AuthenticatedURL.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The util class for hadoop authentication.
+ */
+public class AuthenticationUtils {
+
+  private static final Logger logger = LoggerFactory.getLogger(AuthenticationUtils.class);
+
+  public static HttpURLConnection loginAuthenticatedURL(final URL url, final String keytabPrincipal,
+      final String keytabPath) throws Exception {
+    final List<URL> resources = new ArrayList<>();
+    resources.add(url);
+
+    final URLClassLoader ucl = new URLClassLoader(resources.toArray(new URL[resources.size()]));
+    final Configuration conf = new Configuration();
+    conf.setClassLoader(ucl);
+    UserGroupInformation.setConfiguration(conf);
+
+    logger.info(
+        "Logging in URL: " + url.toString() + " using Principal: " + keytabPrincipal + ", Keytab: "
+            + keytabPath);
+
+    UserGroupInformation.loginUserFromKeytab(keytabPrincipal, keytabPath);
+
+    final HttpURLConnection connection = UserGroupInformation.getLoginUser().doAs(
+        (PrivilegedExceptionAction<HttpURLConnection>) () -> {
+          final Token token = new Token();
+          return new AuthenticatedURL().openConnection(url, token);
+        });
+
+    return connection;
+  }
+}
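
A hedged usage sketch for the new helper, mirroring the ExecutorManager call site above: it opens a Kerberos-authenticated connection to a ResourceManager application page and scans the response. The URL, principal, and keytab path are placeholders, and the Hadoop client libraries plus a valid keytab are assumed to be available.

    import azkaban.utils.AuthenticationUtils;
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;

    public class AuthenticatedFetchSketch {

      public static void main(final String[] args) throws Exception {
        // Placeholder RM URL; ExecutorManager builds it from
        // azkaban.server.external.resource_manager_job_url with ${application.id} filled in.
        final URL url = new URL("http://rm-host:8088/cluster/app/application_1521612345678_0042");
        final HttpURLConnection connection = AuthenticationUtils.loginAuthenticatedURL(
            url, "azkaban/host@EXAMPLE.COM", "/path/to/azkaban.keytab");

        try (final BufferedReader in = new BufferedReader(
            new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
          String line;
          while ((line = in.readLine()) != null) {
            // ExecutorManager scans these lines for "Failed to read the application" or
            // "Invalid Application ID" to decide whether to fall back to the JHS/SHS link.
            System.out.println(line);
          }
        }
      }
    }

The helper logs in from the keytab and opens the connection as the login user via doAs, so callers do not need to manage UserGroupInformation themselves.
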
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index 764e32b..7f2bb6b 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -291,6 +291,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 
     ExecutableFlow flow = null;
     ExecutableNode node = null;
+    final String jobLinkUrl;
     try {
       flow = this.executorManager.getExecutableFlow(execId);
       if (flow == null) {
@@ -307,6 +308,8 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
         return;
       }
 
+      jobLinkUrl = this.executorManager.getJobLinkUrl(flow, jobId, attempt);
+
       final List<ViewerPlugin> jobViewerPlugins =
           PluginRegistry.getRegistry().getViewerPluginsForJobType(
               node.getType());
@@ -329,6 +332,14 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
     page.add("flowid", flow.getId());
     page.add("parentflowid", node.getParentFlow().getFlowId());
     page.add("jobname", node.getId());
+    page.add("jobLinkUrl", jobLinkUrl);
+    page.add("jobType", node.getType());
+
+    if (node.getStatus() == Status.FAILED || node.getStatus() == Status.KILLED) {
+      page.add("jobFailed", true);
+    } else {
+      page.add("jobFailed", false);
+    }
 
     page.render();
   }
diff --git a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/jobdetailsheader.vm b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/jobdetailsheader.vm
index 801f013..a655d08 100644
--- a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/jobdetailsheader.vm
+++ b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/jobdetailsheader.vm
@@ -26,6 +26,15 @@
       </div>
       <div class="header-control">
         <div class="pull-right header-form">
+          #if ($jobLinkUrl)
+            #if ($jobType == "spark")
+              <a href="$jobLinkUrl" target="_blank" class="btn btn-primary btn-sm"
+                 id="jobLinkUrl">Spark Job Log</a>
+            #else
+              <a href="$jobLinkUrl" target="_blank" class="btn btn-primary btn-sm"
+                 id="jobLinkUrl">Hadoop Job Log</a>
+            #end
+          #end
           <a href="${context}/manager?project=${projectName}&flow=${parentflowid}&job=$jobname"
              class="btn btn-info btn-sm">Job Properties</a>
         </div>
diff --git a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/jobdetailspage.vm b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/jobdetailspage.vm
index 59cfc41..6ad9432 100644
--- a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/jobdetailspage.vm
+++ b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/jobdetailspage.vm
@@ -37,6 +37,13 @@
     var jobId = "${jobid}";
     var attempt = ${attempt};
   </script>
+  <script type="text/javascript">
+    $(document).ready(function () {
+      #if ($jobFailed == "true")
+        $("#jobLinkUrl").removeClass("btn-primary").addClass("btn-danger");
+      #end
+    });
+  </script>
 </head>
 <body>