azkaban-aplcache

Condition on job status - Step 1 (#1854) * Condition on job

7/18/2018 5:43:57 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java b/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java
index 6db3fb8..b143f84 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java
@@ -16,6 +16,7 @@
 
 package azkaban.executor;
 
+import azkaban.flow.ConditionOnJobStatus;
 import azkaban.flow.Node;
 import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
@@ -44,6 +45,7 @@ public class ExecutableNode {
   public static final String OUTNODES_PARAM = "outNodes";
   public static final String TYPE_PARAM = "type";
   public static final String CONDITION_PARAM = "condition";
+  public static final String CONDITION_ON_JOB_STATUS_PARAM = "conditionOnJobStatus";
   public static final String PROPS_SOURCE_PARAM = "propSource";
   public static final String JOB_SOURCE_PARAM = "jobSource";
   public static final String OUTPUT_PROPS_PARAM = "outputProps";
@@ -68,6 +70,7 @@ public class ExecutableNode {
   private long delayExecution = 0;
   private ArrayList<ExecutionAttempt> pastAttempts = null;
   private String condition;
+  private ConditionOnJobStatus conditionOnJobStatus = ConditionOnJobStatus.ALL_SUCCESS;
 
   // Transient. These values aren't saved, but rediscovered.
   private ExecutableFlowBase parentFlow;
@@ -79,18 +82,20 @@ public class ExecutableNode {
   }
 
   public ExecutableNode(final Node node, final ExecutableFlowBase parent) {
-    this(node.getId(), node.getType(), node.getCondition(), node.getJobSource(), node
+    this(node.getId(), node.getType(), node.getCondition(), node.getConditionOnJobStatus(), node
+        .getJobSource(), node
         .getPropsSource(), parent);
   }
 
-  public ExecutableNode(final String id, final String type, final String condition, final String
-      jobSource,
+  public ExecutableNode(final String id, final String type, final String condition,
+      final ConditionOnJobStatus conditionOnJobStatus, final String jobSource,
       final String propsSource, final ExecutableFlowBase parent) {
     this.id = id;
     this.jobSource = jobSource;
     this.propsSource = propsSource;
     this.type = type;
     this.condition = condition;
+    this.conditionOnJobStatus = conditionOnJobStatus;
     setParentFlow(parent);
   }
 
@@ -289,6 +294,7 @@ public class ExecutableNode {
     objMap.put(UPDATETIME_PARAM, this.updateTime);
     objMap.put(TYPE_PARAM, this.type);
     objMap.put(CONDITION_PARAM, this.condition);
+    objMap.put(CONDITION_ON_JOB_STATUS_PARAM, this.conditionOnJobStatus.toString());
     objMap.put(ATTEMPT_PARAM, this.attempt);
 
     if (this.inNodes != null && !this.inNodes.isEmpty()) {
@@ -324,6 +330,8 @@ public class ExecutableNode {
     this.id = wrappedMap.getString(ID_PARAM);
     this.type = wrappedMap.getString(TYPE_PARAM);
     this.condition = wrappedMap.getString(CONDITION_PARAM);
+    this.conditionOnJobStatus = ConditionOnJobStatus.fromString(wrappedMap.getString
+        (CONDITION_ON_JOB_STATUS_PARAM));
     this.status = Status.valueOf(wrappedMap.getString(STATUS_PARAM));
     this.startTime = wrappedMap.getLong(STARTTIME_PARAM);
     this.endTime = wrappedMap.getLong(ENDTIME_PARAM);
@@ -468,4 +476,12 @@ public class ExecutableNode {
   public void setCondition(final String condition) {
     this.condition = condition;
   }
+
+  public ConditionOnJobStatus getConditionOnJobStatus() {
+    return this.conditionOnJobStatus;
+  }
+
+  public void setConditionOnJobStatus(final ConditionOnJobStatus conditionOnJobStatus) {
+    this.conditionOnJobStatus = conditionOnJobStatus;
+  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/Status.java b/azkaban-common/src/main/java/azkaban/executor/Status.java
index bd7eac3..3ca64c3 100644
--- a/azkaban-common/src/main/java/azkaban/executor/Status.java
+++ b/azkaban-common/src/main/java/azkaban/executor/Status.java
@@ -74,6 +74,28 @@ public enum Status {
     }
   }
 
+  public static boolean isStatusFailed(final Status status) {
+    switch (status) {
+      case FAILED:
+      case KILLED:
+      case CANCELLED:
+        return true;
+      default:
+        return false;
+    }
+  }
+
+  public static boolean isStatusSucceeded(final Status status) {
+    switch (status) {
+      case SUCCEEDED:
+      case FAILED_SUCCEEDED:
+      case SKIPPED:
+        return true;
+      default:
+        return false;
+    }
+  }
+
   public int getNumVal() {
     return this.numVal;
   }
diff --git a/azkaban-common/src/main/java/azkaban/flow/ConditionOnJobStatus.java b/azkaban-common/src/main/java/azkaban/flow/ConditionOnJobStatus.java
new file mode 100644
index 0000000..8f9d0a2
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/flow/ConditionOnJobStatus.java
@@ -0,0 +1,47 @@
+/*
+* Copyright 2018 LinkedIn Corp.
+*
+* Licensed under the Apache License, Version 2.0 (the “License”); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an “AS IS” BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+package azkaban.flow;
+
+public enum ConditionOnJobStatus {
+  ALL_SUCCESS("all_success"),
+  ALL_FAILED("all_failed"),
+  ALL_DONE("all_done"),
+  ONE_FAILED("one_failed"),
+  ONE_SUCCESS("one_success"),
+  ONE_FAILED_ALL_DONE("one_failed_all_done"),
+  ONE_SUCCESS_ALL_DONE("one_success_all_done");
+
+  private final String condition;
+
+  ConditionOnJobStatus(final String condition) {
+    this.condition = condition;
+  }
+
+  public static ConditionOnJobStatus fromString(final String condition) {
+    for (final ConditionOnJobStatus conditionOnJobStatus : ConditionOnJobStatus.values()) {
+      if (conditionOnJobStatus.condition.equalsIgnoreCase(condition)) {
+        return conditionOnJobStatus;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public String toString() {
+    return this.condition;
+  }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/flow/Node.java b/azkaban-common/src/main/java/azkaban/flow/Node.java
index 71b68a3..cf16dc7 100644
--- a/azkaban-common/src/main/java/azkaban/flow/Node.java
+++ b/azkaban-common/src/main/java/azkaban/flow/Node.java
@@ -36,6 +36,8 @@ public class Node {
 
   private String condition = null;
 
+  private ConditionOnJobStatus conditionOnJobStatus = ConditionOnJobStatus.ALL_SUCCESS;
+
   public Node(final String id) {
     this.id = id;
   }
@@ -191,4 +193,12 @@ public class Node {
   public void setCondition(final String condition) {
     this.condition = condition;
   }
+
+  public ConditionOnJobStatus getConditionOnJobStatus() {
+    return this.conditionOnJobStatus;
+  }
+
+  public void setConditionOnJobStatus(final ConditionOnJobStatus conditionOnJobStatus) {
+    this.conditionOnJobStatus = conditionOnJobStatus;
+  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
index f4c6765..2af9ac8 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
@@ -17,6 +17,7 @@
 package azkaban.project;
 
 import azkaban.Constants;
+import azkaban.flow.ConditionOnJobStatus;
 import azkaban.flow.Edge;
 import azkaban.flow.Flow;
 import azkaban.flow.FlowProps;
@@ -33,6 +34,9 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -164,6 +168,7 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
     final Node node = new Node(azkabanNode.getName());
     node.setType(azkabanNode.getType());
     node.setCondition(azkabanNode.getCondition());
+    setConditionOnJobStatus(node);
     node.setPropsSource(flowFile.getName());
     node.setJobSource(flowFile.getName());
 
@@ -217,4 +222,25 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
     }
   }
 
+  private void setConditionOnJobStatus(final Node node) {
+    String condition = node.getCondition();
+    if (condition != null) {
+      // Only values in the ConditionOnJobStatus enum can be matched by this pattern. Some examples:
+      // Valid: all_done, one_success && ${jobA: param1} == 1, ALL_FAILED
+      // Invalid: two_success, one_faileddd, {one_failed}
+      final String patternString =
+          "(?i)\\b(" + StringUtils.join(ConditionOnJobStatus.values(), "|") + ")\\b";
+      final Pattern pattern = Pattern.compile(patternString);
+      final Matcher matcher = pattern.matcher(condition);
+
+      // Todo jamiesjc: need to add validation for condition
+      while (matcher.find()) {
+        logger.info("Found conditionOnJobStatus: " + matcher.group(1));
+        node.setConditionOnJobStatus(ConditionOnJobStatus.fromString(matcher.group(1)));
+        condition = condition.replace(matcher.group(1), "true");
+        logger.info("condition is " + condition);
+        node.setCondition(condition);
+      }
+    }
+  }
 }
diff --git a/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
index 1e0eb37..978b4ea 100644
--- a/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
@@ -42,13 +42,10 @@ public class DirectoryYamlFlowLoaderTest {
   private static final String DUPLICATE_NODENAME_YAML_DIR = "duplicatenodenamesyamltest";
   private static final String DEPENDENCY_UNDEFINED_YAML_DIR = "dependencyundefinedyamltest";
   private static final String INVALID_JOBPROPS_YAML_DIR = "invalidjobpropsyamltest";
-  private static final String CONDITIONAL_FLOW_YAML_DIR = "conditionalflowyamltest";
   private static final String NO_FLOW_YAML_DIR = "noflowyamltest";
   private static final String BASIC_FLOW_1 = "basic_flow";
   private static final String BASIC_FLOW_2 = "basic_flow2";
   private static final String EMBEDDED_FLOW = "embedded_flow";
-  private static final String CONDITIONAL_FLOW_1 = "conditional_flow1";
-  private static final String CONDITIONAL_FLOW_2 = "conditional_flow2";
   private static final String EMBEDDED_FLOW_1 = "embedded_flow" + Constants.PATH_DELIMITER +
       "embedded_flow1";
   private static final String EMBEDDED_FLOW_2 =
@@ -171,16 +168,6 @@ public class DirectoryYamlFlowLoaderTest {
     checkFlowLoaderProperties(loader, 0, 0, 0);
   }
 
-  @Test
-  public void testLoadConditionalWorkflowYamlFile() {
-    final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
-    loader.loadProjectFlow(this.project,
-        ExecutionsTestUtil.getFlowDir(CONDITIONAL_FLOW_YAML_DIR));
-    checkFlowLoaderProperties(loader, 0, 2, 2);
-    checkFlowProperties(loader, CONDITIONAL_FLOW_1, 0, 4, 1, 4, null);
-    checkFlowProperties(loader, CONDITIONAL_FLOW_2, 0, 4, 1, 4, null);
-  }
-
   private void checkFlowLoaderProperties(final DirectoryYamlFlowLoader loader, final int numError,
       final int numFlowMap, final int numEdgeMap) {
     assertThat(loader.getErrors().size()).isEqualTo(numError);
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ConditionalWorkflowUtils.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ConditionalWorkflowUtils.java
new file mode 100644
index 0000000..0c0d2b2
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ConditionalWorkflowUtils.java
@@ -0,0 +1,101 @@
+/*
+* Copyright 2018 LinkedIn Corp.
+*
+* Licensed under the Apache License, Version 2.0 (the “License”); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an “AS IS” BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+package azkaban.execapp;
+
+import static azkaban.flow.ConditionOnJobStatus.ALL_FAILED;
+import static azkaban.flow.ConditionOnJobStatus.ALL_SUCCESS;
+import static azkaban.flow.ConditionOnJobStatus.ONE_FAILED;
+import static azkaban.flow.ConditionOnJobStatus.ONE_FAILED_ALL_DONE;
+import static azkaban.flow.ConditionOnJobStatus.ONE_SUCCESS;
+import static azkaban.flow.ConditionOnJobStatus.ONE_SUCCESS_ALL_DONE;
+
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.Status;
+import azkaban.flow.ConditionOnJobStatus;
+
+public class ConditionalWorkflowUtils {
+
+  public static final String SATISFIED = "satisfied";
+  public static final String PENDING = "pending";
+  public static final String FAILED = "failed";
+
+  public static String checkConditionOnJobStatus(final ExecutableNode node) {
+    final ConditionOnJobStatus conditionOnJobStatus = node.getConditionOnJobStatus();
+    switch (conditionOnJobStatus) {
+      case ALL_SUCCESS:
+      case ALL_FAILED:
+      case ALL_DONE:
+        return checkAllStatus(node, conditionOnJobStatus);
+      case ONE_FAILED:
+      case ONE_SUCCESS:
+        return checkOneStatus(node, conditionOnJobStatus);
+      case ONE_FAILED_ALL_DONE:
+      case ONE_SUCCESS_ALL_DONE:
+        return checkOneStatusAllDone(node, conditionOnJobStatus);
+      default:
+        return checkAllStatus(node, ALL_SUCCESS);
+    }
+  }
+
+  private static String checkAllStatus(final ExecutableNode node, final ConditionOnJobStatus
+      condition) {
+    String result = SATISFIED;
+    for (final String dependency : node.getInNodes()) {
+      final ExecutableNode dependencyNode = node.getParentFlow().getExecutableNode(dependency);
+      final Status depStatus = dependencyNode.getStatus();
+      if (!Status.isStatusFinished(depStatus)) {
+        return PENDING;
+      } else if ((condition.equals(ALL_SUCCESS) && Status.isStatusFailed(depStatus)) ||
+          (condition.equals(ALL_FAILED) && Status.isStatusSucceeded(depStatus))) {
+        result = FAILED;
+      }
+    }
+    return result;
+  }
+
+  private static String checkOneStatus(final ExecutableNode node, final ConditionOnJobStatus
+      condition) {
+    boolean finished = true;
+    for (final String dependency : node.getInNodes()) {
+      final ExecutableNode dependencyNode = node.getParentFlow().getExecutableNode(dependency);
+      final Status depStatus = dependencyNode.getStatus();
+      if ((condition.equals(ONE_SUCCESS) && Status.isStatusSucceeded(depStatus)) ||
+          (condition.equals(ONE_FAILED) && Status.isStatusFailed(depStatus))) {
+        return SATISFIED;
+      } else if (!Status.isStatusFinished(depStatus)) {
+        finished = false;
+      }
+    }
+    return finished ? FAILED : PENDING;
+  }
+
+  private static String checkOneStatusAllDone(final ExecutableNode node, final ConditionOnJobStatus
+      condition) {
+    String result = FAILED;
+    for (final String dependency : node.getInNodes()) {
+      final ExecutableNode dependencyNode = node.getParentFlow().getExecutableNode(dependency);
+      final Status depStatus = dependencyNode.getStatus();
+      if (!Status.isStatusFinished(depStatus)) {
+        return PENDING;
+      } else if ((condition.equals(ONE_SUCCESS_ALL_DONE) && Status.isStatusSucceeded(depStatus)) ||
+          (condition.equals(ONE_FAILED_ALL_DONE) && Status.isStatusFailed(depStatus))) {
+        result = SATISFIED;
+      }
+    }
+    return result;
+  }
+
+}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index 54b4342..df5f3c5 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -17,6 +17,9 @@
 package azkaban.execapp;
 
 import static azkaban.Constants.ConfigurationKeys.AZKABAN_SERVER_HOST_NAME;
+import static azkaban.execapp.ConditionalWorkflowUtils.FAILED;
+import static azkaban.execapp.ConditionalWorkflowUtils.PENDING;
+import static azkaban.execapp.ConditionalWorkflowUtils.checkConditionOnJobStatus;
 
 import azkaban.Constants;
 import azkaban.Constants.JobProperties;
@@ -38,6 +41,7 @@ import azkaban.executor.ExecutionOptions.FailureAction;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.Status;
+import azkaban.flow.ConditionOnJobStatus;
 import azkaban.flow.FlowProps;
 import azkaban.flow.FlowUtils;
 import azkaban.jobExecutor.ProcessJob;
@@ -477,12 +481,7 @@ public class FlowRunner extends EventHandler implements Runnable {
         // The job cannot be retried or has run out of retry attempts. We will
         // fail the job and its flow now.
         if (!retryJobIfPossible(node)) {
-          propagateStatus(node.getParentFlow(),
-              node.getStatus() == Status.KILLED ? Status.KILLED : Status.FAILED_FINISHING);
-          if (this.failureAction == FailureAction.CANCEL_ALL) {
-            this.kill();
-          }
-          this.flowFailed = true;
+          setFlowFailed(node);
         } else {
           nodesToCheck.add(node);
           continue;
@@ -490,6 +489,9 @@ public class FlowRunner extends EventHandler implements Runnable {
       }
 
       if (outNodeIds.isEmpty() && isFlowReadytoFinalize(parentFlow)) {
+        // Todo jamiesjc: For conditional workflows, if conditionOnJobStatus is ONE_SUCCESS or
+        // ONE_FAILED, some jobs might still be running when the end nodes have finished. In this
+        // case, we need to kill all running jobs before finalizing the flow.
         finalizeFlow(parentFlow);
         finishExecutableNode(parentFlow);
 
@@ -530,6 +532,31 @@ public class FlowRunner extends EventHandler implements Runnable {
     return false;
   }
 
+  private void setFlowFailed(final ExecutableNode node) {
+    boolean shouldFail = true;
+    // As long as there is no outNodes or at least one outNode has conditionOnJobStatus of
+    // ALL_SUCCESS, we should set the flow to failed. Otherwise, it could still statisfy the
+    // condition of conditional workflows, so don't set the flow to failed.
+    for (final String outNodeId : node.getOutNodes()) {
+      if (node.getParentFlow().getExecutableNode(outNodeId).getConditionOnJobStatus()
+          .equals(ConditionOnJobStatus.ALL_SUCCESS)) {
+        shouldFail = true;
+        break;
+      } else {
+        shouldFail = false;
+      }
+    }
+
+    if (shouldFail) {
+      propagateStatus(node.getParentFlow(),
+          node.getStatus() == Status.KILLED ? Status.KILLED : Status.FAILED_FINISHING);
+      if (this.failureAction == FailureAction.CANCEL_ALL) {
+        this.kill();
+      }
+      this.flowFailed = true;
+    }
+  }
+
   private boolean notReadyToRun(final Status status) {
     return Status.isStatusFinished(status)
         || Status.isStatusRunning(status)
@@ -861,23 +888,22 @@ public class FlowRunner extends EventHandler implements Runnable {
     // Go through the node's dependencies. If all of the previous job's
     // statuses is finished and not FAILED or KILLED, than we can safely
     // run this job.
-    final ExecutableFlowBase flow = node.getParentFlow();
-    boolean shouldKill = false;
-    for (final String dependency : node.getInNodes()) {
-      final ExecutableNode dependencyNode = flow.getExecutableNode(dependency);
-      final Status depStatus = dependencyNode.getStatus();
+    Status status = Status.READY;
 
-      if (!Status.isStatusFinished(depStatus)) {
+    // Check if condition on job status is satisfied
+    switch (checkConditionOnJobStatus(node)) {
+      case FAILED:
+        status = Status.CANCELLED;
+        break;
+      // Condition not satisfied yet, need to wait
+      case PENDING:
         return null;
-      } else if (depStatus == Status.FAILED || depStatus == Status.CANCELLED
-          || depStatus == Status.KILLED) {
-        // We propagate failures as KILLED states.
-        shouldKill = true;
-      }
+      default:
+        break;
     }
 
-    if (!isConditionMet(node)) {
-      return Status.CANCELLED;
+    if (!isConditionOnRuntimeVariableMet(node)) {
+      status = Status.CANCELLED;
     }
 
     // If it's disabled but ready to run, we want to make sure it continues
@@ -893,15 +919,14 @@ public class FlowRunner extends EventHandler implements Runnable {
     if (this.flowFailed
         && this.failureAction == ExecutionOptions.FailureAction.FINISH_CURRENTLY_RUNNING) {
       return Status.CANCELLED;
-    } else if (shouldKill || isKilled()) {
+    } else if (isKilled()) {
       return Status.CANCELLED;
     }
 
-    // All good to go, ready to run.
-    return Status.READY;
+    return status;
   }
 
-  private Boolean isConditionMet(final ExecutableNode node) {
+  private Boolean isConditionOnRuntimeVariableMet(final ExecutableNode node) {
     final String condition = node.getCondition();
     if (condition == null) {
       return true;
@@ -911,7 +936,7 @@ public class FlowRunner extends EventHandler implements Runnable {
     String replaced = condition;
 
     while (matcher.find()) {
-      final String value = findValueForJobVariable(matcher.group(1), matcher.group(2));
+      final String value = findValueForJobVariable(node, matcher.group(1), matcher.group(2));
       if (value != null) {
         replaced = replaced.replace(matcher.group(), "'" + value + "'");
       }
@@ -922,7 +947,8 @@ public class FlowRunner extends EventHandler implements Runnable {
     return evaluateExpression(replaced);
   }
 
-  private String findValueForJobVariable(final String jobName, final String variable) {
+  private String findValueForJobVariable(final ExecutableNode node, final String jobName, final
+  String variable) {
     // Get props from job props tmp file
     final File jobPropsFile = findPropsFileForJob(jobName + "_props");
     if (jobPropsFile != null) {
@@ -937,8 +963,16 @@ public class FlowRunner extends EventHandler implements Runnable {
       }
     }
 
+    // Todo jamiesjc: need to handle condition for embedded flows
     // Get job output props
-    final Props outputProps = this.getExecutableFlow().getExecutableNode(jobName).getOutputProps();
+    final ExecutableNode target = node.getParentFlow().getExecutableNode(jobName);
+    if (target == null) {
+      this.logger.error("Not able to load props from output props file, job name " + jobName
+          + " might be invalid.");
+      return null;
+    }
+
+    final Props outputProps = target.getOutputProps();
     if (outputProps != null && outputProps.containsKey(variable)) {
       return outputProps.get(variable);
     }
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerConditionalFlowTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerConditionalFlowTest.java
index d33fb77..196a7fe 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerConditionalFlowTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerConditionalFlowTest.java
@@ -34,10 +34,12 @@ import org.junit.Test;
 public class FlowRunnerConditionalFlowTest extends FlowRunnerTestBase {
 
   private static final String FLOW_YAML_DIR = "conditionalflowyamltest";
-  private static final String FLOW_NAME_1 = "conditional_flow1";
-  private static final String FLOW_NAME_2 = "conditional_flow2";
-  private static final String FLOW_YAML_FILE_1 = FLOW_NAME_1 + ".flow";
-  private static final String FLOW_YAML_FILE_2 = FLOW_NAME_2 + ".flow";
+  private static final String CONDITIONAL_FLOW_1 = "conditional_flow1";
+  private static final String CONDITIONAL_FLOW_2 = "conditional_flow2";
+  private static final String CONDITIONAL_FLOW_3 = "conditional_flow3";
+  private static final String CONDITIONAL_FLOW_4 = "conditional_flow4";
+  private static final String CONDITIONAL_FLOW_5 = "conditional_flow5";
+  private static final String CONDITIONAL_FLOW_6 = "conditional_flow6";
   private FlowRunnerTestUtil testUtil;
   private Project project;
 
@@ -49,23 +51,10 @@ public class FlowRunnerConditionalFlowTest extends FlowRunnerTestBase {
 
   @Test
   public void runFlowOnJobPropsCondition() throws Exception {
-
-    when(this.testUtil.getProjectLoader()
-        .getLatestFlowVersion(this.project.getId(), this.project.getVersion(), FLOW_YAML_FILE_1))
-        .thenReturn(1);
-    when(this.testUtil.getProjectLoader()
-        .getUploadedFlowFile(eq(this.project.getId()), eq(this.project.getVersion()),
-            eq(FLOW_YAML_FILE_1),
-            eq(1), any(File.class)))
-        .thenReturn(ExecutionsTestUtil.getFlowFile(FLOW_YAML_DIR, FLOW_YAML_FILE_1));
-
     final HashMap<String, String> flowProps = new HashMap<>();
     flowProps.put("azkaban.server.name", "foo");
-    final FlowRunner runner = this.testUtil.createFromFlowMap(FLOW_NAME_1, flowProps);
-    final ExecutableFlow flow = runner.getExecutableFlow();
-
-    FlowRunnerTestUtil.startThread(runner);
-
+    setUp(CONDITIONAL_FLOW_1, flowProps);
+    final ExecutableFlow flow = this.runner.getExecutableFlow();
     assertStatus(flow, "jobA", Status.SUCCEEDED);
     assertStatus(flow, "jobB", Status.SUCCEEDED);
     assertStatus(flow, "jobC", Status.CANCELLED);
@@ -75,22 +64,9 @@ public class FlowRunnerConditionalFlowTest extends FlowRunnerTestBase {
 
   @Test
   public void runFlowOnJobOutputCondition() throws Exception {
-
-    when(this.testUtil.getProjectLoader()
-        .getLatestFlowVersion(this.project.getId(), this.project.getVersion(), FLOW_YAML_FILE_2))
-        .thenReturn(1);
-    when(this.testUtil.getProjectLoader()
-        .getUploadedFlowFile(eq(this.project.getId()), eq(this.project.getVersion()),
-            eq(FLOW_YAML_FILE_2),
-            eq(1), any(File.class)))
-        .thenReturn(ExecutionsTestUtil.getFlowFile(FLOW_YAML_DIR, FLOW_YAML_FILE_2));
-
     final HashMap<String, String> flowProps = new HashMap<>();
-    final FlowRunner runner = this.testUtil.createFromFlowMap(FLOW_NAME_2, flowProps);
-    final ExecutableFlow flow = runner.getExecutableFlow();
-
-    FlowRunnerTestUtil.startThread(runner);
-
+    setUp(CONDITIONAL_FLOW_2, flowProps);
+    final ExecutableFlow flow = this.runner.getExecutableFlow();
     assertStatus(flow, "jobA", Status.RUNNING);
     final Props generatedProperties = new Props();
     generatedProperties.put("key1", "value1");
@@ -102,4 +78,74 @@ public class FlowRunnerConditionalFlowTest extends FlowRunnerTestBase {
     assertStatus(flow, "jobD", Status.CANCELLED);
     assertFlowStatus(flow, Status.KILLED);
   }
+
+  @Test
+  public void runFlowOnJobStatusOneFailed() throws Exception {
+    final HashMap<String, String> flowProps = new HashMap<>();
+    setUp(CONDITIONAL_FLOW_3, flowProps);
+    final ExecutableFlow flow = this.runner.getExecutableFlow();
+    InteractiveTestJob.getTestJob("jobA").failJob();
+    assertStatus(flow, "jobA", Status.FAILED);
+    assertStatus(flow, "jobB", Status.RUNNING);
+    assertStatus(flow, "jobC", Status.SUCCEEDED);
+    assertFlowStatus(flow, Status.SUCCEEDED);
+  }
+
+  @Test
+  public void runFlowOnJobStatusAllFailed() throws Exception {
+    final HashMap<String, String> flowProps = new HashMap<>();
+    setUp(CONDITIONAL_FLOW_4, flowProps);
+    final ExecutableFlow flow = this.runner.getExecutableFlow();
+    InteractiveTestJob.getTestJob("jobA").failJob();
+    assertStatus(flow, "jobA", Status.FAILED);
+    assertStatus(flow, "jobB", Status.RUNNING);
+    assertStatus(flow, "jobC", Status.READY);
+    InteractiveTestJob.getTestJob("jobB").failJob();
+    assertStatus(flow, "jobB", Status.FAILED);
+    assertStatus(flow, "jobC", Status.SUCCEEDED);
+    assertFlowStatus(flow, Status.SUCCEEDED);
+  }
+
+  @Test
+  public void runFlowOnJobStatusOneSuccessAllDone() throws Exception {
+    final HashMap<String, String> flowProps = new HashMap<>();
+    setUp(CONDITIONAL_FLOW_5, flowProps);
+    final ExecutableFlow flow = this.runner.getExecutableFlow();
+    InteractiveTestJob.getTestJob("jobA").succeedJob();
+    assertStatus(flow, "jobA", Status.SUCCEEDED);
+    assertStatus(flow, "jobB", Status.RUNNING);
+    assertStatus(flow, "jobC", Status.READY);
+    InteractiveTestJob.getTestJob("jobB").failJob();
+    assertStatus(flow, "jobB", Status.FAILED);
+    assertStatus(flow, "jobC", Status.SUCCEEDED);
+    assertFlowStatus(flow, Status.SUCCEEDED);
+  }
+
+  @Test
+  public void runFlowOnBothJobStatusAndPropsCondition() throws Exception {
+    final HashMap<String, String> flowProps = new HashMap<>();
+    flowProps.put("azkaban.server.name", "foo");
+    setUp(CONDITIONAL_FLOW_6, flowProps);
+    final ExecutableFlow flow = this.runner.getExecutableFlow();
+    assertStatus(flow, "jobA", Status.SUCCEEDED);
+    assertStatus(flow, "jobB", Status.SUCCEEDED);
+    assertStatus(flow, "jobC", Status.CANCELLED);
+    assertStatus(flow, "jobD", Status.SUCCEEDED);
+    assertFlowStatus(flow, Status.SUCCEEDED);
+  }
+
+  private void setUp(final String flowName, final HashMap<String, String> flowProps)
+      throws Exception {
+    final String flowYamlFile = flowName + ".flow";
+    when(this.testUtil.getProjectLoader()
+        .getLatestFlowVersion(this.project.getId(), this.project.getVersion(), flowYamlFile))
+        .thenReturn(1);
+    when(this.testUtil.getProjectLoader()
+        .getUploadedFlowFile(eq(this.project.getId()), eq(this.project.getVersion()),
+            eq(flowYamlFile),
+            eq(1), any(File.class)))
+        .thenReturn(ExecutionsTestUtil.getFlowFile(FLOW_YAML_DIR, flowYamlFile));
+    this.runner = this.testUtil.createFromFlowMap(flowName, flowProps);
+    FlowRunnerTestUtil.startThread(this.runner);
+  }
 }
diff --git a/test/execution-test-data/conditionalflowyamltest/conditional_flow3.flow b/test/execution-test-data/conditionalflowyamltest/conditional_flow3.flow
new file mode 100644
index 0000000..d619bac
--- /dev/null
+++ b/test/execution-test-data/conditionalflowyamltest/conditional_flow3.flow
@@ -0,0 +1,25 @@
+---
+config:
+  flow-level-parameter: value
+
+nodes:
+  - name: jobC
+    type: test
+    config:
+      fail: false
+      seconds: 0
+    condition: one_failed
+
+    dependsOn:
+      - jobA
+      - jobB
+
+  - name: jobA
+    type: test
+    config:
+      seconds: 2
+
+  - name: jobB
+    type: test
+    config:
+      seconds: 2
diff --git a/test/execution-test-data/conditionalflowyamltest/conditional_flow4.flow b/test/execution-test-data/conditionalflowyamltest/conditional_flow4.flow
new file mode 100644
index 0000000..9faef5a
--- /dev/null
+++ b/test/execution-test-data/conditionalflowyamltest/conditional_flow4.flow
@@ -0,0 +1,25 @@
+---
+config:
+  flow-level-parameter: value
+
+nodes:
+  - name: jobC
+    type: test
+    config:
+      fail: false
+      seconds: 0
+    condition: all_failed
+
+    dependsOn:
+      - jobA
+      - jobB
+
+  - name: jobA
+    type: test
+    config:
+      seconds: 2
+
+  - name: jobB
+    type: test
+    config:
+      seconds: 2
diff --git a/test/execution-test-data/conditionalflowyamltest/conditional_flow5.flow b/test/execution-test-data/conditionalflowyamltest/conditional_flow5.flow
new file mode 100644
index 0000000..17f54be
--- /dev/null
+++ b/test/execution-test-data/conditionalflowyamltest/conditional_flow5.flow
@@ -0,0 +1,25 @@
+---
+config:
+  flow-level-parameter: value
+
+nodes:
+  - name: jobC
+    type: test
+    config:
+      fail: false
+      seconds: 0
+    condition: one_success_all_done
+
+    dependsOn:
+      - jobA
+      - jobB
+
+  - name: jobA
+    type: test
+    config:
+      seconds: 2
+
+  - name: jobB
+    type: test
+    config:
+      seconds: 2
diff --git a/test/execution-test-data/conditionalflowyamltest/conditional_flow6.flow b/test/execution-test-data/conditionalflowyamltest/conditional_flow6.flow
new file mode 100644
index 0000000..327a879
--- /dev/null
+++ b/test/execution-test-data/conditionalflowyamltest/conditional_flow6.flow
@@ -0,0 +1,41 @@
+---
+config:
+  flow-level-parameter: value
+
+nodes:
+  - name: jobD
+    type: test
+    config:
+      fail: false
+      seconds: 0
+    condition: one_success && ${jobA:azkaban.server.name} == 'foo'
+
+    dependsOn:
+      - jobB
+      - jobC
+
+  - name: jobA
+    type: test
+    config:
+      fail: false
+      seconds: 0
+
+  - name: jobB
+    type: test
+    config:
+      fail: false
+      seconds: 0
+    condition: ${jobA:azkaban.server.name} == 'foo'
+
+    dependsOn:
+      - jobA
+
+  - name: jobC
+    type: test
+    config:
+      fail: false
+      seconds: 0
+    condition: ${jobA:azkaban.server.name} == 'bar'
+
+    dependsOn:
+      - jobA