azkaban-aplcache

dependency type plugin manager (#1620) This PR consists

1/31/2018 9:15:44 PM

Details

diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/FlowTriggerDependencyPluginException.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/FlowTriggerDependencyPluginException.java
new file mode 100644
index 0000000..f7fa743
--- /dev/null
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/FlowTriggerDependencyPluginException.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger.plugin;
+
+public class FlowTriggerDependencyPluginException extends Exception {
+
+  public FlowTriggerDependencyPluginException(final String message) {
+    super(message);
+  }
+
+  public FlowTriggerDependencyPluginException(final Throwable cause) {
+    super(cause);
+  }
+}
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/FlowTriggerDependencyPluginManager.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/FlowTriggerDependencyPluginManager.java
new file mode 100644
index 0000000..c524f27
--- /dev/null
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/FlowTriggerDependencyPluginManager.java
@@ -0,0 +1,226 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger.plugin;
+
+import azkaban.flowtrigger.DependencyCheck;
+import azkaban.flowtrigger.DependencyPluginConfig;
+import azkaban.flowtrigger.DependencyPluginConfigImpl;
+import azkaban.utils.Utils;
+import com.google.common.collect.Maps;
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Singleton
+public class FlowTriggerDependencyPluginManager {
+
+  public static final String CONFIG_FILE = "dependency.properties";
+  public static final String PRIVATE_CONFIG_FILE = "private.properties";
+  public static final String DEPENDENCY_CLASS = "dependency.class";
+  public static final String CLASS_PATH = "dependency.classpath";
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(FlowTriggerDependencyPluginManager.class);
+
+  private final String pluginDir;
+
+  private final Map<String, DependencyCheck> dependencyTypeMap;
+
+  @Inject
+  public FlowTriggerDependencyPluginManager(final String pluginDir) throws
+      FlowTriggerDependencyPluginException {
+    this.dependencyTypeMap = new ConcurrentHashMap<>();
+    this.pluginDir = pluginDir;
+    this.loadAllDependencyPlugins();
+  }
+
+  private Map<String, String> readConfig(final File file) throws
+      FlowTriggerDependencyPluginException {
+    final Properties props = new Properties();
+    InputStream input = null;
+    try {
+      input = new BufferedInputStream(new FileInputStream(file));
+      props.load(input);
+    } catch (final Exception e) {
+      logger.debug("unable to read the file " + file, e);
+      throw new FlowTriggerDependencyPluginException(e);
+    } finally {
+      try {
+        if (input != null) {
+          input.close();
+        }
+      } catch (final IOException e) {
+        logger.error("unable to close input stream when reading config from file " + file
+            .getAbsolutePath(), e);
+      }
+    }
+    return Maps.fromProperties(props);
+  }
+
+  private void validatePluginConfig(final DependencyPluginConfig pluginConfig)
+      throws FlowTriggerDependencyPluginException {
+    if (StringUtils.isEmpty(pluginConfig.get(DEPENDENCY_CLASS))) {
+      throw new FlowTriggerDependencyPluginException("missing " + DEPENDENCY_CLASS + " in "
+          + "dependency plugin properties");
+    } else if (StringUtils.isEmpty(pluginConfig.get(CLASS_PATH))) {
+      throw new FlowTriggerDependencyPluginException("missing " + CLASS_PATH + " in "
+          + "dependency plugin properties");
+    }
+  }
+
+  private DependencyPluginConfig mergePluginConfig(final Map<String, String> publicProps,
+      final Map<String, String> privateProps) throws FlowTriggerDependencyPluginException {
+    final Map<String, String> combined = new HashMap<>();
+    combined.putAll(publicProps);
+    combined.putAll(privateProps);
+    if (combined.size() != publicProps.size() + privateProps.size()) {
+      throw new FlowTriggerDependencyPluginException("duplicate property found in both public and"
+          + " private properties");
+    }
+    return new DependencyPluginConfigImpl(combined);
+  }
+
+
+  private DependencyCheck createDependencyCheck(final DependencyPluginConfig pluginConfig)
+      throws FlowTriggerDependencyPluginException {
+    final String classPath = pluginConfig.get(CLASS_PATH);
+    final String[] cpList = classPath.split(",");
+
+    final List<URL> resources = new ArrayList<>();
+
+    try {
+      for (final String cp : cpList) {
+        final URL cpItem = new File(cp).toURI().toURL();
+        if (!resources.contains(cpItem)) {
+          logger.info("adding to classpath " + cpItem);
+          resources.add(cpItem);
+        }
+      }
+    } catch (final Exception ex) {
+      throw new FlowTriggerDependencyPluginException(ex);
+    }
+
+    final ClassLoader dependencyClassloader =
+        new URLClassLoader(resources.toArray(new URL[resources.size()]));
+    Class<? extends DependencyCheck> clazz = null;
+    try {
+      clazz = (Class<? extends DependencyCheck>) dependencyClassloader.loadClass(pluginConfig.get
+          (DEPENDENCY_CLASS));
+      final DependencyCheck dependencyCheck =
+          (DependencyCheck) Utils.callConstructor(clazz);
+      return dependencyCheck;
+    } catch (final Exception ex) {
+      throw new FlowTriggerDependencyPluginException(ex);
+    }
+  }
+
+  public void loadDependencyPlugin(final File pluginDir)
+      throws FlowTriggerDependencyPluginException {
+    if (pluginDir.isDirectory() && pluginDir.canRead()) {
+      try {
+        final DependencyPluginConfig pluginConfig = createPluginConfig(pluginDir);
+        final DependencyCheck depCheck = createDependencyCheck(pluginConfig);
+        final String pluginName = getPluginName(pluginDir);
+        depCheck.init(pluginConfig);
+        this.dependencyTypeMap.put(pluginName, depCheck);
+      } catch (final Exception ex) {
+        logger.error("failed to initializing plugin in " + pluginDir, ex);
+        throw new FlowTriggerDependencyPluginException(ex);
+      }
+    }
+  }
+
+  private void loadAllDependencyPlugins() throws FlowTriggerDependencyPluginException {
+    final File pluginDir = new File(this.pluginDir);
+    for (final File dir : pluginDir.listFiles()) {
+      loadDependencyPlugin(dir);
+    }
+  }
+
+  private String getPluginName(final File dependencyPluginDir) {
+    //the name of the dependency plugin dir is treated as the name of the plugin
+    return dependencyPluginDir.getName();
+  }
+
+  private Map<String, String> readPublicConfig(final File publicConfigFile)
+      throws FlowTriggerDependencyPluginException {
+    return readConfig(publicConfigFile);
+  }
+
+  /**
+   * read config from private property file, if the file is not present, then return empty.
+   */
+  private Map<String, String> readPrivateConfig(final File privateConfigFile) {
+    try {
+      return readConfig(privateConfigFile);
+    } catch (final Exception ex) {
+      return new HashMap<>();
+    }
+  }
+
+  private DependencyPluginConfig createPluginConfig(final File dir) throws
+      FlowTriggerDependencyPluginException {
+    final File publicConfigFile = new File(dir.getAbsolutePath() + "/" + CONFIG_FILE);
+    final File privateConfigFile = new File(dir.getAbsolutePath() + "/" + PRIVATE_CONFIG_FILE);
+    try {
+      final DependencyPluginConfig pluginConfig = mergePluginConfig(
+          readPublicConfig(publicConfigFile),
+          readPrivateConfig(privateConfigFile));
+      validatePluginConfig(pluginConfig);
+      return pluginConfig;
+    } catch (final FlowTriggerDependencyPluginException exception) {
+      throw new FlowTriggerDependencyPluginException("exception when initializing plugin "
+          + "config in " + dir.getAbsolutePath() + ": " + exception.getMessage());
+    }
+  }
+
+  /**
+   * return or create a dependency check based on type
+   *
+   * @return if the dependencyCheck of the same type already exists, return the check,
+   * otherwise create a new one and return.
+   */
+
+  public DependencyCheck getDependencyCheck(final String type) {
+    return this.dependencyTypeMap.get(type);
+  }
+
+  public void shutdown() {
+    for (final DependencyCheck depCheck : this.dependencyTypeMap.values()) {
+      try {
+        depCheck.shutdown();
+      } catch (final Exception ex) {
+        logger.error("failed to shutdown dependency check " + depCheck, ex);
+      }
+    }
+  }
+}
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerDependencyPluginManagerTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerDependencyPluginManagerTest.java
new file mode 100644
index 0000000..abf30d7
--- /dev/null
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerDependencyPluginManagerTest.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import azkaban.flowtrigger.plugin.FlowTriggerDependencyPluginManager;
+import java.net.URL;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class FlowTriggerDependencyPluginManagerTest {
+
+  private static final String pluginDir = "dependencyplugin";
+  private static FlowTriggerDependencyPluginManager pluginManager;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    final URL url = FlowTriggerDependencyPluginManagerTest.class.getClassLoader()
+        .getResource(pluginDir);
+    pluginManager = new FlowTriggerDependencyPluginManager(url.getPath());
+  }
+
+
+  @Test
+  public void testPluginLoading() {
+    assertThat(pluginManager.getDependencyCheck("test")).isNotNull();
+    assertThat(pluginManager.getDependencyCheck("test2")).isNotNull();
+    //verify the dependency check contains the specified properties
+    assertThat(pluginManager.getDependencyCheck("test").toString()).contains("kafka.url=123");
+    assertThat(pluginManager.getDependencyCheck("test").toString()).contains("kafka.port=1234");
+    //verify the dependency check contains the specified private properties
+    assertThat(pluginManager.getDependencyCheck("test2").toString()).contains("password=1234");
+  }
+}
+
+
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/TestDependencyCheck.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/TestDependencyCheck.java
new file mode 100644
index 0000000..d3f4784
--- /dev/null
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/TestDependencyCheck.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger.testplugin;
+
+import azkaban.flowtrigger.DependencyCheck;
+import azkaban.flowtrigger.DependencyInstanceCallback;
+import azkaban.flowtrigger.DependencyInstanceConfig;
+import azkaban.flowtrigger.DependencyInstanceContext;
+import azkaban.flowtrigger.DependencyInstanceRuntimeProps;
+import azkaban.flowtrigger.DependencyPluginConfig;
+
+public class TestDependencyCheck implements DependencyCheck {
+
+  private DependencyPluginConfig config;
+
+  @Override
+  public DependencyInstanceContext run(final DependencyInstanceConfig config,
+      final DependencyInstanceRuntimeProps runtimeProps,
+      final DependencyInstanceCallback callback) {
+    return new TestDependencyInstanceContext(config, runtimeProps, callback);
+  }
+
+  @Override
+  public void shutdown() {
+  }
+
+  @Override
+  public String toString() {
+    return this.config.toString();
+  }
+
+  @Override
+  public void init(final DependencyPluginConfig config) {
+    this.config = config;
+  }
+}
+
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/TestDependencyInstanceContext.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/TestDependencyInstanceContext.java
new file mode 100644
index 0000000..638c421
--- /dev/null
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/TestDependencyInstanceContext.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger.testplugin;
+
+import azkaban.flowtrigger.DependencyInstanceCallback;
+import azkaban.flowtrigger.DependencyInstanceConfig;
+import azkaban.flowtrigger.DependencyInstanceContext;
+import azkaban.flowtrigger.DependencyInstanceRuntimeProps;
+
+public class TestDependencyInstanceContext implements DependencyInstanceContext {
+
+  public TestDependencyInstanceContext(final DependencyInstanceConfig config,
+      final DependencyInstanceRuntimeProps runtimeProps,
+      final DependencyInstanceCallback callback) {
+  }
+
+  @Override
+  public void cancel() {
+  }
+}
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin2/TestDependencyCheck2.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin2/TestDependencyCheck2.java
new file mode 100644
index 0000000..77b61b0
--- /dev/null
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin2/TestDependencyCheck2.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger.testplugin2;
+
+import azkaban.flowtrigger.DependencyCheck;
+import azkaban.flowtrigger.DependencyInstanceCallback;
+import azkaban.flowtrigger.DependencyInstanceConfig;
+import azkaban.flowtrigger.DependencyInstanceContext;
+import azkaban.flowtrigger.DependencyInstanceRuntimeProps;
+import azkaban.flowtrigger.DependencyPluginConfig;
+
+public class TestDependencyCheck2 implements DependencyCheck {
+
+  private DependencyPluginConfig config;
+
+  @Override
+  public DependencyInstanceContext run(final DependencyInstanceConfig config,
+      final DependencyInstanceRuntimeProps runtimeProps,
+      final DependencyInstanceCallback callback) {
+    return new TestDependencyInstanceContext2(config, runtimeProps, callback);
+  }
+
+  @Override
+  public void shutdown() {
+  }
+
+  @Override
+  public String toString() {
+    return this.config.toString();
+  }
+
+  @Override
+  public void init(final DependencyPluginConfig config) {
+    this.config = config;
+  }
+}
+
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin2/TestDependencyInstanceContext2.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin2/TestDependencyInstanceContext2.java
new file mode 100644
index 0000000..6ffc28c
--- /dev/null
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin2/TestDependencyInstanceContext2.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger.testplugin2;
+
+import azkaban.flowtrigger.DependencyInstanceCallback;
+import azkaban.flowtrigger.DependencyInstanceConfig;
+import azkaban.flowtrigger.DependencyInstanceContext;
+import azkaban.flowtrigger.DependencyInstanceRuntimeProps;
+
+public class TestDependencyInstanceContext2 implements DependencyInstanceContext {
+
+
+  public TestDependencyInstanceContext2(final DependencyInstanceConfig config,
+      final DependencyInstanceRuntimeProps runtimeProps,
+      final DependencyInstanceCallback callback) {
+  }
+
+  @Override
+  public void cancel() {
+  }
+}
diff --git a/azkaban-web-server/src/test/resources/az-flow-trigger-dependency-plugin.jar b/azkaban-web-server/src/test/resources/az-flow-trigger-dependency-plugin.jar
new file mode 100644
index 0000000..7198f75
Binary files /dev/null and b/azkaban-web-server/src/test/resources/az-flow-trigger-dependency-plugin.jar differ
diff --git a/azkaban-web-server/src/test/resources/dependencyplugin/test/dependency.properties b/azkaban-web-server/src/test/resources/dependencyplugin/test/dependency.properties
new file mode 100644
index 0000000..c83e29a
--- /dev/null
+++ b/azkaban-web-server/src/test/resources/dependencyplugin/test/dependency.properties
@@ -0,0 +1,5 @@
+kafka.url=123
+kafka.port=1234
+#required
+dependency.class=azkaban.testplugin.TestDependencyCheck
+dependency.classpath=src/test/resources/az-flow-trigger-dependency-plugin.jar
\ No newline at end of file
diff --git a/azkaban-web-server/src/test/resources/dependencyplugin/test2/dependency.properties b/azkaban-web-server/src/test/resources/dependencyplugin/test2/dependency.properties
new file mode 100644
index 0000000..a32a45d
--- /dev/null
+++ b/azkaban-web-server/src/test/resources/dependencyplugin/test2/dependency.properties
@@ -0,0 +1,4 @@
+hdfsnamenode=123
+hdfsurl=1234
+dependency.class=azkaban.testplugin2.TestDependencyCheck2
+dependency.classpath=src/test/resources/az-flow-trigger-dependency-plugin.jar
diff --git a/azkaban-web-server/src/test/resources/dependencyplugin/test2/private.properties b/azkaban-web-server/src/test/resources/dependencyplugin/test2/private.properties
new file mode 100644
index 0000000..9612336
--- /dev/null
+++ b/azkaban-web-server/src/test/resources/dependencyplugin/test2/private.properties
@@ -0,0 +1 @@
+password=1234