Details
diff --git a/azkaban-common/src/main/java/azkaban/flow/Flow.java b/azkaban-common/src/main/java/azkaban/flow/Flow.java
index 35168af..975ce64 100644
--- a/azkaban-common/src/main/java/azkaban/flow/Flow.java
+++ b/azkaban-common/src/main/java/azkaban/flow/Flow.java
@@ -50,6 +50,7 @@ public class Flow {
private boolean isLayedOut = false;
private boolean isEmbeddedFlow = false;
private double azkabanFlowVersion = Constants.DEFAULT_AZKABAN_FLOW_VERSION;
+ private String condition = null;
public Flow(final String id) {
this.id = id;
@@ -62,6 +63,8 @@ public class Flow {
final Boolean layedout = (Boolean) flowObject.get("layedout");
final Boolean isEmbeddedFlow = (Boolean) flowObject.get("embeddedFlow");
final Double azkabanFlowVersion = (Double) flowObject.get("azkabanFlowVersion");
+ final String condition = (String) flowObject.get("condition");
+
final Flow flow = new Flow(id);
if (layedout != null) {
flow.setLayedOut(layedout);
@@ -75,6 +78,10 @@ public class Flow {
flow.setAzkabanFlowVersion(azkabanFlowVersion);
}
+ if (condition != null) {
+ flow.setCondition(condition);
+ }
+
final int projId = (Integer) flowObject.get("project.id");
flow.setProjectId(projId);
@@ -171,28 +178,28 @@ public class Flow {
}
}
- setLevelsAndEdgeNodes(new HashSet<>(startNodes), 0);
+ setLevelsAndEdgeNodes(new HashSet<>(this.startNodes), 0);
}
}
- private void setLevelsAndEdgeNodes(final Set<Node> levelNodes, int level) {
+ private void setLevelsAndEdgeNodes(final Set<Node> levelNodes, final int level) {
final Set<Node> nextLevelNodes = new HashSet<>();
- for (Node node : levelNodes) {
+ for (final Node node : levelNodes) {
node.setLevel(level);
- final Set<Edge> edges = outEdges.get(node.getId());
+ final Set<Edge> edges = this.outEdges.get(node.getId());
if (edges != null) {
edges.forEach(edge -> {
edge.setSource(node);
- edge.setTarget(nodes.get(edge.getTargetId()));
+ edge.setTarget(this.nodes.get(edge.getTargetId()));
nextLevelNodes.add(edge.getTarget());
});
}
}
- numLevels = level;
+ this.numLevels = level;
if (!nextLevelNodes.isEmpty()) {
setLevelsAndEdgeNodes(nextLevelNodes, level + 1);
@@ -339,6 +346,8 @@ public class Flow {
flowObj.put("layedout", this.isLayedOut);
flowObj.put("embeddedFlow", this.isEmbeddedFlow);
flowObj.put("azkabanFlowVersion", this.azkabanFlowVersion);
+ flowObj.put("condition", this.condition);
+
if (this.errors != null) {
flowObj.put("errors", this.errors);
}
@@ -404,6 +413,14 @@ public class Flow {
this.azkabanFlowVersion = azkabanFlowVersion;
}
+ public String getCondition() {
+ return this.condition;
+ }
+
+ public void setCondition(final String condition) {
+ this.condition = condition;
+ }
+
public Map<String, Object> getMetadata() {
if (this.metadata == null) {
this.metadata = new HashMap<>();
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
index 86156d5..ac705e2 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
@@ -48,10 +48,10 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
// Pattern to match job variables in condition expressions: ${jobName:variable}
public static final Pattern CONDITION_VARIABLE_REPLACEMENT_PATTERN = Pattern
.compile("\\$\\{([^:{}]+):([^:{}]+)\\}");
- private static final Logger logger = LoggerFactory.getLogger(DirectoryYamlFlowLoader.class);
// Pattern to match conditionOnJobStatus macros, e.g. one_success, all_done
- private static final Pattern CONDITION_ON_JOB_STATUS_PATTERN =
+ public static final Pattern CONDITION_ON_JOB_STATUS_PATTERN =
Pattern.compile("(?i)\\b(" + StringUtils.join(ConditionOnJobStatus.values(), "|") + ")\\b");
+ private static final Logger logger = LoggerFactory.getLogger(DirectoryYamlFlowLoader.class);
// Pattern to match a number or a string, e.g. 1234, "hello", 'foo'
private static final Pattern DIGIT_STRING_PATTERN = Pattern.compile("\\d+|'.*'|\".*\"");
// Valid operators in condition expressions: &&, ||, ==, !=, >, >=, <, <=
@@ -178,6 +178,7 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
final Node node = new Node(azkabanNode.getName());
node.setType(azkabanNode.getType());
validateCondition(node, azkabanNode, azkabanFlow);
+ node.setCondition(azkabanNode.getCondition());
node.setPropsSource(flowFile.getName());
node.setJobSource(flowFile.getName());
@@ -187,6 +188,7 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
final Flow flowNode = convertAzkabanFlowToFlow((AzkabanFlow) azkabanNode, embeddedFlowId,
flowFile);
flowNode.setEmbeddedFlow(true);
+ flowNode.setCondition(node.getCondition());
this.flowMap.put(flowNode.getId(), flowNode);
}
@@ -233,9 +235,8 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
private void validateCondition(final Node node, final AzkabanNode azkabanNode,
final AzkabanFlow azkabanFlow) {
- boolean valid = true;
boolean foundConditionOnJobStatus = false;
- String condition = azkabanNode.getCondition();
+ final String condition = azkabanNode.getCondition();
if (condition == null) {
return;
}
@@ -252,11 +253,9 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
if (foundConditionOnJobStatus) {
this.errors.add("Invalid condition for " + node.getId()
+ ": cannot combine more than one conditionOnJobStatus macros.");
- valid = false;
}
foundConditionOnJobStatus = true;
node.setConditionOnJobStatus(ConditionOnJobStatus.fromString(matcher.group(1)));
- condition = condition.replace(matcher.group(1), "true");
} else {
if (operands[i].startsWith("!")) {
// Remove the operator '!' from the operand.
@@ -265,25 +264,15 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
if (operands[i].equals("")) {
this.errors
.add("Invalid condition for " + node.getId() + ": operand is an empty string.");
- valid = false;
- } else if (!DIGIT_STRING_PATTERN.matcher(operands[i]).matches() &&
- !isValidVariableSubstitution(operands[i], azkabanNode, azkabanFlow)) {
- valid = false;
+ } else if (!DIGIT_STRING_PATTERN.matcher(operands[i]).matches()) {
+ validateVariableSubstitution(operands[i], azkabanNode, azkabanFlow);
}
}
}
- if (valid) {
- node.setCondition(condition);
- } else {
- this.logger.info("Condition of " + node.getId() + ": " + azkabanNode.getCondition()
- + " is invalid, set it to false");
- node.setCondition("false");
- }
}
- private boolean isValidVariableSubstitution(final String operand, final AzkabanNode azkabanNode,
+ private void validateVariableSubstitution(final String operand, final AzkabanNode azkabanNode,
final AzkabanFlow azkabanFlow) {
- boolean result = false;
final Matcher matcher = CONDITION_VARIABLE_REPLACEMENT_PATTERN.matcher(operand);
if (matcher.matches()) {
final String jobName = matcher.group(1);
@@ -296,14 +285,11 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
else if (isDescendantNode(conditionNode, azkabanNode, azkabanFlow)) {
this.errors.add("Invalid condition for " + azkabanNode.getName()
+ ": should not define condition on its descendant node " + jobName + ".");
- } else {
- result = true;
}
} else {
this.errors.add("Invalid condition for " + azkabanNode.getName()
+ ": cannot resolve the condition. Please check the syntax for supported conditions.");
}
- return result;
}
private boolean isDescendantNode(final AzkabanNode current, final AzkabanNode target,
diff --git a/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
index acecbda..3eb7012 100644
--- a/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
@@ -44,6 +44,7 @@ public class DirectoryYamlFlowLoaderTest {
private static final String DEPENDENCY_UNDEFINED_YAML_DIR = "dependencyundefinedyamltest";
private static final String INVALID_JOBPROPS_YAML_DIR = "invalidjobpropsyamltest";
private static final String NO_FLOW_YAML_DIR = "noflowyamltest";
+ private static final String CONDITION_YAML_DIR = "conditionalflowyamltest";
private static final String INVALID_CONDITION_YAML_DIR = "invalidconditionalflowyamltest";
private static final String BASIC_FLOW_1 = "basic_flow";
private static final String BASIC_FLOW_2 = "basic_flow2";
@@ -59,6 +60,7 @@ public class DirectoryYamlFlowLoaderTest {
private static final String EMBEDDED_FLOW_B2 =
"embedded_flow_b" + Constants.PATH_DELIMITER + "embedded_flow1" + Constants.PATH_DELIMITER
+ "embedded_flow2";
+ private static final String CONDITIONAL_FLOW = "conditional_flow6";
private static final String DUPLICATE_NODENAME_FLOW_FILE = "duplicate_nodename.flow";
private static final String DEPENDENCY_UNDEFINED_FLOW_FILE = "dependency_undefined.flow";
private static final String CYCLE_FOUND_FLOW = "cycle_found";
@@ -171,6 +173,21 @@ public class DirectoryYamlFlowLoaderTest {
}
@Test
+ public void testFlowYamlFileWithValidCondition() {
+ final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
+ loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(CONDITION_YAML_DIR));
+ assertThat(loader.getFlowMap().containsKey(CONDITIONAL_FLOW)).isTrue();
+ checkFlowProperties(loader, CONDITIONAL_FLOW, 0, 4, 1, 4, null);
+ assertThat(loader.getFlowMap().get(CONDITIONAL_FLOW).getNode("jobA").getCondition()).isNull();
+ assertThat(loader.getFlowMap().get(CONDITIONAL_FLOW).getNode("jobB").getCondition())
+ .isEqualTo("${jobA:props} == 'foo'");
+ assertThat(loader.getFlowMap().get(CONDITIONAL_FLOW).getNode("jobC").getCondition())
+ .isEqualTo("${jobA:props} == 'bar'");
+ assertThat(loader.getFlowMap().get(CONDITIONAL_FLOW).getNode("jobD").getCondition())
+ .isEqualTo("one_success && ${jobA:props} == 'foo'");
+ }
+
+ @Test
public void testFlowYamlFileWithInvalidConditions() {
final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(INVALID_CONDITION_YAML_DIR));
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index 1ba50c8..27981e7 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -20,6 +20,7 @@ import static azkaban.Constants.ConfigurationKeys.AZKABAN_SERVER_HOST_NAME;
import static azkaban.execapp.ConditionalWorkflowUtils.FAILED;
import static azkaban.execapp.ConditionalWorkflowUtils.PENDING;
import static azkaban.execapp.ConditionalWorkflowUtils.checkConditionOnJobStatus;
+import static azkaban.project.DirectoryYamlFlowLoader.CONDITION_ON_JOB_STATUS_PATTERN;
import static azkaban.project.DirectoryYamlFlowLoader.CONDITION_VARIABLE_REPLACEMENT_PATTERN;
import azkaban.Constants;
@@ -878,6 +879,8 @@ public class FlowRunner extends EventHandler implements Runnable {
// Check if condition on job status is satisfied
switch (checkConditionOnJobStatus(node)) {
case FAILED:
+ this.logger.info("Condition on job status: " + node.getConditionOnJobStatus() + " is "
+ + "evaluated to false for " + node.getId());
status = Status.CANCELLED;
break;
// Condition not satisfied yet, need to wait
@@ -887,7 +890,7 @@ public class FlowRunner extends EventHandler implements Runnable {
break;
}
- if (!isConditionOnRuntimeVariableMet(node)) {
+ if (status != Status.CANCELLED && !isConditionOnRuntimeVariableMet(node)) {
status = Status.CANCELLED;
}
@@ -917,15 +920,24 @@ public class FlowRunner extends EventHandler implements Runnable {
return true;
}
- final Matcher matcher = CONDITION_VARIABLE_REPLACEMENT_PATTERN.matcher(condition);
String replaced = condition;
+ // Replace the condition on job status macro with "true" to skip the evaluation by Script
+ // Engine since it has already been evaluated.
+ final Matcher jobStatusMatcher = CONDITION_ON_JOB_STATUS_PATTERN.matcher
+ (condition);
+ if (jobStatusMatcher.find()) {
+ replaced = condition.replace(jobStatusMatcher.group(1), "true");
+ }
+
+ final Matcher variableMatcher = CONDITION_VARIABLE_REPLACEMENT_PATTERN.matcher(replaced);
- while (matcher.find()) {
- final String value = findValueForJobVariable(node, matcher.group(1), matcher.group(2));
+ while (variableMatcher.find()) {
+ final String value = findValueForJobVariable(node, variableMatcher.group(1),
+ variableMatcher.group(2));
if (value != null) {
- replaced = replaced.replace(matcher.group(), "'" + value + "'");
+ replaced = replaced.replace(variableMatcher.group(), "'" + value + "'");
}
- this.logger.info("Condition is " + replaced);
+ this.logger.info("Resolved condition of " + node.getId() + " is " + replaced);
}
// Evaluate string expression using script engine
@@ -971,10 +983,10 @@ public class FlowRunner extends EventHandler implements Runnable {
result = (boolean) object;
}
} catch (final Exception e) {
- this.logger.error("Failed to evaluate the expression.", e);
+ this.logger.error("Failed to evaluate the condition.", e);
}
- this.logger.info("Evaluate expression result: " + result);
+ this.logger.info("Condition is evaluated to " + result);
return result;
}
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index 0f68b60..b384c66 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -365,15 +365,17 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
throws ServletException {
final String flowName = getParam(req, "flow");
- Flow flow = null;
try {
- flow = project.getFlow(flowName);
+ final Flow flow = project.getFlow(flowName);
if (flow == null) {
ret.put("error", "Flow " + flowName + " not found.");
return;
}
ret.put("jobTypes", getFlowJobTypes(flow));
+ if (flow.getCondition() != null) {
+ ret.put("condition", flow.getCondition());
+ }
} catch (final AccessControlException e) {
ret.put("error", e.getMessage());
}
@@ -1355,6 +1357,9 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
page.add("jobid", node.getId());
page.add("jobtype", node.getType());
+ if (node.getCondition() != null) {
+ page.add("condition", node.getCondition());
+ }
final ArrayList<String> dependencies = new ArrayList<>();
final Set<Edge> inEdges = flow.getInEdges(node.getId());
diff --git a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/jobpage.vm b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/jobpage.vm
index 44deca2..9a24ec5 100644
--- a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/jobpage.vm
+++ b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/jobpage.vm
@@ -121,6 +121,19 @@
<p><strong>Job Type</strong> $jobtype</p>
</div>
+ ## Condition
+
+ <div class="panel panel-default">
+ <div class="panel-heading">Condition</div>
+ <ul class="list-group">
+ #if ($condition)
+ <li class="list-group-item">$condition</li>
+ #else
+ <li class="list-group-item">No Condition</li>
+ #end
+ </ul>
+ </div><!-- /panel -->
+
## Dependencies
<div class="panel panel-default">
diff --git a/azkaban-web-server/src/main/tl/flowsummary.tl b/azkaban-web-server/src/main/tl/flowsummary.tl
index aafdef1..a34f924 100644
--- a/azkaban-web-server/src/main/tl/flowsummary.tl
+++ b/azkaban-web-server/src/main/tl/flowsummary.tl
@@ -10,6 +10,14 @@
<td class="property-key">Job Types Used</td>
<td>{#jobTypes}{.} {/jobTypes}</td>
</tr>
+ <tr>
+ <td class="property-key">Condition</td>
+ {?condition}
+ <td>{condition}</td>
+ {:else}
+ <td>none</td>
+ {/condition}
+ </tr>
</tbody>
</table>
</div>
diff --git a/azkaban-web-server/src/web/js/azkaban/view/flow.js b/azkaban-web-server/src/web/js/azkaban/view/flow.js
index c9c1c81..7d97353 100644
--- a/azkaban-web-server/src/web/js/azkaban/view/flow.js
+++ b/azkaban-web-server/src/web/js/azkaban/view/flow.js
@@ -363,7 +363,8 @@ azkaban.SummaryView = Backbone.View.extend({
var successHandler = function (data) {
console.log(data);
model.set({
- 'jobTypes': data.jobTypes
+ 'jobTypes': data.jobTypes,
+ 'condition': data.condition
});
model.trigger('render');
};
@@ -454,6 +455,7 @@ azkaban.SummaryView = Backbone.View.extend({
projectName: projectName,
flowName: flowId,
jobTypes: this.model.get('jobTypes'),
+ condition: this.model.get('condition'),
schedule: this.model.get('schedule'),
flowtrigger: this.model.get('flowtrigger'),
};