azkaban-aplcache
Changes
azkaban-exec-server/build.gradle 1(+1 -0)
Details
diff --git a/azkaban-common/src/main/java/azkaban/event/Event.java b/azkaban-common/src/main/java/azkaban/event/Event.java
index 82cf5b3..7eb59f9 100644
--- a/azkaban-common/src/main/java/azkaban/event/Event.java
+++ b/azkaban-common/src/main/java/azkaban/event/Event.java
@@ -16,6 +16,8 @@
package azkaban.event;
+import com.google.common.base.Preconditions;
+
public class Event {
public enum Type {
FLOW_STARTED,
@@ -29,16 +31,14 @@ public class Event {
private final Object runner;
private final Type type;
- private final Object eventData;
+ private final EventData eventData;
private final long time;
- private final boolean shouldUpdate;
- private Event(Object runner, Type type, Object eventData, boolean shouldUpdate) {
+ private Event(Object runner, Type type, EventData eventData) {
this.runner = runner;
this.type = type;
this.eventData = eventData;
this.time = System.currentTimeMillis();
- this.shouldUpdate = shouldUpdate;
}
public Object getRunner() {
@@ -53,24 +53,22 @@ public class Event {
return time;
}
- public Object getData() {
+ public EventData getData() {
return eventData;
}
- public static Event create(Object runner, Type type) {
- return new Event(runner, type, null, true);
- }
-
- public static Event create(Object runner, Type type, Object eventData) {
- return new Event(runner, type, eventData, true);
+ /**
+ * Creates a new event.
+ *
+ * @param runner runner.
+ * @param type type.
+ * @param eventData EventData, null is not allowed.
+ * @return New Event instance.
+ * @throws NullPointerException if EventData is null.
+ */
+ public static Event create(Object runner, Type type, EventData eventData) throws NullPointerException {
+ Preconditions.checkNotNull(eventData, "EventData was null");
+ return new Event(runner, type, eventData);
}
- public static Event create(Object runner, Type type, Object eventData,
- boolean shouldUpdate) {
- return new Event(runner, type, eventData, shouldUpdate);
- }
-
- public boolean isShouldUpdate() {
- return shouldUpdate;
- }
}
diff --git a/azkaban-common/src/main/java/azkaban/event/EventData.java b/azkaban-common/src/main/java/azkaban/event/EventData.java
new file mode 100644
index 0000000..77a830c
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/event/EventData.java
@@ -0,0 +1,42 @@
+package azkaban.event;
+
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.Status;
+
+/**
+ * Carries an immutable snapshot of the status data, suitable for asynchronous message passing.
+ */
+public class EventData {
+
+ private final Status status;
+ private final String nestedId;
+
+ /**
+ * Creates a new EventData instance.
+ *
+ * @param status node status.
+ */
+ public EventData(Status status) {
+ this(status, null);
+ }
+
+ /**
+ * Creates a new EventData instance.
+ *
+ * @param status node status.
+ * @param nestedId node id, corresponds to {@link ExecutableNode#getNestedId()}.
+ */
+ public EventData(Status status, String nestedId) {
+ this.status = status;
+ this.nestedId = nestedId;
+ }
+
+ public Status getStatus() {
+ return status;
+ }
+
+ public String getNestedId() {
+ return nestedId;
+ }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 7df342c..144d6d4 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -45,6 +45,7 @@ import org.joda.time.DateTime;
import azkaban.alert.Alerter;
import azkaban.event.Event;
import azkaban.event.Event.Type;
+import azkaban.event.EventData;
import azkaban.event.EventHandler;
import azkaban.executor.selector.ExecutorComparator;
import azkaban.executor.selector.ExecutorFilter;
@@ -1349,7 +1350,7 @@ public class ExecutorManager extends EventHandler implements
ScheduleStatisticManager.invalidateCache(flow.getScheduleId(),
cacheDir);
}
- fireEventListeners(Event.create(flow, Type.FLOW_FINISHED));
+ fireEventListeners(Event.create(flow, Type.FLOW_FINISHED, new EventData(flow.getStatus())));
recentlyFinished.put(flow.getExecutionId(), flow);
}
@@ -1415,7 +1416,7 @@ public class ExecutorManager extends EventHandler implements
updaterStage = "finalizing flow " + execId + " cleaning from memory";
runningFlows.remove(execId);
- fireEventListeners(Event.create(dsFlow, Type.FLOW_FINISHED));
+ fireEventListeners(Event.create(dsFlow, Type.FLOW_FINISHED, new EventData(dsFlow.getStatus())));
recentlyFinished.put(execId, dsFlow);
} catch (ExecutorManagerException e) {
azkaban-exec-server/build.gradle 1(+1 -0)
diff --git a/azkaban-exec-server/build.gradle b/azkaban-exec-server/build.gradle
index 2a77009..1cbbb9b 100644
--- a/azkaban-exec-server/build.gradle
+++ b/azkaban-exec-server/build.gradle
@@ -5,6 +5,7 @@ dependencies {
testCompile('org.hamcrest:hamcrest-all:1.3')
testCompile(project(':azkaban-common').sourceSets.test.output)
+ testCompile('com.google.guava:guava:13.0.1')
}
distributions {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackManager.java
index c417b1c..5662d03 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackManager.java
@@ -19,6 +19,7 @@ import org.apache.http.message.BasicHeader;
import org.apache.log4j.Logger;
import azkaban.event.Event;
+import azkaban.event.EventData;
import azkaban.event.EventListener;
import azkaban.execapp.JobRunner;
import azkaban.execapp.jmx.JmxJobCallback;
@@ -135,6 +136,7 @@ public class JobCallbackManager implements EventListener {
private void processJobCallOnFinish(Event event) {
JobRunner jobRunner = (JobRunner) event.getRunner();
+ EventData eventData = event.getData();
if (!JobCallbackUtil.isThereJobCallbackProperty(jobRunner.getProps(),
ON_COMPLETION_JOB_CALLBACK_STATUS)) {
@@ -151,7 +153,7 @@ public class JobCallbackManager implements EventListener {
JobCallbackStatusEnum jobCallBackStatusEnum = null;
Logger jobLogger = jobRunner.getLogger();
- Status jobStatus = jobRunner.getNode().getStatus();
+ Status jobStatus = eventData.getStatus();
if (jobStatus == Status.SUCCEEDED) {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackUtil.java b/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackUtil.java
index a6ed354..e06a306 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackUtil.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackUtil.java
@@ -36,6 +36,7 @@ import org.apache.http.message.BasicHeader;
import org.apache.log4j.Logger;
import azkaban.event.Event;
+import azkaban.event.EventData;
import azkaban.execapp.JobRunner;
import azkaban.executor.ExecutableNode;
import azkaban.jobcallback.JobCallbackStatusEnum;
@@ -244,6 +245,7 @@ public class JobCallbackUtil {
if (event.getRunner() instanceof JobRunner) {
JobRunner jobRunner = (JobRunner) event.getRunner();
ExecutableNode node = jobRunner.getNode();
+ EventData eventData = event.getData();
String projectName = node.getParentFlow().getProjectName();
String flowName = node.getParentFlow().getFlowId();
String executionId =
@@ -256,7 +258,7 @@ public class JobCallbackUtil {
result.put(CONTEXT_FLOW_TOKEN, flowName);
result.put(CONTEXT_EXECUTION_ID_TOKEN, executionId);
result.put(CONTEXT_JOB_TOKEN, jobId);
- result.put(CONTEXT_JOB_STATUS_TOKEN, node.getStatus().name().toLowerCase());
+ result.put(CONTEXT_JOB_STATUS_TOKEN, eventData.getStatus().name().toLowerCase());
/*
* if (node.getStatus() == Status.SUCCEEDED || node.getStatus() ==
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/event/LocalFlowWatcher.java b/azkaban-exec-server/src/main/java/azkaban/execapp/event/LocalFlowWatcher.java
index afa3650..a5776f2 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/event/LocalFlowWatcher.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/event/LocalFlowWatcher.java
@@ -18,6 +18,7 @@ package azkaban.execapp.event;
import azkaban.event.Event;
import azkaban.event.Event.Type;
+import azkaban.event.EventData;
import azkaban.event.EventListener;
import azkaban.execapp.FlowRunner;
import azkaban.execapp.JobRunner;
@@ -58,10 +59,9 @@ public class LocalFlowWatcher extends FlowWatcher {
if (event.getType() == Type.JOB_FINISHED) {
if (event.getRunner() instanceof FlowRunner) {
// The flow runner will finish a job without it running
- Object data = event.getData();
- if (data instanceof ExecutableNode) {
- ExecutableNode node = (ExecutableNode) data;
- handleJobStatusChange(node.getNestedId(), node.getStatus());
+ EventData eventData = event.getData();
+ if (eventData.getNestedId() != null) {
+ handleJobStatusChange(eventData.getNestedId(), eventData.getStatus());
}
} else if (event.getRunner() instanceof JobRunner) {
// A job runner is finished
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index 7eedee4..3e90c13 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -39,6 +39,7 @@ import org.apache.log4j.PatternLayout;
import azkaban.event.Event;
import azkaban.event.Event.Type;
+import azkaban.event.EventData;
import azkaban.event.EventHandler;
import azkaban.event.EventListener;
import azkaban.execapp.event.FlowWatcher;
@@ -212,7 +213,7 @@ public class FlowRunner extends EventHandler implements Runnable {
logger.info("Fetching job and shared properties.");
loadAllProperties();
- this.fireEventListeners(Event.create(this, Type.FLOW_STARTED));
+ this.fireEventListeners(Event.create(this, Type.FLOW_STARTED, new EventData(this.getExecutableFlow().getStatus())));
runFlow();
} catch (Throwable t) {
if (logger != null) {
@@ -236,7 +237,7 @@ public class FlowRunner extends EventHandler implements Runnable {
closeLogger();
updateFlow();
- this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED));
+ this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED, new EventData(flow.getStatus())));
}
}
@@ -572,7 +573,8 @@ public class FlowRunner extends EventHandler implements Runnable {
private void finishExecutableNode(ExecutableNode node) {
finishedNodes.add(node);
- fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
+ EventData eventData = new EventData(node.getStatus(), node.getNestedId());
+ fireEventListeners(Event.create(this, Type.JOB_FINISHED, eventData));
}
private void finalizeFlow(ExecutableFlowBase flow) {
@@ -1054,15 +1056,16 @@ public class FlowRunner extends EventHandler implements Runnable {
updateFlow();
} else if (event.getType() == Type.JOB_FINISHED) {
ExecutableNode node = runner.getNode();
+ EventData eventData = event.getData();
long seconds = (node.getEndTime() - node.getStartTime()) / 1000;
synchronized (mainSyncObj) {
- logger.info("Job " + node.getNestedId() + " finished with status "
- + node.getStatus() + " in " + seconds + " seconds");
+ logger.info("Job " + eventData.getNestedId() + " finished with status "
+ + eventData.getStatus() + " in " + seconds + " seconds");
// Cancellation is handled in the main thread, but if the flow is
// paused, the main thread is paused too.
// This unpauses the flow for cancellation.
- if (flowPaused && node.getStatus() == Status.FAILED
+ if (flowPaused && eventData.getStatus() == Status.FAILED
&& failureAction == FailureAction.CANCEL_ALL) {
flowPaused = false;
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java
index 4fb0c62..7b98603 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java
@@ -7,6 +7,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.Logger;
import azkaban.event.Event;
+import azkaban.event.EventData;
import azkaban.event.EventListener;
import azkaban.execapp.JobRunner;
import azkaban.executor.ExecutableNode;
@@ -101,12 +102,13 @@ public class JmxJobMBeanManager implements JmxJobMXBean, EventListener {
if (event.getRunner() instanceof JobRunner) {
JobRunner jobRunner = (JobRunner) event.getRunner();
+ EventData eventData = event.getData();
ExecutableNode node = jobRunner.getNode();
if (logger.isDebugEnabled()) {
logger.debug("*** got " + event.getType() + " " + node.getId() + " "
+ event.getRunner().getClass().getName() + " status: "
- + node.getStatus());
+ + eventData.getStatus());
}
if (event.getType() == Event.Type.JOB_STARTED) {
@@ -121,13 +123,13 @@ public class JmxJobMBeanManager implements JmxJobMXBean, EventListener {
+ Event.Type.JOB_FINISHED);
}
- if (node.getStatus() == Status.FAILED) {
+ if (eventData.getStatus() == Status.FAILED) {
totalFailedJobCount.incrementAndGet();
- } else if (node.getStatus() == Status.SUCCEEDED) {
+ } else if (eventData.getStatus() == Status.SUCCEEDED) {
totalSucceededJobCount.incrementAndGet();
}
- handleJobFinishedCount(node.getStatus(), node.getType());
+ handleJobFinishedCount(eventData.getStatus(), node.getType());
}
} else {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
index 6db6a47..eaa0bbc 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
@@ -33,6 +33,7 @@ import org.apache.log4j.RollingFileAppender;
import azkaban.event.Event;
import azkaban.event.Event.Type;
+import azkaban.event.EventData;
import azkaban.event.EventHandler;
import azkaban.execapp.event.BlockingStatus;
import azkaban.execapp.event.FlowWatcher;
@@ -268,18 +269,18 @@ public class JobRunner extends EventHandler implements Runnable {
if (Status.isStatusFinished(nodeStatus)) {
quickFinish = true;
} else if (nodeStatus == Status.DISABLED) {
- changeStatus(Status.SKIPPED, time);
+ nodeStatus = changeStatus(Status.SKIPPED, time);
quickFinish = true;
} else if (this.isKilled()) {
- changeStatus(Status.KILLED, time);
+ nodeStatus = changeStatus(Status.KILLED, time);
quickFinish = true;
}
if (quickFinish) {
node.setStartTime(time);
- fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
+ fireEvent(Event.create(this, Type.JOB_STARTED, new EventData(nodeStatus)));
node.setEndTime(time);
- fireEvent(Event.create(this, Type.JOB_FINISHED));
+ fireEvent(Event.create(this, Type.JOB_FINISHED, new EventData(nodeStatus)));
return true;
}
@@ -428,21 +429,23 @@ public class JobRunner extends EventHandler implements Runnable {
// Start the node.
node.setStartTime(System.currentTimeMillis());
+ Status finalStatus = node.getStatus();
if (!errorFound && !isKilled()) {
- fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
+ fireEvent(Event.create(this, Type.JOB_STARTED, new EventData(finalStatus)));
try {
loader.uploadExecutableNode(node, props);
} catch (ExecutorManagerException e1) {
logger.error("Error writing initial node properties");
}
- if (prepareJob()) {
+ Status prepareStatus = prepareJob();
+ if (prepareStatus != null) {
// Writes status to the db
writeStatus();
- fireEvent(Event.create(this, Type.JOB_STATUS_CHANGED), false);
- runJob();
+ fireEvent(Event.create(this, Type.JOB_STATUS_CHANGED, new EventData(prepareStatus)));
+ finalStatus = runJob();
} else {
- changeStatus(Status.FAILED);
+ finalStatus = changeStatus(Status.FAILED);
logError("Job run failed preparing the job.");
}
}
@@ -454,29 +457,30 @@ public class JobRunner extends EventHandler implements Runnable {
// So we set it to KILLED to make sure we know that we forced kill it
// rather than
// it being a legitimate failure.
- changeStatus(Status.KILLED);
+ finalStatus = changeStatus(Status.KILLED);
}
int attemptNo = node.getAttempt();
logInfo("Finishing job " + this.jobId + " attempt: " + attemptNo + " at "
+ node.getEndTime() + " with status " + node.getStatus());
- fireEvent(Event.create(this, Type.JOB_FINISHED), false);
+ fireEvent(Event.create(this, Type.JOB_FINISHED, new EventData(finalStatus)), false);
finalizeLogFile(attemptNo);
finalizeAttachmentFile();
writeStatus();
}
- private boolean prepareJob() throws RuntimeException {
+ private Status prepareJob() throws RuntimeException {
// Check pre conditions
if (props == null || this.isKilled()) {
logError("Failing job. The job properties don't exist");
- return false;
+ return null;
}
+ Status finalStatus;
synchronized (syncObject) {
if (node.getStatus() == Status.FAILED || this.isKilled()) {
- return false;
+ return null;
}
if (node.getAttempt() > 0) {
@@ -500,7 +504,7 @@ public class JobRunner extends EventHandler implements Runnable {
props.put(CommonJobProperties.JOB_METADATA_FILE,
createMetaDataFileName(node));
props.put(CommonJobProperties.JOB_ATTACHMENT_FILE, attachmentFileName);
- changeStatus(Status.RUNNING);
+ finalStatus = changeStatus(Status.RUNNING);
// Ability to specify working directory
if (!props.containsKey(AbstractProcessJob.WORKING_DIR)) {
@@ -512,7 +516,7 @@ public class JobRunner extends EventHandler implements Runnable {
if (proxyUsers != null && !proxyUsers.contains(jobProxyUser)) {
logger.error("User " + jobProxyUser
+ " has no permission to execute this job " + this.jobId + "!");
- return false;
+ return null;
}
}
@@ -520,11 +524,11 @@ public class JobRunner extends EventHandler implements Runnable {
job = jobtypeManager.buildJobExecutor(this.jobId, props, logger);
} catch (JobTypeManagerException e) {
logger.error("Failed to build job type", e);
- return false;
+ return null;
}
}
- return true;
+ return finalStatus;
}
/**
@@ -585,17 +589,18 @@ public class JobRunner extends EventHandler implements Runnable {
StringUtils.join2(node.getInNodes(), ","));
}
- private void runJob() {
+ private Status runJob() {
+ Status finalStatus = node.getStatus();
try {
job.run();
} catch (Throwable e) {
if (props.getBoolean("job.succeed.on.failure", false)) {
- changeStatus(Status.FAILED_SUCCEEDED);
+ finalStatus = changeStatus(Status.FAILED_SUCCEEDED);
logError("Job run failed, but will treat it like success.");
logError(e.getMessage() + " cause: " + e.getCause(), e);
} else {
- changeStatus(Status.FAILED);
+ finalStatus = changeStatus(Status.FAILED);
logError("Job run failed!", e);
logError(e.getMessage() + " cause: " + e.getCause());
}
@@ -606,18 +611,21 @@ public class JobRunner extends EventHandler implements Runnable {
}
// If the job is still running, set the status to Success.
- if (!Status.isStatusFinished(node.getStatus())) {
- changeStatus(Status.SUCCEEDED);
+ if (!Status.isStatusFinished(finalStatus)) {
+ finalStatus = changeStatus(Status.SUCCEEDED);
}
+ return finalStatus;
}
- private void changeStatus(Status status) {
+ private Status changeStatus(Status status) {
changeStatus(status, System.currentTimeMillis());
+ return status;
}
- private void changeStatus(Status status, long time) {
+ private Status changeStatus(Status status, long time) {
node.setStatus(status);
node.setUpdateTime(time);
+ return status;
}
private void fireEvent(Event event) {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
index bb463d4..0a0e937 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
@@ -18,6 +18,7 @@ package azkaban.execapp.metric;
import azkaban.event.Event;
import azkaban.event.Event.Type;
+import azkaban.event.EventData;
import azkaban.event.EventListener;
import azkaban.execapp.JobRunner;
import azkaban.executor.Status;
@@ -44,8 +45,7 @@ public class NumFailedJobMetric extends TimeBasedReportingMetric<Integer> implem
*/
@Override
public synchronized void handleEvent(Event event) {
- JobRunner runner = (JobRunner) event.getRunner();
- if (event.getType() == Type.JOB_FINISHED && runner.getStatus().equals(Status.FAILED)) {
+ if (event.getType() == Type.JOB_FINISHED && Status.FAILED.equals(event.getData().getStatus())) {
value = value + 1;
}
}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
index 2e6dad8..95406f5 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
@@ -31,6 +31,7 @@ import org.junit.Test;
import azkaban.event.Event;
import azkaban.event.Event.Type;
+import azkaban.event.EventData;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutorLoader;
@@ -82,12 +83,12 @@ public class JobRunnerTest {
createJobRunner(1, "testJob", 1, false, loader, eventCollector);
ExecutableNode node = runner.getNode();
- eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED));
+ eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED, new EventData(node.getStatus())));
Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED
|| runner.getStatus() != Status.FAILED);
runner.run();
- eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED));
+ eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED, new EventData(node.getStatus())));
Assert.assertTrue(runner.getStatus() == node.getStatus());
Assert.assertTrue("Node status is " + node.getStatus(),
@@ -281,12 +282,11 @@ public class JobRunnerTest {
long startTime = System.currentTimeMillis();
ExecutableNode node = runner.getNode();
- eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED));
- Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED
- || runner.getStatus() != Status.FAILED);
+ eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED, new EventData(node.getStatus())));
+ Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED);
runner.run();
- eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED));
+ eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED, new EventData(node.getStatus())));
Assert.assertTrue(runner.getStatus() == node.getStatus());
Assert.assertTrue("Node status is " + node.getStatus(),
@@ -321,9 +321,8 @@ public class JobRunnerTest {
long startTime = System.currentTimeMillis();
ExecutableNode node = runner.getNode();
- eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED));
- Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED
- || runner.getStatus() != Status.FAILED);
+ eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED, new EventData(node.getStatus())));
+ Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED);
Thread thread = new Thread(runner);
thread.start();
@@ -344,7 +343,7 @@ public class JobRunnerTest {
}
}
- eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED));
+ eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED, new EventData(node.getStatus())));
Assert.assertTrue(runner.getStatus() == node.getStatus());
Assert.assertTrue("Node status is " + node.getStatus(),