azkaban-aplcache

Conditional workflow UI - add condition in job info and flow

9/19/2018 9:46:24 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/flow/Flow.java b/azkaban-common/src/main/java/azkaban/flow/Flow.java
index 35168af..975ce64 100644
--- a/azkaban-common/src/main/java/azkaban/flow/Flow.java
+++ b/azkaban-common/src/main/java/azkaban/flow/Flow.java
@@ -50,6 +50,7 @@ public class Flow {
   private boolean isLayedOut = false;
   private boolean isEmbeddedFlow = false;
   private double azkabanFlowVersion = Constants.DEFAULT_AZKABAN_FLOW_VERSION;
+  private String condition = null;
 
   public Flow(final String id) {
     this.id = id;
@@ -62,6 +63,8 @@ public class Flow {
     final Boolean layedout = (Boolean) flowObject.get("layedout");
     final Boolean isEmbeddedFlow = (Boolean) flowObject.get("embeddedFlow");
     final Double azkabanFlowVersion = (Double) flowObject.get("azkabanFlowVersion");
+    final String condition = (String) flowObject.get("condition");
+
     final Flow flow = new Flow(id);
     if (layedout != null) {
       flow.setLayedOut(layedout);
@@ -75,6 +78,10 @@ public class Flow {
       flow.setAzkabanFlowVersion(azkabanFlowVersion);
     }
 
+    if (condition != null) {
+      flow.setCondition(condition);
+    }
+
     final int projId = (Integer) flowObject.get("project.id");
     flow.setProjectId(projId);
 
@@ -171,28 +178,28 @@ public class Flow {
         }
       }
 
-      setLevelsAndEdgeNodes(new HashSet<>(startNodes), 0);
+      setLevelsAndEdgeNodes(new HashSet<>(this.startNodes), 0);
     }
   }
 
-  private void setLevelsAndEdgeNodes(final Set<Node> levelNodes, int level) {
+  private void setLevelsAndEdgeNodes(final Set<Node> levelNodes, final int level) {
     final Set<Node> nextLevelNodes = new HashSet<>();
 
-    for (Node node : levelNodes) {
+    for (final Node node : levelNodes) {
       node.setLevel(level);
 
-      final Set<Edge> edges = outEdges.get(node.getId());
+      final Set<Edge> edges = this.outEdges.get(node.getId());
       if (edges != null) {
         edges.forEach(edge -> {
           edge.setSource(node);
-          edge.setTarget(nodes.get(edge.getTargetId()));
+          edge.setTarget(this.nodes.get(edge.getTargetId()));
 
           nextLevelNodes.add(edge.getTarget());
         });
       }
     }
 
-    numLevels = level;
+    this.numLevels = level;
 
     if (!nextLevelNodes.isEmpty()) {
       setLevelsAndEdgeNodes(nextLevelNodes, level + 1);
@@ -339,6 +346,8 @@ public class Flow {
     flowObj.put("layedout", this.isLayedOut);
     flowObj.put("embeddedFlow", this.isEmbeddedFlow);
     flowObj.put("azkabanFlowVersion", this.azkabanFlowVersion);
+    flowObj.put("condition", this.condition);
+
     if (this.errors != null) {
       flowObj.put("errors", this.errors);
     }
@@ -404,6 +413,14 @@ public class Flow {
     this.azkabanFlowVersion = azkabanFlowVersion;
   }
 
+  public String getCondition() {
+    return this.condition;
+  }
+
+  public void setCondition(final String condition) {
+    this.condition = condition;
+  }
+
   public Map<String, Object> getMetadata() {
     if (this.metadata == null) {
       this.metadata = new HashMap<>();
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
index 86156d5..ac705e2 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
@@ -48,10 +48,10 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
   // Pattern to match job variables in condition expressions: ${jobName:variable}
   public static final Pattern CONDITION_VARIABLE_REPLACEMENT_PATTERN = Pattern
       .compile("\\$\\{([^:{}]+):([^:{}]+)\\}");
-  private static final Logger logger = LoggerFactory.getLogger(DirectoryYamlFlowLoader.class);
   // Pattern to match conditionOnJobStatus macros, e.g. one_success, all_done
-  private static final Pattern CONDITION_ON_JOB_STATUS_PATTERN =
+  public static final Pattern CONDITION_ON_JOB_STATUS_PATTERN =
       Pattern.compile("(?i)\\b(" + StringUtils.join(ConditionOnJobStatus.values(), "|") + ")\\b");
+  private static final Logger logger = LoggerFactory.getLogger(DirectoryYamlFlowLoader.class);
   // Pattern to match a number or a string, e.g. 1234, "hello", 'foo'
   private static final Pattern DIGIT_STRING_PATTERN = Pattern.compile("\\d+|'.*'|\".*\"");
   // Valid operators in condition expressions: &&, ||, ==, !=, >, >=, <, <=
@@ -178,6 +178,7 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
     final Node node = new Node(azkabanNode.getName());
     node.setType(azkabanNode.getType());
     validateCondition(node, azkabanNode, azkabanFlow);
+    node.setCondition(azkabanNode.getCondition());
     node.setPropsSource(flowFile.getName());
     node.setJobSource(flowFile.getName());
 
@@ -187,6 +188,7 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
       final Flow flowNode = convertAzkabanFlowToFlow((AzkabanFlow) azkabanNode, embeddedFlowId,
           flowFile);
       flowNode.setEmbeddedFlow(true);
+      flowNode.setCondition(node.getCondition());
       this.flowMap.put(flowNode.getId(), flowNode);
     }
 
@@ -233,9 +235,8 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
 
   private void validateCondition(final Node node, final AzkabanNode azkabanNode,
       final AzkabanFlow azkabanFlow) {
-    boolean valid = true;
     boolean foundConditionOnJobStatus = false;
-    String condition = azkabanNode.getCondition();
+    final String condition = azkabanNode.getCondition();
     if (condition == null) {
       return;
     }
@@ -252,11 +253,9 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
         if (foundConditionOnJobStatus) {
           this.errors.add("Invalid condition for " + node.getId()
               + ": cannot combine more than one conditionOnJobStatus macros.");
-          valid = false;
         }
         foundConditionOnJobStatus = true;
         node.setConditionOnJobStatus(ConditionOnJobStatus.fromString(matcher.group(1)));
-        condition = condition.replace(matcher.group(1), "true");
       } else {
         if (operands[i].startsWith("!")) {
           // Remove the operator '!' from the operand.
@@ -265,25 +264,15 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
         if (operands[i].equals("")) {
           this.errors
               .add("Invalid condition for " + node.getId() + ": operand is an empty string.");
-          valid = false;
-        } else if (!DIGIT_STRING_PATTERN.matcher(operands[i]).matches() &&
-            !isValidVariableSubstitution(operands[i], azkabanNode, azkabanFlow)) {
-          valid = false;
+        } else if (!DIGIT_STRING_PATTERN.matcher(operands[i]).matches()) {
+          validateVariableSubstitution(operands[i], azkabanNode, azkabanFlow);
         }
       }
     }
-    if (valid) {
-      node.setCondition(condition);
-    } else {
-      this.logger.info("Condition of " + node.getId() + ": " + azkabanNode.getCondition()
-          + " is invalid, set it to false");
-      node.setCondition("false");
-    }
   }
 
-  private boolean isValidVariableSubstitution(final String operand, final AzkabanNode azkabanNode,
+  private void validateVariableSubstitution(final String operand, final AzkabanNode azkabanNode,
       final AzkabanFlow azkabanFlow) {
-    boolean result = false;
     final Matcher matcher = CONDITION_VARIABLE_REPLACEMENT_PATTERN.matcher(operand);
     if (matcher.matches()) {
       final String jobName = matcher.group(1);
@@ -296,14 +285,11 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
       else if (isDescendantNode(conditionNode, azkabanNode, azkabanFlow)) {
         this.errors.add("Invalid condition for " + azkabanNode.getName()
             + ": should not define condition on its descendant node " + jobName + ".");
-      } else {
-        result = true;
       }
     } else {
       this.errors.add("Invalid condition for " + azkabanNode.getName()
           + ": cannot resolve the condition. Please check the syntax for supported conditions.");
     }
-    return result;
   }
 
   private boolean isDescendantNode(final AzkabanNode current, final AzkabanNode target,
diff --git a/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
index acecbda..3eb7012 100644
--- a/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
@@ -44,6 +44,7 @@ public class DirectoryYamlFlowLoaderTest {
   private static final String DEPENDENCY_UNDEFINED_YAML_DIR = "dependencyundefinedyamltest";
   private static final String INVALID_JOBPROPS_YAML_DIR = "invalidjobpropsyamltest";
   private static final String NO_FLOW_YAML_DIR = "noflowyamltest";
+  private static final String CONDITION_YAML_DIR = "conditionalflowyamltest";
   private static final String INVALID_CONDITION_YAML_DIR = "invalidconditionalflowyamltest";
   private static final String BASIC_FLOW_1 = "basic_flow";
   private static final String BASIC_FLOW_2 = "basic_flow2";
@@ -59,6 +60,7 @@ public class DirectoryYamlFlowLoaderTest {
   private static final String EMBEDDED_FLOW_B2 =
       "embedded_flow_b" + Constants.PATH_DELIMITER + "embedded_flow1" + Constants.PATH_DELIMITER
           + "embedded_flow2";
+  private static final String CONDITIONAL_FLOW = "conditional_flow6";
   private static final String DUPLICATE_NODENAME_FLOW_FILE = "duplicate_nodename.flow";
   private static final String DEPENDENCY_UNDEFINED_FLOW_FILE = "dependency_undefined.flow";
   private static final String CYCLE_FOUND_FLOW = "cycle_found";
@@ -171,6 +173,21 @@ public class DirectoryYamlFlowLoaderTest {
   }
 
   @Test
+  public void testFlowYamlFileWithValidCondition() {
+    final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
+    loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(CONDITION_YAML_DIR));
+    assertThat(loader.getFlowMap().containsKey(CONDITIONAL_FLOW)).isTrue();
+    checkFlowProperties(loader, CONDITIONAL_FLOW, 0, 4, 1, 4, null);
+    assertThat(loader.getFlowMap().get(CONDITIONAL_FLOW).getNode("jobA").getCondition()).isNull();
+    assertThat(loader.getFlowMap().get(CONDITIONAL_FLOW).getNode("jobB").getCondition())
+        .isEqualTo("${jobA:props} == 'foo'");
+    assertThat(loader.getFlowMap().get(CONDITIONAL_FLOW).getNode("jobC").getCondition())
+        .isEqualTo("${jobA:props} == 'bar'");
+    assertThat(loader.getFlowMap().get(CONDITIONAL_FLOW).getNode("jobD").getCondition())
+        .isEqualTo("one_success && ${jobA:props} == 'foo'");
+  }
+
+  @Test
   public void testFlowYamlFileWithInvalidConditions() {
     final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
     loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(INVALID_CONDITION_YAML_DIR));
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index 1ba50c8..27981e7 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -20,6 +20,7 @@ import static azkaban.Constants.ConfigurationKeys.AZKABAN_SERVER_HOST_NAME;
 import static azkaban.execapp.ConditionalWorkflowUtils.FAILED;
 import static azkaban.execapp.ConditionalWorkflowUtils.PENDING;
 import static azkaban.execapp.ConditionalWorkflowUtils.checkConditionOnJobStatus;
+import static azkaban.project.DirectoryYamlFlowLoader.CONDITION_ON_JOB_STATUS_PATTERN;
 import static azkaban.project.DirectoryYamlFlowLoader.CONDITION_VARIABLE_REPLACEMENT_PATTERN;
 
 import azkaban.Constants;
@@ -878,6 +879,8 @@ public class FlowRunner extends EventHandler implements Runnable {
     // Check if condition on job status is satisfied
     switch (checkConditionOnJobStatus(node)) {
       case FAILED:
+        this.logger.info("Condition on job status: " + node.getConditionOnJobStatus() + " is "
+            + "evaluated to false for " + node.getId());
         status = Status.CANCELLED;
         break;
       // Condition not satisfied yet, need to wait
@@ -887,7 +890,7 @@ public class FlowRunner extends EventHandler implements Runnable {
         break;
     }
 
-    if (!isConditionOnRuntimeVariableMet(node)) {
+    if (status != Status.CANCELLED && !isConditionOnRuntimeVariableMet(node)) {
       status = Status.CANCELLED;
     }
 
@@ -917,15 +920,24 @@ public class FlowRunner extends EventHandler implements Runnable {
       return true;
     }
 
-    final Matcher matcher = CONDITION_VARIABLE_REPLACEMENT_PATTERN.matcher(condition);
     String replaced = condition;
+    // Replace the condition on job status macro with "true" to skip the evaluation by Script
+    // Engine since it has already been evaluated.
+    final Matcher jobStatusMatcher = CONDITION_ON_JOB_STATUS_PATTERN.matcher
+        (condition);
+    if (jobStatusMatcher.find()) {
+      replaced = condition.replace(jobStatusMatcher.group(1), "true");
+    }
+
+    final Matcher variableMatcher = CONDITION_VARIABLE_REPLACEMENT_PATTERN.matcher(replaced);
 
-    while (matcher.find()) {
-      final String value = findValueForJobVariable(node, matcher.group(1), matcher.group(2));
+    while (variableMatcher.find()) {
+      final String value = findValueForJobVariable(node, variableMatcher.group(1),
+          variableMatcher.group(2));
       if (value != null) {
-        replaced = replaced.replace(matcher.group(), "'" + value + "'");
+        replaced = replaced.replace(variableMatcher.group(), "'" + value + "'");
       }
-      this.logger.info("Condition is " + replaced);
+      this.logger.info("Resolved condition of " + node.getId() + " is " + replaced);
     }
 
     // Evaluate string expression using script engine
@@ -971,10 +983,10 @@ public class FlowRunner extends EventHandler implements Runnable {
         result = (boolean) object;
       }
     } catch (final Exception e) {
-      this.logger.error("Failed to evaluate the expression.", e);
+      this.logger.error("Failed to evaluate the condition.", e);
     }
 
-    this.logger.info("Evaluate expression result: " + result);
+    this.logger.info("Condition is evaluated to " + result);
     return result;
   }
 
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index 0f68b60..b384c66 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -365,15 +365,17 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
       throws ServletException {
     final String flowName = getParam(req, "flow");
 
-    Flow flow = null;
     try {
-      flow = project.getFlow(flowName);
+      final Flow flow = project.getFlow(flowName);
       if (flow == null) {
         ret.put("error", "Flow " + flowName + " not found.");
         return;
       }
 
       ret.put("jobTypes", getFlowJobTypes(flow));
+      if (flow.getCondition() != null) {
+        ret.put("condition", flow.getCondition());
+      }
     } catch (final AccessControlException e) {
       ret.put("error", e.getMessage());
     }
@@ -1355,6 +1357,9 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 
       page.add("jobid", node.getId());
       page.add("jobtype", node.getType());
+      if (node.getCondition() != null) {
+        page.add("condition", node.getCondition());
+      }
 
       final ArrayList<String> dependencies = new ArrayList<>();
       final Set<Edge> inEdges = flow.getInEdges(node.getId());
diff --git a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/jobpage.vm b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/jobpage.vm
index 44deca2..9a24ec5 100644
--- a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/jobpage.vm
+++ b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/jobpage.vm
@@ -121,6 +121,19 @@
           <p><strong>Job Type</strong> $jobtype</p>
         </div>
 
+      ## Condition
+
+        <div class="panel panel-default">
+          <div class="panel-heading">Condition</div>
+          <ul class="list-group">
+            #if ($condition)
+              <li class="list-group-item">$condition</li>
+            #else
+              <li class="list-group-item">No Condition</li>
+            #end
+          </ul>
+        </div><!-- /panel -->
+
       ## Dependencies
 
         <div class="panel panel-default">
diff --git a/azkaban-web-server/src/main/tl/flowsummary.tl b/azkaban-web-server/src/main/tl/flowsummary.tl
index aafdef1..a34f924 100644
--- a/azkaban-web-server/src/main/tl/flowsummary.tl
+++ b/azkaban-web-server/src/main/tl/flowsummary.tl
@@ -10,6 +10,14 @@
         <td class="property-key">Job Types Used</td>
         <td>{#jobTypes}{.} {/jobTypes}</td>
       </tr>
+      <tr>
+        <td class="property-key">Condition</td>
+        {?condition}
+          <td>{condition}</td>
+        {:else}
+          <td>none</td>
+        {/condition}
+      </tr>
       </tbody>
     </table>
   </div>
diff --git a/azkaban-web-server/src/web/js/azkaban/view/flow.js b/azkaban-web-server/src/web/js/azkaban/view/flow.js
index c9c1c81..7d97353 100644
--- a/azkaban-web-server/src/web/js/azkaban/view/flow.js
+++ b/azkaban-web-server/src/web/js/azkaban/view/flow.js
@@ -363,7 +363,8 @@ azkaban.SummaryView = Backbone.View.extend({
     var successHandler = function (data) {
       console.log(data);
       model.set({
-        'jobTypes': data.jobTypes
+        'jobTypes': data.jobTypes,
+        'condition': data.condition
       });
       model.trigger('render');
     };
@@ -454,6 +455,7 @@ azkaban.SummaryView = Backbone.View.extend({
       projectName: projectName,
       flowName: flowId,
       jobTypes: this.model.get('jobTypes'),
+      condition: this.model.get('condition'),
       schedule: this.model.get('schedule'),
       flowtrigger: this.model.get('flowtrigger'),
     };