// azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/FlowTriggerDependencyPluginManager.java
/*
* Copyright 2017 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package azkaban.flowtrigger.plugin;
import azkaban.flowtrigger.DependencyCheck;
import azkaban.flowtrigger.DependencyPluginConfig;
import azkaban.flowtrigger.DependencyPluginConfigImpl;
import azkaban.utils.Utils;
import com.google.common.collect.Maps;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Loads flow trigger dependency plugins from a plugin directory and keeps one
 * {@link DependencyCheck} instance per plugin.
 *
 * <p>Each readable sub-directory of the plugin directory is treated as one plugin, and the
 * sub-directory name is used as the plugin name (the key accepted by
 * {@link #getDependencyCheck(String)}). A plugin directory must contain {@code
 * dependency.properties} and may contain {@code private.properties}; the two files must not define
 * the same key, and together they must define {@code dependency.class} and {@code
 * dependency.classpath}.
 *
 * <p>All plugins are loaded eagerly by the constructor; a failure in any plugin aborts
 * construction.
 */
@Singleton
public class FlowTriggerDependencyPluginManager {

  /** Name of the mandatory, public plugin property file inside a plugin directory. */
  public static final String CONFIG_FILE = "dependency.properties";
  /** Name of the optional, private plugin property file inside a plugin directory. */
  public static final String PRIVATE_CONFIG_FILE = "private.properties";
  /** Property holding the name of the class to instantiate as the dependency check. */
  public static final String DEPENDENCY_CLASS = "dependency.class";
  /** Property holding the comma-separated classpath used to load the dependency class. */
  public static final String CLASS_PATH = "dependency.classpath";
  private static final Logger logger = LoggerFactory
      .getLogger(FlowTriggerDependencyPluginManager.class);
  private final String pluginDir;
  /** Plugin name (plugin directory name) to its initialized dependency check. */
  private final Map<String, DependencyCheck> dependencyTypeMap;

  /**
   * Creates the manager and immediately loads every plugin found under {@code pluginDir}.
   *
   * @param pluginDir path of the directory whose sub-directories are the plugins
   * @throws FlowTriggerDependencyPluginException if the directory cannot be listed or any plugin
   *     fails to load or initialize
   */
  @Inject
  public FlowTriggerDependencyPluginManager(final String pluginDir) throws
      FlowTriggerDependencyPluginException {
    this.dependencyTypeMap = new ConcurrentHashMap<>();
    this.pluginDir = pluginDir;
    this.loadAllDependencyPlugins();
  }

  /**
   * Reads a Java properties file into a string map.
   *
   * @param file the properties file to read
   * @return the properties of the file as a map
   * @throws FlowTriggerDependencyPluginException if the file cannot be opened, read or closed
   */
  private Map<String, String> readConfig(final File file) throws
      FlowTriggerDependencyPluginException {
    final Properties props = new Properties();
    // try-with-resources closes the stream on every path, including a failed load.
    try (InputStream input = new BufferedInputStream(new FileInputStream(file))) {
      props.load(input);
    } catch (final IOException | RuntimeException e) {
      logger.debug("unable to read the file " + file, e);
      throw new FlowTriggerDependencyPluginException(e);
    }
    return Maps.fromProperties(props);
  }

  /**
   * Checks that the mandatory plugin properties are present and non-empty.
   *
   * @param pluginConfig the merged plugin configuration
   * @throws FlowTriggerDependencyPluginException if {@code dependency.class} or
   *     {@code dependency.classpath} is missing or empty
   */
  private void validatePluginConfig(final DependencyPluginConfig pluginConfig)
      throws FlowTriggerDependencyPluginException {
    if (StringUtils.isEmpty(pluginConfig.get(DEPENDENCY_CLASS))) {
      throw new FlowTriggerDependencyPluginException("missing " + DEPENDENCY_CLASS + " in "
          + "dependency plugin properties");
    }
    if (StringUtils.isEmpty(pluginConfig.get(CLASS_PATH))) {
      throw new FlowTriggerDependencyPluginException("missing " + CLASS_PATH + " in "
          + "dependency plugin properties");
    }
  }

