azkaban-aplcache
Changes
az-flow-trigger-dependency-type/build.gradle 17(+17 -0)
az-flow-trigger-dependency-type/kafka-event-trigger/src/main/java/trigger/kafka/Constants.java 34(+34 -0)
az-flow-trigger-dependency-type/kafka-event-trigger/src/main/java/trigger/kafka/KafkaDependencyCheck.java 103(+103 -0)
az-flow-trigger-dependency-type/kafka-event-trigger/src/main/java/trigger/kafka/KafkaDependencyInstanceContext.java 67(+67 -0)
az-flow-trigger-dependency-type/kafka-event-trigger/src/main/java/trigger/kafka/KafkaDepInstanceCollection.java 171(+171 -0)
az-flow-trigger-dependency-type/kafka-event-trigger/src/main/java/trigger/kafka/KafkaEventMonitor.java 164(+164 -0)
az-flow-trigger-dependency-type/kafka-event-trigger/src/main/java/trigger/kafka/matcher/DependencyMatcher.java 28(+28 -0)
az-flow-trigger-dependency-type/kafka-event-trigger/src/main/java/trigger/kafka/RegexKafkaDependencyMatcher.java 36(+36 -0)
az-flow-trigger-dependency-type/kafka-event-trigger/src/test/trigger/kafka/KafkaDepInstanceCollectionTest.java 131(+131 -0)
az-flow-trigger-dependency-type/kafka-event-trigger/src/test/trigger/kafka/KafkaEventMonitorTest.java 59(+59 -0)
settings.gradle 4(+3 -1)
Details
az-flow-trigger-dependency-type/build.gradle 17(+17 -0)
diff --git a/az-flow-trigger-dependency-type/build.gradle b/az-flow-trigger-dependency-type/build.gradle
new file mode 100644
index 0000000..29dae93
--- /dev/null
+++ b/az-flow-trigger-dependency-type/build.gradle
@@ -0,0 +1,17 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+// Intentionally left blank; dependencies can be added here when future needs arise.
diff --git a/az-flow-trigger-dependency-type/kafka-event-trigger/build.gradle b/az-flow-trigger-dependency-type/kafka-event-trigger/build.gradle
new file mode 100644
index 0000000..8e60176
--- /dev/null
+++ b/az-flow-trigger-dependency-type/kafka-event-trigger/build.gradle
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+apply plugin: 'java'
+
+ext.deps = [
+    // External dependencies
+    assertj : 'org.assertj:assertj-core:3.8.0',
+]
+
+dependencies {
+    // compileOnly keeps this project out of the 'compile' configuration that fatJar bundles.
+    compileOnly project(":az-flow-trigger-dependency-plugin")
+
+    // Test-only libraries stay out of 'compile' so the fat jar does not ship them.
+    testCompile 'junit:junit:4.12'
+    testCompile 'org.assertj:assertj-core:3.8.0'
+    testCompile 'org.mockito:mockito-core:2.10.0'
+    testCompile 'org.apache.curator:curator-test:2.8.0'
+
+    compile 'org.apache.kafka:kafka_2.11:0.10.2.0'
+    compile 'org.apache.kafka:kafka-clients:1.0.0'
+    compile 'org.apache.kafka:connect-json:1.0.0'
+    compile 'org.apache.curator:curator-recipes:2.8.0'
+    compile 'org.apache.avro:avro:1.8.1'
+    compile 'org.apache.avro:avro-tools:1.8.1'
+    compile 'com.squareup.okhttp3:okhttp:3.7.0'
+    compile 'com.fasterxml.jackson.core:jackson-core:2.9.2'
+    compile 'com.fasterxml.jackson.core:jackson-annotations:2.9.2'
+    compile 'com.fasterxml.jackson.core:jackson-databind:2.9.2'
+    compile 'com.google.guava:guava:25.1-jre'
+}
+
+// Bundles the compiled classes together with every 'compile' dependency into one jar.
+task fatJar(type: Jar) {
+    classifier = "fat" //so that 'fat' is included in jar name
+    from sourceSets.main.output // include our compiled source code in the JAR
+    // Unpack each dependency jar into the archive, leaving out its log4j.properties.
+    from {
+        configurations.compile.collect {
+            it.name.endsWith('.jar') ? zipTree(it).matching {
+                exclude("log4j.properties")
+            } : it
+        }
+    }
+    fileMode 0755 // sometimes jars get unpacked with read-only permissions, this prevents errors when re-running
+}
+//If you want to publish the fatJar, include this
+artifacts {
+    archives fatJar
+}
diff --git a/az-flow-trigger-dependency-type/kafka-event-trigger/src/main/java/trigger/kafka/Constants.java b/az-flow-trigger-dependency-type/kafka-event-trigger/src/main/java/trigger/kafka/Constants.java
new file mode 100644
index 0000000..2c316cb
--- /dev/null
+++ b/az-flow-trigger-dependency-type/kafka-event-trigger/src/main/java/trigger/kafka/Constants.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package trigger.kafka;
+
+/** Configuration keys read by the Kafka event trigger. */
+public class Constants {
+  public static class DependencyPluginConfigKey {
+    /** Defines where the Kafka broker is located. */
+    public static final String KAFKA_BROKER_URL = "kafka.broker.url";
+    /** Misspelled alias of KAFKA_BROKER_URL, kept so that existing callers keep compiling. */
+    public static final String KAKFA_BROKER_URL = KAFKA_BROKER_URL;
+  }
+
+  /** Required properties for dependencies. */
+  public static class DependencyInstanceConfigKey {
+    public static final String NAME = "name";
+    public static final String TOPIC = "topic";
+    public static final String MATCH = "match";
+  }
+}
diff --git a/az-flow-trigger-dependency-type/kafka-event-trigger/src/main/java/trigger/kafka/KafkaDependencyCheck.java b/az-flow-trigger-dependency-type/kafka-event-trigger/src/main/java/trigger/kafka/KafkaDependencyCheck.java
new file mode 100644
index 0000000..dd03136
--- /dev/null
+++ b/az-flow-trigger-dependency-type/kafka-event-trigger/src/main/java/trigger/kafka/KafkaDependencyCheck.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package trigger.kafka;
+
+import azkaban.flowtrigger.DependencyCheck;
+import azkaban.flowtrigger.DependencyInstanceCallback;
+import azkaban.flowtrigger.DependencyInstanceConfig;
+import azkaban.flowtrigger.DependencyInstanceContext;
+import azkaban.flowtrigger.DependencyInstanceRuntimeProps;
+import azkaban.flowtrigger.DependencyPluginConfig;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import trigger.kafka.Constants.DependencyInstanceConfigKey;
+import trigger.kafka.Constants.DependencyPluginConfigKey;
+
+
+/**
+ * A factory class which maintains all KafkaDependencyInstanceContext and creates new
+ * KafkaDependencyInstanceContext based on the configuration file.
+ */
+@SuppressWarnings("FutureReturnValueIgnored")
+public class KafkaDependencyCheck implements DependencyCheck {
+  private final static Logger log = LoggerFactory.getLogger(KafkaDependencyCheck.class);
+  private final ExecutorService executorService;
+  private KafkaEventMonitor dependencyMonitor;
+
+  public KafkaDependencyCheck() {
+    this.executorService = Executors.newSingleThreadExecutor();
+  }
+
+  /** Hands the given context to the monitor for removal. */
+  public void remove(final DependencyInstanceContext depContext) {
+    this.dependencyMonitor.remove((KafkaDependencyInstanceContext) depContext);
+  }
+
+  /** Rejects a dependency config that lacks the topic or the match property. */
+  private void validate(final DependencyInstanceConfig config, final DependencyInstanceRuntimeProps runtimeProps) {
+    final String suffix = "for dependency name: " + config.get(DependencyInstanceConfigKey.NAME);
+    final String topicValue = config.get(DependencyInstanceConfigKey.TOPIC);
+    final String matchValue = config.get(DependencyInstanceConfigKey.MATCH);
+    Preconditions.checkNotNull(topicValue, DependencyInstanceConfigKey.TOPIC + " cannot be null " + suffix);
+    Preconditions.checkNotNull(matchValue, DependencyInstanceConfigKey.MATCH + " cannot be null " + suffix);
+  }
+
+  /** Validates the config, then registers a new context with the monitor and returns it. */
+  @Override
+  public DependencyInstanceContext run(final DependencyInstanceConfig config,
+      final DependencyInstanceRuntimeProps runtimeProps, final DependencyInstanceCallback callback) {
+    validate(config, runtimeProps);
+    final KafkaDependencyInstanceContext created = new KafkaDependencyInstanceContext(config, this, callback);
+    this.dependencyMonitor.add(created);
+    return created;
+  }
+
+  /** Shuts the executor down and waits up to 60 seconds for its task to finish. */
+  @Override
+  public void shutdown() {
+    log.info("Shutting down KafkaDependencyCheck");
+    // disallow new tasks, then interrupt the running one and give it a while to react
+    this.executorService.shutdown();
+    try {
+      this.executorService.shutdownNow();
+      final boolean terminated = this.executorService.awaitTermination(60, TimeUnit.SECONDS);
+      if (!terminated) {
+        log.error("KafkaDependencyCheck does not terminate.");
+      }
+    } catch (final InterruptedException interrupted) {
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /** Checks the required plugin properties, then starts the Kafka monitor on the executor. */
+  @Override
+  public void init(final DependencyPluginConfig config) {
+    final Set<String> mandatoryKeys = Sets.newHashSet(DependencyPluginConfigKey.KAKFA_BROKER_URL);
+    for (final String key : mandatoryKeys) {
+      Preconditions.checkNotNull(config.get(key), key + " is required");
+    }
+    this.dependencyMonitor = new KafkaEventMonitor(config);
+    this.executorService.submit(this.dependencyMonitor);
+  }
+}
diff --git a/az-flow-trigger-dependency-type/kafka-event-trigger/src/main/java/trigger/kafka/KafkaDependencyInstanceContext.java b/az-flow-trigger-dependency-type/kafka-event-trigger/src/main/java/trigger/kafka/KafkaDependencyInstanceContext.java
new file mode 100644
index 0000000..db57009
--- /dev/null
+++ b/az-flow-trigger-dependency-type/kafka-event-trigger/src/main/java/trigger/kafka/KafkaDependencyInstanceContext.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package trigger.kafka;
+
+import azkaban.flowtrigger.DependencyInstanceCallback;
+import azkaban.flowtrigger.DependencyInstanceConfig;
+import azkaban.flowtrigger.DependencyInstanceContext;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+import trigger.kafka.Constants.DependencyInstanceConfigKey;
+
+/** Holds the attributes of one running instance of a kafka dependency. */
+public class KafkaDependencyInstanceContext implements DependencyInstanceContext {
+  private final static Logger log = LoggerFactory.getLogger(KafkaDependencyInstanceContext.class);
+  private final KafkaDependencyCheck depCheck;
+  private final DependencyInstanceCallback callback;
+  private final String topicName;
+  private final String regexMatch;
+  private final String depName;
+
+  /** Captures the topic, match and name properties of {@code config} plus the collaborators. */
+  public KafkaDependencyInstanceContext(final DependencyInstanceConfig config,
+      final KafkaDependencyCheck dependencyCheck, final DependencyInstanceCallback callback) {
+    this.depCheck = dependencyCheck;
+    this.callback = callback;
+    this.topicName = config.get(DependencyInstanceConfigKey.TOPIC);
+    this.regexMatch = config.get(DependencyInstanceConfigKey.MATCH);
+    this.depName = config.get(DependencyInstanceConfigKey.NAME);
+  }
+
+  /** Removes this instance from its dependency check, then reports the cancellation. */
+  @Override
+  public void cancel() {
+    log.info("Canceling dependency " + this);
+    this.depCheck.remove(this);
+    this.callback.onCancel(this);
+  }
+
+  /** @return the value of the "match" property */
+  public String getRegexMatch() {
+    return this.regexMatch;
+  }
+
+  /** @return the value of the "topic" property */
+  public String getTopicName() {
+    return this.topicName;
+  }
+
+  /** @return the callback passed to the constructor */
+  public DependencyInstanceCallback getCallback() {
+    return this.callback;
+  }
+}
diff --git a/az-flow-trigger-dependency-type/kafka-event-trigger/src/main/java/trigger/kafka/KafkaDepInstanceCollection.java b/az-flow-trigger-dependency-type/kafka-event-trigger/src/main/java/trigger/kafka/KafkaDepInstanceCollection.java
new file mode 100644
index 0000000..de5714d
--- /dev/null
+++ b/az-flow-trigger-dependency-type/kafka-event-trigger/src/main/java/trigger/kafka/KafkaDepInstanceCollection.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package trigger.kafka;
+
+import com.google.common.base.Joiner;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+
+/**
+ * A map data structure that enables efficient lookup by topic and adding/removing topic event pairs.
+ * Structure looks like:
+ * {
+ * -Topic1:{
+ * ----Rule1
+ * ---------[List of dependencies]
+ * ----Rule2
+ * ---------[List of dependencies]
+ * }
+ * -Topic2:{
+ * ----Rule1
+ * ---------[List of dependencies]
+ * ----Rule2
+ * ---------[List of dependencies]
+ * }
+ * }
+ * Every public method synchronizes on the collection instance.
+ */
+public class KafkaDepInstanceCollection {
+  // topic -> (match regex -> dependency instances registered under that regex)
+  private final Map<String, Map<String, List<KafkaDependencyInstanceContext>>> topicEventMap;
+
+  public KafkaDepInstanceCollection() {
+    this.topicEventMap = new HashMap<>();
+  }
+
+  /** Registers {@code dep} under its topic and match regex, creating the nested entries on demand. */
+  public synchronized void add(final KafkaDependencyInstanceContext dep) {
+    Map<String, List<KafkaDependencyInstanceContext>> eventMap = this.topicEventMap.get(dep.getTopicName());
+    if (eventMap == null) {
+      eventMap = new HashMap<>();
+      this.topicEventMap.put(dep.getTopicName(), eventMap);
+    }
+    List<KafkaDependencyInstanceContext> depList = eventMap.get(dep.getRegexMatch());
+    if (depList == null) {
+      depList = new LinkedList<>();
+      eventMap.put(dep.getRegexMatch(), depList);
+    }
+    depList.add(dep);
+  }
+
+  /** @return true when at least one dependency is registered for {@code topic} */
+  public synchronized boolean hasTopic(final String topic) {
+    return this.topicEventMap.containsKey(topic);
+  }
+
+  /** @return a new list holding every topic that currently has dependencies */
+  public synchronized List<String> getTopicList() {
+    return new ArrayList<>(this.topicEventMap.keySet());
+  }
+
+  /**
+   * Return the set of patterns registered for a topic that match the payload.
+   * @param topic topic the payload arrived on
+   * @param payload event payload; a null payload matches nothing instead of throwing
+   * @return the regexes found in the payload; empty when the topic is unknown
+   */
+  public synchronized Set<String> regexInTopic(final String topic, final String payload) {
+    final Map<String, List<KafkaDependencyInstanceContext>> eventMap = this.topicEventMap.get(topic);
+    if (eventMap == null || payload == null) {
+      return Collections.emptySet();
+    }
+    final Set<String> res = new HashSet<>();
+    for (final String regex : eventMap.keySet()) {
+      if (new RegexKafkaDependencyMatcher(Pattern.compile(regex)).isMatch(payload)) {
+        res.add(regex);
+      }
+    }
+    return res;
+  }
+
+  /**
+   * Returns dependencies with topic and dependency's event regular expression match.
+   * @return a snapshot copy that is safe to iterate while the collection changes; never null
+   */
+  public synchronized List<KafkaDependencyInstanceContext> getDepsByTopicAndEvent(final String topic,
+      final String regex) {
+    final Map<String, List<KafkaDependencyInstanceContext>> regexMap = this.topicEventMap.get(topic);
+    final List<KafkaDependencyInstanceContext> deps = regexMap == null ? null : regexMap.get(regex);
+    if (deps == null) {
+      return Collections.emptyList();
+    }
+    return new ArrayList<>(deps);
+  }
+
+  /** Removes exactly this instance (identity comparison) and prunes entries left empty. */
+  public synchronized void remove(final KafkaDependencyInstanceContext dep) {
+    final Map<String, List<KafkaDependencyInstanceContext>> regexMap = this.topicEventMap.get(dep.getTopicName());
+    if (regexMap == null) {
+      return;
+    }
+    final List<KafkaDependencyInstanceContext> deps = regexMap.get(dep.getRegexMatch());
+    if (deps == null) {
+      return;
+    }
+    final Iterator<KafkaDependencyInstanceContext> it = deps.iterator();
+    while (it.hasNext()) {
+      if (it.next() == dep) {
+        it.remove();
+        break;
+      }
+    }
+    if (deps.isEmpty()) {
+      regexMap.remove(dep.getRegexMatch());
+    }
+    if (regexMap.isEmpty()) {
+      this.topicEventMap.remove(dep.getTopicName());
+    }
+  }
+
+  /**
+   * Removes the given dependencies from the list registered under topic and event, if any.
+   * @return true when the set of topics is unchanged, false when the topic itself was dropped
+   */
+  public synchronized boolean removeList(final String topic, final String event,
+      final List<KafkaDependencyInstanceContext> list) {
+    final Map<String, List<KafkaDependencyInstanceContext>> eventMap = this.topicEventMap.get(topic);
+    if (eventMap == null) {
+      return true;
+    }
+    final List<KafkaDependencyInstanceContext> deps = eventMap.get(event);
+    if (deps != null) {
+      deps.removeAll(list);
+      if (deps.isEmpty()) {
+        eventMap.remove(event);
+      }
+    }
+    if (!eventMap.isEmpty()) {
+      return true;
+    }
+    this.topicEventMap.remove(topic);
+    return false;
+  }
+
+  @Override
+  public synchronized String toString() {
+    return Joiner.on("\n").withKeyValueSeparator("=").join(this.topicEventMap);
+  }
+}
diff --git a/az-flow-trigger-dependency-type/kafka-event-trigger/src/main/java/trigger/kafka/KafkaEventMonitor.java b/az-flow-trigger-dependency-type/kafka-event-trigger/src/main/java/trigger/kafka/KafkaEventMonitor.java
new file mode 100644
index 0000000..8e62e78
--- /dev/null
+++ b/az-flow-trigger-dependency-type/kafka-event-trigger/src/main/java/trigger/kafka/KafkaEventMonitor.java
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package trigger.kafka;
+
+import azkaban.flowtrigger.DependencyPluginConfig;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import trigger.kafka.Constants.DependencyPluginConfigKey;
+
+
+/**
+ * KafkaEventMonitor implements logic for kafka consumer and maintains the KafkaDepInstanceCollection for dependencies.
+ */
+@SuppressWarnings("FutureReturnValueIgnored")
+public class KafkaEventMonitor implements Runnable {
+  private final static Logger log = LoggerFactory.getLogger(KafkaEventMonitor.class);
+  private static final String GROUP_ID =
+      "group_" + KafkaEventMonitor.class.getSimpleName() + System.currentTimeMillis();
+  private final KafkaDepInstanceCollection depInstances;
+  // Topics waiting to be applied to the consumer; filled by add/remove, drained by the rebalance.
+  private final ConcurrentLinkedQueue<String> subscribedTopics = new ConcurrentLinkedQueue<>();
+  private Consumer<String, String> consumer;
+
+  /** Creates the consumer and subscribes it to the initial topic AzEvent_Init_Topic. */
+  public KafkaEventMonitor(final DependencyPluginConfig pluginConfig) {
+    this.depInstances = new KafkaDepInstanceCollection();
+    this.initKafkaClient(pluginConfig);
+    this.consumer.subscribe(Arrays.asList("AzEvent_Init_Topic"));
+  }
+
+  /** Builds a consumer with String keys and values for the configured broker. */
+  private void initKafkaClient(final DependencyPluginConfig pluginConfig) {
+    final Properties props = new Properties();
+    props.put("bootstrap.servers", pluginConfig.get(DependencyPluginConfigKey.KAKFA_BROKER_URL));
+    props.put("auto.commit.interval.ms", "1000");
+    props.put("auto.offset.reset", "latest");
+    props.put("enable.auto.commit", "true");
+    props.put("group.id", GROUP_ID);
+    props.put("key.deserializer", StringDeserializer.class.getName());
+    props.put("value.deserializer", StringDeserializer.class.getName());
+    this.consumer = new KafkaConsumer<>(props);
+  }
+
+  /** Registers the dependency; a previously unseen topic schedules a re-subscription. */
+  public void add(final KafkaDependencyInstanceContext context) {
+    final boolean newTopic = !this.depInstances.hasTopic(context.getTopicName());
+    this.depInstances.add(context);
+    if (newTopic) {
+      this.subscribedTopics.addAll(this.depInstances.getTopicList());
+    }
+  }
+
+  /** Unregisters the dependency; losing its topic schedules a re-subscription to the rest. */
+  public void remove(final KafkaDependencyInstanceContext context) {
+    this.depInstances.remove(context);
+    if (!this.depInstances.hasTopic(context.getTopicName())) {
+      this.subscribedTopics.addAll(this.depInstances.getTopicList());
+    }
+  }
+
+  /** @return the topics the consumer is currently subscribed to */
+  public Set<String> getMonitorSubscription() {
+    return this.consumer.subscription();
+  }
+
+  /** Polls Kafka until the thread is interrupted or polling fails, then closes the consumer. */
+  @Override
+  public void run() {
+    try {
+      while (!Thread.interrupted()) {
+        this.consumerSubscriptionRebalance();
+        final ConsumerRecords<String, String> records = this.consumer.poll(10000);
+        for (final ConsumerRecord<String, String> record : records) {
+          this.processRecord(record);
+        }
+      }
+    } catch (final Exception ex) {
+      log.error("failure when consuming kafka events", ex);
+    } finally {
+      // Always release the consumer, whether the loop ended normally or with a failure.
+      this.consumer.close();
+      log.info("kafka consumer closed...");
+    }
+  }
+
+  /** Fires the dependencies whose regex matches one record; a failure is logged, not rethrown. */
+  private void processRecord(final ConsumerRecord<String, String> record) {
+    try {
+      final Set<String> matchedList = this.depInstances.regexInTopic(record.topic(), record.value());
+      if (!matchedList.isEmpty()) {
+        this.triggerDependencies(matchedList, record);
+      }
+    } catch (final Exception ex) {
+      log.error("failure when parsing record from topic " + record.topic() + " at offset " + record.offset(), ex);
+    }
+  }
+
+  /**
+   * Dynamically tune subscription only for the topic that dependencies need.
+   */
+  @VisibleForTesting
+  synchronized void consumerSubscriptionRebalance() {
+    final List<String> topics = new ArrayList<>();
+    // Drain with poll() so a topic queued concurrently is either taken now or kept for next time.
+    for (String topic = this.subscribedTopics.poll(); topic != null; topic = this.subscribedTopics.poll()) {
+      topics.add(topic);
+    }
+    if (!topics.isEmpty()) {
+      log.debug("Subscribed Topics " + this.consumer.subscription());
+      //re-subscribe topics that are needed
+      this.consumer.subscribe(topics);
+    }
+  }
+
+  /** Notifies every dependency registered under a matched regex, then removes it from the collection. */
+  private void triggerDependencies(final Set<String> matchedList, final ConsumerRecord<String, String> record) {
+    for (final String regex : matchedList) {
+      final List<KafkaDependencyInstanceContext> registered =
+          this.depInstances.getDepsByTopicAndEvent(record.topic(), regex);
+      if (registered == null) {
+        // nothing is registered under this regex any more
+        continue;
+      }
+      // Iterate over a copy so that removals made while the callbacks run cannot disturb the loop.
+      final List<KafkaDependencyInstanceContext> triggered = new ArrayList<>(registered);
+      for (final KafkaDependencyInstanceContext dep : triggered) {
+        dep.getCallback().onSuccess(dep);
+      }
+      //If dependencies that need to be removed could lead to unsubscribing topics, do the topics rebalance
+      if (!this.depInstances.removeList(record.topic(), regex, triggered)) {
+        this.subscribedTopics.addAll(this.depInstances.getTopicList());
+      }
+    }
+  }
+}
diff --git a/az-flow-trigger-dependency-type/kafka-event-trigger/src/main/java/trigger/kafka/matcher/DependencyMatcher.java b/az-flow-trigger-dependency-type/kafka-event-trigger/src/main/java/trigger/kafka/matcher/DependencyMatcher.java
new file mode 100644
index 0000000..2d1a230
--- /dev/null
+++ b/az-flow-trigger-dependency-type/kafka-event-trigger/src/main/java/trigger/kafka/matcher/DependencyMatcher.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package trigger.kafka.matcher;
+
+/**
+ * A generic interface that allows users to define their own matching method.
+ * @param <T> type of the payload handed to {@link #isMatch}
+ */
+public interface DependencyMatcher<T> {
+  /**
+   * Determines whether the dependency condition matches the Kafka event.
+   */
+  boolean isMatch(T payload);
+}
diff --git a/az-flow-trigger-dependency-type/kafka-event-trigger/src/main/java/trigger/kafka/RegexKafkaDependencyMatcher.java b/az-flow-trigger-dependency-type/kafka-event-trigger/src/main/java/trigger/kafka/RegexKafkaDependencyMatcher.java
new file mode 100644
index 0000000..5b0f025
--- /dev/null
+++ b/az-flow-trigger-dependency-type/kafka-event-trigger/src/main/java/trigger/kafka/RegexKafkaDependencyMatcher.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package trigger.kafka;
+
+import java.util.regex.Pattern;
+import trigger.kafka.matcher.DependencyMatcher;
+
+
+/** Matches when its regular expression is found anywhere in the kafka payload. */
+public class RegexKafkaDependencyMatcher implements DependencyMatcher<String> {
+  private final Pattern pattern;
+
+  RegexKafkaDependencyMatcher(final Pattern pattern) {
+    this.pattern = pattern;
+  }
+
+  /** @return true when the pattern occurs in {@code payload}; a null payload never matches */
+  @Override
+  public boolean isMatch(final String payload) {
+    return payload != null && this.pattern.matcher(payload).find();
+  }
+}
diff --git a/az-flow-trigger-dependency-type/kafka-event-trigger/src/test/trigger/kafka/KafkaDepInstanceCollectionTest.java b/az-flow-trigger-dependency-type/kafka-event-trigger/src/test/trigger/kafka/KafkaDepInstanceCollectionTest.java
new file mode 100644
index 0000000..3eb0cb1
--- /dev/null
+++ b/az-flow-trigger-dependency-type/kafka-event-trigger/src/test/trigger/kafka/KafkaDepInstanceCollectionTest.java
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package trigger.kafka;
+
+import avro.shaded.com.google.common.collect.ImmutableMap;
+import azkaban.flowtrigger.DependencyInstanceConfig;
+import azkaban.flowtrigger.DependencyInstanceConfigImpl;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.*;
+import static trigger.kafka.Constants.*;
+
+
+public class KafkaDepInstanceCollectionTest {
+
+ static KafkaDependencyInstanceContext createContext(final String topic, final String match, final long startTime,
+ final String depName) {
+
+ final Map<String, String> props =
+ ImmutableMap.of(DependencyInstanceConfigKey.TOPIC, topic, DependencyInstanceConfigKey.MATCH, match,
+ DependencyInstanceConfigKey.NAME, depName);
+
+ final DependencyInstanceConfig config = new DependencyInstanceConfigImpl(props);
+
+ final KafkaDependencyCheck depCheck = new KafkaDependencyCheck();
+ final KafkaDependencyInstanceContext res =
+ new KafkaDependencyInstanceContext(config, depCheck, null);
+ return res;
+ }
+
+ private List<KafkaDependencyInstanceContext> createContextList(final String dateString) throws ParseException {
+ final List<KafkaDependencyInstanceContext> contexts = new ArrayList<>();
+
+ final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
+ sdf.setTimeZone(TimeZone.getTimeZone("PST"));
+ final Date date = sdf.parse(dateString);
+
+ KafkaDependencyInstanceContext context =
+ this.createContext("AzTest_Topic1", "^(\\\\d{3}-?\\\\d{2}-?\\\\d{4})$", date.getTime(), "dep1");
+ contexts.add(context);
+ context = this.createContext("AzTest_Topic1", "hadoop.*", date.getTime(), "dep2");
+ contexts.add(context);
+ context = this.createContext("AzTest_Topic2", "^\\w*", date.getTime(), "dep3");
+ contexts.add(context);
+ context = this.createContext("AzTest_Topic3", ".*", date.getTime(), "dep4");
+ contexts.add(context);
+
+ return contexts;
+ }
+
+ private void createContextListAndAddToCollection(final String dateString, final KafkaDepInstanceCollection collection)
+ throws ParseException {
+ final List<KafkaDependencyInstanceContext> contextList = this.createContextList(dateString);
+ for (final KafkaDependencyInstanceContext context : contextList) {
+ collection.add(context);
+ }
+ }
+
+ /**
+  * Adds six batches of fixture contexts and verifies that they can be looked up by topic and
+  * match pattern.
+  *
+  * <p>The assertions check structure (topic presence and number of contexts per topic/pattern
+  * pair) rather than comparing {@code toString()} output. The earlier expectation embedded
+  * {@code ClassName@hex} suffixes, which look like default {@code Object#toString} output; those
+  * hash values are not guaranteed to be stable across JVMs or runs.
+  */
+ @Test
+ public void testAddAndGet() throws ParseException {
+   final KafkaDepInstanceCollection testMap = new KafkaDepInstanceCollection();
+   // Each batch contributes exactly one context per (topic, pattern) pair.
+   final String[] startTimes = {
+       "2018-06-01 01:00:00",
+       "2018-06-01 01:20:00",
+       "2018-06-01 01:30:00",
+       "2018-06-01 01:40:00",
+       "2018-06-01 01:50:00",
+       "2018-06-01 02:00:00"
+   };
+   for (final String startTime : startTimes) {
+     this.createContextListAndAddToCollection(startTime, testMap);
+   }
+
+   assertThat(testMap.hasTopic("AzTest_Topic1")).isTrue();
+   assertThat(testMap.hasTopic("AzTest_Topic2")).isTrue();
+   assertThat(testMap.hasTopic("AzTest_Topic3")).isTrue();
+
+   // Six batches were added, so every registered (topic, pattern) pair holds six contexts.
+   assertThat(testMap.getDepsByTopicAndEvent("AzTest_Topic1", "^(\\\\d{3}-?\\\\d{2}-?\\\\d{4})$"))
+       .hasSize(startTimes.length);
+   assertThat(testMap.getDepsByTopicAndEvent("AzTest_Topic1", "hadoop.*")).hasSize(startTimes.length);
+   assertThat(testMap.getDepsByTopicAndEvent("AzTest_Topic2", "^\\w*")).hasSize(startTimes.length);
+   assertThat(testMap.getDepsByTopicAndEvent("AzTest_Topic3", ".*")).hasSize(startTimes.length);
+
+   // A topic that was never registered yields an empty result.
+   assertThat(testMap.getDepsByTopicAndEvent("a", "b")).isEmpty();
+ }
+
+ /**
+  * Removes, one at a time, every context registered under ("AzTest_Topic1", "hadoop.*") and
+  * checks that the lookup for that pair then yields {@code null} while the topic is still known.
+  */
+ @Test
+ public void testRemove() throws ParseException {
+   final KafkaDepInstanceCollection collection = new KafkaDepInstanceCollection();
+   final String[] startTimes = {
+       "2018-06-01 01:00:00",
+       "2018-06-01 01:20:00",
+       "2018-06-01 01:30:00",
+       "2018-06-01 01:40:00",
+       "2018-06-01 01:50:00",
+       "2018-06-01 02:00:00"
+   };
+   for (final String startTime : startTimes) {
+     this.createContextListAndAddToCollection(startTime, collection);
+   }
+
+   // Walk a copy so the loop does not iterate the list instance handed back by the collection
+   // while remove() is being called on that collection.
+   final List<KafkaDependencyInstanceContext> toRemove =
+       new ArrayList<>(collection.getDepsByTopicAndEvent("AzTest_Topic1", "hadoop.*"));
+   for (final KafkaDependencyInstanceContext dep : toRemove) {
+     collection.remove(dep);
+   }
+
+   assertThat(collection.getDepsByTopicAndEvent("AzTest_Topic1", "hadoop.*")).isNull();
+   assertThat(collection.hasTopic("AzTest_Topic1")).isTrue();
+ }
+
+ /**
+  * Removes the whole context list registered under ("AzTest_Topic3", ".*") in one call and
+  * checks that the lookup is then empty and the topic is no longer reported as present.
+  */
+ @Test
+ public void testRemoveList() throws ParseException {
+   final KafkaDepInstanceCollection collection = new KafkaDepInstanceCollection();
+   final String[] startTimes = {
+       "2018-06-01 01:00:00",
+       "2018-06-01 01:20:00",
+       "2018-06-01 01:30:00",
+       "2018-06-01 01:40:00",
+       "2018-06-01 01:50:00",
+       "2018-06-01 02:00:00"
+   };
+   for (final String startTime : startTimes) {
+     this.createContextListAndAddToCollection(startTime, collection);
+   }
+
+   // The list passed to removeList is exactly what the collection returns for this pair.
+   final List<KafkaDependencyInstanceContext> topic3Deps =
+       collection.getDepsByTopicAndEvent("AzTest_Topic3", ".*");
+   collection.removeList("AzTest_Topic3", ".*", topic3Deps);
+
+   assertThat(collection.getDepsByTopicAndEvent("AzTest_Topic3", ".*")).isEmpty();
+   assertThat(collection.hasTopic("AzTest_Topic3")).isFalse();
+ }
+}
diff --git a/az-flow-trigger-dependency-type/kafka-event-trigger/src/test/trigger/kafka/KafkaEventMonitorTest.java b/az-flow-trigger-dependency-type/kafka-event-trigger/src/test/trigger/kafka/KafkaEventMonitorTest.java
new file mode 100644
index 0000000..873784a
--- /dev/null
+++ b/az-flow-trigger-dependency-type/kafka-event-trigger/src/test/trigger/kafka/KafkaEventMonitorTest.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package trigger.kafka;
+
+import azkaban.flowtrigger.DependencyPluginConfig;
+import azkaban.flowtrigger.DependencyPluginConfigImpl;
+import java.text.ParseException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.*;
+
+
+ /**
+  * Tests how {@link KafkaEventMonitor} updates its topic subscription when dependencies are
+  * added and removed.
+  */
+ public class KafkaEventMonitorTest {
+
+   /**
+    * Expects the monitor to start out subscribed to "AzEvent_Init_Topic", to switch to the topics
+    * of the added dependencies after a rebalance, and to drop "AzTest_Topic3" once its only
+    * dependency has been removed and another rebalance has run.
+    *
+    * <p>NOTE(review): the monitor is built against broker URL "localhost:9092"; whether a live
+    * broker is needed for this test depends on KafkaEventMonitor - confirm.
+    */
+   @Test
+   public void testConsumerSubscriptionRebalance() throws ParseException {
+     final Map<String, String> pluginProps = new HashMap<>();
+     pluginProps.put(Constants.DependencyPluginConfigKey.KAKFA_BROKER_URL, "localhost:9092");
+     final DependencyPluginConfig pluginConfig = new DependencyPluginConfigImpl(pluginProps);
+
+     final KafkaEventMonitor monitor = new KafkaEventMonitor(pluginConfig);
+     assertThat(monitor.getMonitorSubscription()).contains("AzEvent_Init_Topic");
+
+     // All four dependencies share the same start time.
+     final long startTime = new Date().getTime();
+     monitor.add(KafkaDepInstanceCollectionTest.createContext(
+         "AzTest_Topic1", "^(\\\\d{3}-?\\\\d{2}-?\\\\d{4})$", startTime, "dep1"));
+     monitor.add(KafkaDepInstanceCollectionTest.createContext(
+         "AzTest_Topic1", "hadoop.*", startTime, "dep2"));
+     monitor.add(KafkaDepInstanceCollectionTest.createContext(
+         "AzTest_Topic2", "^\\w*", startTime, "dep3"));
+     // Kept in a variable because it is removed again further down.
+     final KafkaDependencyInstanceContext topic3Dep = KafkaDepInstanceCollectionTest.createContext(
+         "AzTest_Topic3", ".*", startTime, "dep4");
+     monitor.add(topic3Dep);
+
+     monitor.consumerSubscriptionRebalance();
+     assertThat(monitor.getMonitorSubscription()).contains("AzTest_Topic1");
+     assertThat(monitor.getMonitorSubscription()).contains("AzTest_Topic2");
+     assertThat(monitor.getMonitorSubscription()).contains("AzTest_Topic3");
+     assertThat(monitor.getMonitorSubscription()).doesNotContain("AzEvent_Init_Topic");
+
+     monitor.remove(topic3Dep);
+     monitor.consumerSubscriptionRebalance();
+     assertThat(monitor.getMonitorSubscription()).doesNotContain("AzTest_Topic3");
+   }
+ }
settings.gradle 4(+3 -1)
diff --git a/settings.gradle b/settings.gradle
index 6366ff3..4b4c9b4 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -31,4 +31,6 @@ include 'az-reportal'
include 'az-hadoop-jobtype-plugin'
include 'az-jobsummary'
include 'az-hdfs-viewer'
-include 'tools'
+include 'az-flow-trigger-dependency-type'
+include 'az-flow-trigger-dependency-type:kafka-event-trigger'
+include 'tools'
\ No newline at end of file