azkaban-aplcache

restrict second level schedule for flow trigger (#1745) Azkaban

4/23/2018 7:26:14 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
index c411987..0375f64 100644
--- a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
@@ -107,6 +107,13 @@ public class NodeBeanLoader {
                 .isValidExpression(scheduleMap.get(FlowTriggerProps.SCHEDULE_VALUE)),
             "flow trigger schedule value must be a valid cron expression");
 
+    final String cronExpression = scheduleMap.get(FlowTriggerProps.SCHEDULE_VALUE).trim();
+    final String[] cronParts = cronExpression.split("\\s+");
+
+    Preconditions
+        .checkArgument(cronParts[0].equals("0"), "interval of flow trigger schedule has to"
+            + " be larger than 1 min");
+
     Preconditions.checkArgument(scheduleMap.size() == 2, "flow trigger schedule must "
         + "contain type and value only");
   }
diff --git a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
index 6484d11..05c7b8d 100644
--- a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
@@ -267,6 +267,18 @@ public class NodeBeanLoaderTest {
     assertThatThrownBy(() -> loader.toFlowTrigger(nodeBean.getTrigger()))
         .isInstanceOf(IllegalArgumentException.class);
 
+    loader.load(ExecutionsTestUtil.getFlowFile(
+        TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_second_level_cron_expression1.flow"));
+
+    assertThatThrownBy(() -> loader.toFlowTrigger(nodeBean.getTrigger()))
+        .isInstanceOf(IllegalArgumentException.class);
+
+    loader.load(ExecutionsTestUtil.getFlowFile(
+        TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_second_level_cron_expression2.flow"));
+
+    assertThatThrownBy(() -> loader.toFlowTrigger(nodeBean.getTrigger()))
+        .isInstanceOf(IllegalArgumentException.class);
+
     final NodeBean nodeBean2 = loader.load(ExecutionsTestUtil.getFlowFile(
         TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_no_schedule.flow"));
 
diff --git a/azkaban-web-server/src/test/resources/flow_trigger.flow b/azkaban-web-server/src/test/resources/flow_trigger.flow
index 4157128..7bf2e12 100644
--- a/azkaban-web-server/src/test/resources/flow_trigger.flow
+++ b/azkaban-web-server/src/test/resources/flow_trigger.flow
@@ -4,7 +4,7 @@ trigger:
   maxWaitMins: 1
   schedule:
     type: cron
-    value: 0/5 * * * * ?
+    value: 0 0 0 ? * * *
 
   triggerDependencies:
     - name: search-impression # an unique name to identify the dependency
diff --git a/test/execution-test-data/flowtriggeryamltest/flow_trigger_second_level_cron_expression1.flow b/test/execution-test-data/flowtriggeryamltest/flow_trigger_second_level_cron_expression1.flow
new file mode 100644
index 0000000..840964c
--- /dev/null
+++ b/test/execution-test-data/flowtriggeryamltest/flow_trigger_second_level_cron_expression1.flow
@@ -0,0 +1,10 @@
+---
+# Flow trigger
+trigger:
+  maxWaitMins: 5
+  schedule:
+    type: cron
+    value: 1/1   0   0  ? * * *
+
+  triggerDependencies:
+
diff --git a/test/execution-test-data/flowtriggeryamltest/flow_trigger_second_level_cron_expression2.flow b/test/execution-test-data/flowtriggeryamltest/flow_trigger_second_level_cron_expression2.flow
new file mode 100644
index 0000000..e1f1671
--- /dev/null
+++ b/test/execution-test-data/flowtriggeryamltest/flow_trigger_second_level_cron_expression2.flow
@@ -0,0 +1,10 @@
+---
+# Flow trigger
+trigger:
+  maxWaitMins: 5
+  schedule:
+    type: cron
+    value: 1 0 0 ? * * *
+
+  triggerDependencies:
+