azkaban-uncached

adding generic alert types

8/8/2013 3:47:21 PM

Details

diff --git a/src/java/azkaban/executor/ExecutorMailer.java b/src/java/azkaban/executor/ExecutorMailer.java
index c6c6f41..39363ba 100644
--- a/src/java/azkaban/executor/ExecutorMailer.java
+++ b/src/java/azkaban/executor/ExecutorMailer.java
@@ -8,12 +8,13 @@ import javax.mail.MessagingException;
 import org.apache.log4j.Logger;
 
 import azkaban.executor.ExecutionOptions.FailureAction;
+import azkaban.executor.ExecutorManager.Alerter;
 import azkaban.utils.AbstractMailer;
 import azkaban.utils.EmailMessage;
 import azkaban.utils.Props;
 import azkaban.utils.Utils;
 
-public class ExecutorMailer extends AbstractMailer {
+public class ExecutorMailer extends AbstractMailer implements Alerter {
 	private static Logger logger = Logger.getLogger(ExecutorMailer.class);
 	
 	private boolean testMode = false;
@@ -164,4 +165,19 @@ public class ExecutorMailer extends AbstractMailer {
 		
 		return failedJobs;
 	}
+
+	@Override
+	public void alertOnSuccess(ExecutableFlow exflow) throws Exception {
+		sendSuccessEmail(exflow);
+	}
+	
+	@Override
+	public void alertOnError(ExecutableFlow exflow, String ... extraReasons) throws Exception {
+		sendErrorEmail(exflow, extraReasons);
+	}
+	
+	@Override
+	public void alertOnFirstError(ExecutableFlow exflow) throws Exception {
+		sendFirstErrorMessage(exflow);
+	}
 }
\ No newline at end of file
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 5d51a2b..a7eef2d 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -16,10 +16,15 @@
 
 package azkaban.executor;
 
+import java.io.File;
 import java.io.IOException;
 import java.lang.Thread.State;
+import java.lang.reflect.Constructor;
+import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLClassLoader;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -43,9 +48,11 @@ import azkaban.project.Project;
 import azkaban.scheduler.ScheduleStatisticManager;
 import azkaban.utils.FileIOUtils.JobMetaData;
 import azkaban.utils.FileIOUtils.LogData;
+import azkaban.utils.FileIOUtils;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
+import azkaban.utils.PropsUtils;
 
 /**
  * Executor manager used to manage the client side job.
@@ -62,7 +69,6 @@ public class ExecutorManager {
 	private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows = new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
 	private ConcurrentHashMap<Integer, ExecutableFlow> recentlyFinished = new ConcurrentHashMap<Integer, ExecutableFlow>();
 
-	private ExecutorMailer mailer;
 	private ExecutingManagerUpdaterThread executingManager;
 	
 	private static final long DEFAULT_EXECUTION_LOGS_RETENTION_MS = 3*4*7*24*60*60*1000l;
@@ -72,6 +78,14 @@ public class ExecutorManager {
 	
 	private final boolean isPrimary;
 	
+	private Map<String, Alerter> alerters;
+	
+	public interface Alerter {
+		void alertOnSuccess(ExecutableFlow exflow) throws Exception;
+		void alertOnError(ExecutableFlow exflow, String ... extraReasons) throws Exception;
+		void alertOnFirstError(ExecutableFlow exflow) throws Exception;
+	}
+	
 	public ExecutorManager(Props props, ExecutorLoader loader, boolean isPrimary) throws ExecutorManagerException {
 		this.executorLoader = loader;
 		this.loadRunningFlows();
@@ -79,7 +93,7 @@ public class ExecutorManager {
 		executorHost = props.getString("executor.host", "localhost");
 		executorPort = props.getInt("executor.port");
 		
-		mailer = new ExecutorMailer(props);
+		alerters = loadAlerters(props);
 		
 		this.isPrimary = isPrimary;		
 		
@@ -93,6 +107,143 @@ public class ExecutorManager {
 		}
 	}
 	
+	private Map<String, Alerter> loadAlerters(Props props) {
+		Map<String, Alerter> allAlerters = new HashMap<String, Alerter>();
+		// load built-in alerters
+		ExecutorMailer mailAlerter = new ExecutorMailer(props);
+		allAlerters.put("email", mailAlerter);
+		// load all plugin alerters
+		String pluginDir = props.getString("alerter.plugin.dir", "plugins/alerter");
+		allAlerters.putAll(loadPluginAlerters(pluginDir));
+		return allAlerters;
+	}
+
+	private Map<String, Alerter> loadPluginAlerters(String pluginPath) {
+		File alerterPluginPath = new File(pluginPath);
+		if (!alerterPluginPath.exists()) {
+			return Collections.<String, Alerter>emptyMap();
+		}
+			
+		Map<String, Alerter> installedAlerterPlugins = new HashMap<String, Alerter>();
+		ClassLoader parentLoader = this.getClass().getClassLoader();
+		File[] pluginDirs = alerterPluginPath.listFiles();
+		ArrayList<String> jarPaths = new ArrayList<String>();
+		for (File pluginDir: pluginDirs) {
+			if (!pluginDir.isDirectory()) {
+				logger.error("The plugin path " + pluginDir + " is not a directory.");
+				continue;
+			}
+			
+			// Load the conf directory
+			File propertiesDir = new File(pluginDir, "conf");
+			Props pluginProps = null;
+			if (propertiesDir.exists() && propertiesDir.isDirectory()) {
+				File propertiesFile = new File(propertiesDir, "plugin.properties");
+				File propertiesOverrideFile = new File(propertiesDir, "override.properties");
+				
+				if (propertiesFile.exists()) {
+					if (propertiesOverrideFile.exists()) {
+						pluginProps = PropsUtils.loadProps(null, propertiesFile, propertiesOverrideFile);
+					}
+					else {
+						pluginProps = PropsUtils.loadProps(null, propertiesFile);
+					}
+				}
+				else {
+					logger.error("Plugin conf file " + propertiesFile + " not found.");
+					continue;
+				}
+			}
+			else {
+				logger.error("Plugin conf path " + propertiesDir + " not found.");
+				continue;
+			}
+			
+			String pluginName = pluginProps.getString("alerter.name");
+			List<String> extLibClasspath = pluginProps.getStringList("alerter.external.classpaths", (List<String>)null);
+			
+			String pluginClass = pluginProps.getString("alerter.class");
+			if (pluginClass == null) {
+				logger.error("Alerter class is not set.");
+			}
+			else {
+				logger.info("Plugin class " + pluginClass);
+			}
+			
+			URLClassLoader urlClassLoader = null;
+			File libDir = new File(pluginDir, "lib");
+			if (libDir.exists() && libDir.isDirectory()) {
+				File[] files = libDir.listFiles();
+				
+				ArrayList<URL> urls = new ArrayList<URL>();
+				for (int i=0; i < files.length; ++i) {
+					try {
+						URL url = files[i].toURI().toURL();
+						urls.add(url);
+					} catch (MalformedURLException e) {
+						logger.error(e);
+					}
+				}
+				if (extLibClasspath != null) {
+					for (String extLib : extLibClasspath) {
+						try {
+							File file = new File(pluginDir, extLib);
+							URL url = file.toURI().toURL();
+							urls.add(url);
+						} catch (MalformedURLException e) {
+							logger.error(e);
+						}
+					}
+				}
+				
+				urlClassLoader = new URLClassLoader(urls.toArray(new URL[urls.size()]), parentLoader);
+			}
+			else {
+				logger.error("Library path " + propertiesDir + " not found.");
+				continue;
+			}
+			
+			Class<?> alerterClass = null;
+			try {
+				alerterClass = urlClassLoader.loadClass(pluginClass);
+			}
+			catch (ClassNotFoundException e) {
+				logger.error("Class " + pluginClass + " not found.");
+				continue;
+			}
+
+			String source = FileIOUtils.getSourcePathFromClass(alerterClass);
+			logger.info("Source jar " + source);
+			jarPaths.add("jar:file:" + source);
+			
+			Constructor<?> constructor = null;
+			try {
+				constructor = alerterClass.getConstructor(Props.class);
+			} catch (NoSuchMethodException e) {
+				logger.error("Constructor not found in " + pluginClass);
+				continue;
+			}
+			
+			Object obj = null;
+			try {
+				obj = constructor.newInstance(pluginProps);
+			} catch (Exception e) {
+				logger.error(e);
+			} 
+			
+			if (!(obj instanceof Alerter)) {
+				logger.error("The object is not an Alerter");
+				continue;
+			}
+			
+			Alerter plugin = (Alerter) obj;
+			installedAlerterPlugins.put(pluginName, plugin);
+		}
+		
+		return installedAlerterPlugins;
+		
+	}
+
 	public String getExecutorHost() {
 		return executorHost;
 	}
@@ -745,27 +896,61 @@ public class ExecutorManager {
 		
 		ExecutionOptions options = flow.getExecutionOptions();
 		// But we can definitely email them.
+		Alerter mailAlerter = alerters.get("email");
 		if(flow.getStatus() == Status.FAILED || flow.getStatus() == Status.KILLED)
 		{
 			if(options.getFailureEmails() != null && !options.getFailureEmails().isEmpty())
 			{
 				try {
-					mailer.sendErrorEmail(flow, "Executor no longer seems to be running this execution. Most likely due to executor bounce.");
+					mailAlerter.alertOnError(flow, "Executor no longer seems to be running this execution. Most likely due to executor bounce.");
 				} catch (Exception e) {
 					logger.error(e);
 				}
 			}
+			if(options.getFlowParameters().containsKey("alert.type")) {
+				String alertType = options.getFlowParameters().get("alert.type");
+				Alerter alerter = alerters.get(alertType);
+				if(alerter != null) {
+					try {
+						alerter.alertOnError(flow, "Executor no longer seems to be running this execution. Most likely due to executor bounce.");
+					} catch (Exception e) {
+						// TODO Auto-generated catch block
+						e.printStackTrace();
+						logger.error("Failed to alert by " + alertType);
+					}
+				}
+				else {
+					logger.error("Alerter type " + alertType + " doesn't exist. Failed to alert.");
+				}
+			}
 		}
 		else
 		{
 			if(options.getSuccessEmails() != null && !options.getSuccessEmails().isEmpty())
 			{
 				try {
-					mailer.sendSuccessEmail(flow);
+					
+					mailAlerter.alertOnSuccess(flow);
 				} catch (Exception e) {
 					logger.error(e);
 				}
 			}
+			if(options.getFlowParameters().containsKey("alert.type")) {
+				String alertType = options.getFlowParameters().get("alert.type");
+				Alerter alerter = alerters.get(alertType);
+				if(alerter != null) {
+					try {
+						alerter.alertOnSuccess(flow);
+					} catch (Exception e) {
+						// TODO Auto-generated catch block
+						e.printStackTrace();
+						logger.error("Failed to alert by " + alertType);
+					}
+				}
+				else {
+					logger.error("Alerter type " + alertType + " doesn't exist. Failed to alert.");
+				}
+			}
 		}
 		
 	}
@@ -846,7 +1031,29 @@ public class ExecutorManager {
 		if (oldStatus != newStatus && newStatus.equals(Status.FAILED_FINISHING)) {
 			// We want to see if we should give an email status on first failure.
 			if (options.getNotifyOnFirstFailure()) {
-				mailer.sendFirstErrorMessage(flow);
+				Alerter mailAlerter = alerters.get("email");
+				try {
+					mailAlerter.alertOnFirstError(flow);
+				} catch (Exception e) {
+					e.printStackTrace();
+					logger.error("Failed to send first error email." + e.getMessage());
+				}
+			}
+			if(options.getFlowParameters().containsKey("alert.type")) {
+				String alertType = options.getFlowParameters().get("alert.type");
+				Alerter alerter = alerters.get(alertType);
+				if(alerter != null) {
+					try {
+						alerter.alertOnFirstError(flow);
+					} catch (Exception e) {
+						// TODO Auto-generated catch block
+						e.printStackTrace();
+						logger.error("Failed to alert by " + alertType);
+					}
+				}
+				else {
+					logger.error("Alerter type " + alertType + " doesn't exist. Failed to alert.");
+				}
 			}
 		}