azkaban-aplcache

initial mailman checkin

9/24/2012 10:47:48 PM

Details

diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index 09628b4..c26d050 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -5,8 +5,10 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -59,6 +61,8 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private Appender flowAppender;
 
 	private Thread currentThread;
+	
+	private Set<String> emailAddress;
 
 	public enum FailedFlowOptions {
 		FINISH_RUNNING_JOBS, KILL_ALL
@@ -72,6 +76,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 		this.executorService = Executors.newFixedThreadPool(numThreads);
 		this.runningJobs = new ConcurrentHashMap<String, JobRunner>();
 		this.listener = new JobRunnerEventListener(this);
+		this.emailAddress = new HashSet<String>();
 
 		createLogger();
 	}
@@ -80,6 +85,10 @@ public class FlowRunner extends EventHandler implements Runnable {
 		return flow;
 	}
 
+	public Set<String> getEmails() {
+		return emailAddress;
+	}
+	
 	private void createLogger() {
 		// Create logger
 		String loggerName = System.currentTimeMillis() + "."
@@ -433,6 +442,8 @@ public class FlowRunner extends EventHandler implements Runnable {
 			System.out.println("Event " + jobID + " "
 					+ event.getType().toString());
 
+			emailAddress.addAll(runner.getNotifyEmails());
+			
 			// On Job success, we add the output props and then set up the next
 			// run.
 			if (event.getType() == Type.JOB_SUCCEEDED) {
diff --git a/src/java/azkaban/executor/FlowRunnerManager.java b/src/java/azkaban/executor/FlowRunnerManager.java
index 28d78cf..2d5a7ae 100644
--- a/src/java/azkaban/executor/FlowRunnerManager.java
+++ b/src/java/azkaban/executor/FlowRunnerManager.java
@@ -1,6 +1,14 @@
 package azkaban.executor;
 
 import java.io.File;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -8,11 +16,16 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.log4j.Logger;
+import org.joda.time.Duration;
+import org.joda.time.format.PeriodFormat;
 
+import azkaban.utils.Utils;
+import azkaban.executor.ExecutableFlow.Status;
 import azkaban.executor.event.Event;
 import azkaban.executor.event.Event.Type;
 import azkaban.executor.event.EventListener;
 import azkaban.utils.ExecutableFlowLoader;
+import azkaban.utils.Mailman;
 import azkaban.utils.Props;
 
 /**
@@ -31,8 +44,17 @@ public class FlowRunnerManager {
 	private ExecutorService executorService;
 	private SubmitterThread submitterThread;
 	private FlowRunnerEventListener eventListener;
+
+	private Mailman mailer;
+	private String defaultFailureEmail;
+	private String defaultSuccessEmail;
+	private String senderAddress;
 	
-	public FlowRunnerManager(Props props) {
+	public FlowRunnerManager(Props props, Mailman mailer) {
+		this.mailer = mailer;
+//		this.defaultFailureEmail = props.getString("job.failure.email");
+//		this.defaultSuccessEmail = props.getString("job.success.email");
+		this.senderAddress = props.getString("mail.sender");
 		basePath = new File(props.getString("execution.directory"));
 		numThreads = props.getInt("executor.flow.threads", DEFAULT_NUM_EXECUTING_FLOWS);
 		executorService = Executors.newFixedThreadPool(numThreads);
@@ -128,11 +150,64 @@ public class FlowRunnerManager {
 			FlowRunner runner = (FlowRunner)event.getRunner();
 			ExecutableFlow flow = runner.getFlow();
 			
+			List<String> emailList = new ArrayList<String>(runner.getEmails());
+			
 			System.out.println("Event " + flow.getExecutionId() + " " + flow.getFlowId() + " " + event.getType());
 			if (event.getType() == Type.FLOW_FINISHED) {
+				if(flow.getStatus() == Status.SUCCEEDED)
+					sendSuccessEmail(flow, emailList);
+				else sendErrorEmail(flow, emailList);
+				
 				logger.info("Flow " + flow.getExecutionId() + " has finished.");
 				runningFlows.remove(flow.getExecutionId());
+				
 			}
 		}
 	}
+	
+    /*
+     * Wrap a single exception with the name of the scheduled job
+     */
+    private void sendErrorEmail(ExecutableFlow flow, List<String> emailList) {
+        
+        if(emailList != null && !emailList.isEmpty() && mailer != null) {
+            try {
+                mailer.sendEmailIfPossible(senderAddress,
+                                             emailList,
+                                             "Flow '" + flow.getFlowId() + "' has completed on "
+                                                     + InetAddress.getLocalHost().getHostName()
+                                                     + "!",
+                                             "The Flow '"
+                                                     + flow.getFlowId()
+                                                     + "' failed.");
+            } catch(UnknownHostException uhe) {
+                logger.error(uhe);
+            }
+            catch (Exception e) {
+                logger.error(e);
+            }
+        }
+    }
+    
+
+    private void sendSuccessEmail(ExecutableFlow flow, List<String> emailList) {
+        
+        if(emailList != null && !emailList.isEmpty() && mailer != null) {
+            try {
+                mailer.sendEmailIfPossible(senderAddress,
+                                             emailList,
+                                             "Flow '" + flow.getFlowId() + "' has completed on "
+                                                     + InetAddress.getLocalHost().getHostName()
+                                                     + "!",
+                                             "The Flow '"
+                                                     + flow.getFlowId()
+                                                     + "' was successful.");
+            } catch(UnknownHostException uhe) {
+                logger.error(uhe);
+            }
+            catch (Exception e) {
+                logger.error(e);
+            }
+        }
+    }
 }
diff --git a/src/java/azkaban/executor/JobRunner.java b/src/java/azkaban/executor/JobRunner.java
index 11acd34..ac6bcc9 100644
--- a/src/java/azkaban/executor/JobRunner.java
+++ b/src/java/azkaban/executor/JobRunner.java
@@ -2,6 +2,9 @@ package azkaban.executor;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 
 import org.apache.log4j.Appender;
 import org.apache.log4j.FileAppender;
@@ -23,7 +26,9 @@ import azkaban.utils.Props;
 public class JobRunner extends EventHandler implements Runnable {
 	private static final Layout DEFAULT_LAYOUT = new PatternLayout(
 			"%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
-
+	
+	private static final String EMAILLIST = "notify.emails";
+	
 	private Props props;
 	private Props outputProps;
 	private ExecutableNode node;
@@ -101,9 +106,9 @@ public class JobRunner extends EventHandler implements Runnable {
 			}
 		}
 
-/*
+
 		// Run Job
-		boolean succeeded = true;
+		//boolean succeeded = true;
 
 		props.put(AbstractProcessJob.WORKING_DIR, workingDir.getAbsolutePath());
 		JobWrappingFactory factory  = JobWrappingFactory.getJobWrappingFactory();
@@ -116,7 +121,7 @@ public class JobRunner extends EventHandler implements Runnable {
                 //logger.error("job run failed!");
                 e.printStackTrace();
         }
-		*/
+		
 		node.setEndTime(System.currentTimeMillis());
 		if (succeeded) {
 			node.setStatus(Status.SUCCEEDED);
@@ -159,4 +164,10 @@ public class JobRunner extends EventHandler implements Runnable {
 	public Props getOutputProps() {
 		return outputProps;
 	}
+
+	public List<String> getNotifyEmails() {
+		List<String> emails = new ArrayList<String>();
+		emails = this.props.getStringList(EMAILLIST);
+		return emails;
+	}
 }
diff --git a/src/java/azkaban/webapp/AzkabanExecutorServer.java b/src/java/azkaban/webapp/AzkabanExecutorServer.java
index e850dd8..f55d511 100644
--- a/src/java/azkaban/webapp/AzkabanExecutorServer.java
+++ b/src/java/azkaban/webapp/AzkabanExecutorServer.java
@@ -39,6 +39,7 @@ import org.mortbay.jetty.servlet.Context;
 import org.mortbay.jetty.servlet.ServletHolder;
 import org.mortbay.thread.QueuedThreadPool;
 
+import azkaban.utils.Mailman;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.FlowRunnerManager;
@@ -67,6 +68,8 @@ public class AzkabanExecutorServer {
 	private Props props;
 	private File tempDir;
 	private Server server;
+	
+	private final Mailman mailer;
 
 	/**
 	 * Constructor
@@ -89,8 +92,14 @@ public class AzkabanExecutorServer {
 		ServletHolder executorHolder = new ServletHolder(new ExecutorServlet(sharedToken));
 		root.addServlet(executorHolder, "/executor");
 		root.setAttribute(AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY, this);
-		runnerManager = new FlowRunnerManager(props);
+        mailer = new Mailman(props.getString("mail.host", "localhost"),
+                props.getString("mail.user", ""),
+                props.getString("mail.password", ""),
+                props.getString("mail.sender", ""));
+        
+		runnerManager = new FlowRunnerManager(props, mailer);
 		
+
 		try {
 			server.start();
 		} 
diff --git a/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
index cc32319..9a16b9e 100644
--- a/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
@@ -56,6 +56,7 @@
 							<th>Flow</th>
 							<th>Project</th>
 							<th>User</th>
+							<th>Submitted By</th>
 							<th class="date">First Scheduled to Run</th>
 							<th class="date">Next Execution Time</th>
 							<th class="date">Repeats Every</th>
@@ -76,6 +77,7 @@
 								<a href="${context}/manager?project=${sched.projectId}">${sched.projectId}</a>
 							</td>
 							<td>${sched.user}</td>
+							<td>${sched.userSubmit}</td>
 							<td>$utils.formatDateTime(${sched.firstSchedTime})</td>
 							<td>$utils.formatDateTime(${sched.nextExecTime})</td>
 							<td>$utils.formatPeriod(${sched.period})</td>