azkaban-aplcache

make max wait min non-required when flow trigger's data dependency

7/16/2018 11:53:42 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/project/FlowTrigger.java b/azkaban-common/src/main/java/azkaban/project/FlowTrigger.java
index 0e41840..7b67888 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowTrigger.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowTrigger.java
@@ -25,7 +25,9 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import javax.annotation.Nullable;
 import org.apache.commons.lang.StringUtils;
 
 /**
@@ -40,16 +42,18 @@ public class FlowTrigger implements Serializable {
   private final Duration maxWaitDuration;
 
   /**
-   * @throws IllegalArgumentException if any of the argument is null or there is duplicate
+   * @throws IllegalArgumentException if illegal argument is found or there is duplicate
    * dependency name or duplicate dependency type and params
    */
   public FlowTrigger(final CronSchedule schedule, final List<FlowTriggerDependency> dependencies,
-      final Duration maxWaitDuration) {
-    // will perform some basic validation here, and futher validation will be performed on
+      @Nullable final Duration maxWaitDuration) {
+    // will perform some basic validation here, and further validation will be performed on
     // parsing time when NodeBeanLoader parses the XML to flow trigger.
     Preconditions.checkNotNull(schedule, "schedule cannot be null");
     Preconditions.checkNotNull(dependencies, "dependency cannot be null");
-    Preconditions.checkNotNull(maxWaitDuration, "max wait time cannot be null");
+    Preconditions.checkArgument(dependencies.isEmpty() || maxWaitDuration != null, "max wait "
+        + "time cannot be null unless no dependency is defined");
+
     validateDependencies(dependencies);
     this.schedule = schedule;
     final ImmutableMap.Builder builder = new Builder();
@@ -74,7 +78,7 @@ public class FlowTrigger implements Serializable {
   public String toString() {
     return "FlowTrigger{" +
         "schedule=" + this.schedule +
-        ", maxWaitDurationInMins=" + this.maxWaitDuration.toMinutes() +
+        ", maxWaitDurationInMins=" + this.maxWaitDuration +
         "\n " + StringUtils.join(this.dependencies.values(), "\n") + '}';
   }
 
@@ -105,8 +109,8 @@ public class FlowTrigger implements Serializable {
     return this.dependencies.values();
   }
 
-  public Duration getMaxWaitDuration() {
-    return this.maxWaitDuration;
+  public Optional<Duration> getMaxWaitDuration() {
+    return Optional.ofNullable(this.maxWaitDuration);
   }
 
   public CronSchedule getSchedule() {
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowTriggerBean.java b/azkaban-common/src/main/java/azkaban/project/FlowTriggerBean.java
index e2c9756..f812dc8 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowTriggerBean.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowTriggerBean.java
@@ -17,7 +17,6 @@
 
 package azkaban.project;
 
-import azkaban.Constants;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -27,15 +26,15 @@ import java.util.Map;
  */
 public class FlowTriggerBean {
 
-  private long maxWaitMins = Constants.DEFAULT_FLOW_TRIGGER_MAX_WAIT_TIME.toMinutes();
+  private Long maxWaitMins = null;
   private Map<String, String> schedule;
   private List<TriggerDependencyBean> triggerDependencies;
 
-  public long getMaxWaitMins() {
+  public Long getMaxWaitMins() {
     return this.maxWaitMins;
   }
 
-  public void setMaxWaitMins(final long maxWaitMins) {
+  public void setMaxWaitMins(final Long maxWaitMins) {
     this.maxWaitMins = maxWaitMins;
   }
 
diff --git a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
index 323a68f..273793b 100644
--- a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
@@ -121,13 +121,21 @@ public class NodeBeanLoader {
   }
 
   private void validateFlowTriggerBean(final FlowTriggerBean flowTriggerBean) {
-    // validate max wait mins
-    Preconditions.checkArgument(flowTriggerBean.getMaxWaitMins() >= Constants
-        .MIN_FLOW_TRIGGER_WAIT_TIME.toMinutes(), "max wait min must be at least " + Constants
-        .MIN_FLOW_TRIGGER_WAIT_TIME.toMinutes() + " min(s)");
-
     validateSchedule(flowTriggerBean);
     validateTriggerDependencies(flowTriggerBean.getTriggerDependencies());
+    validateMaxWaitMins(flowTriggerBean);
+  }
+
+  private void validateMaxWaitMins(final FlowTriggerBean flowTriggerBean) {
+    Preconditions.checkArgument(flowTriggerBean.getTriggerDependencies().isEmpty() ||
+            flowTriggerBean.getMaxWaitMins() != null,
+        "max wait min cannot be null unless no dependency is defined");
+
+    if (flowTriggerBean.getMaxWaitMins() != null) {
+      Preconditions.checkArgument(flowTriggerBean.getMaxWaitMins() >= Constants
+          .MIN_FLOW_TRIGGER_WAIT_TIME.toMinutes(), "max wait min must be at least " + Constants
+          .MIN_FLOW_TRIGGER_WAIT_TIME.toMinutes() + " min(s)");
+    }
   }
 
   /**
@@ -185,16 +193,20 @@ public class NodeBeanLoader {
       return null;
     } else {
       validateFlowTriggerBean(flowTriggerBean);
-      if (flowTriggerBean.getMaxWaitMins() > Constants.DEFAULT_FLOW_TRIGGER_MAX_WAIT_TIME
+      if (flowTriggerBean.getMaxWaitMins() != null
+          && flowTriggerBean.getMaxWaitMins() > Constants.DEFAULT_FLOW_TRIGGER_MAX_WAIT_TIME
           .toMinutes()) {
         flowTriggerBean.setMaxWaitMins(Constants.DEFAULT_FLOW_TRIGGER_MAX_WAIT_TIME.toMinutes());
       }
+
+      final Duration duration = flowTriggerBean.getMaxWaitMins() == null ? null : Duration
+          .ofMinutes(flowTriggerBean.getMaxWaitMins());
+
       return new FlowTrigger(
           new CronSchedule(flowTriggerBean.getSchedule().get(FlowTriggerProps.SCHEDULE_VALUE)),
           flowTriggerBean.getTriggerDependencies().stream()
               .map(d -> new FlowTriggerDependency(d.getName(), d.getType(), d.getParams()))
-              .collect(Collectors.toList()),
-          Duration.ofMinutes(flowTriggerBean.getMaxWaitMins()));
+              .collect(Collectors.toList()), duration);
     }
   }
 
diff --git a/azkaban-common/src/test/java/azkaban/project/FlowTriggerTest.java b/azkaban-common/src/test/java/azkaban/project/FlowTriggerTest.java
index 9603015..286b509 100644
--- a/azkaban-common/src/test/java/azkaban/project/FlowTriggerTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/FlowTriggerTest.java
@@ -17,10 +17,12 @@
 package azkaban.project;
 
 
+import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.UUID;
@@ -48,12 +50,27 @@ public class FlowTriggerTest {
   @Test
   public void testFlowTriggerArgumentValidation() {
     final CronSchedule validSchedule = new CronSchedule("* * * * ? *");
-    final List<FlowTriggerDependency> validDependencyList = new ArrayList<>();
-    final List<FlowTriggerDependency> invalidDependencyList = null;
+    final CronSchedule nullSchedule = null;
+    final List<FlowTriggerDependency> emptyDependencyList = new ArrayList<>();
+    final List<FlowTriggerDependency> nullDependencyList = null;
+
+    final List<FlowTriggerDependency> nonEmptyDependencyList = Arrays.asList(createTestDependency
+        ("type", "dep1"));
+
     final Duration validDuration = Duration.ofMinutes(10);
+    final Duration nullDuration = null;
+
+    assertThatThrownBy(() -> new FlowTrigger(nullSchedule, nonEmptyDependencyList, validDuration))
+        .isInstanceOf(NullPointerException.class);
 
-    assertThatThrownBy(() -> new FlowTrigger(validSchedule, invalidDependencyList, validDuration))
+    assertThatThrownBy(() -> new FlowTrigger(validSchedule, nullDependencyList, validDuration))
         .isInstanceOf(NullPointerException.class);
+
+    assertThatThrownBy(() -> new FlowTrigger(validSchedule, nonEmptyDependencyList, nullDuration))
+        .isInstanceOf(IllegalArgumentException.class);
+
+    assertThatCode(() -> new FlowTrigger(validSchedule, emptyDependencyList, nullDuration))
+        .doesNotThrowAnyException();
   }
 
   @Test
diff --git a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
index 9277c3a..defbfec 100644
--- a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
@@ -209,17 +209,12 @@ public class NodeBeanLoaderTest {
   public void testFlowTriggerMaxWaitMinValidation() throws Exception {
     final NodeBeanLoader loader = new NodeBeanLoader();
 
-    NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
+    final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
         TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_no_max_wait_min.flow"));
-    FlowTrigger flowTrigger = loader.toFlowTrigger(nodeBean.getTrigger());
-    assertThat(flowTrigger.getMaxWaitDuration())
-        .isEqualTo(Constants.DEFAULT_FLOW_TRIGGER_MAX_WAIT_TIME);
 
-    nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
-        TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_large_max_wait_min.flow"));
-    flowTrigger = loader.toFlowTrigger(nodeBean.getTrigger());
-    assertThat(flowTrigger.getMaxWaitDuration())
-        .isEqualTo(Constants.DEFAULT_FLOW_TRIGGER_MAX_WAIT_TIME);
+    assertThatThrownBy(() -> loader.toFlowTrigger(nodeBean.getTrigger()))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("max wait min cannot be null unless no dependency is defined");
 
     final NodeBean nodeBean2 = loader.load(ExecutionsTestUtil.getFlowFile(
         TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_zero_max_wait_min.flow"));
@@ -227,6 +222,22 @@ public class NodeBeanLoaderTest {
     assertThatThrownBy(() -> loader.toFlowTrigger(nodeBean2.getTrigger()))
         .isInstanceOf(IllegalArgumentException.class).hasMessage("max wait min must be at least 1"
         + " min(s)");
+
+    NodeBean nodeBean3 = loader.load(ExecutionsTestUtil.getFlowFile(
+        TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_large_max_wait_min.flow"));
+    FlowTrigger flowTrigger = loader.toFlowTrigger(nodeBean3.getTrigger());
+    assertThat(flowTrigger.getMaxWaitDuration().orElse(null))
+        .isEqualTo(Constants.DEFAULT_FLOW_TRIGGER_MAX_WAIT_TIME);
+
+    nodeBean3 = loader.load(ExecutionsTestUtil.getFlowFile(
+        TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_no_max_wait_min_zero_dep.flow"));
+    flowTrigger = loader.toFlowTrigger(nodeBean3.getTrigger());
+    assertThat(flowTrigger.getMaxWaitDuration().orElse(null)).isEqualTo(null);
+
+    nodeBean3 = loader.load(ExecutionsTestUtil.getFlowFile(
+        TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_max_wait_min_zero_dep.flow"));
+    flowTrigger = loader.toFlowTrigger(nodeBean3.getTrigger());
+    assertThat(flowTrigger.getMaxWaitDuration().get().toMinutes()).isEqualTo(5);
   }
 
   @Test
@@ -450,7 +461,8 @@ public class NodeBeanLoaderTest {
 
   private void validateFlowTrigger(final FlowTrigger flowTrigger, final long maxWaitMins, final
   String cronExpression, final int numDependencies) {
-    assertThat(flowTrigger.getMaxWaitDuration()).isEqualTo(Duration.ofMinutes(maxWaitMins));
+    assertThat(flowTrigger.getMaxWaitDuration().orElse(null)).isEqualTo(Duration.ofMinutes
+        (maxWaitMins));
     assertThat(flowTrigger.getSchedule().getCronExpression()).isEqualTo(cronExpression);
     assertThat(flowTrigger.getDependencies().size()).isEqualTo(numDependencies);
   }
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
index e900761..ce3fd76 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
@@ -365,7 +365,7 @@ public class FlowTriggerService {
   private long remainingTimeBeforeTimeout(final TriggerInstance triggerInst) {
     final long now = System.currentTimeMillis();
     return Math.max(0,
-        triggerInst.getFlowTrigger().getMaxWaitDuration().toMillis() - (now - triggerInst
+        triggerInst.getFlowTrigger().getMaxWaitDuration().get().toMillis() - (now - triggerInst
             .getStartTime()));
   }
 
@@ -403,7 +403,7 @@ public class FlowTriggerService {
       // todo chengren311: it's possible web server restarts before the db update, then
       // new instance will not be recoverable from db.
       addToRunningListAndScheduleKill(triggerInst, triggerInst.getFlowTrigger()
-          .getMaxWaitDuration(), CancellationCause.TIMEOUT);
+          .getMaxWaitDuration().get(), CancellationCause.TIMEOUT);
     }
   }
 
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerServlet.java
index fcfbeff..79faea0 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerServlet.java
@@ -73,7 +73,13 @@ public class FlowTriggerServlet extends LoginAbstractAzkabanServlet {
           utils.formatDateTime(res.getQuartzTrigger().getStartTime().getTime()));
       jsonObj.put("nextExecTime",
           utils.formatDateTime(res.getQuartzTrigger().getNextFireTime().getTime()));
-      jsonObj.put("maxWaitMin", res.getFlowTrigger().getMaxWaitDuration().toMinutes());
+
+      Long maxWaitMin = null;
+      if (res.getFlowTrigger().getMaxWaitDuration().isPresent()) {
+        maxWaitMin = res.getFlowTrigger().getMaxWaitDuration().get().toMinutes();
+      }
+      jsonObj.put("maxWaitMin", maxWaitMin);
+
       if (!res.getFlowTrigger().getDependencies().isEmpty()) {
         jsonObj.put("dependencies", res.getDependencyListJson());
       }
@@ -81,7 +87,7 @@ public class FlowTriggerServlet extends LoginAbstractAzkabanServlet {
     }
   }
 
-  private boolean checkProjectIdAndFlowId(HttpServletRequest req) {
+  private boolean checkProjectIdAndFlowId(final HttpServletRequest req) {
     return hasParam(req, "projectId") && hasParam(req, "flowId");
   }
 
@@ -104,19 +110,17 @@ public class FlowTriggerServlet extends LoginAbstractAzkabanServlet {
 
         if (project == null) {
           ret.put("error", "please specify a valid project id");
-        }
-        else if (!hasPermission(project, session.getUser(), Type.ADMIN)) {
+        } else if (!hasPermission(project, session.getUser(), Type.ADMIN)) {
           ret.put("error", "Permission denied. Need ADMIN access.");
-        }
-        else {
+        } else {
           try {
-            if(ajaxName.equals("pauseTrigger")) {
+            if (ajaxName.equals("pauseTrigger")) {
               this.scheduler.pauseFlowTrigger(projectId, flowId);
             } else {
               this.scheduler.resumeFlowTrigger(projectId, flowId);
             }
             ret.put("status", "success");
-          } catch (SchedulerException ex) {
+          } catch (final SchedulerException ex) {
             ret.put("error", ex.getMessage());
           }
         }
diff --git a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/flowtriggerspage.vm b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/flowtriggerspage.vm
index 608e958..0dfd82b 100644
--- a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/flowtriggerspage.vm
+++ b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/flowtriggerspage.vm
@@ -39,7 +39,7 @@
   <script type="text/javascript">
     function pauseTrigger(projectId, flowId) {
       var requestURL = document.location.href;
-      var requestData = {"projectId": projectId, "flowId" : flowId, "ajax": "pauseTrigger"};
+      var requestData = {"projectId": projectId, "flowId": flowId, "ajax": "pauseTrigger"};
       var successHandler = function (data) {
         if (data.error) {
           showDialog("Error", data.error);
@@ -53,7 +53,7 @@
 
     function resumeTrigger(projectId, flowId) {
       var requestURL = document.location.href;
-      var requestData = {"projectId": projectId, "flowId" : flowId, "ajax": "resumeTrigger"};
+      var requestData = {"projectId": projectId, "flowId": flowId, "ajax": "resumeTrigger"};
       var successHandler = function (data) {
         if (data.error) {
           showDialog("Error", data.error);
@@ -130,7 +130,12 @@
               <td>$utils.formatDate(${trigger.getQuartzTrigger().getNextFireTime().getTime()})</td>
 
               <td>${trigger.getFlowTrigger().getSchedule().getCronExpression()}</td>
-              <td>${trigger.getFlowTrigger().getMaxWaitDuration().toMinutes()}</td>
+
+              #if (${trigger.getFlowTrigger().getMaxWaitDuration().isPresent()} == "true")
+                <td>${trigger.getFlowTrigger().getMaxWaitDuration().get().toMinutes()}</td>
+              #else
+                <td>-</td>
+              #end
 
 
             ## adopted from template to show executionOption in scheduledflowpage.vm
@@ -165,15 +170,17 @@
 
               <td>${trigger.isPaused()}</td>
 
-              ##todo chengren311: add pause/resume button in flow page.
+            ##todo chengren311: add pause/resume button in flow page.
               <td>
                 #if (${trigger.isPaused()} == "false")
                   <button type="button" id="pausebtn" class="btn btn-danger btn-sm"
-                          onclick="pauseTrigger('${trigger.getProjectId()}', '${trigger.getFlowId()}')">Pause
+                          onclick="pauseTrigger('${trigger.getProjectId()}', '${trigger.getFlowId()}')">
+                    Pause
                   </button>
                 #else
                   <button type="button" id="resumebtn" class="btn btn-info btn-sm"
-                          onclick="resumeTrigger('${trigger.getProjectId()}', '${trigger.getFlowId()}')">Resume
+                          onclick="resumeTrigger('${trigger.getProjectId()}', '${trigger.getFlowId()}')">
+                    Resume
                   </button>
                 #end
               </td>
diff --git a/test/execution-test-data/flowtriggeryamltest/flow_trigger_max_wait_min_zero_dep.flow b/test/execution-test-data/flowtriggeryamltest/flow_trigger_max_wait_min_zero_dep.flow
new file mode 100644
index 0000000..6923811
--- /dev/null
+++ b/test/execution-test-data/flowtriggeryamltest/flow_trigger_max_wait_min_zero_dep.flow
@@ -0,0 +1,43 @@
+---
+# Flow trigger
+trigger:
+  maxWaitMins: 5
+  schedule:
+    type: cron
+    value: 0 0 1 ? * *
+
+  triggerDependencies:
+
+# All flow level properties here
+config:
+  flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+  # Job definition
+  # The job definition is like a YAMLified version of properties file
+  # with one major difference. All custom properties are now clubbed together
+  # in a config section in the definition.
+  # The first line describes the name of the job
+  - name: shell_end
+    # Describe the type of the job
+    type: noop
+
+    # List the dependencies of the job
+    dependsOn:
+      - shell_pwd
+      - shell_echo
+
+  - name: shell_echo
+    # Describe the type of the job
+    type: command
+    config:
+      command: echo "This is an echoed text."
+
+  - name: shell_pwd
+    # Describe the type of the job
+    type: command
+    config:
+      command: pwd
diff --git a/test/execution-test-data/flowtriggeryamltest/flow_trigger_no_max_wait_min_zero_dep.flow b/test/execution-test-data/flowtriggeryamltest/flow_trigger_no_max_wait_min_zero_dep.flow
new file mode 100644
index 0000000..c73260b
--- /dev/null
+++ b/test/execution-test-data/flowtriggeryamltest/flow_trigger_no_max_wait_min_zero_dep.flow
@@ -0,0 +1,42 @@
+---
+# Flow trigger
+trigger:
+  schedule:
+    type: cron
+    value: 0 0 1 ? * *
+
+  triggerDependencies:
+
+# All flow level properties here
+config:
+  flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+  # Job definition
+  # The job definition is like a YAMLified version of properties file
+  # with one major difference. All custom properties are now clubbed together
+  # in a config section in the definition.
+  # The first line describes the name of the job
+  - name: shell_end
+    # Describe the type of the job
+    type: noop
+
+    # List the dependencies of the job
+    dependsOn:
+      - shell_pwd
+      - shell_echo
+
+  - name: shell_echo
+    # Describe the type of the job
+    type: command
+    config:
+      command: echo "This is an echoed text."
+
+  - name: shell_pwd
+    # Describe the type of the job
+    type: command
+    config:
+      command: pwd