azkaban-developers

Changes

build.gradle 7(+5 -2)

Details

diff --git a/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackConstants.java b/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackConstants.java
new file mode 100644
index 0000000..b66eb58
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackConstants.java
@@ -0,0 +1,36 @@
+package azkaban.jobcallback;
+
+public interface JobCallbackConstants {
+  public static final String STATUS_TOKEN = "status";
+  public static final String SEQUENCE_TOKEN = "sequence";
+  public static final String HTTP_GET = "GET";
+  public static final String HTTP_POST = "POST";
+
+  public static final String MAX_CALLBACK_COUNT_PROPERTY_KEY =
+      "jobcallback.max_count";
+  public static final int DEFAULT_MAX_CALLBACK_COUNT = 3;
+
+  public static final String FIRST_JOB_CALLBACK_URL_TEMPLATE =
+      "job.notification." + STATUS_TOKEN + ".1.url";
+
+  public static final String JOB_CALLBACK_URL_TEMPLATE = "job.notification."
+      + STATUS_TOKEN + "." + SEQUENCE_TOKEN + ".url";
+  public static final String JOB_CALLBACK_REQUEST_METHOD_TEMPLATE =
+      "job.notification." + STATUS_TOKEN + "." + SEQUENCE_TOKEN + ".method";
+
+  public static final String JOB_CALLBACK_REQUEST_HEADERS_TEMPLATE =
+      "job.notification." + STATUS_TOKEN + "." + SEQUENCE_TOKEN + ".headers";
+
+  public static final String JOB_CALLBACK_BODY_TEMPLATE = "job.notification."
+      + STATUS_TOKEN + "." + SEQUENCE_TOKEN + ".body";
+
+  public static final String SERVER_TOKEN = "?{server}";
+  public static final String PROJECT_TOKEN = "?{project}";
+  public static final String FLOW_TOKEN = "?{flow}";
+  public static final String EXECUTION_ID_TOKEN = "?{executionId}";
+  public static final String JOB_TOKEN = "?{job}";
+  public static final String JOB_STATUS_TOKEN = "?{status}";
+
+  public static final String HEADER_ELEMENT_DELIMITER = "\r\n";
+  public static final String HEADER_NAME_VALUE_DELIMITER = ":";
+}
diff --git a/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackStatusEnum.java b/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackStatusEnum.java
new file mode 100644
index 0000000..b10679b
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackStatusEnum.java
@@ -0,0 +1,5 @@
+package azkaban.jobcallback;
+
+public enum JobCallbackStatusEnum {
+  STARTED, SUCCESS, FAILURE, COMPLETED
+}
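
For reference, the templates in JobCallbackConstants combine with the lowercase name of a JobCallbackStatusEnum value and a one-based sequence number to form the per-job property keys. A minimal sketch of how one key is derived (the "failure" status and sequence "1" are illustrative):

    import azkaban.jobcallback.JobCallbackConstants;

    public class CallbackKeyExample {
      public static void main(String[] args) {
        // Expand the URL template for the first FAILURE callback of a job.
        String urlKey = JobCallbackConstants.JOB_CALLBACK_URL_TEMPLATE
            .replace(JobCallbackConstants.STATUS_TOKEN, "failure")
            .replace(JobCallbackConstants.SEQUENCE_TOKEN, "1");
        System.out.println(urlKey); // prints: job.notification.failure.1.url
      }
    }

The .method, .headers, and .body templates expand the same way, with the same status and sequence substitution.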
diff --git a/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java b/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java
new file mode 100644
index 0000000..322d80f
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java
@@ -0,0 +1,111 @@
+package azkaban.jobcallback;
+
+import static azkaban.jobcallback.JobCallbackConstants.HTTP_GET;
+import static azkaban.jobcallback.JobCallbackConstants.HTTP_POST;
+import static azkaban.jobcallback.JobCallbackConstants.JOB_CALLBACK_BODY_TEMPLATE;
+import static azkaban.jobcallback.JobCallbackConstants.JOB_CALLBACK_REQUEST_METHOD_TEMPLATE;
+import static azkaban.jobcallback.JobCallbackConstants.JOB_CALLBACK_URL_TEMPLATE;
+import static azkaban.jobcallback.JobCallbackConstants.SEQUENCE_TOKEN;
+import static azkaban.jobcallback.JobCallbackConstants.STATUS_TOKEN;
+
+import java.util.Collection;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Props;
+
+/**
+ * Responsible for validating the job callback related properties at project
+ * upload time
+ * 
+ * @author hluu
+ *
+ */
+public class JobCallbackValidator {
+
+  private static final Logger logger = Logger
+      .getLogger(JobCallbackValidator.class);
+
+  /**
+   * Make sure all the job callback related properties are valid
+   * 
+   * @param jobProps
+   * @param errors
+   * @return number of valid job callback properties. Mainly for testing
+   *         purposes.
+   */
+  public static int validate(String jobName, Props serverProps, Props jobProps,
+      Collection<String> errors) {
+    int maxNumCallback =
+        serverProps.getInt(
+            JobCallbackConstants.MAX_CALLBACK_COUNT_PROPERTY_KEY,
+            JobCallbackConstants.DEFAULT_MAX_CALLBACK_COUNT);
+
+    int totalCallbackCount = 0;
+    for (JobCallbackStatusEnum jobStatus : JobCallbackStatusEnum.values()) {
+      totalCallbackCount +=
+          validateBasedOnStatus(jobProps, errors, jobStatus, maxNumCallback);
+    }
+
+    logger.info("Found " + totalCallbackCount + " job callbacks for job "
+        + jobName);
+    return totalCallbackCount;
+  }
+
+  private static int validateBasedOnStatus(Props jobProps,
+      Collection<String> errors, JobCallbackStatusEnum jobStatus,
+      int maxNumCallback) {
+
+    int callbackCount = 0;
+    // replace property templates with status
+    String jobCallBackUrl =
+        JOB_CALLBACK_URL_TEMPLATE.replace(STATUS_TOKEN, jobStatus.name()
+            .toLowerCase());
+
+    String requestMethod =
+        JOB_CALLBACK_REQUEST_METHOD_TEMPLATE.replace(STATUS_TOKEN, jobStatus
+            .name().toLowerCase());
+
+    String httpBody =
+        JOB_CALLBACK_BODY_TEMPLATE.replace(STATUS_TOKEN, jobStatus.name()
+            .toLowerCase());
+
+    for (int i = 1; i <= maxNumCallback; i++) {
+      // callback url
+      String callbackUrlKey =
+          jobCallBackUrl.replace(SEQUENCE_TOKEN, Integer.toString(i));
+      String callbackUrlValue = jobProps.get(callbackUrlKey);
+
+      if (callbackUrlValue == null || callbackUrlValue.length() == 0) {
+        // no more callbacks defined for this status; stop scanning
+        break;
+      } else {
+        String requestMethodKey =
+            requestMethod.replace(SEQUENCE_TOKEN, Integer.toString(i));
+
+        String methodValue = jobProps.getString(requestMethodKey, HTTP_GET);
+
+        if (HTTP_POST.equals(methodValue)) {
+          // now try to get the post body
+          String postBodyKey =
+              httpBody.replace(SEQUENCE_TOKEN, Integer.toString(i));
+          String postBodyValue = jobProps.get(postBodyKey);
+          if (postBodyValue == null || postBodyValue.length() == 0) {
+            errors.add("No POST body was specified for job callback '"
+                + callbackUrlValue + "'");
+          } else {
+            callbackCount++;
+          }
+        } else if (HTTP_GET.equals(methodValue)) {
+          // GET requires no body; nothing more to validate
+          callbackCount++;
+        } else {
+          errors.add("Unsupported request method: " + methodValue
+              + " Only POST and GET are supported");
+        }
+      }
+    }
+
+    return callbackCount;
+  }
+}
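
A minimal usage sketch of the validator above (job name, URLs, and property values are illustrative): a GET callback needs only a URL, while a POST callback without a body is reported as an error and not counted.

    import java.util.ArrayList;
    import java.util.List;

    import azkaban.jobcallback.JobCallbackValidator;
    import azkaban.utils.Props;

    public class ValidateExample {
      public static void main(String[] args) {
        Props serverProps = new Props(); // default limit of 3 callbacks per status applies
        Props jobProps = new Props();
        jobProps.put("job.notification.success.1.url", "http://example.com/ping");
        jobProps.put("job.notification.failure.1.url", "http://example.com/alert");
        jobProps.put("job.notification.failure.1.method", "POST"); // no ...failure.1.body

        List<String> errors = new ArrayList<String>();
        int valid = JobCallbackValidator.validate("someJob", serverProps, jobProps, errors);
        // valid == 1 (the success GET); errors holds one entry for the missing POST body
      }
    }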
diff --git a/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java b/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java
index 1a6610e..37df66e 100644
--- a/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java
@@ -36,6 +36,7 @@ import azkaban.flow.Flow;
 import azkaban.flow.FlowProps;
 import azkaban.flow.Node;
 import azkaban.flow.SpecialJobTypes;
+import azkaban.jobcallback.JobCallbackValidator;
 import azkaban.project.validator.ProjectValidator;
 import azkaban.project.validator.ValidationReport;
 import azkaban.project.validator.XmlValidatorManager;
@@ -113,6 +114,7 @@ public class DirectoryFlowLoader implements ProjectValidator {
 
     // Resolve embedded flows
     resolveEmbeddedFlows();
+
   }
 
   private void loadProjectFromDir(String base, File dir, Props parent) {
@@ -383,17 +385,23 @@ public class DirectoryFlowLoader implements ProjectValidator {
     long sizeMaxXmx = azkaban.utils.Utils.parseMemString(maxXmx);
 
     for (String jobName : jobPropsMap.keySet()) {
-      Props resolvedJobProps = PropsUtils.resolveProps(jobPropsMap.get(jobName));
+      Props resolvedJobProps =
+          PropsUtils.resolveProps(jobPropsMap.get(jobName));
       String xms = resolvedJobProps.getString(XMS, null);
       if (xms != null && azkaban.utils.Utils.parseMemString(xms) > sizeMaxXms) {
-        errors.add(String.format("%s: Xms value has exceeded the allowed limit (max Xms = %s)",
-                jobName, maxXms));
+        errors.add(String.format(
+            "%s: Xms value has exceeded the allowed limit (max Xms = %s)",
+            jobName, maxXms));
       }
       String xmx = resolvedJobProps.getString(XMX, null);
       if (xmx != null && azkaban.utils.Utils.parseMemString(xmx) > sizeMaxXmx) {
-        errors.add(String.format("%s: Xmx value has exceeded the allowed limit (max Xmx = %s)",
-                jobName, maxXmx));
+        errors.add(String.format(
+            "%s: Xmx value has exceeded the allowed limit (max Xmx = %s)",
+            jobName, maxXmx));
       }
+
+      // job callback properties check
+      JobCallbackValidator.validate(jobName, props, resolvedJobProps, errors);
     }
   }
 
diff --git a/azkaban-common/src/test/java/azkaban/jobcallback/JobCallbackValidatorTest.java b/azkaban-common/src/test/java/azkaban/jobcallback/JobCallbackValidatorTest.java
new file mode 100644
index 0000000..ec067a0
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/jobcallback/JobCallbackValidatorTest.java
@@ -0,0 +1,170 @@
+package azkaban.jobcallback;
+
+import static azkaban.jobcallback.JobCallbackConstants.DEFAULT_MAX_CALLBACK_COUNT;
+import static azkaban.jobcallback.JobCallbackConstants.MAX_CALLBACK_COUNT_PROPERTY_KEY;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.utils.Props;
+
+public class JobCallbackValidatorTest {
+  private Props serverProps;
+
+  @Before
+  public void setup() {
+    serverProps = new Props();
+    serverProps
+        .put(MAX_CALLBACK_COUNT_PROPERTY_KEY, DEFAULT_MAX_CALLBACK_COUNT);
+  }
+
+  @Test
+  public void noJobCallbackProps() {
+    Props jobProps = new Props();
+    Set<String> errors = new HashSet<String>();
+
+    Assert.assertEquals(0, JobCallbackValidator.validate("bogusJob",
+        serverProps, jobProps, errors));
+
+    Assert.assertEquals(0, errors.size());
+  }
+
+  @Test
+  public void oneGetJobCallback() {
+    Props jobProps = new Props();
+    jobProps.put("job.notification."
+        + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".1.url",
+        "http://www.linkedin.com");
+
+    Set<String> errors = new HashSet<String>();
+
+    Assert.assertEquals(1, JobCallbackValidator.validate("bogusJob",
+        serverProps, jobProps, errors));
+
+    Assert.assertEquals(0, errors.size());
+  }
+
+  @Test
+  public void onePostJobCallback() {
+    Props jobProps = new Props();
+    jobProps.put("job.notification."
+        + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".1.url",
+        "http://www.linkedin.com");
+
+    jobProps.put("job.notification."
+        + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".1.method",
+        JobCallbackConstants.HTTP_POST);
+
+    jobProps.put("job.notification."
+        + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".1.body",
+        "doc:id");
+
+    Set<String> errors = new HashSet<String>();
+
+    Assert.assertEquals(1, JobCallbackValidator.validate("bogusJob",
+        serverProps, jobProps, errors));
+
+    Assert.assertEquals(0, errors.size());
+  }
+
+  @Test
+  public void multiplePostJobCallbacks() {
+    Props jobProps = new Props();
+    jobProps.put("job.notification."
+        + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".1.url",
+        "http://www.linkedin.com");
+
+    jobProps.put("job.notification."
+        + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".1.method",
+        JobCallbackConstants.HTTP_POST);
+
+    jobProps.put("job.notification."
+        + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".1.body",
+        "doc:id");
+
+    jobProps.put("job.notification."
+        + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".2.url",
+        "http://www.linkedin2.com");
+
+    jobProps.put("job.notification."
+        + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".2.method",
+        JobCallbackConstants.HTTP_POST);
+
+    jobProps.put("job.notification."
+        + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".2.body",
+        "doc2:id");
+
+    Set<String> errors = new HashSet<String>();
+
+    Assert.assertEquals(2, JobCallbackValidator.validate("bogusJob",
+        serverProps, jobProps, errors));
+
+    Assert.assertEquals(0, errors.size());
+  }
+
+  @Test
+  public void oneBadPostJobCallback() {
+    Props jobProps = new Props();
+    jobProps.put("job.notification."
+        + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".1.url",
+        "http://www.linkedin.com");
+
+    jobProps.put("job.notification."
+        + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".1.method",
+        JobCallbackConstants.HTTP_POST);
+
+    Set<String> errors = new HashSet<String>();
+
+    Assert.assertEquals(0, JobCallbackValidator.validate("bogusJob",
+        serverProps, jobProps, errors));
+
+    Assert.assertEquals(1, errors.size());
+    System.out.println(errors);
+  }
+
+  @Test
+  public void multipleGetJobCallbacks() {
+    Props jobProps = new Props();
+    jobProps.put("job.notification."
+        + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".1.url",
+        "http://www.linkedin.com");
+
+    jobProps.put("job.notification."
+        + JobCallbackStatusEnum.STARTED.name().toLowerCase() + ".1.url",
+        "http://www.linkedin.com");
+
+    Set<String> errors = new HashSet<String>();
+
+    Assert.assertEquals(2, JobCallbackValidator.validate("bogusJob",
+        serverProps, jobProps, errors));
+
+    Assert.assertEquals(0, errors.size());
+  }
+
+  @Test
+  public void multipleGetJobCallbackWithGap() {
+    Props jobProps = new Props();
+    jobProps.put("job.notification."
+        + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".1.url",
+        "http://www.linkedin.com");
+
+    jobProps.put("job.notification."
+        + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".2.url",
+        "http://www.linkedin.com");
+
+    jobProps.put("job.notification."
+        + JobCallbackStatusEnum.STARTED.name().toLowerCase() + ".2.url",
+        "http://www.linkedin.com");
+
+    Set<String> errors = new HashSet<String>();
+
+    Assert.assertEquals(2, JobCallbackValidator.validate("bogusJob",
+        serverProps, jobProps, errors));
+
+    Assert.assertEquals(0, errors.size());
+  }
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index d9a4bc0..a241c3c 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.lang.management.ManagementFactory;
+import java.lang.reflect.Constructor;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.TimeZone;
@@ -38,14 +39,16 @@ import org.mortbay.jetty.servlet.Context;
 import org.mortbay.jetty.servlet.ServletHolder;
 import org.mortbay.thread.QueuedThreadPool;
 
-import azkaban.executor.ExecutorLoader;
-import azkaban.executor.JdbcExecutorLoader;
+import azkaban.execapp.event.JobCallbackManager;
 import azkaban.execapp.jmx.JmxFlowRunnerManager;
+import azkaban.execapp.jmx.JmxJobMBeanManager;
 import azkaban.execapp.metric.NumFailedFlowMetric;
 import azkaban.execapp.metric.NumFailedJobMetric;
 import azkaban.execapp.metric.NumQueuedFlowMetric;
 import azkaban.execapp.metric.NumRunningFlowMetric;
 import azkaban.execapp.metric.NumRunningJobMetric;
+import azkaban.executor.ExecutorLoader;
+import azkaban.executor.JdbcExecutorLoader;
 import azkaban.jmx.JmxJettyServer;
 import azkaban.metric.IMetricEmitter;
 import azkaban.metric.MetricException;
@@ -59,17 +62,21 @@ import azkaban.utils.Props;
 import azkaban.utils.SystemMemoryInfo;
 import azkaban.utils.Utils;
 
-
 public class AzkabanExecutorServer {
-  private static final Logger logger = Logger.getLogger(AzkabanExecutorServer.class);
+  private static final String CUSTOM_JMX_ATTRIBUTE_PROCESSOR_PROPERTY =
+      "jmx.attribute.processor.class";
+  private static final Logger logger = Logger
+      .getLogger(AzkabanExecutorServer.class);
   private static final int MAX_FORM_CONTENT_SIZE = 10 * 1024 * 1024;
 
   public static final String AZKABAN_HOME = "AZKABAN_HOME";
   public static final String DEFAULT_CONF_PATH = "conf";
   public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
-  public static final String AZKABAN_PRIVATE_PROPERTIES_FILE = "azkaban.private.properties";
+  public static final String AZKABAN_PRIVATE_PROPERTIES_FILE =
+      "azkaban.private.properties";
   public static final String JOBTYPE_PLUGIN_DIR = "azkaban.jobtype.plugin.dir";
-  public static final String METRIC_INTERVAL = "executor.metric.milisecinterval.";
+  public static final String METRIC_INTERVAL =
+      "executor.metric.milisecinterval.";
   public static final int DEFAULT_PORT_NUMBER = 12321;
 
   private static final String DEFAULT_TIMEZONE_ID = "default.timezone.id";
@@ -120,13 +127,22 @@ public class AzkabanExecutorServer {
 
     executionLoader = createExecLoader(props);
     projectLoader = createProjectLoader(props);
-    runnerManager = new FlowRunnerManager(props, executionLoader, projectLoader, this.getClass().getClassLoader());
+    runnerManager =
+        new FlowRunnerManager(props, executionLoader, projectLoader, this
+            .getClass().getClassLoader());
+
+    JmxJobMBeanManager.getInstance().initialize(props);
+
+    // make sure this happens before MBean registration in configureMBeanServer()
+    configureJobCallback(props);
 
     configureMBeanServer();
     configureMetricReports();
 
     SystemMemoryInfo.init(props.getInt("executor.memCheck.interval", 30));
 
+    loadCustomJMXAttributeProcessor(props);
+
     try {
       server.start();
     } catch (Exception e) {
@@ -137,8 +153,20 @@ public class AzkabanExecutorServer {
     logger.info("Azkaban Executor Server started on port " + portNumber);
   }
 
+  private void configureJobCallback(Props props) {
+    boolean jobCallbackEnabled =
+        props.getBoolean("azkaban.executor.jobcallback.enabled", true);
+
+    logger.info("Job callback enabled? " + jobCallbackEnabled);
+
+    if (jobCallbackEnabled) {
+      JobCallbackManager.initialize(props);
+    }
+  }
+
   /**
    * Configure Metric Reporting as per azkaban.properties settings
+   * 
    * @throws MetricException
    */
   private void configureMetricReports() throws MetricException {
@@ -150,29 +178,65 @@ public class AzkabanExecutorServer {
       metricManager.addMetricEmitter(metricEmitter);
 
       logger.info("Adding number of failed flow metric");
-      metricManager.addMetric(new NumFailedFlowMetric(metricManager, props.getInt(METRIC_INTERVAL
-          + NumFailedFlowMetric.NUM_FAILED_FLOW_METRIC_NAME, props.getInt(METRIC_INTERVAL + "default"))));
+      metricManager.addMetric(new NumFailedFlowMetric(metricManager, props
+          .getInt(METRIC_INTERVAL
+              + NumFailedFlowMetric.NUM_FAILED_FLOW_METRIC_NAME,
+              props.getInt(METRIC_INTERVAL + "default"))));
 
       logger.info("Adding number of failed jobs metric");
-      metricManager.addMetric(new NumFailedJobMetric(metricManager, props.getInt(METRIC_INTERVAL
-          + NumFailedJobMetric.NUM_FAILED_JOB_METRIC_NAME, props.getInt(METRIC_INTERVAL + "default"))));
+      metricManager.addMetric(new NumFailedJobMetric(metricManager, props
+          .getInt(METRIC_INTERVAL
+              + NumFailedJobMetric.NUM_FAILED_JOB_METRIC_NAME,
+              props.getInt(METRIC_INTERVAL + "default"))));
 
       logger.info("Adding number of running Jobs metric");
-      metricManager.addMetric(new NumRunningJobMetric(metricManager, props.getInt(METRIC_INTERVAL
-          + NumRunningJobMetric.NUM_RUNNING_JOB_METRIC_NAME, props.getInt(METRIC_INTERVAL + "default"))));
+      metricManager.addMetric(new NumRunningJobMetric(metricManager, props
+          .getInt(METRIC_INTERVAL
+              + NumRunningJobMetric.NUM_RUNNING_JOB_METRIC_NAME,
+              props.getInt(METRIC_INTERVAL + "default"))));
 
       logger.info("Adding number of running flows metric");
-      metricManager.addMetric(new NumRunningFlowMetric(runnerManager, metricManager, props.getInt(
-          METRIC_INTERVAL  + NumRunningFlowMetric.NUM_RUNNING_FLOW_METRIC_NAME,
-          props.getInt(METRIC_INTERVAL + "default"))));
+      metricManager.addMetric(new NumRunningFlowMetric(runnerManager,
+          metricManager, props.getInt(METRIC_INTERVAL
+              + NumRunningFlowMetric.NUM_RUNNING_FLOW_METRIC_NAME,
+              props.getInt(METRIC_INTERVAL + "default"))));
 
       logger.info("Adding number of queued flows metric");
-      metricManager.addMetric(new NumQueuedFlowMetric(runnerManager, metricManager, props.getInt(
-          METRIC_INTERVAL + NumQueuedFlowMetric.NUM_QUEUED_FLOW_METRIC_NAME,
-          props.getInt(METRIC_INTERVAL + "default"))));
+      metricManager.addMetric(new NumQueuedFlowMetric(runnerManager,
+          metricManager, props.getInt(METRIC_INTERVAL
+              + NumQueuedFlowMetric.NUM_QUEUED_FLOW_METRIC_NAME,
+              props.getInt(METRIC_INTERVAL + "default"))));
 
       logger.info("Completed configuring Metric Reports");
     }
+
+    // initialize event emitter
+    // AsyncEventEmitterFactory.getInstance().initialize(props);
+
+  }
+
+  private void loadCustomJMXAttributeProcessor(Props props) {
+    String jmxAttributeEmitter =
+        props.get(CUSTOM_JMX_ATTRIBUTE_PROCESSOR_PROPERTY);
+    if (jmxAttributeEmitter != null) {
+      try {
+        logger.info("jmxAttributeEmitter: " + jmxAttributeEmitter);
+        Constructor<Props>[] constructors =
+            (Constructor<Props>[]) Class.forName(jmxAttributeEmitter)
+                .getConstructors();
+
+        constructors[0].newInstance(props.toProperties());
+      } catch (Exception e) {
+        logger.error("Encountered error while loading and instantiating "
+            + jmxAttributeEmitter, e);
+        throw new IllegalStateException(
+            "Encountered error while loading and instantiating "
+                + jmxAttributeEmitter, e);
+      }
+    } else {
+      logger.info("No value for property: "
+          + CUSTOM_JMX_ATTRIBUTE_PROCESSOR_PROPERTY + " was found");
+    }
   }
 
   private ExecutorLoader createExecLoader(Props props) {
@@ -267,23 +331,25 @@ public class AzkabanExecutorServer {
 
       public void logTopMemoryConsumers() throws Exception, IOException {
         if (new File("/bin/bash").exists() && new File("/bin/ps").exists()
-                && new File("/usr/bin/head").exists()) {
+            && new File("/usr/bin/head").exists()) {
           logger.info("logging top memeory consumer");
 
           java.lang.ProcessBuilder processBuilder =
-                  new java.lang.ProcessBuilder("/bin/bash", "-c", "/bin/ps aux --sort -rss | /usr/bin/head");
+              new java.lang.ProcessBuilder("/bin/bash", "-c",
+                  "/bin/ps aux --sort -rss | /usr/bin/head");
           Process p = processBuilder.start();
           p.waitFor();
-  
+
           InputStream is = p.getInputStream();
-          java.io.BufferedReader reader = new java.io.BufferedReader(new InputStreamReader(is));
+          java.io.BufferedReader reader =
+              new java.io.BufferedReader(new InputStreamReader(is));
           String line = null;
           while ((line = reader.readLine()) != null) {
             logger.info(line);
           }
           is.close();
         }
-      }      
+      }
     });
   }
 
@@ -300,14 +366,16 @@ public class AzkabanExecutorServer {
       return null;
     }
 
-    if (!new File(azkabanHome).isDirectory() || !new File(azkabanHome).canRead()) {
+    if (!new File(azkabanHome).isDirectory()
+        || !new File(azkabanHome).canRead()) {
       logger.error(azkabanHome + " is not a readable directory.");
       return null;
     }
 
     File confPath = new File(azkabanHome, DEFAULT_CONF_PATH);
     if (!confPath.exists() || !confPath.isDirectory() || !confPath.canRead()) {
-      logger.error(azkabanHome + " does not contain a readable conf directory.");
+      logger
+          .error(azkabanHome + " does not contain a readable conf directory.");
       return null;
     }
 
@@ -325,7 +393,8 @@ public class AzkabanExecutorServer {
    * @return
    */
   private static Props loadAzkabanConfigurationFromDirectory(File dir) {
-    File azkabanPrivatePropsFile = new File(dir, AZKABAN_PRIVATE_PROPERTIES_FILE);
+    File azkabanPrivatePropsFile =
+        new File(dir, AZKABAN_PRIVATE_PROPERTIES_FILE);
     File azkabanPropsFile = new File(dir, AZKABAN_PROPERTIES_FILE);
 
     Props props = null;
@@ -343,7 +412,9 @@ public class AzkabanExecutorServer {
     } catch (FileNotFoundException e) {
       logger.error("File not found. Could not load azkaban config file", e);
     } catch (IOException e) {
-      logger.error("File found, but error reading. Could not load azkaban config file", e);
+      logger.error(
+          "File found, but error reading. Could not load azkaban config file",
+          e);
     }
 
     return props;
@@ -355,6 +426,13 @@ public class AzkabanExecutorServer {
 
     registerMbean("executorJetty", new JmxJettyServer(server));
     registerMbean("flowRunnerManager", new JmxFlowRunnerManager(runnerManager));
+    registerMbean("jobJMXMBean", JmxJobMBeanManager.getInstance());
+
+    if (JobCallbackManager.isInitialized()) {
+      JobCallbackManager jobCallbackMgr = JobCallbackManager.getInstance();
+      registerMbean("jobCallbackJMXMBean",
+          jobCallbackMgr.getJmxJobCallbackMBean());
+    }
   }
 
   public void close() {
@@ -377,7 +455,8 @@ public class AzkabanExecutorServer {
       logger.info("Bean " + mbeanClass.getCanonicalName() + " registered.");
       registeredMBeans.add(mbeanName);
     } catch (Exception e) {
-      logger.error("Error registering mbean " + mbeanClass.getCanonicalName(), e);
+      logger.error("Error registering mbean " + mbeanClass.getCanonicalName(),
+          e);
     }
 
   }
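
The loadCustomJMXAttributeProcessor method above instantiates whatever class is named by jmx.attribute.processor.class through its first public constructor, passing the executor configuration converted via Props.toProperties(). A sketch of the shape such a class is expected to take, assuming toProperties() yields a java.util.Properties (class and package names are hypothetical):

    package com.example.monitoring; // hypothetical

    import java.util.Properties;

    public class CustomJmxAttributeProcessor {
      private final Properties config;

      // Invoked reflectively by AzkabanExecutorServer.loadCustomJMXAttributeProcessor
      // with the executor's configuration.
      public CustomJmxAttributeProcessor(Properties config) {
        this.config = config;
        // start collecting and processing JMX attributes here
      }
    }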
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/event/JobCallbackManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/event/JobCallbackManager.java
new file mode 100644
index 0000000..e60a512
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/event/JobCallbackManager.java
@@ -0,0 +1,267 @@
+package azkaban.execapp.event;
+
+import static azkaban.jobcallback.JobCallbackConstants.JOB_TOKEN;
+import static azkaban.jobcallback.JobCallbackStatusEnum.COMPLETED;
+import static azkaban.jobcallback.JobCallbackStatusEnum.FAILURE;
+import static azkaban.jobcallback.JobCallbackStatusEnum.STARTED;
+import static azkaban.jobcallback.JobCallbackStatusEnum.SUCCESS;
+
+import java.net.InetAddress;
+import java.net.URL;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.message.BasicHeader;
+import org.apache.log4j.Logger;
+
+import azkaban.event.Event;
+import azkaban.event.EventListener;
+import azkaban.execapp.JobRunner;
+import azkaban.execapp.jmx.JmxJobCallback;
+import azkaban.execapp.jmx.JmxJobCallbackMBean;
+import azkaban.executor.Status;
+import azkaban.jobcallback.JobCallbackStatusEnum;
+import azkaban.utils.Props;
+import azkaban.utils.PropsUtils;
+
+/**
+ * Responsible for processing job callback properties on job status change
+ * events.
+ * 
+ * When job callback properties are specified, they are converted into HTTP
+ * calls to execute. The HTTP requests are made asynchronously, so the caller
+ * of handleEvent is not blocked, and each call is configured with timeouts
+ * for the connection request, connection creation, and socket reads.
+ * 
+ * The HTTP request and response are logged to the job's log for debugging and
+ * traceability purposes.
+ * 
+ * @author hluu
+ *
+ */
+public class JobCallbackManager implements EventListener {
+
+  private static final Logger logger = Logger
+      .getLogger(JobCallbackManager.class);
+
+  private static boolean isInitialized = false;
+  private static JobCallbackManager instance;
+
+  private static int maxNumCallBack = 3;
+
+  private JmxJobCallbackMBean callbackMbean;
+  private String azkabanHostName;
+  private SimpleDateFormat gmtDateFormatter;
+
+  private static final JobCallbackStatusEnum[] ON_COMPLETION_JOB_CALLBACK_STATUS =
+      { SUCCESS, FAILURE, COMPLETED };
+
+  public static void initialize(Props props) {
+    logger.info("Initializing");
+    instance = new JobCallbackManager(props);
+
+    isInitialized = true;
+  }
+
+  public static boolean isInitialized() {
+    return isInitialized;
+  }
+
+  public static JobCallbackManager getInstance() {
+    if (!isInitialized) {
+      throw new IllegalStateException(JobCallbackManager.class.getName()
+          + " has not been initialized");
+    }
+    return instance;
+  }
+
+  private JobCallbackManager(Props props) {
+    maxNumCallBack = props.getInt("jobcallback.max_count", maxNumCallBack);
+
+    // initialize the request maker
+    JobCallbackRequestMaker.initialize(props);
+
+    callbackMbean =
+        new JmxJobCallback(JobCallbackRequestMaker.getInstance()
+            .getJobcallbackMetrics());
+
+    azkabanHostName = getAzkabanHostName(props);
+
+    gmtDateFormatter = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss z");
+    gmtDateFormatter.setTimeZone(TimeZone.getTimeZone("GMT"));
+
+    logger.info("Initialization completed " + getClass().getName());
+    logger.info("azkabanHostName " + azkabanHostName);
+  }
+
+  public JmxJobCallbackMBean getJmxJobCallbackMBean() {
+    return callbackMbean;
+  }
+
+  @Override
+  public void handleEvent(Event event) {
+    if (!isInitialized) {
+      return;
+    }
+
+    if (event.getRunner() instanceof JobRunner) {
+      try {
+        if (event.getType() == Event.Type.JOB_STARTED) {
+          processJobCallOnStart(event);
+        } else if (event.getType() == Event.Type.JOB_FINISHED) {
+          processJobCallOnFinish(event);
+        }
+      } catch (Throwable e) {
+        // Use job runner logger so user can see the issue in their job log
+        JobRunner jobRunner = (JobRunner) event.getRunner();
+        jobRunner.getLogger().error(
+            "Encountered error while hanlding job callback event", e);
+      }
+    } else {
+      logger.warn("((( Got an unsupported runner: "
+          + event.getRunner().getClass().getName() + " )))");
+    }
+
+  }
+
+  private void processJobCallOnFinish(Event event) {
+    JobRunner jobRunner = (JobRunner) event.getRunner();
+
+    if (!JobCallbackUtil.isThereJobCallbackProperty(jobRunner.getProps(),
+        ON_COMPLETION_JOB_CALLBACK_STATUS)) {
+      return;
+    }
+
+    // don't want to waste time resolving properties if there are no
+    // callback properties to parse
+    Props props = PropsUtils.resolveProps(jobRunner.getProps());
+
+    Map<String, String> contextInfo =
+        JobCallbackUtil.buildJobContextInfoMap(event, azkabanHostName);
+
+    JobCallbackStatusEnum jobCallBackStatusEnum = null;
+    Logger jobLogger = jobRunner.getLogger();
+
+    Status jobStatus = jobRunner.getNode().getStatus();
+
+    if (jobStatus == Status.SUCCEEDED) {
+
+      jobCallBackStatusEnum = JobCallbackStatusEnum.SUCCESS;
+
+    } else if (jobStatus == Status.FAILED
+        || jobStatus == Status.FAILED_FINISHING || jobStatus == Status.KILLED) {
+
+      jobCallBackStatusEnum = JobCallbackStatusEnum.FAILURE;
+    } else {
+      jobLogger.info("!!!! WE ARE NOT SUPPORTING JOB CALLBACKS FOR STATUS: "
+          + jobStatus);
+      jobCallBackStatusEnum = null; // to be explicit
+    }
+
+    String jobId = contextInfo.get(JOB_TOKEN);
+
+    if (jobCallBackStatusEnum != null) {
+      List<HttpRequestBase> jobCallbackHttpRequests =
+          JobCallbackUtil.parseJobCallbackProperties(props,
+              jobCallBackStatusEnum, contextInfo, maxNumCallBack, jobLogger);
+
+      if (!jobCallbackHttpRequests.isEmpty()) {
+        String msg =
+            String.format("Making %d job callbacks for status: %s",
+                jobCallbackHttpRequests.size(), jobCallBackStatusEnum.name());
+        jobLogger.info(msg);
+
+        addDefaultHeaders(jobCallbackHttpRequests);
+
+        JobCallbackRequestMaker.getInstance().makeHttpRequest(jobId, jobLogger,
+            jobCallbackHttpRequests);
+      } else {
+        jobLogger.info("No job callbacks for status: " + jobCallBackStatusEnum);
+      }
+    }
+
+    // for completed status
+    List<HttpRequestBase> httpRequestsForCompletedStatus =
+        JobCallbackUtil.parseJobCallbackProperties(props, COMPLETED,
+            contextInfo, maxNumCallBack, jobLogger);
+
+    // now make the call
+    if (!httpRequestsForCompletedStatus.isEmpty()) {
+      jobLogger.info("Making " + httpRequestsForCompletedStatus.size()
+          + " job callbacks for status: " + COMPLETED);
+
+      addDefaultHeaders(httpRequestsForCompletedStatus);
+      JobCallbackRequestMaker.getInstance().makeHttpRequest(jobId, jobLogger,
+          httpRequestsForCompletedStatus);
+    } else {
+      jobLogger.info("No job callbacks for status: " + COMPLETED);
+    }
+  }
+
+  private void processJobCallOnStart(Event event) {
+    JobRunner jobRunner = (JobRunner) event.getRunner();
+
+    if (JobCallbackUtil.isThereJobCallbackProperty(jobRunner.getProps(),
+        JobCallbackStatusEnum.STARTED)) {
+
+      // don't want to waste time resolving properties if there are no
+      // callback properties to parse
+      Props props = PropsUtils.resolveProps(jobRunner.getProps());
+
+      Map<String, String> contextInfo =
+          JobCallbackUtil.buildJobContextInfoMap(event, azkabanHostName);
+
+      List<HttpRequestBase> jobCallbackHttpRequests =
+          JobCallbackUtil.parseJobCallbackProperties(props, STARTED,
+              contextInfo, maxNumCallBack, jobRunner.getLogger());
+
+      String jobId = contextInfo.get(JOB_TOKEN);
+      String msg =
+          String.format("Making %d job callbacks for job %s for jobStatus: %s",
+              jobCallbackHttpRequests.size(), jobId, STARTED.name());
+
+      jobRunner.getLogger().info(msg);
+
+      addDefaultHeaders(jobCallbackHttpRequests);
+
+      JobCallbackRequestMaker.getInstance().makeHttpRequest(jobId,
+          jobRunner.getLogger(), jobCallbackHttpRequests);
+    }
+  }
+
+  private String getAzkabanHostName(Props props) {
+    String baseURL = props.get(JobRunner.AZKABAN_WEBSERVER_URL);
+    try {
+      String hostName = InetAddress.getLocalHost().getHostName();
+      if (baseURL != null) {
+        URL url = new URL(baseURL);
+        hostName = url.getHost() + ":" + url.getPort();
+      }
+      return hostName;
+    } catch (Exception e) {
+      throw new IllegalStateException(
+          "Encountered while getting azkaban host name", e);
+    }
+  }
+
+  private void addDefaultHeaders(List<HttpRequestBase> httpRequests) {
+    if (httpRequests == null) {
+      return;
+    }
+
+    SimpleDateFormat format =
+        new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss z");
+    format.setTimeZone(TimeZone.getTimeZone("GMT"));
+
+    for (HttpRequestBase httpRequest : httpRequests) {
+      httpRequest.addHeader(new BasicHeader("Date", gmtDateFormatter
+          .format(new Date())));
+      httpRequest.addHeader(new BasicHeader("Host", azkabanHostName));
+    }
+
+  }
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/event/JobCallbackRequestMaker.java b/azkaban-execserver/src/main/java/azkaban/execapp/event/JobCallbackRequestMaker.java
new file mode 100644
index 0000000..f94afe0
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/event/JobCallbackRequestMaker.java
@@ -0,0 +1,235 @@
+package azkaban.execapp.event;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.SocketTimeoutException;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.impl.client.FutureRequestExecutionMetrics;
+import org.apache.http.impl.client.FutureRequestExecutionService;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpRequestFutureTask;
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Props;
+
+/**
+ * Responsible for making the job callback HTTP requests.
+ * 
+ * One of the requirements is to log the request information and the response
+ * using the given logger, which should be the job logger.
+ * 
+ * @author hluu
+ *
+ */
+public class JobCallbackRequestMaker {
+
+  private static final Logger logger = Logger
+      .getLogger(JobCallbackRequestMaker.class);
+
+  private static final int DEFAULT_TIME_OUT_MS = 3000;
+  private static final int DEFAULT_RESPONSE_WAIT_TIME_OUT_MS = 5000;
+  private static final int MAX_RESPONSE_LINE_TO_PRINT = 50;
+
+  private static final int DEFAULT_THREAD_POOL_SIZE = 10;
+
+  private static JobCallbackRequestMaker instance;
+  private static boolean isInitialized = false;
+
+  private FutureRequestExecutionService futureRequestExecutionService;
+  private int responseWaitTimeoutMS = -1;
+
+  public static void initialize(Props props) {
+    if (props == null) {
+      throw new NullPointerException("props argument can't be null");
+    }
+    instance = new JobCallbackRequestMaker(props);
+    isInitialized = true;
+    logger.info("Initialization for " + JobCallbackRequestMaker.class.getName()
+        + " is completed");
+  }
+
+  public static boolean isInitialized() {
+    return isInitialized;
+  }
+
+  public static JobCallbackRequestMaker getInstance() {
+    if (!isInitialized) {
+      throw new IllegalStateException(JobCallbackRequestMaker.class.getName()
+          + " hasn't initialzied");
+    }
+    return instance;
+  }
+
+  private JobCallbackRequestMaker(Props props) {
+
+    int connectionRequestTimeout =
+        props.getInt("jobcallback.connection.request.timeout",
+            DEFAULT_TIME_OUT_MS);
+
+    int connectionTimeout =
+        props.getInt("jobcallback.connection.timeout", DEFAULT_TIME_OUT_MS);
+
+    int socketTimeout =
+        props.getInt("jobcallback.socket.timeout", DEFAULT_TIME_OUT_MS);
+
+    responseWaitTimeoutMS =
+        props.getInt("jobcallback.response.wait.timeout",
+            DEFAULT_RESPONSE_WAIT_TIME_OUT_MS);
+
+    logger.info("responseWaitTimeoutMS: " + responseWaitTimeoutMS);
+
+    RequestConfig requestConfig =
+        RequestConfig.custom()
+            .setConnectionRequestTimeout(connectionRequestTimeout)
+            .setConnectTimeout(connectionTimeout)
+            .setSocketTimeout(socketTimeout).build();
+
+    logger.info("Global request configuration " + requestConfig.toString());
+
+    HttpClient httpClient =
+        HttpClientBuilder.create().setDefaultRequestConfig(requestConfig)
+            .build();
+
+    int jobCallbackThreadPoolSize =
+        props.getInt("jobcallback.thread.pool.size", DEFAULT_THREAD_POOL_SIZE);
+    logger.info("Jobcall thread pool size: " + jobCallbackThreadPoolSize);
+
+    ExecutorService executorService =
+        Executors.newFixedThreadPool(jobCallbackThreadPoolSize);
+    futureRequestExecutionService =
+        new FutureRequestExecutionService(httpClient, executorService);
+  }
+
+  public FutureRequestExecutionMetrics getJobcallbackMetrics() {
+    return futureRequestExecutionService.metrics();
+  }
+
+  public void makeHttpRequest(String jobId, Logger logger,
+      List<HttpRequestBase> httpRequestList) {
+
+    if (httpRequestList == null || httpRequestList.isEmpty()) {
+      logger.info("No HTTP requests to make");
+      return;
+    }
+
+    for (HttpRequestBase httpRequest : httpRequestList) {
+
+      logger.info("Job callback http request: " + httpRequest.toString());
+      logger.info("headers [");
+      for (Header header : httpRequest.getAllHeaders()) {
+        logger.info(String.format("  %s : %s", header.getName(),
+            header.getValue()));
+      }
+      logger.info("]");
+
+      HttpRequestFutureTask<Integer> task =
+          futureRequestExecutionService.execute(httpRequest,
+              HttpClientContext.create(), new LoggingResponseHandler(logger));
+
+      try {
+        // get with timeout
+        Integer statusCode =
+            task.get(responseWaitTimeoutMS, TimeUnit.MILLISECONDS);
+
+        logger.info("http callback status code: " + statusCode);
+      } catch (TimeoutException timeOutEx) {
+        logger
+            .warn("Job callback target took longer "
+                + (responseWaitTimeoutMS / 1000) + " seconds to respond",
+                timeOutEx);
+      } catch (ExecutionException ee) {
+        if (ee.getCause() instanceof SocketTimeoutException) {
+          logger.warn("Job callback target took longer "
+              + (responseWaitTimeoutMS / 1000) + " seconds to respond", ee);
+        } else {
+          logger.warn(
+              "Encountered error while waiting for job callback to complete",
+              ee);
+        }
+      } catch (Throwable e) {
+        logger.warn(
+            "Encountered error while waiting for job callback to complete", e);
+      }
+    }
+  }
+
+  /**
+   * Response handler for logging job callback response using the given logger
+   * instance
+   * 
+   * @author hluu
+   *
+   */
+  private final class LoggingResponseHandler implements ResponseHandler<Integer> {
+
+    private Logger logger;
+
+    public LoggingResponseHandler(Logger logger) {
+      if (logger == null) {
+        throw new NullPointerException("Argument logger can't be null");
+      }
+      this.logger = logger;
+    }
+
+    public Integer handleResponse(final HttpResponse response)
+        throws ClientProtocolException, IOException {
+
+      int statusCode = response.getStatusLine().getStatusCode();
+      BufferedReader bufferedReader = null;
+
+      try {
+        HttpEntity responseEntity = response.getEntity();
+        if (responseEntity != null) {
+          bufferedReader =
+              new BufferedReader(new InputStreamReader(
+                  responseEntity.getContent()));
+
+          String line = "";
+          int lineCount = 0;
+          logger.info("HTTP response [");
+          while ((line = bufferedReader.readLine()) != null) {
+            logger.info(line);
+            lineCount++;
+            if (lineCount > MAX_RESPONSE_LINE_TO_PRINT) {
+              break;
+            }
+          }
+          logger.info("]");
+        } else {
+          logger.info("No response");
+        }
+
+      } catch (Throwable t) {
+        logger.warn(
+            "Encountered error while logging out job callback response", t);
+      } finally {
+        if (bufferedReader != null) {
+          try {
+            bufferedReader.close();
+          } catch (IOException ex) {
+            // don't care
+          }
+        }
+      }
+      return statusCode;
+
+    }
+  }
+
+}
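
The constructor above reads its tunables from the executor configuration. A sketch of overriding them before initialization (the values are illustrative; each property falls back to the default shown in the code when absent):

    import azkaban.execapp.event.JobCallbackRequestMaker;
    import azkaban.utils.Props;

    public class RequestMakerConfigExample {
      public static void main(String[] args) {
        Props props = new Props();
        props.put("jobcallback.connection.request.timeout", "2000"); // ms, default 3000
        props.put("jobcallback.connection.timeout", "2000");         // ms, default 3000
        props.put("jobcallback.socket.timeout", "2000");             // ms, default 3000
        props.put("jobcallback.response.wait.timeout", "8000");      // ms, default 5000
        props.put("jobcallback.thread.pool.size", "5");              // default 10
        JobCallbackRequestMaker.initialize(props);
      }
    }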
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/event/JobCallbackUtil.java b/azkaban-execserver/src/main/java/azkaban/execapp/event/JobCallbackUtil.java
new file mode 100644
index 0000000..c796186
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/event/JobCallbackUtil.java
@@ -0,0 +1,294 @@
+package azkaban.execapp.event;
+
+import static azkaban.jobcallback.JobCallbackConstants.EXECUTION_ID_TOKEN;
+import static azkaban.jobcallback.JobCallbackConstants.FIRST_JOB_CALLBACK_URL_TEMPLATE;
+import static azkaban.jobcallback.JobCallbackConstants.FLOW_TOKEN;
+import static azkaban.jobcallback.JobCallbackConstants.HEADER_ELEMENT_DELIMITER;
+import static azkaban.jobcallback.JobCallbackConstants.HEADER_NAME_VALUE_DELIMITER;
+import static azkaban.jobcallback.JobCallbackConstants.HTTP_GET;
+import static azkaban.jobcallback.JobCallbackConstants.HTTP_POST;
+import static azkaban.jobcallback.JobCallbackConstants.JOB_CALLBACK_BODY_TEMPLATE;
+import static azkaban.jobcallback.JobCallbackConstants.JOB_CALLBACK_REQUEST_HEADERS_TEMPLATE;
+import static azkaban.jobcallback.JobCallbackConstants.JOB_CALLBACK_REQUEST_METHOD_TEMPLATE;
+import static azkaban.jobcallback.JobCallbackConstants.JOB_CALLBACK_URL_TEMPLATE;
+import static azkaban.jobcallback.JobCallbackConstants.JOB_STATUS_TOKEN;
+import static azkaban.jobcallback.JobCallbackConstants.JOB_TOKEN;
+import static azkaban.jobcallback.JobCallbackConstants.PROJECT_TOKEN;
+import static azkaban.jobcallback.JobCallbackConstants.SEQUENCE_TOKEN;
+import static azkaban.jobcallback.JobCallbackConstants.SERVER_TOKEN;
+import static azkaban.jobcallback.JobCallbackConstants.STATUS_TOKEN;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.http.Header;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.message.BasicHeader;
+import org.apache.log4j.Logger;
+
+import azkaban.event.Event;
+import azkaban.execapp.JobRunner;
+import azkaban.executor.ExecutableNode;
+import azkaban.jobcallback.JobCallbackStatusEnum;
+import azkaban.utils.Props;
+
+public class JobCallbackUtil {
+  private static final Logger logger = Logger.getLogger(JobCallbackUtil.class);
+
+  private static Map<JobCallbackStatusEnum, String> firstJobcallbackPropertyMap =
+      new HashMap<JobCallbackStatusEnum, String>(
+          JobCallbackStatusEnum.values().length);
+
+  static {
+    for (JobCallbackStatusEnum statusEnum : JobCallbackStatusEnum.values()) {
+      firstJobcallbackPropertyMap.put(statusEnum,
+          replaceStatusToken(FIRST_JOB_CALLBACK_URL_TEMPLATE, statusEnum));
+    }
+  }
+
+  /**
+   * Used to quickly determine whether there is a job callback related property
+   * in the Props.
+   * 
+   * @param props
+   * @param status
+   * @return true if there is job callback related property
+   */
+  public static boolean isThereJobCallbackProperty(Props props,
+      JobCallbackStatusEnum status) {
+
+    if (props == null || status == null) {
+      throw new NullPointerException("One of the argument is null");
+    }
+
+    String jobCallBackUrl = firstJobcallbackPropertyMap.get(status);
+    return props.containsKey(jobCallBackUrl);
+  }
+
+  public static boolean isThereJobCallbackProperty(Props props,
+      JobCallbackStatusEnum... jobStatuses) {
+
+    if (props == null || jobStatuses == null) {
+      throw new NullPointerException("One of the argument is null");
+    }
+
+    for (JobCallbackStatusEnum jobStatus : jobStatuses) {
+      if (JobCallbackUtil.isThereJobCallbackProperty(props, jobStatus)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public static List<HttpRequestBase> parseJobCallbackProperties(Props props,
+      JobCallbackStatusEnum status, Map<String, String> contextInfo,
+      int maxNumCallback) {
+
+    return parseJobCallbackProperties(props, status, contextInfo,
+        maxNumCallback, logger);
+  }
+
+  /**
+   * This method is responsible for parsing the job callback URL properties and
+   * converting them into a list of HttpRequestBase objects to execute.
+   * 
+   * In addition to parsing, it will also replace the tokens with actual values.
+   * 
+   * @param props
+   * @param status
+   * @param contextInfo
+   * @return List<HttpRequestBase> - empty if no job callback related properties
+   */
+  public static List<HttpRequestBase> parseJobCallbackProperties(Props props,
+      JobCallbackStatusEnum status, Map<String, String> contextInfo,
+      int maxNumCallback, Logger privateLogger) {
+    String callbackUrl = null;
+
+    if (!isThereJobCallbackProperty(props, status)) {
+      // short circuit
+      return Collections.emptyList();
+    }
+
+    List<HttpRequestBase> result = new ArrayList<HttpRequestBase>();
+
+    // replace property templates with status
+    String jobCallBackUrlKey =
+        replaceStatusToken(JOB_CALLBACK_URL_TEMPLATE, status);
+
+    String requestMethod =
+        replaceStatusToken(JOB_CALLBACK_REQUEST_METHOD_TEMPLATE, status);
+
+    String httpBodyKey = replaceStatusToken(JOB_CALLBACK_BODY_TEMPLATE, status);
+
+    String headersKey =
+        replaceStatusToken(JOB_CALLBACK_REQUEST_HEADERS_TEMPLATE, status);
+
+    for (int sequence = 1; sequence <= maxNumCallback; sequence++) {
+      HttpRequestBase httpRequest = null;
+      String sequenceStr = Integer.toString(sequence);
+      // callback url
+      String callbackUrlKey =
+          jobCallBackUrlKey.replace(SEQUENCE_TOKEN, sequenceStr);
+
+      callbackUrl = props.get(callbackUrlKey);
+      if (callbackUrl == null || callbackUrl.length() == 0) {
+        // no more callbacks defined for this status; stop scanning
+        break;
+      } else {
+        String callbackUrlWithTokenReplaced =
+            replaceToken(callbackUrl, contextInfo);
+
+        String requestMethodKey =
+            requestMethod.replace(SEQUENCE_TOKEN, sequenceStr);
+
+        String method = props.getString(requestMethodKey, HTTP_GET);
+
+        if (HTTP_POST.equals(method)) {
+          String postBodyKey = httpBodyKey.replace(SEQUENCE_TOKEN, sequenceStr);
+          String httpBodyValue = props.get(postBodyKey);
+          if (httpBodyValue == null) {
+            // missing body for POST; skip this callback instead of sending it
+            // (update the wiki about skipping callback url if body is missing)
+            privateLogger.warn("Missing value for key: " + postBodyKey
+                + ", skipping job callback '" + callbackUrl + "' for job "
+                + contextInfo.get(JOB_TOKEN));
+            continue;
+          } else {
+            // build the POST request with the token-resolved body
+            HttpPost httpPost = new HttpPost(callbackUrlWithTokenReplaced);
+            String postActualBody = replaceToken(httpBodyValue, contextInfo);
+            privateLogger.info("postActualBody: " + postActualBody);
+            httpPost.setEntity(createStringEntity(postActualBody));
+            httpRequest = httpPost;
+          }
+        } else if (HTTP_GET.equals(method)) {
+          // GET
+          httpRequest = new HttpGet(callbackUrlWithTokenReplaced);
+        } else {
+          privateLogger.warn("Unsupported request method: " + method
+              + ". Only POST and GET are supported");
+        }
+
+        String headersKeyPerSequence =
+            headersKey.replace(SEQUENCE_TOKEN, sequenceStr);
+        String headersValue = props.get(headersKeyPerSequence);
+        privateLogger.info("headers: " + headersValue);
+        Header[] headers = parseHttpHeaders(headersValue);
+        if (headers != null) {
+          httpRequest.setHeaders(headers);
+          privateLogger.info("# of headers found: " + headers.length);
+        }
+        result.add(httpRequest);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Parse headers
+   * 
+   * @param headers
+   * @return null if headers is null or empty
+   */
+  public static Header[] parseHttpHeaders(String headers) {
+    if (headers == null || headers.length() == 0) {
+      return null;
+    }
+
+    String[] headerArray = headers.split(HEADER_ELEMENT_DELIMITER);
+    List<Header> headerList = new ArrayList<Header>(headerArray.length);
+    for (int i = 0; i < headerArray.length; i++) {
+      String headerPair = headerArray[i];
+      int index = headerPair.indexOf(HEADER_NAME_VALUE_DELIMITER);
+      if (index != -1) {
+        headerList.add(new BasicHeader(headerPair.substring(0, index),
+            headerPair.substring(index + 1)));
+      }
+
+    }
+    return headerList.toArray(new BasicHeader[0]);
+  }
+
+  private static String replaceStatusToken(String template,
+      JobCallbackStatusEnum status) {
+    return template.replace(STATUS_TOKEN, status.name().toLowerCase());
+  }
+
+  private static StringEntity createStringEntity(String str) {
+    try {
+      return new StringEntity(str);
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException("Encoding not supported", e);
+    }
+  }
+
+  /**
+   * This method takes the job context info and puts the values into a map
+   * keyed by the tokens.
+   * 
+   * @param event
+   * @return Map<String,String>
+   */
+  public static Map<String, String> buildJobContextInfoMap(Event event,
+      String server) {
+
+    if (event.getRunner() instanceof JobRunner) {
+      JobRunner jobRunner = (JobRunner) event.getRunner();
+      ExecutableNode node = jobRunner.getNode();
+      String projectName = node.getParentFlow().getProjectName();
+      String flowName = node.getParentFlow().getFlowId();
+      String executionId =
+          String.valueOf(node.getParentFlow().getExecutionId());
+      String jobId = node.getId();
+
+      Map<String, String> result = new HashMap<String, String>();
+      result.put(SERVER_TOKEN, server);
+      result.put(PROJECT_TOKEN, projectName);
+      result.put(FLOW_TOKEN, flowName);
+      result.put(EXECUTION_ID_TOKEN, executionId);
+      result.put(JOB_TOKEN, jobId);
+      result.put(JOB_STATUS_TOKEN, node.getStatus().name().toLowerCase());
+
+      /*
+       * if (node.getStatus() == Status.SUCCEEDED || node.getStatus() ==
+       * Status.FAILED) { result.put(JOB_STATUS_TOKEN,
+       * node.getStatus().name().toLowerCase()); } else if (node.getStatus() ==
+       * Status.PREPARING) { result.put(JOB_STATUS_TOKEN, "started"); }
+       */
+
+      return result;
+
+    } else {
+      throw new IllegalArgumentException("Provided event is not a job event");
+    }
+  }
+
+  /**
+   * Replace the supported tokens in the URL with values in the contextInfo
+   * 
+   * @param url
+   * @param contextInfo
+   * @return String - url with tokens replaced with values
+   */
+  public static String replaceToken(String url, Map<String, String> contextInfo) {
+
+    String result = url;
+
+    result = result.replace(SERVER_TOKEN, contextInfo.get(SERVER_TOKEN));
+    result = result.replace(PROJECT_TOKEN, contextInfo.get(PROJECT_TOKEN));
+    result = result.replace(FLOW_TOKEN, contextInfo.get(FLOW_TOKEN));
+    result = result.replace(JOB_TOKEN, contextInfo.get(JOB_TOKEN));
+    result =
+        result.replace(EXECUTION_ID_TOKEN, contextInfo.get(EXECUTION_ID_TOKEN));
+    result =
+        result.replace(JOB_STATUS_TOKEN, contextInfo.get(JOB_STATUS_TOKEN));
+
+    return result;
+  }
+}
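
Putting the pieces together, a sketch of job-level properties that parseJobCallbackProperties above would turn into requests (URL, header, and body values are illustrative). The ?{...} tokens in the URL and POST body are replaced from the context map, header pairs are separated by "\r\n" with name and value split on ":", and the method defaults to GET when ...method is absent.

    import azkaban.utils.Props;

    public class CallbackPropsExample {
      public static void main(String[] args) {
        Props jobProps = new Props();
        // First STARTED callback: a GET whose URL embeds job context tokens.
        jobProps.put("job.notification.started.1.url",
            "http://example.com/hook?flow=?{flow}&job=?{job}&exec=?{executionId}");
        // First FAILURE callback: a POST with custom headers and a token-bearing body.
        jobProps.put("job.notification.failure.1.url", "http://example.com/alerts");
        jobProps.put("job.notification.failure.1.method", "POST");
        jobProps.put("job.notification.failure.1.headers",
            "Content-Type:application/json\r\nX-Notified-By:azkaban");
        jobProps.put("job.notification.failure.1.body",
            "{\"project\": \"?{project}\", \"job\": \"?{job}\", \"status\": \"?{status}\"}");
      }
    }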
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
index e98f5f2..124b58b 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
@@ -42,6 +42,8 @@ import azkaban.event.Event.Type;
 import azkaban.event.EventHandler;
 import azkaban.event.EventListener;
 import azkaban.execapp.event.FlowWatcher;
+import azkaban.execapp.event.JobCallbackManager;
+import azkaban.execapp.jmx.JmxJobMBeanManager;
 import azkaban.execapp.metric.NumFailedJobMetric;
 import azkaban.execapp.metric.NumRunningJobMetric;
 import azkaban.executor.ExecutableFlow;
@@ -61,13 +63,13 @@ import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
 import azkaban.utils.SwapQueue;
 
-
 /**
  * Class that handles the running of a ExecutableFlow DAG
  *
  */
 public class FlowRunner extends EventHandler implements Runnable {
-  private static final Layout DEFAULT_LAYOUT = new PatternLayout("%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
+  private static final Layout DEFAULT_LAYOUT = new PatternLayout(
+      "%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
   // We check update every 5 minutes, just in case things get stuck. But for the
   // most part, we'll be idling.
   private static final long CHECK_WAIT_MS = 5 * 60 * 1000;
@@ -96,7 +98,8 @@ public class FlowRunner extends EventHandler implements Runnable {
   private final JobTypeManager jobtypeManager;
 
   private JobRunnerEventListener listener = new JobRunnerEventListener();
-  private Set<JobRunner> activeJobRunners = Collections.newSetFromMap(new ConcurrentHashMap<JobRunner, Boolean>());
+  private Set<JobRunner> activeJobRunners = Collections
+      .newSetFromMap(new ConcurrentHashMap<JobRunner, Boolean>());
 
   // Thread safe swap queue for finishedExecutions.
   private SwapQueue<ExecutableNode> finishedNodes;
@@ -131,8 +134,9 @@ public class FlowRunner extends EventHandler implements Runnable {
    * @param jobtypeManager
    * @throws ExecutorManagerException
    */
-  public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader,
-      JobTypeManager jobtypeManager) throws ExecutorManagerException {
+  public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader,
+      ProjectLoader projectLoader, JobTypeManager jobtypeManager)
+      throws ExecutorManagerException {
     this(flow, executorLoader, projectLoader, jobtypeManager, null);
   }
 
@@ -147,8 +151,9 @@ public class FlowRunner extends EventHandler implements Runnable {
    * @param executorService
    * @throws ExecutorManagerException
    */
-  public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader,
-      JobTypeManager jobtypeManager, ExecutorService executorService) throws ExecutorManagerException {
+  public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader,
+      ProjectLoader projectLoader, JobTypeManager jobtypeManager,
+      ExecutorService executorService) throws ExecutorManagerException {
     this.execId = flow.getExecutionId();
     this.flow = flow;
     this.executorLoader = executorLoader;
@@ -210,18 +215,23 @@ public class FlowRunner extends EventHandler implements Runnable {
       runFlow();
     } catch (Throwable t) {
       if (logger != null) {
-        logger.error("An error has occurred during the running of the flow. Quiting.", t);
+        logger
+            .error(
+                "An error has occurred during the running of the flow. Quitting.",
+                t);
       }
       flow.setStatus(Status.FAILED);
     } finally {
       if (watcher != null) {
         logger.info("Watcher is attached. Stopping watcher.");
         watcher.stopWatcher();
-        logger.info("Watcher cancelled status is " + watcher.isWatchCancelled());
+        logger
+            .info("Watcher cancelled status is " + watcher.isWatchCancelled());
       }
 
       flow.setEndTime(System.currentTimeMillis());
-      logger.info("Setting end time for flow " + execId + " to " + System.currentTimeMillis());
+      logger.info("Setting end time for flow " + execId + " to "
+          + System.currentTimeMillis());
       closeLogger();
 
       updateFlow();
@@ -246,7 +256,8 @@ public class FlowRunner extends EventHandler implements Runnable {
     }
 
     // If there are flow overrides, we apply them now.
-    Map<String, String> flowParam = flow.getExecutionOptions().getFlowParameters();
+    Map<String, String> flowParam =
+        flow.getExecutionOptions().getFlowParameters();
     if (flowParam != null && !flowParam.isEmpty()) {
       commonFlowProps = new Props(commonFlowProps, flowParam);
     }
@@ -259,9 +270,11 @@ public class FlowRunner extends EventHandler implements Runnable {
       this.watcher.setLogger(logger);
     }
 
-    logger.info("Running execid:" + execId + " flow:" + flowId + " project:" + projectId + " version:" + version);
+    logger.info("Running execid:" + execId + " flow:" + flowId + " project:"
+        + projectId + " version:" + version);
     if (pipelineExecId != null) {
-      logger.info("Running simulateously with " + pipelineExecId + ". Pipelining level " + pipelineLevel);
+      logger.info("Running simulateously with " + pipelineExecId
+          + ". Pipelining level " + pipelineLevel);
     }
 
     // The current thread is used for interrupting blocks
@@ -271,8 +284,10 @@ public class FlowRunner extends EventHandler implements Runnable {
 
   private void updateFlowReference() throws ExecutorManagerException {
     logger.info("Update active reference");
-    if (!executorLoader.updateExecutableReference(execId, System.currentTimeMillis())) {
-      throw new ExecutorManagerException("The executor reference doesn't exist. May have been killed prematurely.");
+    if (!executorLoader.updateExecutableReference(execId,
+        System.currentTimeMillis())) {
+      throw new ExecutorManagerException(
+          "The executor reference doesn't exist. May have been killed prematurely.");
     }
   }
 
@@ -395,7 +410,8 @@ public class FlowRunner extends EventHandler implements Runnable {
     resetFailedState(this.flow, retryJobs);
 
     for (ExecutableNode node : retryJobs) {
-      if (node.getStatus() == Status.READY || node.getStatus() == Status.DISABLED) {
+      if (node.getStatus() == Status.READY
+          || node.getStatus() == Status.DISABLED) {
         runReadyJob(node);
       } else if (node.getStatus() == Status.SUCCEEDED) {
         for (String outNodeId : node.getOutNodes()) {
@@ -464,7 +480,8 @@ public class FlowRunner extends EventHandler implements Runnable {
     // Instant kill or skip if necessary.
     boolean jobsRun = false;
     for (ExecutableNode node : nodesToCheck) {
-      if (Status.isStatusFinished(node.getStatus()) || Status.isStatusRunning(node.getStatus())) {
+      if (Status.isStatusFinished(node.getStatus())
+          || Status.isStatusRunning(node.getStatus())) {
         // Really shouldn't get in here.
         continue;
       }
@@ -481,7 +498,8 @@ public class FlowRunner extends EventHandler implements Runnable {
   }
 
   private boolean runReadyJob(ExecutableNode node) throws IOException {
-    if (Status.isStatusFinished(node.getStatus()) || Status.isStatusRunning(node.getStatus())) {
+    if (Status.isStatusFinished(node.getStatus())
+        || Status.isStatusRunning(node.getStatus())) {
       return false;
     }
 
@@ -491,7 +509,8 @@ public class FlowRunner extends EventHandler implements Runnable {
     }
 
     if (nextNodeStatus == Status.CANCELLED) {
-      logger.info("Cancelling '" + node.getNestedId() + "' due to prior errors.");
+      logger.info("Cancelling '" + node.getNestedId()
+          + "' due to prior errors.");
       node.cancelNode(System.currentTimeMillis());
       finishExecutableNode(node);
     } else if (nextNodeStatus == Status.SKIPPED) {
@@ -523,8 +542,8 @@ public class FlowRunner extends EventHandler implements Runnable {
     }
 
     if (node.getRetries() > node.getAttempt()) {
-      logger.info("Job '" + node.getId() + "' will be retried. Attempt " + node.getAttempt() + " of "
-          + node.getRetries());
+      logger.info("Job '" + node.getId() + "' will be retried. Attempt "
+          + node.getAttempt() + " of " + node.getRetries());
       node.setDelayedExecution(node.getRetryBackoff());
       node.resetForRetry();
       return true;
@@ -565,7 +584,8 @@ public class FlowRunner extends EventHandler implements Runnable {
     for (String end : flow.getEndNodes()) {
       ExecutableNode node = flow.getExecutableNode(end);
 
-      if (node.getStatus() == Status.KILLED || node.getStatus() == Status.FAILED
+      if (node.getStatus() == Status.KILLED
+          || node.getStatus() == Status.FAILED
           || node.getStatus() == Status.CANCELLED) {
         succeeded = false;
       }
@@ -587,19 +607,22 @@ public class FlowRunner extends EventHandler implements Runnable {
     flow.setUpdateTime(System.currentTimeMillis());
     long durationSec = (flow.getEndTime() - flow.getStartTime()) / 1000;
     switch (flow.getStatus()) {
-      case FAILED_FINISHING:
-        logger.info("Setting flow '" + id + "' status to FAILED in " + durationSec + " seconds");
-        flow.setStatus(Status.FAILED);
-        break;
-      case FAILED:
-      case KILLED:
-      case CANCELLED:
-      case FAILED_SUCCEEDED:
-        logger.info("Flow '" + id + "' is set to " + flow.getStatus().toString() + " in " + durationSec + " seconds");
-        break;
-      default:
-        flow.setStatus(Status.SUCCEEDED);
-        logger.info("Flow '" + id + "' is set to " + flow.getStatus().toString() + " in " + durationSec + " seconds");
+    case FAILED_FINISHING:
+      logger.info("Setting flow '" + id + "' status to FAILED in "
+          + durationSec + " seconds");
+      flow.setStatus(Status.FAILED);
+      break;
+    case FAILED:
+    case KILLED:
+    case CANCELLED:
+    case FAILED_SUCCEEDED:
+      logger.info("Flow '" + id + "' is set to " + flow.getStatus().toString()
+          + " in " + durationSec + " seconds");
+      break;
+    default:
+      flow.setStatus(Status.SUCCEEDED);
+      logger.info("Flow '" + id + "' is set to " + flow.getStatus().toString()
+          + " in " + durationSec + " seconds");
     }
 
     // If the finalized flow is actually the top level flow, than we finish
@@ -660,10 +683,13 @@ public class FlowRunner extends EventHandler implements Runnable {
 
     // load the override props if any
     try {
-      props = projectLoader.fetchProjectProperty(flow.getProjectId(), flow.getVersion(), node.getId() + ".jor");
+      props =
+          projectLoader.fetchProjectProperty(flow.getProjectId(),
+              flow.getVersion(), node.getId() + ".jor");
     } catch (ProjectManagerException e) {
       e.printStackTrace();
-      logger.error("Error loading job override property for job " + node.getId());
+      logger.error("Error loading job override property for job "
+          + node.getId());
     }
 
     File path = new File(execDir, source);
@@ -673,7 +699,8 @@ public class FlowRunner extends EventHandler implements Runnable {
         props = new Props(null, path);
       } catch (IOException e) {
         e.printStackTrace();
-        logger.error("Error loading job file " + source + " for job " + node.getId());
+        logger.error("Error loading job file " + source + " for job "
+            + node.getId());
       }
     }
     // setting this fake source as this will be used to determine the location
@@ -710,7 +737,8 @@ public class FlowRunner extends EventHandler implements Runnable {
   public Status getImpliedStatus(ExecutableNode node) {
     // If it's running or finished with 'SUCCEEDED', than don't even
     // bother starting this job.
-    if (Status.isStatusRunning(node.getStatus()) || node.getStatus() == Status.SUCCEEDED) {
+    if (Status.isStatusRunning(node.getStatus())
+        || node.getStatus() == Status.SUCCEEDED) {
       return null;
     }
 
@@ -725,7 +753,8 @@ public class FlowRunner extends EventHandler implements Runnable {
 
       if (!Status.isStatusFinished(depStatus)) {
         return null;
-      } else if (depStatus == Status.FAILED || depStatus == Status.CANCELLED || depStatus == Status.KILLED) {
+      } else if (depStatus == Status.FAILED || depStatus == Status.CANCELLED
+          || depStatus == Status.KILLED) {
         // We propagate failures as KILLED states.
         shouldKill = true;
       }
@@ -733,14 +762,16 @@ public class FlowRunner extends EventHandler implements Runnable {
 
     // If it's disabled but ready to run, we want to make sure it continues
     // being disabled.
-    if (node.getStatus() == Status.DISABLED || node.getStatus() == Status.SKIPPED) {
+    if (node.getStatus() == Status.DISABLED
+        || node.getStatus() == Status.SKIPPED) {
       return Status.SKIPPED;
     }
 
     // If the flow has failed, and we want to finish only the currently running
     // jobs, we just
     // kill everything else. We also kill, if the flow has been cancelled.
-    if (flowFailed && failureAction == ExecutionOptions.FailureAction.FINISH_CURRENTLY_RUNNING) {
+    if (flowFailed
+        && failureAction == ExecutionOptions.FailureAction.FINISH_CURRENTLY_RUNNING) {
       return Status.CANCELLED;
     } else if (shouldKill || isKilled()) {
       return Status.CANCELLED;
@@ -754,7 +785,8 @@ public class FlowRunner extends EventHandler implements Runnable {
     Props previousOutput = null;
     // Iterate the in nodes again and create the dependencies
     for (String dependency : node.getInNodes()) {
-      Props output = node.getParentFlow().getExecutableNode(dependency).getOutputProps();
+      Props output =
+          node.getParentFlow().getExecutableNode(dependency).getOutputProps();
       if (output != null) {
         output = Props.clone(output);
         output.setParent(previousOutput);
@@ -769,7 +801,9 @@ public class FlowRunner extends EventHandler implements Runnable {
     // Load job file.
     File path = new File(execDir, node.getJobSource());
 
-    JobRunner jobRunner = new JobRunner(node, path.getParentFile(), executorLoader, jobtypeManager);
+    JobRunner jobRunner =
+        new JobRunner(node, path.getParentFile(), executorLoader,
+            jobtypeManager);
     if (watcher != null) {
       jobRunner.setPipeline(watcher, pipelineLevel);
     }
@@ -781,6 +815,10 @@ public class FlowRunner extends EventHandler implements Runnable {
     jobRunner.setLogSettings(logger, jobLogFileSize, jobLogNumFiles);
     jobRunner.addListener(listener);
 
+    if (JobCallbackManager.isInitialized()) {
+      jobRunner.addListener(JobCallbackManager.getInstance());
+    }
+
     configureJobLevelMetrics(jobRunner);
 
     return jobRunner;
@@ -788,6 +826,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 
   /**
    * Configure Azkaban metrics tracking for a new jobRunner instance
+   * 
    * @param jobRunner
    */
   private void configureJobLevelMetrics(JobRunner jobRunner) {
@@ -795,15 +834,17 @@ public class FlowRunner extends EventHandler implements Runnable {
     if (MetricReportManager.isAvailable()) {
       MetricReportManager metricManager = MetricReportManager.getInstance();
 
-      //Adding NumRunningJobMetric listener
+      // Adding NumRunningJobMetric listener
       jobRunner.addListener((NumRunningJobMetric) metricManager
           .getMetricFromName(NumRunningJobMetric.NUM_RUNNING_JOB_METRIC_NAME));
 
-      //Adding NumFailedJobMetric listener
+      // Adding NumFailedJobMetric listener
       jobRunner.addListener((NumFailedJobMetric) metricManager
           .getMetricFromName(NumFailedJobMetric.NUM_FAILED_JOB_METRIC_NAME));
 
     }
+
+    jobRunner.addListener(JmxJobMBeanManager.getInstance());
   }
 
   public void pause(String user) {
@@ -865,7 +906,8 @@ public class FlowRunner extends EventHandler implements Runnable {
       if (watcher != null) {
         logger.info("Watcher is attached. Stopping watcher.");
         watcher.stopWatcher();
-        logger.info("Watcher cancelled status is " + watcher.isWatchCancelled());
+        logger
+            .info("Watcher cancelled status is " + watcher.isWatchCancelled());
       }
 
       logger.info("Killing " + activeJobRunners.size() + " jobs.");
@@ -883,7 +925,8 @@ public class FlowRunner extends EventHandler implements Runnable {
     }
   }
 
-  private void resetFailedState(ExecutableFlowBase flow, List<ExecutableNode> nodesToRetry) {
+  private void resetFailedState(ExecutableFlowBase flow,
+      List<ExecutableNode> nodesToRetry) {
     // bottom up
     LinkedList<ExecutableNode> queue = new LinkedList<ExecutableNode>();
     for (String id : flow.getEndNodes()) {
@@ -912,24 +955,24 @@ public class FlowRunner extends EventHandler implements Runnable {
       } else if (node instanceof ExecutableFlowBase) {
         ExecutableFlowBase base = (ExecutableFlowBase) node;
         switch (base.getStatus()) {
-          case CANCELLED:
-            node.setStatus(Status.READY);
-            node.setEndTime(-1);
-            node.setStartTime(-1);
-            node.setUpdateTime(currentTime);
-            // Break out of the switch. We'll reset the flow just like a normal
-            // node
-            break;
-          case KILLED:
-          case FAILED:
-          case FAILED_FINISHING:
-            resetFailedState(base, nodesToRetry);
-            continue;
-          default:
-            // Continue the while loop. If the job is in a finished state that's
-            // not
-            // a failure, we don't want to reset the job.
-            continue;
+        case CANCELLED:
+          node.setStatus(Status.READY);
+          node.setEndTime(-1);
+          node.setStartTime(-1);
+          node.setUpdateTime(currentTime);
+          // Break out of the switch. We'll reset the flow just like a normal
+          // node
+          break;
+        case KILLED:
+        case FAILED:
+        case FAILED_FINISHING:
+          resetFailedState(base, nodesToRetry);
+          continue;
+        default:
+          // Continue the while loop. If the job is in a finished state that's
+          // not
+          // a failure, we don't want to reset the job.
+          continue;
         }
       } else if (node.getStatus() == Status.CANCELLED) {
         // Not a flow, but killed
@@ -937,13 +980,16 @@ public class FlowRunner extends EventHandler implements Runnable {
         node.setStartTime(-1);
         node.setEndTime(-1);
         node.setUpdateTime(currentTime);
-      } else if (node.getStatus() == Status.FAILED || node.getStatus() == Status.KILLED) {
+      } else if (node.getStatus() == Status.FAILED
+          || node.getStatus() == Status.KILLED) {
         node.resetForRetry();
         nodesToRetry.add(node);
       }
 
-      if (!(node instanceof ExecutableFlowBase) && node.getStatus() != oldStatus) {
-        logger.info("Resetting job '" + node.getNestedId() + "' from " + oldStatus + " to " + node.getStatus());
+      if (!(node instanceof ExecutableFlowBase)
+          && node.getStatus() != oldStatus) {
+        logger.info("Resetting job '" + node.getNestedId() + "' from "
+            + oldStatus + " to " + node.getStatus());
       }
 
       for (String inId : node.getInNodes()) {
@@ -965,14 +1011,16 @@ public class FlowRunner extends EventHandler implements Runnable {
       // start node has not.
       for (String id : flow.getStartNodes()) {
         ExecutableNode node = flow.getExecutableNode(id);
-        if (node.getStatus() == Status.READY || node.getStatus() == Status.DISABLED) {
+        if (node.getStatus() == Status.READY
+            || node.getStatus() == Status.DISABLED) {
           nodesToRetry.add(node);
         }
       }
     }
     flow.setUpdateTime(System.currentTimeMillis());
     flow.setEndTime(-1);
-    logger.info("Resetting flow '" + flow.getNestedId() + "' from " + oldFlowState + " to " + flow.getStatus());
+    logger.info("Resetting flow '" + flow.getNestedId() + "' from "
+        + oldFlowState + " to " + flow.getStatus());
   }
 
   private void interrupt() {
@@ -993,13 +1041,14 @@ public class FlowRunner extends EventHandler implements Runnable {
         ExecutableNode node = runner.getNode();
         long seconds = (node.getEndTime() - node.getStartTime()) / 1000;
         synchronized (mainSyncObj) {
-          logger.info("Job " + node.getNestedId() + " finished with status " + node.getStatus() + " in " + seconds
-              + " seconds");
+          logger.info("Job " + node.getNestedId() + " finished with status "
+              + node.getStatus() + " in " + seconds + " seconds");
 
           // Cancellation is handled in the main thread, but if the flow is
           // paused, the main thread is paused too.
           // This unpauses the flow for cancellation.
-          if (flowPaused && node.getStatus() == Status.FAILED && failureAction == FailureAction.CANCEL_ALL) {
+          if (flowPaused && node.getStatus() == Status.FAILED
+              && failureAction == FailureAction.CANCEL_ALL) {
             flowPaused = false;
           }
 
@@ -1042,7 +1091,8 @@ public class FlowRunner extends EventHandler implements Runnable {
     ExecutableNode node = flow.getExecutableNodePath(jobId);
     File path = new File(execDir, node.getJobSource());
 
-    String attachmentFileName = JobRunner.createAttachmentFileName(node, attempt);
+    String attachmentFileName =
+        JobRunner.createAttachmentFileName(node, attempt);
     File attachmentFile = new File(path.getParentFile(), attachmentFileName);
     if (!attachmentFile.exists()) {
       return null;
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 124b5c0..c8d7f1c 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -43,7 +43,6 @@ import azkaban.execapp.event.FlowWatcher;
 import azkaban.execapp.event.LocalFlowWatcher;
 import azkaban.execapp.event.RemoteFlowWatcher;
 import azkaban.execapp.metric.NumFailedFlowMetric;
-import azkaban.execapp.metric.NumFailedJobMetric;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutorLoader;
@@ -316,8 +315,9 @@ public class FlowRunnerManager implements EventListener,
             wait(RECENTLY_FINISHED_TIME_TO_LIVE);
           } catch (InterruptedException e) {
             logger.info("Interrupted. Probably to shut down.");
-          } catch (Throwable t){
-            logger.warn("Uncaught throwable, please look into why it is not caught", t);
+          } catch (Throwable t) {
+            logger.warn(
+                "Uncaught throwable, please look into why it is not caught", t);
           }
         }
       }
@@ -517,6 +517,7 @@ public class FlowRunnerManager implements EventListener,
 
   /**
    * Configure Azkaban metrics tracking for a new flowRunner instance
+   * 
    * @param flowRunner
    */
   private void configureFlowLevelMetrics(FlowRunner flowRunner) {
@@ -524,10 +525,11 @@ public class FlowRunnerManager implements EventListener,
 
     if (MetricReportManager.isAvailable()) {
       MetricReportManager metricManager = MetricReportManager.getInstance();
-      //Adding NumFailedFlow Metric listener
+      // Adding NumFailedFlow Metric listener
       flowRunner.addListener((NumFailedFlowMetric) metricManager
           .getMetricFromName(NumFailedFlowMetric.NUM_FAILED_FLOW_METRIC_NAME));
     }
+
   }
 
   private void setupFlow(ExecutableFlow flow) throws ExecutorManagerException {
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxJobCallback.java b/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxJobCallback.java
new file mode 100644
index 0000000..0b3a11d
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxJobCallback.java
@@ -0,0 +1,33 @@
+package azkaban.execapp.jmx;
+
+import org.apache.http.impl.client.FutureRequestExecutionMetrics;
+
+public class JmxJobCallback implements JmxJobCallbackMBean {
+
+  private FutureRequestExecutionMetrics jobCallbackMetrics;
+
+  public JmxJobCallback(FutureRequestExecutionMetrics jobCallbackMetrics) {
+    this.jobCallbackMetrics = jobCallbackMetrics;
+  }
+
+  @Override
+  public long getNumJobCallbacks() {
+    return jobCallbackMetrics.getRequestCount();
+  }
+
+  @Override
+  public long getNumSuccessfulJobCallbacks() {
+    return jobCallbackMetrics.getSuccessfulConnectionCount();
+  }
+
+  @Override
+  public long getNumFailedJobCallbacks() {
+    return jobCallbackMetrics.getFailedConnectionCount();
+  }
+
+  @Override
+  public long getNumActiveJobCallbacks() {
+    return jobCallbackMetrics.getActiveConnectionCount();
+  }
+
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxJobCallbackMBean.java b/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxJobCallbackMBean.java
new file mode 100644
index 0000000..4b9d27d
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxJobCallbackMBean.java
@@ -0,0 +1,18 @@
+package azkaban.execapp.jmx;
+
+import azkaban.jmx.DisplayName;
+
+public interface JmxJobCallbackMBean {
+  @DisplayName("OPERATION: getNumJobCallbacks")
+  public long getNumJobCallbacks();
+
+  @DisplayName("OPERATION: getNumSuccessfulJobCallbacks")
+  public long getNumSuccessfulJobCallbacks();
+
+  @DisplayName("OPERATION: getNumFailedJobCallbacks")
+  public long getNumFailedJobCallbacks();
+
+  @DisplayName("OPERATION: getNumActiveJobCallbacks")
+  public long getNumActiveJobCallbacks();
+
+}
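
How this MBean gets registered is not part of the excerpt above. As a rough sketch, any bean that follows the standard MBean naming convention (as JmxJobCallback does for JmxJobCallbackMBean) can be exposed through the platform MBeanServer; the ObjectName below is purely an assumption, not the name Azkaban actually uses.

import java.lang.management.ManagementFactory;

import javax.management.MBeanServer;
import javax.management.ObjectName;

import azkaban.execapp.jmx.JmxJobCallbackMBean;

public class JobCallbackMBeanRegistrationSketch {
  // Registers an already-constructed callback MBean; the metrics it wraps would
  // come from the callback HTTP client.
  public static void register(JmxJobCallbackMBean bean) throws Exception {
    MBeanServer server = ManagementFactory.getPlatformMBeanServer();
    // Hypothetical ObjectName chosen for this sketch only.
    ObjectName name = new ObjectName("azkaban.execapp:type=JmxJobCallback");
    server.registerMBean(bean, name);
  }
}
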
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java
new file mode 100644
index 0000000..686347d
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java
@@ -0,0 +1,164 @@
+package azkaban.execapp.jmx;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.log4j.Logger;
+
+import azkaban.event.Event;
+import azkaban.event.EventListener;
+import azkaban.execapp.JobRunner;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.Status;
+import azkaban.utils.Props;
+
+/**
+ * Responsible for keeping track of job-related MBean attributes by listening
+ * to job-related events.
+ * 
+ * @author hluu
+ *
+ */
+public class JmxJobMBeanManager implements JmxJobMXBean, EventListener {
+
+  private static final Logger logger = Logger
+      .getLogger(JmxJobMBeanManager.class);
+
+  private static JmxJobMBeanManager INSTANCE = new JmxJobMBeanManager();
+
+  private AtomicInteger runningJobCount = new AtomicInteger(0);
+  private AtomicInteger totalExecutedJobCount = new AtomicInteger(0);
+  private AtomicInteger totalFailedJobCount = new AtomicInteger(0);
+  private AtomicInteger totalSucceededJobCount = new AtomicInteger(0);
+
+  private Map<String, AtomicInteger> jobTypeFailureMap =
+      new HashMap<String, AtomicInteger>();
+
+  private Map<String, AtomicInteger> jobTypeSucceededMap =
+      new HashMap<String, AtomicInteger>();
+
+  private boolean initialized;
+
+  private JmxJobMBeanManager() {
+  }
+
+  public static JmxJobMBeanManager getInstance() {
+    return INSTANCE;
+  }
+
+  public void initialize(Props props) {
+    logger.info("Initializing " + getClass().getName());
+    initialized = true;
+  }
+
+  @Override
+  public int getNumRunningJobs() {
+    return runningJobCount.get();
+  }
+
+  @Override
+  public int getTotalNumExecutedJobs() {
+    return totalExecutedJobCount.get();
+  }
+
+  @Override
+  public int getTotalFailedJobs() {
+    return totalFailedJobCount.get();
+  }
+
+  @Override
+  public int getTotalSucceededJobs() {
+    return totalSucceededJobCount.get();
+  }
+
+  @Override
+  public Map<String, Integer> getTotalSucceededJobsByJobType() {
+    return convertMapValueToInteger(jobTypeSucceededMap);
+  }
+
+  @Override
+  public Map<String, Integer> getTotalFailedJobsByJobType() {
+    return convertMapValueToInteger(jobTypeFailureMap);
+  }
+
+  private Map<String, Integer> convertMapValueToInteger(
+      Map<String, AtomicInteger> map) {
+    Map<String, Integer> result = new HashMap<String, Integer>(map.size());
+
+    for (Map.Entry<String, AtomicInteger> entry : map.entrySet()) {
+      result.put(entry.getKey(), entry.getValue().intValue());
+    }
+
+    return result;
+  }
+
+  @Override
+  public void handleEvent(Event event) {
+    if (!initialized) {
+      throw new RuntimeException("JmxJobMBeanManager has not been initialized");
+    }
+
+    if (event.getRunner() instanceof JobRunner) {
+      JobRunner jobRunner = (JobRunner) event.getRunner();
+      ExecutableNode node = jobRunner.getNode();
+
+      if (logger.isDebugEnabled()) {
+        logger.debug("Got " + event.getType() + " " + node.getId() + " "
+            + event.getRunner().getClass().getName() + " status: "
+            + node.getStatus());
+      }
+
+      if (event.getType() == Event.Type.JOB_STARTED) {
+        runningJobCount.incrementAndGet();
+      } else if (event.getType() == Event.Type.JOB_FINISHED) {
+        totalExecutedJobCount.incrementAndGet();
+        if (runningJobCount.intValue() > 0) {
+          runningJobCount.decrementAndGet();
+        } else {
+          logger.warn("runningJobCount not messed up, it is already zero "
+              + "and we are trying to decrement on job event "
+              + Event.Type.JOB_FINISHED);
+        }
+
+        if (node.getStatus() == Status.FAILED) {
+          totalFailedJobCount.incrementAndGet();
+        } else if (node.getStatus() == Status.SUCCEEDED) {
+          totalSucceededJobCount.incrementAndGet();
+        }
+
+        handleJobFinishedCount(node.getStatus(), node.getType());
+      }
+
+    } else {
+      logger.warn("((((((((( Got a different runner: "
+          + event.getRunner().getClass().getName());
+    }
+  }
+
+  private void handleJobFinishedCount(Status status, String jobType) {
+    switch (status) {
+    case FAILED:
+      handleJobFinishedByType(jobTypeFailureMap, jobType);
+      break;
+    case SUCCEEDED:
+      handleJobFinishedByType(jobTypeSucceededMap, jobType);
+      break;
+    default:
+    }
+  }
+
+  private void handleJobFinishedByType(Map<String, AtomicInteger> jobTypeMap,
+      String jobType) {
+
+    synchronized (jobTypeMap) {
+      AtomicInteger count = jobTypeMap.get(jobType);
+      if (count == null) {
+        count = new AtomicInteger();
+      }
+
+      count.incrementAndGet();
+      jobTypeMap.put(jobType, count);
+    }
+  }
+}
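
The manager above is a singleton: it must be initialized once with the server properties, and handleEvent throws if that step is skipped. The FlowRunner change earlier in this diff attaches it to every JobRunner; a condensed sketch of that wiring, where props and jobRunner stand in for the exec server's real instances:

import azkaban.execapp.JobRunner;
import azkaban.execapp.jmx.JmxJobMBeanManager;
import azkaban.utils.Props;

public class JmxJobMBeanManagerWiringSketch {
  public static void wire(Props props, JobRunner jobRunner) {
    // One-time initialization, typically at exec server startup.
    JmxJobMBeanManager.getInstance().initialize(props);

    // Every JobRunner gets the singleton as a listener, so JOB_STARTED and
    // JOB_FINISHED events drive the counters exposed over JMX.
    jobRunner.addListener(JmxJobMBeanManager.getInstance());
  }
}
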
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxJobMXBean.java b/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxJobMXBean.java
new file mode 100644
index 0000000..9940188
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxJobMXBean.java
@@ -0,0 +1,33 @@
+package azkaban.execapp.jmx;
+
+import java.util.Map;
+
+import azkaban.jmx.DisplayName;
+
+/**
+ * Define all the MBean attributes at the job level
+ * 
+ * @author hluu
+ *
+ */
+public interface JmxJobMXBean {
+
+  @DisplayName("OPERATION: getNumRunningJobs")
+  public int getNumRunningJobs();
+
+  @DisplayName("OPERATION: getTotalNumExecutedJobs")
+  public int getTotalNumExecutedJobs();
+
+  @DisplayName("OPERATION: getTotalFailedJobs")
+  public int getTotalFailedJobs();
+
+  @DisplayName("OPERATION: getTotalSucceededJobs")
+  public int getTotalSucceededJobs();
+
+  @DisplayName("OPERATION: getTotalSucceededJobsByJobType")
+  public Map<String, Integer> getTotalSucceededJobsByJobType();
+
+  @DisplayName("OPERATION: getTotalFailedJobsByJobType")
+  public Map<String, Integer> getTotalFailedJobsByJobType();
+
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-execserver/src/main/java/azkaban/execapp/JobRunner.java
index 2243fb8..6db6a47 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/JobRunner.java
@@ -51,7 +51,7 @@ import azkaban.utils.Props;
 import azkaban.utils.StringUtils;
 
 public class JobRunner extends EventHandler implements Runnable {
-  private static final String AZKABAN_WEBSERVER_URL = "azkaban.webserver.url";
+  public static final String AZKABAN_WEBSERVER_URL = "azkaban.webserver.url";
 
   private final Layout DEFAULT_LAYOUT = new EnhancedPatternLayout(
       "%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
@@ -695,6 +695,10 @@ public class JobRunner extends EventHandler implements Runnable {
     return logFile;
   }
 
+  public Logger getLogger() {
+    return logger;
+  }
+
   public static String createLogFileName(ExecutableNode node, int attempt) {
     int executionId = node.getExecutableFlow().getExecutionId();
     String jobId = node.getId();
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
index 152a3aa..e66a586 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
@@ -26,7 +26,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
-import javax.servlet.ServletConfig;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
@@ -42,13 +41,11 @@ import azkaban.metric.TimeBasedReportingMetric;
 import azkaban.metric.inmemoryemitter.InMemoryHistoryNode;
 import azkaban.metric.inmemoryemitter.InMemoryMetricEmitter;
 import azkaban.server.HttpRequestUtils;
-import azkaban.server.ServerConstants;
 import azkaban.utils.JSONUtils;
 
-
 /**
- * Servlet to communicate with Azkaban exec server
- * This servlet get requests from stats servlet in Azkaban Web server
+ * Servlet to communicate with the Azkaban exec server. This servlet gets
+ * requests from the stats servlet in the Azkaban web server.
  */
 public class StatsServlet extends HttpServlet implements ConnectorParams {
   private static final long serialVersionUID = 2L;
@@ -58,24 +55,29 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
     return HttpRequestUtils.hasParam(request, param);
   }
 
-  public String getParam(HttpServletRequest request, String name) throws ServletException {
+  public String getParam(HttpServletRequest request, String name)
+      throws ServletException {
     return HttpRequestUtils.getParam(request, name);
   }
 
-  public Boolean getBooleanParam(HttpServletRequest request, String name) throws ServletException {
+  public Boolean getBooleanParam(HttpServletRequest request, String name)
+      throws ServletException {
     return HttpRequestUtils.getBooleanParam(request, name);
   }
 
-  public long getLongParam(HttpServletRequest request, String name) throws ServletException {
+  public long getLongParam(HttpServletRequest request, String name)
+      throws ServletException {
     return HttpRequestUtils.getLongParam(request, name);
   }
 
   /**
+   * Handle all GET requests to the stats servlet. {@inheritDoc}
-   * {@inheritDoc}
-   * @see javax.servlet.http.HttpServlet#doGet(javax.servlet.http.HttpServletRequest, javax.servlet.http.HttpServletResponse)
+   * Handle all get request to Stats Servlet {@inheritDoc}
+   * 
+   * @see javax.servlet.http.HttpServlet#doGet(javax.servlet.http.HttpServletRequest,
+   *      javax.servlet.http.HttpServletResponse)
    */
-  protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+  protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+      throws ServletException, IOException {
     Map<String, Object> ret = new HashMap<String, Object>();
 
     if (hasParam(req, ACTION_PARAM)) {
@@ -103,14 +105,15 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
   }
 
   /**
-   * enable or disable metric Manager
-   * A disable will also purge all data from all metric emitters
+   * Enable or disable the metric manager. Disabling also purges all data from
+   * all metric emitters.
    */
-  private void handleChangeManagerStatusRequest(HttpServletRequest req, Map<String, Object> ret,
-      boolean enableMetricManager) {
+  private void handleChangeManagerStatusRequest(HttpServletRequest req,
+      Map<String, Object> ret, boolean enableMetricManager) {
     try {
       logger.info("Updating metric manager status");
-      if ((enableMetricManager && MetricReportManager.isInstantiated()) || MetricReportManager.isAvailable()) {
+      if ((enableMetricManager && MetricReportManager.isInstantiated())
+          || MetricReportManager.isAvailable()) {
         MetricReportManager metricManager = MetricReportManager.getInstance();
         if (enableMetricManager) {
           metricManager.enableManager();
@@ -130,12 +133,14 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
   /**
    * Update number of display snapshots for /stats graphs
    */
-  private void handleChangeEmitterPoints(HttpServletRequest req, Map<String, Object> ret) {
+  private void handleChangeEmitterPoints(HttpServletRequest req,
+      Map<String, Object> ret) {
     try {
       long numInstance = getLongParam(req, STATS_MAP_EMITTERNUMINSTANCES);
       if (MetricReportManager.isAvailable()) {
         MetricReportManager metricManager = MetricReportManager.getInstance();
-        InMemoryMetricEmitter memoryEmitter = extractInMemoryMetricEmitter(metricManager);
+        InMemoryMetricEmitter memoryEmitter =
+            extractInMemoryMetricEmitter(metricManager);
         memoryEmitter.setReportingInstances(numInstance);
         ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
       } else {
@@ -150,12 +155,14 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
   /**
    * Update InMemoryMetricEmitter interval to maintain metric snapshots
    */
-  private void handleChangeCleaningInterval(HttpServletRequest req, Map<String, Object> ret) {
+  private void handleChangeCleaningInterval(HttpServletRequest req,
+      Map<String, Object> ret) {
     try {
       long newInterval = getLongParam(req, STATS_MAP_CLEANINGINTERVAL);
       if (MetricReportManager.isAvailable()) {
         MetricReportManager metricManager = MetricReportManager.getInstance();
-        InMemoryMetricEmitter memoryEmitter = extractInMemoryMetricEmitter(metricManager);
+        InMemoryMetricEmitter memoryEmitter =
+            extractInMemoryMetricEmitter(metricManager);
         memoryEmitter.setReportingInterval(newInterval);
         ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
       } else {
@@ -169,19 +176,24 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
 
   /**
    * Get metric snapshots for a metric and date specification
+   * 
    * @throws ServletException
    */
-  private void handleGetMetricHistory(HttpServletRequest req, Map<String, Object> ret) throws ServletException {
+  private void handleGetMetricHistory(HttpServletRequest req,
+      Map<String, Object> ret) throws ServletException {
     if (MetricReportManager.isAvailable()) {
       MetricReportManager metricManager = MetricReportManager.getInstance();
-      InMemoryMetricEmitter memoryEmitter = extractInMemoryMetricEmitter(metricManager);
+      InMemoryMetricEmitter memoryEmitter =
+          extractInMemoryMetricEmitter(metricManager);
 
       // if we have a memory emitter
       if (memoryEmitter != null) {
         try {
           List<InMemoryHistoryNode> result =
-              memoryEmitter.getMetrics(getParam(req, STATS_MAP_METRICNAMEPARAM),
-                  parseDate(getParam(req, STATS_MAP_STARTDATE)), parseDate(getParam(req, STATS_MAP_ENDDATE)),
+              memoryEmitter.getMetrics(
+                  getParam(req, STATS_MAP_METRICNAMEPARAM),
+                  parseDate(getParam(req, STATS_MAP_STARTDATE)),
+                  parseDate(getParam(req, STATS_MAP_ENDDATE)),
                   getBooleanParam(req, STATS_MAP_METRICRETRIEVALMODE));
 
           if (result != null && result.size() > 0) {
@@ -204,7 +216,8 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
   /**
    * Get InMemoryMetricEmitter, if available else null
    */
-  private InMemoryMetricEmitter extractInMemoryMetricEmitter(MetricReportManager metricManager) {
+  private InMemoryMetricEmitter extractInMemoryMetricEmitter(
+      MetricReportManager metricManager) {
     InMemoryMetricEmitter memoryEmitter = null;
     for (IMetricEmitter emitter : metricManager.getMetricEmitters()) {
       if (emitter instanceof InMemoryMetricEmitter) {
@@ -218,7 +231,8 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
   /**
    * Get all the metrics tracked by metric manager
    */
-  private void handleGetAllMMetricsName(HttpServletRequest req, Map<String, Object> ret) {
+  private void handleGetAllMMetricsName(HttpServletRequest req,
+      Map<String, Object> ret) {
     if (MetricReportManager.isAvailable()) {
       MetricReportManager metricManager = MetricReportManager.getInstance();
       List<IMetric<?>> result = metricManager.getAllMetrics();
@@ -226,7 +240,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
         ret.put(RESPONSE_ERROR, "No Metric being tracked");
       } else {
         List<String> metricNames = new LinkedList<String>();
-        for(IMetric<?> metric: result) {
+        for (IMetric<?> metric : result) {
           metricNames.add(metric.getName());
         }
         ret.put("data", metricNames);
@@ -238,15 +252,19 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
 
   /**
    * Update tracking interval for a given metrics
+   * 
    * @throws ServletException
    */
-  private void handleChangeMetricInterval(HttpServletRequest req, Map<String, Object> ret) throws ServletException {
+  private void handleChangeMetricInterval(HttpServletRequest req,
+      Map<String, Object> ret) throws ServletException {
     try {
       String metricName = getParam(req, STATS_MAP_METRICNAMEPARAM);
       long newInterval = getLongParam(req, STATS_MAP_REPORTINGINTERVAL);
       if (MetricReportManager.isAvailable()) {
         MetricReportManager metricManager = MetricReportManager.getInstance();
-        TimeBasedReportingMetric<?> metric = (TimeBasedReportingMetric<?>) metricManager.getMetricFromName(metricName);
+        TimeBasedReportingMetric<?> metric =
+            (TimeBasedReportingMetric<?>) metricManager
+                .getMetricFromName(metricName);
         metric.updateInterval(newInterval);
         ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
       } else {
diff --git a/azkaban-execserver/src/test/java/azkaban/execapp/event/JobCallbackRequestMakerTest.java b/azkaban-execserver/src/test/java/azkaban/execapp/event/JobCallbackRequestMakerTest.java
new file mode 100644
index 0000000..58ce823
--- /dev/null
+++ b/azkaban-execserver/src/test/java/azkaban/execapp/event/JobCallbackRequestMakerTest.java
@@ -0,0 +1,230 @@
+package azkaban.execapp.event;
+
+import static azkaban.jobcallback.JobCallbackConstants.EXECUTION_ID_TOKEN;
+import static azkaban.jobcallback.JobCallbackConstants.FLOW_TOKEN;
+import static azkaban.jobcallback.JobCallbackConstants.JOB_STATUS_TOKEN;
+import static azkaban.jobcallback.JobCallbackConstants.JOB_TOKEN;
+import static azkaban.jobcallback.JobCallbackConstants.PROJECT_TOKEN;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.Writer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.log4j.Logger;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.ServletHolder;
+
+import azkaban.jobcallback.JobCallbackConstants;
+import azkaban.jobcallback.JobCallbackStatusEnum;
+import azkaban.utils.Props;
+
+public class JobCallbackRequestMakerTest {
+
+  private static final Logger logger = Logger
+      .getLogger(JobCallbackRequestMakerTest.class);
+
+  private static final String SLEEP_DURATION_PARAM = "sleepDuration";
+  private static final String STATUS_CODE_PARAM = "returnedStatusCode";
+
+  private static final String PROJECT_NANE = "PROJECTX";
+  private static final String FLOW_NANE = "FLOWX";
+  private static final String JOB_NANE = "JOBX";
+  private static final String EXECUTION_ID = "1234";
+
+  private static final int PORT_NUMBER = 8989;
+
+  private static JobCallbackRequestMaker jobCBMaker;
+
+  private static Map<String, String> contextInfo;
+
+  private static Server embeddedJettyServer;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    JobCallbackRequestMaker.initialize(new Props());
+    jobCBMaker = JobCallbackRequestMaker.getInstance();
+
+    contextInfo = new HashMap<String, String>();
+    contextInfo.put(PROJECT_TOKEN, PROJECT_NANE);
+    contextInfo.put(FLOW_TOKEN, FLOW_NANE);
+    contextInfo.put(EXECUTION_ID_TOKEN, EXECUTION_ID);
+    contextInfo.put(JOB_TOKEN, JOB_NANE);
+    contextInfo.put(JOB_STATUS_TOKEN, JobCallbackStatusEnum.STARTED.name());
+
+    embeddedJettyServer = new Server(PORT_NUMBER);
+
+    Context context = new Context(embeddedJettyServer, "/", Context.SESSIONS);
+    context.addServlet(new ServletHolder(new DelayServlet()), "/delay");
+
+    System.out.println("Start server");
+    embeddedJettyServer.start();
+  }
+
+  @AfterClass
+  public static void cleanUp() throws Exception {
+    System.out.println("Shutting down server");
+    embeddedJettyServer.stop();
+    embeddedJettyServer.destroy();
+  }
+
+  private static class DelayServlet extends HttpServlet {
+
+    @Override
+    public void doGet(HttpServletRequest req, HttpServletResponse resp)
+        throws ServletException, IOException {
+
+      logger.info("Get get request: " + req.getRequestURI());
+      logger.info("Get get request params: " + req.getParameterMap());
+
+      long start = System.currentTimeMillis();
+      String responseMessage = handleDelay(req);
+      logger
+          .info("handleDelay elapsed: " + (System.currentTimeMillis() - start));
+
+      responseMessage = handleSimulatedStatusCode(req, resp, responseMessage);
+
+      Writer writer = resp.getWriter();
+      writer.write(responseMessage);
+      writer.close();
+    }
+
+    private String handleSimulatedStatusCode(HttpServletRequest req,
+        HttpServletResponse resp, String responseMessge) {
+      String returnedStatusCodeStr = req.getParameter(STATUS_CODE_PARAM);
+      if (returnedStatusCodeStr != null) {
+        int statusCode = Integer.parseInt(returnedStatusCodeStr);
+        responseMessge = "Not good";
+        resp.setStatus(statusCode);
+      }
+      return responseMessge;
+    }
+
+    private String handleDelay(HttpServletRequest req) {
+      String sleepParamValue = req.getParameter(SLEEP_DURATION_PARAM);
+      if (sleepParamValue != null) {
+        long howLongMS =
+            TimeUnit.MILLISECONDS.convert(Integer.parseInt(sleepParamValue),
+                TimeUnit.SECONDS);
+
+        logger.info("Delay for: " + howLongMS);
+
+        try {
+          Thread.sleep(howLongMS);
+          return "Voila!!";
+        } catch (InterruptedException e) {
+          // don't care
+          return e.getMessage();
+        }
+      }
+      return "";
+    }
+
+    @Override
+    public void doPost(HttpServletRequest req, HttpServletResponse resp)
+        throws ServletException, IOException {
+      logger.info("Get post request: " + req.getRequestURI());
+      logger.info("Get post request params: " + req.getParameterMap());
+
+      BufferedReader reader = req.getReader();
+      String line = null;
+      while ((line = reader.readLine()) != null) {
+        logger.info("post body: " + line);
+      }
+      reader.close();
+
+      String responseMessage = handleDelay(req);
+      responseMessage = handleSimulatedStatusCode(req, resp, responseMessage);
+
+      Writer writer = resp.getWriter();
+      writer.write(responseMessage);
+      writer.close();
+    }
+  }
+
+  private String buildUrlForDelay(int delay) {
+    return "http://localhost:" + PORT_NUMBER + "/delay?" + SLEEP_DURATION_PARAM
+        + "=" + delay;
+  }
+
+  private String buildUrlForStatusCode(int sc) {
+    return "http://localhost:" + PORT_NUMBER + "/delay?" + STATUS_CODE_PARAM
+        + "=" + sc;
+  }
+
+  @Test(timeout = 4000)
+  public void basicGetTest() {
+    Props props = new Props();
+    String url = buildUrlForDelay(1);
+
+    props.put("job.notification."
+        + JobCallbackStatusEnum.STARTED.name().toLowerCase() + ".1.url", url);
+
+    List<HttpRequestBase> httpRequestList =
+        JobCallbackUtil.parseJobCallbackProperties(props,
+            JobCallbackStatusEnum.STARTED, contextInfo, 3);
+
+    jobCBMaker.makeHttpRequest(JOB_NANE, logger, httpRequestList);
+  }
+
+  @Test(timeout = 4000)
+  public void simulateNotOKStatusCodeTest() {
+    Props props = new Props();
+    String url = buildUrlForStatusCode(404);
+    props.put("job.notification."
+        + JobCallbackStatusEnum.STARTED.name().toLowerCase() + ".1.url", url);
+
+    List<HttpRequestBase> httpRequestList =
+        JobCallbackUtil.parseJobCallbackProperties(props,
+            JobCallbackStatusEnum.STARTED, contextInfo, 3);
+
+    jobCBMaker.makeHttpRequest(JOB_NANE, logger, httpRequestList);
+  }
+
+  @Test(timeout = 4000)
+  public void unResponsiveGetTest() {
+    Props props = new Props();
+    String url = buildUrlForDelay(10);
+    props.put("job.notification."
+        + JobCallbackStatusEnum.STARTED.name().toLowerCase() + ".1.url", url);
+
+    List<HttpRequestBase> httpRequestList =
+        JobCallbackUtil.parseJobCallbackProperties(props,
+            JobCallbackStatusEnum.STARTED, contextInfo, 3);
+
+    jobCBMaker.makeHttpRequest(JOB_NANE, logger, httpRequestList);
+  }
+
+  @Test(timeout = 4000)
+  public void basicPostTest() {
+    Props props = new Props();
+    String url = buildUrlForDelay(1);
+    props.put("job.notification."
+        + JobCallbackStatusEnum.STARTED.name().toLowerCase() + ".1.url", url);
+    props.put("job.notification."
+        + JobCallbackStatusEnum.STARTED.name().toLowerCase() + ".1.method",
+        JobCallbackConstants.HTTP_POST);
+    props.put("job.notification."
+        + JobCallbackStatusEnum.STARTED.name().toLowerCase() + ".1.body",
+        "This is it");
+
+    List<HttpRequestBase> httpRequestList =
+        JobCallbackUtil.parseJobCallbackProperties(props,
+            JobCallbackStatusEnum.STARTED, contextInfo, 3);
+
+    jobCBMaker.makeHttpRequest(JOB_NANE, logger, httpRequestList);
+  }
+}
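
The Props built programmatically in these tests correspond to what a user would put in a job file. A sketch of an equivalent job-level configuration, with an illustrative endpoint (the tests show that GET is used when no method is given):

# Callback fired when the job starts (GET by default).
job.notification.started.1.url=http://callbacks.example.com/notify
# Callback fired when the job completes, sent as a POST with a body.
job.notification.completed.1.url=http://callbacks.example.com/completed
job.notification.completed.1.method=POST
job.notification.completed.1.body=This is it
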
diff --git a/azkaban-execserver/src/test/java/azkaban/execapp/event/JobCallbackUtilTest.java b/azkaban-execserver/src/test/java/azkaban/execapp/event/JobCallbackUtilTest.java
new file mode 100644
index 0000000..a7c0af7
--- /dev/null
+++ b/azkaban-execserver/src/test/java/azkaban/execapp/event/JobCallbackUtilTest.java
@@ -0,0 +1,388 @@
+package azkaban.execapp.event;
+
+import static azkaban.jobcallback.JobCallbackConstants.EXECUTION_ID_TOKEN;
+import static azkaban.jobcallback.JobCallbackConstants.FLOW_TOKEN;
+import static azkaban.jobcallback.JobCallbackConstants.HTTP_GET;
+import static azkaban.jobcallback.JobCallbackConstants.HTTP_POST;
+import static azkaban.jobcallback.JobCallbackConstants.JOB_STATUS_TOKEN;
+import static azkaban.jobcallback.JobCallbackConstants.JOB_TOKEN;
+import static azkaban.jobcallback.JobCallbackConstants.PROJECT_TOKEN;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.http.Header;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import azkaban.jobcallback.JobCallbackConstants;
+import azkaban.jobcallback.JobCallbackStatusEnum;
+import azkaban.utils.Props;
+
+public class JobCallbackUtilTest {
+  private static Map<String, String> contextInfo;
+
+  private static final String PROJECT_NANE = "PROJECTX";
+  private static final String FLOW_NANE = "FLOWX";
+  private static final String JOB_NANE = "JOBX";
+  private static final String EXECUTION_ID = "1234";
+
+  @BeforeClass
+  public static void setup() {
+    contextInfo = new HashMap<String, String>();
+    contextInfo.put(PROJECT_TOKEN, PROJECT_NANE);
+    contextInfo.put(FLOW_TOKEN, FLOW_NANE);
+    contextInfo.put(EXECUTION_ID_TOKEN, EXECUTION_ID);
+    contextInfo.put(JOB_TOKEN, JOB_NANE);
+    contextInfo.put(JOB_STATUS_TOKEN, JobCallbackStatusEnum.STARTED.name());
+  }
+
+  @Test
+  public void noCallbackPropertiesTest() {
+    Props props = new Props();
+    props.put("abc", "def");
+
+    Assert.assertFalse(JobCallbackUtil.isThereJobCallbackProperty(props,
+        JobCallbackStatusEnum.STARTED));
+
+    Assert.assertFalse(JobCallbackUtil.isThereJobCallbackProperty(props,
+        JobCallbackStatusEnum.COMPLETED));
+
+    Assert.assertFalse(JobCallbackUtil.isThereJobCallbackProperty(props,
+        JobCallbackStatusEnum.FAILURE));
+
+    Assert.assertFalse(JobCallbackUtil.isThereJobCallbackProperty(props,
+        JobCallbackStatusEnum.SUCCESS));
+  }
+
+  @Test
+  public void hasCallbackPropertiesTest() {
+    Props props = new Props();
+    for (JobCallbackStatusEnum jobStatus : JobCallbackStatusEnum.values()) {
+      props.put(
+          "job.notification." + jobStatus.name().toLowerCase() + ".1.url",
+          "def");
+    }
+
+    System.out.println(props);
+
+    Assert.assertTrue(JobCallbackUtil.isThereJobCallbackProperty(props,
+        JobCallbackStatusEnum.STARTED));
+
+    Assert.assertTrue(JobCallbackUtil.isThereJobCallbackProperty(props,
+        JobCallbackStatusEnum.COMPLETED));
+
+    Assert.assertTrue(JobCallbackUtil.isThereJobCallbackProperty(props,
+        JobCallbackStatusEnum.FAILURE));
+
+    Assert.assertTrue(JobCallbackUtil.isThereJobCallbackProperty(props,
+        JobCallbackStatusEnum.SUCCESS));
+  }
+
+  @Test
+  public void multipleStatusWithNoJobCallbackTest() {
+    Props props = new Props();
+    props.put("abc", "def");
+
+    Assert.assertFalse(JobCallbackUtil.isThereJobCallbackProperty(props,
+        JobCallbackStatusEnum.STARTED, JobCallbackStatusEnum.COMPLETED,
+        JobCallbackStatusEnum.FAILURE, JobCallbackStatusEnum.SUCCESS));
+
+  }
+
+  @Test
+  public void multipleStatusesWithJobCallbackTest() {
+    Props props = new Props();
+    props.put("job.notification."
+        + JobCallbackStatusEnum.STARTED.name().toLowerCase() + ".1.url", "def");
+
+    Assert.assertTrue(JobCallbackUtil.isThereJobCallbackProperty(props,
+        JobCallbackStatusEnum.STARTED, JobCallbackStatusEnum.COMPLETED,
+        JobCallbackStatusEnum.FAILURE, JobCallbackStatusEnum.SUCCESS));
+
+    props = new Props();
+    props.put("job.notification."
+        + JobCallbackStatusEnum.COMPLETED.name().toLowerCase() + ".1.url",
+        "def");
+    Assert.assertTrue(JobCallbackUtil.isThereJobCallbackProperty(props,
+        JobCallbackStatusEnum.STARTED, JobCallbackStatusEnum.COMPLETED,
+        JobCallbackStatusEnum.FAILURE, JobCallbackStatusEnum.SUCCESS));
+
+    props = new Props();
+    props.put("job.notification."
+        + JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".1.url", "def");
+    Assert.assertTrue(JobCallbackUtil.isThereJobCallbackProperty(props,
+        JobCallbackStatusEnum.STARTED, JobCallbackStatusEnum.COMPLETED,
+        JobCallbackStatusEnum.FAILURE, JobCallbackStatusEnum.SUCCESS));
+
+    props = new Props();
+    props.put("job.notification."
+        + JobCallbackStatusEnum.SUCCESS.name().toLowerCase() + ".1.url", "def");
+    Assert.assertTrue(JobCallbackUtil.isThereJobCallbackProperty(props,
+        JobCallbackStatusEnum.STARTED, JobCallbackStatusEnum.COMPLETED,
+        JobCallbackStatusEnum.FAILURE, JobCallbackStatusEnum.SUCCESS));
+  }
+
+  @Test
+  public void hasCallbackPropertiesWithGapTest() {
+    Props props = new Props();
+    for (JobCallbackStatusEnum jobStatus : JobCallbackStatusEnum.values()) {
+      props.put(
+          "job.notification." + jobStatus.name().toLowerCase() + ".2.url",
+          "def");
+    }
+
+    System.out.println(props);
+
+    Assert.assertFalse(JobCallbackUtil.isThereJobCallbackProperty(props,
+        JobCallbackStatusEnum.STARTED));
+
+    Assert.assertFalse(JobCallbackUtil.isThereJobCallbackProperty(props,
+        JobCallbackStatusEnum.COMPLETED));
+
+    Assert.assertFalse(JobCallbackUtil.isThereJobCallbackProperty(props,
+        JobCallbackStatusEnum.FAILURE));
+
+    Assert.assertFalse(JobCallbackUtil.isThereJobCallbackProperty(props,
+        JobCallbackStatusEnum.SUCCESS));
+  }
+
+  @Test
+  public void noTokenTest() {
+    String urlWithNoToken = "http://www.linkedin.com";
+    String result = JobCallbackUtil.replaceToken(urlWithNoToken, contextInfo);
+    Assert.assertEquals(urlWithNoToken, result);
+  }
+
+  @Test
+  public void oneTokenTest() {
+
+    String urlWithOneToken = "http://www.linkedin.com?project=" + PROJECT_TOKEN;
+
+    String result = JobCallbackUtil.replaceToken(urlWithOneToken, contextInfo);
+    Assert.assertEquals("http://www.linkedin.com?project=" + PROJECT_NANE,
+        result);
+  }
+
+  @Test
+  public void twoTokensTest() {
+
+    String urlWithTwoTokens =
+        "http://www.linkedin.com?project=" + PROJECT_TOKEN + "&flow="
+            + FLOW_TOKEN;
+
+    String result = JobCallbackUtil.replaceToken(urlWithTwoTokens, contextInfo);
+    Assert.assertEquals("http://www.linkedin.com?project=" + PROJECT_NANE
+        + "&flow=" + FLOW_NANE, result);
+  }
+
+  @Test
+  public void parseJobCallbackOneGetTest() {
+    Props props = new Props();
+    String url = "http://lva1-rpt07.corp.linkedin.com";
+    props.put("job.notification."
+        + JobCallbackStatusEnum.STARTED.name().toLowerCase() + ".1.url", url);
+    List<HttpRequestBase> result =
+        JobCallbackUtil.parseJobCallbackProperties(props,
+            JobCallbackStatusEnum.STARTED, contextInfo, 3);
+
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals(HTTP_GET, result.get(0).getMethod());
+    Assert.assertEquals(url, result.get(0).getURI().toString());
+  }
+
+  @Test
+  public void parseJobCallbackWithInvalidURLTest() {
+    Props props = new Props();
+    String url = "linkedin.com";
+    props.put("job.notification."
+        + JobCallbackStatusEnum.STARTED.name().toLowerCase() + ".1.url", url);
+    List<HttpRequestBase> result =
+        JobCallbackUtil.parseJobCallbackProperties(props,
+            JobCallbackStatusEnum.STARTED, contextInfo, 3);
+
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals(HTTP_GET, result.get(0).getMethod());
+    Assert.assertEquals(url, result.get(0).getURI().toString());
+  }
+
+  @Test
+  public void parseJobCallbackTwoGetsTest() {
+    Props props = new Props();
+    String[] urls =
+        { "http://lva1-rpt07.corp.linkedin.com",
+            "http://lva1-rpt06.corp.linkedin.com" };
+    props.put("job.notification."
+        + JobCallbackStatusEnum.STARTED.name().toLowerCase() + ".1.url",
+        urls[0]);
+    props.put("job.notification."
+        + JobCallbackStatusEnum.STARTED.name().toLowerCase() + ".2.url",
+        urls[1]);
+    List<HttpRequestBase> result =
+        JobCallbackUtil.parseJobCallbackProperties(props,
+            JobCallbackStatusEnum.STARTED, contextInfo, 3);
+
+    Assert.assertEquals(2, result.size());
+    for (int i = 0; i < urls.length; i++) {
+      Assert.assertEquals(HTTP_GET, result.get(i).getMethod());
+      Assert.assertEquals(urls[i], result.get(i).getURI().toString());
+    }
+  }
+
+  @Test
+  public void parseJobCallbackWithGapTest() {
+    Props props = new Props();
+    String[] urls =
+        { "http://lva1-rpt07.corp.linkedin.com",
+            "http://lva1-rpt06.corp.linkedin.com" };
+    props.put("job.notification."
+        + JobCallbackStatusEnum.STARTED.name().toLowerCase() + ".1.url",
+        urls[0]);
+    props.put("job.notification."
+        + JobCallbackStatusEnum.STARTED.name().toLowerCase() + ".3.url",
+        urls[1]);
+    List<HttpRequestBase> result =
+        JobCallbackUtil.parseJobCallbackProperties(props,
+            JobCallbackStatusEnum.STARTED, contextInfo, 3);
+
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals(HTTP_GET, result.get(0).getMethod());
+    Assert.assertEquals(urls[0], result.get(0).getURI().toString());
+  }
+
+  @Test
+  public void parseJobCallbackWithPostTest() {
+    Props props = new Props();
+    String url = "http://lva1-rpt07.corp.linkedin.com";
+    String bodyText = "{name:\"you\"}";
+    props.put("job.notification."
+        + JobCallbackStatusEnum.STARTED.name().toLowerCase() + ".1.url", url);
+    props.put("job.notification."
+        + JobCallbackStatusEnum.STARTED.name().toLowerCase() + ".1.method",
+        HTTP_POST);
+
+    props.put("job.notification."
+        + JobCallbackStatusEnum.STARTED.name().toLowerCase() + ".1.body",
+        bodyText);
+
+    List<HttpRequestBase> result =
+        JobCallbackUtil.parseJobCallbackProperties(props,
+            JobCallbackStatusEnum.STARTED, contextInfo, 3);
+
+    Assert.assertEquals(1, result.size());
+
+    HttpPost httpPost = (HttpPost) result.get(0);
+
+    Assert.assertEquals(url, httpPost.getURI().toString());
+    Assert.assertEquals(HTTP_POST, httpPost.getMethod());
+
+    Assert.assertEquals(bodyText.length(), httpPost.getEntity()
+        .getContentLength());
+  }
+
+  @Test
+  public void noHeaderElementTest() {
+    Header[] headerArr =
+        JobCallbackUtil.parseHttpHeaders("this is an amazing day");
+
+    Assert.assertNotNull(headerArr);
+    Assert.assertEquals(0, headerArr.length);
+  }
+
+  @Test
+  public void oneHeaderElementTest() {
+    String name = "Content-type";
+    String value = "application/json";
+    String headers =
+        name + JobCallbackConstants.HEADER_NAME_VALUE_DELIMITER + value;
+    Header[] headerArr = JobCallbackUtil.parseHttpHeaders(headers);
+
+    Assert.assertNotNull(headerArr);
+    Assert.assertEquals(1, headerArr.length);
+    Assert.assertEquals(name, headerArr[0].getName());
+    Assert.assertEquals(value, headerArr[0].getValue());
+
+    String headersWithExtraDelimiter =
+        name + JobCallbackConstants.HEADER_NAME_VALUE_DELIMITER + value
+            + JobCallbackConstants.HEADER_ELEMENT_DELIMITER;
+
+    headerArr = JobCallbackUtil.parseHttpHeaders(headersWithExtraDelimiter);
+    Assert.assertNotNull(headerArr);
+    Assert.assertEquals(1, headerArr.length);
+    Assert.assertEquals(name, headerArr[0].getName());
+    Assert.assertEquals(value, headerArr[0].getValue());
+  }
+
+  @Test
+  public void multipleHeaderElementTest() {
+    String name1 = "Content-type";
+    String value1 = "application/json";
+
+    String name2 = "Accept";
+    String value2 = "application/xml";
+
+    String name3 = "User-Agent";
+    String value3 =
+        "Mozilla/5.0 (X11; Linux x86_64; rv:12.0) Gecko/20100101 Firefox/21.0";
+
+    String headers = makeHeaderElement(name1, value1);
+    headers += JobCallbackConstants.HEADER_ELEMENT_DELIMITER;
+    headers += makeHeaderElement(name2, value2);
+    headers += JobCallbackConstants.HEADER_ELEMENT_DELIMITER;
+    headers += makeHeaderElement(name3, value3);
+
+    System.out.println("headers: " + headers);
+    Header[] headerArr = JobCallbackUtil.parseHttpHeaders(headers);
+
+    Assert.assertNotNull(headerArr);
+    Assert.assertEquals(3, headerArr.length);
+    Assert.assertEquals(name1, headerArr[0].getName());
+    Assert.assertEquals(value1, headerArr[0].getValue());
+    Assert.assertEquals(name2, headerArr[1].getName());
+    Assert.assertEquals(value2, headerArr[1].getValue());
+    Assert.assertEquals(name3, headerArr[2].getName());
+    Assert.assertEquals(value3, headerArr[2].getValue());
+  }
+
+  @Test
+  public void partialHeaderElementTest() {
+    String name1 = "Content-type";
+    String value1 = "application/json";
+
+    String name2 = "Accept";
+    String value2 = "";
+
+    String name3 = "User-Agent";
+    String value3 =
+        "Mozilla/5.0 (X11; Linux x86_64; rv:12.0) Gecko/20100101 Firefox/21.0";
+
+    String headers = makeHeaderElement(name1, value1);
+    headers += JobCallbackConstants.HEADER_ELEMENT_DELIMITER;
+    headers += makeHeaderElement(name2, value2);
+    headers += JobCallbackConstants.HEADER_ELEMENT_DELIMITER;
+    headers += makeHeaderElement(name3, value3);
+
+    System.out.println("headers: " + headers);
+    Header[] headerArr = JobCallbackUtil.parseHttpHeaders(headers);
+
+    Assert.assertNotNull(headerArr);
+    Assert.assertEquals(3, headerArr.length);
+    Assert.assertEquals(name1, headerArr[0].getName());
+    Assert.assertEquals(value1, headerArr[0].getValue());
+    Assert.assertEquals(name2, headerArr[1].getName());
+    Assert.assertEquals(value2, headerArr[1].getValue());
+    Assert.assertEquals(name3, headerArr[2].getName());
+    Assert.assertEquals(value3, headerArr[2].getValue());
+  }
+
+  private String makeHeaderElement(String name, String value) {
+    return name + JobCallbackConstants.HEADER_NAME_VALUE_DELIMITER + value;
+  }
+
+}
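Note: the tests above exercise JobCallbackUtil against per-status, per-sequence properties of the form job.notification.<status>.<sequence>.url, with optional .method, .body, and .headers entries. The sketch below shows how an executor-side caller might drive those utility methods; it is not part of this change, and the class name, the Map<String, String> type of contextInfo, and the azkaban.jobcallback package for JobCallbackUtil are assumptions, since only the calls appearing in the tests are confirmed by this diff.

import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

import azkaban.jobcallback.JobCallbackStatusEnum;
import azkaban.jobcallback.JobCallbackUtil; // package assumed, not shown in this diff
import azkaban.utils.Props;

public class JobCallbackSketch {
  // Hypothetical driver: build one request per configured sequence number and fire it.
  public static void fireCallbacks(Props jobProps, JobCallbackStatusEnum status,
      Map<String, String> contextInfo, int maxCallbacks) throws IOException {
    // One HttpRequestBase per job.notification.<status>.<n>.url entry; parsing stops
    // at the first gap in the sequence (see parseJobCallbackWithGapTest above).
    List<HttpRequestBase> requests =
        JobCallbackUtil.parseJobCallbackProperties(jobProps, status, contextInfo,
            maxCallbacks);
    CloseableHttpClient client = HttpClients.createDefault();
    try {
      for (HttpRequestBase request : requests) {
        client.execute(request).close();
      }
    } finally {
      client.close();
    }
  }
}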

build.gradle 7(+5 -2)

diff --git a/build.gradle b/build.gradle
index 2ad1434..b476d2e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -102,8 +102,8 @@ project(':azkaban-common') {
     compile('org.apache.commons:commons-email:1.2')
     compile('org.apache.commons:commons-jexl:2.1.1')
     compile('org.apache.commons:commons-math3:3.0')
-    compile('org.apache.httpcomponents:httpclient:4.2.1')
-    compile('org.apache.httpcomponents:httpcore:4.2.1')
+    compile('org.apache.httpcomponents:httpclient:4.3.1')
+    compile('org.apache.httpcomponents:httpcore:4.3')
     compile('org.apache.velocity:velocity:1.7')
     compile('org.codehaus.jackson:jackson-core-asl:1.9.5')
     compile('org.codehaus.jackson:jackson-mapper-asl:1.9.5')
@@ -305,10 +305,12 @@ project(':azkaban-execserver') {
     compile('javax.servlet:servlet-api:2.5')
     compile('joda-time:joda-time:2.0')
     compile('log4j:log4j:1.2.16')
+    compile('org.apache.httpcomponents:httpclient:4.3.1')
+    compile('org.apache.httpcomponents:httpcore:4.3')
     compile('org.mortbay.jetty:jetty:6.1.26')
     compile('org.mortbay.jetty:jetty-util:6.1.26')
     compile('org.codehaus.jackson:jackson-core-asl:1.9.5')
     compile('org.codehaus.jackson:jackson-mapper-asl:1.9.5')
 
     testCompile('junit:junit:4.11')
     testCompile('org.hamcrest:hamcrest-all:1.3')
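Note: the httpcomponents bump above (httpclient 4.3.1 / httpcore 4.3, now also pulled into azkaban-execserver) backs the HttpPost, Header, and entity usage asserted in the new tests. The following is a minimal sketch of that 4.3.x API, not code from this change; the URL, header, and body values are purely illustrative.

import java.io.UnsupportedEncodingException;

import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.message.BasicHeader;

public class HttpPostSketch {
  public static HttpPost buildPost() throws UnsupportedEncodingException {
    HttpPost post = new HttpPost("http://callback.example.com/notify");
    // A StringEntity reports a concrete content length, which is the value
    // parseJobCallbackWithPostTest compares against the body text length.
    post.setEntity(new StringEntity("{\"name\":\"you\"}"));
    // Headers are plain name/value pairs, the same shape produced by
    // JobCallbackUtil.parseHttpHeaders in the header tests above.
    post.setHeader(new BasicHeader("Content-type", "application/json"));
    return post;
  }
}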