azkaban-aplcache
Changes
azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/FlowTriggerDependencyPluginException.java 28(+28 -0)
azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/FlowTriggerDependencyPluginManager.java 226(+226 -0)
azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerDependencyPluginManagerTest.java 52(+52 -0)
azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/TestDependencyInstanceContext.java 34(+34 -0)
azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin2/TestDependencyCheck2.java 51(+51 -0)
Details
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/FlowTriggerDependencyPluginException.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/FlowTriggerDependencyPluginException.java
new file mode 100644
index 0000000..f7fa743
--- /dev/null
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/FlowTriggerDependencyPluginException.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger.plugin;
+
+public class FlowTriggerDependencyPluginException extends Exception {
+
+ public FlowTriggerDependencyPluginException(final String message) {
+ super(message);
+ }
+
+ public FlowTriggerDependencyPluginException(final Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/FlowTriggerDependencyPluginManager.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/FlowTriggerDependencyPluginManager.java
new file mode 100644
index 0000000..c524f27
--- /dev/null
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/FlowTriggerDependencyPluginManager.java
@@ -0,0 +1,226 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger.plugin;
+
+import azkaban.flowtrigger.DependencyCheck;
+import azkaban.flowtrigger.DependencyPluginConfig;
+import azkaban.flowtrigger.DependencyPluginConfigImpl;
+import azkaban.utils.Utils;
+import com.google.common.collect.Maps;
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Singleton
+public class FlowTriggerDependencyPluginManager {
+
+ public static final String CONFIG_FILE = "dependency.properties";
+ public static final String PRIVATE_CONFIG_FILE = "private.properties";
+ public static final String DEPENDENCY_CLASS = "dependency.class";
+ public static final String CLASS_PATH = "dependency.classpath";
+
+ private static final Logger logger = LoggerFactory
+ .getLogger(FlowTriggerDependencyPluginManager.class);
+
+ private final String pluginDir;
+
+ private final Map<String, DependencyCheck> dependencyTypeMap;
+
+ @Inject
+ public FlowTriggerDependencyPluginManager(final String pluginDir) throws
+ FlowTriggerDependencyPluginException {
+ this.dependencyTypeMap = new ConcurrentHashMap<>();
+ this.pluginDir = pluginDir;
+ this.loadAllDependencyPlugins();
+ }
+
+ private Map<String, String> readConfig(final File file) throws
+ FlowTriggerDependencyPluginException {
+ final Properties props = new Properties();
+ InputStream input = null;
+ try {
+ input = new BufferedInputStream(new FileInputStream(file));
+ props.load(input);
+ } catch (final Exception e) {
+ logger.debug("unable to read the file " + file, e);
+ throw new FlowTriggerDependencyPluginException(e);
+ } finally {
+ try {
+ if (input != null) {
+ input.close();
+ }
+ } catch (final IOException e) {
+ logger.error("unable to close input stream when reading config from file " + file
+ .getAbsolutePath(), e);
+ }
+ }
+ return Maps.fromProperties(props);
+ }
+
+ private void validatePluginConfig(final DependencyPluginConfig pluginConfig)
+ throws FlowTriggerDependencyPluginException {
+ if (StringUtils.isEmpty(pluginConfig.get(DEPENDENCY_CLASS))) {
+ throw new FlowTriggerDependencyPluginException("missing " + DEPENDENCY_CLASS + " in "
+ + "dependency plugin properties");
+ } else if (StringUtils.isEmpty(pluginConfig.get(CLASS_PATH))) {
+ throw new FlowTriggerDependencyPluginException("missing " + CLASS_PATH + " in "
+ + "dependency plugin properties");
+ }
+ }
+
+ private DependencyPluginConfig mergePluginConfig(final Map<String, String> publicProps,
+ final Map<String, String> privateProps) throws FlowTriggerDependencyPluginException {
+ final Map<String, String> combined = new HashMap<>();
+ combined.putAll(publicProps);
+ combined.putAll(privateProps);
+ if (combined.size() != publicProps.size() + privateProps.size()) {
+ throw new FlowTriggerDependencyPluginException("duplicate property found in both public and"
+ + " private properties");
+ }
+ return new DependencyPluginConfigImpl(combined);
+ }
+
+
+ private DependencyCheck createDependencyCheck(final DependencyPluginConfig pluginConfig)
+ throws FlowTriggerDependencyPluginException {
+ final String classPath = pluginConfig.get(CLASS_PATH);
+ final String[] cpList = classPath.split(",");
+
+ final List<URL> resources = new ArrayList<>();
+
+ try {
+ for (final String cp : cpList) {
+ final URL cpItem = new File(cp).toURI().toURL();
+ if (!resources.contains(cpItem)) {
+ logger.info("adding to classpath " + cpItem);
+ resources.add(cpItem);
+ }
+ }
+ } catch (final Exception ex) {
+ throw new FlowTriggerDependencyPluginException(ex);
+ }
+
+ final ClassLoader dependencyClassloader =
+ new URLClassLoader(resources.toArray(new URL[resources.size()]));
+ Class<? extends DependencyCheck> clazz = null;
+ try {
+ clazz = (Class<? extends DependencyCheck>) dependencyClassloader.loadClass(pluginConfig.get
+ (DEPENDENCY_CLASS));
+ final DependencyCheck dependencyCheck =
+ (DependencyCheck) Utils.callConstructor(clazz);
+ return dependencyCheck;
+ } catch (final Exception ex) {
+ throw new FlowTriggerDependencyPluginException(ex);
+ }
+ }
+
+ public void loadDependencyPlugin(final File pluginDir)
+ throws FlowTriggerDependencyPluginException {
+ if (pluginDir.isDirectory() && pluginDir.canRead()) {
+ try {
+ final DependencyPluginConfig pluginConfig = createPluginConfig(pluginDir);
+ final DependencyCheck depCheck = createDependencyCheck(pluginConfig);
+ final String pluginName = getPluginName(pluginDir);
+ depCheck.init(pluginConfig);
+ this.dependencyTypeMap.put(pluginName, depCheck);
+ } catch (final Exception ex) {
+ logger.error("failed to initializing plugin in " + pluginDir, ex);
+ throw new FlowTriggerDependencyPluginException(ex);
+ }
+ }
+ }
+
+ private void loadAllDependencyPlugins() throws FlowTriggerDependencyPluginException {
+ final File pluginDir = new File(this.pluginDir);
+ for (final File dir : pluginDir.listFiles()) {
+ loadDependencyPlugin(dir);
+ }
+ }
+
+ private String getPluginName(final File dependencyPluginDir) {
+ //the name of the dependency plugin dir is treated as the name of the plugin
+ return dependencyPluginDir.getName();
+ }
+
+ private Map<String, String> readPublicConfig(final File publicConfigFile)
+ throws FlowTriggerDependencyPluginException {
+ return readConfig(publicConfigFile);
+ }
+
+ /**
+ * read config from private property file, if the file is not present, then return empty.
+ */
+ private Map<String, String> readPrivateConfig(final File privateConfigFile) {
+ try {
+ return readConfig(privateConfigFile);
+ } catch (final Exception ex) {
+ return new HashMap<>();
+ }
+ }
+
+ private DependencyPluginConfig createPluginConfig(final File dir) throws
+ FlowTriggerDependencyPluginException {
+ final File publicConfigFile = new File(dir.getAbsolutePath() + "/" + CONFIG_FILE);
+ final File privateConfigFile = new File(dir.getAbsolutePath() + "/" + PRIVATE_CONFIG_FILE);
+ try {
+ final DependencyPluginConfig pluginConfig = mergePluginConfig(
+ readPublicConfig(publicConfigFile),
+ readPrivateConfig(privateConfigFile));
+ validatePluginConfig(pluginConfig);
+ return pluginConfig;
+ } catch (final FlowTriggerDependencyPluginException exception) {
+ throw new FlowTriggerDependencyPluginException("exception when initializing plugin "
+ + "config in " + dir.getAbsolutePath() + ": " + exception.getMessage());
+ }
+ }
+
+ /**
+ * return or create a dependency check based on type
+ *
+ * @return if the dependencyCheck of the same type already exists, return the check,
+ * otherwise create a new one and return.
+ */
+
+ public DependencyCheck getDependencyCheck(final String type) {
+ return this.dependencyTypeMap.get(type);
+ }
+
+ public void shutdown() {
+ for (final DependencyCheck depCheck : this.dependencyTypeMap.values()) {
+ try {
+ depCheck.shutdown();
+ } catch (final Exception ex) {
+ logger.error("failed to shutdown dependency check " + depCheck, ex);
+ }
+ }
+ }
+}
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerDependencyPluginManagerTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerDependencyPluginManagerTest.java
new file mode 100644
index 0000000..abf30d7
--- /dev/null
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerDependencyPluginManagerTest.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import azkaban.flowtrigger.plugin.FlowTriggerDependencyPluginManager;
+import java.net.URL;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class FlowTriggerDependencyPluginManagerTest {
+
+ private static final String pluginDir = "dependencyplugin";
+ private static FlowTriggerDependencyPluginManager pluginManager;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ final URL url = FlowTriggerDependencyPluginManagerTest.class.getClassLoader()
+ .getResource(pluginDir);
+ pluginManager = new FlowTriggerDependencyPluginManager(url.getPath());
+ }
+
+
+ @Test
+ public void testPluginLoading() {
+ assertThat(pluginManager.getDependencyCheck("test")).isNotNull();
+ assertThat(pluginManager.getDependencyCheck("test2")).isNotNull();
+ //verify the dependency check contains the specified properties
+ assertThat(pluginManager.getDependencyCheck("test").toString()).contains("kafka.url=123");
+ assertThat(pluginManager.getDependencyCheck("test").toString()).contains("kafka.port=1234");
+ //verify the dependency check contains the specified private properties
+ assertThat(pluginManager.getDependencyCheck("test2").toString()).contains("password=1234");
+ }
+}
+
+
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/TestDependencyCheck.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/TestDependencyCheck.java
new file mode 100644
index 0000000..d3f4784
--- /dev/null
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/TestDependencyCheck.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger.testplugin;
+
+import azkaban.flowtrigger.DependencyCheck;
+import azkaban.flowtrigger.DependencyInstanceCallback;
+import azkaban.flowtrigger.DependencyInstanceConfig;
+import azkaban.flowtrigger.DependencyInstanceContext;
+import azkaban.flowtrigger.DependencyInstanceRuntimeProps;
+import azkaban.flowtrigger.DependencyPluginConfig;
+
+public class TestDependencyCheck implements DependencyCheck {
+
+ private DependencyPluginConfig config;
+
+ @Override
+ public DependencyInstanceContext run(final DependencyInstanceConfig config,
+ final DependencyInstanceRuntimeProps runtimeProps,
+ final DependencyInstanceCallback callback) {
+ return new TestDependencyInstanceContext(config, runtimeProps, callback);
+ }
+
+ @Override
+ public void shutdown() {
+ }
+
+ @Override
+ public String toString() {
+ return this.config.toString();
+ }
+
+ @Override
+ public void init(final DependencyPluginConfig config) {
+ this.config = config;
+ }
+}
+
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/TestDependencyInstanceContext.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/TestDependencyInstanceContext.java
new file mode 100644
index 0000000..638c421
--- /dev/null
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/TestDependencyInstanceContext.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger.testplugin;
+
+import azkaban.flowtrigger.DependencyInstanceCallback;
+import azkaban.flowtrigger.DependencyInstanceConfig;
+import azkaban.flowtrigger.DependencyInstanceContext;
+import azkaban.flowtrigger.DependencyInstanceRuntimeProps;
+
+public class TestDependencyInstanceContext implements DependencyInstanceContext {
+
+ public TestDependencyInstanceContext(final DependencyInstanceConfig config,
+ final DependencyInstanceRuntimeProps runtimeProps,
+ final DependencyInstanceCallback callback) {
+ }
+
+ @Override
+ public void cancel() {
+ }
+}
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin2/TestDependencyCheck2.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin2/TestDependencyCheck2.java
new file mode 100644
index 0000000..77b61b0
--- /dev/null
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin2/TestDependencyCheck2.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger.testplugin2;
+
+import azkaban.flowtrigger.DependencyCheck;
+import azkaban.flowtrigger.DependencyInstanceCallback;
+import azkaban.flowtrigger.DependencyInstanceConfig;
+import azkaban.flowtrigger.DependencyInstanceContext;
+import azkaban.flowtrigger.DependencyInstanceRuntimeProps;
+import azkaban.flowtrigger.DependencyPluginConfig;
+
+public class TestDependencyCheck2 implements DependencyCheck {
+
+ private DependencyPluginConfig config;
+
+ @Override
+ public DependencyInstanceContext run(final DependencyInstanceConfig config,
+ final DependencyInstanceRuntimeProps runtimeProps,
+ final DependencyInstanceCallback callback) {
+ return new TestDependencyInstanceContext2(config, runtimeProps, callback);
+ }
+
+ @Override
+ public void shutdown() {
+ }
+
+ @Override
+ public String toString() {
+ return this.config.toString();
+ }
+
+ @Override
+ public void init(final DependencyPluginConfig config) {
+ this.config = config;
+ }
+}
+
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin2/TestDependencyInstanceContext2.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin2/TestDependencyInstanceContext2.java
new file mode 100644
index 0000000..6ffc28c
--- /dev/null
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin2/TestDependencyInstanceContext2.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger.testplugin2;
+
+import azkaban.flowtrigger.DependencyInstanceCallback;
+import azkaban.flowtrigger.DependencyInstanceConfig;
+import azkaban.flowtrigger.DependencyInstanceContext;
+import azkaban.flowtrigger.DependencyInstanceRuntimeProps;
+
+public class TestDependencyInstanceContext2 implements DependencyInstanceContext {
+
+
+ public TestDependencyInstanceContext2(final DependencyInstanceConfig config,
+ final DependencyInstanceRuntimeProps runtimeProps,
+ final DependencyInstanceCallback callback) {
+ }
+
+ @Override
+ public void cancel() {
+ }
+}
diff --git a/azkaban-web-server/src/test/resources/az-flow-trigger-dependency-plugin.jar b/azkaban-web-server/src/test/resources/az-flow-trigger-dependency-plugin.jar
new file mode 100644
index 0000000..7198f75
Binary files /dev/null and b/azkaban-web-server/src/test/resources/az-flow-trigger-dependency-plugin.jar differ
diff --git a/azkaban-web-server/src/test/resources/dependencyplugin/test/dependency.properties b/azkaban-web-server/src/test/resources/dependencyplugin/test/dependency.properties
new file mode 100644
index 0000000..c83e29a
--- /dev/null
+++ b/azkaban-web-server/src/test/resources/dependencyplugin/test/dependency.properties
@@ -0,0 +1,5 @@
+kafka.url=123
+kafka.port=1234
+#required
+dependency.class=azkaban.testplugin.TestDependencyCheck
+dependency.classpath=src/test/resources/az-flow-trigger-dependency-plugin.jar
\ No newline at end of file
diff --git a/azkaban-web-server/src/test/resources/dependencyplugin/test2/dependency.properties b/azkaban-web-server/src/test/resources/dependencyplugin/test2/dependency.properties
new file mode 100644
index 0000000..a32a45d
--- /dev/null
+++ b/azkaban-web-server/src/test/resources/dependencyplugin/test2/dependency.properties
@@ -0,0 +1,4 @@
+hdfsnamenode=123
+hdfsurl=1234
+dependency.class=azkaban.testplugin2.TestDependencyCheck2
+dependency.classpath=src/test/resources/az-flow-trigger-dependency-plugin.jar
diff --git a/azkaban-web-server/src/test/resources/dependencyplugin/test2/private.properties b/azkaban-web-server/src/test/resources/dependencyplugin/test2/private.properties
new file mode 100644
index 0000000..9612336
--- /dev/null
+++ b/azkaban-web-server/src/test/resources/dependencyplugin/test2/private.properties
@@ -0,0 +1 @@
+password=1234