Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java b/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java
index 6db3fb8..b143f84 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java
@@ -16,6 +16,7 @@
package azkaban.executor;
+import azkaban.flow.ConditionOnJobStatus;
import azkaban.flow.Node;
import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
@@ -44,6 +45,7 @@ public class ExecutableNode {
public static final String OUTNODES_PARAM = "outNodes";
public static final String TYPE_PARAM = "type";
public static final String CONDITION_PARAM = "condition";
+ public static final String CONDITION_ON_JOB_STATUS_PARAM = "conditionOnJobStatus";
public static final String PROPS_SOURCE_PARAM = "propSource";
public static final String JOB_SOURCE_PARAM = "jobSource";
public static final String OUTPUT_PROPS_PARAM = "outputProps";
@@ -68,6 +70,7 @@ public class ExecutableNode {
private long delayExecution = 0;
private ArrayList<ExecutionAttempt> pastAttempts = null;
private String condition;
+ private ConditionOnJobStatus conditionOnJobStatus = ConditionOnJobStatus.ALL_SUCCESS;
// Transient. These values aren't saved, but rediscovered.
private ExecutableFlowBase parentFlow;
@@ -79,18 +82,20 @@ public class ExecutableNode {
}
public ExecutableNode(final Node node, final ExecutableFlowBase parent) {
- this(node.getId(), node.getType(), node.getCondition(), node.getJobSource(), node
+ this(node.getId(), node.getType(), node.getCondition(), node.getConditionOnJobStatus(), node
+ .getJobSource(), node
.getPropsSource(), parent);
}
- public ExecutableNode(final String id, final String type, final String condition, final String
- jobSource,
+ public ExecutableNode(final String id, final String type, final String condition,
+ final ConditionOnJobStatus conditionOnJobStatus, final String jobSource,
final String propsSource, final ExecutableFlowBase parent) {
this.id = id;
this.jobSource = jobSource;
this.propsSource = propsSource;
this.type = type;
this.condition = condition;
+ this.conditionOnJobStatus = conditionOnJobStatus;
setParentFlow(parent);
}
@@ -289,6 +294,7 @@ public class ExecutableNode {
objMap.put(UPDATETIME_PARAM, this.updateTime);
objMap.put(TYPE_PARAM, this.type);
objMap.put(CONDITION_PARAM, this.condition);
+ objMap.put(CONDITION_ON_JOB_STATUS_PARAM, this.conditionOnJobStatus.toString());
objMap.put(ATTEMPT_PARAM, this.attempt);
if (this.inNodes != null && !this.inNodes.isEmpty()) {
@@ -324,6 +330,8 @@ public class ExecutableNode {
this.id = wrappedMap.getString(ID_PARAM);
this.type = wrappedMap.getString(TYPE_PARAM);
this.condition = wrappedMap.getString(CONDITION_PARAM);
+ this.conditionOnJobStatus = ConditionOnJobStatus.fromString(wrappedMap.getString
+ (CONDITION_ON_JOB_STATUS_PARAM));
this.status = Status.valueOf(wrappedMap.getString(STATUS_PARAM));
this.startTime = wrappedMap.getLong(STARTTIME_PARAM);
this.endTime = wrappedMap.getLong(ENDTIME_PARAM);
@@ -468,4 +476,12 @@ public class ExecutableNode {
public void setCondition(final String condition) {
this.condition = condition;
}
+
+ public ConditionOnJobStatus getConditionOnJobStatus() {
+ return this.conditionOnJobStatus;
+ }
+
+ public void setConditionOnJobStatus(final ConditionOnJobStatus conditionOnJobStatus) {
+ this.conditionOnJobStatus = conditionOnJobStatus;
+ }
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/Status.java b/azkaban-common/src/main/java/azkaban/executor/Status.java
index bd7eac3..3ca64c3 100644
--- a/azkaban-common/src/main/java/azkaban/executor/Status.java
+++ b/azkaban-common/src/main/java/azkaban/executor/Status.java
@@ -74,6 +74,28 @@ public enum Status {
}
}
+ public static boolean isStatusFailed(final Status status) {
+ switch (status) {
+ case FAILED:
+ case KILLED:
+ case CANCELLED:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ public static boolean isStatusSucceeded(final Status status) {
+ switch (status) {
+ case SUCCEEDED:
+ case FAILED_SUCCEEDED:
+ case SKIPPED:
+ return true;
+ default:
+ return false;
+ }
+ }
+
public int getNumVal() {
return this.numVal;
}
diff --git a/azkaban-common/src/main/java/azkaban/flow/ConditionOnJobStatus.java b/azkaban-common/src/main/java/azkaban/flow/ConditionOnJobStatus.java
new file mode 100644
index 0000000..8f9d0a2
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/flow/ConditionOnJobStatus.java
@@ -0,0 +1,47 @@
+/*
+* Copyright 2018 LinkedIn Corp.
+*
+* Licensed under the Apache License, Version 2.0 (the “License”); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an “AS IS” BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+package azkaban.flow;
+
+public enum ConditionOnJobStatus {
+ ALL_SUCCESS("all_success"),
+ ALL_FAILED("all_failed"),
+ ALL_DONE("all_done"),
+ ONE_FAILED("one_failed"),
+ ONE_SUCCESS("one_success"),
+ ONE_FAILED_ALL_DONE("one_failed_all_done"),
+ ONE_SUCCESS_ALL_DONE("one_success_all_done");
+
+ private final String condition;
+
+ ConditionOnJobStatus(final String condition) {
+ this.condition = condition;
+ }
+
+ public static ConditionOnJobStatus fromString(final String condition) {
+ for (final ConditionOnJobStatus conditionOnJobStatus : ConditionOnJobStatus.values()) {
+ if (conditionOnJobStatus.condition.equalsIgnoreCase(condition)) {
+ return conditionOnJobStatus;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return this.condition;
+ }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/flow/Node.java b/azkaban-common/src/main/java/azkaban/flow/Node.java
index 71b68a3..cf16dc7 100644
--- a/azkaban-common/src/main/java/azkaban/flow/Node.java
+++ b/azkaban-common/src/main/java/azkaban/flow/Node.java
@@ -36,6 +36,8 @@ public class Node {
private String condition = null;
+ private ConditionOnJobStatus conditionOnJobStatus = ConditionOnJobStatus.ALL_SUCCESS;
+
public Node(final String id) {
this.id = id;
}
@@ -191,4 +193,12 @@ public class Node {
public void setCondition(final String condition) {
this.condition = condition;
}
+
+ public ConditionOnJobStatus getConditionOnJobStatus() {
+ return this.conditionOnJobStatus;
+ }
+
+ public void setConditionOnJobStatus(final ConditionOnJobStatus conditionOnJobStatus) {
+ this.conditionOnJobStatus = conditionOnJobStatus;
+ }
}
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
index f4c6765..2af9ac8 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
@@ -17,6 +17,7 @@
package azkaban.project;
import azkaban.Constants;
+import azkaban.flow.ConditionOnJobStatus;
import azkaban.flow.Edge;
import azkaban.flow.Flow;
import azkaban.flow.FlowProps;
@@ -33,6 +34,9 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -164,6 +168,7 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
final Node node = new Node(azkabanNode.getName());
node.setType(azkabanNode.getType());
node.setCondition(azkabanNode.getCondition());
+ setConditionOnJobStatus(node);
node.setPropsSource(flowFile.getName());
node.setJobSource(flowFile.getName());
@@ -217,4 +222,25 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
}
}
+ private void setConditionOnJobStatus(final Node node) {
+ String condition = node.getCondition();
+ if (condition != null) {
+ // Only values in the ConditionOnJobStatus enum can be matched by this pattern. Some examples:
+ // Valid: all_done, one_success && ${jobA: param1} == 1, ALL_FAILED
+ // Invalid: two_success, one_faileddd, {one_failed}
+ final String patternString =
+ "(?i)\\b(" + StringUtils.join(ConditionOnJobStatus.values(), "|") + ")\\b";
+ final Pattern pattern = Pattern.compile(patternString);
+ final Matcher matcher = pattern.matcher(condition);
+
+ // Todo jamiesjc: need to add validation for condition
+ while (matcher.find()) {
+ logger.info("Found conditionOnJobStatus: " + matcher.group(1));
+ node.setConditionOnJobStatus(ConditionOnJobStatus.fromString(matcher.group(1)));
+ condition = condition.replace(matcher.group(1), "true");
+ logger.info("condition is " + condition);
+ node.setCondition(condition);
+ }
+ }
+ }
}
diff --git a/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
index 1e0eb37..978b4ea 100644
--- a/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
@@ -42,13 +42,10 @@ public class DirectoryYamlFlowLoaderTest {
private static final String DUPLICATE_NODENAME_YAML_DIR = "duplicatenodenamesyamltest";
private static final String DEPENDENCY_UNDEFINED_YAML_DIR = "dependencyundefinedyamltest";
private static final String INVALID_JOBPROPS_YAML_DIR = "invalidjobpropsyamltest";
- private static final String CONDITIONAL_FLOW_YAML_DIR = "conditionalflowyamltest";
private static final String NO_FLOW_YAML_DIR = "noflowyamltest";
private static final String BASIC_FLOW_1 = "basic_flow";
private static final String BASIC_FLOW_2 = "basic_flow2";
private static final String EMBEDDED_FLOW = "embedded_flow";
- private static final String CONDITIONAL_FLOW_1 = "conditional_flow1";
- private static final String CONDITIONAL_FLOW_2 = "conditional_flow2";
private static final String EMBEDDED_FLOW_1 = "embedded_flow" + Constants.PATH_DELIMITER +
"embedded_flow1";
private static final String EMBEDDED_FLOW_2 =
@@ -171,16 +168,6 @@ public class DirectoryYamlFlowLoaderTest {
checkFlowLoaderProperties(loader, 0, 0, 0);
}
- @Test
- public void testLoadConditionalWorkflowYamlFile() {
- final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
- loader.loadProjectFlow(this.project,
- ExecutionsTestUtil.getFlowDir(CONDITIONAL_FLOW_YAML_DIR));
- checkFlowLoaderProperties(loader, 0, 2, 2);
- checkFlowProperties(loader, CONDITIONAL_FLOW_1, 0, 4, 1, 4, null);
- checkFlowProperties(loader, CONDITIONAL_FLOW_2, 0, 4, 1, 4, null);
- }
-
private void checkFlowLoaderProperties(final DirectoryYamlFlowLoader loader, final int numError,
final int numFlowMap, final int numEdgeMap) {
assertThat(loader.getErrors().size()).isEqualTo(numError);
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ConditionalWorkflowUtils.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ConditionalWorkflowUtils.java
new file mode 100644
index 0000000..0c0d2b2
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ConditionalWorkflowUtils.java
@@ -0,0 +1,101 @@
+/*
+* Copyright 2018 LinkedIn Corp.
+*
+* Licensed under the Apache License, Version 2.0 (the “License”); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an “AS IS” BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+package azkaban.execapp;
+
+import static azkaban.flow.ConditionOnJobStatus.ALL_FAILED;
+import static azkaban.flow.ConditionOnJobStatus.ALL_SUCCESS;
+import static azkaban.flow.ConditionOnJobStatus.ONE_FAILED;
+import static azkaban.flow.ConditionOnJobStatus.ONE_FAILED_ALL_DONE;
+import static azkaban.flow.ConditionOnJobStatus.ONE_SUCCESS;
+import static azkaban.flow.ConditionOnJobStatus.ONE_SUCCESS_ALL_DONE;
+
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.Status;
+import azkaban.flow.ConditionOnJobStatus;
+
+public class ConditionalWorkflowUtils {
+
+ public static final String SATISFIED = "satisfied";
+ public static final String PENDING = "pending";
+ public static final String FAILED = "failed";
+
+ public static String checkConditionOnJobStatus(final ExecutableNode node) {
+ final ConditionOnJobStatus conditionOnJobStatus = node.getConditionOnJobStatus();
+ switch (conditionOnJobStatus) {
+ case ALL_SUCCESS:
+ case ALL_FAILED:
+ case ALL_DONE:
+ return checkAllStatus(node, conditionOnJobStatus);
+ case ONE_FAILED:
+ case ONE_SUCCESS:
+ return checkOneStatus(node, conditionOnJobStatus);
+ case ONE_FAILED_ALL_DONE:
+ case ONE_SUCCESS_ALL_DONE:
+ return checkOneStatusAllDone(node, conditionOnJobStatus);
+ default:
+ return checkAllStatus(node, ALL_SUCCESS);
+ }
+ }
+
+ private static String checkAllStatus(final ExecutableNode node, final ConditionOnJobStatus
+ condition) {
+ String result = SATISFIED;
+ for (final String dependency : node.getInNodes()) {
+ final ExecutableNode dependencyNode = node.getParentFlow().getExecutableNode(dependency);
+ final Status depStatus = dependencyNode.getStatus();
+ if (!Status.isStatusFinished(depStatus)) {
+ return PENDING;
+ } else if ((condition.equals(ALL_SUCCESS) && Status.isStatusFailed(depStatus)) ||
+ (condition.equals(ALL_FAILED) && Status.isStatusSucceeded(depStatus))) {
+ result = FAILED;
+ }
+ }
+ return result;
+ }
+
+ private static String checkOneStatus(final ExecutableNode node, final ConditionOnJobStatus
+ condition) {
+ boolean finished = true;
+ for (final String dependency : node.getInNodes()) {
+ final ExecutableNode dependencyNode = node.getParentFlow().getExecutableNode(dependency);
+ final Status depStatus = dependencyNode.getStatus();
+ if ((condition.equals(ONE_SUCCESS) && Status.isStatusSucceeded(depStatus)) ||
+ (condition.equals(ONE_FAILED) && Status.isStatusFailed(depStatus))) {
+ return SATISFIED;
+ } else if (!Status.isStatusFinished(depStatus)) {
+ finished = false;
+ }
+ }
+ return finished ? FAILED : PENDING;
+ }
+
+ private static String checkOneStatusAllDone(final ExecutableNode node, final ConditionOnJobStatus
+ condition) {
+ String result = FAILED;
+ for (final String dependency : node.getInNodes()) {
+ final ExecutableNode dependencyNode = node.getParentFlow().getExecutableNode(dependency);
+ final Status depStatus = dependencyNode.getStatus();
+ if (!Status.isStatusFinished(depStatus)) {
+ return PENDING;
+ } else if ((condition.equals(ONE_SUCCESS_ALL_DONE) && Status.isStatusSucceeded(depStatus)) ||
+ (condition.equals(ONE_FAILED_ALL_DONE) && Status.isStatusFailed(depStatus))) {
+ result = SATISFIED;
+ }
+ }
+ return result;
+ }
+
+}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index 54b4342..df5f3c5 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -17,6 +17,9 @@
package azkaban.execapp;
import static azkaban.Constants.ConfigurationKeys.AZKABAN_SERVER_HOST_NAME;
+import static azkaban.execapp.ConditionalWorkflowUtils.FAILED;
+import static azkaban.execapp.ConditionalWorkflowUtils.PENDING;
+import static azkaban.execapp.ConditionalWorkflowUtils.checkConditionOnJobStatus;
import azkaban.Constants;
import azkaban.Constants.JobProperties;
@@ -38,6 +41,7 @@ import azkaban.executor.ExecutionOptions.FailureAction;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.Status;
+import azkaban.flow.ConditionOnJobStatus;
import azkaban.flow.FlowProps;
import azkaban.flow.FlowUtils;
import azkaban.jobExecutor.ProcessJob;
@@ -477,12 +481,7 @@ public class FlowRunner extends EventHandler implements Runnable {
// The job cannot be retried or has run out of retry attempts. We will
// fail the job and its flow now.
if (!retryJobIfPossible(node)) {
- propagateStatus(node.getParentFlow(),
- node.getStatus() == Status.KILLED ? Status.KILLED : Status.FAILED_FINISHING);
- if (this.failureAction == FailureAction.CANCEL_ALL) {
- this.kill();
- }
- this.flowFailed = true;
+ setFlowFailed(node);
} else {
nodesToCheck.add(node);
continue;
@@ -490,6 +489,9 @@ public class FlowRunner extends EventHandler implements Runnable {
}
if (outNodeIds.isEmpty() && isFlowReadytoFinalize(parentFlow)) {
+ // Todo jamiesjc: For conditional workflows, if conditionOnJobStatus is ONE_SUCCESS or
+ // ONE_FAILED, some jobs might still be running when the end nodes have finished. In this
+ // case, we need to kill all running jobs before finalizing the flow.
finalizeFlow(parentFlow);
finishExecutableNode(parentFlow);
@@ -530,6 +532,31 @@ public class FlowRunner extends EventHandler implements Runnable {
return false;
}
+ private void setFlowFailed(final ExecutableNode node) {
+ boolean shouldFail = true;
+ // As long as there is no outNodes or at least one outNode has conditionOnJobStatus of
+ // ALL_SUCCESS, we should set the flow to failed. Otherwise, it could still statisfy the
+ // condition of conditional workflows, so don't set the flow to failed.
+ for (final String outNodeId : node.getOutNodes()) {
+ if (node.getParentFlow().getExecutableNode(outNodeId).getConditionOnJobStatus()
+ .equals(ConditionOnJobStatus.ALL_SUCCESS)) {
+ shouldFail = true;
+ break;
+ } else {
+ shouldFail = false;
+ }
+ }
+
+ if (shouldFail) {
+ propagateStatus(node.getParentFlow(),
+ node.getStatus() == Status.KILLED ? Status.KILLED : Status.FAILED_FINISHING);
+ if (this.failureAction == FailureAction.CANCEL_ALL) {
+ this.kill();
+ }
+ this.flowFailed = true;
+ }
+ }
+
private boolean notReadyToRun(final Status status) {
return Status.isStatusFinished(status)
|| Status.isStatusRunning(status)
@@ -861,23 +888,22 @@ public class FlowRunner extends EventHandler implements Runnable {
// Go through the node's dependencies. If all of the previous job's
// statuses is finished and not FAILED or KILLED, than we can safely
// run this job.
- final ExecutableFlowBase flow = node.getParentFlow();
- boolean shouldKill = false;
- for (final String dependency : node.getInNodes()) {
- final ExecutableNode dependencyNode = flow.getExecutableNode(dependency);
- final Status depStatus = dependencyNode.getStatus();
+ Status status = Status.READY;
- if (!Status.isStatusFinished(depStatus)) {
+ // Check if condition on job status is satisfied
+ switch (checkConditionOnJobStatus(node)) {
+ case FAILED:
+ status = Status.CANCELLED;
+ break;
+ // Condition not satisfied yet, need to wait
+ case PENDING:
return null;
- } else if (depStatus == Status.FAILED || depStatus == Status.CANCELLED
- || depStatus == Status.KILLED) {
- // We propagate failures as KILLED states.
- shouldKill = true;
- }
+ default:
+ break;
}
- if (!isConditionMet(node)) {
- return Status.CANCELLED;
+ if (!isConditionOnRuntimeVariableMet(node)) {
+ status = Status.CANCELLED;
}
// If it's disabled but ready to run, we want to make sure it continues
@@ -893,15 +919,14 @@ public class FlowRunner extends EventHandler implements Runnable {
if (this.flowFailed
&& this.failureAction == ExecutionOptions.FailureAction.FINISH_CURRENTLY_RUNNING) {
return Status.CANCELLED;
- } else if (shouldKill || isKilled()) {
+ } else if (isKilled()) {
return Status.CANCELLED;
}
- // All good to go, ready to run.
- return Status.READY;
+ return status;
}
- private Boolean isConditionMet(final ExecutableNode node) {
+ private Boolean isConditionOnRuntimeVariableMet(final ExecutableNode node) {
final String condition = node.getCondition();
if (condition == null) {
return true;
@@ -911,7 +936,7 @@ public class FlowRunner extends EventHandler implements Runnable {
String replaced = condition;
while (matcher.find()) {
- final String value = findValueForJobVariable(matcher.group(1), matcher.group(2));
+ final String value = findValueForJobVariable(node, matcher.group(1), matcher.group(2));
if (value != null) {
replaced = replaced.replace(matcher.group(), "'" + value + "'");
}
@@ -922,7 +947,8 @@ public class FlowRunner extends EventHandler implements Runnable {
return evaluateExpression(replaced);
}
- private String findValueForJobVariable(final String jobName, final String variable) {
+ private String findValueForJobVariable(final ExecutableNode node, final String jobName, final
+ String variable) {
// Get props from job props tmp file
final File jobPropsFile = findPropsFileForJob(jobName + "_props");
if (jobPropsFile != null) {
@@ -937,8 +963,16 @@ public class FlowRunner extends EventHandler implements Runnable {
}
}
+ // Todo jamiesjc: need to handle condition for embedded flows
// Get job output props
- final Props outputProps = this.getExecutableFlow().getExecutableNode(jobName).getOutputProps();
+ final ExecutableNode target = node.getParentFlow().getExecutableNode(jobName);
+ if (target == null) {
+ this.logger.error("Not able to load props from output props file, job name " + jobName
+ + " might be invalid.");
+ return null;
+ }
+
+ final Props outputProps = target.getOutputProps();
if (outputProps != null && outputProps.containsKey(variable)) {
return outputProps.get(variable);
}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerConditionalFlowTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerConditionalFlowTest.java
index d33fb77..196a7fe 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerConditionalFlowTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerConditionalFlowTest.java
@@ -34,10 +34,12 @@ import org.junit.Test;
public class FlowRunnerConditionalFlowTest extends FlowRunnerTestBase {
private static final String FLOW_YAML_DIR = "conditionalflowyamltest";
- private static final String FLOW_NAME_1 = "conditional_flow1";
- private static final String FLOW_NAME_2 = "conditional_flow2";
- private static final String FLOW_YAML_FILE_1 = FLOW_NAME_1 + ".flow";
- private static final String FLOW_YAML_FILE_2 = FLOW_NAME_2 + ".flow";
+ private static final String CONDITIONAL_FLOW_1 = "conditional_flow1";
+ private static final String CONDITIONAL_FLOW_2 = "conditional_flow2";
+ private static final String CONDITIONAL_FLOW_3 = "conditional_flow3";
+ private static final String CONDITIONAL_FLOW_4 = "conditional_flow4";
+ private static final String CONDITIONAL_FLOW_5 = "conditional_flow5";
+ private static final String CONDITIONAL_FLOW_6 = "conditional_flow6";
private FlowRunnerTestUtil testUtil;
private Project project;
@@ -49,23 +51,10 @@ public class FlowRunnerConditionalFlowTest extends FlowRunnerTestBase {
@Test
public void runFlowOnJobPropsCondition() throws Exception {
-
- when(this.testUtil.getProjectLoader()
- .getLatestFlowVersion(this.project.getId(), this.project.getVersion(), FLOW_YAML_FILE_1))
- .thenReturn(1);
- when(this.testUtil.getProjectLoader()
- .getUploadedFlowFile(eq(this.project.getId()), eq(this.project.getVersion()),
- eq(FLOW_YAML_FILE_1),
- eq(1), any(File.class)))
- .thenReturn(ExecutionsTestUtil.getFlowFile(FLOW_YAML_DIR, FLOW_YAML_FILE_1));
-
final HashMap<String, String> flowProps = new HashMap<>();
flowProps.put("azkaban.server.name", "foo");
- final FlowRunner runner = this.testUtil.createFromFlowMap(FLOW_NAME_1, flowProps);
- final ExecutableFlow flow = runner.getExecutableFlow();
-
- FlowRunnerTestUtil.startThread(runner);
-
+ setUp(CONDITIONAL_FLOW_1, flowProps);
+ final ExecutableFlow flow = this.runner.getExecutableFlow();
assertStatus(flow, "jobA", Status.SUCCEEDED);
assertStatus(flow, "jobB", Status.SUCCEEDED);
assertStatus(flow, "jobC", Status.CANCELLED);
@@ -75,22 +64,9 @@ public class FlowRunnerConditionalFlowTest extends FlowRunnerTestBase {
@Test
public void runFlowOnJobOutputCondition() throws Exception {
-
- when(this.testUtil.getProjectLoader()
- .getLatestFlowVersion(this.project.getId(), this.project.getVersion(), FLOW_YAML_FILE_2))
- .thenReturn(1);
- when(this.testUtil.getProjectLoader()
- .getUploadedFlowFile(eq(this.project.getId()), eq(this.project.getVersion()),
- eq(FLOW_YAML_FILE_2),
- eq(1), any(File.class)))
- .thenReturn(ExecutionsTestUtil.getFlowFile(FLOW_YAML_DIR, FLOW_YAML_FILE_2));
-
final HashMap<String, String> flowProps = new HashMap<>();
- final FlowRunner runner = this.testUtil.createFromFlowMap(FLOW_NAME_2, flowProps);
- final ExecutableFlow flow = runner.getExecutableFlow();
-
- FlowRunnerTestUtil.startThread(runner);
-
+ setUp(CONDITIONAL_FLOW_2, flowProps);
+ final ExecutableFlow flow = this.runner.getExecutableFlow();
assertStatus(flow, "jobA", Status.RUNNING);
final Props generatedProperties = new Props();
generatedProperties.put("key1", "value1");
@@ -102,4 +78,74 @@ public class FlowRunnerConditionalFlowTest extends FlowRunnerTestBase {
assertStatus(flow, "jobD", Status.CANCELLED);
assertFlowStatus(flow, Status.KILLED);
}
+
+ @Test
+ public void runFlowOnJobStatusOneFailed() throws Exception {
+ final HashMap<String, String> flowProps = new HashMap<>();
+ setUp(CONDITIONAL_FLOW_3, flowProps);
+ final ExecutableFlow flow = this.runner.getExecutableFlow();
+ InteractiveTestJob.getTestJob("jobA").failJob();
+ assertStatus(flow, "jobA", Status.FAILED);
+ assertStatus(flow, "jobB", Status.RUNNING);
+ assertStatus(flow, "jobC", Status.SUCCEEDED);
+ assertFlowStatus(flow, Status.SUCCEEDED);
+ }
+
+ @Test
+ public void runFlowOnJobStatusAllFailed() throws Exception {
+ final HashMap<String, String> flowProps = new HashMap<>();
+ setUp(CONDITIONAL_FLOW_4, flowProps);
+ final ExecutableFlow flow = this.runner.getExecutableFlow();
+ InteractiveTestJob.getTestJob("jobA").failJob();
+ assertStatus(flow, "jobA", Status.FAILED);
+ assertStatus(flow, "jobB", Status.RUNNING);
+ assertStatus(flow, "jobC", Status.READY);
+ InteractiveTestJob.getTestJob("jobB").failJob();
+ assertStatus(flow, "jobB", Status.FAILED);
+ assertStatus(flow, "jobC", Status.SUCCEEDED);
+ assertFlowStatus(flow, Status.SUCCEEDED);
+ }
+
+ @Test
+ public void runFlowOnJobStatusOneSuccessAllDone() throws Exception {
+ final HashMap<String, String> flowProps = new HashMap<>();
+ setUp(CONDITIONAL_FLOW_5, flowProps);
+ final ExecutableFlow flow = this.runner.getExecutableFlow();
+ InteractiveTestJob.getTestJob("jobA").succeedJob();
+ assertStatus(flow, "jobA", Status.SUCCEEDED);
+ assertStatus(flow, "jobB", Status.RUNNING);
+ assertStatus(flow, "jobC", Status.READY);
+ InteractiveTestJob.getTestJob("jobB").failJob();
+ assertStatus(flow, "jobB", Status.FAILED);
+ assertStatus(flow, "jobC", Status.SUCCEEDED);
+ assertFlowStatus(flow, Status.SUCCEEDED);
+ }
+
+ @Test
+ public void runFlowOnBothJobStatusAndPropsCondition() throws Exception {
+ final HashMap<String, String> flowProps = new HashMap<>();
+ flowProps.put("azkaban.server.name", "foo");
+ setUp(CONDITIONAL_FLOW_6, flowProps);
+ final ExecutableFlow flow = this.runner.getExecutableFlow();
+ assertStatus(flow, "jobA", Status.SUCCEEDED);
+ assertStatus(flow, "jobB", Status.SUCCEEDED);
+ assertStatus(flow, "jobC", Status.CANCELLED);
+ assertStatus(flow, "jobD", Status.SUCCEEDED);
+ assertFlowStatus(flow, Status.SUCCEEDED);
+ }
+
+ private void setUp(final String flowName, final HashMap<String, String> flowProps)
+ throws Exception {
+ final String flowYamlFile = flowName + ".flow";
+ when(this.testUtil.getProjectLoader()
+ .getLatestFlowVersion(this.project.getId(), this.project.getVersion(), flowYamlFile))
+ .thenReturn(1);
+ when(this.testUtil.getProjectLoader()
+ .getUploadedFlowFile(eq(this.project.getId()), eq(this.project.getVersion()),
+ eq(flowYamlFile),
+ eq(1), any(File.class)))
+ .thenReturn(ExecutionsTestUtil.getFlowFile(FLOW_YAML_DIR, flowYamlFile));
+ this.runner = this.testUtil.createFromFlowMap(flowName, flowProps);
+ FlowRunnerTestUtil.startThread(this.runner);
+ }
}
diff --git a/test/execution-test-data/conditionalflowyamltest/conditional_flow3.flow b/test/execution-test-data/conditionalflowyamltest/conditional_flow3.flow
new file mode 100644
index 0000000..d619bac
--- /dev/null
+++ b/test/execution-test-data/conditionalflowyamltest/conditional_flow3.flow
@@ -0,0 +1,25 @@
+---
+config:
+ flow-level-parameter: value
+
+nodes:
+ - name: jobC
+ type: test
+ config:
+ fail: false
+ seconds: 0
+ condition: one_failed
+
+ dependsOn:
+ - jobA
+ - jobB
+
+ - name: jobA
+ type: test
+ config:
+ seconds: 2
+
+ - name: jobB
+ type: test
+ config:
+ seconds: 2
diff --git a/test/execution-test-data/conditionalflowyamltest/conditional_flow4.flow b/test/execution-test-data/conditionalflowyamltest/conditional_flow4.flow
new file mode 100644
index 0000000..9faef5a
--- /dev/null
+++ b/test/execution-test-data/conditionalflowyamltest/conditional_flow4.flow
@@ -0,0 +1,25 @@
+---
+config:
+ flow-level-parameter: value
+
+nodes:
+ - name: jobC
+ type: test
+ config:
+ fail: false
+ seconds: 0
+ condition: all_failed
+
+ dependsOn:
+ - jobA
+ - jobB
+
+ - name: jobA
+ type: test
+ config:
+ seconds: 2
+
+ - name: jobB
+ type: test
+ config:
+ seconds: 2
diff --git a/test/execution-test-data/conditionalflowyamltest/conditional_flow5.flow b/test/execution-test-data/conditionalflowyamltest/conditional_flow5.flow
new file mode 100644
index 0000000..17f54be
--- /dev/null
+++ b/test/execution-test-data/conditionalflowyamltest/conditional_flow5.flow
@@ -0,0 +1,25 @@
+---
+config:
+ flow-level-parameter: value
+
+nodes:
+ - name: jobC
+ type: test
+ config:
+ fail: false
+ seconds: 0
+ condition: one_success_all_done
+
+ dependsOn:
+ - jobA
+ - jobB
+
+ - name: jobA
+ type: test
+ config:
+ seconds: 2
+
+ - name: jobB
+ type: test
+ config:
+ seconds: 2
diff --git a/test/execution-test-data/conditionalflowyamltest/conditional_flow6.flow b/test/execution-test-data/conditionalflowyamltest/conditional_flow6.flow
new file mode 100644
index 0000000..327a879
--- /dev/null
+++ b/test/execution-test-data/conditionalflowyamltest/conditional_flow6.flow
@@ -0,0 +1,41 @@
+---
+config:
+ flow-level-parameter: value
+
+nodes:
+ - name: jobD
+ type: test
+ config:
+ fail: false
+ seconds: 0
+ condition: one_success && ${jobA:azkaban.server.name} == 'foo'
+
+ dependsOn:
+ - jobB
+ - jobC
+
+ - name: jobA
+ type: test
+ config:
+ fail: false
+ seconds: 0
+
+ - name: jobB
+ type: test
+ config:
+ fail: false
+ seconds: 0
+ condition: ${jobA:azkaban.server.name} == 'foo'
+
+ dependsOn:
+ - jobA
+
+ - name: jobC
+ type: test
+ config:
+ fail: false
+ seconds: 0
+ condition: ${jobA:azkaban.server.name} == 'bar'
+
+ dependsOn:
+ - jobA