azkaban-aplcache
Changes
azkaban-common/build.gradle 3(+2 -1)
Details
azkaban-common/build.gradle 3(+2 -1)
diff --git a/azkaban-common/build.gradle b/azkaban-common/build.gradle
index 3359923..3618eef 100644
--- a/azkaban-common/build.gradle
+++ b/azkaban-common/build.gradle
@@ -41,8 +41,9 @@ dependencies {
compile('io.dropwizard.metrics:metrics-core:3.1.0')
compile('io.dropwizard.metrics:metrics-jvm:3.1.0')
- testCompile(project(':azkaban-test').sourceSets.test.output)
testCompile('org.hamcrest:hamcrest-all:1.3')
+ testCompile('org.mockito:mockito-all:1.10.19')
+ testCompile(project(':azkaban-test').sourceSets.test.output)
}
tasks.withType(JavaCompile) {
diff --git a/azkaban-common/src/main/java/azkaban/constants/ServerProperties.java b/azkaban-common/src/main/java/azkaban/constants/ServerProperties.java
index 895aeda..1158e95 100644
--- a/azkaban-common/src/main/java/azkaban/constants/ServerProperties.java
+++ b/azkaban-common/src/main/java/azkaban/constants/ServerProperties.java
@@ -19,19 +19,29 @@ package azkaban.constants;
public class ServerProperties {
// These properties are configurable through azkaban.properties
+ // Defines a list of external links, each referred to as a topic
+ public static final String AZKABAN_SERVER_EXTERNAL_TOPICS = "azkaban.server.external.topics";
+
+ // External URL template of a given topic, specified in the list defined above
+ public static final String AZKABAN_SERVER_EXTERNAL_TOPIC_URL = "azkaban.server.external.${topic}.url";
+
+ // Designates one of the external link topics to correspond to an execution analyzer
+ public static final String AZKABAN_SERVER_EXTERNAL_ANALYZER_TOPIC = "azkaban.server.external.analyzer.topic";
+ public static final String AZKABAN_SERVER_EXTERNAL_ANALYZER_LABEL = "azkaban.server.external.analyzer.label";
+
+ // Designates one of the external link topics to correspond to a job log viewer
+ public static final String AZKABAN_SERVER_EXTERNAL_LOGVIEWER_TOPIC = "azkaban.server.external.logviewer.topic";
+ public static final String AZKABAN_SERVER_EXTERNAL_LOGVIEWER_LABEL = "azkaban.server.external.logviewer.label";
+
// Configures the Kafka appender for logging user jobs, specified for the exec server
public static final String AZKABAN_SERVER_LOGGING_KAFKA_BROKERLIST = "azkaban.server.logging.kafka.brokerList";
public static final String AZKABAN_SERVER_LOGGING_KAFKA_TOPIC = "azkaban.server.logging.kafka.topic";
- /**
- * Represent the class name of azkaban metrics reporter.
- */
+ // Represent the class name of azkaban metrics reporter.
public static final String CUSTOM_METRICS_REPORTER_CLASS_NAME =
"azkaban.metrics.reporter.name";
- /**
- * Represent the metrics server URL.
- */
+ // Represent the metrics server URL.
public static final String METRICS_SERVER_URL =
"azkaban.metrics.server.url";
diff --git a/azkaban-common/src/main/java/azkaban/utils/ExternalLinkUtils.java b/azkaban-common/src/main/java/azkaban/utils/ExternalLinkUtils.java
new file mode 100644
index 0000000..53ec8fc
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/utils/ExternalLinkUtils.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2016 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import azkaban.constants.FlowProperties;
+import azkaban.constants.ServerProperties;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+
+import javax.servlet.http.HttpServletRequest;
+
+import org.apache.log4j.Logger;
+
+public class ExternalLinkUtils {
+ private static final Logger logger = Logger.getLogger(ExternalLinkUtils.class);
+
+ public static String getExternalAnalyzerOnReq(Props azkProps, HttpServletRequest req) {
+ // If no topic was configured to be an external analyzer, return empty
+ if (!azkProps.containsKey(ServerProperties.AZKABAN_SERVER_EXTERNAL_ANALYZER_TOPIC)) {
+ return "";
+ }
+ // Find out which external link we should use to lead to our analyzer
+ String topic = azkProps.getString(ServerProperties.AZKABAN_SERVER_EXTERNAL_ANALYZER_TOPIC);
+ return getLinkFromRequest(topic, azkProps, req);
+ }
+
+ public static String getExternalLogViewer(Props azkProps, String jobId, Props jobProps) {
+ // If no topic was configured to be an external analyzer, return empty
+ if (!azkProps.containsKey(ServerProperties.AZKABAN_SERVER_EXTERNAL_LOGVIEWER_TOPIC)) {
+ return "";
+ }
+ // Find out which external link we should use to lead to our log viewer
+ String topic = azkProps.getString(ServerProperties.AZKABAN_SERVER_EXTERNAL_LOGVIEWER_TOPIC);
+ return getLinkFromJobAndExecId(topic, azkProps, jobId, jobProps);
+ }
+
+ private static String getLinkFromJobAndExecId(String topic, Props azkProps, String jobId, Props jobProps) {
+ String urlTemplate = getURLForTopic(topic, azkProps);
+ if (urlTemplate.isEmpty()) {
+ logger.error("No URL specified for topic " + topic);
+ return "";
+ }
+ String job = encodeToUTF8(jobId);
+ String execid = encodeToUTF8(jobProps.getString(FlowProperties.AZKABAN_FLOW_EXEC_ID));
+
+ urlTemplate = urlTemplate.replace("${jobid}", job).replace("${execid}", execid);
+ logger.info("Creating link: " + urlTemplate);
+ return urlTemplate;
+ }
+
+ private static String getLinkFromRequest(String topic, Props azkProps, HttpServletRequest req) {
+ String urlTemplate = getURLForTopic(topic, azkProps);
+ if (urlTemplate.isEmpty()) {
+ logger.error("No URL specified for topic " + topic);
+ return "";
+ }
+ String flowExecutionURL = "";
+ flowExecutionURL += req.getRequestURL();
+ flowExecutionURL += "?";
+ flowExecutionURL += req.getQueryString();
+ flowExecutionURL = encodeToUTF8(flowExecutionURL);
+
+ urlTemplate = urlTemplate.replace("${url}", flowExecutionURL);
+ logger.info("Creating link: " + urlTemplate);
+ return urlTemplate;
+ }
+
+ static String getURLForTopic(String topic, Props azkProps) {
+ return azkProps.getString(ServerProperties.AZKABAN_SERVER_EXTERNAL_TOPIC_URL.replace("${topic}", topic), "");
+ }
+
+ static String encodeToUTF8(String url) {
+ try {
+ return URLEncoder.encode(url, "UTF-8").replaceAll("\\+", "%20");
+ } catch (UnsupportedEncodingException e) {
+ logger.error("Specified encoding is not supported", e);
+ }
+ return "";
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/utils/PatternLayoutEscaped.java b/azkaban-common/src/main/java/azkaban/utils/PatternLayoutEscaped.java
index 089dba9..707c33a 100644
--- a/azkaban-common/src/main/java/azkaban/utils/PatternLayoutEscaped.java
+++ b/azkaban-common/src/main/java/azkaban/utils/PatternLayoutEscaped.java
@@ -10,6 +10,14 @@ import org.apache.log4j.spi.LoggingEvent;
* can be found one in place.
*/
public class PatternLayoutEscaped extends PatternLayout {
+ public PatternLayoutEscaped(String s) {
+ super(s);
+ }
+
+ public PatternLayoutEscaped() {
+ super();
+ }
+
@Override
public String format(final LoggingEvent event) {
if (event.getMessage() instanceof String) {
diff --git a/azkaban-common/src/test/java/azkaban/utils/ExternalLinkUtilsTest.java b/azkaban-common/src/test/java/azkaban/utils/ExternalLinkUtilsTest.java
new file mode 100644
index 0000000..a0485af
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/utils/ExternalLinkUtilsTest.java
@@ -0,0 +1,150 @@
+/*
+ * Copyright 2016 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import azkaban.constants.FlowProperties;
+import azkaban.constants.ServerProperties;
+
+import javax.servlet.http.HttpServletRequest;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class ExternalLinkUtilsTest {
+ private Props azkProps;
+
+ private Props jobProps;
+
+ private String jobId;
+
+ private HttpServletRequest mockRequest;
+
+ private static final String EXEC_URL = "http://localhost:8081/executor";
+
+ private static final String EXEC_QUERY_STRING = "execid=1";
+
+ private static final String EXTERNAL_ANALYZER_TOPIC = "elephant";
+
+ private static final String EXTERNAL_ANALYZER_URL_VALID_FORMAT =
+ "http://elephant.linkedin.com/search?q=${url}";
+
+ private static final String EXTERNAL_ANALYZER_EXPECTED_URL =
+ "http://elephant.linkedin.com/search?q="
+ + "http%3A%2F%2Flocalhost%3A8081%2Fexecutor%3Fexecid%3D1";
+
+ private static final String EXTERNAL_LOGVIEWER_TOPIC = "kibana";
+
+ private static final String EXTERNAL_LOGVIEWER_URL_VALID_FORMAT =
+ "http://kibana.linkedin.com/search?jobid=${jobid}&&execid=${execid}";
+
+ private static final String EXTERNAL_LOGVIEWER_EXPECTED_URL =
+ "http://kibana.linkedin.com/search?jobid=Some%20%2B%20job&&execid=1";
+
+ @Before
+ public void setUp() {
+ // Empty server configuration
+ azkProps = new Props();
+
+ // Job configuration consisting of only an exec id and job id
+ jobProps = new Props();
+ jobProps.put(FlowProperties.AZKABAN_FLOW_EXEC_ID, 1);
+ jobId = "Some + job";
+
+ mockRequest = mock(HttpServletRequest.class);
+ }
+
+ /**
+ * Test validates the happy path when an external analyzer is configured
+ * with '${url}' as the format in 'azkaban.properties'.
+ */
+ @Test
+ public void testGetExternalAnalyzerValidFormat() {
+ azkProps.put(ServerProperties.AZKABAN_SERVER_EXTERNAL_ANALYZER_TOPIC, EXTERNAL_ANALYZER_TOPIC);
+ azkProps.put(ServerProperties.AZKABAN_SERVER_EXTERNAL_TOPIC_URL.replace("${topic}", EXTERNAL_ANALYZER_TOPIC),
+ EXTERNAL_ANALYZER_URL_VALID_FORMAT);
+
+ when(mockRequest.getRequestURL()).thenReturn(new StringBuffer(EXEC_URL));
+ when(mockRequest.getQueryString()).thenReturn(EXEC_QUERY_STRING);
+
+ String externalURL =
+ ExternalLinkUtils.getExternalAnalyzerOnReq(azkProps, mockRequest);
+ assertTrue(externalURL.equals(EXTERNAL_ANALYZER_EXPECTED_URL));
+ }
+
+ /**
+ * Test validates the happy path when an log viewer is configured
+ * with '${execid}' and '${jobid} as the format in 'azkaban.properties'.
+ */
+ @Test
+ public void testGetExternalLogViewerValidFormat() {
+ azkProps.put(ServerProperties.AZKABAN_SERVER_EXTERNAL_LOGVIEWER_TOPIC, EXTERNAL_LOGVIEWER_TOPIC);
+ azkProps.put(ServerProperties.AZKABAN_SERVER_EXTERNAL_TOPIC_URL.replace("${topic}", EXTERNAL_LOGVIEWER_TOPIC),
+ EXTERNAL_LOGVIEWER_URL_VALID_FORMAT);
+
+ String externalURL =
+ ExternalLinkUtils.getExternalLogViewer(azkProps, jobId, jobProps);
+ assertTrue(externalURL.equals(EXTERNAL_LOGVIEWER_EXPECTED_URL));
+ }
+
+ /**
+ * Test validates the condition when an external analyzer is not configured
+ * in 'azkaban.properties'.
+ */
+ @Test
+ public void testGetExternalAnalyzerNotConfigured() {
+ String executionExternalLinkURL =
+ ExternalLinkUtils.getExternalAnalyzerOnReq(azkProps, mockRequest);
+ assertTrue(executionExternalLinkURL.equals(""));
+ }
+
+ /**
+ * Test validates the condition when an external log viewer is not configured
+ * in 'azkaban.properties'.
+ */
+ @Test
+ public void testGetLogViewerNotConfigured() {
+ String executionExternalLinkURL =
+ ExternalLinkUtils.getExternalLogViewer(azkProps, jobId, jobProps);
+ assertTrue(executionExternalLinkURL.equals(""));
+ }
+
+ /**
+ * Test validates that when we encode URLs to UTF-8, it does not give us incorrect encodings.
+ */
+ @Test
+ public void testEncodingToUFT8() {
+ assertTrue(ExternalLinkUtils.encodeToUTF8(" ").equals("%20"));
+ assertTrue(ExternalLinkUtils.encodeToUTF8("+").equals("%2B"));
+ assertTrue(ExternalLinkUtils.encodeToUTF8("/").equals("%2F"));
+ assertTrue(ExternalLinkUtils.encodeToUTF8(":").equals("%3A"));
+ assertTrue(ExternalLinkUtils.encodeToUTF8("?").equals("%3F"));
+ assertTrue(ExternalLinkUtils.encodeToUTF8("=").equals("%3D"));
+ }
+
+ /**
+ * Make sure that URLs for analyzers and logviewers are fetched correctly by setting it manually and then fetching them
+ */
+ @Test
+ public void testFetchURL() {
+ azkProps.put(ServerProperties.AZKABAN_SERVER_EXTERNAL_TOPIC_URL.replace("${topic}", "someTopic"), "This is a link");
+ assertTrue(ExternalLinkUtils.getURLForTopic("someTopic", azkProps).equals("This is a link"));
+ }
+}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
index dbd5b80..41a34fc 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
@@ -57,9 +57,11 @@ import azkaban.jobExecutor.JavaProcessJob;
import azkaban.jobExecutor.Job;
import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypeManagerException;
+import azkaban.utils.ExternalLinkUtils;
import azkaban.utils.Props;
import azkaban.utils.StringUtils;
import azkaban.utils.UndefinedPropertyException;
+import azkaban.utils.PatternLayoutEscaped;
public class JobRunner extends EventHandler implements Runnable {
public static final String AZKABAN_WEBSERVER_URL = "azkaban.webserver.url";
@@ -246,6 +248,11 @@ public class JobRunner extends EventHandler implements Runnable {
}
}
}
+
+ String externalViewer = ExternalLinkUtils.getExternalLogViewer(azkabanProps, this.jobId, props);
+ if (!externalViewer.isEmpty()) {
+ logger.info("See logs at: " + externalViewer);
+ }
}
private void attachFileAppender(FileAppender appender) {
@@ -292,7 +299,7 @@ public class JobRunner extends EventHandler implements Runnable {
private KafkaLog4jAppender createKafkaAppender() throws UndefinedPropertyException {
KafkaLog4jAppender kafkaProducer = new KafkaLog4jAppender();
- kafkaProducer.setSyncSend(false);
+ kafkaProducer.setSyncSend(true);
kafkaProducer.setBrokerList(azkabanProps.getString(ServerProperties.AZKABAN_SERVER_LOGGING_KAFKA_BROKERLIST));
kafkaProducer.setTopic(azkabanProps.getString(ServerProperties.AZKABAN_SERVER_LOGGING_KAFKA_TOPIC));
@@ -302,12 +309,13 @@ public class JobRunner extends EventHandler implements Runnable {
layout.put("message", "%m");
layout.put("projectname", props.getString(FlowProperties.AZKABAN_FLOW_PROJECT_NAME));
layout.put("flowid", props.getString(FlowProperties.AZKABAN_FLOW_FLOW_ID));
+ layout.put("jobid", this.jobId);
layout.put("submituser", props.getString(FlowProperties.AZKABAN_FLOW_SUBMIT_USER));
layout.put("execid", props.getString(FlowProperties.AZKABAN_FLOW_EXEC_ID));
layout.put("projectversion", props.getString(FlowProperties.AZKABAN_FLOW_PROJECT_VERSION));
layout.put("logsource", "userJob");
- kafkaProducer.setLayout(new EnhancedPatternLayout(layout.toString()));
+ kafkaProducer.setLayout(new PatternLayoutEscaped(layout.toString()));
kafkaProducer.activateOptions();
flowLogger.info("Created kafka appender for " + this.jobId);
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index 3d36e63..e36a441 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -30,6 +30,7 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.log4j.Logger;
+import azkaban.constants.ServerProperties;
import azkaban.executor.ConnectorParams;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableFlowBase;
@@ -52,6 +53,7 @@ import azkaban.user.Permission;
import azkaban.user.Permission.Type;
import azkaban.user.User;
import azkaban.user.UserManager;
+import azkaban.utils.ExternalLinkUtils;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.Pair;
import azkaban.utils.Props;
@@ -60,9 +62,9 @@ import azkaban.webapp.plugin.PluginRegistry;
import azkaban.webapp.plugin.ViewerPlugin;
public class ExecutorServlet extends LoginAbstractAzkabanServlet {
- private static final Logger LOGGER =
+ private static final Logger LOGGER =
Logger.getLogger(ExecutorServlet.class.getName());
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
private ProjectManager projectManager;
private ExecutorManagerAdapter executorManager;
private ScheduleManager scheduleManager;
@@ -341,23 +343,22 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
page.render();
return;
}
-
+
Props props = getApplication().getServerProps();
- String execExternalLinkURL =
- ExternalAnalyzerUtils.getExternalAnalyzer(props, req);
+ String execExternalLinkURL = ExternalLinkUtils.getExternalAnalyzerOnReq(props, req);
if(execExternalLinkURL.length() > 0) {
page.add("executionExternalLinkURL", execExternalLinkURL);
LOGGER.debug("Added an External analyzer to the page");
LOGGER.debug("External analyzer url: " + execExternalLinkURL);
-
- String execExternalLinkLabel =
- props.getString(ExternalAnalyzerUtils.EXECUTION_EXTERNAL_LINK_LABEL,
+
+ String execExternalLinkLabel =
+ props.getString(ServerProperties.AZKABAN_SERVER_EXTERNAL_ANALYZER_LABEL,
"External Analyzer");
page.add("executionExternalLinkLabel", execExternalLinkLabel);
LOGGER.debug("External analyzer label set to : " + execExternalLinkLabel);
}
-
+
page.add("projectId", project.getId());
page.add("projectName", project.getName());
page.add("flowid", flow.getFlowId());