azkaban-aplcache

make flow trigger plugin manager search classes from plugin

4/6/2018 5:27:25 PM

Details

diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/FlowTriggerDependencyPluginManager.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/FlowTriggerDependencyPluginManager.java
index ddb860e..f51e1e9 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/FlowTriggerDependencyPluginManager.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/FlowTriggerDependencyPluginManager.java
@@ -20,14 +20,15 @@ import azkaban.flowtrigger.DependencyCheck;
 import azkaban.flowtrigger.DependencyPluginConfig;
 import azkaban.flowtrigger.DependencyPluginConfigImpl;
 import azkaban.utils.Utils;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
 import java.io.BufferedInputStream;
 import java.io.File;
+import java.io.FileFilter;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URL;
-import java.net.URLClassLoader;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -36,6 +37,7 @@ import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.inject.Inject;
 import javax.inject.Singleton;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,19 +49,38 @@ public class FlowTriggerDependencyPluginManager {
   public static final String PRIVATE_CONFIG_FILE = "private.properties";
   public static final String DEPENDENCY_CLASS = "dependency.class";
   public static final String CLASS_PATH = "dependency.classpath";
-
   private static final Logger logger = LoggerFactory
       .getLogger(FlowTriggerDependencyPluginManager.class);
-
   private final String pluginDir;
-
   private final Map<String, DependencyCheck> dependencyTypeMap;
+  private final ClassLoader prevClassLoader;
 
   @Inject
-  public FlowTriggerDependencyPluginManager(final String pluginDir) throws
-      FlowTriggerDependencyPluginException {
+  public FlowTriggerDependencyPluginManager(final String pluginDir)
+      throws FlowTriggerDependencyPluginException {
     this.dependencyTypeMap = new ConcurrentHashMap<>();
     this.pluginDir = pluginDir;
+    this.prevClassLoader = Thread.currentThread().getContextClassLoader();
+  }
+
+  /**
+   * retrieve files with wildcard matching.
+   * Only support "dir/*". Pattern like "dir/foo*" or "dir/*foo*" will not be supported
+   * since user shouldn't upload the jars they don't want to import
+   * the reason for supporting dir/* is to provide another packaging option
+   * which let user upload a dir of all required jars
+   * in addition to one fat jar.
+   */
+  private File[] getFilesMatchingPath(final String path) {
+    if (path.endsWith("*")) {
+      final File dir = new File(path.substring(0, path.lastIndexOf("/") + 1));
+      final FileFilter fileFilter = new WildcardFileFilter(path.substring(path.lastIndexOf("/")
+          + 1));
+      final File[] files = dir.listFiles(fileFilter);
+      return files;
+    } else {
+      return new File[]{new File(path)};
+    }
   }
 
   private Map<String, String> readConfig(final File file) throws
@@ -87,12 +108,12 @@ public class FlowTriggerDependencyPluginManager {
 
   private void validatePluginConfig(final DependencyPluginConfig pluginConfig)
       throws FlowTriggerDependencyPluginException {
-    if (StringUtils.isEmpty(pluginConfig.get(DEPENDENCY_CLASS))) {
-      throw new FlowTriggerDependencyPluginException("missing " + DEPENDENCY_CLASS + " in "
-          + "dependency plugin properties");
-    } else if (StringUtils.isEmpty(pluginConfig.get(CLASS_PATH))) {
-      throw new FlowTriggerDependencyPluginException("missing " + CLASS_PATH + " in "
-          + "dependency plugin properties");
+    for (final String requiredField : ImmutableSet
+        .of(DEPENDENCY_CLASS, CLASS_PATH)) {
+      if (StringUtils.isEmpty(pluginConfig.get(requiredField))) {
+        throw new FlowTriggerDependencyPluginException("missing " + requiredField + " in "
+            + "dependency plugin properties");
+      }
     }
   }
 
@@ -108,35 +129,41 @@ public class FlowTriggerDependencyPluginManager {
     return new DependencyPluginConfigImpl(combined);
   }
 
-
   private DependencyCheck createDependencyCheck(final DependencyPluginConfig pluginConfig)
       throws FlowTriggerDependencyPluginException {
     final String classPath = pluginConfig.get(CLASS_PATH);
+
     final String[] cpList = classPath.split(",");
 
     final List<URL> resources = new ArrayList<>();
 
     try {
       for (final String cp : cpList) {
-        final URL cpItem = new File(cp).toURI().toURL();
-        if (!resources.contains(cpItem)) {
-          logger.info("adding to classpath " + cpItem);
-          resources.add(cpItem);
+        final File[] files = getFilesMatchingPath(cp);
+        if (files != null) {
+          for (final File file : files) {
+            final URL cpItem = file.toURI().toURL();
+            if (!resources.contains(cpItem)) {
+              logger.info("adding to classpath " + cpItem);
+              resources.add(cpItem);
+            }
+          }
         }
       }
     } catch (final Exception ex) {
       throw new FlowTriggerDependencyPluginException(ex);
     }
 
-    final ClassLoader dependencyClassloader =
-        new URLClassLoader(resources.toArray(new URL[resources.size()]));
+    final ClassLoader dependencyClassloader = new ParentLastURLClassLoader(
+        resources.toArray(new URL[resources.size()]), this.getClass().getClassLoader());
+
+    Thread.currentThread().setContextClassLoader(dependencyClassloader);
+
     Class<? extends DependencyCheck> clazz = null;
     try {
       clazz = (Class<? extends DependencyCheck>) dependencyClassloader.loadClass(pluginConfig.get
           (DEPENDENCY_CLASS));
-      final DependencyCheck dependencyCheck =
-          (DependencyCheck) Utils.callConstructor(clazz);
-      return dependencyCheck;
+      return (DependencyCheck) Utils.callConstructor(clazz);
     } catch (final Exception ex) {
       throw new FlowTriggerDependencyPluginException(ex);
     }
@@ -170,6 +197,9 @@ public class FlowTriggerDependencyPluginManager {
     for (final File dir : pluginDir.listFiles()) {
       loadDependencyPlugin(dir);
     }
+    //reset thread context loader so that other azkaban class will be loaded with the old
+    // classloader
+    Thread.currentThread().setContextClassLoader(this.prevClassLoader);
   }
 
   private String getPluginName(final File dependencyPluginDir) {
@@ -229,4 +259,6 @@ public class FlowTriggerDependencyPluginManager {
       }
     }
   }
+
+
 }
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/ParentLastURLClassLoader.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/ParentLastURLClassLoader.java
new file mode 100644
index 0000000..16e7c15
--- /dev/null
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/ParentLastURLClassLoader.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger.plugin;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+
+/**
+ * A parent-last classloader that will try the child classloader first and then the parent.
+ * Adopted from https://stackoverflow.com/questions/5445511/how-do-i-create-a-parent-last-child-first-classloader-in-java-or-how-to-overr
+ */
+public class ParentLastURLClassLoader extends ClassLoader {
+
+  private final ChildURLClassLoader childClassLoader;
+
+  public ParentLastURLClassLoader(final URL[] urls, final ClassLoader parentCL) {
+    super(parentCL);
+
+    this.childClassLoader = new ChildURLClassLoader(urls,
+        new FindClassClassLoader(this.getParent()));
+  }
+
+  @Override
+  protected synchronized Class<?> loadClass(final String name, final boolean resolve)
+      throws ClassNotFoundException {
+    try {
+      // first we try to find a class inside the child classloader
+      return this.childClassLoader.findClass(name);
+    } catch (final ClassNotFoundException e) {
+      // didn't find it, try the parent
+      return super.loadClass(name, resolve);
+    }
+  }
+
+  /**
+   * This class allows me to call findClass on a classloader
+   */
+  private static class FindClassClassLoader extends ClassLoader {
+
+    public FindClassClassLoader(final ClassLoader parent) {
+      super(parent);
+    }
+
+    @Override
+    public Class<?> findClass(final String name) throws ClassNotFoundException {
+      return super.findClass(name);
+    }
+  }
+
+  /**
+   * This class delegates (child then parent) for the findClass method for a URLClassLoader.
+   * We need this because findClass is protected in URLClassLoader
+   */
+  private static class ChildURLClassLoader extends URLClassLoader {
+
+    private final FindClassClassLoader realParent;
+
+    public ChildURLClassLoader(final URL[] urls, final FindClassClassLoader realParent) {
+      super(urls, null);
+
+      this.realParent = realParent;
+    }
+
+    @Override
+    public Class<?> findClass(final String name) throws ClassNotFoundException {
+      try {
+        final Class<?> loaded = super.findLoadedClass(name);
+        if (loaded != null) {
+          return loaded;
+        }
+
+        // first try to use the URLClassLoader findClass
+        return super.findClass(name);
+      } catch (final ClassNotFoundException e) {
+        // if that fails, we ask our real parent classloader to load the class (we give up)
+        return this.realParent.loadClass(name);
+      }
+    }
+  }
+}
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerDependencyPluginManagerTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerDependencyPluginManagerTest.java
index a265c34..df94237 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerDependencyPluginManagerTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerDependencyPluginManagerTest.java
@@ -42,6 +42,17 @@ public class FlowTriggerDependencyPluginManagerTest {
   public void testPluginLoading() {
     assertThat(pluginManager.getDependencyCheck("test")).isNotNull();
     assertThat(pluginManager.getDependencyCheck("test2")).isNotNull();
+
+    // class loader of pluginManager.getDependencyCheck("test") is different from
+    // loader of FakeDependencyCheck1, so
+    // assertThat(pluginManager.getDependencyCheck("test")).isInstanceOf(FakeDependencyCheck1)
+    // will return false.
+    assertThat(pluginManager.getDependencyCheck("test").getClass().getCanonicalName()).isEqualTo
+        ("azkaban.flowtrigger.testplugin.FakeDependencyCheck1");
+
+    assertThat(pluginManager.getDependencyCheck("test2").getClass().getCanonicalName()).isEqualTo
+        ("azkaban.flowtrigger.testplugin.FakeDependencyCheck2");
+
     //verify the dependency check contains the specified properties
     assertThat(pluginManager.getDependencyCheck("test").toString()).contains("kafka.url=123");
     assertThat(pluginManager.getDependencyCheck("test").toString()).contains("kafka.port=1234");
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/FakeDependencyCheck1.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/FakeDependencyCheck1.java
index bc3106c..49da78f 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/FakeDependencyCheck1.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/FakeDependencyCheck1.java
@@ -25,7 +25,7 @@ import azkaban.flowtrigger.DependencyPluginConfig;
 
 /**
  * todo chengren311:
- * az-flow-trigger-dependency-plugin.jar in resource folder is generated from
+ * test-dependency-plugin.jar in resource folder is generated from
  *
  * @see azkaban.flowtrigger.testplugin.FakeDependencyCheck1
  * @see azkaban.flowtrigger.testplugin.FakeDependencyCheck2