azkaban-developers

FlowTrigger and FlowTriggerDependency update (#1609) This

1/23/2018 5:28:15 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/project/FlowTrigger.java b/azkaban-common/src/main/java/azkaban/project/FlowTrigger.java
index 9a26c73..039dbf7 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowTrigger.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowTrigger.java
@@ -17,21 +17,25 @@
 package azkaban.project;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
+import java.io.Serializable;
 import java.time.Duration;
-import java.util.Collections;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.commons.lang.StringUtils;
 
 /**
  * FlowTrigger is the logical representation of a trigger.
  * It couldn't be changed once gets constructed.
  * It will be used to create running trigger instance.
  */
-public class FlowTrigger {
+public class FlowTrigger implements Serializable {
 
-  private final List<FlowTriggerDependency> dependencies;
+  private final Map<String, FlowTriggerDependency> dependencies;
   private final CronSchedule schedule;
   private final Duration maxWaitDuration;
 
@@ -39,15 +43,18 @@ public class FlowTrigger {
    * @throws IllegalArgumentException if any of the argument is null or there is duplicate
    * dependency name or duplicate dependency type and params
    */
-  public FlowTrigger(final CronSchedule schedule,
-      final List<FlowTriggerDependency> dependencies, final Duration maxWaitDuration) {
-    Preconditions.checkArgument(schedule != null);
-    Preconditions.checkArgument(dependencies != null);
-    Preconditions.checkArgument(maxWaitDuration != null);
-    Preconditions.checkArgument(!maxWaitDuration.isNegative());
+  public FlowTrigger(final CronSchedule schedule, final List<FlowTriggerDependency> dependencies,
+      final Duration maxWaitDuration) {
+    Preconditions.checkNotNull(schedule, "schedule cannot be null");
+    Preconditions.checkNotNull(dependencies, "dependency cannot be null");
+    Preconditions.checkNotNull(maxWaitDuration, "max wait time cannot be null");
+    Preconditions.checkArgument(maxWaitDuration.toMinutes() >= 1, "max wait time should be "
+        + "longer than 1 min");
     validateDependencies(dependencies);
     this.schedule = schedule;
-    this.dependencies = Collections.unmodifiableList(dependencies);
+    final ImmutableMap.Builder builder = new Builder();
+    dependencies.forEach(dep -> builder.put(dep.getName(), dep));
+    this.dependencies = builder.build();
     this.maxWaitDuration = maxWaitDuration;
   }
 
@@ -63,6 +70,14 @@ public class FlowTrigger {
     }
   }
 
+  @Override
+  public String toString() {
+    return "FlowTrigger{" +
+        "schedule=" + this.schedule +
+        ", maxWaitDurationInMins=" + this.maxWaitDuration.toMinutes() +
+        "\n " + StringUtils.join(this.dependencies.values(), "\n") + '}';
+  }
+
   /**
    * check uniqueness of dependency type and params
    */
@@ -83,17 +98,12 @@ public class FlowTrigger {
     validateDepDefinitionUniqueness(dependencies);
   }
 
-  @Override
-  public String toString() {
-    return "FlowTrigger{" +
-        "dependencies=" + this.dependencies +
-        ", schedule=" + this.schedule +
-        ", maxWaitDuration=" + this.maxWaitDuration +
-        '}';
+  public FlowTriggerDependency getDependencyByName(final String name) {
+    return this.dependencies.get(name);
   }
 
-  public List<FlowTriggerDependency> getDependencies() {
-    return this.dependencies;
+  public Collection<FlowTriggerDependency> getDependencies() {
+    return this.dependencies.values();
   }
 
   public Duration getMaxWaitDuration() {
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowTriggerDependency.java b/azkaban-common/src/main/java/azkaban/project/FlowTriggerDependency.java
index c161cb1..f20d6c1 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowTriggerDependency.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowTriggerDependency.java
@@ -17,6 +17,7 @@
 package azkaban.project;
 
 import com.google.common.base.Preconditions;
+import java.io.Serializable;
 import java.util.Collections;
 import java.util.Map;
 import org.apache.commons.lang.StringUtils;
@@ -26,7 +27,7 @@ import org.apache.commons.lang.StringUtils;
  * It couldn't be changed once gets constructed.
  * It will be used to create running dependency instance.
  */
-public class FlowTriggerDependency {
+public class FlowTriggerDependency implements Serializable {
 
   private final Map<String, String> props;
   private final String name;
@@ -63,9 +64,9 @@ public class FlowTriggerDependency {
   @Override
   public String toString() {
     return "FlowTriggerDependency{" +
-        "props=" + this.props +
-        ", name='" + this.name + '\'' +
+        "name='" + this.name + '\'' +
         ", type='" + this.type + '\'' +
+        ", props=" + this.props +
         '}';
   }
 }
diff --git a/azkaban-common/src/test/java/azkaban/project/FlowTriggerTest.java b/azkaban-common/src/test/java/azkaban/project/FlowTriggerTest.java
index 7584e7e..350c779 100644
--- a/azkaban-common/src/test/java/azkaban/project/FlowTriggerTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/FlowTriggerTest.java
@@ -54,7 +54,7 @@ public class FlowTriggerTest {
     final Duration invalidDuration = Duration.ofMinutes(-1);
 
     assertThatThrownBy(() -> new FlowTrigger(validSchedule, invalidDependencyList, validDuration))
-        .isInstanceOf(IllegalArgumentException.class);
+        .isInstanceOf(NullPointerException.class);
 
     assertThatThrownBy(() -> new FlowTrigger(validSchedule, validDependencyList, invalidDuration))
         .isInstanceOf(IllegalArgumentException.class);
diff --git a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
index f89b218..b999e35 100644
--- a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
@@ -22,13 +22,13 @@ import static org.assertj.core.api.Assertions.assertThat;
 import azkaban.Constants;
 import azkaban.test.executions.ExecutionsTestUtil;
 import azkaban.utils.Props;
-import java.io.File;
-import org.apache.commons.io.FileUtils;
 import com.google.common.collect.ImmutableMap;
+import java.io.File;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import org.apache.commons.io.FileUtils;
 import org.junit.Test;
 
 public class NodeBeanLoaderTest {
@@ -198,10 +198,8 @@ public class NodeBeanLoaderTest {
         TRIGGER_FLOW_YML_TEST_DIR, TRIGGER_FLOW_YML_FILE));
     final FlowTrigger flowTrigger = loader.toFlowTrigger(nodeBean.getTrigger());
     validateFlowTrigger(flowTrigger, MAX_WAIT_MINS, CRON_EXPRESSION, 2);
-    validateTriggerDependency(flowTrigger.getDependencies().get(0), TRIGGER_NAME_1, TRIGGER_TYPE,
-        PARAMS_1);
-    validateTriggerDependency(flowTrigger.getDependencies().get(1), TRIGGER_NAME_2, TRIGGER_TYPE,
-        PARAMS_2);
+    validateTriggerDependency(flowTrigger, TRIGGER_NAME_1, TRIGGER_TYPE, PARAMS_1);
+    validateTriggerDependency(flowTrigger, TRIGGER_NAME_2, TRIGGER_TYPE, PARAMS_2);
   }
 
   @Test
@@ -211,10 +209,10 @@ public class NodeBeanLoaderTest {
         TRIGGER_FLOW_YML_TEST_DIR, TRIGGER_FLOW_YML_FILE));
     final AzkabanFlow flow = (AzkabanFlow) loader.toAzkabanNode(nodeBean);
     validateFlowTrigger(flow.getFlowTrigger(), MAX_WAIT_MINS, CRON_EXPRESSION, 2);
-    validateTriggerDependency(flow.getFlowTrigger().getDependencies().get(0), TRIGGER_NAME_1,
+    validateTriggerDependency(flow.getFlowTrigger(), TRIGGER_NAME_1,
         TRIGGER_TYPE,
         PARAMS_1);
-    validateTriggerDependency(flow.getFlowTrigger().getDependencies().get(1), TRIGGER_NAME_2,
+    validateTriggerDependency(flow.getFlowTrigger(), TRIGGER_NAME_2,
         TRIGGER_TYPE,
         PARAMS_2);
   }
@@ -316,9 +314,9 @@ public class NodeBeanLoaderTest {
     final FlowTrigger flowTrigger = FlowLoaderUtils.getFlowTriggerFromYamlFile(
         ExecutionsTestUtil.getFlowFile(TRIGGER_FLOW_YML_TEST_DIR, TRIGGER_FLOW_YML_FILE));
     validateFlowTrigger(flowTrigger, MAX_WAIT_MINS, CRON_EXPRESSION, 2);
-    validateTriggerDependency(flowTrigger.getDependencies().get(0), TRIGGER_NAME_1, TRIGGER_TYPE,
+    validateTriggerDependency(flowTrigger, TRIGGER_NAME_1, TRIGGER_TYPE,
         PARAMS_1);
-    validateTriggerDependency(flowTrigger.getDependencies().get(1), TRIGGER_NAME_2, TRIGGER_TYPE,
+    validateTriggerDependency(flowTrigger, TRIGGER_NAME_2, TRIGGER_TYPE,
         PARAMS_2);
   }
 
@@ -373,10 +371,10 @@ public class NodeBeanLoaderTest {
     assertThat(flowTrigger.getDependencies().size()).isEqualTo(numDependencies);
   }
 
-  private void validateTriggerDependency(final FlowTriggerDependency flowTriggerDependency, final
+  private void validateTriggerDependency(final FlowTrigger flowTrigger, final
   String name, final String type, final Map<String, String> params) {
-    assertThat(flowTriggerDependency.getName()).isEqualTo(name);
-    assertThat(flowTriggerDependency.getType()).isEqualTo(type);
-    assertThat(flowTriggerDependency.getProps()).isEqualTo(params);
+    assertThat(flowTrigger.getDependencyByName(name)).isNotNull();
+    assertThat(flowTrigger.getDependencyByName(name).getType()).isEqualTo(type);
+    assertThat(flowTrigger.getDependencyByName(name).getProps()).isEqualTo(params);
   }
 }