azkaban-uncached
Changes
src/java/azkaban/executor/ExecutorManager.java 217(+212 -5)
Details
diff --git a/src/java/azkaban/executor/ExecutorMailer.java b/src/java/azkaban/executor/ExecutorMailer.java
index c6c6f41..39363ba 100644
--- a/src/java/azkaban/executor/ExecutorMailer.java
+++ b/src/java/azkaban/executor/ExecutorMailer.java
@@ -8,12 +8,13 @@ import javax.mail.MessagingException;
import org.apache.log4j.Logger;
import azkaban.executor.ExecutionOptions.FailureAction;
+import azkaban.executor.ExecutorManager.Alerter;
import azkaban.utils.AbstractMailer;
import azkaban.utils.EmailMessage;
import azkaban.utils.Props;
import azkaban.utils.Utils;
-public class ExecutorMailer extends AbstractMailer {
+public class ExecutorMailer extends AbstractMailer implements Alerter {
private static Logger logger = Logger.getLogger(ExecutorMailer.class);
private boolean testMode = false;
@@ -164,4 +165,19 @@ public class ExecutorMailer extends AbstractMailer {
return failedJobs;
}
+
+ @Override
+ public void alertOnSuccess(ExecutableFlow exflow) throws Exception {
+ sendSuccessEmail(exflow);
+ }
+
+ @Override
+ public void alertOnError(ExecutableFlow exflow, String ... extraReasons) throws Exception {
+ sendErrorEmail(exflow, extraReasons);
+ }
+
+ @Override
+ public void alertOnFirstError(ExecutableFlow exflow) throws Exception {
+ sendFirstErrorMessage(exflow);
+ }
}
\ No newline at end of file
src/java/azkaban/executor/ExecutorManager.java 217(+212 -5)
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 5d51a2b..a7eef2d 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -16,10 +16,15 @@
package azkaban.executor;
+import java.io.File;
import java.io.IOException;
import java.lang.Thread.State;
+import java.lang.reflect.Constructor;
+import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -43,9 +48,11 @@ import azkaban.project.Project;
import azkaban.scheduler.ScheduleStatisticManager;
import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
+import azkaban.utils.FileIOUtils;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
+import azkaban.utils.PropsUtils;
/**
* Executor manager used to manage the client side job.
@@ -62,7 +69,6 @@ public class ExecutorManager {
private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows = new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
private ConcurrentHashMap<Integer, ExecutableFlow> recentlyFinished = new ConcurrentHashMap<Integer, ExecutableFlow>();
- private ExecutorMailer mailer;
private ExecutingManagerUpdaterThread executingManager;
private static final long DEFAULT_EXECUTION_LOGS_RETENTION_MS = 3*4*7*24*60*60*1000l;
@@ -72,6 +78,14 @@ public class ExecutorManager {
private final boolean isPrimary;
+ private Map<String, Alerter> alerters;
+
+ public interface Alerter {
+ void alertOnSuccess(ExecutableFlow exflow) throws Exception;
+ void alertOnError(ExecutableFlow exflow, String ... extraReasons) throws Exception;
+ void alertOnFirstError(ExecutableFlow exflow) throws Exception;
+ }
+
public ExecutorManager(Props props, ExecutorLoader loader, boolean isPrimary) throws ExecutorManagerException {
this.executorLoader = loader;
this.loadRunningFlows();
@@ -79,7 +93,7 @@ public class ExecutorManager {
executorHost = props.getString("executor.host", "localhost");
executorPort = props.getInt("executor.port");
- mailer = new ExecutorMailer(props);
+ alerters = loadAlerters(props);
this.isPrimary = isPrimary;
@@ -93,6 +107,143 @@ public class ExecutorManager {
}
}
+ private Map<String, Alerter> loadAlerters(Props props) {
+ Map<String, Alerter> allAlerters = new HashMap<String, Alerter>();
+ // load built-in alerters
+ ExecutorMailer mailAlerter = new ExecutorMailer(props);
+ allAlerters.put("email", mailAlerter);
+ // load all plugin alerters
+ String pluginDir = props.getString("alerter.plugin.dir", "plugins/alerter");
+ allAlerters.putAll(loadPluginAlerters(pluginDir));
+ return allAlerters;
+ }
+
+ private Map<String, Alerter> loadPluginAlerters(String pluginPath) {
+ File alerterPluginPath = new File(pluginPath);
+ if (!alerterPluginPath.exists()) {
+ return Collections.<String, Alerter>emptyMap();
+ }
+
+ Map<String, Alerter> installedAlerterPlugins = new HashMap<String, Alerter>();
+ ClassLoader parentLoader = this.getClass().getClassLoader();
+ File[] pluginDirs = alerterPluginPath.listFiles();
+ ArrayList<String> jarPaths = new ArrayList<String>();
+ for (File pluginDir: pluginDirs) {
+ if (!pluginDir.isDirectory()) {
+ logger.error("The plugin path " + pluginDir + " is not a directory.");
+ continue;
+ }
+
+ // Load the conf directory
+ File propertiesDir = new File(pluginDir, "conf");
+ Props pluginProps = null;
+ if (propertiesDir.exists() && propertiesDir.isDirectory()) {
+ File propertiesFile = new File(propertiesDir, "plugin.properties");
+ File propertiesOverrideFile = new File(propertiesDir, "override.properties");
+
+ if (propertiesFile.exists()) {
+ if (propertiesOverrideFile.exists()) {
+ pluginProps = PropsUtils.loadProps(null, propertiesFile, propertiesOverrideFile);
+ }
+ else {
+ pluginProps = PropsUtils.loadProps(null, propertiesFile);
+ }
+ }
+ else {
+ logger.error("Plugin conf file " + propertiesFile + " not found.");
+ continue;
+ }
+ }
+ else {
+ logger.error("Plugin conf path " + propertiesDir + " not found.");
+ continue;
+ }
+
+ String pluginName = pluginProps.getString("alerter.name");
+ List<String> extLibClasspath = pluginProps.getStringList("alerter.external.classpaths", (List<String>)null);
+
+ String pluginClass = pluginProps.getString("alerter.class");
+ if (pluginClass == null) {
+ logger.error("Alerter class is not set.");
+ }
+ else {
+ logger.info("Plugin class " + pluginClass);
+ }
+
+ URLClassLoader urlClassLoader = null;
+ File libDir = new File(pluginDir, "lib");
+ if (libDir.exists() && libDir.isDirectory()) {
+ File[] files = libDir.listFiles();
+
+ ArrayList<URL> urls = new ArrayList<URL>();
+ for (int i=0; i < files.length; ++i) {
+ try {
+ URL url = files[i].toURI().toURL();
+ urls.add(url);
+ } catch (MalformedURLException e) {
+ logger.error(e);
+ }
+ }
+ if (extLibClasspath != null) {
+ for (String extLib : extLibClasspath) {
+ try {
+ File file = new File(pluginDir, extLib);
+ URL url = file.toURI().toURL();
+ urls.add(url);
+ } catch (MalformedURLException e) {
+ logger.error(e);
+ }
+ }
+ }
+
+ urlClassLoader = new URLClassLoader(urls.toArray(new URL[urls.size()]), parentLoader);
+ }
+ else {
+ logger.error("Library path " + propertiesDir + " not found.");
+ continue;
+ }
+
+ Class<?> alerterClass = null;
+ try {
+ alerterClass = urlClassLoader.loadClass(pluginClass);
+ }
+ catch (ClassNotFoundException e) {
+ logger.error("Class " + pluginClass + " not found.");
+ continue;
+ }
+
+ String source = FileIOUtils.getSourcePathFromClass(alerterClass);
+ logger.info("Source jar " + source);
+ jarPaths.add("jar:file:" + source);
+
+ Constructor<?> constructor = null;
+ try {
+ constructor = alerterClass.getConstructor(Props.class);
+ } catch (NoSuchMethodException e) {
+ logger.error("Constructor not found in " + pluginClass);
+ continue;
+ }
+
+ Object obj = null;
+ try {
+ obj = constructor.newInstance(pluginProps);
+ } catch (Exception e) {
+ logger.error(e);
+ }
+
+ if (!(obj instanceof Alerter)) {
+ logger.error("The object is not an Alerter");
+ continue;
+ }
+
+ Alerter plugin = (Alerter) obj;
+ installedAlerterPlugins.put(pluginName, plugin);
+ }
+
+ return installedAlerterPlugins;
+
+ }
+
public String getExecutorHost() {
return executorHost;
}
@@ -745,27 +896,61 @@ public class ExecutorManager {
ExecutionOptions options = flow.getExecutionOptions();
// But we can definitely email them.
+ Alerter mailAlerter = alerters.get("email");
if(flow.getStatus() == Status.FAILED || flow.getStatus() == Status.KILLED)
{
if(options.getFailureEmails() != null && !options.getFailureEmails().isEmpty())
{
try {
- mailer.sendErrorEmail(flow, "Executor no longer seems to be running this execution. Most likely due to executor bounce.");
+ mailAlerter.alertOnError(flow, "Executor no longer seems to be running this execution. Most likely due to executor bounce.");
} catch (Exception e) {
logger.error(e);
}
}
+ if(options.getFlowParameters().containsKey("alert.type")) {
+ String alertType = options.getFlowParameters().get("alert.type");
+ Alerter alerter = alerters.get(alertType);
+ if(alerter != null) {
+ try {
+ alerter.alertOnError(flow, "Executor no longer seems to be running this execution. Most likely due to executor bounce.");
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ logger.error("Failed to alert by " + alertType);
+ }
+ }
+ else {
+ logger.error("Alerter type " + alertType + " doesn't exist. Failed to alert.");
+ }
+ }
}
else
{
if(options.getSuccessEmails() != null && !options.getSuccessEmails().isEmpty())
{
try {
- mailer.sendSuccessEmail(flow);
+
+ mailAlerter.alertOnSuccess(flow);
} catch (Exception e) {
logger.error(e);
}
}
+ if(options.getFlowParameters().containsKey("alert.type")) {
+ String alertType = options.getFlowParameters().get("alert.type");
+ Alerter alerter = alerters.get(alertType);
+ if(alerter != null) {
+ try {
+ alerter.alertOnSuccess(flow);
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ logger.error("Failed to alert by " + alertType);
+ }
+ }
+ else {
+ logger.error("Alerter type " + alertType + " doesn't exist. Failed to alert.");
+ }
+ }
}
}
@@ -846,7 +1031,29 @@ public class ExecutorManager {
if (oldStatus != newStatus && newStatus.equals(Status.FAILED_FINISHING)) {
// We want to see if we should give an email status on first failure.
if (options.getNotifyOnFirstFailure()) {
- mailer.sendFirstErrorMessage(flow);
+ Alerter mailAlerter = alerters.get("email");
+ try {
+ mailAlerter.alertOnFirstError(flow);
+ } catch (Exception e) {
+ e.printStackTrace();
+ logger.error("Failed to send first error email." + e.getMessage());
+ }
+ }
+ if(options.getFlowParameters().containsKey("alert.type")) {
+ String alertType = options.getFlowParameters().get("alert.type");
+ Alerter alerter = alerters.get(alertType);
+ if(alerter != null) {
+ try {
+ alerter.alertOnFirstError(flow);
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ logger.error("Failed to alert by " + alertType);
+ }
+ }
+ else {
+ logger.error("Alerter type " + alertType + " doesn't exist. Failed to alert.");
+ }
}
}