diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java b/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java
index 21e6b50..e18528e 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java
@@ -28,6 +28,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Base Executable that nodes and flows are based.
@@ -47,6 +48,7 @@ public class ExecutableNode {
public static final String OUTPUT_PROPS_PARAM = "outputProps";
public static final String ATTEMPT_PARAM = "attempt";
public static final String PASTATTEMPTS_PARAM = "pastAttempts";
+ private final AtomicInteger attempt = new AtomicInteger(0);
private String id;
private String type = null;
private volatile Status status = Status.READY;
@@ -54,7 +56,6 @@ public class ExecutableNode {
private volatile long endTime = -1;
private long updateTime = -1;
private volatile boolean killedBySLA = false;
-
// Path to Job File
private String jobSource;
// Path to top level props file
@@ -63,7 +64,6 @@ public class ExecutableNode {
private Set<String> outNodes = new HashSet<>();
private Props inputProps;
private Props outputProps;
- private int attempt = 0;
private long delayExecution = 0;
private ArrayList<ExecutionAttempt> pastAttempts = null;
@@ -226,16 +226,12 @@ public class ExecutableNode {
}
public int getAttempt() {
- return this.attempt;
- }
-
- public void setAttempt(final int attempt) {
- this.attempt = attempt;
+ return this.attempt.get();
}
public void resetForRetry() {
- final ExecutionAttempt pastAttempt = new ExecutionAttempt(this.attempt, this);
- this.attempt++;
+ final ExecutionAttempt pastAttempt = new ExecutionAttempt(this.attempt.get(), this);
+ this.attempt.incrementAndGet();
synchronized (this) {
if (this.pastAttempts == null) {
@@ -326,7 +322,7 @@ public class ExecutableNode {
this.startTime = wrappedMap.getLong(STARTTIME_PARAM);
this.endTime = wrappedMap.getLong(ENDTIME_PARAM);
this.updateTime = wrappedMap.getLong(UPDATETIME_PARAM);
- this.attempt = wrappedMap.getInt(ATTEMPT_PARAM, 0);
+ this.attempt.set(wrappedMap.getInt(ATTEMPT_PARAM, 0));
this.inNodes = new HashSet<>();
this.inNodes.addAll(wrappedMap.getStringCollection(INNODES_PARAM,
@@ -395,8 +391,8 @@ public class ExecutableNode {
this.endTime = updateData.getLong(ENDTIME_PARAM);
if (updateData.containsKey(ATTEMPT_PARAM)) {
- this.attempt = updateData.getInt(ATTEMPT_PARAM);
- if (this.attempt > 0) {
+ this.attempt.set(updateData.getInt(ATTEMPT_PARAM));
+ if (this.attempt.get() > 0) {
updatePastAttempts(updateData.<Object>getList(PASTATTEMPTS_PARAM,
Collections.<Object>emptyList()));
}
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java
index cd625d8..544ba75 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java
@@ -282,7 +282,9 @@ public class ExecutableFlowTest {
final ExecutableFlow exFlow = new ExecutableFlow(this.project, flow);
exFlow.setExecutionId(101);
- exFlow.setAttempt(2);
+ // reset twice so that attempt = 2
+ exFlow.resetForRetry();
+ exFlow.resetForRetry();
exFlow.setDelayedExecution(1000);
final ExecutionOptions options = new ExecutionOptions();
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
index 23fb470..aa5c856 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
@@ -609,11 +609,15 @@ public class JobRunner extends EventHandler implements Runnable {
"Finishing job " + this.jobId + getNodeRetryLog() + " at " + this.node.getEndTime()
+ " with status " + this.node.getStatus());
- fireEvent(Event.create(this, EventType.JOB_FINISHED,
- new EventData(finalStatus, this.node.getNestedId())), false);
- finalizeLogFile(this.node.getAttempt());
- finalizeAttachmentFile();
- writeStatus();
+ try {
+ finalizeLogFile(this.node.getAttempt());
+ finalizeAttachmentFile();
+ writeStatus();
+ } finally {
+ // note that FlowRunner thread does node.attempt++ when it receives the JOB_FINISHED event
+ fireEvent(Event.create(this, EventType.JOB_FINISHED,
+ new EventData(finalStatus, this.node.getNestedId())), false);
+ }
}
private String getNodeRetryLog() {