  /**
   * Combines public and private properties into one plugin configuration.
   *
   * @param publicProps properties read from the public config file
   * @param privateProps properties read from the private config file, possibly empty
   * @return the configuration holding the union of both maps
   * @throws FlowTriggerDependencyPluginException if a key is defined in both maps
   */
  private DependencyPluginConfig mergePluginConfig(final Map<String, String> publicProps,
      final Map<String, String> privateProps) throws FlowTriggerDependencyPluginException {
    final Map<String, String> combined = new HashMap<>();
    combined.putAll(publicProps);
    combined.putAll(privateProps);
    // A shared key collapses into one entry, so the union ends up smaller than the sum.
    if (combined.size() != publicProps.size() + privateProps.size()) {
      throw new FlowTriggerDependencyPluginException("duplicate property found in both public and"
          + " private properties");
    }
    return new DependencyPluginConfigImpl(combined);
  }

  /**
   * Loads the class named by {@code dependency.class} from the classpath listed in
   * {@code dependency.classpath} and instantiates it.
   *
   * @param pluginConfig validated plugin configuration
   * @return a new, not yet initialized dependency check
   * @throws FlowTriggerDependencyPluginException if a classpath entry cannot be converted to a
   *     URL, the class cannot be loaded, is not a {@link DependencyCheck}, or cannot be
   *     instantiated
   */
  private DependencyCheck createDependencyCheck(final DependencyPluginConfig pluginConfig)
      throws FlowTriggerDependencyPluginException {
    final String classPath = pluginConfig.get(CLASS_PATH);
    final List<URL> resources = new ArrayList<>();
    try {
      for (final String rawEntry : classPath.split(",")) {
        // Tolerate "a.jar, b.jar" and stray commas: surrounding blanks are not part of the path.
        final String cp = rawEntry.trim();
        if (cp.isEmpty()) {
          continue;
        }
        final URL cpItem = new File(cp).toURI().toURL();
        if (!resources.contains(cpItem)) {
          logger.info("adding to classpath " + cpItem);
          resources.add(cpItem);
        }
      }
    } catch (final Exception ex) {
      throw new FlowTriggerDependencyPluginException(ex);
    }

    // Use this class's loader as parent so the plugin resolves the same DependencyCheck type
    // that this class casts to, even when the manager was not loaded by the system class loader.
    // The loader is deliberately left open: the created check may still load classes from it.
    final ClassLoader dependencyClassloader = new URLClassLoader(
        resources.toArray(new URL[resources.size()]),
        FlowTriggerDependencyPluginManager.class.getClassLoader());
    try {
      // asSubclass fails with ClassCastException when the configured class is not a
      // DependencyCheck, replacing the former unchecked cast.
      final Class<? extends DependencyCheck> clazz = dependencyClassloader
          .loadClass(pluginConfig.get(DEPENDENCY_CLASS))
          .asSubclass(DependencyCheck.class);
      return (DependencyCheck) Utils.callConstructor(clazz);
    } catch (final Exception ex) {
      throw new FlowTriggerDependencyPluginException(ex);
    }
  }

  /**
   * Loads, initializes and registers the plugin located in {@code pluginDir}. The plugin is
   * registered under the name of the directory. Paths that are not readable directories are
   * skipped.
   *
   * @param pluginDir directory of a single plugin
   * @throws FlowTriggerDependencyPluginException if the plugin configuration is invalid or the
   *     dependency check cannot be created or initialized
   */
  public void loadDependencyPlugin(final File pluginDir)
      throws FlowTriggerDependencyPluginException {
    if (!pluginDir.isDirectory() || !pluginDir.canRead()) {
      return;
    }
    try {
      final DependencyPluginConfig pluginConfig = createPluginConfig(pluginDir);
      final DependencyCheck depCheck = createDependencyCheck(pluginConfig);
      final String pluginName = getPluginName(pluginDir);
      // Register only after init succeeded so a half-initialized check is never handed out.
      depCheck.init(pluginConfig);
      this.dependencyTypeMap.put(pluginName, depCheck);
    } catch (final FlowTriggerDependencyPluginException ex) {
      // Already the declared exception type: rethrow as is instead of wrapping it again.
      logger.error("failed to initializing plugin in " + pluginDir, ex);
      throw ex;
    } catch (final Exception ex) {
      logger.error("failed to initializing plugin in " + pluginDir, ex);
      throw new FlowTriggerDependencyPluginException(ex);
    }
  }

