diff --git a/azkaban-common/src/main/java/azkaban/project/FlowTrigger.java b/azkaban-common/src/main/java/azkaban/project/FlowTrigger.java
index 9a26c73..039dbf7 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowTrigger.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowTrigger.java
@@ -17,21 +17,25 @@
package azkaban.project;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
+import java.io.Serializable;
import java.time.Duration;
-import java.util.Collections;
+import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.commons.lang.StringUtils;
/**
* FlowTrigger is the logical representation of a trigger.
* It couldn't be changed once gets constructed.
* It will be used to create running trigger instance.
*/
-public class FlowTrigger {
+public class FlowTrigger implements Serializable {
- private final List<FlowTriggerDependency> dependencies;
+ private final Map<String, FlowTriggerDependency> dependencies;
private final CronSchedule schedule;
private final Duration maxWaitDuration;
@@ -39,15 +43,18 @@ public class FlowTrigger {
* @throws IllegalArgumentException if any of the argument is null or there is duplicate
* dependency name or duplicate dependency type and params
*/
- public FlowTrigger(final CronSchedule schedule,
- final List<FlowTriggerDependency> dependencies, final Duration maxWaitDuration) {
- Preconditions.checkArgument(schedule != null);
- Preconditions.checkArgument(dependencies != null);
- Preconditions.checkArgument(maxWaitDuration != null);
- Preconditions.checkArgument(!maxWaitDuration.isNegative());
+ public FlowTrigger(final CronSchedule schedule, final List<FlowTriggerDependency> dependencies,
+ final Duration maxWaitDuration) {
+ Preconditions.checkNotNull(schedule, "schedule cannot be null");
+ Preconditions.checkNotNull(dependencies, "dependency cannot be null");
+ Preconditions.checkNotNull(maxWaitDuration, "max wait time cannot be null");
+ Preconditions.checkArgument(maxWaitDuration.toMinutes() >= 1, "max wait time should be "
+ + "longer than 1 min");
validateDependencies(dependencies);
this.schedule = schedule;
- this.dependencies = Collections.unmodifiableList(dependencies);
+ final ImmutableMap.Builder builder = new Builder();
+ dependencies.forEach(dep -> builder.put(dep.getName(), dep));
+ this.dependencies = builder.build();
this.maxWaitDuration = maxWaitDuration;
}
@@ -63,6 +70,14 @@ public class FlowTrigger {
}
}
+ @Override
+ public String toString() {
+ return "FlowTrigger{" +
+ "schedule=" + this.schedule +
+ ", maxWaitDurationInMins=" + this.maxWaitDuration.toMinutes() +
+ "\n " + StringUtils.join(this.dependencies.values(), "\n") + '}';
+ }
+
/**
* check uniqueness of dependency type and params
*/
@@ -83,17 +98,12 @@ public class FlowTrigger {
validateDepDefinitionUniqueness(dependencies);
}
- @Override
- public String toString() {
- return "FlowTrigger{" +
- "dependencies=" + this.dependencies +
- ", schedule=" + this.schedule +
- ", maxWaitDuration=" + this.maxWaitDuration +
- '}';
+ public FlowTriggerDependency getDependencyByName(final String name) {
+ return this.dependencies.get(name);
}
- public List<FlowTriggerDependency> getDependencies() {
- return this.dependencies;
+ public Collection<FlowTriggerDependency> getDependencies() {
+ return this.dependencies.values();
}
public Duration getMaxWaitDuration() {
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowTriggerDependency.java b/azkaban-common/src/main/java/azkaban/project/FlowTriggerDependency.java
index c161cb1..f20d6c1 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowTriggerDependency.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowTriggerDependency.java
@@ -17,6 +17,7 @@
package azkaban.project;
import com.google.common.base.Preconditions;
+import java.io.Serializable;
import java.util.Collections;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
@@ -26,7 +27,7 @@ import org.apache.commons.lang.StringUtils;
* It couldn't be changed once gets constructed.
* It will be used to create running dependency instance.
*/
-public class FlowTriggerDependency {
+public class FlowTriggerDependency implements Serializable {
private final Map<String, String> props;
private final String name;
@@ -63,9 +64,9 @@ public class FlowTriggerDependency {
@Override
public String toString() {
return "FlowTriggerDependency{" +
- "props=" + this.props +
- ", name='" + this.name + '\'' +
+ "name='" + this.name + '\'' +
", type='" + this.type + '\'' +
+ ", props=" + this.props +
'}';
}
}
diff --git a/azkaban-common/src/test/java/azkaban/project/FlowTriggerTest.java b/azkaban-common/src/test/java/azkaban/project/FlowTriggerTest.java
index 7584e7e..350c779 100644
--- a/azkaban-common/src/test/java/azkaban/project/FlowTriggerTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/FlowTriggerTest.java
@@ -54,7 +54,7 @@ public class FlowTriggerTest {
final Duration invalidDuration = Duration.ofMinutes(-1);
assertThatThrownBy(() -> new FlowTrigger(validSchedule, invalidDependencyList, validDuration))
- .isInstanceOf(IllegalArgumentException.class);
+ .isInstanceOf(NullPointerException.class);
assertThatThrownBy(() -> new FlowTrigger(validSchedule, validDependencyList, invalidDuration))
.isInstanceOf(IllegalArgumentException.class);
diff --git a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
index f89b218..b999e35 100644
--- a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
@@ -22,13 +22,13 @@ import static org.assertj.core.api.Assertions.assertThat;
import azkaban.Constants;
import azkaban.test.executions.ExecutionsTestUtil;
import azkaban.utils.Props;
-import java.io.File;
-import org.apache.commons.io.FileUtils;
import com.google.common.collect.ImmutableMap;
+import java.io.File;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import org.apache.commons.io.FileUtils;
import org.junit.Test;
public class NodeBeanLoaderTest {
@@ -198,10 +198,8 @@ public class NodeBeanLoaderTest {
TRIGGER_FLOW_YML_TEST_DIR, TRIGGER_FLOW_YML_FILE));
final FlowTrigger flowTrigger = loader.toFlowTrigger(nodeBean.getTrigger());
validateFlowTrigger(flowTrigger, MAX_WAIT_MINS, CRON_EXPRESSION, 2);
- validateTriggerDependency(flowTrigger.getDependencies().get(0), TRIGGER_NAME_1, TRIGGER_TYPE,
- PARAMS_1);
- validateTriggerDependency(flowTrigger.getDependencies().get(1), TRIGGER_NAME_2, TRIGGER_TYPE,
- PARAMS_2);
+ validateTriggerDependency(flowTrigger, TRIGGER_NAME_1, TRIGGER_TYPE, PARAMS_1);
+ validateTriggerDependency(flowTrigger, TRIGGER_NAME_2, TRIGGER_TYPE, PARAMS_2);
}
@Test
@@ -211,10 +209,10 @@ public class NodeBeanLoaderTest {
TRIGGER_FLOW_YML_TEST_DIR, TRIGGER_FLOW_YML_FILE));
final AzkabanFlow flow = (AzkabanFlow) loader.toAzkabanNode(nodeBean);
validateFlowTrigger(flow.getFlowTrigger(), MAX_WAIT_MINS, CRON_EXPRESSION, 2);
- validateTriggerDependency(flow.getFlowTrigger().getDependencies().get(0), TRIGGER_NAME_1,
+ validateTriggerDependency(flow.getFlowTrigger(), TRIGGER_NAME_1,
TRIGGER_TYPE,
PARAMS_1);
- validateTriggerDependency(flow.getFlowTrigger().getDependencies().get(1), TRIGGER_NAME_2,
+ validateTriggerDependency(flow.getFlowTrigger(), TRIGGER_NAME_2,
TRIGGER_TYPE,
PARAMS_2);
}
@@ -316,9 +314,9 @@ public class NodeBeanLoaderTest {
final FlowTrigger flowTrigger = FlowLoaderUtils.getFlowTriggerFromYamlFile(
ExecutionsTestUtil.getFlowFile(TRIGGER_FLOW_YML_TEST_DIR, TRIGGER_FLOW_YML_FILE));
validateFlowTrigger(flowTrigger, MAX_WAIT_MINS, CRON_EXPRESSION, 2);
- validateTriggerDependency(flowTrigger.getDependencies().get(0), TRIGGER_NAME_1, TRIGGER_TYPE,
+ validateTriggerDependency(flowTrigger, TRIGGER_NAME_1, TRIGGER_TYPE,
PARAMS_1);
- validateTriggerDependency(flowTrigger.getDependencies().get(1), TRIGGER_NAME_2, TRIGGER_TYPE,
+ validateTriggerDependency(flowTrigger, TRIGGER_NAME_2, TRIGGER_TYPE,
PARAMS_2);
}
@@ -373,10 +371,10 @@ public class NodeBeanLoaderTest {
assertThat(flowTrigger.getDependencies().size()).isEqualTo(numDependencies);
}
- private void validateTriggerDependency(final FlowTriggerDependency flowTriggerDependency, final
+ private void validateTriggerDependency(final FlowTrigger flowTrigger, final
String name, final String type, final Map<String, String> params) {
- assertThat(flowTriggerDependency.getName()).isEqualTo(name);
- assertThat(flowTriggerDependency.getType()).isEqualTo(type);
- assertThat(flowTriggerDependency.getProps()).isEqualTo(params);
+ assertThat(flowTrigger.getDependencyByName(name)).isNotNull();
+ assertThat(flowTrigger.getDependencyByName(name).getType()).isEqualTo(type);
+ assertThat(flowTrigger.getDependencyByName(name).getProps()).isEqualTo(params);
}
}