azkaban-aplcache
Changes
azkaban-execserver/src/test/java/azkaban/execapp/event/JobCallbackRequestMakerTest.java 240(+240 -0)
build.gradle 8(+6 -2)
Details
diff --git a/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackConstants.java b/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackConstants.java
new file mode 100644
index 0000000..122640d
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackConstants.java
@@ -0,0 +1,40 @@
+package azkaban.jobcallback;
+
+public interface JobCallbackConstants {
+ public static final String STATUS_TOKEN = "status";
+ public static final String SEQUENCE_TOKEN = "sequence";
+ public static final String HTTP_GET = "GET";
+ public static final String HTTP_POST = "POST";
+
+ public static final String MAX_POST_BODY_LENGTH_PROPERTY_KEY =
+ "jobcallback.max.body.length";
+ public static final int DEFAULT_POST_BODY_LENGTH = 4096;
+
+ public static final String MAX_CALLBACK_COUNT_PROPERTY_KEY =
+ "jobcallback.max_count";
+ public static final int DEFAULT_MAX_CALLBACK_COUNT = 3;
+
+ public static final String FIRST_JOB_CALLBACK_URL_TEMPLATE =
+ "job.notification." + STATUS_TOKEN + ".1.url";
+
+ public static final String JOB_CALLBACK_URL_TEMPLATE = "job.notification."
+ + STATUS_TOKEN + "." + SEQUENCE_TOKEN + ".url";
+ public static final String JOB_CALLBACK_REQUEST_METHOD_TEMPLATE =
+ "job.notification." + STATUS_TOKEN + "." + SEQUENCE_TOKEN + ".method";
+
+ public static final String JOB_CALLBACK_REQUEST_HEADERS_TEMPLATE =
+ "job.notification." + STATUS_TOKEN + "." + SEQUENCE_TOKEN + ".headers";
+
+ public static final String JOB_CALLBACK_BODY_TEMPLATE = "job.notification."
+ + STATUS_TOKEN + "." + SEQUENCE_TOKEN + ".body";
+
+ public static final String CONTEXT_SERVER_TOKEN = "?{server}";
+ public static final String CONTEXT_PROJECT_TOKEN = "?{project}";
+ public static final String CONTEXT_FLOW_TOKEN = "?{flow}";
+ public static final String CONTEXT_EXECUTION_ID_TOKEN = "?{executionId}";
+ public static final String CONTEXT_JOB_TOKEN = "?{job}";
+ public static final String CONTEXT_JOB_STATUS_TOKEN = "?{status}";
+
+ public static final String HEADER_ELEMENT_DELIMITER = "\r\n";
+ public static final String HEADER_NAME_VALUE_DELIMITER = ":";
+}
diff --git a/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackStatusEnum.java b/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackStatusEnum.java
new file mode 100644
index 0000000..b10679b
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackStatusEnum.java
@@ -0,0 +1,5 @@
+package azkaban.jobcallback;
+
+public enum JobCallbackStatusEnum {
+ STARTED, SUCCESS, FAILURE, COMPLETED
+}
diff --git a/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java b/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java
new file mode 100644
index 0000000..7b1462f
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java
@@ -0,0 +1,130 @@
+package azkaban.jobcallback;
+
+import static azkaban.jobcallback.JobCallbackConstants.DEFAULT_POST_BODY_LENGTH;
+import static azkaban.jobcallback.JobCallbackConstants.HTTP_GET;
+import static azkaban.jobcallback.JobCallbackConstants.HTTP_POST;
+import static azkaban.jobcallback.JobCallbackConstants.JOB_CALLBACK_BODY_TEMPLATE;
+import static azkaban.jobcallback.JobCallbackConstants.JOB_CALLBACK_REQUEST_METHOD_TEMPLATE;
+import static azkaban.jobcallback.JobCallbackConstants.JOB_CALLBACK_URL_TEMPLATE;
+import static azkaban.jobcallback.JobCallbackConstants.MAX_POST_BODY_LENGTH_PROPERTY_KEY;
+import static azkaban.jobcallback.JobCallbackConstants.SEQUENCE_TOKEN;
+import static azkaban.jobcallback.JobCallbackConstants.STATUS_TOKEN;
+
+import java.util.Collection;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Props;
+
+/**
+ * Responsible for validating the job callback related properties at project
+ * upload time
+ *
+ * @author hluu
+ *
+ */
+public class JobCallbackValidator {
+
+ private static final Logger logger = Logger
+ .getLogger(JobCallbackValidator.class);
+
+ /**
+ * Make sure all the job callback related properties are valid
+ *
+ * @param jobProps
+ * @param error
+ * @return number of valid job callback properties. Mainly for testing
+ * purpose.
+ */
+ public static int validate(String jobName, Props serverProps, Props jobProps,
+ Collection<String> errors) {
+ int maxNumCallback =
+ serverProps.getInt(
+ JobCallbackConstants.MAX_CALLBACK_COUNT_PROPERTY_KEY,
+ JobCallbackConstants.DEFAULT_MAX_CALLBACK_COUNT);
+
+ int maxPostBodyLength =
+ serverProps.getInt(MAX_POST_BODY_LENGTH_PROPERTY_KEY,
+ DEFAULT_POST_BODY_LENGTH);
+
+ int totalCallbackCount = 0;
+ for (JobCallbackStatusEnum jobStatus : JobCallbackStatusEnum.values()) {
+ totalCallbackCount +=
+ validateBasedOnStatus(jobProps, errors, jobStatus, maxNumCallback,
+ maxPostBodyLength);
+ }
+
+ logger.info("Found " + totalCallbackCount + " job callbacks for job "
+ + jobName);
+ return totalCallbackCount;
+ }
+
+ private static int validateBasedOnStatus(Props jobProps,
+ Collection<String> errors, JobCallbackStatusEnum jobStatus,
+ int maxNumCallback, int maxPostBodyLength) {
+
+ int callbackCount = 0;
+ // replace property templates with status
+ String jobCallBackUrl =
+ JOB_CALLBACK_URL_TEMPLATE.replaceFirst(STATUS_TOKEN, jobStatus.name()
+ .toLowerCase());
+
+ String requestMethod =
+ JOB_CALLBACK_REQUEST_METHOD_TEMPLATE.replaceFirst(STATUS_TOKEN,
+ jobStatus.name().toLowerCase());
+
+ String httpBody =
+ JOB_CALLBACK_BODY_TEMPLATE.replaceFirst(STATUS_TOKEN, jobStatus.name()
+ .toLowerCase());
+
+ for (int i = 0; i <= maxNumCallback; i++) {
+ // callback url
+ String callbackUrlKey =
+ jobCallBackUrl.replaceFirst(SEQUENCE_TOKEN, Integer.toString(i));
+ String callbackUrlValue = jobProps.get(callbackUrlKey);
+
+ // sequence number should start at 1, this is to check for sequence
+ // number that starts a 0
+ if (i == 0) {
+ if (callbackUrlValue != null) {
+ errors.add("Sequence number starts at 1, not 0");
+ }
+ continue;
+ }
+
+ if (callbackUrlValue == null || callbackUrlValue.length() == 0) {
+ break;
+ } else {
+ String requestMethodKey =
+ requestMethod.replaceFirst(SEQUENCE_TOKEN, Integer.toString(i));
+
+ String methodValue = jobProps.getString(requestMethodKey, HTTP_GET);
+
+ if (HTTP_POST.equals(methodValue)) {
+ // now try to get the post body
+ String postBodyKey =
+ httpBody.replaceFirst(SEQUENCE_TOKEN, Integer.toString(i));
+ String postBodyValue = jobProps.get(postBodyKey);
+ if (postBodyValue == null || postBodyValue.length() == 0) {
+ errors.add("No POST body was specified for job callback '"
+ + callbackUrlValue + "'");
+ } else if (postBodyValue.length() > maxPostBodyLength) {
+ errors.add("POST body length is : " + postBodyValue.length()
+ + " which is larger than supported length of "
+ + maxPostBodyLength);
+ } else {
+ callbackCount++;
+ }
+ } else if (HTTP_GET.equals(methodValue)) {
+ // that's cool
+ callbackCount++;
+ } else {
+ errors.add("Unsupported request method: " + methodValue
+ + " Only POST and GET are supported");
+ }
+ }
+ }
+
+ return callbackCount;
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java b/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java
index 1a5f12d..6279099 100644
--- a/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java
@@ -36,6 +36,7 @@ import azkaban.flow.Flow;
import azkaban.flow.FlowProps;
import azkaban.flow.Node;
import azkaban.flow.SpecialJobTypes;
+import azkaban.jobcallback.JobCallbackValidator;
import azkaban.project.Project;
import azkaban.project.ProjectWhitelist;
import azkaban.project.validator.ProjectValidator;
@@ -115,6 +116,7 @@ public class DirectoryFlowLoader implements ProjectValidator {
// Resolve embedded flows
resolveEmbeddedFlows();
+
}
private void loadProjectFromDir(String base, File dir, Props parent) {
@@ -379,9 +381,10 @@ public class DirectoryFlowLoader implements ProjectValidator {
}
private void jobPropertiesCheck(Project project) {
- //if project is in the memory check whitelist, then we don't need to check its memory settings
+ // if project is in the memory check whitelist, then we don't need to check
+ // its memory settings
if (ProjectWhitelist.isProjectWhitelisted(project.getId(),
- ProjectWhitelist.WhitelistType.MemoryCheck)) {
+ ProjectWhitelist.WhitelistType.MemoryCheck)) {
return;
}
@@ -391,19 +394,25 @@ public class DirectoryFlowLoader implements ProjectValidator {
long sizeMaxXmx = Utils.parseMemString(maxXmx);
for (String jobName : jobPropsMap.keySet()) {
+
Props jobProps = jobPropsMap.get(jobName);
String xms = jobProps.getString(XMS, null);
- if (xms != null && !PropsUtils.isVarialbeReplacementPattern(xms)
- && Utils.parseMemString(xms) > sizeMaxXms) {
- errors.add(String.format("%s: Xms value has exceeded the allowed limit (max Xms = %s)",
- jobName, maxXms));
+ if (xms != null && !PropsUtils.isVarialbeReplacementPattern(xms)
+ && Utils.parseMemString(xms) > sizeMaxXms) {
+ errors.add(String.format(
+ "%s: Xms value has exceeded the allowed limit (max Xms = %s)",
+ jobName, maxXms));
}
String xmx = jobProps.getString(XMX, null);
if (xmx != null && !PropsUtils.isVarialbeReplacementPattern(xmx)
- && Utils.parseMemString(xmx) > sizeMaxXmx) {
- errors.add(String.format("%s: Xmx value has exceeded the allowed limit (max Xmx = %s)",
- jobName, maxXmx));
+ && Utils.parseMemString(xmx) > sizeMaxXmx) {
+ errors.add(String.format(
+ "%s: Xmx value has exceeded the allowed limit (max Xmx = %s)",
+ jobName, maxXmx));
}
+
+ // job callback properties check
+ JobCallbackValidator.validate(jobName, props, jobProps, errors);
}
}
diff --git a/azkaban-common/src/test/java/azkaban/jobcallback/JobCallbackValidatorTest.java b/azkaban-common/src/test/java/azkaban/jobcallback/JobCallbackValidatorTest.java
new file mode 100644
index 0000000..9741b6a
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/jobcallback/JobCallbackValidatorTest.java
@@ -0,0 +1,223 @@
+package azkaban.jobcallback;
+
+import static azkaban.jobcallback.JobCallbackConstants.DEFAULT_MAX_CALLBACK_COUNT;
+import static azkaban.jobcallback.JobCallbackConstants.MAX_CALLBACK_COUNT_PROPERTY_KEY;
+import static azkaban.jobcallback.JobCallbackConstants.MAX_POST_BODY_LENGTH_PROPERTY_KEY;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.utils.Props;
+
+public class JobCallbackValidatorTest {
+ private Props serverProps;
+
+ @Before
+ public void setup() {
+ serverProps = new Props();
+ serverProps
+ .put(MAX_CALLBACK_COUNT_PROPERTY_KEY, DEFAULT_MAX_CALLBACK_COUNT);
+ }
+
+ @Test
+ public void noJobCallbackProps() {
+ Props jobProps = new Props();
+ Set<String> errors = new HashSet<String>();
+
+ Assert.assertEquals(0, JobCallbackValidator.validate("bogusJob",
+ serverProps, jobProps, errors));
+
+ Assert.assertEquals(0, errors.size());
+ }
+
+ @Test
+ public void sequenceStartWithZeroProps() {
+ Props jobProps = new Props();
+ Set<String> errors = new HashSet<String>();
+
+ jobProps.put("job.notification."
+ + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".0.url",
+ "http://www.linkedin.com");
+
+ jobProps.put("job.notification."
+ + JobCallbackStatusEnum.COMPLETED.name().toLowerCase() + ".1.url",
+ "http://www.linkedin.com");
+
+ Assert.assertEquals(1, JobCallbackValidator.validate("bogusJob",
+ serverProps, jobProps, errors));
+
+ Assert.assertEquals(1, errors.size());
+ }
+
+ @Test
+ public void oneGetJobCallback() {
+ Props jobProps = new Props();
+ jobProps.put("job.notification."
+ + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".1.url",
+ "http://www.linkedin.com");
+
+ Set<String> errors = new HashSet<String>();
+
+ Assert.assertEquals(1, JobCallbackValidator.validate("bogusJob",
+ serverProps, jobProps, errors));
+
+ Assert.assertEquals(0, errors.size());
+ }
+
+ @Test
+ public void onePostJobCallback() {
+ Props jobProps = new Props();
+ jobProps.put("job.notification."
+ + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".1.url",
+ "http://www.linkedin.com");
+
+ jobProps.put("job.notification."
+ + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".1.method",
+ JobCallbackConstants.HTTP_POST);
+
+ jobProps.put("job.notification."
+ + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".1.body",
+ "doc:id");
+
+ Set<String> errors = new HashSet<String>();
+
+ Assert.assertEquals(1, JobCallbackValidator.validate("bogusJob",
+ serverProps, jobProps, errors));
+
+ Assert.assertEquals(0, errors.size());
+ }
+
+ @Test
+ public void multiplePostJobCallbacks() {
+ Props jobProps = new Props();
+ jobProps.put("job.notification."
+ + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".1.url",
+ "http://www.linkedin.com");
+
+ jobProps.put("job.notification."
+ + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".1.method",
+ JobCallbackConstants.HTTP_POST);
+
+ jobProps.put("job.notification."
+ + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".1.body",
+ "doc:id");
+
+ jobProps.put("job.notification."
+ + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".2.url",
+ "http://www.linkedin2.com");
+
+ jobProps.put("job.notification."
+ + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".2.method",
+ JobCallbackConstants.HTTP_POST);
+
+ jobProps.put("job.notification."
+ + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".2.body",
+ "doc2:id");
+
+ Set<String> errors = new HashSet<String>();
+
+ Assert.assertEquals(2, JobCallbackValidator.validate("bogusJob",
+ serverProps, jobProps, errors));
+
+ Assert.assertEquals(0, errors.size());
+ }
+
+ @Test
+ public void noPostBodyJobCallback() {
+ Props jobProps = new Props();
+ jobProps.put("job.notification."
+ + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".1.url",
+ "http://www.linkedin.com");
+
+ jobProps.put("job.notification."
+ + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".1.method",
+ JobCallbackConstants.HTTP_POST);
+
+ Set<String> errors = new HashSet<String>();
+
+ Assert.assertEquals(0, JobCallbackValidator.validate("bogusJob",
+ serverProps, jobProps, errors));
+
+ Assert.assertEquals(1, errors.size());
+ System.out.println(errors);
+ }
+
+ @Test
+ public void multipleGetJobCallbacks() {
+ Props jobProps = new Props();
+ jobProps.put("job.notification."
+ + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".1.url",
+ "http://www.linkedin.com");
+
+ jobProps.put("job.notification."
+ + JobCallbackStatusEnum.STARTED.name().toLowerCase() + ".1.url",
+ "http://www.linkedin.com");
+
+ Set<String> errors = new HashSet<String>();
+
+ Assert.assertEquals(2, JobCallbackValidator.validate("bogusJob",
+ serverProps, jobProps, errors));
+
+ Assert.assertEquals(0, errors.size());
+ }
+
+ @Test
+ public void multipleGetJobCallbackWithGap() {
+ Props jobProps = new Props();
+ jobProps.put("job.notification."
+ + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".1.url",
+ "http://www.linkedin.com");
+
+ jobProps.put("job.notification."
+ + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".2.url",
+ "http://www.linkedin.com");
+
+ jobProps.put("job.notification."
+ + JobCallbackStatusEnum.STARTED.name().toLowerCase() + ".2.url",
+ "http://www.linkedin.com");
+
+ Set<String> errors = new HashSet<String>();
+
+ Assert.assertEquals(2, JobCallbackValidator.validate("bogusJob",
+ serverProps, jobProps, errors));
+
+ Assert.assertEquals(0, errors.size());
+ }
+
+ @Test
+ public void postBodyLengthTooLargeTest() {
+
+ Props jobProps = new Props();
+ jobProps.put("job.notification."
+ + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".1.url",
+ "http://www.linkedin.com");
+
+ jobProps.put("job.notification."
+ + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".1.method",
+ JobCallbackConstants.HTTP_POST);
+
+ String postBodyValue = "abcdefghijklmnopqrstuvwxyz";
+
+ int postBodyLength = 20;
+ Assert.assertTrue(postBodyValue.length() > postBodyLength);
+ jobProps.put("job.notification."
+ + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".1.body",
+ postBodyValue);
+
+ Props localServerProps = new Props();
+ localServerProps.put(MAX_POST_BODY_LENGTH_PROPERTY_KEY, postBodyLength);
+
+ Set<String> errors = new HashSet<String>();
+
+ Assert.assertEquals(0, JobCallbackValidator.validate("bogusJob",
+ localServerProps, jobProps, errors));
+
+ System.out.println(errors);
+ Assert.assertEquals(1, errors.size());
+
+ }
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 5c9ea10..39004b0 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.management.ManagementFactory;
+import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.List;
import java.util.TimeZone;
@@ -36,17 +37,18 @@ import org.mortbay.jetty.Connector;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
-import org.mortbay.log.Log;
import org.mortbay.thread.QueuedThreadPool;
-import azkaban.executor.ExecutorLoader;
-import azkaban.executor.JdbcExecutorLoader;
+import azkaban.execapp.event.JobCallbackManager;
import azkaban.execapp.jmx.JmxFlowRunnerManager;
+import azkaban.execapp.jmx.JmxJobMBeanManager;
import azkaban.execapp.metric.NumFailedFlowMetric;
import azkaban.execapp.metric.NumFailedJobMetric;
import azkaban.execapp.metric.NumQueuedFlowMetric;
import azkaban.execapp.metric.NumRunningFlowMetric;
import azkaban.execapp.metric.NumRunningJobMetric;
+import azkaban.executor.ExecutorLoader;
+import azkaban.executor.JdbcExecutorLoader;
import azkaban.jmx.JmxJettyServer;
import azkaban.metric.IMetricEmitter;
import azkaban.metric.MetricException;
@@ -60,19 +62,23 @@ import azkaban.utils.Props;
import azkaban.utils.SystemMemoryInfo;
import azkaban.utils.Utils;
-
public class AzkabanExecutorServer {
- private static final Logger logger = Logger.getLogger(AzkabanExecutorServer.class);
+ private static final String CUSTOM_JMX_ATTRIBUTE_PROCESSOR_PROPERTY =
+ "jmx.attribute.processor.class";
+ private static final Logger logger = Logger
+ .getLogger(AzkabanExecutorServer.class);
private static final int MAX_FORM_CONTENT_SIZE = 10 * 1024 * 1024;
public static final String AZKABAN_HOME = "AZKABAN_HOME";
public static final String DEFAULT_CONF_PATH = "conf";
public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
- public static final String AZKABAN_PRIVATE_PROPERTIES_FILE = "azkaban.private.properties";
+ public static final String AZKABAN_PRIVATE_PROPERTIES_FILE =
+ "azkaban.private.properties";
public static final String JOBTYPE_PLUGIN_DIR = "azkaban.jobtype.plugin.dir";
- public static final String METRIC_INTERVAL = "executor.metric.milisecinterval.";
+ public static final String METRIC_INTERVAL =
+ "executor.metric.milisecinterval.";
public static final int DEFAULT_PORT_NUMBER = 12321;
- public static final int DEFAULT_HEADER_BUFFER_SIZE = 4096;
+ public static final int DEFAULT_HEADER_BUFFER_SIZE = 4096;
private static final String DEFAULT_TIMEZONE_ID = "default.timezone.id";
private static final int DEFAULT_THREAD_NUMBER = 50;
@@ -106,14 +112,17 @@ public class AzkabanExecutorServer {
boolean isStatsOn = props.getBoolean("executor.connector.stats", true);
logger.info("Setting up connector with stats on: " + isStatsOn);
-
- for (Connector connector : server.getConnectors()) {
+
+ for (Connector connector : server.getConnectors()) {
connector.setStatsOn(isStatsOn);
- logger.info(String.format("Jetty connector name: %s, default header buffer size: %d",
- connector.getName(), connector.getHeaderBufferSize()));
- connector.setHeaderBufferSize(props.getInt("jetty.headerBufferSize", DEFAULT_HEADER_BUFFER_SIZE));
- logger.info(String.format("Jetty connector name: %s, (if) new header buffer size: %d",
- connector.getName(), connector.getHeaderBufferSize()));
+ logger.info(String.format(
+ "Jetty connector name: %s, default header buffer size: %d",
+ connector.getName(), connector.getHeaderBufferSize()));
+ connector.setHeaderBufferSize(props.getInt("jetty.headerBufferSize",
+ DEFAULT_HEADER_BUFFER_SIZE));
+ logger.info(String.format(
+ "Jetty connector name: %s, (if) new header buffer size: %d",
+ connector.getName(), connector.getHeaderBufferSize()));
}
Context root = new Context(server, "/", Context.SESSIONS);
@@ -127,13 +136,22 @@ public class AzkabanExecutorServer {
executionLoader = createExecLoader(props);
projectLoader = createProjectLoader(props);
- runnerManager = new FlowRunnerManager(props, executionLoader, projectLoader, this.getClass().getClassLoader());
+ runnerManager =
+ new FlowRunnerManager(props, executionLoader, projectLoader, this
+ .getClass().getClassLoader());
+
+ JmxJobMBeanManager.getInstance().initialize(props);
+
+ // make sure this happens before
+ configureJobCallback(props);
configureMBeanServer();
configureMetricReports();
SystemMemoryInfo.init(props.getInt("executor.memCheck.interval", 30));
+ loadCustomJMXAttributeProcessor(props);
+
try {
server.start();
} catch (Exception e) {
@@ -144,8 +162,20 @@ public class AzkabanExecutorServer {
logger.info("Azkaban Executor Server started on port " + portNumber);
}
+ private void configureJobCallback(Props props) {
+ boolean jobCallbackEnabled =
+ props.getBoolean("azkaban.executor.jobcallback.enabled", true);
+
+ logger.info("Job callback enabled? " + jobCallbackEnabled);
+
+ if (jobCallbackEnabled) {
+ JobCallbackManager.initialize(props);
+ }
+ }
+
/**
* Configure Metric Reporting as per azkaban.properties settings
+ *
* @throws MetricException
*/
private void configureMetricReports() throws MetricException {
@@ -157,29 +187,74 @@ public class AzkabanExecutorServer {
metricManager.addMetricEmitter(metricEmitter);
logger.info("Adding number of failed flow metric");
- metricManager.addMetric(new NumFailedFlowMetric(metricManager, props.getInt(METRIC_INTERVAL
- + NumFailedFlowMetric.NUM_FAILED_FLOW_METRIC_NAME, props.getInt(METRIC_INTERVAL + "default"))));
+ metricManager.addMetric(new NumFailedFlowMetric(metricManager, props
+ .getInt(METRIC_INTERVAL
+ + NumFailedFlowMetric.NUM_FAILED_FLOW_METRIC_NAME,
+ props.getInt(METRIC_INTERVAL + "default"))));
logger.info("Adding number of failed jobs metric");
- metricManager.addMetric(new NumFailedJobMetric(metricManager, props.getInt(METRIC_INTERVAL
- + NumFailedJobMetric.NUM_FAILED_JOB_METRIC_NAME, props.getInt(METRIC_INTERVAL + "default"))));
+ metricManager.addMetric(new NumFailedJobMetric(metricManager, props
+ .getInt(METRIC_INTERVAL
+ + NumFailedJobMetric.NUM_FAILED_JOB_METRIC_NAME,
+ props.getInt(METRIC_INTERVAL + "default"))));
logger.info("Adding number of running Jobs metric");
- metricManager.addMetric(new NumRunningJobMetric(metricManager, props.getInt(METRIC_INTERVAL
- + NumRunningJobMetric.NUM_RUNNING_JOB_METRIC_NAME, props.getInt(METRIC_INTERVAL + "default"))));
+ metricManager.addMetric(new NumRunningJobMetric(metricManager, props
+ .getInt(METRIC_INTERVAL
+ + NumRunningJobMetric.NUM_RUNNING_JOB_METRIC_NAME,
+ props.getInt(METRIC_INTERVAL + "default"))));
logger.info("Adding number of running flows metric");
- metricManager.addMetric(new NumRunningFlowMetric(runnerManager, metricManager, props.getInt(
- METRIC_INTERVAL + NumRunningFlowMetric.NUM_RUNNING_FLOW_METRIC_NAME,
- props.getInt(METRIC_INTERVAL + "default"))));
+ metricManager.addMetric(new NumRunningFlowMetric(runnerManager,
+ metricManager, props.getInt(METRIC_INTERVAL
+ + NumRunningFlowMetric.NUM_RUNNING_FLOW_METRIC_NAME,
+ props.getInt(METRIC_INTERVAL + "default"))));
logger.info("Adding number of queued flows metric");
- metricManager.addMetric(new NumQueuedFlowMetric(runnerManager, metricManager, props.getInt(
- METRIC_INTERVAL + NumQueuedFlowMetric.NUM_QUEUED_FLOW_METRIC_NAME,
- props.getInt(METRIC_INTERVAL + "default"))));
+ metricManager.addMetric(new NumQueuedFlowMetric(runnerManager,
+ metricManager, props.getInt(METRIC_INTERVAL
+ + NumQueuedFlowMetric.NUM_QUEUED_FLOW_METRIC_NAME,
+ props.getInt(METRIC_INTERVAL + "default"))));
logger.info("Completed configuring Metric Reports");
}
+
+ }
+
+ /**
+ * Load a custom class, which is provided by a configuration
+ * CUSTOM_JMX_ATTRIBUTE_PROCESSOR_PROPERTY.
+ *
+ * This method will try to instantiate an instance of this custom class and
+ * with given properties as the argument in the constructor.
+ *
+ * Basically the custom class must have a constructor that takes an argument
+ * with type Properties.
+ *
+ * @param props
+ */
+ private void loadCustomJMXAttributeProcessor(Props props) {
+ String jmxAttributeEmitter =
+ props.get(CUSTOM_JMX_ATTRIBUTE_PROCESSOR_PROPERTY);
+ if (jmxAttributeEmitter != null) {
+ try {
+ logger.info("jmxAttributeEmitter: " + jmxAttributeEmitter);
+ Constructor<Props>[] constructors =
+ (Constructor<Props>[]) Class.forName(jmxAttributeEmitter)
+ .getConstructors();
+
+ constructors[0].newInstance(props.toProperties());
+ } catch (Exception e) {
+ logger.error("Encountered error while loading and instantiating "
+ + jmxAttributeEmitter, e);
+ throw new IllegalStateException(
+ "Encountered error while loading and instantiating "
+ + jmxAttributeEmitter, e);
+ }
+ } else {
+ logger.info("No value for property: "
+ + CUSTOM_JMX_ATTRIBUTE_PROCESSOR_PROPERTY + " was found");
+ }
}
private ExecutorLoader createExecLoader(Props props) {
@@ -274,23 +349,25 @@ public class AzkabanExecutorServer {
public void logTopMemoryConsumers() throws Exception, IOException {
if (new File("/bin/bash").exists() && new File("/bin/ps").exists()
- && new File("/usr/bin/head").exists()) {
+ && new File("/usr/bin/head").exists()) {
logger.info("logging top memeory consumer");
java.lang.ProcessBuilder processBuilder =
- new java.lang.ProcessBuilder("/bin/bash", "-c", "/bin/ps aux --sort -rss | /usr/bin/head");
+ new java.lang.ProcessBuilder("/bin/bash", "-c",
+ "/bin/ps aux --sort -rss | /usr/bin/head");
Process p = processBuilder.start();
p.waitFor();
-
+
InputStream is = p.getInputStream();
- java.io.BufferedReader reader = new java.io.BufferedReader(new InputStreamReader(is));
+ java.io.BufferedReader reader =
+ new java.io.BufferedReader(new InputStreamReader(is));
String line = null;
while ((line = reader.readLine()) != null) {
logger.info(line);
}
is.close();
}
- }
+ }
});
}
@@ -307,14 +384,16 @@ public class AzkabanExecutorServer {
return null;
}
- if (!new File(azkabanHome).isDirectory() || !new File(azkabanHome).canRead()) {
+ if (!new File(azkabanHome).isDirectory()
+ || !new File(azkabanHome).canRead()) {
logger.error(azkabanHome + " is not a readable directory.");
return null;
}
File confPath = new File(azkabanHome, DEFAULT_CONF_PATH);
if (!confPath.exists() || !confPath.isDirectory() || !confPath.canRead()) {
- logger.error(azkabanHome + " does not contain a readable conf directory.");
+ logger
+ .error(azkabanHome + " does not contain a readable conf directory.");
return null;
}
@@ -332,7 +411,8 @@ public class AzkabanExecutorServer {
* @return
*/
private static Props loadAzkabanConfigurationFromDirectory(File dir) {
- File azkabanPrivatePropsFile = new File(dir, AZKABAN_PRIVATE_PROPERTIES_FILE);
+ File azkabanPrivatePropsFile =
+ new File(dir, AZKABAN_PRIVATE_PROPERTIES_FILE);
File azkabanPropsFile = new File(dir, AZKABAN_PROPERTIES_FILE);
Props props = null;
@@ -350,7 +430,9 @@ public class AzkabanExecutorServer {
} catch (FileNotFoundException e) {
logger.error("File not found. Could not load azkaban config file", e);
} catch (IOException e) {
- logger.error("File found, but error reading. Could not load azkaban config file", e);
+ logger.error(
+ "File found, but error reading. Could not load azkaban config file",
+ e);
}
return props;
@@ -362,6 +444,13 @@ public class AzkabanExecutorServer {
registerMbean("executorJetty", new JmxJettyServer(server));
registerMbean("flowRunnerManager", new JmxFlowRunnerManager(runnerManager));
+ registerMbean("jobJMXMBean", JmxJobMBeanManager.getInstance());
+
+ if (JobCallbackManager.isInitialized()) {
+ JobCallbackManager jobCallbackMgr = JobCallbackManager.getInstance();
+ registerMbean("jobCallbackJMXMBean",
+ jobCallbackMgr.getJmxJobCallbackMBean());
+ }
}
public void close() {
@@ -384,7 +473,8 @@ public class AzkabanExecutorServer {
logger.info("Bean " + mbeanClass.getCanonicalName() + " registered.");
registeredMBeans.add(mbeanName);
} catch (Exception e) {
- logger.error("Error registering mbean " + mbeanClass.getCanonicalName(), e);
+ logger.error("Error registering mbean " + mbeanClass.getCanonicalName(),
+ e);
}
}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/event/JobCallbackManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/event/JobCallbackManager.java
new file mode 100644
index 0000000..c417b1c
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/event/JobCallbackManager.java
@@ -0,0 +1,272 @@
+package azkaban.execapp.event;
+
+import static azkaban.jobcallback.JobCallbackConstants.CONTEXT_JOB_TOKEN;
+import static azkaban.jobcallback.JobCallbackStatusEnum.COMPLETED;
+import static azkaban.jobcallback.JobCallbackStatusEnum.FAILURE;
+import static azkaban.jobcallback.JobCallbackStatusEnum.STARTED;
+import static azkaban.jobcallback.JobCallbackStatusEnum.SUCCESS;
+
+import java.net.InetAddress;
+import java.net.URL;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.message.BasicHeader;
+import org.apache.log4j.Logger;
+
+import azkaban.event.Event;
+import azkaban.event.EventListener;
+import azkaban.execapp.JobRunner;
+import azkaban.execapp.jmx.JmxJobCallback;
+import azkaban.execapp.jmx.JmxJobCallbackMBean;
+import azkaban.executor.Status;
+import azkaban.jobcallback.JobCallbackStatusEnum;
+import azkaban.utils.Props;
+import azkaban.utils.PropsUtils;
+
+/**
+ * Responsible processing job callback properties on job status change events.
+ *
+ * When job callback properties are specified, they will be converted to HTTP
+ * calls to execute. The HTTP requests will be made in asynchronous mode so the
+ * caller to the handleEvent method will not be block. In addition, the HTTP
+ * calls will be configured to time appropriately for connection request,
+ * creating connection, and socket timeout.
+ *
+ * The HTTP request and response will be logged out the job's log for debugging
+ * and traceability purpose.
+ *
+ * @author hluu
+ *
+ */
+public class JobCallbackManager implements EventListener {
+
+ private static final Logger logger = Logger
+ .getLogger(JobCallbackManager.class);
+
+ private static boolean isInitialized = false;
+ private static JobCallbackManager instance;
+
+ private static int maxNumCallBack = 3;
+
+ private JmxJobCallbackMBean callbackMbean;
+ private String azkabanHostName;
+ private SimpleDateFormat gmtDateFormatter;
+
+ private static final JobCallbackStatusEnum[] ON_COMPLETION_JOB_CALLBACK_STATUS =
+ { SUCCESS, FAILURE, COMPLETED };
+
+ public static void initialize(Props props) {
+ if (isInitialized) {
+ logger.info("Already initialized");
+ return;
+ }
+
+ logger.info("Initializing");
+ instance = new JobCallbackManager(props);
+
+ isInitialized = true;
+ }
+
+ public static boolean isInitialized() {
+ return isInitialized;
+ }
+
+ public static JobCallbackManager getInstance() {
+ if (!isInitialized) {
+ throw new IllegalStateException(JobCallbackManager.class.getName()
+ + " has not been initialized");
+ }
+ return instance;
+ }
+
+ private JobCallbackManager(Props props) {
+ maxNumCallBack = props.getInt("jobcallback.max_count", maxNumCallBack);
+
+ // initialize the request maker
+ JobCallbackRequestMaker.initialize(props);
+
+ callbackMbean =
+ new JmxJobCallback(JobCallbackRequestMaker.getInstance()
+ .getJobcallbackMetrics());
+
+ azkabanHostName = getAzkabanHostName(props);
+
+ gmtDateFormatter = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss z");
+ gmtDateFormatter.setTimeZone(TimeZone.getTimeZone("GMT"));
+
+ logger.info("Initialization completed " + getClass().getName());
+ logger.info("azkabanHostName " + azkabanHostName);
+ }
+
+ public JmxJobCallbackMBean getJmxJobCallbackMBean() {
+ return callbackMbean;
+ }
+
+ @Override
+ public void handleEvent(Event event) {
+ if (!isInitialized) {
+ return;
+ }
+
+ if (event.getRunner() instanceof JobRunner) {
+ try {
+ if (event.getType() == Event.Type.JOB_STARTED) {
+ processJobCallOnStart(event);
+ } else if (event.getType() == Event.Type.JOB_FINISHED) {
+ processJobCallOnFinish(event);
+ }
+ } catch (Throwable e) {
+ // Use job runner logger so user can see the issue in their job log
+ JobRunner jobRunner = (JobRunner) event.getRunner();
+ jobRunner.getLogger().error(
+ "Encountered error while hanlding job callback event", e);
+ }
+ } else {
+ logger.warn("((( Got an unsupported runner: "
+ + event.getRunner().getClass().getName() + " )))");
+ }
+
+ }
+
+ private void processJobCallOnFinish(Event event) {
+ JobRunner jobRunner = (JobRunner) event.getRunner();
+
+ if (!JobCallbackUtil.isThereJobCallbackProperty(jobRunner.getProps(),
+ ON_COMPLETION_JOB_CALLBACK_STATUS)) {
+ return;
+ }
+
+ // don't want to waste time resolving properties if there are no
+ // callback properties to parse
+ Props props = PropsUtils.resolveProps(jobRunner.getProps());
+
+ Map<String, String> contextInfo =
+ JobCallbackUtil.buildJobContextInfoMap(event, azkabanHostName);
+
+ JobCallbackStatusEnum jobCallBackStatusEnum = null;
+ Logger jobLogger = jobRunner.getLogger();
+
+ Status jobStatus = jobRunner.getNode().getStatus();
+
+ if (jobStatus == Status.SUCCEEDED) {
+
+ jobCallBackStatusEnum = JobCallbackStatusEnum.SUCCESS;
+
+ } else if (jobStatus == Status.FAILED
+ || jobStatus == Status.FAILED_FINISHING || jobStatus == Status.KILLED) {
+
+ jobCallBackStatusEnum = JobCallbackStatusEnum.FAILURE;
+ } else {
+ jobLogger.info("!!!! WE ARE NOT SUPPORTING JOB CALLBACKS FOR STATUS: "
+ + jobStatus);
+ jobCallBackStatusEnum = null; // to be explicit
+ }
+
+ String jobId = contextInfo.get(CONTEXT_JOB_TOKEN);
+
+ if (jobCallBackStatusEnum != null) {
+ List<HttpRequestBase> jobCallbackHttpRequests =
+ JobCallbackUtil.parseJobCallbackProperties(props,
+ jobCallBackStatusEnum, contextInfo, maxNumCallBack, jobLogger);
+
+ if (!jobCallbackHttpRequests.isEmpty()) {
+ String msg =
+ String.format("Making %d job callbacks for status: %s",
+ jobCallbackHttpRequests.size(), jobCallBackStatusEnum.name());
+ jobLogger.info(msg);
+
+ addDefaultHeaders(jobCallbackHttpRequests);
+
+ JobCallbackRequestMaker.getInstance().makeHttpRequest(jobId, jobLogger,
+ jobCallbackHttpRequests);
+ } else {
+ jobLogger.info("No job callbacks for status: " + jobCallBackStatusEnum);
+ }
+ }
+
+ // for completed status
+ List<HttpRequestBase> httpRequestsForCompletedStatus =
+ JobCallbackUtil.parseJobCallbackProperties(props, COMPLETED,
+ contextInfo, maxNumCallBack, jobLogger);
+
+ // now make the call
+ if (!httpRequestsForCompletedStatus.isEmpty()) {
+ jobLogger.info("Making " + httpRequestsForCompletedStatus.size()
+ + " job callbacks for status: " + COMPLETED);
+
+ addDefaultHeaders(httpRequestsForCompletedStatus);
+ JobCallbackRequestMaker.getInstance().makeHttpRequest(jobId, jobLogger,
+ httpRequestsForCompletedStatus);
+ } else {
+ jobLogger.info("No job callbacks for status: " + COMPLETED);
+ }
+ }
+
+ private void processJobCallOnStart(Event event) {
+ JobRunner jobRunner = (JobRunner) event.getRunner();
+
+ if (JobCallbackUtil.isThereJobCallbackProperty(jobRunner.getProps(),
+ JobCallbackStatusEnum.STARTED)) {
+
+ // don't want to waste time resolving properties if there are
+ // callback properties to parse
+ Props props = PropsUtils.resolveProps(jobRunner.getProps());
+
+ Map<String, String> contextInfo =
+ JobCallbackUtil.buildJobContextInfoMap(event, azkabanHostName);
+
+ List<HttpRequestBase> jobCallbackHttpRequests =
+ JobCallbackUtil.parseJobCallbackProperties(props, STARTED,
+ contextInfo, maxNumCallBack, jobRunner.getLogger());
+
+ String jobId = contextInfo.get(CONTEXT_JOB_TOKEN);
+ String msg =
+ String.format("Making %d job callbacks for job %s for jobStatus: %s",
+ jobCallbackHttpRequests.size(), jobId, STARTED.name());
+
+ jobRunner.getLogger().info(msg);
+
+ addDefaultHeaders(jobCallbackHttpRequests);
+
+ JobCallbackRequestMaker.getInstance().makeHttpRequest(jobId,
+ jobRunner.getLogger(), jobCallbackHttpRequests);
+ }
+ }
+
+ private String getAzkabanHostName(Props props) {
+ String baseURL = props.get(JobRunner.AZKABAN_WEBSERVER_URL);
+ try {
+ String hostName = InetAddress.getLocalHost().getHostName();
+ if (baseURL != null) {
+ URL url = new URL(baseURL);
+ hostName = url.getHost() + ":" + url.getPort();
+ }
+ return hostName;
+ } catch (Exception e) {
+ throw new IllegalStateException(
+ "Encountered while getting azkaban host name", e);
+ }
+ }
+
+ private void addDefaultHeaders(List<HttpRequestBase> httpRequests) {
+ if (httpRequests == null) {
+ return;
+ }
+
+ SimpleDateFormat format =
+ new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss z");
+ format.setTimeZone(TimeZone.getTimeZone("GMT"));
+
+ for (HttpRequestBase httpRequest : httpRequests) {
+ httpRequest.addHeader(new BasicHeader("Date", gmtDateFormatter
+ .format(new Date())));
+ httpRequest.addHeader(new BasicHeader("Host", azkabanHostName));
+ }
+
+ }
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/event/JobCallbackRequestMaker.java b/azkaban-execserver/src/main/java/azkaban/execapp/event/JobCallbackRequestMaker.java
new file mode 100644
index 0000000..e5673c1
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/event/JobCallbackRequestMaker.java
@@ -0,0 +1,241 @@
+package azkaban.execapp.event;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.SocketTimeoutException;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.impl.client.FutureRequestExecutionMetrics;
+import org.apache.http.impl.client.FutureRequestExecutionService;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpRequestFutureTask;
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Props;
+
+/**
+ * Responsible for making the job callback HTTP requests.
+ *
+ * One of the requirements is to log out the request information and response
+ * using the given logger, which should be the job logger.
+ *
+ * @author hluu
+ *
+ */
+public class JobCallbackRequestMaker {
+
+ private static final Logger logger = Logger
+ .getLogger(JobCallbackRequestMaker.class);
+
+ private static final int DEFAULT_TIME_OUT_MS = 3000;
+ private static final int DEFAULT_RESPONSE_WAIT_TIME_OUT_MS = 5000;
+ private static final int MAX_RESPONSE_LINE_TO_PRINT = 50;
+
+ private static final int DEFAULT_THREAD_POOL_SIZE = 10;
+
+ private static JobCallbackRequestMaker instance;
+ private static boolean isInitialized = false;
+
+ private FutureRequestExecutionService futureRequestExecutionService;
+ private int responseWaitTimeoutMS = -1;
+
+ public static void initialize(Props props) {
+ if (props == null) {
+ throw new NullPointerException("props argument can't be null");
+ }
+
+ if (isInitialized) {
+ return;
+ }
+
+ instance = new JobCallbackRequestMaker(props);
+ isInitialized = true;
+ logger.info("Initialization for " + JobCallbackRequestMaker.class.getName()
+ + " is completed");
+ }
+
+ public static boolean isInitialized() {
+ return isInitialized;
+ }
+
+ public static JobCallbackRequestMaker getInstance() {
+ if (!isInitialized) {
+ throw new IllegalStateException(JobCallbackRequestMaker.class.getName()
+ + " hasn't initialzied");
+ }
+ return instance;
+ }
+
+ private JobCallbackRequestMaker(Props props) {
+
+ int connectionRequestTimeout =
+ props.getInt("jobcallback.connection.request.timeout",
+ DEFAULT_TIME_OUT_MS);
+
+ int connectionTimeout =
+ props.getInt("jobcallback.connection.timeout", DEFAULT_TIME_OUT_MS);
+
+ int socketTimeout =
+ props.getInt("jobcallback.socket.timeout", DEFAULT_TIME_OUT_MS);
+
+ responseWaitTimeoutMS =
+ props.getInt("jobcallback.response.wait.timeout",
+ DEFAULT_RESPONSE_WAIT_TIME_OUT_MS);
+
+ logger.info("responseWaitTimeoutMS: " + responseWaitTimeoutMS);
+
+ RequestConfig requestConfig =
+ RequestConfig.custom()
+ .setConnectionRequestTimeout(connectionRequestTimeout)
+ .setConnectTimeout(connectionTimeout)
+ .setSocketTimeout(socketTimeout).build();
+
+ logger.info("Global request configuration " + requestConfig.toString());
+
+ HttpClient httpClient =
+ HttpClientBuilder.create().setDefaultRequestConfig(requestConfig)
+ .build();
+
+ int jobCallbackThreadPoolSize =
+ props.getInt("jobcallback.thread.pool.size", DEFAULT_THREAD_POOL_SIZE);
+ logger.info("Jobcall thread pool size: " + jobCallbackThreadPoolSize);
+
+ ExecutorService executorService =
+ Executors.newFixedThreadPool(jobCallbackThreadPoolSize);
+ futureRequestExecutionService =
+ new FutureRequestExecutionService(httpClient, executorService);
+ }
+
+ public FutureRequestExecutionMetrics getJobcallbackMetrics() {
+ return futureRequestExecutionService.metrics();
+ }
+
+ public void makeHttpRequest(String jobId, Logger logger,
+ List<HttpRequestBase> httpRequestList) {
+
+ if (httpRequestList == null || httpRequestList.isEmpty()) {
+ logger.info("No HTTP requests to make");
+ return;
+ }
+
+ for (HttpRequestBase httpRequest : httpRequestList) {
+
+ logger.info("Job callback http request: " + httpRequest.toString());
+ logger.info("headers [");
+ for (Header header : httpRequest.getAllHeaders()) {
+ logger.info(String.format(" %s : %s", header.getName(),
+ header.getValue()));
+ }
+ logger.info("]");
+
+ HttpRequestFutureTask<Integer> task =
+ futureRequestExecutionService.execute(httpRequest,
+ HttpClientContext.create(), new LoggingResponseHandler(logger));
+
+ try {
+ // get with timeout
+ Integer statusCode =
+ task.get(responseWaitTimeoutMS, TimeUnit.MILLISECONDS);
+
+ logger.info("http callback status code: " + statusCode);
+ } catch (TimeoutException timeOutEx) {
+ logger
+ .warn("Job callback target took longer "
+ + (responseWaitTimeoutMS / 1000) + " seconds to respond",
+ timeOutEx);
+ } catch (ExecutionException ee) {
+ if (ee.getCause() instanceof SocketTimeoutException) {
+ logger.warn("Job callback target took longer "
+ + (responseWaitTimeoutMS / 1000) + " seconds to respond", ee);
+ } else {
+ logger.warn(
+ "Encountered error while waiting for job callback to complete",
+ ee);
+ }
+ } catch (Throwable e) {
+ logger.warn(
+ "Encountered error while waiting for job callback to complete", e);
+ }
+ }
+ }
+
+ /**
+ * Response handler for logging job callback response using the given logger
+ * instance
+ *
+ * @author hluu
+ *
+ */
+ private final class LoggingResponseHandler implements
+ ResponseHandler<Integer> {
+
+ private Logger logger;
+
+ public LoggingResponseHandler(Logger logger) {
+ if (logger == null) {
+ throw new NullPointerException("Argument logger can't be null");
+ }
+ this.logger = logger;
+ }
+
+ public Integer handleResponse(final HttpResponse response)
+ throws ClientProtocolException, IOException {
+
+ int statusCode = response.getStatusLine().getStatusCode();
+ BufferedReader bufferedReader = null;
+
+ try {
+ HttpEntity responseEntity = response.getEntity();
+ if (responseEntity != null) {
+ bufferedReader =
+ new BufferedReader(new InputStreamReader(
+ responseEntity.getContent()));
+
+ String line = "";
+ int lineCount = 0;
+ logger.info("HTTP response [");
+ while ((line = bufferedReader.readLine()) != null) {
+ logger.info(line);
+ lineCount++;
+ if (lineCount > MAX_RESPONSE_LINE_TO_PRINT) {
+ break;
+ }
+ }
+ logger.info("]");
+ } else {
+ logger.info("No response");
+ }
+
+ } catch (Throwable t) {
+ logger.warn(
+ "Encountered error while logging out job callback response", t);
+ } finally {
+ if (bufferedReader != null) {
+ try {
+ bufferedReader.close();
+ } catch (IOException ex) {
+ // don't care
+ }
+ }
+ }
+ return statusCode;
+
+ }
+ }
+
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/event/JobCallbackUtil.java b/azkaban-execserver/src/main/java/azkaban/execapp/event/JobCallbackUtil.java
new file mode 100644
index 0000000..a6ed354
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/event/JobCallbackUtil.java
@@ -0,0 +1,324 @@
+package azkaban.execapp.event;
+
+import static azkaban.jobcallback.JobCallbackConstants.CONTEXT_EXECUTION_ID_TOKEN;
+import static azkaban.jobcallback.JobCallbackConstants.FIRST_JOB_CALLBACK_URL_TEMPLATE;
+import static azkaban.jobcallback.JobCallbackConstants.CONTEXT_FLOW_TOKEN;
+import static azkaban.jobcallback.JobCallbackConstants.HEADER_ELEMENT_DELIMITER;
+import static azkaban.jobcallback.JobCallbackConstants.HEADER_NAME_VALUE_DELIMITER;
+import static azkaban.jobcallback.JobCallbackConstants.HTTP_GET;
+import static azkaban.jobcallback.JobCallbackConstants.HTTP_POST;
+import static azkaban.jobcallback.JobCallbackConstants.JOB_CALLBACK_BODY_TEMPLATE;
+import static azkaban.jobcallback.JobCallbackConstants.JOB_CALLBACK_REQUEST_HEADERS_TEMPLATE;
+import static azkaban.jobcallback.JobCallbackConstants.JOB_CALLBACK_REQUEST_METHOD_TEMPLATE;
+import static azkaban.jobcallback.JobCallbackConstants.JOB_CALLBACK_URL_TEMPLATE;
+import static azkaban.jobcallback.JobCallbackConstants.CONTEXT_JOB_STATUS_TOKEN;
+import static azkaban.jobcallback.JobCallbackConstants.CONTEXT_JOB_TOKEN;
+import static azkaban.jobcallback.JobCallbackConstants.CONTEXT_PROJECT_TOKEN;
+import static azkaban.jobcallback.JobCallbackConstants.SEQUENCE_TOKEN;
+import static azkaban.jobcallback.JobCallbackConstants.CONTEXT_SERVER_TOKEN;
+import static azkaban.jobcallback.JobCallbackConstants.STATUS_TOKEN;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import org.apache.http.Header;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.message.BasicHeader;
+import org.apache.log4j.Logger;
+
+import azkaban.event.Event;
+import azkaban.execapp.JobRunner;
+import azkaban.executor.ExecutableNode;
+import azkaban.jobcallback.JobCallbackStatusEnum;
+import azkaban.utils.Props;
+
+public class JobCallbackUtil {
+ private static final Logger logger = Logger.getLogger(JobCallbackUtil.class);
+
+ private static Map<JobCallbackStatusEnum, String> firstJobcallbackPropertyMap =
+ new HashMap<JobCallbackStatusEnum, String>(
+ JobCallbackStatusEnum.values().length);
+
+ static {
+ for (JobCallbackStatusEnum statusEnum : JobCallbackStatusEnum.values()) {
+ firstJobcallbackPropertyMap.put(statusEnum,
+ replaceStatusToken(FIRST_JOB_CALLBACK_URL_TEMPLATE, statusEnum));
+ }
+ }
+
+ /**
+ * Use to quickly determine if there is a job callback related property in the
+ * Props.
+ *
+ * @param props
+ * @param status
+ * @return true if there is job callback related property
+ */
+ public static boolean isThereJobCallbackProperty(Props props,
+ JobCallbackStatusEnum status) {
+
+ if (props == null || status == null) {
+ throw new NullPointerException("One of the argument is null");
+ }
+
+ String jobCallBackUrl = firstJobcallbackPropertyMap.get(status);
+ return props.containsKey(jobCallBackUrl);
+ }
+
+ public static boolean isThereJobCallbackProperty(Props props,
+ JobCallbackStatusEnum... jobStatuses) {
+
+ if (props == null || jobStatuses == null) {
+ throw new NullPointerException("One of the argument is null");
+ }
+
+ for (JobCallbackStatusEnum jobStatus : jobStatuses) {
+ if (JobCallbackUtil.isThereJobCallbackProperty(props, jobStatus)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public static List<HttpRequestBase> parseJobCallbackProperties(Props props,
+ JobCallbackStatusEnum status, Map<String, String> contextInfo,
+ int maxNumCallback) {
+
+ return parseJobCallbackProperties(props, status, contextInfo,
+ maxNumCallback, logger);
+ }
+
+ /**
+ * This method is responsible for parsing job call URL properties and convert
+ * them into a list of HttpRequestBase, which callers can use to execute.
+ *
+ * In addition to parsing, it will also replace the tokens with actual values.
+ *
+ * @param props
+ * @param status
+ * @param event
+ * @return List<HttpRequestBase> - empty if no job callback related properties
+ */
+ public static List<HttpRequestBase> parseJobCallbackProperties(Props props,
+ JobCallbackStatusEnum status, Map<String, String> contextInfo,
+ int maxNumCallback, Logger privateLogger) {
+ String callbackUrl = null;
+
+ if (!isThereJobCallbackProperty(props, status)) {
+ // short circuit
+ return Collections.emptyList();
+ }
+
+ List<HttpRequestBase> result = new ArrayList<HttpRequestBase>();
+
+ // replace property templates with status
+ String jobCallBackUrlKey =
+ replaceStatusToken(JOB_CALLBACK_URL_TEMPLATE, status);
+
+ String requestMethod =
+ replaceStatusToken(JOB_CALLBACK_REQUEST_METHOD_TEMPLATE, status);
+
+ String httpBodyKey = replaceStatusToken(JOB_CALLBACK_BODY_TEMPLATE, status);
+
+ String headersKey =
+ replaceStatusToken(JOB_CALLBACK_REQUEST_HEADERS_TEMPLATE, status);
+
+ for (int sequence = 1; sequence <= maxNumCallback; sequence++) {
+ HttpRequestBase httpRequest = null;
+ String sequenceStr = Integer.toString(sequence);
+ // callback url
+ String callbackUrlKey =
+ jobCallBackUrlKey.replace(SEQUENCE_TOKEN, sequenceStr);
+
+ callbackUrl = props.get(callbackUrlKey);
+ if (callbackUrl == null || callbackUrl.length() == 0) {
+ // no more needs to done
+ break;
+ } else {
+ String callbackUrlWithTokenReplaced =
+ replaceTokens(callbackUrl, contextInfo, true);
+
+ String requestMethodKey =
+ requestMethod.replace(SEQUENCE_TOKEN, sequenceStr);
+
+ String method = props.getString(requestMethodKey, HTTP_GET);
+
+ if (HTTP_POST.equals(method)) {
+ String postBodyKey = httpBodyKey.replace(SEQUENCE_TOKEN, sequenceStr);
+ String httpBodyValue = props.get(postBodyKey);
+ if (httpBodyValue == null) {
+ // missing body for POST, not good
+ // update the wiki about skipping callback url if body is missing
+ privateLogger.warn("Missing value for key: " + postBodyKey
+ + " skipping job callback '" + callbackUrl + " for job "
+ + contextInfo.get(CONTEXT_JOB_TOKEN));
+ } else {
+ // put together an URL
+ HttpPost httpPost = new HttpPost(callbackUrlWithTokenReplaced);
+ String postActualBody =
+ replaceTokens(httpBodyValue, contextInfo, false);
+ privateLogger.info("postActualBody: " + postActualBody);
+ httpPost.setEntity(createStringEntity(postActualBody));
+ httpRequest = httpPost;
+ }
+ } else if (HTTP_GET.equals(method)) {
+ // GET
+ httpRequest = new HttpGet(callbackUrlWithTokenReplaced);
+ } else {
+ privateLogger.warn("Unsupported request method: " + method
+ + ". Only POST and GET are supported");
+ }
+
+ String headersKeyPerSequence =
+ headersKey.replace(SEQUENCE_TOKEN, sequenceStr);
+ String headersValue = props.get(headersKeyPerSequence);
+ privateLogger.info("headers: " + headersValue);
+ Header[] headers = parseHttpHeaders(headersValue);
+ if (headers != null) {
+ httpRequest.setHeaders(headers);
+ privateLogger.info("# of headers found: " + headers.length);
+ }
+ result.add(httpRequest);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Parse headers
+ *
+ * @param headers
+ * @return null if headers is null or empty
+ */
+ public static Header[] parseHttpHeaders(String headers) {
+ if (headers == null || headers.length() == 0) {
+ return null;
+ }
+
+ String[] headerArray = headers.split(HEADER_ELEMENT_DELIMITER);
+ List<Header> headerList = new ArrayList<Header>(headerArray.length);
+ for (int i = 0; i < headerArray.length; i++) {
+ String headerPair = headerArray[i];
+ int index = headerPair.indexOf(HEADER_NAME_VALUE_DELIMITER);
+ if (index != -1) {
+ headerList.add(new BasicHeader(headerPair.substring(0, index),
+ headerPair.substring(index + 1)));
+ }
+
+ }
+ return headerList.toArray(new BasicHeader[0]);
+ }
+
+ private static String replaceStatusToken(String template,
+ JobCallbackStatusEnum status) {
+ return template.replaceFirst(STATUS_TOKEN, status.name().toLowerCase());
+ }
+
+ private static StringEntity createStringEntity(String str) {
+ try {
+ return new StringEntity(str);
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException("Encoding not supported", e);
+ }
+ }
+
+ /**
+ * This method takes the job context info. and put the values into a map with
+ * keys as the tokens.
+ *
+ * @param event
+ * @return Map<String,String>
+ */
+ public static Map<String, String> buildJobContextInfoMap(Event event,
+ String server) {
+
+ if (event.getRunner() instanceof JobRunner) {
+ JobRunner jobRunner = (JobRunner) event.getRunner();
+ ExecutableNode node = jobRunner.getNode();
+ String projectName = node.getParentFlow().getProjectName();
+ String flowName = node.getParentFlow().getFlowId();
+ String executionId =
+ String.valueOf(node.getParentFlow().getExecutionId());
+ String jobId = node.getId();
+
+ Map<String, String> result = new HashMap<String, String>();
+ result.put(CONTEXT_SERVER_TOKEN, server);
+ result.put(CONTEXT_PROJECT_TOKEN, projectName);
+ result.put(CONTEXT_FLOW_TOKEN, flowName);
+ result.put(CONTEXT_EXECUTION_ID_TOKEN, executionId);
+ result.put(CONTEXT_JOB_TOKEN, jobId);
+ result.put(CONTEXT_JOB_STATUS_TOKEN, node.getStatus().name().toLowerCase());
+
+ /*
+ * if (node.getStatus() == Status.SUCCEEDED || node.getStatus() ==
+ * Status.FAILED) { result.put(JOB_STATUS_TOKEN,
+ * node.getStatus().name().toLowerCase()); } else if (node.getStatus() ==
+ * Status.PREPARING) { result.put(JOB_STATUS_TOKEN, "started"); }
+ */
+
+ return result;
+
+ } else {
+ throw new IllegalArgumentException("Provided event is not a job event");
+ }
+ }
+
+ /**
+ * Replace the supported tokens in the URL with values in the contextInfo.
+ * This will also make sure the values are HTTP encoded.
+ *
+ * @param value
+ * @param contextInfo
+ * @param withEncoding - whether the token values will be HTTP encoded
+ * @return String - value with tokens replaced with values
+ */
+ public static String replaceTokens(String value,
+ Map<String, String> contextInfo, boolean withEncoding) {
+
+ String result = value;
+ String tokenValue =
+ encodeQueryParam(contextInfo.get(CONTEXT_SERVER_TOKEN), withEncoding);
+ result = result.replaceFirst(Pattern.quote(CONTEXT_SERVER_TOKEN), tokenValue);
+
+ tokenValue = encodeQueryParam(contextInfo.get(CONTEXT_PROJECT_TOKEN), withEncoding);
+ result = result.replaceFirst(Pattern.quote(CONTEXT_PROJECT_TOKEN), tokenValue);
+
+ tokenValue = encodeQueryParam(contextInfo.get(CONTEXT_FLOW_TOKEN), withEncoding);
+ result = result.replaceFirst(Pattern.quote(CONTEXT_FLOW_TOKEN), tokenValue);
+
+ tokenValue = encodeQueryParam(contextInfo.get(CONTEXT_JOB_TOKEN), withEncoding);
+ result = result.replaceFirst(Pattern.quote(CONTEXT_JOB_TOKEN), tokenValue);
+
+ tokenValue =
+ encodeQueryParam(contextInfo.get(CONTEXT_EXECUTION_ID_TOKEN), withEncoding);
+ result = result.replaceFirst(Pattern.quote(CONTEXT_EXECUTION_ID_TOKEN), tokenValue);
+
+ tokenValue =
+ encodeQueryParam(contextInfo.get(CONTEXT_JOB_STATUS_TOKEN), withEncoding);
+
+ result = result.replaceFirst(Pattern.quote(CONTEXT_JOB_STATUS_TOKEN), tokenValue);
+
+ return result;
+ }
+
+ private static String encodeQueryParam(String str, boolean withEncoding) {
+ if (!withEncoding) {
+ return str;
+ }
+ try {
+ return URLEncoder.encode(str, "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new IllegalArgumentException(
+ "Encountered problem during encoding:", e);
+ }
+ }
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
index 779d422..da98b99 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
@@ -42,6 +42,8 @@ import azkaban.event.Event.Type;
import azkaban.event.EventHandler;
import azkaban.event.EventListener;
import azkaban.execapp.event.FlowWatcher;
+import azkaban.execapp.event.JobCallbackManager;
+import azkaban.execapp.jmx.JmxJobMBeanManager;
import azkaban.execapp.metric.NumFailedJobMetric;
import azkaban.execapp.metric.NumRunningJobMetric;
import azkaban.executor.ExecutableFlow;
@@ -62,13 +64,13 @@ import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
import azkaban.utils.SwapQueue;
-
/**
* Class that handles the running of a ExecutableFlow DAG
*
*/
public class FlowRunner extends EventHandler implements Runnable {
- private static final Layout DEFAULT_LAYOUT = new PatternLayout("%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
+ private static final Layout DEFAULT_LAYOUT = new PatternLayout(
+ "%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
// We check update every 5 minutes, just in case things get stuck. But for the
// most part, we'll be idling.
private static final long CHECK_WAIT_MS = 5 * 60 * 1000;
@@ -97,7 +99,8 @@ public class FlowRunner extends EventHandler implements Runnable {
private final JobTypeManager jobtypeManager;
private JobRunnerEventListener listener = new JobRunnerEventListener();
- private Set<JobRunner> activeJobRunners = Collections.newSetFromMap(new ConcurrentHashMap<JobRunner, Boolean>());
+ private Set<JobRunner> activeJobRunners = Collections
+ .newSetFromMap(new ConcurrentHashMap<JobRunner, Boolean>());
// Thread safe swap queue for finishedExecutions.
private SwapQueue<ExecutableNode> finishedNodes;
@@ -132,8 +135,9 @@ public class FlowRunner extends EventHandler implements Runnable {
* @param jobtypeManager
* @throws ExecutorManagerException
*/
- public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader,
- JobTypeManager jobtypeManager) throws ExecutorManagerException {
+ public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader,
+ ProjectLoader projectLoader, JobTypeManager jobtypeManager)
+ throws ExecutorManagerException {
this(flow, executorLoader, projectLoader, jobtypeManager, null);
}
@@ -148,8 +152,9 @@ public class FlowRunner extends EventHandler implements Runnable {
* @param executorService
* @throws ExecutorManagerException
*/
- public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader,
- JobTypeManager jobtypeManager, ExecutorService executorService) throws ExecutorManagerException {
+ public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader,
+ ProjectLoader projectLoader, JobTypeManager jobtypeManager,
+ ExecutorService executorService) throws ExecutorManagerException {
this.execId = flow.getExecutionId();
this.flow = flow;
this.executorLoader = executorLoader;
@@ -211,18 +216,23 @@ public class FlowRunner extends EventHandler implements Runnable {
runFlow();
} catch (Throwable t) {
if (logger != null) {
- logger.error("An error has occurred during the running of the flow. Quiting.", t);
+ logger
+ .error(
+ "An error has occurred during the running of the flow. Quiting.",
+ t);
}
flow.setStatus(Status.FAILED);
} finally {
if (watcher != null) {
logger.info("Watcher is attached. Stopping watcher.");
watcher.stopWatcher();
- logger.info("Watcher cancelled status is " + watcher.isWatchCancelled());
+ logger
+ .info("Watcher cancelled status is " + watcher.isWatchCancelled());
}
flow.setEndTime(System.currentTimeMillis());
- logger.info("Setting end time for flow " + execId + " to " + System.currentTimeMillis());
+ logger.info("Setting end time for flow " + execId + " to "
+ + System.currentTimeMillis());
closeLogger();
updateFlow();
@@ -247,7 +257,8 @@ public class FlowRunner extends EventHandler implements Runnable {
}
// If there are flow overrides, we apply them now.
- Map<String, String> flowParam = flow.getExecutionOptions().getFlowParameters();
+ Map<String, String> flowParam =
+ flow.getExecutionOptions().getFlowParameters();
if (flowParam != null && !flowParam.isEmpty()) {
commonFlowProps = new Props(commonFlowProps, flowParam);
}
@@ -260,9 +271,11 @@ public class FlowRunner extends EventHandler implements Runnable {
this.watcher.setLogger(logger);
}
- logger.info("Running execid:" + execId + " flow:" + flowId + " project:" + projectId + " version:" + version);
+ logger.info("Running execid:" + execId + " flow:" + flowId + " project:"
+ + projectId + " version:" + version);
if (pipelineExecId != null) {
- logger.info("Running simulateously with " + pipelineExecId + ". Pipelining level " + pipelineLevel);
+ logger.info("Running simulateously with " + pipelineExecId
+ + ". Pipelining level " + pipelineLevel);
}
// The current thread is used for interrupting blocks
@@ -272,8 +285,10 @@ public class FlowRunner extends EventHandler implements Runnable {
private void updateFlowReference() throws ExecutorManagerException {
logger.info("Update active reference");
- if (!executorLoader.updateExecutableReference(execId, System.currentTimeMillis())) {
- throw new ExecutorManagerException("The executor reference doesn't exist. May have been killed prematurely.");
+ if (!executorLoader.updateExecutableReference(execId,
+ System.currentTimeMillis())) {
+ throw new ExecutorManagerException(
+ "The executor reference doesn't exist. May have been killed prematurely.");
}
}
@@ -396,7 +411,8 @@ public class FlowRunner extends EventHandler implements Runnable {
resetFailedState(this.flow, retryJobs);
for (ExecutableNode node : retryJobs) {
- if (node.getStatus() == Status.READY || node.getStatus() == Status.DISABLED) {
+ if (node.getStatus() == Status.READY
+ || node.getStatus() == Status.DISABLED) {
runReadyJob(node);
} else if (node.getStatus() == Status.SUCCEEDED) {
for (String outNodeId : node.getOutNodes()) {
@@ -465,7 +481,8 @@ public class FlowRunner extends EventHandler implements Runnable {
// Instant kill or skip if necessary.
boolean jobsRun = false;
for (ExecutableNode node : nodesToCheck) {
- if (Status.isStatusFinished(node.getStatus()) || Status.isStatusRunning(node.getStatus())) {
+ if (Status.isStatusFinished(node.getStatus())
+ || Status.isStatusRunning(node.getStatus())) {
// Really shouldn't get in here.
continue;
}
@@ -482,7 +499,8 @@ public class FlowRunner extends EventHandler implements Runnable {
}
private boolean runReadyJob(ExecutableNode node) throws IOException {
- if (Status.isStatusFinished(node.getStatus()) || Status.isStatusRunning(node.getStatus())) {
+ if (Status.isStatusFinished(node.getStatus())
+ || Status.isStatusRunning(node.getStatus())) {
return false;
}
@@ -492,7 +510,8 @@ public class FlowRunner extends EventHandler implements Runnable {
}
if (nextNodeStatus == Status.CANCELLED) {
- logger.info("Cancelling '" + node.getNestedId() + "' due to prior errors.");
+ logger.info("Cancelling '" + node.getNestedId()
+ + "' due to prior errors.");
node.cancelNode(System.currentTimeMillis());
finishExecutableNode(node);
} else if (nextNodeStatus == Status.SKIPPED) {
@@ -524,8 +543,8 @@ public class FlowRunner extends EventHandler implements Runnable {
}
if (node.getRetries() > node.getAttempt()) {
- logger.info("Job '" + node.getId() + "' will be retried. Attempt " + node.getAttempt() + " of "
- + node.getRetries());
+ logger.info("Job '" + node.getId() + "' will be retried. Attempt "
+ + node.getAttempt() + " of " + node.getRetries());
node.setDelayedExecution(node.getRetryBackoff());
node.resetForRetry();
return true;
@@ -566,7 +585,8 @@ public class FlowRunner extends EventHandler implements Runnable {
for (String end : flow.getEndNodes()) {
ExecutableNode node = flow.getExecutableNode(end);
- if (node.getStatus() == Status.KILLED || node.getStatus() == Status.FAILED
+ if (node.getStatus() == Status.KILLED
+ || node.getStatus() == Status.FAILED
|| node.getStatus() == Status.CANCELLED) {
succeeded = false;
}
@@ -588,19 +608,22 @@ public class FlowRunner extends EventHandler implements Runnable {
flow.setUpdateTime(System.currentTimeMillis());
long durationSec = (flow.getEndTime() - flow.getStartTime()) / 1000;
switch (flow.getStatus()) {
- case FAILED_FINISHING:
- logger.info("Setting flow '" + id + "' status to FAILED in " + durationSec + " seconds");
- flow.setStatus(Status.FAILED);
- break;
- case FAILED:
- case KILLED:
- case CANCELLED:
- case FAILED_SUCCEEDED:
- logger.info("Flow '" + id + "' is set to " + flow.getStatus().toString() + " in " + durationSec + " seconds");
- break;
- default:
- flow.setStatus(Status.SUCCEEDED);
- logger.info("Flow '" + id + "' is set to " + flow.getStatus().toString() + " in " + durationSec + " seconds");
+ case FAILED_FINISHING:
+ logger.info("Setting flow '" + id + "' status to FAILED in "
+ + durationSec + " seconds");
+ flow.setStatus(Status.FAILED);
+ break;
+ case FAILED:
+ case KILLED:
+ case CANCELLED:
+ case FAILED_SUCCEEDED:
+ logger.info("Flow '" + id + "' is set to " + flow.getStatus().toString()
+ + " in " + durationSec + " seconds");
+ break;
+ default:
+ flow.setStatus(Status.SUCCEEDED);
+ logger.info("Flow '" + id + "' is set to " + flow.getStatus().toString()
+ + " in " + durationSec + " seconds");
}
// If the finalized flow is actually the top level flow, than we finish
@@ -671,10 +694,13 @@ public class FlowRunner extends EventHandler implements Runnable {
// load the override props if any
try {
- props = projectLoader.fetchProjectProperty(flow.getProjectId(), flow.getVersion(), node.getId() + ".jor");
+ props =
+ projectLoader.fetchProjectProperty(flow.getProjectId(),
+ flow.getVersion(), node.getId() + ".jor");
} catch (ProjectManagerException e) {
e.printStackTrace();
- logger.error("Error loading job override property for job " + node.getId());
+ logger.error("Error loading job override property for job "
+ + node.getId());
}
File path = new File(execDir, source);
@@ -684,7 +710,8 @@ public class FlowRunner extends EventHandler implements Runnable {
props = new Props(null, path);
} catch (IOException e) {
e.printStackTrace();
- logger.error("Error loading job file " + source + " for job " + node.getId());
+ logger.error("Error loading job file " + source + " for job "
+ + node.getId());
}
}
// setting this fake source as this will be used to determine the location
@@ -724,7 +751,8 @@ public class FlowRunner extends EventHandler implements Runnable {
public Status getImpliedStatus(ExecutableNode node) {
// If it's running or finished with 'SUCCEEDED', than don't even
// bother starting this job.
- if (Status.isStatusRunning(node.getStatus()) || node.getStatus() == Status.SUCCEEDED) {
+ if (Status.isStatusRunning(node.getStatus())
+ || node.getStatus() == Status.SUCCEEDED) {
return null;
}
@@ -739,7 +767,8 @@ public class FlowRunner extends EventHandler implements Runnable {
if (!Status.isStatusFinished(depStatus)) {
return null;
- } else if (depStatus == Status.FAILED || depStatus == Status.CANCELLED || depStatus == Status.KILLED) {
+ } else if (depStatus == Status.FAILED || depStatus == Status.CANCELLED
+ || depStatus == Status.KILLED) {
// We propagate failures as KILLED states.
shouldKill = true;
}
@@ -747,14 +776,16 @@ public class FlowRunner extends EventHandler implements Runnable {
// If it's disabled but ready to run, we want to make sure it continues
// being disabled.
- if (node.getStatus() == Status.DISABLED || node.getStatus() == Status.SKIPPED) {
+ if (node.getStatus() == Status.DISABLED
+ || node.getStatus() == Status.SKIPPED) {
return Status.SKIPPED;
}
// If the flow has failed, and we want to finish only the currently running
// jobs, we just
// kill everything else. We also kill, if the flow has been cancelled.
- if (flowFailed && failureAction == ExecutionOptions.FailureAction.FINISH_CURRENTLY_RUNNING) {
+ if (flowFailed
+ && failureAction == ExecutionOptions.FailureAction.FINISH_CURRENTLY_RUNNING) {
return Status.CANCELLED;
} else if (shouldKill || isKilled()) {
return Status.CANCELLED;
@@ -768,7 +799,8 @@ public class FlowRunner extends EventHandler implements Runnable {
Props previousOutput = null;
// Iterate the in nodes again and create the dependencies
for (String dependency : node.getInNodes()) {
- Props output = node.getParentFlow().getExecutableNode(dependency).getOutputProps();
+ Props output =
+ node.getParentFlow().getExecutableNode(dependency).getOutputProps();
if (output != null) {
output = Props.clone(output);
output.setParent(previousOutput);
@@ -783,7 +815,9 @@ public class FlowRunner extends EventHandler implements Runnable {
// Load job file.
File path = new File(execDir, node.getJobSource());
- JobRunner jobRunner = new JobRunner(node, path.getParentFile(), executorLoader, jobtypeManager);
+ JobRunner jobRunner =
+ new JobRunner(node, path.getParentFile(), executorLoader,
+ jobtypeManager);
if (watcher != null) {
jobRunner.setPipeline(watcher, pipelineLevel);
}
@@ -795,6 +829,10 @@ public class FlowRunner extends EventHandler implements Runnable {
jobRunner.setLogSettings(logger, jobLogFileSize, jobLogNumFiles);
jobRunner.addListener(listener);
+ if (JobCallbackManager.isInitialized()) {
+ jobRunner.addListener(JobCallbackManager.getInstance());
+ }
+
configureJobLevelMetrics(jobRunner);
return jobRunner;
@@ -802,6 +840,7 @@ public class FlowRunner extends EventHandler implements Runnable {
/**
* Configure Azkaban metrics tracking for a new jobRunner instance
+ *
* @param jobRunner
*/
private void configureJobLevelMetrics(JobRunner jobRunner) {
@@ -809,15 +848,17 @@ public class FlowRunner extends EventHandler implements Runnable {
if (MetricReportManager.isAvailable()) {
MetricReportManager metricManager = MetricReportManager.getInstance();
- //Adding NumRunningJobMetric listener
+ // Adding NumRunningJobMetric listener
jobRunner.addListener((NumRunningJobMetric) metricManager
.getMetricFromName(NumRunningJobMetric.NUM_RUNNING_JOB_METRIC_NAME));
- //Adding NumFailedJobMetric listener
+ // Adding NumFailedJobMetric listener
jobRunner.addListener((NumFailedJobMetric) metricManager
.getMetricFromName(NumFailedJobMetric.NUM_FAILED_JOB_METRIC_NAME));
}
+
+ jobRunner.addListener(JmxJobMBeanManager.getInstance());
}
public void pause(String user) {
@@ -879,7 +920,8 @@ public class FlowRunner extends EventHandler implements Runnable {
if (watcher != null) {
logger.info("Watcher is attached. Stopping watcher.");
watcher.stopWatcher();
- logger.info("Watcher cancelled status is " + watcher.isWatchCancelled());
+ logger
+ .info("Watcher cancelled status is " + watcher.isWatchCancelled());
}
logger.info("Killing " + activeJobRunners.size() + " jobs.");
@@ -897,7 +939,8 @@ public class FlowRunner extends EventHandler implements Runnable {
}
}
- private void resetFailedState(ExecutableFlowBase flow, List<ExecutableNode> nodesToRetry) {
+ private void resetFailedState(ExecutableFlowBase flow,
+ List<ExecutableNode> nodesToRetry) {
// bottom up
LinkedList<ExecutableNode> queue = new LinkedList<ExecutableNode>();
for (String id : flow.getEndNodes()) {
@@ -926,24 +969,24 @@ public class FlowRunner extends EventHandler implements Runnable {
} else if (node instanceof ExecutableFlowBase) {
ExecutableFlowBase base = (ExecutableFlowBase) node;
switch (base.getStatus()) {
- case CANCELLED:
- node.setStatus(Status.READY);
- node.setEndTime(-1);
- node.setStartTime(-1);
- node.setUpdateTime(currentTime);
- // Break out of the switch. We'll reset the flow just like a normal
- // node
- break;
- case KILLED:
- case FAILED:
- case FAILED_FINISHING:
- resetFailedState(base, nodesToRetry);
- continue;
- default:
- // Continue the while loop. If the job is in a finished state that's
- // not
- // a failure, we don't want to reset the job.
- continue;
+ case CANCELLED:
+ node.setStatus(Status.READY);
+ node.setEndTime(-1);
+ node.setStartTime(-1);
+ node.setUpdateTime(currentTime);
+ // Break out of the switch. We'll reset the flow just like a normal
+ // node
+ break;
+ case KILLED:
+ case FAILED:
+ case FAILED_FINISHING:
+ resetFailedState(base, nodesToRetry);
+ continue;
+ default:
+ // Continue the while loop. If the job is in a finished state that's
+ // not
+ // a failure, we don't want to reset the job.
+ continue;
}
} else if (node.getStatus() == Status.CANCELLED) {
// Not a flow, but killed
@@ -951,13 +994,16 @@ public class FlowRunner extends EventHandler implements Runnable {
node.setStartTime(-1);
node.setEndTime(-1);
node.setUpdateTime(currentTime);
- } else if (node.getStatus() == Status.FAILED || node.getStatus() == Status.KILLED) {
+ } else if (node.getStatus() == Status.FAILED
+ || node.getStatus() == Status.KILLED) {
node.resetForRetry();
nodesToRetry.add(node);
}
- if (!(node instanceof ExecutableFlowBase) && node.getStatus() != oldStatus) {
- logger.info("Resetting job '" + node.getNestedId() + "' from " + oldStatus + " to " + node.getStatus());
+ if (!(node instanceof ExecutableFlowBase)
+ && node.getStatus() != oldStatus) {
+ logger.info("Resetting job '" + node.getNestedId() + "' from "
+ + oldStatus + " to " + node.getStatus());
}
for (String inId : node.getInNodes()) {
@@ -979,14 +1025,16 @@ public class FlowRunner extends EventHandler implements Runnable {
// start node has not.
for (String id : flow.getStartNodes()) {
ExecutableNode node = flow.getExecutableNode(id);
- if (node.getStatus() == Status.READY || node.getStatus() == Status.DISABLED) {
+ if (node.getStatus() == Status.READY
+ || node.getStatus() == Status.DISABLED) {
nodesToRetry.add(node);
}
}
}
flow.setUpdateTime(System.currentTimeMillis());
flow.setEndTime(-1);
- logger.info("Resetting flow '" + flow.getNestedId() + "' from " + oldFlowState + " to " + flow.getStatus());
+ logger.info("Resetting flow '" + flow.getNestedId() + "' from "
+ + oldFlowState + " to " + flow.getStatus());
}
private void interrupt() {
@@ -1007,13 +1055,14 @@ public class FlowRunner extends EventHandler implements Runnable {
ExecutableNode node = runner.getNode();
long seconds = (node.getEndTime() - node.getStartTime()) / 1000;
synchronized (mainSyncObj) {
- logger.info("Job " + node.getNestedId() + " finished with status " + node.getStatus() + " in " + seconds
- + " seconds");
+ logger.info("Job " + node.getNestedId() + " finished with status "
+ + node.getStatus() + " in " + seconds + " seconds");
// Cancellation is handled in the main thread, but if the flow is
// paused, the main thread is paused too.
// This unpauses the flow for cancellation.
- if (flowPaused && node.getStatus() == Status.FAILED && failureAction == FailureAction.CANCEL_ALL) {
+ if (flowPaused && node.getStatus() == Status.FAILED
+ && failureAction == FailureAction.CANCEL_ALL) {
flowPaused = false;
}
@@ -1056,7 +1105,8 @@ public class FlowRunner extends EventHandler implements Runnable {
ExecutableNode node = flow.getExecutableNodePath(jobId);
File path = new File(execDir, node.getJobSource());
- String attachmentFileName = JobRunner.createAttachmentFileName(node, attempt);
+ String attachmentFileName =
+ JobRunner.createAttachmentFileName(node, attempt);
File attachmentFile = new File(path.getParentFile(), attachmentFileName);
if (!attachmentFile.exists()) {
return null;
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 124b5c0..c8d7f1c 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -43,7 +43,6 @@ import azkaban.execapp.event.FlowWatcher;
import azkaban.execapp.event.LocalFlowWatcher;
import azkaban.execapp.event.RemoteFlowWatcher;
import azkaban.execapp.metric.NumFailedFlowMetric;
-import azkaban.execapp.metric.NumFailedJobMetric;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutorLoader;
@@ -316,8 +315,9 @@ public class FlowRunnerManager implements EventListener,
wait(RECENTLY_FINISHED_TIME_TO_LIVE);
} catch (InterruptedException e) {
logger.info("Interrupted. Probably to shut down.");
- } catch (Throwable t){
- logger.warn("Uncaught throwable, please look into why it is not caught", t);
+ } catch (Throwable t) {
+ logger.warn(
+ "Uncaught throwable, please look into why it is not caught", t);
}
}
}
@@ -517,6 +517,7 @@ public class FlowRunnerManager implements EventListener,
/**
* Configure Azkaban metrics tracking for a new flowRunner instance
+ *
* @param flowRunner
*/
private void configureFlowLevelMetrics(FlowRunner flowRunner) {
@@ -524,10 +525,11 @@ public class FlowRunnerManager implements EventListener,
if (MetricReportManager.isAvailable()) {
MetricReportManager metricManager = MetricReportManager.getInstance();
- //Adding NumFailedFlow Metric listener
+ // Adding NumFailedFlow Metric listener
flowRunner.addListener((NumFailedFlowMetric) metricManager
.getMetricFromName(NumFailedFlowMetric.NUM_FAILED_FLOW_METRIC_NAME));
}
+
}
private void setupFlow(ExecutableFlow flow) throws ExecutorManagerException {
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxJobCallback.java b/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxJobCallback.java
new file mode 100644
index 0000000..0b3a11d
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxJobCallback.java
@@ -0,0 +1,33 @@
+package azkaban.execapp.jmx;
+
+import org.apache.http.impl.client.FutureRequestExecutionMetrics;
+
+public class JmxJobCallback implements JmxJobCallbackMBean {
+
+ private FutureRequestExecutionMetrics jobCallbackMetrics;
+
+ public JmxJobCallback(FutureRequestExecutionMetrics jobCallbackMetrics) {
+ this.jobCallbackMetrics = jobCallbackMetrics;
+ }
+
+ @Override
+ public long getNumJobCallbacks() {
+ return jobCallbackMetrics.getRequestCount();
+ }
+
+ @Override
+ public long getNumSuccessfulJobCallbacks() {
+ return jobCallbackMetrics.getSuccessfulConnectionCount();
+ }
+
+ @Override
+ public long getNumFailedJobCallbacks() {
+ return jobCallbackMetrics.getFailedConnectionCount();
+ }
+
+ @Override
+ public long getNumActiveJobCallbacks() {
+ return jobCallbackMetrics.getActiveConnectionCount();
+ }
+
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxJobCallbackMBean.java b/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxJobCallbackMBean.java
new file mode 100644
index 0000000..4b9d27d
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxJobCallbackMBean.java
@@ -0,0 +1,18 @@
+package azkaban.execapp.jmx;
+
+import azkaban.jmx.DisplayName;
+
+public interface JmxJobCallbackMBean {
+ @DisplayName("OPERATION: getNumJobCallbacks")
+ public long getNumJobCallbacks();
+
+ @DisplayName("OPERATION: getNumSuccessfulJobCallbacks")
+ public long getNumSuccessfulJobCallbacks();
+
+ @DisplayName("OPERATION: getNumFailedJobCallbacks")
+ public long getNumFailedJobCallbacks();
+
+ @DisplayName("OPERATION: getNumActiveJobCallbacks")
+ public long getNumActiveJobCallbacks();
+
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java
new file mode 100644
index 0000000..4fb0c62
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java
@@ -0,0 +1,164 @@
+package azkaban.execapp.jmx;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.log4j.Logger;
+
+import azkaban.event.Event;
+import azkaban.event.EventListener;
+import azkaban.execapp.JobRunner;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.Status;
+import azkaban.utils.Props;
+
+/**
+ * Responsible keeping track of job related MBean attributes through listening
+ * to job related events.
+ *
+ * @author hluu
+ *
+ */
+public class JmxJobMBeanManager implements JmxJobMXBean, EventListener {
+
+ private static final Logger logger = Logger
+ .getLogger(JmxJobMBeanManager.class);
+
+ private static JmxJobMBeanManager INSTANCE = new JmxJobMBeanManager();
+
+ private AtomicInteger runningJobCount = new AtomicInteger(0);
+ private AtomicInteger totalExecutedJobCount = new AtomicInteger(0);
+ private AtomicInteger totalFailedJobCount = new AtomicInteger(0);
+ private AtomicInteger totalSucceededJobCount = new AtomicInteger(0);
+
+ private Map<String, AtomicInteger> jobTypeFailureMap =
+ new HashMap<String, AtomicInteger>();
+
+ private Map<String, AtomicInteger> jobTypeSucceededMap =
+ new HashMap<String, AtomicInteger>();
+
+ private boolean initialized;
+
+ private JmxJobMBeanManager() {
+ }
+
+ public static JmxJobMBeanManager getInstance() {
+ return INSTANCE;
+ }
+
+ public void initialize(Props props) {
+ logger.info("Initializing " + getClass().getName());
+ initialized = true;
+ }
+
+ @Override
+ public int getNumRunningJobs() {
+ return runningJobCount.get();
+ }
+
+ @Override
+ public int getTotalNumExecutedJobs() {
+ return totalExecutedJobCount.get();
+ }
+
+ @Override
+ public int getTotalFailedJobs() {
+ return totalFailedJobCount.get();
+ }
+
+ @Override
+ public int getTotalSucceededJobs() {
+ return totalSucceededJobCount.get();
+ }
+
+ @Override
+ public Map<String, Integer> getTotalSucceededJobsByJobType() {
+ return convertMapValueToInteger(jobTypeSucceededMap);
+ }
+
+ @Override
+ public Map<String, Integer> getTotalFailedJobsByJobType() {
+ return convertMapValueToInteger(jobTypeFailureMap);
+ }
+
+ private Map<String, Integer> convertMapValueToInteger(
+ Map<String, AtomicInteger> map) {
+ Map<String, Integer> result = new HashMap<String, Integer>(map.size());
+
+ for (Map.Entry<String, AtomicInteger> entry : map.entrySet()) {
+ result.put(entry.getKey(), entry.getValue().intValue());
+ }
+
+ return result;
+ }
+
+ @Override
+ public void handleEvent(Event event) {
+ if (!initialized) {
+ throw new RuntimeException("JmxJobMBeanManager has not been initialized");
+ }
+
+ if (event.getRunner() instanceof JobRunner) {
+ JobRunner jobRunner = (JobRunner) event.getRunner();
+ ExecutableNode node = jobRunner.getNode();
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("*** got " + event.getType() + " " + node.getId() + " "
+ + event.getRunner().getClass().getName() + " status: "
+ + node.getStatus());
+ }
+
+ if (event.getType() == Event.Type.JOB_STARTED) {
+ runningJobCount.incrementAndGet();
+ } else if (event.getType() == Event.Type.JOB_FINISHED) {
+ totalExecutedJobCount.incrementAndGet();
+ if (runningJobCount.intValue() > 0) {
+ runningJobCount.decrementAndGet();
+ } else {
+ logger.warn("runningJobCount not messed up, it is already zero "
+ + "and we are trying to decrement on job event "
+ + Event.Type.JOB_FINISHED);
+ }
+
+ if (node.getStatus() == Status.FAILED) {
+ totalFailedJobCount.incrementAndGet();
+ } else if (node.getStatus() == Status.SUCCEEDED) {
+ totalSucceededJobCount.incrementAndGet();
+ }
+
+ handleJobFinishedCount(node.getStatus(), node.getType());
+ }
+
+ } else {
+ logger.warn("((((((((( Got a different runner: "
+ + event.getRunner().getClass().getName());
+ }
+ }
+
+ private void handleJobFinishedCount(Status status, String jobType) {
+ switch (status) {
+ case FAILED:
+ handleJobFinishedByType(jobTypeFailureMap, jobType);
+ break;
+ case SUCCEEDED:
+ handleJobFinishedByType(jobTypeSucceededMap, jobType);
+ break;
+ default:
+ }
+ }
+
+ private void handleJobFinishedByType(Map<String, AtomicInteger> jobTypeMap,
+ String jobType) {
+
+ synchronized (jobTypeMap) {
+ AtomicInteger count = jobTypeMap.get(jobType);
+ if (count == null) {
+ count = new AtomicInteger();
+ }
+
+ count.incrementAndGet();
+ jobTypeMap.put(jobType, count);
+ }
+ }
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxJobMXBean.java b/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxJobMXBean.java
new file mode 100644
index 0000000..9940188
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxJobMXBean.java
@@ -0,0 +1,33 @@
+package azkaban.execapp.jmx;
+
+import java.util.Map;
+
+import azkaban.jmx.DisplayName;
+
+/**
+ * Define all the MBean attributes at the job level
+ *
+ * @author hluu
+ *
+ */
+public interface JmxJobMXBean {
+
+ @DisplayName("OPERATION: getNumRunningJobs")
+ public int getNumRunningJobs();
+
+ @DisplayName("OPERATION: getTotalNumExecutedJobs")
+ public int getTotalNumExecutedJobs();
+
+ @DisplayName("OPERATION: getTotalFailedJobs")
+ public int getTotalFailedJobs();
+
+ @DisplayName("OPERATION: getTotalSucceededJobs")
+ public int getTotalSucceededJobs();
+
+ @DisplayName("OPERATION: getTotalSucceededJobsByJobType")
+ public Map<String, Integer> getTotalSucceededJobsByJobType();
+
+ @DisplayName("OPERATION: getTotalFailedJobsByJobType")
+ public Map<String, Integer> getTotalFailedJobsByJobType();
+
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-execserver/src/main/java/azkaban/execapp/JobRunner.java
index 2243fb8..6db6a47 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/JobRunner.java
@@ -51,7 +51,7 @@ import azkaban.utils.Props;
import azkaban.utils.StringUtils;
public class JobRunner extends EventHandler implements Runnable {
- private static final String AZKABAN_WEBSERVER_URL = "azkaban.webserver.url";
+ public static final String AZKABAN_WEBSERVER_URL = "azkaban.webserver.url";
private final Layout DEFAULT_LAYOUT = new EnhancedPatternLayout(
"%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
@@ -695,6 +695,10 @@ public class JobRunner extends EventHandler implements Runnable {
return logFile;
}
+ public Logger getLogger() {
+ return logger;
+ }
+
public static String createLogFileName(ExecutableNode node, int attempt) {
int executionId = node.getExecutableFlow().getExecutionId();
String jobId = node.getId();
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
index 152a3aa..e66a586 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
@@ -26,7 +26,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
@@ -42,13 +41,11 @@ import azkaban.metric.TimeBasedReportingMetric;
import azkaban.metric.inmemoryemitter.InMemoryHistoryNode;
import azkaban.metric.inmemoryemitter.InMemoryMetricEmitter;
import azkaban.server.HttpRequestUtils;
-import azkaban.server.ServerConstants;
import azkaban.utils.JSONUtils;
-
/**
- * Servlet to communicate with Azkaban exec server
- * This servlet get requests from stats servlet in Azkaban Web server
+ * Servlet to communicate with Azkaban exec server This servlet get requests
+ * from stats servlet in Azkaban Web server
*/
public class StatsServlet extends HttpServlet implements ConnectorParams {
private static final long serialVersionUID = 2L;
@@ -58,24 +55,29 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
return HttpRequestUtils.hasParam(request, param);
}
- public String getParam(HttpServletRequest request, String name) throws ServletException {
+ public String getParam(HttpServletRequest request, String name)
+ throws ServletException {
return HttpRequestUtils.getParam(request, name);
}
- public Boolean getBooleanParam(HttpServletRequest request, String name) throws ServletException {
+ public Boolean getBooleanParam(HttpServletRequest request, String name)
+ throws ServletException {
return HttpRequestUtils.getBooleanParam(request, name);
}
- public long getLongParam(HttpServletRequest request, String name) throws ServletException {
+ public long getLongParam(HttpServletRequest request, String name)
+ throws ServletException {
return HttpRequestUtils.getLongParam(request, name);
}
/**
- * Handle all get request to Stats Servlet
- * {@inheritDoc}
- * @see javax.servlet.http.HttpServlet#doGet(javax.servlet.http.HttpServletRequest, javax.servlet.http.HttpServletResponse)
+ * Handle all get request to Stats Servlet {@inheritDoc}
+ *
+ * @see javax.servlet.http.HttpServlet#doGet(javax.servlet.http.HttpServletRequest,
+ * javax.servlet.http.HttpServletResponse)
*/
- protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+ protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+ throws ServletException, IOException {
Map<String, Object> ret = new HashMap<String, Object>();
if (hasParam(req, ACTION_PARAM)) {
@@ -103,14 +105,15 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
}
/**
- * enable or disable metric Manager
- * A disable will also purge all data from all metric emitters
+ * enable or disable metric Manager A disable will also purge all data from
+ * all metric emitters
*/
- private void handleChangeManagerStatusRequest(HttpServletRequest req, Map<String, Object> ret,
- boolean enableMetricManager) {
+ private void handleChangeManagerStatusRequest(HttpServletRequest req,
+ Map<String, Object> ret, boolean enableMetricManager) {
try {
logger.info("Updating metric manager status");
- if ((enableMetricManager && MetricReportManager.isInstantiated()) || MetricReportManager.isAvailable()) {
+ if ((enableMetricManager && MetricReportManager.isInstantiated())
+ || MetricReportManager.isAvailable()) {
MetricReportManager metricManager = MetricReportManager.getInstance();
if (enableMetricManager) {
metricManager.enableManager();
@@ -130,12 +133,14 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
/**
* Update number of display snapshots for /stats graphs
*/
- private void handleChangeEmitterPoints(HttpServletRequest req, Map<String, Object> ret) {
+ private void handleChangeEmitterPoints(HttpServletRequest req,
+ Map<String, Object> ret) {
try {
long numInstance = getLongParam(req, STATS_MAP_EMITTERNUMINSTANCES);
if (MetricReportManager.isAvailable()) {
MetricReportManager metricManager = MetricReportManager.getInstance();
- InMemoryMetricEmitter memoryEmitter = extractInMemoryMetricEmitter(metricManager);
+ InMemoryMetricEmitter memoryEmitter =
+ extractInMemoryMetricEmitter(metricManager);
memoryEmitter.setReportingInstances(numInstance);
ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
} else {
@@ -150,12 +155,14 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
/**
* Update InMemoryMetricEmitter interval to maintain metric snapshots
*/
- private void handleChangeCleaningInterval(HttpServletRequest req, Map<String, Object> ret) {
+ private void handleChangeCleaningInterval(HttpServletRequest req,
+ Map<String, Object> ret) {
try {
long newInterval = getLongParam(req, STATS_MAP_CLEANINGINTERVAL);
if (MetricReportManager.isAvailable()) {
MetricReportManager metricManager = MetricReportManager.getInstance();
- InMemoryMetricEmitter memoryEmitter = extractInMemoryMetricEmitter(metricManager);
+ InMemoryMetricEmitter memoryEmitter =
+ extractInMemoryMetricEmitter(metricManager);
memoryEmitter.setReportingInterval(newInterval);
ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
} else {
@@ -169,19 +176,24 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
/**
* Get metric snapshots for a metric and date specification
+ *
* @throws ServletException
*/
- private void handleGetMetricHistory(HttpServletRequest req, Map<String, Object> ret) throws ServletException {
+ private void handleGetMetricHistory(HttpServletRequest req,
+ Map<String, Object> ret) throws ServletException {
if (MetricReportManager.isAvailable()) {
MetricReportManager metricManager = MetricReportManager.getInstance();
- InMemoryMetricEmitter memoryEmitter = extractInMemoryMetricEmitter(metricManager);
+ InMemoryMetricEmitter memoryEmitter =
+ extractInMemoryMetricEmitter(metricManager);
// if we have a memory emitter
if (memoryEmitter != null) {
try {
List<InMemoryHistoryNode> result =
- memoryEmitter.getMetrics(getParam(req, STATS_MAP_METRICNAMEPARAM),
- parseDate(getParam(req, STATS_MAP_STARTDATE)), parseDate(getParam(req, STATS_MAP_ENDDATE)),
+ memoryEmitter.getMetrics(
+ getParam(req, STATS_MAP_METRICNAMEPARAM),
+ parseDate(getParam(req, STATS_MAP_STARTDATE)),
+ parseDate(getParam(req, STATS_MAP_ENDDATE)),
getBooleanParam(req, STATS_MAP_METRICRETRIEVALMODE));
if (result != null && result.size() > 0) {
@@ -204,7 +216,8 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
/**
* Get InMemoryMetricEmitter, if available else null
*/
- private InMemoryMetricEmitter extractInMemoryMetricEmitter(MetricReportManager metricManager) {
+ private InMemoryMetricEmitter extractInMemoryMetricEmitter(
+ MetricReportManager metricManager) {
InMemoryMetricEmitter memoryEmitter = null;
for (IMetricEmitter emitter : metricManager.getMetricEmitters()) {
if (emitter instanceof InMemoryMetricEmitter) {
@@ -218,7 +231,8 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
/**
* Get all the metrics tracked by metric manager
*/
- private void handleGetAllMMetricsName(HttpServletRequest req, Map<String, Object> ret) {
+ private void handleGetAllMMetricsName(HttpServletRequest req,
+ Map<String, Object> ret) {
if (MetricReportManager.isAvailable()) {
MetricReportManager metricManager = MetricReportManager.getInstance();
List<IMetric<?>> result = metricManager.getAllMetrics();
@@ -226,7 +240,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
ret.put(RESPONSE_ERROR, "No Metric being tracked");
} else {
List<String> metricNames = new LinkedList<String>();
- for(IMetric<?> metric: result) {
+ for (IMetric<?> metric : result) {
metricNames.add(metric.getName());
}
ret.put("data", metricNames);
@@ -238,15 +252,19 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
/**
* Update tracking interval for a given metrics
+ *
* @throws ServletException
*/
- private void handleChangeMetricInterval(HttpServletRequest req, Map<String, Object> ret) throws ServletException {
+ private void handleChangeMetricInterval(HttpServletRequest req,
+ Map<String, Object> ret) throws ServletException {
try {
String metricName = getParam(req, STATS_MAP_METRICNAMEPARAM);
long newInterval = getLongParam(req, STATS_MAP_REPORTINGINTERVAL);
if (MetricReportManager.isAvailable()) {
MetricReportManager metricManager = MetricReportManager.getInstance();
- TimeBasedReportingMetric<?> metric = (TimeBasedReportingMetric<?>) metricManager.getMetricFromName(metricName);
+ TimeBasedReportingMetric<?> metric =
+ (TimeBasedReportingMetric<?>) metricManager
+ .getMetricFromName(metricName);
metric.updateInterval(newInterval);
ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
} else {
diff --git a/azkaban-execserver/src/test/java/azkaban/execapp/event/JobCallbackRequestMakerTest.java b/azkaban-execserver/src/test/java/azkaban/execapp/event/JobCallbackRequestMakerTest.java
new file mode 100644
index 0000000..bd14da4
--- /dev/null
+++ b/azkaban-execserver/src/test/java/azkaban/execapp/event/JobCallbackRequestMakerTest.java
@@ -0,0 +1,240 @@
+package azkaban.execapp.event;
+
+import static azkaban.jobcallback.JobCallbackConstants.CONTEXT_EXECUTION_ID_TOKEN;
+import static azkaban.jobcallback.JobCallbackConstants.CONTEXT_FLOW_TOKEN;
+import static azkaban.jobcallback.JobCallbackConstants.CONTEXT_JOB_STATUS_TOKEN;
+import static azkaban.jobcallback.JobCallbackConstants.CONTEXT_JOB_TOKEN;
+import static azkaban.jobcallback.JobCallbackConstants.CONTEXT_PROJECT_TOKEN;
+import static azkaban.jobcallback.JobCallbackConstants.CONTEXT_SERVER_TOKEN;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.Writer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.log4j.Logger;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.ServletHolder;
+
+import azkaban.jobcallback.JobCallbackConstants;
+import azkaban.jobcallback.JobCallbackStatusEnum;
+import azkaban.utils.Props;
+
+public class JobCallbackRequestMakerTest {
+
+ private static final Logger logger = Logger
+ .getLogger(JobCallbackRequestMakerTest.class);
+
+ private static final String SLEEP_DURATION_PARAM = "sleepDuration";
+ private static final String STATUS_CODE_PARAM = "returnedStatusCode";
+
+ private static final String SERVER_NAME = "localhost:9999";
+ private static final String PROJECT_NANE = "PROJECTX";
+ private static final String FLOW_NANE = "FLOWX";
+ private static final String JOB_NANE = "JOBX";
+ private static final String EXECUTION_ID = "1234";
+
+ private static final int PORT_NUMBER = 8989;
+
+ private static JobCallbackRequestMaker jobCBMaker;
+
+ private static Map<String, String> contextInfo;
+
+ private static Server embeddedJettyServer;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ try {
+ JobCallbackRequestMaker.initialize(new Props());
+ jobCBMaker = JobCallbackRequestMaker.getInstance();
+
+ contextInfo = new HashMap<String, String>();
+ contextInfo.put(CONTEXT_SERVER_TOKEN, SERVER_NAME);
+ contextInfo.put(CONTEXT_PROJECT_TOKEN, PROJECT_NANE);
+ contextInfo.put(CONTEXT_FLOW_TOKEN, FLOW_NANE);
+ contextInfo.put(CONTEXT_EXECUTION_ID_TOKEN, EXECUTION_ID);
+ contextInfo.put(CONTEXT_JOB_TOKEN, JOB_NANE);
+ contextInfo.put(CONTEXT_JOB_STATUS_TOKEN, JobCallbackStatusEnum.STARTED.name());
+
+ embeddedJettyServer = new Server(PORT_NUMBER);
+
+ Context context = new Context(embeddedJettyServer, "/", Context.SESSIONS);
+ context.addServlet(new ServletHolder(new DelayServlet()), "/delay");
+
+ System.out.println("Start server");
+ embeddedJettyServer.start();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ @AfterClass
+ public static void cleanUp() throws Exception {
+ System.out.println("Shutting down server");
+ if (embeddedJettyServer != null) {
+ embeddedJettyServer.stop();
+ embeddedJettyServer.destroy();
+ }
+ }
+
+ private static class DelayServlet extends HttpServlet {
+
+ @Override
+ public void doGet(HttpServletRequest req, HttpServletResponse resp)
+ throws ServletException, IOException {
+
+ logger.info("Get get request: " + req.getRequestURI());
+ logger.info("Get get request params: " + req.getParameterMap());
+
+ long start = System.currentTimeMillis();
+ String responseMessage = handleDelay(req);
+ logger
+ .info("handleDelay elapse: " + (System.currentTimeMillis() - start));
+
+ responseMessage = handleSimulatedStatusCode(req, resp, responseMessage);
+
+ Writer writer = resp.getWriter();
+ writer.write(responseMessage);
+ writer.close();
+ }
+
+ private String handleSimulatedStatusCode(HttpServletRequest req,
+ HttpServletResponse resp, String responseMessge) {
+ String returnedStatusCodeStr = req.getParameter(STATUS_CODE_PARAM);
+ if (returnedStatusCodeStr != null) {
+ int statusCode = Integer.parseInt(returnedStatusCodeStr);
+ responseMessge = "Not good";
+ resp.setStatus(statusCode);
+ }
+ return responseMessge;
+ }
+
+ private String handleDelay(HttpServletRequest req) {
+ String sleepParamValue = req.getParameter(SLEEP_DURATION_PARAM);
+ if (sleepParamValue != null) {
+ long howLongMS =
+ TimeUnit.MILLISECONDS.convert(Integer.parseInt(sleepParamValue),
+ TimeUnit.SECONDS);
+
+ logger.info("Delay for: " + howLongMS);
+
+ try {
+ Thread.sleep(howLongMS);
+ return "Voila!!";
+ } catch (InterruptedException e) {
+ // don't care
+ return e.getMessage();
+ }
+ }
+ return "";
+ }
+
+ @Override
+ public void doPost(HttpServletRequest req, HttpServletResponse resp)
+ throws ServletException, IOException {
+ logger.info("Get post request: " + req.getRequestURI());
+ logger.info("Get post request params: " + req.getParameterMap());
+
+ BufferedReader reader = req.getReader();
+ String line = null;
+ while ((line = reader.readLine()) != null) {
+ logger.info("post body: " + line);
+ }
+ reader.close();
+
+ String responseMessage = handleDelay(req);
+ responseMessage = handleSimulatedStatusCode(req, resp, responseMessage);
+
+ Writer writer = resp.getWriter();
+ writer.write(responseMessage);
+ writer.close();
+ }
+ }
+
+ private String buildUrlForDelay(int delay) {
+ return "http://localhost:" + PORT_NUMBER + "/delay?" + SLEEP_DURATION_PARAM
+ + "=" + delay;
+ }
+
+ private String buildUrlForStatusCode(int sc) {
+ return "http://localhost:" + PORT_NUMBER + "/delay?" + STATUS_CODE_PARAM
+ + "=" + sc;
+ }
+
+ @Test(timeout = 4000)
+ public void basicGetTest() {
+ Props props = new Props();
+ String url = buildUrlForDelay(1);
+
+ props.put("job.notification."
+ + JobCallbackStatusEnum.STARTED.name().toLowerCase() + ".1.url", url);
+
+ List<HttpRequestBase> httpRequestList =
+ JobCallbackUtil.parseJobCallbackProperties(props,
+ JobCallbackStatusEnum.STARTED, contextInfo, 3);
+
+ jobCBMaker.makeHttpRequest(JOB_NANE, logger, httpRequestList);
+ }
+
+ @Test(timeout = 4000)
+ public void simulateNotOKStatusCodeTest() {
+ Props props = new Props();
+ String url = buildUrlForStatusCode(404);
+ props.put("job.notification."
+ + JobCallbackStatusEnum.STARTED.name().toLowerCase() + ".1.url", url);
+
+ List<HttpRequestBase> httpRequestList =
+ JobCallbackUtil.parseJobCallbackProperties(props,
+ JobCallbackStatusEnum.STARTED, contextInfo, 3);
+
+ jobCBMaker.makeHttpRequest(JOB_NANE, logger, httpRequestList);
+ }
+
+ @Test(timeout = 4000)
+ public void unResponsiveGetTest() {
+ Props props = new Props();
+ String url = buildUrlForDelay(10);
+ props.put("job.notification."
+ + JobCallbackStatusEnum.STARTED.name().toLowerCase() + ".1.url", url);
+
+ List<HttpRequestBase> httpRequestList =
+ JobCallbackUtil.parseJobCallbackProperties(props,
+ JobCallbackStatusEnum.STARTED, contextInfo, 3);
+
+ jobCBMaker.makeHttpRequest(JOB_NANE, logger, httpRequestList);
+ }
+
+ @Test(timeout = 4000)
+ public void basicPostTest() {
+ Props props = new Props();
+ String url = buildUrlForDelay(1);
+ props.put("job.notification."
+ + JobCallbackStatusEnum.STARTED.name().toLowerCase() + ".1.url", url);
+ props.put("job.notification."
+ + JobCallbackStatusEnum.STARTED.name().toLowerCase() + ".1.method",
+ JobCallbackConstants.HTTP_POST);
+ props.put("job.notification."
+ + JobCallbackStatusEnum.STARTED.name().toLowerCase() + ".1.body",
+ "This is it");
+
+ List<HttpRequestBase> httpRequestList =
+ JobCallbackUtil.parseJobCallbackProperties(props,
+ JobCallbackStatusEnum.STARTED, contextInfo, 3);
+
+ jobCBMaker.makeHttpRequest(JOB_NANE, logger, httpRequestList);
+ }
+}
diff --git a/azkaban-execserver/src/test/java/azkaban/execapp/event/JobCallbackUtilTest.java b/azkaban-execserver/src/test/java/azkaban/execapp/event/JobCallbackUtilTest.java
new file mode 100644
index 0000000..646f6e1
--- /dev/null
+++ b/azkaban-execserver/src/test/java/azkaban/execapp/event/JobCallbackUtilTest.java
@@ -0,0 +1,441 @@
+package azkaban.execapp.event;
+
+import static azkaban.jobcallback.JobCallbackConstants.CONTEXT_EXECUTION_ID_TOKEN;
+import static azkaban.jobcallback.JobCallbackConstants.CONTEXT_FLOW_TOKEN;
+import static azkaban.jobcallback.JobCallbackConstants.HTTP_GET;
+import static azkaban.jobcallback.JobCallbackConstants.HTTP_POST;
+import static azkaban.jobcallback.JobCallbackConstants.CONTEXT_JOB_STATUS_TOKEN;
+import static azkaban.jobcallback.JobCallbackConstants.CONTEXT_JOB_TOKEN;
+import static azkaban.jobcallback.JobCallbackConstants.CONTEXT_PROJECT_TOKEN;
+import static azkaban.jobcallback.JobCallbackConstants.CONTEXT_SERVER_TOKEN;
+
+import java.net.URLEncoder;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.http.Header;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import azkaban.jobcallback.JobCallbackConstants;
+import azkaban.jobcallback.JobCallbackStatusEnum;
+import azkaban.utils.Props;
+
+public class JobCallbackUtilTest {
+ private static Map<String, String> contextInfo;
+
+ private static final String SERVER_NAME = "localhost:9999";
+ private static final String PROJECT_NAME = "PROJECTX";
+ private static final String FLOW_NAME = "FLOWX";
+ private static final String JOB_NAME = "JOBX";
+ private static final String EXECUTION_ID = "1234";
+ private static final String JOB_STATUS_NAME = JobCallbackStatusEnum.STARTED
+ .name();
+
+ @BeforeClass
+ public static void setup() {
+ contextInfo = new HashMap<String, String>();
+ contextInfo.put(CONTEXT_SERVER_TOKEN, SERVER_NAME);
+ contextInfo.put(CONTEXT_PROJECT_TOKEN, PROJECT_NAME);
+ contextInfo.put(CONTEXT_FLOW_TOKEN, FLOW_NAME);
+ contextInfo.put(CONTEXT_EXECUTION_ID_TOKEN, EXECUTION_ID);
+ contextInfo.put(CONTEXT_JOB_TOKEN, JOB_NAME);
+ contextInfo.put(CONTEXT_JOB_STATUS_TOKEN, JOB_STATUS_NAME);
+ }
+
+ @Test
+ public void noCallbackPropertiesTest() {
+ Props props = new Props();
+ props.put("abc", "def");
+
+ Assert.assertFalse(JobCallbackUtil.isThereJobCallbackProperty(props,
+ JobCallbackStatusEnum.STARTED));
+
+ Assert.assertFalse(JobCallbackUtil.isThereJobCallbackProperty(props,
+ JobCallbackStatusEnum.COMPLETED));
+
+ Assert.assertFalse(JobCallbackUtil.isThereJobCallbackProperty(props,
+ JobCallbackStatusEnum.FAILURE));
+
+ Assert.assertFalse(JobCallbackUtil.isThereJobCallbackProperty(props,
+ JobCallbackStatusEnum.SUCCESS));
+ }
+
+ @Test
+ public void hasCallbackPropertiesTest() {
+ Props props = new Props();
+ for (JobCallbackStatusEnum jobStatus : JobCallbackStatusEnum.values()) {
+ props.put(
+ "job.notification." + jobStatus.name().toLowerCase() + ".1.url",
+ "def");
+ }
+
+ System.out.println(props);
+
+ Assert.assertTrue(JobCallbackUtil.isThereJobCallbackProperty(props,
+ JobCallbackStatusEnum.STARTED));
+
+ Assert.assertTrue(JobCallbackUtil.isThereJobCallbackProperty(props,
+ JobCallbackStatusEnum.COMPLETED));
+
+ Assert.assertTrue(JobCallbackUtil.isThereJobCallbackProperty(props,
+ JobCallbackStatusEnum.FAILURE));
+
+ Assert.assertTrue(JobCallbackUtil.isThereJobCallbackProperty(props,
+ JobCallbackStatusEnum.SUCCESS));
+ }
+
+ @Test
+ public void multipleStatusWithNoJobCallbackTest() {
+ Props props = new Props();
+ props.put("abc", "def");
+
+ Assert.assertFalse(JobCallbackUtil.isThereJobCallbackProperty(props,
+ JobCallbackStatusEnum.STARTED, JobCallbackStatusEnum.COMPLETED,
+ JobCallbackStatusEnum.FAILURE, JobCallbackStatusEnum.SUCCESS));
+
+ }
+
+ @Test
+ public void multipleStatusesWithJobCallbackTest() {
+ Props props = new Props();
+ props.put("job.notification."
+ + JobCallbackStatusEnum.STARTED.name().toLowerCase() + ".1.url", "def");
+
+ Assert.assertTrue(JobCallbackUtil.isThereJobCallbackProperty(props,
+ JobCallbackStatusEnum.STARTED, JobCallbackStatusEnum.COMPLETED,
+ JobCallbackStatusEnum.FAILURE, JobCallbackStatusEnum.SUCCESS));
+
+ props = new Props();
+ props.put("job.notification."
+ + JobCallbackStatusEnum.COMPLETED.name().toLowerCase() + ".1.url",
+ "def");
+ Assert.assertTrue(JobCallbackUtil.isThereJobCallbackProperty(props,
+ JobCallbackStatusEnum.STARTED, JobCallbackStatusEnum.COMPLETED,
+ JobCallbackStatusEnum.FAILURE, JobCallbackStatusEnum.SUCCESS));
+
+ props = new Props();
+ props.put("job.notification."
+ + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".1.url", "def");
+ Assert.assertTrue(JobCallbackUtil.isThereJobCallbackProperty(props,
+ JobCallbackStatusEnum.STARTED, JobCallbackStatusEnum.COMPLETED,
+ JobCallbackStatusEnum.FAILURE, JobCallbackStatusEnum.SUCCESS));
+
+ props = new Props();
+ props.put("job.notification."
+ + JobCallbackStatusEnum.SUCCESS.name().toLowerCase() + ".1.url", "def");
+ Assert.assertTrue(JobCallbackUtil.isThereJobCallbackProperty(props,
+ JobCallbackStatusEnum.STARTED, JobCallbackStatusEnum.COMPLETED,
+ JobCallbackStatusEnum.FAILURE, JobCallbackStatusEnum.SUCCESS));
+ }
+
+ @Test
+ public void hasCallbackPropertiesWithGapTest() {
+ Props props = new Props();
+ for (JobCallbackStatusEnum jobStatus : JobCallbackStatusEnum.values()) {
+ props.put(
+ "job.notification." + jobStatus.name().toLowerCase() + ".2.url",
+ "def");
+ }
+
+ System.out.println(props);
+
+ Assert.assertFalse(JobCallbackUtil.isThereJobCallbackProperty(props,
+ JobCallbackStatusEnum.STARTED));
+
+ Assert.assertFalse(JobCallbackUtil.isThereJobCallbackProperty(props,
+ JobCallbackStatusEnum.COMPLETED));
+
+ Assert.assertFalse(JobCallbackUtil.isThereJobCallbackProperty(props,
+ JobCallbackStatusEnum.FAILURE));
+
+ Assert.assertFalse(JobCallbackUtil.isThereJobCallbackProperty(props,
+ JobCallbackStatusEnum.SUCCESS));
+ }
+
+ @Test
+ public void noTokenTest() {
+ String urlWithNoToken = "http://www.linkedin.com";
+ String result =
+ JobCallbackUtil.replaceTokens(urlWithNoToken, contextInfo, true);
+ Assert.assertEquals(urlWithNoToken, result);
+ }
+
+ @Test
+ public void oneTokenTest() {
+
+ String urlWithOneToken =
+ "http://www.linkedin.com?project=" + CONTEXT_PROJECT_TOKEN + "&another=yes";
+
+ String result =
+ JobCallbackUtil.replaceTokens(urlWithOneToken, contextInfo, true);
+ Assert.assertEquals("http://www.linkedin.com?project=" + PROJECT_NAME
+ + "&another=yes", result);
+ }
+
+ @Test
+ public void twoTokensTest() {
+
+ String urlWithOneToken =
+ "http://www.linkedin.com?project=" + CONTEXT_PROJECT_TOKEN + "&flow="
+ + CONTEXT_FLOW_TOKEN;
+
+ String result =
+ JobCallbackUtil.replaceTokens(urlWithOneToken, contextInfo, true);
+ Assert.assertEquals("http://www.linkedin.com?project=" + PROJECT_NAME
+ + "&flow=" + FLOW_NAME, result);
+ }
+
+ @Test
+ public void allTokensTest() {
+
+ String urlWithOneToken =
+ "http://www.linkedin.com?server=" + SERVER_NAME + "&project="
+ + CONTEXT_PROJECT_TOKEN + "&flow=" + CONTEXT_FLOW_TOKEN + "&executionId="
+ + CONTEXT_EXECUTION_ID_TOKEN + "&job=" + CONTEXT_JOB_TOKEN + "&status="
+ + CONTEXT_JOB_STATUS_TOKEN;
+
+ String result =
+ JobCallbackUtil.replaceTokens(urlWithOneToken, contextInfo, true);
+
+ String expectedResult =
+ "http://www.linkedin.com?server=" + SERVER_NAME + "&project="
+ + PROJECT_NAME + "&flow=" + FLOW_NAME + "&executionId="
+ + EXECUTION_ID + "&job=" + JOB_NAME + "&status=" + JOB_STATUS_NAME;
+
+ Assert.assertEquals(expectedResult, result);
+ }
+
+ @Test
+ public void tokenWithEncoding() throws Exception {
+ String jobNameWithSpaces = "my job";
+ String encodedJobName = URLEncoder.encode(jobNameWithSpaces, "UTF-8");
+
+ Map<String, String> customContextInfo = new HashMap<String, String>();
+ customContextInfo = new HashMap<String, String>();
+ customContextInfo.put(CONTEXT_SERVER_TOKEN, SERVER_NAME);
+ customContextInfo.put(CONTEXT_PROJECT_TOKEN, PROJECT_NAME);
+ customContextInfo.put(CONTEXT_FLOW_TOKEN, FLOW_NAME);
+ customContextInfo.put(CONTEXT_EXECUTION_ID_TOKEN, EXECUTION_ID);
+ customContextInfo.put(CONTEXT_JOB_TOKEN, jobNameWithSpaces);
+ customContextInfo.put(CONTEXT_JOB_STATUS_TOKEN, JOB_STATUS_NAME);
+
+ String urlWithOneToken =
+ "http://www.linkedin.com?job=" + CONTEXT_JOB_TOKEN + "&flow=" + CONTEXT_FLOW_TOKEN;
+
+ String result =
+ JobCallbackUtil.replaceTokens(urlWithOneToken, customContextInfo, true);
+ Assert.assertEquals("http://www.linkedin.com?job=" + encodedJobName
+ + "&flow=" + FLOW_NAME, result);
+ }
+
+ @Test
+ public void parseJobCallbackOneGetTest() {
+ Props props = new Props();
+ String url = "http://lva1-rpt07.corp.linkedin.com";
+ props.put("job.notification."
+ + JobCallbackStatusEnum.STARTED.name().toLowerCase() + ".1.url", url);
+ List<HttpRequestBase> result =
+ JobCallbackUtil.parseJobCallbackProperties(props,
+ JobCallbackStatusEnum.STARTED, contextInfo, 3);
+
+ Assert.assertEquals(1, result.size());
+ Assert.assertEquals(HTTP_GET, result.get(0).getMethod());
+ Assert.assertEquals(url, result.get(0).getURI().toString());
+ }
+
+ @Test
+ public void parseJobCallbackWithInvalidURLTest() {
+ Props props = new Props();
+ String url = "linkedin.com";
+ props.put("job.notification."
+ + JobCallbackStatusEnum.STARTED.name().toLowerCase() + ".1.url", url);
+ List<HttpRequestBase> result =
+ JobCallbackUtil.parseJobCallbackProperties(props,
+ JobCallbackStatusEnum.STARTED, contextInfo, 3);
+
+ Assert.assertEquals(1, result.size());
+ Assert.assertEquals(HTTP_GET, result.get(0).getMethod());
+ Assert.assertEquals(url, result.get(0).getURI().toString());
+ }
+
+ @Test
+ public void parseJobCallbackTwoGetsTest() {
+ Props props = new Props();
+ String[] urls =
+ { "http://lva1-rpt07.corp.linkedin.com",
+ "http://lva1-rpt06.corp.linkedin.com" };
+ props.put("job.notification."
+ + JobCallbackStatusEnum.STARTED.name().toLowerCase() + ".1.url",
+ urls[0]);
+ props.put("job.notification."
+ + JobCallbackStatusEnum.STARTED.name().toLowerCase() + ".2.url",
+ urls[1]);
+ List<HttpRequestBase> result =
+ JobCallbackUtil.parseJobCallbackProperties(props,
+ JobCallbackStatusEnum.STARTED, contextInfo, 3);
+
+ Assert.assertEquals(2, result.size());
+ for (int i = 0; i < urls.length; i++) {
+ Assert.assertEquals(HTTP_GET, result.get(i).getMethod());
+ Assert.assertEquals(urls[i], result.get(i).getURI().toString());
+ }
+ }
+
+ @Test
+ public void parseJobCallbackWithGapTest() {
+ Props props = new Props();
+ String[] urls =
+ { "http://lva1-rpt07.corp.linkedin.com",
+ "http://lva1-rpt06.corp.linkedin.com" };
+ props.put("job.notification."
+ + JobCallbackStatusEnum.STARTED.name().toLowerCase() + ".1.url",
+ urls[0]);
+ props.put("job.notification."
+ + JobCallbackStatusEnum.STARTED.name().toLowerCase() + ".3.url",
+ urls[1]);
+ List<HttpRequestBase> result =
+ JobCallbackUtil.parseJobCallbackProperties(props,
+ JobCallbackStatusEnum.STARTED, contextInfo, 3);
+
+ Assert.assertEquals(1, result.size());
+ Assert.assertEquals(HTTP_GET, result.get(0).getMethod());
+ Assert.assertEquals(urls[0], result.get(0).getURI().toString());
+ }
+
+ @Test
+ public void parseJobCallbackWithPostTest() {
+ Props props = new Props();
+ String url = "http://lva1-rpt07.corp.linkedin.com";
+ String bodyText = "{name:\"you\"}";
+ props.put("job.notification."
+ + JobCallbackStatusEnum.STARTED.name().toLowerCase() + ".1.url", url);
+ props.put("job.notification."
+ + JobCallbackStatusEnum.STARTED.name().toLowerCase() + ".1.method",
+ HTTP_POST);
+
+ props.put("job.notification."
+ + JobCallbackStatusEnum.STARTED.name().toLowerCase() + ".1.body",
+ bodyText);
+
+ List<HttpRequestBase> result =
+ JobCallbackUtil.parseJobCallbackProperties(props,
+ JobCallbackStatusEnum.STARTED, contextInfo, 3);
+
+ Assert.assertEquals(1, result.size());
+
+ HttpPost httpPost = (HttpPost) result.get(0);
+
+ Assert.assertEquals(url, httpPost.getURI().toString());
+ Assert.assertEquals(HTTP_POST, httpPost.getMethod());
+
+ Assert.assertEquals(bodyText.length(), httpPost.getEntity()
+ .getContentLength());
+
+ }
+
+ @Test
+ public void noHeaderElementTest() {
+ Header[] headerArr =
+ JobCallbackUtil.parseHttpHeaders("this is an amazing day");
+
+ Assert.assertNotNull(headerArr);
+ Assert.assertEquals(0, headerArr.length);
+ }
+
+ @Test
+ public void oneHeaderElementTest() {
+ String name = "Content-type";
+ String value = "application/json";
+ String headers =
+ name + JobCallbackConstants.HEADER_NAME_VALUE_DELIMITER + value;
+ Header[] headerArr = JobCallbackUtil.parseHttpHeaders(headers);
+
+ Assert.assertNotNull(headerArr);
+ Assert.assertEquals(1, headerArr.length);
+ Assert.assertEquals(name, headerArr[0].getName());
+ Assert.assertEquals(value, headerArr[0].getValue());
+
+ String headersWithExtraDelimiter =
+ name + JobCallbackConstants.HEADER_NAME_VALUE_DELIMITER + value
+ + JobCallbackConstants.HEADER_ELEMENT_DELIMITER;
+
+ headerArr = JobCallbackUtil.parseHttpHeaders(headersWithExtraDelimiter);
+ Assert.assertNotNull(headerArr);
+ Assert.assertEquals(1, headerArr.length);
+ Assert.assertEquals(name, headerArr[0].getName());
+ Assert.assertEquals(value, headerArr[0].getValue());
+
+ }
+
+ @Test
+ public void multipleHeaderElementTest() {
+ String name1 = "Content-type";
+ String value1 = "application/json";
+
+ String name2 = "Accept";
+ String value2 = "application/xml";
+
+ String name3 = "User-Agent";
+ String value3 =
+ "Mozilla/5.0 (X11; Linux x86_64; rv:12.0) Gecko/20100101 Firefox/21.0";
+
+ String headers = makeHeaderElement(name1, value1);
+ headers += JobCallbackConstants.HEADER_ELEMENT_DELIMITER;
+ headers += makeHeaderElement(name2, value2);
+ headers += JobCallbackConstants.HEADER_ELEMENT_DELIMITER;
+ headers += makeHeaderElement(name3, value3);
+
+ System.out.println("headers: " + headers);
+ Header[] headerArr = JobCallbackUtil.parseHttpHeaders(headers);
+
+ Assert.assertNotNull(headerArr);
+ Assert.assertEquals(3, headerArr.length);
+ Assert.assertEquals(name1, headerArr[0].getName());
+ Assert.assertEquals(value1, headerArr[0].getValue());
+ Assert.assertEquals(name2, headerArr[1].getName());
+ Assert.assertEquals(value2, headerArr[1].getValue());
+ Assert.assertEquals(name3, headerArr[2].getName());
+ Assert.assertEquals(value3, headerArr[2].getValue());
+ }
+
+ @Test
+ public void partialHeaderElementTest() {
+ String name1 = "Content-type";
+ String value1 = "application/json";
+
+ String name2 = "Accept";
+ String value2 = "";
+
+ String name3 = "User-Agent";
+ String value3 =
+ "Mozilla/5.0 (X11; Linux x86_64; rv:12.0) Gecko/20100101 Firefox/21.0";
+
+ String headers = makeHeaderElement(name1, value1);
+ headers += JobCallbackConstants.HEADER_ELEMENT_DELIMITER;
+ headers += makeHeaderElement(name2, value2);
+ headers += JobCallbackConstants.HEADER_ELEMENT_DELIMITER;
+ headers += makeHeaderElement(name3, value3);
+
+ System.out.println("headers: " + headers);
+ Header[] headerArr = JobCallbackUtil.parseHttpHeaders(headers);
+
+ Assert.assertNotNull(headerArr);
+ Assert.assertEquals(3, headerArr.length);
+ Assert.assertEquals(name1, headerArr[0].getName());
+ Assert.assertEquals(value1, headerArr[0].getValue());
+ Assert.assertEquals(name2, headerArr[1].getName());
+ Assert.assertEquals(value2, headerArr[1].getValue());
+ Assert.assertEquals(name3, headerArr[2].getName());
+ Assert.assertEquals(value3, headerArr[2].getValue());
+ }
+
+ private String makeHeaderElement(String name, String value) {
+ return name + JobCallbackConstants.HEADER_NAME_VALUE_DELIMITER + value;
+ }
+
+}
build.gradle 8(+6 -2)
diff --git a/build.gradle b/build.gradle
index 2ad1434..d8afcca 100644
--- a/build.gradle
+++ b/build.gradle
@@ -102,8 +102,8 @@ project(':azkaban-common') {
compile('org.apache.commons:commons-email:1.2')
compile('org.apache.commons:commons-jexl:2.1.1')
compile('org.apache.commons:commons-math3:3.0')
- compile('org.apache.httpcomponents:httpclient:4.2.1')
- compile('org.apache.httpcomponents:httpcore:4.2.1')
+ compile('org.apache.httpcomponents:httpclient:4.3.1')
+ compile('org.apache.httpcomponents:httpcore:4.3')
compile('org.apache.velocity:velocity:1.7')
compile('org.codehaus.jackson:jackson-core-asl:1.9.5')
compile('org.codehaus.jackson:jackson-mapper-asl:1.9.5')
@@ -305,10 +305,14 @@ project(':azkaban-execserver') {
compile('javax.servlet:servlet-api:2.5')
compile('joda-time:joda-time:2.0')
compile('log4j:log4j:1.2.16')
+ compile('commons-logging:commons-logging:1.1.1')
+ compile('org.apache.httpcomponents:httpclient:4.3.1')
+ compile('org.apache.httpcomponents:httpcore:4.3')
compile('org.mortbay.jetty:jetty:6.1.26')
compile('org.mortbay.jetty:jetty-util:6.1.26')
compile('org.codehaus.jackson:jackson-core-asl:1.9.5')
compile('org.codehaus.jackson:jackson-mapper-asl:1.9.5')
+
testCompile('junit:junit:4.11')
testCompile('org.hamcrest:hamcrest-all:1.3')