azkaban-developers
Changes
src/java/azkaban/execapp/FlowRunner.java 32(+20 -12)
Details
src/java/azkaban/execapp/FlowRunner.java 32(+20 -12)
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 44b97a3..77c9016 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -400,7 +400,7 @@ public class FlowRunner extends EventHandler implements Runnable {
resetFailedState(this.flow, retryJobs);
for (ExecutableNode node: retryJobs) {
- if(node.getStatus() == Status.READY || node.getStatus() == Status.DISABLED) {
+ if (node.getStatus() == Status.READY || node.getStatus() == Status.DISABLED) {
runReadyJob(node);
}
else if (node.getStatus() == Status.SUCCEEDED){
@@ -570,10 +570,12 @@ public class FlowRunner extends EventHandler implements Runnable {
boolean succeeded = true;
Props previousOutput = null;
- for(String end: flow.getEndNodes()) {
+ for (String end: flow.getEndNodes()) {
ExecutableNode node = flow.getExecutableNode(end);
- if (node.getStatus() == Status.KILLED || node.getStatus() == Status.FAILED || node.getStatus() == Status.CANCELLED) {
+ if (node.getStatus() == Status.KILLED ||
+ node.getStatus() == Status.FAILED ||
+ node.getStatus() == Status.CANCELLED) {
succeeded = false;
}
@@ -683,7 +685,7 @@ public class FlowRunner extends EventHandler implements Runnable {
}
File path = new File(execDir, source);
- if(props == null) {
+ if (props == null) {
// if no override prop, load the original one on disk
try {
props = new Props(null, path);
@@ -742,7 +744,9 @@ public class FlowRunner extends EventHandler implements Runnable {
if (!Status.isStatusFinished(depStatus)) {
return null;
}
- else if (depStatus == Status.FAILED || depStatus == Status.CANCELLED || depStatus == Status.KILLED) {
+ else if (depStatus == Status.FAILED ||
+ depStatus == Status.CANCELLED ||
+ depStatus == Status.KILLED) {
// We propagate failures as KILLED states.
shouldKill = true;
}
@@ -908,18 +912,21 @@ public class FlowRunner extends EventHandler implements Runnable {
else if (node instanceof ExecutableFlowBase) {
ExecutableFlowBase base = (ExecutableFlowBase)node;
switch (base.getStatus()) {
- case KILLED:
- case FAILED:
- case FAILED_FINISHING:
- resetFailedState(base, nodesToRetry);
- continue;
case CANCELLED:
node.setStatus(Status.READY);
node.setEndTime(-1);
node.setStartTime(-1);
node.setUpdateTime(currentTime);
+ // Break out of the switch. We'll reset the flow just like a normal node
break;
+ case KILLED:
+ case FAILED:
+ case FAILED_FINISHING:
+ resetFailedState(base, nodesToRetry);
+ continue;
default:
+ // Continue the while loop. If the job is in a finished state that's not
+ // a failure, we don't want to reset the job.
continue;
}
}
@@ -930,7 +937,7 @@ public class FlowRunner extends EventHandler implements Runnable {
node.setEndTime(-1);
node.setUpdateTime(currentTime);
}
- else if(node.getStatus() == Status.FAILED || node.getStatus() == Status.KILLED) {
+ else if (node.getStatus() == Status.FAILED || node.getStatus() == Status.KILLED) {
node.resetForRetry();
nodesToRetry.add(node);
}
@@ -939,12 +946,13 @@ public class FlowRunner extends EventHandler implements Runnable {
logger.info("Resetting job '" + node.getNestedId() + "' from " + oldStatus + " to " + node.getStatus());
}
- for(String inId: node.getInNodes()) {
+ for (String inId: node.getInNodes()) {
ExecutableNode nodeUp = flow.getExecutableNode(inId);
queue.add(nodeUp);
}
}
+ // At this point, the following code will reset the flow
Status oldFlowState = flow.getStatus();
if (maxStartTime == -1) {
// Nothing has run inside the flow, so we assume the flow hasn't even started running yet.