azkaban-aplcache

Details

diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index 1e4ba8c..831a557 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -465,9 +465,7 @@ public class FlowRunner extends EventHandler implements Runnable {
         }
       }
 
-      if (outNodeIds.isEmpty()) {
-        // There's no outnodes means it's the end of a flow, so we finalize
-        // and fire an event.
+      if (outNodeIds.isEmpty() && isFlowReadytoFinalize(parentFlow)) {
         finalizeFlow(parentFlow);
         finishExecutableNode(parentFlow);
 
@@ -591,6 +589,16 @@ public class FlowRunner extends EventHandler implements Runnable {
     fireEventListeners(Event.create(this, EventType.JOB_FINISHED, eventData));
   }
 
+  private boolean isFlowReadytoFinalize(final ExecutableFlowBase flow) {
+    // Only when all the end nodes are finished, the flow is ready to finalize.
+    for (final String end : flow.getEndNodes()) {
+      if (!Status.isStatusFinished(flow.getExecutableNode(end).getStatus())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   private void finalizeFlow(final ExecutableFlowBase flow) {
     final String id = flow == this.flow ? "" : flow.getNestedId();
 
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestYaml.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestYaml.java
new file mode 100644
index 0000000..e4a7850
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestYaml.java
@@ -0,0 +1,120 @@
+/*
+* Copyright 2018 LinkedIn Corp.
+*
+* Licensed under the Apache License, Version 2.0 (the “License”); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an “AS IS” BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+package azkaban.execapp;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.when;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.Status;
+import azkaban.project.Project;
+import azkaban.test.executions.ExecutionsTestUtil;
+import java.io.File;
+import java.util.HashMap;
+import org.junit.Test;
+
+public class FlowRunnerTestYaml extends FlowRunnerTestBase {
+
+  private static final String BASIC_FLOW_YAML_DIR = "basicflowwithoutendnode";
+  private static final String FAIL_BASIC_FLOW_YAML_DIR = "failbasicflowwithoutendnode";
+  private static final String EMBEDDED_FLOW_YAML_DIR = "embeddedflowwithoutendnode";
+  private static final String BASIC_FLOW_NAME = "basic_flow";
+  private static final String BASIC_FLOW_YAML_FILE = BASIC_FLOW_NAME + ".flow";
+  private static final String FAIL_BASIC_FLOW_NAME = "fail_basic_flow";
+  private static final String FAIL_BASIC_FLOW_YAML_FILE = FAIL_BASIC_FLOW_NAME + ".flow";
+  private static final String EMBEDDED_FLOW_NAME = "embedded_flow";
+  private static final String EMBEDDED_FLOW_YAML_FILE = EMBEDDED_FLOW_NAME + ".flow";
+  private FlowRunnerTestUtil testUtil;
+
+  @Test
+  public void testBasicFlowWithoutEndNode() throws Exception {
+    setUp(BASIC_FLOW_YAML_DIR, BASIC_FLOW_YAML_FILE);
+    final HashMap<String, String> flowProps = new HashMap<>();
+    this.runner = this.testUtil.createFromFlowMap(BASIC_FLOW_NAME, flowProps);
+    final ExecutableFlow flow = this.runner.getExecutableFlow();
+    FlowRunnerTestUtil.startThread(this.runner);
+    assertFlowStatus(flow, Status.RUNNING);
+    assertStatus("jobA", Status.SUCCEEDED);
+    assertStatus("jobB", Status.SUCCEEDED);
+    assertFlowStatus(flow, Status.RUNNING);
+    assertStatus("jobC", Status.SUCCEEDED);
+    assertFlowStatus(flow, Status.SUCCEEDED);
+  }
+
+  @Test
+  public void testKillBasicFlowWithoutEndNode() throws Exception {
+    setUp(BASIC_FLOW_YAML_DIR, BASIC_FLOW_YAML_FILE);
+    final HashMap<String, String> flowProps = new HashMap<>();
+    this.runner = this.testUtil.createFromFlowMap(BASIC_FLOW_NAME, flowProps);
+    final ExecutableFlow flow = this.runner.getExecutableFlow();
+    FlowRunnerTestUtil.startThread(this.runner);
+    assertFlowStatus(flow, Status.RUNNING);
+    assertStatus("jobA", Status.SUCCEEDED);
+    assertStatus("jobB", Status.SUCCEEDED);
+    this.runner.kill();
+    assertStatus("jobC", Status.KILLED);
+    assertFlowStatus(flow, Status.KILLED);
+  }
+
+  @Test
+  public void testFailBasicFlowWithoutEndNode() throws Exception {
+    setUp(FAIL_BASIC_FLOW_YAML_DIR, FAIL_BASIC_FLOW_YAML_FILE);
+    final HashMap<String, String> flowProps = new HashMap<>();
+    this.runner = this.testUtil.createFromFlowMap(FAIL_BASIC_FLOW_NAME, flowProps);
+    final ExecutableFlow flow = this.runner.getExecutableFlow();
+    FlowRunnerTestUtil.startThread(this.runner);
+    assertFlowStatus(flow, Status.FAILED_FINISHING);
+    assertStatus("jobC", Status.FAILED);
+    assertStatus("jobB", Status.SUCCEEDED);
+    assertStatus("jobA", Status.SUCCEEDED);
+    assertStatus("jobD", Status.CANCELLED);
+    assertFlowStatus(flow, Status.FAILED);
+  }
+
+  @Test
+  public void testEmbeddedFlowWithoutEndNode() throws Exception {
+    setUp(EMBEDDED_FLOW_YAML_DIR, EMBEDDED_FLOW_YAML_FILE);
+    final HashMap<String, String> flowProps = new HashMap<>();
+    this.runner = this.testUtil.createFromFlowMap(EMBEDDED_FLOW_NAME, flowProps);
+    final ExecutableFlow flow = this.runner.getExecutableFlow();
+    FlowRunnerTestUtil.startThread(this.runner);
+    assertFlowStatus(flow, Status.RUNNING);
+    assertStatus("jobA", Status.SUCCEEDED);
+    assertStatus("embedded_flow1:jobB", Status.SUCCEEDED);
+    assertFlowStatus(flow, Status.RUNNING);
+    assertStatus("embedded_flow1:jobC", Status.SUCCEEDED);
+    assertStatus("embedded_flow1", Status.SUCCEEDED);
+    assertFlowStatus(flow, Status.RUNNING);
+    assertStatus("jobD", Status.SUCCEEDED);
+    assertFlowStatus(flow, Status.SUCCEEDED);
+  }
+
+  private void setUp(final String projectDir, final String flowYamlFile) throws Exception {
+    this.testUtil = new FlowRunnerTestUtil(projectDir, this.temporaryFolder);
+    final Project project = this.testUtil.getProject();
+    when(this.testUtil.getProjectLoader()
+        .getLatestFlowVersion(project.getId(), project.getVersion(), flowYamlFile))
+        .thenReturn(1);
+    when(this.testUtil.getProjectLoader()
+        .getUploadedFlowFile(eq(project.getId()), eq(project.getVersion()),
+            eq(flowYamlFile),
+            eq(1), any(File.class)))
+        .thenReturn(
+            ExecutionsTestUtil.getFlowFile(projectDir, flowYamlFile));
+  }
+
+}
diff --git a/azkaban-web-server/src/web/js/azkaban/util/layout.js b/azkaban-web-server/src/web/js/azkaban/util/layout.js
index 2736b8e..6e70922 100644
--- a/azkaban-web-server/src/web/js/azkaban/util/layout.js
+++ b/azkaban-web-server/src/web/js/azkaban/util/layout.js
@@ -392,7 +392,7 @@ function findAverage(nodes) {
   for (var i = 0; i < nodes.length; ++i) {
     sum += nodes[i].x;
   }
-  return sum / nodes.length;
+  return nodes.length != 0 ? sum / nodes.length : 0;
 }
 
 function uncrossWithOut(layer) {
diff --git a/test/execution-test-data/basicflowwithoutendnode/basic_flow.flow b/test/execution-test-data/basicflowwithoutendnode/basic_flow.flow
new file mode 100644
index 0000000..558bc3b
--- /dev/null
+++ b/test/execution-test-data/basicflowwithoutendnode/basic_flow.flow
@@ -0,0 +1,22 @@
+---
+config:
+  flow-level-parameter: value
+
+nodes:
+  - name: jobA
+    type: test
+    config:
+      seconds: 0
+      fail: false
+
+  - name: jobB
+    type: test
+    config:
+      seconds: 0
+      fail: false
+
+  - name: jobC
+    type: test
+    config:
+      seconds: 1
+      fail: false
diff --git a/test/execution-test-data/basicflowwithoutendnode/basic_flow.project b/test/execution-test-data/basicflowwithoutendnode/basic_flow.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/basicflowwithoutendnode/basic_flow.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0
diff --git a/test/execution-test-data/embeddedflowwithoutendnode/embedded_flow.flow b/test/execution-test-data/embeddedflowwithoutendnode/embedded_flow.flow
new file mode 100644
index 0000000..0ec51d8
--- /dev/null
+++ b/test/execution-test-data/embeddedflowwithoutendnode/embedded_flow.flow
@@ -0,0 +1,34 @@
+---
+config:
+  flow-level-parameter: value
+
+nodes:
+  - name: jobA
+    type: test
+    config:
+      seconds: 0
+      fail: false
+
+  - name: embedded_flow1
+    type: flow
+    nodes:
+      - name: jobB
+        type: test
+        config:
+          seconds: 0
+          fail: false
+      - name: jobC
+        type: test
+        config:
+          seconds: 1
+          fail: false
+        dependsOn:
+          - jobB
+
+  - name: jobD
+    type: test
+    config:
+      seconds: 1
+      fail: false
+    dependsOn:
+      - embedded_flow1
diff --git a/test/execution-test-data/embeddedflowwithoutendnode/embedded_flow.project b/test/execution-test-data/embeddedflowwithoutendnode/embedded_flow.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/embeddedflowwithoutendnode/embedded_flow.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0
diff --git a/test/execution-test-data/failbasicflowwithoutendnode/fail_basic_flow.flow b/test/execution-test-data/failbasicflowwithoutendnode/fail_basic_flow.flow
new file mode 100644
index 0000000..4de226e
--- /dev/null
+++ b/test/execution-test-data/failbasicflowwithoutendnode/fail_basic_flow.flow
@@ -0,0 +1,30 @@
+---
+config:
+  flow-level-parameter: value
+
+nodes:
+  - name: jobD
+    type: test
+    config:
+      seconds: 0
+      fail: false
+    dependsOn:
+      - jobA
+
+  - name: jobA
+    type: test
+    config:
+      seconds: 1
+      fail: false
+
+  - name: jobB
+    type: test
+    config:
+      seconds: 1
+      fail: false
+
+  - name: jobC
+    type: test
+    config:
+      seconds: 1
+      fail: true
diff --git a/test/execution-test-data/failbasicflowwithoutendnode/fail_basic_flow.project b/test/execution-test-data/failbasicflowwithoutendnode/fail_basic_flow.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/failbasicflowwithoutendnode/fail_basic_flow.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0