FlowTriggerDependencyPluginManager.java

265 lines | 9.664 kB Blame History Raw Download
/*
 * Copyright 2017 LinkedIn Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.flowtrigger.plugin;

import azkaban.flowtrigger.DependencyCheck;
import azkaban.flowtrigger.DependencyPluginConfig;
import azkaban.flowtrigger.DependencyPluginConfigImpl;
import azkaban.utils.Utils;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class FlowTriggerDependencyPluginManager {

  public static final String CONFIG_FILE = "dependency.properties";
  public static final String PRIVATE_CONFIG_FILE = "private.properties";
  public static final String DEPENDENCY_CLASS = "dependency.class";
  public static final String CLASS_PATH = "dependency.classpath";
  private static final Logger logger = LoggerFactory
      .getLogger(FlowTriggerDependencyPluginManager.class);
  private final String pluginDir;
  private final Map<String, DependencyCheck> dependencyTypeMap;
  private final ClassLoader prevClassLoader;

  @Inject
  public FlowTriggerDependencyPluginManager(final String pluginDir)
      throws FlowTriggerDependencyPluginException {
    this.dependencyTypeMap = new ConcurrentHashMap<>();
    this.pluginDir = pluginDir;
    this.prevClassLoader = Thread.currentThread().getContextClassLoader();
  }

  /**
   * retrieve files with wildcard matching.
   * Only support "dir/*". Pattern like "dir/foo*" or "dir/*foo*" will not be supported
   * since user shouldn't upload the jars they don't want to import
   * the reason for supporting dir/* is to provide another packaging option
   * which let user upload a dir of all required jars
   * in addition to one fat jar.
   */
  private File[] getFilesMatchingPath(final String path) {
    if (path.endsWith("*")) {
      final File dir = new File(path.substring(0, path.lastIndexOf("/") + 1));
      final FileFilter fileFilter = new WildcardFileFilter(path.substring(path.lastIndexOf("/")
          + 1));
      final File[] files = dir.listFiles(fileFilter);
      return files;
    } else {
      return new File[]{new File(path)};
    }
  }

  private Map<String, String> readConfig(final File file) throws
      FlowTriggerDependencyPluginException {
    final Properties props = new Properties();
    InputStream input = null;
    try {
      input = new BufferedInputStream(new FileInputStream(file));
      props.load(input);
    } catch (final Exception e) {
      logger.debug("unable to read the file " + file, e);
      throw new FlowTriggerDependencyPluginException(e);
    } finally {
      try {
        if (input != null) {
          input.close();
        }
      } catch (final IOException e) {
        logger.error("unable to close input stream when reading config from file " + file
            .getAbsolutePath(), e);
      }
    }
    return Maps.fromProperties(props);
  }

  private void validatePluginConfig(final DependencyPluginConfig pluginConfig)
      throws FlowTriggerDependencyPluginException {
    for (final String requiredField : ImmutableSet
        .of(DEPENDENCY_CLASS, CLASS_PATH)) {
      if (StringUtils.isEmpty(pluginConfig.get(requiredField))) {
        throw new FlowTriggerDependencyPluginException("missing " + requiredField + " in "
            + "dependency plugin properties");
      }
    }
  }

  private DependencyPluginConfig mergePluginConfig(final Map<String, String> publicProps,
      final Map<String, String> privateProps) throws FlowTriggerDependencyPluginException {
    final Map<String, String> combined = new HashMap<>();
    combined.putAll(publicProps);
    combined.putAll(privateProps);
    if (combined.size() != publicProps.size() + privateProps.size()) {
      throw new FlowTriggerDependencyPluginException("duplicate property found in both public and"
          + " private properties");
    }
    return new DependencyPluginConfigImpl(combined);
  }

  private DependencyCheck createDependencyCheck(final DependencyPluginConfig pluginConfig)
      throws FlowTriggerDependencyPluginException {
    final String classPath = pluginConfig.get(CLASS_PATH);

    final String[] cpList = classPath.split(",");

    final List<URL> resources = new ArrayList<>();

    try {
      for (final String cp : cpList) {
        final File[] files = getFilesMatchingPath(cp);
        if (files != null) {
          for (final File file : files) {
            final URL cpItem = file.toURI().toURL();
            if (!resources.contains(cpItem)) {
              logger.info("adding to classpath " + cpItem);
              resources.add(cpItem);
            }
          }
        }
      }
    } catch (final Exception ex) {
      throw new FlowTriggerDependencyPluginException(ex);
    }

    final ClassLoader dependencyClassloader = new ParentLastURLClassLoader(
        resources.toArray(new URL[resources.size()]), this.getClass().getClassLoader());

    Thread.currentThread().setContextClassLoader(dependencyClassloader);

    Class<? extends DependencyCheck> clazz = null;
    try {
      clazz = (Class<? extends DependencyCheck>) dependencyClassloader.loadClass(pluginConfig.get
          (DEPENDENCY_CLASS));
      return (DependencyCheck) Utils.callConstructor(clazz);
    } catch (final Exception ex) {
      throw new FlowTriggerDependencyPluginException(ex);
    }
  }

  public void loadDependencyPlugin(final File pluginDir)
      throws FlowTriggerDependencyPluginException {
    if (pluginDir.isDirectory() && pluginDir.canRead()) {
      try {
        final DependencyPluginConfig pluginConfig = createPluginConfig(pluginDir);
        final DependencyCheck depCheck = createDependencyCheck(pluginConfig);
        final String pluginName = getPluginName(pluginDir);
        depCheck.init(pluginConfig);
        this.dependencyTypeMap.put(pluginName, depCheck);
      } catch (final Exception ex) {
        logger.error("failed to initializing plugin in " + pluginDir, ex);
        throw new FlowTriggerDependencyPluginException(ex);
      }
    }
  }

  /**
   * Initialize all dependency plugins.
   * todo chengren311: Current design aborts loadAllPlugins if any of the plugin fails to be
   * initialized.
   * However, this might not be the optimal design. Suppose we have two dependency plugin types
   * - MySQL and Kafka, if MySQL is down, then kafka dependency type will also be unavailable.
   */
  public void loadAllPlugins() throws FlowTriggerDependencyPluginException {
    final File pluginDir = new File(this.pluginDir);
    for (final File dir : pluginDir.listFiles()) {
      loadDependencyPlugin(dir);
    }
    //reset thread context loader so that other azkaban class will be loaded with the old
    // classloader
    Thread.currentThread().setContextClassLoader(this.prevClassLoader);
  }

  private String getPluginName(final File dependencyPluginDir) {
    //the name of the dependency plugin dir is treated as the name of the plugin
    return dependencyPluginDir.getName();
  }

  private Map<String, String> readPublicConfig(final File publicConfigFile)
      throws FlowTriggerDependencyPluginException {
    return readConfig(publicConfigFile);
  }

  /**
   * read config from private property file, if the file is not present, then return empty.
   */
  private Map<String, String> readPrivateConfig(final File privateConfigFile) {
    try {
      return readConfig(privateConfigFile);
    } catch (final Exception ex) {
      return new HashMap<>();
    }
  }

  private DependencyPluginConfig createPluginConfig(final File dir) throws
      FlowTriggerDependencyPluginException {
    final File publicConfigFile = new File(dir.getAbsolutePath() + "/" + CONFIG_FILE);
    final File privateConfigFile = new File(dir.getAbsolutePath() + "/" + PRIVATE_CONFIG_FILE);
    try {
      final DependencyPluginConfig pluginConfig = mergePluginConfig(
          readPublicConfig(publicConfigFile),
          readPrivateConfig(privateConfigFile));
      validatePluginConfig(pluginConfig);
      return pluginConfig;
    } catch (final FlowTriggerDependencyPluginException exception) {
      throw new FlowTriggerDependencyPluginException("exception when initializing plugin "
          + "config in " + dir.getAbsolutePath() + ": " + exception.getMessage());
    }
  }

  /**
   * return or create a dependency check based on type
   *
   * @return if the dependencyCheck of the same type already exists, return the check,
   * otherwise create a new one and return.
   */

  public DependencyCheck getDependencyCheck(final String type) {
    return this.dependencyTypeMap.get(type);
  }

  public void shutdown() {
    for (final DependencyCheck depCheck : this.dependencyTypeMap.values()) {
      try {
        depCheck.shutdown();
      } catch (final Exception ex) {
        logger.error("failed to shutdown dependency check " + depCheck, ex);
      }
    }
  }


}