azkaban-aplcache

Provide a way for Azkaban to specify links to external URLs (#851)

12/15/2016 8:36:49 PM

Details

diff --git a/azkaban-common/build.gradle b/azkaban-common/build.gradle
index 3359923..3618eef 100644
--- a/azkaban-common/build.gradle
+++ b/azkaban-common/build.gradle
@@ -41,8 +41,9 @@ dependencies {
   compile('io.dropwizard.metrics:metrics-core:3.1.0')
   compile('io.dropwizard.metrics:metrics-jvm:3.1.0')
 
-  testCompile(project(':azkaban-test').sourceSets.test.output)
   testCompile('org.hamcrest:hamcrest-all:1.3')
+  testCompile('org.mockito:mockito-all:1.10.19')
+  testCompile(project(':azkaban-test').sourceSets.test.output)
 }
 
 tasks.withType(JavaCompile) {
diff --git a/azkaban-common/src/main/java/azkaban/constants/ServerProperties.java b/azkaban-common/src/main/java/azkaban/constants/ServerProperties.java
index 895aeda..1158e95 100644
--- a/azkaban-common/src/main/java/azkaban/constants/ServerProperties.java
+++ b/azkaban-common/src/main/java/azkaban/constants/ServerProperties.java
@@ -19,19 +19,29 @@ package azkaban.constants;
 public class ServerProperties {
   // These properties are configurable through azkaban.properties
 
+  // Defines a list of external links, each referred to as a topic
+  public static final String AZKABAN_SERVER_EXTERNAL_TOPICS = "azkaban.server.external.topics";
+
+  // External URL template of a given topic, specified in the list defined above
+  public static final String AZKABAN_SERVER_EXTERNAL_TOPIC_URL = "azkaban.server.external.${topic}.url";
+
+  // Designates one of the external link topics to correspond to an execution analyzer
+  public static final String AZKABAN_SERVER_EXTERNAL_ANALYZER_TOPIC = "azkaban.server.external.analyzer.topic";
+  public static final String AZKABAN_SERVER_EXTERNAL_ANALYZER_LABEL = "azkaban.server.external.analyzer.label";
+
+  // Designates one of the external link topics to correspond to a job log viewer
+  public static final String AZKABAN_SERVER_EXTERNAL_LOGVIEWER_TOPIC = "azkaban.server.external.logviewer.topic";
+  public static final String AZKABAN_SERVER_EXTERNAL_LOGVIEWER_LABEL = "azkaban.server.external.logviewer.label";
+
   // Configures the Kafka appender for logging user jobs, specified for the exec server
   public static final String AZKABAN_SERVER_LOGGING_KAFKA_BROKERLIST = "azkaban.server.logging.kafka.brokerList";
   public static final String AZKABAN_SERVER_LOGGING_KAFKA_TOPIC = "azkaban.server.logging.kafka.topic";
 
-  /**
-   * Represent the class name of azkaban metrics reporter.
-   */
+  // Represent the class name of azkaban metrics reporter.
   public static final String CUSTOM_METRICS_REPORTER_CLASS_NAME =
       "azkaban.metrics.reporter.name";
 
-  /**
-   * Represent the metrics server URL.
-   */
+  // Represent the metrics server URL.
   public static final String METRICS_SERVER_URL =
       "azkaban.metrics.server.url";
 
diff --git a/azkaban-common/src/main/java/azkaban/utils/ExternalLinkUtils.java b/azkaban-common/src/main/java/azkaban/utils/ExternalLinkUtils.java
new file mode 100644
index 0000000..53ec8fc
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/utils/ExternalLinkUtils.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2016 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import azkaban.constants.FlowProperties;
+import azkaban.constants.ServerProperties;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+
+import javax.servlet.http.HttpServletRequest;
+
+import org.apache.log4j.Logger;
+
+public class ExternalLinkUtils {
+  private static final Logger logger = Logger.getLogger(ExternalLinkUtils.class);
+
+  public static String getExternalAnalyzerOnReq(Props azkProps, HttpServletRequest req) {
+    // If no topic was configured to be an external analyzer, return empty
+    if (!azkProps.containsKey(ServerProperties.AZKABAN_SERVER_EXTERNAL_ANALYZER_TOPIC)) {
+      return "";
+    }
+    // Find out which external link we should use to lead to our analyzer
+    String topic = azkProps.getString(ServerProperties.AZKABAN_SERVER_EXTERNAL_ANALYZER_TOPIC);
+    return getLinkFromRequest(topic, azkProps, req);
+  }
+
+  public static String getExternalLogViewer(Props azkProps, String jobId, Props jobProps) {
+    // If no topic was configured to be an external analyzer, return empty
+    if (!azkProps.containsKey(ServerProperties.AZKABAN_SERVER_EXTERNAL_LOGVIEWER_TOPIC)) {
+      return "";
+    }
+    // Find out which external link we should use to lead to our log viewer
+    String topic = azkProps.getString(ServerProperties.AZKABAN_SERVER_EXTERNAL_LOGVIEWER_TOPIC);
+    return getLinkFromJobAndExecId(topic, azkProps, jobId, jobProps);
+  }
+
+  private static String getLinkFromJobAndExecId(String topic, Props azkProps, String jobId, Props jobProps) {
+    String urlTemplate = getURLForTopic(topic, azkProps);
+    if (urlTemplate.isEmpty()) {
+      logger.error("No URL specified for topic " + topic);
+      return "";
+    }
+    String job = encodeToUTF8(jobId);
+    String execid = encodeToUTF8(jobProps.getString(FlowProperties.AZKABAN_FLOW_EXEC_ID));
+
+    urlTemplate = urlTemplate.replace("${jobid}", job).replace("${execid}", execid);
+    logger.info("Creating link: " + urlTemplate);
+    return urlTemplate;
+  }
+
+  private static String getLinkFromRequest(String topic, Props azkProps, HttpServletRequest req) {
+    String urlTemplate = getURLForTopic(topic, azkProps);
+    if (urlTemplate.isEmpty()) {
+      logger.error("No URL specified for topic " + topic);
+      return "";
+    }
+    String flowExecutionURL = "";
+    flowExecutionURL += req.getRequestURL();
+    flowExecutionURL += "?";
+    flowExecutionURL += req.getQueryString();
+    flowExecutionURL = encodeToUTF8(flowExecutionURL);
+
+    urlTemplate = urlTemplate.replace("${url}", flowExecutionURL);
+    logger.info("Creating link: " + urlTemplate);
+    return urlTemplate;
+  }
+
+  static String getURLForTopic(String topic, Props azkProps) {
+    return azkProps.getString(ServerProperties.AZKABAN_SERVER_EXTERNAL_TOPIC_URL.replace("${topic}", topic), "");
+  }
+
+  static String encodeToUTF8(String url) {
+    try {
+      return URLEncoder.encode(url, "UTF-8").replaceAll("\\+", "%20");
+    } catch (UnsupportedEncodingException e) {
+      logger.error("Specified encoding is not supported", e);
+    }
+    return "";
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/utils/PatternLayoutEscaped.java b/azkaban-common/src/main/java/azkaban/utils/PatternLayoutEscaped.java
index 089dba9..707c33a 100644
--- a/azkaban-common/src/main/java/azkaban/utils/PatternLayoutEscaped.java
+++ b/azkaban-common/src/main/java/azkaban/utils/PatternLayoutEscaped.java
@@ -10,6 +10,14 @@ import org.apache.log4j.spi.LoggingEvent;
  * can be found one in place.
  */
 public class PatternLayoutEscaped extends PatternLayout {
+  public PatternLayoutEscaped(String s) {
+    super(s);
+  }
+
+  public PatternLayoutEscaped() {
+    super();
+  }
+
   @Override
   public String format(final LoggingEvent event) {
     if (event.getMessage() instanceof String) {
diff --git a/azkaban-common/src/test/java/azkaban/utils/ExternalLinkUtilsTest.java b/azkaban-common/src/test/java/azkaban/utils/ExternalLinkUtilsTest.java
new file mode 100644
index 0000000..a0485af
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/utils/ExternalLinkUtilsTest.java
@@ -0,0 +1,150 @@
+/*
+ * Copyright 2016 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import azkaban.constants.FlowProperties;
+import azkaban.constants.ServerProperties;
+
+import javax.servlet.http.HttpServletRequest;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class ExternalLinkUtilsTest {
+  private Props azkProps;
+
+  private Props jobProps;
+
+  private String jobId;
+
+  private HttpServletRequest mockRequest;
+
+  private static final String EXEC_URL = "http://localhost:8081/executor";
+
+  private static final String EXEC_QUERY_STRING = "execid=1";
+
+  private static final String EXTERNAL_ANALYZER_TOPIC = "elephant";
+
+  private static final String EXTERNAL_ANALYZER_URL_VALID_FORMAT =
+      "http://elephant.linkedin.com/search?q=${url}";
+
+  private static final String EXTERNAL_ANALYZER_EXPECTED_URL =
+      "http://elephant.linkedin.com/search?q="
+          + "http%3A%2F%2Flocalhost%3A8081%2Fexecutor%3Fexecid%3D1";
+
+  private static final String EXTERNAL_LOGVIEWER_TOPIC = "kibana";
+
+  private static final String EXTERNAL_LOGVIEWER_URL_VALID_FORMAT =
+      "http://kibana.linkedin.com/search?jobid=${jobid}&&execid=${execid}";
+
+  private static final String EXTERNAL_LOGVIEWER_EXPECTED_URL =
+      "http://kibana.linkedin.com/search?jobid=Some%20%2B%20job&&execid=1";
+
+  @Before
+  public void setUp() {
+    // Empty server configuration
+    azkProps = new Props();
+
+    // Job configuration consisting of only an exec id and job id
+    jobProps = new Props();
+    jobProps.put(FlowProperties.AZKABAN_FLOW_EXEC_ID, 1);
+    jobId = "Some + job";
+
+    mockRequest = mock(HttpServletRequest.class);
+  }
+
+  /**
+   * Test validates the happy path when an external analyzer is configured
+   * with '${url}' as the format in 'azkaban.properties'.
+   */
+  @Test
+  public void testGetExternalAnalyzerValidFormat() {
+    azkProps.put(ServerProperties.AZKABAN_SERVER_EXTERNAL_ANALYZER_TOPIC, EXTERNAL_ANALYZER_TOPIC);
+    azkProps.put(ServerProperties.AZKABAN_SERVER_EXTERNAL_TOPIC_URL.replace("${topic}", EXTERNAL_ANALYZER_TOPIC),
+        EXTERNAL_ANALYZER_URL_VALID_FORMAT);
+
+    when(mockRequest.getRequestURL()).thenReturn(new StringBuffer(EXEC_URL));
+    when(mockRequest.getQueryString()).thenReturn(EXEC_QUERY_STRING);
+
+    String externalURL =
+        ExternalLinkUtils.getExternalAnalyzerOnReq(azkProps, mockRequest);
+    assertTrue(externalURL.equals(EXTERNAL_ANALYZER_EXPECTED_URL));
+  }
+
+  /**
+   * Test validates the happy path when an log viewer is configured
+   * with '${execid}'  and '${jobid} as the format in 'azkaban.properties'.
+   */
+  @Test
+  public void testGetExternalLogViewerValidFormat() {
+    azkProps.put(ServerProperties.AZKABAN_SERVER_EXTERNAL_LOGVIEWER_TOPIC, EXTERNAL_LOGVIEWER_TOPIC);
+    azkProps.put(ServerProperties.AZKABAN_SERVER_EXTERNAL_TOPIC_URL.replace("${topic}", EXTERNAL_LOGVIEWER_TOPIC),
+        EXTERNAL_LOGVIEWER_URL_VALID_FORMAT);
+
+    String externalURL =
+        ExternalLinkUtils.getExternalLogViewer(azkProps, jobId, jobProps);
+    assertTrue(externalURL.equals(EXTERNAL_LOGVIEWER_EXPECTED_URL));
+  }
+
+  /**
+   * Test validates the condition when an external analyzer is not configured
+   * in 'azkaban.properties'.
+   */
+  @Test
+  public void testGetExternalAnalyzerNotConfigured() {
+    String executionExternalLinkURL =
+        ExternalLinkUtils.getExternalAnalyzerOnReq(azkProps, mockRequest);
+    assertTrue(executionExternalLinkURL.equals(""));
+  }
+
+  /**
+   * Test validates the condition when an external log viewer is not configured
+   * in 'azkaban.properties'.
+   */
+  @Test
+  public void testGetLogViewerNotConfigured() {
+    String executionExternalLinkURL =
+        ExternalLinkUtils.getExternalLogViewer(azkProps, jobId, jobProps);
+    assertTrue(executionExternalLinkURL.equals(""));
+  }
+
+  /**
+   * Test validates that when we encode URLs to UTF-8, it does not give us incorrect encodings.
+   */
+  @Test
+  public void testEncodingToUFT8() {
+    assertTrue(ExternalLinkUtils.encodeToUTF8(" ").equals("%20"));
+    assertTrue(ExternalLinkUtils.encodeToUTF8("+").equals("%2B"));
+    assertTrue(ExternalLinkUtils.encodeToUTF8("/").equals("%2F"));
+    assertTrue(ExternalLinkUtils.encodeToUTF8(":").equals("%3A"));
+    assertTrue(ExternalLinkUtils.encodeToUTF8("?").equals("%3F"));
+    assertTrue(ExternalLinkUtils.encodeToUTF8("=").equals("%3D"));
+  }
+
+  /**
+   * Make sure that URLs for analyzers and logviewers are fetched correctly by setting it manually and then fetching them
+   */
+  @Test
+  public void testFetchURL() {
+    azkProps.put(ServerProperties.AZKABAN_SERVER_EXTERNAL_TOPIC_URL.replace("${topic}", "someTopic"), "This is a link");
+    assertTrue(ExternalLinkUtils.getURLForTopic("someTopic", azkProps).equals("This is a link"));
+  }
+}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
index dbd5b80..41a34fc 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
@@ -57,9 +57,11 @@ import azkaban.jobExecutor.JavaProcessJob;
 import azkaban.jobExecutor.Job;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.jobtype.JobTypeManagerException;
+import azkaban.utils.ExternalLinkUtils;
 import azkaban.utils.Props;
 import azkaban.utils.StringUtils;
 import azkaban.utils.UndefinedPropertyException;
+import azkaban.utils.PatternLayoutEscaped;
 
 public class JobRunner extends EventHandler implements Runnable {
   public static final String AZKABAN_WEBSERVER_URL = "azkaban.webserver.url";
@@ -246,6 +248,11 @@ public class JobRunner extends EventHandler implements Runnable {
         }
       }
     }
+
+    String externalViewer = ExternalLinkUtils.getExternalLogViewer(azkabanProps, this.jobId, props);
+    if (!externalViewer.isEmpty()) {
+      logger.info("See logs at: " + externalViewer);
+    }
   }
 
   private void attachFileAppender(FileAppender appender) {
@@ -292,7 +299,7 @@ public class JobRunner extends EventHandler implements Runnable {
 
   private KafkaLog4jAppender createKafkaAppender() throws UndefinedPropertyException {
     KafkaLog4jAppender kafkaProducer = new KafkaLog4jAppender();
-    kafkaProducer.setSyncSend(false);
+    kafkaProducer.setSyncSend(true);
     kafkaProducer.setBrokerList(azkabanProps.getString(ServerProperties.AZKABAN_SERVER_LOGGING_KAFKA_BROKERLIST));
     kafkaProducer.setTopic(azkabanProps.getString(ServerProperties.AZKABAN_SERVER_LOGGING_KAFKA_TOPIC));
 
@@ -302,12 +309,13 @@ public class JobRunner extends EventHandler implements Runnable {
     layout.put("message", "%m");
     layout.put("projectname", props.getString(FlowProperties.AZKABAN_FLOW_PROJECT_NAME));
     layout.put("flowid", props.getString(FlowProperties.AZKABAN_FLOW_FLOW_ID));
+    layout.put("jobid", this.jobId);
     layout.put("submituser", props.getString(FlowProperties.AZKABAN_FLOW_SUBMIT_USER));
     layout.put("execid", props.getString(FlowProperties.AZKABAN_FLOW_EXEC_ID));
     layout.put("projectversion", props.getString(FlowProperties.AZKABAN_FLOW_PROJECT_VERSION));
     layout.put("logsource", "userJob");
 
-    kafkaProducer.setLayout(new EnhancedPatternLayout(layout.toString()));
+    kafkaProducer.setLayout(new PatternLayoutEscaped(layout.toString()));
     kafkaProducer.activateOptions();
 
     flowLogger.info("Created kafka appender for " + this.jobId);
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index 3d36e63..e36a441 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -30,6 +30,7 @@ import javax.servlet.http.HttpServletResponse;
 import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.log4j.Logger;
 
+import azkaban.constants.ServerProperties;
 import azkaban.executor.ConnectorParams;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableFlowBase;
@@ -52,6 +53,7 @@ import azkaban.user.Permission;
 import azkaban.user.Permission.Type;
 import azkaban.user.User;
 import azkaban.user.UserManager;
+import azkaban.utils.ExternalLinkUtils;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
@@ -60,9 +62,9 @@ import azkaban.webapp.plugin.PluginRegistry;
 import azkaban.webapp.plugin.ViewerPlugin;
 
 public class ExecutorServlet extends LoginAbstractAzkabanServlet {
-  private static final Logger LOGGER = 
+  private static final Logger LOGGER =
       Logger.getLogger(ExecutorServlet.class.getName());
-  private static final long serialVersionUID = 1L;  
+  private static final long serialVersionUID = 1L;
   private ProjectManager projectManager;
   private ExecutorManagerAdapter executorManager;
   private ScheduleManager scheduleManager;
@@ -341,23 +343,22 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
       page.render();
       return;
     }
-    
+
     Props props = getApplication().getServerProps();
-    String execExternalLinkURL = 
-        ExternalAnalyzerUtils.getExternalAnalyzer(props, req);
+    String execExternalLinkURL = ExternalLinkUtils.getExternalAnalyzerOnReq(props, req);
 
     if(execExternalLinkURL.length() > 0) {
       page.add("executionExternalLinkURL", execExternalLinkURL);
       LOGGER.debug("Added an External analyzer to the page");
       LOGGER.debug("External analyzer url: " + execExternalLinkURL);
-      
-      String execExternalLinkLabel = 
-          props.getString(ExternalAnalyzerUtils.EXECUTION_EXTERNAL_LINK_LABEL, 
+
+      String execExternalLinkLabel =
+          props.getString(ServerProperties.AZKABAN_SERVER_EXTERNAL_ANALYZER_LABEL,
               "External Analyzer");
       page.add("executionExternalLinkLabel", execExternalLinkLabel);
       LOGGER.debug("External analyzer label set to : " + execExternalLinkLabel);
     }
-    
+
     page.add("projectId", project.getId());
     page.add("projectName", project.getName());
     page.add("flowid", flow.getFlowId());