azkaban-aplcache

new FlowTrigger and Dependency class (#1482) FlowTrigger

9/28/2017 9:36:47 PM

Details

diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 4c1172a..53e006c 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -89,7 +89,7 @@ public class Constants {
 
     // User facing web server configurations used to construct the user facing server URLs. They are useful when there is a reverse proxy between Azkaban web servers and users.
     // enduser -> myazkabanhost:443 -> proxy -> localhost:8081
-    // when this parameters set then these parameters are used to generate email links. 
+    // when this parameters set then these parameters are used to generate email links.
     // if these parameters are not set then jetty.hostname, and jetty.port(if ssl configured jetty.ssl.port) are used.
     public static final String AZKABAN_WEBSERVER_EXTERNAL_HOSTNAME = "azkaban.webserver.external_hostname";
     public static final String AZKABAN_WEBSERVER_EXTERNAL_SSL_PORT = "azkaban.webserver.external_ssl_port";
diff --git a/azkaban-common/src/main/java/azkaban/project/CronSchedule.java b/azkaban-common/src/main/java/azkaban/project/CronSchedule.java
new file mode 100644
index 0000000..9875b9a
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/project/CronSchedule.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.project;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * FlowTriggerSchedule is the logical representation of a cron-based schedule.
+ * It couldn't be changed once gets constructed.
+ * It will be used to schedule a trigger.
+ */
+public class CronSchedule {
+
+  private final String cronExpression;
+
+  /**
+   * @throws IllegalArgumentException if cronExpression is null or blank
+   */
+  public CronSchedule(final String cronExpression) {
+    Preconditions.checkArgument(StringUtils.isNotBlank(cronExpression));
+    this.cronExpression = cronExpression;
+    //todo chengren311: check cronExpression is valid: quartz has CronExpression.isValidExpression()
+  }
+
+  public String getCronExpression() {
+    return this.cronExpression;
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowTrigger.java b/azkaban-common/src/main/java/azkaban/project/FlowTrigger.java
new file mode 100644
index 0000000..98d5fb1
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/project/FlowTrigger.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.project;
+
+import com.google.common.base.Preconditions;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * FlowTrigger is the logical representation of a trigger.
+ * It couldn't be changed once gets constructed.
+ * It will be used to create running trigger instance.
+ */
+public class FlowTrigger {
+
+  private final List<FlowTriggerDependency> dependencies;
+  private final CronSchedule schedule;
+  private final Duration maxWaitDuration;
+
+  /**
+   * @throws IllegalArgumentException if any of the argument is null or there is duplicate
+   * dependency name or duplicate dependency type and params
+   */
+  public FlowTrigger(final CronSchedule schedule,
+      final List<FlowTriggerDependency> dependencies, final Duration maxWaitDuration) {
+    Preconditions.checkArgument(schedule != null);
+    Preconditions.checkArgument(dependencies != null);
+    Preconditions.checkArgument(maxWaitDuration != null);
+    Preconditions.checkArgument(!maxWaitDuration.isNegative());
+    validateDependencies(dependencies);
+    this.schedule = schedule;
+    this.dependencies = Collections.unmodifiableList(dependencies);
+    this.maxWaitDuration = maxWaitDuration;
+  }
+
+  /**
+   * check uniqueness of dependency.name
+   */
+  private void validateDepNameUniqueness(final List<FlowTriggerDependency> dependencies) {
+    final Set<String> seen = new HashSet<>();
+    for (final FlowTriggerDependency dep : dependencies) {
+      // set.add() returns false when there exists duplicate
+      Preconditions.checkArgument(seen.add(dep.getName()), String.format("duplicate dependency"
+          + ".name %s found, dependency.name should be unique", dep.getName()));
+    }
+  }
+
+  /**
+   * check uniqueness of dependency type and params
+   */
+  private void validateDepDefinitionUniqueness(final List<FlowTriggerDependency> dependencies) {
+    final Set<String> seen = new HashSet<>();
+    for (final FlowTriggerDependency dep : dependencies) {
+      final Map<String, String> props = dep.getProps();
+      // set.add() returns false when there exists duplicate
+      Preconditions.checkArgument(seen.add(dep.getType() + ":" + props.toString()), String.format
+          ("duplicate "
+              + "dependency"
+              + "config %s found, dependency config should be unique", dep.getName()));
+    }
+  }
+
+  private void validateDependencies(final List<FlowTriggerDependency> dependencies) {
+    validateDepNameUniqueness(dependencies);
+    validateDepDefinitionUniqueness(dependencies);
+  }
+
+  @Override
+  public String toString() {
+    return "FlowTrigger{" +
+        "dependencies=" + this.dependencies +
+        ", schedule=" + this.schedule +
+        ", maxWaitDuration=" + this.maxWaitDuration +
+        '}';
+  }
+
+  public List<FlowTriggerDependency> getDependencies() {
+    return this.dependencies;
+  }
+
+  public Duration getMaxWaitDuration() {
+    return this.maxWaitDuration;
+  }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowTriggerDependency.java b/azkaban-common/src/main/java/azkaban/project/FlowTriggerDependency.java
new file mode 100644
index 0000000..c161cb1
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/project/FlowTriggerDependency.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.project;
+
+import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * FlowTriggerDependency is the logic representation of a trigger dependency.
+ * It couldn't be changed once gets constructed.
+ * It will be used to create running dependency instance.
+ */
+public class FlowTriggerDependency {
+
+  private final Map<String, String> props;
+  private final String name;
+  private final String type;
+
+  /**
+   * @throws IllegalArgumentException if name or type is null or blank
+   * @throws IllegalArgumentException if depProps is null
+   */
+  public FlowTriggerDependency(final String name, final String type, final Map<String, String>
+      depProps) {
+    Preconditions.checkArgument(StringUtils.isNotBlank(name));
+    Preconditions.checkArgument(StringUtils.isNotBlank(type));
+    Preconditions.checkArgument(depProps != null);
+    this.name = name;
+    this.type = type;
+    this.props = Collections.unmodifiableMap(depProps);
+    //todo chengren311: validate per dependencyType: some dependency type might need extra special
+    //check, also check if it's a valid dependency type
+  }
+
+  public String getName() {
+    return this.name;
+  }
+
+  public String getType() {
+    return this.type;
+  }
+
+  public Map<String, String> getProps() {
+    return this.props;
+  }
+
+  @Override
+  public String toString() {
+    return "FlowTriggerDependency{" +
+        "props=" + this.props +
+        ", name='" + this.name + '\'' +
+        ", type='" + this.type + '\'' +
+        '}';
+  }
+}
diff --git a/azkaban-common/src/test/java/azkaban/project/FlowTriggerTest.java b/azkaban-common/src/test/java/azkaban/project/FlowTriggerTest.java
new file mode 100644
index 0000000..7584e7e
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/project/FlowTriggerTest.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.project;
+
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+import org.junit.Test;
+
+
+public class FlowTriggerTest {
+
+  private FlowTriggerDependency createUniqueTestDependency(final String type) {
+    final UUID uuid = UUID.randomUUID();
+    return createTestDependency(type, uuid.toString());
+  }
+
+  private FlowTriggerDependency createTestDependency(final String type, final String name) {
+    final FlowTriggerDependency dep = new FlowTriggerDependency(name, type, new HashMap<>());
+    return dep;
+  }
+
+  @Test
+  public void testScheduleArgumentValidation() {
+    assertThatThrownBy(() -> new CronSchedule(""))
+        .isInstanceOf(IllegalArgumentException.class);
+  }
+
+  @Test
+  public void testFlowTriggerArgumentValidation() {
+    final CronSchedule validSchedule = new CronSchedule("* * * * ? *");
+    final List<FlowTriggerDependency> validDependencyList = new ArrayList<>();
+    final List<FlowTriggerDependency> invalidDependencyList = null;
+    final Duration validDuration = Duration.ofMinutes(10);
+    final Duration invalidDuration = Duration.ofMinutes(-1);
+
+    assertThatThrownBy(() -> new FlowTrigger(validSchedule, invalidDependencyList, validDuration))
+        .isInstanceOf(IllegalArgumentException.class);
+
+    assertThatThrownBy(() -> new FlowTrigger(validSchedule, validDependencyList, invalidDuration))
+        .isInstanceOf(IllegalArgumentException.class);
+  }
+
+  @Test
+  public void testDuplicateDependencies() {
+    final FlowTriggerDependency dep = createUniqueTestDependency("type");
+
+    final CronSchedule schedule = new CronSchedule("* * * * ? *");
+    final List<FlowTriggerDependency> dependencyList = new ArrayList<>();
+    dependencyList.add(dep);
+    dependencyList.add(dep);
+
+    assertThatThrownBy(() -> new FlowTrigger(schedule, dependencyList, Duration.ofMinutes(10)))
+        .isInstanceOf
+            (IllegalArgumentException
+                .class)
+        .hasMessageContaining("dependency.name should be unique");
+  }
+
+  @Test
+  public void testDifferentDepNameSameDepConfig() {
+    final FlowTriggerDependency dep1 = createTestDependency("type", "dep1");
+    final FlowTriggerDependency dep2 = createTestDependency("type", "dep2");
+
+    final CronSchedule schedule = new CronSchedule("* * * * ? *");
+    final List<FlowTriggerDependency> dependencyList = new ArrayList<>();
+    dependencyList.add(dep1);
+    dependencyList.add(dep2);
+
+    assertThatThrownBy(() -> new FlowTrigger(schedule, dependencyList, Duration.ofMinutes(10)))
+        .isInstanceOf
+            (IllegalArgumentException
+                .class)
+        .hasMessageContaining("dependency config should be unique");
+  }
+}