Details
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowTrigger.java b/azkaban-common/src/main/java/azkaban/project/FlowTrigger.java
index 0e41840..7b67888 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowTrigger.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowTrigger.java
@@ -25,7 +25,9 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
+import javax.annotation.Nullable;
import org.apache.commons.lang.StringUtils;
/**
@@ -40,16 +42,18 @@ public class FlowTrigger implements Serializable {
private final Duration maxWaitDuration;
/**
- * @throws IllegalArgumentException if any of the argument is null or there is duplicate
+ * @throws IllegalArgumentException if illegal argument is found or there is duplicate
* dependency name or duplicate dependency type and params
*/
public FlowTrigger(final CronSchedule schedule, final List<FlowTriggerDependency> dependencies,
- final Duration maxWaitDuration) {
- // will perform some basic validation here, and futher validation will be performed on
+ @Nullable final Duration maxWaitDuration) {
+ // will perform some basic validation here, and further validation will be performed on
// parsing time when NodeBeanLoader parses the XML to flow trigger.
Preconditions.checkNotNull(schedule, "schedule cannot be null");
Preconditions.checkNotNull(dependencies, "dependency cannot be null");
- Preconditions.checkNotNull(maxWaitDuration, "max wait time cannot be null");
+ Preconditions.checkArgument(dependencies.isEmpty() || maxWaitDuration != null, "max wait "
+ + "time cannot be null unless no dependency is defined");
+
validateDependencies(dependencies);
this.schedule = schedule;
final ImmutableMap.Builder builder = new Builder();
@@ -74,7 +78,7 @@ public class FlowTrigger implements Serializable {
public String toString() {
return "FlowTrigger{" +
"schedule=" + this.schedule +
- ", maxWaitDurationInMins=" + this.maxWaitDuration.toMinutes() +
+ ", maxWaitDurationInMins=" + this.maxWaitDuration +
"\n " + StringUtils.join(this.dependencies.values(), "\n") + '}';
}
@@ -105,8 +109,8 @@ public class FlowTrigger implements Serializable {
return this.dependencies.values();
}
- public Duration getMaxWaitDuration() {
- return this.maxWaitDuration;
+ public Optional<Duration> getMaxWaitDuration() {
+ return Optional.ofNullable(this.maxWaitDuration);
}
public CronSchedule getSchedule() {
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowTriggerBean.java b/azkaban-common/src/main/java/azkaban/project/FlowTriggerBean.java
index e2c9756..f812dc8 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowTriggerBean.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowTriggerBean.java
@@ -17,7 +17,6 @@
package azkaban.project;
-import azkaban.Constants;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -27,15 +26,15 @@ import java.util.Map;
*/
public class FlowTriggerBean {
- private long maxWaitMins = Constants.DEFAULT_FLOW_TRIGGER_MAX_WAIT_TIME.toMinutes();
+ private Long maxWaitMins = null;
private Map<String, String> schedule;
private List<TriggerDependencyBean> triggerDependencies;
- public long getMaxWaitMins() {
+ public Long getMaxWaitMins() {
return this.maxWaitMins;
}
- public void setMaxWaitMins(final long maxWaitMins) {
+ public void setMaxWaitMins(final Long maxWaitMins) {
this.maxWaitMins = maxWaitMins;
}
diff --git a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
index 323a68f..273793b 100644
--- a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
@@ -121,13 +121,21 @@ public class NodeBeanLoader {
}
private void validateFlowTriggerBean(final FlowTriggerBean flowTriggerBean) {
- // validate max wait mins
- Preconditions.checkArgument(flowTriggerBean.getMaxWaitMins() >= Constants
- .MIN_FLOW_TRIGGER_WAIT_TIME.toMinutes(), "max wait min must be at least " + Constants
- .MIN_FLOW_TRIGGER_WAIT_TIME.toMinutes() + " min(s)");
-
validateSchedule(flowTriggerBean);
validateTriggerDependencies(flowTriggerBean.getTriggerDependencies());
+ validateMaxWaitMins(flowTriggerBean);
+ }
+
+ private void validateMaxWaitMins(final FlowTriggerBean flowTriggerBean) {
+ Preconditions.checkArgument(flowTriggerBean.getTriggerDependencies().isEmpty() ||
+ flowTriggerBean.getMaxWaitMins() != null,
+ "max wait min cannot be null unless no dependency is defined");
+
+ if (flowTriggerBean.getMaxWaitMins() != null) {
+ Preconditions.checkArgument(flowTriggerBean.getMaxWaitMins() >= Constants
+ .MIN_FLOW_TRIGGER_WAIT_TIME.toMinutes(), "max wait min must be at least " + Constants
+ .MIN_FLOW_TRIGGER_WAIT_TIME.toMinutes() + " min(s)");
+ }
}
/**
@@ -185,16 +193,20 @@ public class NodeBeanLoader {
return null;
} else {
validateFlowTriggerBean(flowTriggerBean);
- if (flowTriggerBean.getMaxWaitMins() > Constants.DEFAULT_FLOW_TRIGGER_MAX_WAIT_TIME
+ if (flowTriggerBean.getMaxWaitMins() != null
+ && flowTriggerBean.getMaxWaitMins() > Constants.DEFAULT_FLOW_TRIGGER_MAX_WAIT_TIME
.toMinutes()) {
flowTriggerBean.setMaxWaitMins(Constants.DEFAULT_FLOW_TRIGGER_MAX_WAIT_TIME.toMinutes());
}
+
+ final Duration duration = flowTriggerBean.getMaxWaitMins() == null ? null : Duration
+ .ofMinutes(flowTriggerBean.getMaxWaitMins());
+
return new FlowTrigger(
new CronSchedule(flowTriggerBean.getSchedule().get(FlowTriggerProps.SCHEDULE_VALUE)),
flowTriggerBean.getTriggerDependencies().stream()
.map(d -> new FlowTriggerDependency(d.getName(), d.getType(), d.getParams()))
- .collect(Collectors.toList()),
- Duration.ofMinutes(flowTriggerBean.getMaxWaitMins()));
+ .collect(Collectors.toList()), duration);
}
}
diff --git a/azkaban-common/src/test/java/azkaban/project/FlowTriggerTest.java b/azkaban-common/src/test/java/azkaban/project/FlowTriggerTest.java
index 9603015..286b509 100644
--- a/azkaban-common/src/test/java/azkaban/project/FlowTriggerTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/FlowTriggerTest.java
@@ -17,10 +17,12 @@
package azkaban.project;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
@@ -48,12 +50,27 @@ public class FlowTriggerTest {
@Test
public void testFlowTriggerArgumentValidation() {
final CronSchedule validSchedule = new CronSchedule("* * * * ? *");
- final List<FlowTriggerDependency> validDependencyList = new ArrayList<>();
- final List<FlowTriggerDependency> invalidDependencyList = null;
+ final CronSchedule nullSchedule = null;
+ final List<FlowTriggerDependency> emptyDependencyList = new ArrayList<>();
+ final List<FlowTriggerDependency> nullDependencyList = null;
+
+ final List<FlowTriggerDependency> nonEmptyDependencyList = Arrays.asList(createTestDependency
+ ("type", "dep1"));
+
final Duration validDuration = Duration.ofMinutes(10);
+ final Duration nullDuration = null;
+
+ assertThatThrownBy(() -> new FlowTrigger(nullSchedule, nonEmptyDependencyList, validDuration))
+ .isInstanceOf(NullPointerException.class);
- assertThatThrownBy(() -> new FlowTrigger(validSchedule, invalidDependencyList, validDuration))
+ assertThatThrownBy(() -> new FlowTrigger(validSchedule, nullDependencyList, validDuration))
.isInstanceOf(NullPointerException.class);
+
+ assertThatThrownBy(() -> new FlowTrigger(validSchedule, nonEmptyDependencyList, nullDuration))
+ .isInstanceOf(IllegalArgumentException.class);
+
+ assertThatCode(() -> new FlowTrigger(validSchedule, emptyDependencyList, nullDuration))
+ .doesNotThrowAnyException();
}
@Test
diff --git a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
index 9277c3a..defbfec 100644
--- a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
@@ -209,17 +209,12 @@ public class NodeBeanLoaderTest {
public void testFlowTriggerMaxWaitMinValidation() throws Exception {
final NodeBeanLoader loader = new NodeBeanLoader();
- NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
+ final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_no_max_wait_min.flow"));
- FlowTrigger flowTrigger = loader.toFlowTrigger(nodeBean.getTrigger());
- assertThat(flowTrigger.getMaxWaitDuration())
- .isEqualTo(Constants.DEFAULT_FLOW_TRIGGER_MAX_WAIT_TIME);
- nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
- TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_large_max_wait_min.flow"));
- flowTrigger = loader.toFlowTrigger(nodeBean.getTrigger());
- assertThat(flowTrigger.getMaxWaitDuration())
- .isEqualTo(Constants.DEFAULT_FLOW_TRIGGER_MAX_WAIT_TIME);
+ assertThatThrownBy(() -> loader.toFlowTrigger(nodeBean.getTrigger()))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("max wait min cannot be null unless no dependency is defined");
final NodeBean nodeBean2 = loader.load(ExecutionsTestUtil.getFlowFile(
TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_zero_max_wait_min.flow"));
@@ -227,6 +222,22 @@ public class NodeBeanLoaderTest {
assertThatThrownBy(() -> loader.toFlowTrigger(nodeBean2.getTrigger()))
.isInstanceOf(IllegalArgumentException.class).hasMessage("max wait min must be at least 1"
+ " min(s)");
+
+ NodeBean nodeBean3 = loader.load(ExecutionsTestUtil.getFlowFile(
+ TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_large_max_wait_min.flow"));
+ FlowTrigger flowTrigger = loader.toFlowTrigger(nodeBean3.getTrigger());
+ assertThat(flowTrigger.getMaxWaitDuration().orElse(null))
+ .isEqualTo(Constants.DEFAULT_FLOW_TRIGGER_MAX_WAIT_TIME);
+
+ nodeBean3 = loader.load(ExecutionsTestUtil.getFlowFile(
+ TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_no_max_wait_min_zero_dep.flow"));
+ flowTrigger = loader.toFlowTrigger(nodeBean3.getTrigger());
+ assertThat(flowTrigger.getMaxWaitDuration().orElse(null)).isEqualTo(null);
+
+ nodeBean3 = loader.load(ExecutionsTestUtil.getFlowFile(
+ TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_max_wait_min_zero_dep.flow"));
+ flowTrigger = loader.toFlowTrigger(nodeBean3.getTrigger());
+ assertThat(flowTrigger.getMaxWaitDuration().get().toMinutes()).isEqualTo(5);
}
@Test
@@ -450,7 +461,8 @@ public class NodeBeanLoaderTest {
private void validateFlowTrigger(final FlowTrigger flowTrigger, final long maxWaitMins, final
String cronExpression, final int numDependencies) {
- assertThat(flowTrigger.getMaxWaitDuration()).isEqualTo(Duration.ofMinutes(maxWaitMins));
+ assertThat(flowTrigger.getMaxWaitDuration().orElse(null)).isEqualTo(Duration.ofMinutes
+ (maxWaitMins));
assertThat(flowTrigger.getSchedule().getCronExpression()).isEqualTo(cronExpression);
assertThat(flowTrigger.getDependencies().size()).isEqualTo(numDependencies);
}
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
index e900761..ce3fd76 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
@@ -365,7 +365,7 @@ public class FlowTriggerService {
private long remainingTimeBeforeTimeout(final TriggerInstance triggerInst) {
final long now = System.currentTimeMillis();
return Math.max(0,
- triggerInst.getFlowTrigger().getMaxWaitDuration().toMillis() - (now - triggerInst
+ triggerInst.getFlowTrigger().getMaxWaitDuration().get().toMillis() - (now - triggerInst
.getStartTime()));
}
@@ -403,7 +403,7 @@ public class FlowTriggerService {
// todo chengren311: it's possible web server restarts before the db update, then
// new instance will not be recoverable from db.
addToRunningListAndScheduleKill(triggerInst, triggerInst.getFlowTrigger()
- .getMaxWaitDuration(), CancellationCause.TIMEOUT);
+ .getMaxWaitDuration().get(), CancellationCause.TIMEOUT);
}
}
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerServlet.java
index fcfbeff..79faea0 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerServlet.java
@@ -73,7 +73,13 @@ public class FlowTriggerServlet extends LoginAbstractAzkabanServlet {
utils.formatDateTime(res.getQuartzTrigger().getStartTime().getTime()));
jsonObj.put("nextExecTime",
utils.formatDateTime(res.getQuartzTrigger().getNextFireTime().getTime()));
- jsonObj.put("maxWaitMin", res.getFlowTrigger().getMaxWaitDuration().toMinutes());
+
+ Long maxWaitMin = null;
+ if (res.getFlowTrigger().getMaxWaitDuration().isPresent()) {
+ maxWaitMin = res.getFlowTrigger().getMaxWaitDuration().get().toMinutes();
+ }
+ jsonObj.put("maxWaitMin", maxWaitMin);
+
if (!res.getFlowTrigger().getDependencies().isEmpty()) {
jsonObj.put("dependencies", res.getDependencyListJson());
}
@@ -81,7 +87,7 @@ public class FlowTriggerServlet extends LoginAbstractAzkabanServlet {
}
}
- private boolean checkProjectIdAndFlowId(HttpServletRequest req) {
+ private boolean checkProjectIdAndFlowId(final HttpServletRequest req) {
return hasParam(req, "projectId") && hasParam(req, "flowId");
}
@@ -104,19 +110,17 @@ public class FlowTriggerServlet extends LoginAbstractAzkabanServlet {
if (project == null) {
ret.put("error", "please specify a valid project id");
- }
- else if (!hasPermission(project, session.getUser(), Type.ADMIN)) {
+ } else if (!hasPermission(project, session.getUser(), Type.ADMIN)) {
ret.put("error", "Permission denied. Need ADMIN access.");
- }
- else {
+ } else {
try {
- if(ajaxName.equals("pauseTrigger")) {
+ if (ajaxName.equals("pauseTrigger")) {
this.scheduler.pauseFlowTrigger(projectId, flowId);
} else {
this.scheduler.resumeFlowTrigger(projectId, flowId);
}
ret.put("status", "success");
- } catch (SchedulerException ex) {
+ } catch (final SchedulerException ex) {
ret.put("error", ex.getMessage());
}
}
diff --git a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/flowtriggerspage.vm b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/flowtriggerspage.vm
index 608e958..0dfd82b 100644
--- a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/flowtriggerspage.vm
+++ b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/flowtriggerspage.vm
@@ -39,7 +39,7 @@
<script type="text/javascript">
function pauseTrigger(projectId, flowId) {
var requestURL = document.location.href;
- var requestData = {"projectId": projectId, "flowId" : flowId, "ajax": "pauseTrigger"};
+ var requestData = {"projectId": projectId, "flowId": flowId, "ajax": "pauseTrigger"};
var successHandler = function (data) {
if (data.error) {
showDialog("Error", data.error);
@@ -53,7 +53,7 @@
function resumeTrigger(projectId, flowId) {
var requestURL = document.location.href;
- var requestData = {"projectId": projectId, "flowId" : flowId, "ajax": "resumeTrigger"};
+ var requestData = {"projectId": projectId, "flowId": flowId, "ajax": "resumeTrigger"};
var successHandler = function (data) {
if (data.error) {
showDialog("Error", data.error);
@@ -130,7 +130,12 @@
<td>$utils.formatDate(${trigger.getQuartzTrigger().getNextFireTime().getTime()})</td>
<td>${trigger.getFlowTrigger().getSchedule().getCronExpression()}</td>
- <td>${trigger.getFlowTrigger().getMaxWaitDuration().toMinutes()}</td>
+
+ #if (${trigger.getFlowTrigger().getMaxWaitDuration().isPresent()} == "true")
+ <td>${trigger.getFlowTrigger().getMaxWaitDuration().get().toMinutes()}</td>
+ #else
+ <td>-</td>
+ #end
## adopted from template to show executionOption in scheduledflowpage.vm
@@ -165,15 +170,17 @@
<td>${trigger.isPaused()}</td>
- ##todo chengren311: add pause/resume button in flow page.
+ ##todo chengren311: add pause/resume button in flow page.
<td>
#if (${trigger.isPaused()} == "false")
<button type="button" id="pausebtn" class="btn btn-danger btn-sm"
- onclick="pauseTrigger('${trigger.getProjectId()}', '${trigger.getFlowId()}')">Pause
+ onclick="pauseTrigger('${trigger.getProjectId()}', '${trigger.getFlowId()}')">
+ Pause
</button>
#else
<button type="button" id="resumebtn" class="btn btn-info btn-sm"
- onclick="resumeTrigger('${trigger.getProjectId()}', '${trigger.getFlowId()}')">Resume
+ onclick="resumeTrigger('${trigger.getProjectId()}', '${trigger.getFlowId()}')">
+ Resume
</button>
#end
</td>
diff --git a/test/execution-test-data/flowtriggeryamltest/flow_trigger_max_wait_min_zero_dep.flow b/test/execution-test-data/flowtriggeryamltest/flow_trigger_max_wait_min_zero_dep.flow
new file mode 100644
index 0000000..6923811
--- /dev/null
+++ b/test/execution-test-data/flowtriggeryamltest/flow_trigger_max_wait_min_zero_dep.flow
@@ -0,0 +1,43 @@
+---
+# Flow trigger
+trigger:
+ maxWaitMins: 5
+ schedule:
+ type: cron
+ value: 0 0 1 ? * *
+
+ triggerDependencies:
+
+# All flow level properties here
+config:
+ flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+ # Job definition
+ # The job definition is like a YAMLified version of properties file
+ # with one major difference. All custom properties are now clubbed together
+ # in a config section in the definition.
+ # The first line describes the name of the job
+ - name: shell_end
+ # Describe the type of the job
+ type: noop
+
+ # List the dependencies of the job
+ dependsOn:
+ - shell_pwd
+ - shell_echo
+
+ - name: shell_echo
+ # Describe the type of the job
+ type: command
+ config:
+ command: echo "This is an echoed text."
+
+ - name: shell_pwd
+ # Describe the type of the job
+ type: command
+ config:
+ command: pwd
diff --git a/test/execution-test-data/flowtriggeryamltest/flow_trigger_no_max_wait_min_zero_dep.flow b/test/execution-test-data/flowtriggeryamltest/flow_trigger_no_max_wait_min_zero_dep.flow
new file mode 100644
index 0000000..c73260b
--- /dev/null
+++ b/test/execution-test-data/flowtriggeryamltest/flow_trigger_no_max_wait_min_zero_dep.flow
@@ -0,0 +1,42 @@
+---
+# Flow trigger
+trigger:
+ schedule:
+ type: cron
+ value: 0 0 1 ? * *
+
+ triggerDependencies:
+
+# All flow level properties here
+config:
+ flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+ # Job definition
+ # The job definition is like a YAMLified version of properties file
+ # with one major difference. All custom properties are now clubbed together
+ # in a config section in the definition.
+ # The first line describes the name of the job
+ - name: shell_end
+ # Describe the type of the job
+ type: noop
+
+ # List the dependencies of the job
+ dependsOn:
+ - shell_pwd
+ - shell_echo
+
+ - name: shell_echo
+ # Describe the type of the job
+ type: command
+ config:
+ command: echo "This is an echoed text."
+
+ - name: shell_pwd
+ # Describe the type of the job
+ type: command
+ config:
+ command: pwd