azkaban-aplcache

Fix issue #1559: Error writing out logs for job on a retry attempt

11/20/2017 7:15:00 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java b/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java
index 21e6b50..e18528e 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java
@@ -28,6 +28,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Base Executable that nodes and flows are based.
@@ -47,6 +48,7 @@ public class ExecutableNode {
   public static final String OUTPUT_PROPS_PARAM = "outputProps";
   public static final String ATTEMPT_PARAM = "attempt";
   public static final String PASTATTEMPTS_PARAM = "pastAttempts";
+  private final AtomicInteger attempt = new AtomicInteger(0);
   private String id;
   private String type = null;
   private volatile Status status = Status.READY;
@@ -54,7 +56,6 @@ public class ExecutableNode {
   private volatile long endTime = -1;
   private long updateTime = -1;
   private volatile boolean killedBySLA = false;
-
   // Path to Job File
   private String jobSource;
   // Path to top level props file
@@ -63,7 +64,6 @@ public class ExecutableNode {
   private Set<String> outNodes = new HashSet<>();
   private Props inputProps;
   private Props outputProps;
-  private int attempt = 0;
   private long delayExecution = 0;
   private ArrayList<ExecutionAttempt> pastAttempts = null;
 
@@ -226,16 +226,12 @@ public class ExecutableNode {
   }
 
   public int getAttempt() {
-    return this.attempt;
-  }
-
-  public void setAttempt(final int attempt) {
-    this.attempt = attempt;
+    return this.attempt.get();
   }
 
   public void resetForRetry() {
-    final ExecutionAttempt pastAttempt = new ExecutionAttempt(this.attempt, this);
-    this.attempt++;
+    final ExecutionAttempt pastAttempt = new ExecutionAttempt(this.attempt.get(), this);
+    this.attempt.incrementAndGet();
 
     synchronized (this) {
       if (this.pastAttempts == null) {
@@ -326,7 +322,7 @@ public class ExecutableNode {
     this.startTime = wrappedMap.getLong(STARTTIME_PARAM);
     this.endTime = wrappedMap.getLong(ENDTIME_PARAM);
     this.updateTime = wrappedMap.getLong(UPDATETIME_PARAM);
-    this.attempt = wrappedMap.getInt(ATTEMPT_PARAM, 0);
+    this.attempt.set(wrappedMap.getInt(ATTEMPT_PARAM, 0));
 
     this.inNodes = new HashSet<>();
     this.inNodes.addAll(wrappedMap.getStringCollection(INNODES_PARAM,
@@ -395,8 +391,8 @@ public class ExecutableNode {
     this.endTime = updateData.getLong(ENDTIME_PARAM);
 
     if (updateData.containsKey(ATTEMPT_PARAM)) {
-      this.attempt = updateData.getInt(ATTEMPT_PARAM);
-      if (this.attempt > 0) {
+      this.attempt.set(updateData.getInt(ATTEMPT_PARAM));
+      if (this.attempt.get() > 0) {
         updatePastAttempts(updateData.<Object>getList(PASTATTEMPTS_PARAM,
             Collections.<Object>emptyList()));
       }
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java
index cd625d8..544ba75 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java
@@ -282,7 +282,9 @@ public class ExecutableFlowTest {
 
     final ExecutableFlow exFlow = new ExecutableFlow(this.project, flow);
     exFlow.setExecutionId(101);
-    exFlow.setAttempt(2);
+    // reset twice so that attempt = 2
+    exFlow.resetForRetry();
+    exFlow.resetForRetry();
     exFlow.setDelayedExecution(1000);
 
     final ExecutionOptions options = new ExecutionOptions();
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
index 23fb470..aa5c856 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
@@ -609,11 +609,15 @@ public class JobRunner extends EventHandler implements Runnable {
         "Finishing job " + this.jobId + getNodeRetryLog() + " at " + this.node.getEndTime()
             + " with status " + this.node.getStatus());
 
-    fireEvent(Event.create(this, EventType.JOB_FINISHED,
-        new EventData(finalStatus, this.node.getNestedId())), false);
-    finalizeLogFile(this.node.getAttempt());
-    finalizeAttachmentFile();
-    writeStatus();
+    try {
+      finalizeLogFile(this.node.getAttempt());
+      finalizeAttachmentFile();
+      writeStatus();
+    } finally {
+      // note that FlowRunner thread does node.attempt++ when it receives the JOB_FINISHED event
+      fireEvent(Event.create(this, EventType.JOB_FINISHED,
+          new EventData(finalStatus, this.node.getNestedId())), false);
+    }
   }
 
   private String getNodeRetryLog() {