azkaban-aplcache

Conditional workflow - modify conditionOnJobStatus macros.

8/23/2018 9:01:23 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/flow/ConditionOnJobStatus.java b/azkaban-common/src/main/java/azkaban/flow/ConditionOnJobStatus.java
index 8f9d0a2..4508a27 100644
--- a/azkaban-common/src/main/java/azkaban/flow/ConditionOnJobStatus.java
+++ b/azkaban-common/src/main/java/azkaban/flow/ConditionOnJobStatus.java
@@ -20,9 +20,7 @@ public enum ConditionOnJobStatus {
   ALL_FAILED("all_failed"),
   ALL_DONE("all_done"),
   ONE_FAILED("one_failed"),
-  ONE_SUCCESS("one_success"),
-  ONE_FAILED_ALL_DONE("one_failed_all_done"),
-  ONE_SUCCESS_ALL_DONE("one_success_all_done");
+  ONE_SUCCESS("one_success");
 
   private final String condition;
 
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ConditionalWorkflowUtils.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ConditionalWorkflowUtils.java
index 0c0d2b2..d6a3241 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/ConditionalWorkflowUtils.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ConditionalWorkflowUtils.java
@@ -18,9 +18,7 @@ package azkaban.execapp;
 import static azkaban.flow.ConditionOnJobStatus.ALL_FAILED;
 import static azkaban.flow.ConditionOnJobStatus.ALL_SUCCESS;
 import static azkaban.flow.ConditionOnJobStatus.ONE_FAILED;
-import static azkaban.flow.ConditionOnJobStatus.ONE_FAILED_ALL_DONE;
 import static azkaban.flow.ConditionOnJobStatus.ONE_SUCCESS;
-import static azkaban.flow.ConditionOnJobStatus.ONE_SUCCESS_ALL_DONE;
 
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.Status;
@@ -42,9 +40,6 @@ public class ConditionalWorkflowUtils {
       case ONE_FAILED:
       case ONE_SUCCESS:
         return checkOneStatus(node, conditionOnJobStatus);
-      case ONE_FAILED_ALL_DONE:
-      case ONE_SUCCESS_ALL_DONE:
-        return checkOneStatusAllDone(node, conditionOnJobStatus);
       default:
         return checkAllStatus(node, ALL_SUCCESS);
     }
@@ -68,30 +63,14 @@ public class ConditionalWorkflowUtils {
 
   private static String checkOneStatus(final ExecutableNode node, final ConditionOnJobStatus
       condition) {
-    boolean finished = true;
-    for (final String dependency : node.getInNodes()) {
-      final ExecutableNode dependencyNode = node.getParentFlow().getExecutableNode(dependency);
-      final Status depStatus = dependencyNode.getStatus();
-      if ((condition.equals(ONE_SUCCESS) && Status.isStatusSucceeded(depStatus)) ||
-          (condition.equals(ONE_FAILED) && Status.isStatusFailed(depStatus))) {
-        return SATISFIED;
-      } else if (!Status.isStatusFinished(depStatus)) {
-        finished = false;
-      }
-    }
-    return finished ? FAILED : PENDING;
-  }
-
-  private static String checkOneStatusAllDone(final ExecutableNode node, final ConditionOnJobStatus
-      condition) {
     String result = FAILED;
     for (final String dependency : node.getInNodes()) {
       final ExecutableNode dependencyNode = node.getParentFlow().getExecutableNode(dependency);
       final Status depStatus = dependencyNode.getStatus();
       if (!Status.isStatusFinished(depStatus)) {
         return PENDING;
-      } else if ((condition.equals(ONE_SUCCESS_ALL_DONE) && Status.isStatusSucceeded(depStatus)) ||
-          (condition.equals(ONE_FAILED_ALL_DONE) && Status.isStatusFailed(depStatus))) {
+      } else if ((condition.equals(ONE_SUCCESS) && Status.isStatusSucceeded(depStatus)) ||
+          (condition.equals(ONE_FAILED) && Status.isStatusFailed(depStatus))) {
         result = SATISFIED;
       }
     }
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerConditionalFlowTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerConditionalFlowTest.java
index ba22df6..ca3565b 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerConditionalFlowTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerConditionalFlowTest.java
@@ -97,18 +97,6 @@ public class FlowRunnerConditionalFlowTest extends FlowRunnerTestBase {
   }
 
   @Test
-  public void runFlowOnJobStatusOneFailed() throws Exception {
-    final HashMap<String, String> flowProps = new HashMap<>();
-    setUp(CONDITIONAL_FLOW_3, flowProps);
-    final ExecutableFlow flow = this.runner.getExecutableFlow();
-    InteractiveTestJob.getTestJob("jobA").failJob();
-    assertStatus(flow, "jobA", Status.FAILED);
-    assertStatus(flow, "jobB", Status.RUNNING);
-    assertStatus(flow, "jobC", Status.SUCCEEDED);
-    assertFlowStatus(flow, Status.SUCCEEDED);
-  }
-
-  @Test
   public void runFlowOnJobStatusAllFailed() throws Exception {
     final HashMap<String, String> flowProps = new HashMap<>();
     setUp(CONDITIONAL_FLOW_4, flowProps);
@@ -124,7 +112,7 @@ public class FlowRunnerConditionalFlowTest extends FlowRunnerTestBase {
   }
 
   @Test
-  public void runFlowOnJobStatusOneSuccessAllDone() throws Exception {
+  public void runFlowOnJobStatusOneSuccess() throws Exception {
     final HashMap<String, String> flowProps = new HashMap<>();
     setUp(CONDITIONAL_FLOW_5, flowProps);
     final ExecutableFlow flow = this.runner.getExecutableFlow();
diff --git a/test/execution-test-data/conditionalflowyamltest/conditional_flow2.flow b/test/execution-test-data/conditionalflowyamltest/conditional_flow2.flow
index 5a7c723..97f7825 100644
--- a/test/execution-test-data/conditionalflowyamltest/conditional_flow2.flow
+++ b/test/execution-test-data/conditionalflowyamltest/conditional_flow2.flow
@@ -16,8 +16,6 @@ nodes:
 
   - name: jobA
     type: test
-    config:
-      seconds: 0
 
   - name: jobB
     type: test
diff --git a/test/execution-test-data/conditionalflowyamltest/conditional_flow3.flow b/test/execution-test-data/conditionalflowyamltest/conditional_flow3.flow
index 219c53d..43956e6 100644
--- a/test/execution-test-data/conditionalflowyamltest/conditional_flow3.flow
+++ b/test/execution-test-data/conditionalflowyamltest/conditional_flow3.flow
@@ -16,11 +16,9 @@ nodes:
 
   - name: jobA
     type: test
-    config:
-      seconds: 2
 
   - name: jobB
     type: test
     config:
       fail: false
-      seconds: 2
+      seconds: 0
diff --git a/test/execution-test-data/conditionalflowyamltest/conditional_flow4.flow b/test/execution-test-data/conditionalflowyamltest/conditional_flow4.flow
index 9faef5a..f9091da 100644
--- a/test/execution-test-data/conditionalflowyamltest/conditional_flow4.flow
+++ b/test/execution-test-data/conditionalflowyamltest/conditional_flow4.flow
@@ -16,10 +16,6 @@ nodes:
 
   - name: jobA
     type: test
-    config:
-      seconds: 2
 
   - name: jobB
     type: test
-    config:
-      seconds: 2
diff --git a/test/execution-test-data/conditionalflowyamltest/conditional_flow5.flow b/test/execution-test-data/conditionalflowyamltest/conditional_flow5.flow
index 17f54be..9619153 100644
--- a/test/execution-test-data/conditionalflowyamltest/conditional_flow5.flow
+++ b/test/execution-test-data/conditionalflowyamltest/conditional_flow5.flow
@@ -8,7 +8,7 @@ nodes:
     config:
       fail: false
       seconds: 0
-    condition: one_success_all_done
+    condition: one_success
 
     dependsOn:
       - jobA
@@ -16,10 +16,6 @@ nodes:
 
   - name: jobA
     type: test
-    config:
-      seconds: 2
 
   - name: jobB
     type: test
-    config:
-      seconds: 2