Details
diff --git a/azkaban-common/src/main/java/azkaban/event/EventData.java b/azkaban-common/src/main/java/azkaban/event/EventData.java
index 77a830c..ae83bdd 100644
--- a/azkaban-common/src/main/java/azkaban/event/EventData.java
+++ b/azkaban-common/src/main/java/azkaban/event/EventData.java
@@ -15,20 +15,19 @@ public class EventData {
* Creates a new EventData instance.
*
* @param status node status.
+ * @param nestedId node id, corresponds to {@link ExecutableNode#getNestedId()}.
*/
- public EventData(Status status) {
- this(status, null);
+ public EventData(Status status, String nestedId) {
+ this.status = status;
+ this.nestedId = nestedId;
}
/**
* Creates a new EventData instance.
- *
- * @param status node status.
- * @param nestedId node id, corresponds to {@link ExecutableNode#getNestedId()}.
+ * @param node node.
*/
- public EventData(Status status, String nestedId) {
- this.status = status;
- this.nestedId = nestedId;
+ public EventData(ExecutableNode node) {
+ this(node.getStatus(), node.getNestedId());
}
public Status getStatus() {
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 3dc9393..92feba9 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -1358,7 +1358,7 @@ public class ExecutorManager extends EventHandler implements
ScheduleStatisticManager.invalidateCache(flow.getScheduleId(),
cacheDir);
}
- fireEventListeners(Event.create(flow, Type.FLOW_FINISHED, new EventData(flow.getStatus())));
+ fireEventListeners(Event.create(flow, Type.FLOW_FINISHED, new EventData(flow)));
recentlyFinished.put(flow.getExecutionId(), flow);
}
@@ -1423,7 +1423,7 @@ public class ExecutorManager extends EventHandler implements
executorLoader.removeActiveExecutableReference(execId);
updaterStage = "finalizing flow " + execId + " cleaning from memory";
- fireEventListeners(Event.create(dsFlow, Type.FLOW_FINISHED, new EventData(dsFlow.getStatus())));
+ fireEventListeners(Event.create(dsFlow, Type.FLOW_FINISHED, new EventData(dsFlow)));
recentlyFinished.put(execId, dsFlow);
} catch (ExecutorManagerException e) {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index b86cfbb..fc5fa4b 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -222,7 +222,7 @@ public class FlowRunner extends EventHandler implements Runnable {
logger.info("Fetching job and shared properties.");
loadAllProperties();
- this.fireEventListeners(Event.create(this, Type.FLOW_STARTED, new EventData(this.getExecutableFlow().getStatus())));
+ this.fireEventListeners(Event.create(this, Type.FLOW_STARTED, new EventData(this.getExecutableFlow())));
runFlow();
} catch (Throwable t) {
if (logger != null) {
@@ -246,7 +246,7 @@ public class FlowRunner extends EventHandler implements Runnable {
closeLogger();
updateFlow();
- this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED, new EventData(flow.getStatus())));
+ this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED, new EventData(flow)));
}
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
index 82db1f2..3d95c3e 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
@@ -375,9 +375,9 @@ public class JobRunner extends EventHandler implements Runnable {
if (quickFinish) {
node.setStartTime(time);
- fireEvent(Event.create(this, Type.JOB_STARTED, new EventData(nodeStatus)));
+ fireEvent(Event.create(this, Type.JOB_STARTED, new EventData(nodeStatus, node.getNestedId())));
node.setEndTime(time);
- fireEvent(Event.create(this, Type.JOB_FINISHED, new EventData(nodeStatus)));
+ fireEvent(Event.create(this, Type.JOB_FINISHED, new EventData(nodeStatus, node.getNestedId())));
return true;
}
@@ -528,7 +528,7 @@ public class JobRunner extends EventHandler implements Runnable {
node.setStartTime(System.currentTimeMillis());
Status finalStatus = node.getStatus();
if (!errorFound && !isKilled()) {
- fireEvent(Event.create(this, Type.JOB_STARTED, new EventData(finalStatus)));
+ fireEvent(Event.create(this, Type.JOB_STARTED, new EventData(node)));
try {
loader.uploadExecutableNode(node, props);
} catch (ExecutorManagerException e1) {
@@ -539,7 +539,8 @@ public class JobRunner extends EventHandler implements Runnable {
if (prepareStatus != null) {
// Writes status to the db
writeStatus();
- fireEvent(Event.create(this, Type.JOB_STATUS_CHANGED, new EventData(prepareStatus)));
+ fireEvent(Event.create(this, Type.JOB_STATUS_CHANGED,
+ new EventData(prepareStatus, node.getNestedId())));
finalStatus = runJob();
} else {
finalStatus = changeStatus(Status.FAILED);
@@ -561,7 +562,8 @@ public class JobRunner extends EventHandler implements Runnable {
logInfo("Finishing job " + this.jobId + " attempt: " + attemptNo + " at "
+ node.getEndTime() + " with status " + node.getStatus());
- fireEvent(Event.create(this, Type.JOB_FINISHED, new EventData(finalStatus)), false);
+ fireEvent(Event.create(this, Type.JOB_FINISHED,
+ new EventData(finalStatus, node.getNestedId())), false);
finalizeLogFile(attemptNo);
finalizeAttachmentFile();
writeStatus();
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
index 1b8a5d6..632f8e7 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
@@ -83,12 +83,12 @@ public class JobRunnerTest {
createJobRunner(1, "testJob", 1, false, loader, eventCollector);
ExecutableNode node = runner.getNode();
- eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED, new EventData(node.getStatus())));
+ eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED, new EventData(node)));
Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED
|| runner.getStatus() != Status.FAILED);
runner.run();
- eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED, new EventData(node.getStatus())));
+ eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED, new EventData(node)));
Assert.assertTrue(runner.getStatus() == node.getStatus());
Assert.assertTrue("Node status is " + node.getStatus(),
@@ -272,11 +272,11 @@ public class JobRunnerTest {
long startTime = System.currentTimeMillis();
ExecutableNode node = runner.getNode();
- eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED, new EventData(node.getStatus())));
+ eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED, new EventData(node)));
Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED);
runner.run();
- eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED, new EventData(node.getStatus())));
+ eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED, new EventData(node)));
Assert.assertTrue(runner.getStatus() == node.getStatus());
Assert.assertTrue("Node status is " + node.getStatus(),
@@ -311,7 +311,7 @@ public class JobRunnerTest {
long startTime = System.currentTimeMillis();
ExecutableNode node = runner.getNode();
- eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED, new EventData(node.getStatus())));
+ eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED, new EventData(node)));
Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED);
Thread thread = new Thread(runner);
@@ -321,7 +321,7 @@ public class JobRunnerTest {
runner.kill();
Thread.sleep(500);
- eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED, new EventData(node.getStatus())));
+ eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED, new EventData(node)));
Assert.assertTrue(runner.getStatus() == node.getStatus());
Assert.assertTrue("Node status is " + node.getStatus(),