azkaban-aplcache

Fix issue #605: Azkaban HTTP callback may skip notification.

10/8/2016 1:47:31 AM

Details

diff --git a/azkaban-common/src/main/java/azkaban/event/Event.java b/azkaban-common/src/main/java/azkaban/event/Event.java
index 82cf5b3..7eb59f9 100644
--- a/azkaban-common/src/main/java/azkaban/event/Event.java
+++ b/azkaban-common/src/main/java/azkaban/event/Event.java
@@ -16,6 +16,8 @@
 
 package azkaban.event;
 
+import com.google.common.base.Preconditions;
+
 public class Event {
   public enum Type {
     FLOW_STARTED,
@@ -29,16 +31,14 @@ public class Event {
 
   private final Object runner;
   private final Type type;
-  private final Object eventData;
+  private final EventData eventData;
   private final long time;
-  private final boolean shouldUpdate;
 
-  private Event(Object runner, Type type, Object eventData, boolean shouldUpdate) {
+  private Event(Object runner, Type type, EventData eventData) {
     this.runner = runner;
     this.type = type;
     this.eventData = eventData;
     this.time = System.currentTimeMillis();
-    this.shouldUpdate = shouldUpdate;
   }
 
   public Object getRunner() {
@@ -53,24 +53,22 @@ public class Event {
     return time;
   }
 
-  public Object getData() {
+  public EventData getData() {
     return eventData;
   }
 
-  public static Event create(Object runner, Type type) {
-    return new Event(runner, type, null, true);
-  }
-
-  public static Event create(Object runner, Type type, Object eventData) {
-    return new Event(runner, type, eventData, true);
+  /**
+   * Creates a new event.
+   *
+   * @param runner runner.
+   * @param type type.
+   * @param eventData EventData, null is not allowed.
+   * @return New Event instance.
+   * @throws NullPointerException if EventData is null.
+   */
+  public static Event create(Object runner, Type type, EventData eventData) throws NullPointerException {
+    Preconditions.checkNotNull(eventData, "EventData was null");
+    return new Event(runner, type, eventData);
   }
 
-  public static Event create(Object runner, Type type, Object eventData,
-      boolean shouldUpdate) {
-    return new Event(runner, type, eventData, shouldUpdate);
-  }
-
-  public boolean isShouldUpdate() {
-    return shouldUpdate;
-  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/event/EventData.java b/azkaban-common/src/main/java/azkaban/event/EventData.java
new file mode 100644
index 0000000..77a830c
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/event/EventData.java
@@ -0,0 +1,42 @@
+package azkaban.event;
+
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.Status;
+
+/**
+ * Carries an immutable snapshot of the status data, suitable for asynchronous message passing.
+ */
+public class EventData {
+
+  private final Status status;
+  private final String nestedId;
+
+  /**
+   * Creates a new EventData instance.
+   *
+   * @param status node status.
+   */
+  public EventData(Status status) {
+    this(status, null);
+  }
+
+  /**
+   * Creates a new EventData instance.
+   *
+   * @param status node status.
+   * @param nestedId node id, corresponds to {@link ExecutableNode#getNestedId()}.
+   */
+  public EventData(Status status, String nestedId) {
+    this.status = status;
+    this.nestedId = nestedId;
+  }
+
+  public Status getStatus() {
+    return status;
+  }
+
+  public String getNestedId() {
+    return nestedId;
+  }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 7df342c..144d6d4 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -45,6 +45,7 @@ import org.joda.time.DateTime;
 import azkaban.alert.Alerter;
 import azkaban.event.Event;
 import azkaban.event.Event.Type;
+import azkaban.event.EventData;
 import azkaban.event.EventHandler;
 import azkaban.executor.selector.ExecutorComparator;
 import azkaban.executor.selector.ExecutorFilter;
@@ -1349,7 +1350,7 @@ public class ExecutorManager extends EventHandler implements
                 ScheduleStatisticManager.invalidateCache(flow.getScheduleId(),
                     cacheDir);
               }
-              fireEventListeners(Event.create(flow, Type.FLOW_FINISHED));
+              fireEventListeners(Event.create(flow, Type.FLOW_FINISHED, new EventData(flow.getStatus())));
               recentlyFinished.put(flow.getExecutionId(), flow);
             }
 
@@ -1415,7 +1416,7 @@ public class ExecutorManager extends EventHandler implements
 
       updaterStage = "finalizing flow " + execId + " cleaning from memory";
       runningFlows.remove(execId);
-      fireEventListeners(Event.create(dsFlow, Type.FLOW_FINISHED));
+      fireEventListeners(Event.create(dsFlow, Type.FLOW_FINISHED, new EventData(dsFlow.getStatus())));
       recentlyFinished.put(execId, dsFlow);
 
     } catch (ExecutorManagerException e) {
diff --git a/azkaban-exec-server/build.gradle b/azkaban-exec-server/build.gradle
index 2a77009..1cbbb9b 100644
--- a/azkaban-exec-server/build.gradle
+++ b/azkaban-exec-server/build.gradle
@@ -5,6 +5,7 @@ dependencies {
 
   testCompile('org.hamcrest:hamcrest-all:1.3')
   testCompile(project(':azkaban-common').sourceSets.test.output)
+  testCompile('com.google.guava:guava:13.0.1')
 }
 
 distributions {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackManager.java
index c417b1c..5662d03 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackManager.java
@@ -19,6 +19,7 @@ import org.apache.http.message.BasicHeader;
 import org.apache.log4j.Logger;
 
 import azkaban.event.Event;
+import azkaban.event.EventData;
 import azkaban.event.EventListener;
 import azkaban.execapp.JobRunner;
 import azkaban.execapp.jmx.JmxJobCallback;
@@ -135,6 +136,7 @@ public class JobCallbackManager implements EventListener {
 
   private void processJobCallOnFinish(Event event) {
     JobRunner jobRunner = (JobRunner) event.getRunner();
+    EventData eventData = event.getData();
 
     if (!JobCallbackUtil.isThereJobCallbackProperty(jobRunner.getProps(),
         ON_COMPLETION_JOB_CALLBACK_STATUS)) {
@@ -151,7 +153,7 @@ public class JobCallbackManager implements EventListener {
     JobCallbackStatusEnum jobCallBackStatusEnum = null;
     Logger jobLogger = jobRunner.getLogger();
 
-    Status jobStatus = jobRunner.getNode().getStatus();
+    Status jobStatus = eventData.getStatus();
 
     if (jobStatus == Status.SUCCEEDED) {
 
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackUtil.java b/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackUtil.java
index a6ed354..e06a306 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackUtil.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackUtil.java
@@ -36,6 +36,7 @@ import org.apache.http.message.BasicHeader;
 import org.apache.log4j.Logger;
 
 import azkaban.event.Event;
+import azkaban.event.EventData;
 import azkaban.execapp.JobRunner;
 import azkaban.executor.ExecutableNode;
 import azkaban.jobcallback.JobCallbackStatusEnum;
@@ -244,6 +245,7 @@ public class JobCallbackUtil {
     if (event.getRunner() instanceof JobRunner) {
       JobRunner jobRunner = (JobRunner) event.getRunner();
       ExecutableNode node = jobRunner.getNode();
+      EventData eventData = event.getData();
       String projectName = node.getParentFlow().getProjectName();
       String flowName = node.getParentFlow().getFlowId();
       String executionId =
@@ -256,7 +258,7 @@ public class JobCallbackUtil {
       result.put(CONTEXT_FLOW_TOKEN, flowName);
       result.put(CONTEXT_EXECUTION_ID_TOKEN, executionId);
       result.put(CONTEXT_JOB_TOKEN, jobId);
-      result.put(CONTEXT_JOB_STATUS_TOKEN, node.getStatus().name().toLowerCase());
+      result.put(CONTEXT_JOB_STATUS_TOKEN, eventData.getStatus().name().toLowerCase());
 
       /*
        * if (node.getStatus() == Status.SUCCEEDED || node.getStatus() ==
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/event/LocalFlowWatcher.java b/azkaban-exec-server/src/main/java/azkaban/execapp/event/LocalFlowWatcher.java
index afa3650..a5776f2 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/event/LocalFlowWatcher.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/event/LocalFlowWatcher.java
@@ -18,6 +18,7 @@ package azkaban.execapp.event;
 
 import azkaban.event.Event;
 import azkaban.event.Event.Type;
+import azkaban.event.EventData;
 import azkaban.event.EventListener;
 import azkaban.execapp.FlowRunner;
 import azkaban.execapp.JobRunner;
@@ -58,10 +59,9 @@ public class LocalFlowWatcher extends FlowWatcher {
       if (event.getType() == Type.JOB_FINISHED) {
         if (event.getRunner() instanceof FlowRunner) {
           // The flow runner will finish a job without it running
-          Object data = event.getData();
-          if (data instanceof ExecutableNode) {
-            ExecutableNode node = (ExecutableNode) data;
-            handleJobStatusChange(node.getNestedId(), node.getStatus());
+          EventData eventData = event.getData();
+          if (eventData.getNestedId() != null) {
+            handleJobStatusChange(eventData.getNestedId(), eventData.getStatus());
           }
         } else if (event.getRunner() instanceof JobRunner) {
           // A job runner is finished
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index 7eedee4..3e90c13 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -39,6 +39,7 @@ import org.apache.log4j.PatternLayout;
 
 import azkaban.event.Event;
 import azkaban.event.Event.Type;
+import azkaban.event.EventData;
 import azkaban.event.EventHandler;
 import azkaban.event.EventListener;
 import azkaban.execapp.event.FlowWatcher;
@@ -212,7 +213,7 @@ public class FlowRunner extends EventHandler implements Runnable {
       logger.info("Fetching job and shared properties.");
       loadAllProperties();
 
-      this.fireEventListeners(Event.create(this, Type.FLOW_STARTED));
+      this.fireEventListeners(Event.create(this, Type.FLOW_STARTED, new EventData(this.getExecutableFlow().getStatus())));
       runFlow();
     } catch (Throwable t) {
       if (logger != null) {
@@ -236,7 +237,7 @@ public class FlowRunner extends EventHandler implements Runnable {
       closeLogger();
 
       updateFlow();
-      this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED));
+      this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED, new EventData(flow.getStatus())));
     }
   }
 
@@ -572,7 +573,8 @@ public class FlowRunner extends EventHandler implements Runnable {
 
   private void finishExecutableNode(ExecutableNode node) {
     finishedNodes.add(node);
-    fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
+    EventData eventData = new EventData(node.getStatus(), node.getNestedId());
+    fireEventListeners(Event.create(this, Type.JOB_FINISHED, eventData));
   }
 
   private void finalizeFlow(ExecutableFlowBase flow) {
@@ -1054,15 +1056,16 @@ public class FlowRunner extends EventHandler implements Runnable {
         updateFlow();
       } else if (event.getType() == Type.JOB_FINISHED) {
         ExecutableNode node = runner.getNode();
+        EventData eventData = event.getData();
         long seconds = (node.getEndTime() - node.getStartTime()) / 1000;
         synchronized (mainSyncObj) {
-          logger.info("Job " + node.getNestedId() + " finished with status "
-              + node.getStatus() + " in " + seconds + " seconds");
+          logger.info("Job " + eventData.getNestedId() + " finished with status "
+              + eventData.getStatus() + " in " + seconds + " seconds");
 
           // Cancellation is handled in the main thread, but if the flow is
           // paused, the main thread is paused too.
           // This unpauses the flow for cancellation.
-          if (flowPaused && node.getStatus() == Status.FAILED
+          if (flowPaused && eventData.getStatus() == Status.FAILED
               && failureAction == FailureAction.CANCEL_ALL) {
             flowPaused = false;
           }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java
index 4fb0c62..7b98603 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java
@@ -7,6 +7,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.log4j.Logger;
 
 import azkaban.event.Event;
+import azkaban.event.EventData;
 import azkaban.event.EventListener;
 import azkaban.execapp.JobRunner;
 import azkaban.executor.ExecutableNode;
@@ -101,12 +102,13 @@ public class JmxJobMBeanManager implements JmxJobMXBean, EventListener {
 
     if (event.getRunner() instanceof JobRunner) {
       JobRunner jobRunner = (JobRunner) event.getRunner();
+      EventData eventData = event.getData();
       ExecutableNode node = jobRunner.getNode();
 
       if (logger.isDebugEnabled()) {
         logger.debug("*** got " + event.getType() + " " + node.getId() + " "
             + event.getRunner().getClass().getName() + " status: "
-            + node.getStatus());
+            + eventData.getStatus());
       }
 
       if (event.getType() == Event.Type.JOB_STARTED) {
@@ -121,13 +123,13 @@ public class JmxJobMBeanManager implements JmxJobMXBean, EventListener {
               + Event.Type.JOB_FINISHED);
         }
 
-        if (node.getStatus() == Status.FAILED) {
+        if (eventData.getStatus() == Status.FAILED) {
           totalFailedJobCount.incrementAndGet();
-        } else if (node.getStatus() == Status.SUCCEEDED) {
+        } else if (eventData.getStatus() == Status.SUCCEEDED) {
           totalSucceededJobCount.incrementAndGet();
         }
 
-        handleJobFinishedCount(node.getStatus(), node.getType());
+        handleJobFinishedCount(eventData.getStatus(), node.getType());
       }
 
     } else {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
index 6db6a47..eaa0bbc 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
@@ -33,6 +33,7 @@ import org.apache.log4j.RollingFileAppender;
 
 import azkaban.event.Event;
 import azkaban.event.Event.Type;
+import azkaban.event.EventData;
 import azkaban.event.EventHandler;
 import azkaban.execapp.event.BlockingStatus;
 import azkaban.execapp.event.FlowWatcher;
@@ -268,18 +269,18 @@ public class JobRunner extends EventHandler implements Runnable {
     if (Status.isStatusFinished(nodeStatus)) {
       quickFinish = true;
     } else if (nodeStatus == Status.DISABLED) {
-      changeStatus(Status.SKIPPED, time);
+      nodeStatus = changeStatus(Status.SKIPPED, time);
       quickFinish = true;
     } else if (this.isKilled()) {
-      changeStatus(Status.KILLED, time);
+      nodeStatus = changeStatus(Status.KILLED, time);
       quickFinish = true;
     }
 
     if (quickFinish) {
       node.setStartTime(time);
-      fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
+      fireEvent(Event.create(this, Type.JOB_STARTED, new EventData(nodeStatus)));
       node.setEndTime(time);
-      fireEvent(Event.create(this, Type.JOB_FINISHED));
+      fireEvent(Event.create(this, Type.JOB_FINISHED, new EventData(nodeStatus)));
       return true;
     }
 
@@ -428,21 +429,23 @@ public class JobRunner extends EventHandler implements Runnable {
 
     // Start the node.
     node.setStartTime(System.currentTimeMillis());
+    Status finalStatus = node.getStatus();
     if (!errorFound && !isKilled()) {
-      fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
+      fireEvent(Event.create(this, Type.JOB_STARTED, new EventData(finalStatus)));
       try {
         loader.uploadExecutableNode(node, props);
       } catch (ExecutorManagerException e1) {
         logger.error("Error writing initial node properties");
       }
 
-      if (prepareJob()) {
+      Status prepareStatus = prepareJob();
+      if (prepareStatus != null) {
         // Writes status to the db
         writeStatus();
-        fireEvent(Event.create(this, Type.JOB_STATUS_CHANGED), false);
-        runJob();
+        fireEvent(Event.create(this, Type.JOB_STATUS_CHANGED, new EventData(prepareStatus)));
+        finalStatus = runJob();
       } else {
-        changeStatus(Status.FAILED);
+        finalStatus = changeStatus(Status.FAILED);
         logError("Job run failed preparing the job.");
       }
     }
@@ -454,29 +457,30 @@ public class JobRunner extends EventHandler implements Runnable {
       // So we set it to KILLED to make sure we know that we forced kill it
       // rather than
       // it being a legitimate failure.
-      changeStatus(Status.KILLED);
+      finalStatus = changeStatus(Status.KILLED);
     }
 
     int attemptNo = node.getAttempt();
     logInfo("Finishing job " + this.jobId + " attempt: " + attemptNo + " at "
         + node.getEndTime() + " with status " + node.getStatus());
 
-    fireEvent(Event.create(this, Type.JOB_FINISHED), false);
+    fireEvent(Event.create(this, Type.JOB_FINISHED, new EventData(finalStatus)), false);
     finalizeLogFile(attemptNo);
     finalizeAttachmentFile();
     writeStatus();
   }
 
-  private boolean prepareJob() throws RuntimeException {
+  private Status prepareJob() throws RuntimeException {
     // Check pre conditions
     if (props == null || this.isKilled()) {
       logError("Failing job. The job properties don't exist");
-      return false;
+      return null;
     }
 
+    Status finalStatus;
     synchronized (syncObject) {
       if (node.getStatus() == Status.FAILED || this.isKilled()) {
-        return false;
+        return null;
       }
 
       if (node.getAttempt() > 0) {
@@ -500,7 +504,7 @@ public class JobRunner extends EventHandler implements Runnable {
       props.put(CommonJobProperties.JOB_METADATA_FILE,
           createMetaDataFileName(node));
       props.put(CommonJobProperties.JOB_ATTACHMENT_FILE, attachmentFileName);
-      changeStatus(Status.RUNNING);
+      finalStatus = changeStatus(Status.RUNNING);
 
       // Ability to specify working directory
       if (!props.containsKey(AbstractProcessJob.WORKING_DIR)) {
@@ -512,7 +516,7 @@ public class JobRunner extends EventHandler implements Runnable {
         if (proxyUsers != null && !proxyUsers.contains(jobProxyUser)) {
           logger.error("User " + jobProxyUser
               + " has no permission to execute this job " + this.jobId + "!");
-          return false;
+          return null;
         }
       }
 
@@ -520,11 +524,11 @@ public class JobRunner extends EventHandler implements Runnable {
         job = jobtypeManager.buildJobExecutor(this.jobId, props, logger);
       } catch (JobTypeManagerException e) {
         logger.error("Failed to build job type", e);
-        return false;
+        return null;
       }
     }
 
-    return true;
+    return finalStatus;
   }
 
   /**
@@ -585,17 +589,18 @@ public class JobRunner extends EventHandler implements Runnable {
         StringUtils.join2(node.getInNodes(), ","));
   }
 
-  private void runJob() {
+  private Status runJob() {
+    Status finalStatus = node.getStatus();
     try {
       job.run();
     } catch (Throwable e) {
 
       if (props.getBoolean("job.succeed.on.failure", false)) {
-        changeStatus(Status.FAILED_SUCCEEDED);
+        finalStatus = changeStatus(Status.FAILED_SUCCEEDED);
         logError("Job run failed, but will treat it like success.");
         logError(e.getMessage() + " cause: " + e.getCause(), e);
       } else {
-        changeStatus(Status.FAILED);
+        finalStatus = changeStatus(Status.FAILED);
         logError("Job run failed!", e);
         logError(e.getMessage() + " cause: " + e.getCause());
       }
@@ -606,18 +611,21 @@ public class JobRunner extends EventHandler implements Runnable {
     }
 
     // If the job is still running, set the status to Success.
-    if (!Status.isStatusFinished(node.getStatus())) {
-      changeStatus(Status.SUCCEEDED);
+    if (!Status.isStatusFinished(finalStatus)) {
+      finalStatus = changeStatus(Status.SUCCEEDED);
     }
+    return finalStatus;
   }
 
-  private void changeStatus(Status status) {
+  private Status changeStatus(Status status) {
     changeStatus(status, System.currentTimeMillis());
+    return status;
   }
 
-  private void changeStatus(Status status, long time) {
+  private Status changeStatus(Status status, long time) {
     node.setStatus(status);
     node.setUpdateTime(time);
+    return status;
   }
 
   private void fireEvent(Event event) {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
index bb463d4..0a0e937 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
@@ -18,6 +18,7 @@ package azkaban.execapp.metric;
 
 import azkaban.event.Event;
 import azkaban.event.Event.Type;
+import azkaban.event.EventData;
 import azkaban.event.EventListener;
 import azkaban.execapp.JobRunner;
 import azkaban.executor.Status;
@@ -44,8 +45,7 @@ public class NumFailedJobMetric extends TimeBasedReportingMetric<Integer> implem
    */
   @Override
   public synchronized void handleEvent(Event event) {
-    JobRunner runner = (JobRunner) event.getRunner();
-    if (event.getType() == Type.JOB_FINISHED && runner.getStatus().equals(Status.FAILED)) {
+    if (event.getType() == Type.JOB_FINISHED && Status.FAILED.equals(event.getData().getStatus())) {
       value = value + 1;
     }
   }
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
index 2e6dad8..95406f5 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
@@ -31,6 +31,7 @@ import org.junit.Test;
 
 import azkaban.event.Event;
 import azkaban.event.Event.Type;
+import azkaban.event.EventData;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutorLoader;
@@ -82,12 +83,12 @@ public class JobRunnerTest {
         createJobRunner(1, "testJob", 1, false, loader, eventCollector);
     ExecutableNode node = runner.getNode();
 
-    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED));
+    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED, new EventData(node.getStatus())));
     Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED
         || runner.getStatus() != Status.FAILED);
 
     runner.run();
-    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED));
+    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED, new EventData(node.getStatus())));
 
     Assert.assertTrue(runner.getStatus() == node.getStatus());
     Assert.assertTrue("Node status is " + node.getStatus(),
@@ -281,12 +282,11 @@ public class JobRunnerTest {
     long startTime = System.currentTimeMillis();
     ExecutableNode node = runner.getNode();
 
-    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED));
-    Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED
-        || runner.getStatus() != Status.FAILED);
+    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED, new EventData(node.getStatus())));
+    Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED);
 
     runner.run();
-    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED));
+    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED, new EventData(node.getStatus())));
 
     Assert.assertTrue(runner.getStatus() == node.getStatus());
     Assert.assertTrue("Node status is " + node.getStatus(),
@@ -321,9 +321,8 @@ public class JobRunnerTest {
     long startTime = System.currentTimeMillis();
     ExecutableNode node = runner.getNode();
 
-    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED));
-    Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED
-        || runner.getStatus() != Status.FAILED);
+    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED, new EventData(node.getStatus())));
+    Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED);
 
     Thread thread = new Thread(runner);
     thread.start();
@@ -344,7 +343,7 @@ public class JobRunnerTest {
       }
     }
 
-    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED));
+    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED, new EventData(node.getStatus())));
 
     Assert.assertTrue(runner.getStatus() == node.getStatus());
     Assert.assertTrue("Node status is " + node.getStatus(),