  /**
   * Loads every plugin found directly under the configured plugin directory.
   *
   * @throws FlowTriggerDependencyPluginException if the plugin directory is not configured or
   *     cannot be listed, or if any plugin fails to load
   */
  private void loadAllDependencyPlugins() throws FlowTriggerDependencyPluginException {
    if (this.pluginDir == null) {
      throw new FlowTriggerDependencyPluginException(
          "dependency plugin directory is not configured");
    }
    // listFiles() returns null (not an empty array) when the path does not exist, is not a
    // directory, or cannot be read.
    final File[] candidates = new File(this.pluginDir).listFiles();
    if (candidates == null) {
      throw new FlowTriggerDependencyPluginException("unable to list dependency plugin directory "
          + this.pluginDir + ": it does not exist, is not a directory, or cannot be read");
    }
    for (final File dir : candidates) {
      loadDependencyPlugin(dir);
    }
  }

  /**
   * Returns the plugin name for a plugin directory.
   */
  private String getPluginName(final File dependencyPluginDir) {
    //the name of the dependency plugin dir is treated as the name of the plugin
    return dependencyPluginDir.getName();
  }

  /**
   * Reads the mandatory public property file.
   *
   * @throws FlowTriggerDependencyPluginException if the file cannot be read
   */
  private Map<String, String> readPublicConfig(final File publicConfigFile)
      throws FlowTriggerDependencyPluginException {
    return readConfig(publicConfigFile);
  }

  /**
   * Reads the optional private property file. Returns an empty map when the file is not present.
   * When the file exists but cannot be read, the failure is logged as a warning and an empty map
   * is returned as well, so an unreadable private file no longer goes unnoticed.
   */
  private Map<String, String> readPrivateConfig(final File privateConfigFile) {
    if (!privateConfigFile.exists()) {
      return new HashMap<>();
    }
    try {
      return readConfig(privateConfigFile);
    } catch (final Exception ex) {
      logger.warn("private config file " + privateConfigFile.getAbsolutePath()
          + " exists but could not be read, ignoring it", ex);
      return new HashMap<>();
    }
  }

  /**
   * Builds and validates the configuration of the plugin located in {@code dir}.
   *
   * @param dir directory of a single plugin
   * @return the merged and validated plugin configuration
   * @throws FlowTriggerDependencyPluginException if the public config cannot be read, the public
   *     and private configs share a key, or a mandatory property is missing
   */
  private DependencyPluginConfig createPluginConfig(final File dir) throws
      FlowTriggerDependencyPluginException {
    final File absoluteDir = dir.getAbsoluteFile();
    final File publicConfigFile = new File(absoluteDir, CONFIG_FILE);
    final File privateConfigFile = new File(absoluteDir, PRIVATE_CONFIG_FILE);
    try {
      final DependencyPluginConfig pluginConfig = mergePluginConfig(
          readPublicConfig(publicConfigFile),
          readPrivateConfig(privateConfigFile));
      validatePluginConfig(pluginConfig);
      return pluginConfig;
    } catch (final FlowTriggerDependencyPluginException exception) {
      final FlowTriggerDependencyPluginException wrapped =
          new FlowTriggerDependencyPluginException("exception when initializing plugin "
              + "config in " + dir.getAbsolutePath() + ": " + exception.getMessage());
      // Keep the original stack trace reachable instead of only its message.
      // NOTE(review): assumes the String constructor leaves the cause unset - confirm.
      wrapped.initCause(exception);
      throw wrapped;
    }
  }

  /**
   * Looks up the dependency check registered for a plugin name. This is a pure lookup; no
   * dependency check is created here.
   *
   * @param type the plugin name, i.e. the name of the plugin directory; must not be null because
   *     the backing {@link ConcurrentHashMap} rejects null keys
   * @return the dependency check registered under {@code type}, or {@code null} if no plugin of
   *     that name has been loaded
   */
  public DependencyCheck getDependencyCheck(final String type) {
    return this.dependencyTypeMap.get(type);
  }

  /**
   * Shuts down every registered dependency check. A failure in one check is logged and does not
   * prevent the remaining checks from being shut down.
   */
  public void shutdown() {
    for (final DependencyCheck depCheck : this.dependencyTypeMap.values()) {
      try {
        depCheck.shutdown();
      } catch (final Exception ex) {
        logger.error("failed to shutdown dependency check " + depCheck, ex);
      }
    }
  }
}