azkaban-aplcache

EventData to always include nestedId (#1119) This gives

5/26/2017 4:53:02 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/event/EventData.java b/azkaban-common/src/main/java/azkaban/event/EventData.java
index 77a830c..ae83bdd 100644
--- a/azkaban-common/src/main/java/azkaban/event/EventData.java
+++ b/azkaban-common/src/main/java/azkaban/event/EventData.java
@@ -15,20 +15,19 @@ public class EventData {
    * Creates a new EventData instance.
    *
    * @param status node status.
+   * @param nestedId node id, corresponds to {@link ExecutableNode#getNestedId()}.
    */
-  public EventData(Status status) {
-    this(status, null);
+  public EventData(Status status, String nestedId) {
+    this.status = status;
+    this.nestedId = nestedId;
   }
 
   /**
    * Creates a new EventData instance.
-   *
-   * @param status node status.
-   * @param nestedId node id, corresponds to {@link ExecutableNode#getNestedId()}.
+   * @param node node.
    */
-  public EventData(Status status, String nestedId) {
-    this.status = status;
-    this.nestedId = nestedId;
+  public EventData(ExecutableNode node) {
+    this(node.getStatus(), node.getNestedId());
   }
 
   public Status getStatus() {
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 3dc9393..92feba9 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -1358,7 +1358,7 @@ public class ExecutorManager extends EventHandler implements
                 ScheduleStatisticManager.invalidateCache(flow.getScheduleId(),
                     cacheDir);
               }
-              fireEventListeners(Event.create(flow, Type.FLOW_FINISHED, new EventData(flow.getStatus())));
+              fireEventListeners(Event.create(flow, Type.FLOW_FINISHED, new EventData(flow)));
               recentlyFinished.put(flow.getExecutionId(), flow);
             }
 
@@ -1423,7 +1423,7 @@ public class ExecutorManager extends EventHandler implements
       executorLoader.removeActiveExecutableReference(execId);
 
       updaterStage = "finalizing flow " + execId + " cleaning from memory";
-      fireEventListeners(Event.create(dsFlow, Type.FLOW_FINISHED, new EventData(dsFlow.getStatus())));
+      fireEventListeners(Event.create(dsFlow, Type.FLOW_FINISHED, new EventData(dsFlow)));
       recentlyFinished.put(execId, dsFlow);
 
     } catch (ExecutorManagerException e) {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index b86cfbb..fc5fa4b 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -222,7 +222,7 @@ public class FlowRunner extends EventHandler implements Runnable {
       logger.info("Fetching job and shared properties.");
       loadAllProperties();
 
-      this.fireEventListeners(Event.create(this, Type.FLOW_STARTED, new EventData(this.getExecutableFlow().getStatus())));
+      this.fireEventListeners(Event.create(this, Type.FLOW_STARTED, new EventData(this.getExecutableFlow())));
       runFlow();
     } catch (Throwable t) {
       if (logger != null) {
@@ -246,7 +246,7 @@ public class FlowRunner extends EventHandler implements Runnable {
       closeLogger();
 
       updateFlow();
-      this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED, new EventData(flow.getStatus())));
+      this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED, new EventData(flow)));
     }
   }
 
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
index 82db1f2..3d95c3e 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
@@ -375,9 +375,9 @@ public class JobRunner extends EventHandler implements Runnable {
 
     if (quickFinish) {
       node.setStartTime(time);
-      fireEvent(Event.create(this, Type.JOB_STARTED, new EventData(nodeStatus)));
+      fireEvent(Event.create(this, Type.JOB_STARTED, new EventData(nodeStatus, node.getNestedId())));
       node.setEndTime(time);
-      fireEvent(Event.create(this, Type.JOB_FINISHED, new EventData(nodeStatus)));
+      fireEvent(Event.create(this, Type.JOB_FINISHED, new EventData(nodeStatus, node.getNestedId())));
       return true;
     }
 
@@ -528,7 +528,7 @@ public class JobRunner extends EventHandler implements Runnable {
     node.setStartTime(System.currentTimeMillis());
     Status finalStatus = node.getStatus();
     if (!errorFound && !isKilled()) {
-      fireEvent(Event.create(this, Type.JOB_STARTED, new EventData(finalStatus)));
+      fireEvent(Event.create(this, Type.JOB_STARTED, new EventData(node)));
       try {
         loader.uploadExecutableNode(node, props);
       } catch (ExecutorManagerException e1) {
@@ -539,7 +539,8 @@ public class JobRunner extends EventHandler implements Runnable {
       if (prepareStatus != null) {
         // Writes status to the db
         writeStatus();
-        fireEvent(Event.create(this, Type.JOB_STATUS_CHANGED, new EventData(prepareStatus)));
+        fireEvent(Event.create(this, Type.JOB_STATUS_CHANGED,
+            new EventData(prepareStatus, node.getNestedId())));
         finalStatus = runJob();
       } else {
         finalStatus = changeStatus(Status.FAILED);
@@ -561,7 +562,8 @@ public class JobRunner extends EventHandler implements Runnable {
     logInfo("Finishing job " + this.jobId + " attempt: " + attemptNo + " at "
         + node.getEndTime() + " with status " + node.getStatus());
 
-    fireEvent(Event.create(this, Type.JOB_FINISHED, new EventData(finalStatus)), false);
+    fireEvent(Event.create(this, Type.JOB_FINISHED,
+        new EventData(finalStatus, node.getNestedId())), false);
     finalizeLogFile(attemptNo);
     finalizeAttachmentFile();
     writeStatus();
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
index 1b8a5d6..632f8e7 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
@@ -83,12 +83,12 @@ public class JobRunnerTest {
         createJobRunner(1, "testJob", 1, false, loader, eventCollector);
     ExecutableNode node = runner.getNode();
 
-    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED, new EventData(node.getStatus())));
+    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED, new EventData(node)));
     Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED
         || runner.getStatus() != Status.FAILED);
 
     runner.run();
-    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED, new EventData(node.getStatus())));
+    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED, new EventData(node)));
 
     Assert.assertTrue(runner.getStatus() == node.getStatus());
     Assert.assertTrue("Node status is " + node.getStatus(),
@@ -272,11 +272,11 @@ public class JobRunnerTest {
     long startTime = System.currentTimeMillis();
     ExecutableNode node = runner.getNode();
 
-    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED, new EventData(node.getStatus())));
+    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED, new EventData(node)));
     Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED);
 
     runner.run();
-    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED, new EventData(node.getStatus())));
+    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED, new EventData(node)));
 
     Assert.assertTrue(runner.getStatus() == node.getStatus());
     Assert.assertTrue("Node status is " + node.getStatus(),
@@ -311,7 +311,7 @@ public class JobRunnerTest {
     long startTime = System.currentTimeMillis();
     ExecutableNode node = runner.getNode();
 
-    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED, new EventData(node.getStatus())));
+    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED, new EventData(node)));
     Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED);
 
     Thread thread = new Thread(runner);
@@ -321,7 +321,7 @@ public class JobRunnerTest {
     runner.kill();
     Thread.sleep(500);
 
-    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED, new EventData(node.getStatus())));
+    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED, new EventData(node)));
 
     Assert.assertTrue(runner.getStatus() == node.getStatus());
     Assert.assertTrue("Node status is " + node.getStatus(),