azkaban-aplcache

Details

diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index e27a65d..e0865a3 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -5,8 +5,10 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -59,6 +61,9 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private Appender flowAppender;
 
 	private Thread currentThread;
+	
+	private Set<String> emailAddress;
+	private List<String> jobsFinished;
 
 	public enum FailedFlowOptions {
 		FINISH_RUNNING_JOBS, KILL_ALL
@@ -72,6 +77,8 @@ public class FlowRunner extends EventHandler implements Runnable {
 		this.executorService = Executors.newFixedThreadPool(numThreads);
 		this.runningJobs = new ConcurrentHashMap<String, JobRunner>();
 		this.listener = new JobRunnerEventListener(this);
+		this.emailAddress = new HashSet<String>();
+		this.jobsFinished = new ArrayList<String>();
 
 		createLogger();
 	}
@@ -80,6 +87,14 @@ public class FlowRunner extends EventHandler implements Runnable {
 		return flow;
 	}
 
+	public Set<String> getEmails() {
+		return emailAddress;
+	}
+	
+	public List<String> getJobsFinished() {
+		return jobsFinished;
+	}
+	
 	private void createLogger() {
 		// Create logger
 		String loggerName = System.currentTimeMillis() + "." + flow.getExecutionId();
@@ -432,17 +447,23 @@ public class FlowRunner extends EventHandler implements Runnable {
 			System.out.println("Event " + jobID + " "
 					+ event.getType().toString());
 
+			
+			
 			// On Job success, we add the output props and then set up the next
 			// run.
 			if (event.getType() == Type.JOB_SUCCEEDED) {
 				logger.info("Job Succeeded " + jobID + " in "
 						+ (node.getEndTime() - node.getStartTime()) + " ms");
+				emailAddress.addAll(runner.getNotifyEmails());
+				jobsFinished.add(jobID);
 				Props props = runner.getOutputProps();
 				outputProps.put(jobID, props);
 				flowRunner.handleSucceededJob(runner.getNode());
 			} else if (event.getType() == Type.JOB_FAILED) {
 				logger.info("Job Failed " + jobID + " in "
 						+ (node.getEndTime() - node.getStartTime()) + " ms");
+				emailAddress.addAll(runner.getNotifyEmails());
+				jobsFinished.add(jobID);
 				logger.info(jobID + " FAILED");
 				flowRunner.handleFailedJob(runner.getNode());
 			}
diff --git a/src/java/azkaban/executor/FlowRunnerManager.java b/src/java/azkaban/executor/FlowRunnerManager.java
index 28d78cf..6f4e91e 100644
--- a/src/java/azkaban/executor/FlowRunnerManager.java
+++ b/src/java/azkaban/executor/FlowRunnerManager.java
@@ -1,18 +1,33 @@
 package azkaban.executor;
 
 import java.io.File;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import javax.servlet.ServletRequest;
+
 import org.apache.log4j.Logger;
+import org.joda.time.Duration;
+import org.joda.time.format.PeriodFormat;
 
+import azkaban.utils.Utils;
+import azkaban.executor.ExecutableFlow.Status;
 import azkaban.executor.event.Event;
 import azkaban.executor.event.Event.Type;
 import azkaban.executor.event.EventListener;
 import azkaban.utils.ExecutableFlowLoader;
+import azkaban.utils.Mailman;
 import azkaban.utils.Props;
 
 /**
@@ -31,8 +46,22 @@ public class FlowRunnerManager {
 	private ExecutorService executorService;
 	private SubmitterThread submitterThread;
 	private FlowRunnerEventListener eventListener;
+
+	private Mailman mailer;
+	//private String defaultFailureEmail;
+	//private String defaultSuccessEmail;
+	private String senderAddress;
+	private String clientHostname;
+	private String clientPortNumber;
 	
-	public FlowRunnerManager(Props props) {
+	public FlowRunnerManager(Props props, Mailman mailer) {
+		this.mailer = mailer;
+//		this.defaultFailureEmail = props.getString("job.failure.email");
+//		this.defaultSuccessEmail = props.getString("job.success.email");
+		this.senderAddress = props.getString("mail.sender");
+		this.clientHostname = props.getString("jetty.hostname", "localhost");
+		this.clientPortNumber = Utils.nonNull(props.getString("jetty.ssl.port"));
+		
 		basePath = new File(props.getString("execution.directory"));
 		numThreads = props.getInt("executor.flow.threads", DEFAULT_NUM_EXECUTING_FLOWS);
 		executorService = Executors.newFixedThreadPool(numThreads);
@@ -128,11 +157,99 @@ public class FlowRunnerManager {
 			FlowRunner runner = (FlowRunner)event.getRunner();
 			ExecutableFlow flow = runner.getFlow();
 			
+			
+			
 			System.out.println("Event " + flow.getExecutionId() + " " + flow.getFlowId() + " " + event.getType());
 			if (event.getType() == Type.FLOW_FINISHED) {
+				if(flow.getStatus() == Status.SUCCEEDED)
+					sendSuccessEmail(runner);
+				else sendErrorEmail(runner);
+				
 				logger.info("Flow " + flow.getExecutionId() + " has finished.");
 				runningFlows.remove(flow.getExecutionId());
+				
 			}
 		}
 	}
+	
+	private List<String> getLogURLs(FlowRunner runner)
+	{
+		List<String> logURLs = new ArrayList<String>();
+		
+		String flowID = runner.getFlow().getFlowId();
+		String execID = runner.getFlow().getExecutionId();
+		List<String> jobIDs = runner.getJobsFinished();
+		
+		//first construct log URL;
+		String logURL = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execID + "#log";
+		logURLs.add(logURL);
+		//then the individual jobs log URL that actually ran
+		for(String jobID : jobIDs) {
+			String jobLog = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execID + "&flow=" + flowID + "&job=" + jobID;
+			logURLs.add(jobLog);
+		}
+		
+		return logURLs;
+	}
+	
+    /*
+     * Wrap a single exception with the name of the scheduled job
+     */
+    private void sendErrorEmail(FlowRunner runner) {
+    	ExecutableFlow flow = runner.getFlow();
+    	List<String> emailList = new ArrayList<String>(runner.getEmails());
+        if(emailList != null && !emailList.isEmpty() && mailer != null) {
+        	
+        	
+        	
+        	
+            try {
+            	
+            	String subject = "Flow '" + flow.getFlowId() + "' has completed on " + InetAddress.getLocalHost().getHostName() + "!";
+            	String body = "The Flow '" + flow.getFlowId() + "' failed. \n See logs below: \n" ;
+            	for(String URL : getLogURLs(runner)) {
+            		body += (URL + "\n");
+            	}
+            	
+                mailer.sendEmailIfPossible(senderAddress,
+                                             emailList,
+                                             subject,
+                                             body);
+            } catch(UnknownHostException uhe) {
+                logger.error(uhe);
+            }
+            catch (Exception e) {
+                logger.error(e);
+            }
+        }
+    }
+    
+
+    private void sendSuccessEmail(FlowRunner runner) {
+    	
+    	ExecutableFlow flow = runner.getFlow();
+    	List<String> emailList = new ArrayList<String>(runner.getEmails());
+        
+        if(emailList != null && !emailList.isEmpty() && mailer != null) {
+            try {
+            	
+            	String subject = "Flow '" + flow.getFlowId() + "' has completed on " + InetAddress.getLocalHost().getHostName() + "!";
+            	String body = "The Flow '" + flow.getFlowId() + "' succeeded. \n See logs below: \n" ;
+            	for(String URL : getLogURLs(runner)) {
+            		body += (URL + "\n");
+            	}
+            	
+                mailer.sendEmailIfPossible(senderAddress,
+                                             emailList,
+                                             subject,
+                                             body);
+            } catch(UnknownHostException uhe) {
+                logger.error(uhe);
+            }
+            catch (Exception e) {
+                logger.error(e);
+            }
+        }
+    }
+    
 }
diff --git a/src/java/azkaban/executor/JobRunner.java b/src/java/azkaban/executor/JobRunner.java
index 11acd34..ac6bcc9 100644
--- a/src/java/azkaban/executor/JobRunner.java
+++ b/src/java/azkaban/executor/JobRunner.java
@@ -2,6 +2,9 @@ package azkaban.executor;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 
 import org.apache.log4j.Appender;
 import org.apache.log4j.FileAppender;
@@ -23,7 +26,9 @@ import azkaban.utils.Props;
 public class JobRunner extends EventHandler implements Runnable {
 	private static final Layout DEFAULT_LAYOUT = new PatternLayout(
 			"%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
-
+	
+	private static final String EMAILLIST = "notify.emails";
+	
 	private Props props;
 	private Props outputProps;
 	private ExecutableNode node;
@@ -101,9 +106,9 @@ public class JobRunner extends EventHandler implements Runnable {
 			}
 		}
 
-/*
+
 		// Run Job
-		boolean succeeded = true;
+		//boolean succeeded = true;
 
 		props.put(AbstractProcessJob.WORKING_DIR, workingDir.getAbsolutePath());
 		JobWrappingFactory factory  = JobWrappingFactory.getJobWrappingFactory();
@@ -116,7 +121,7 @@ public class JobRunner extends EventHandler implements Runnable {
                 //logger.error("job run failed!");
                 e.printStackTrace();
         }
-		*/
+		
 		node.setEndTime(System.currentTimeMillis());
 		if (succeeded) {
 			node.setStatus(Status.SUCCEEDED);
@@ -159,4 +164,10 @@ public class JobRunner extends EventHandler implements Runnable {
 	public Props getOutputProps() {
 		return outputProps;
 	}
+
+	public List<String> getNotifyEmails() {
+		List<String> emails = new ArrayList<String>();
+		emails = this.props.getStringList(EMAILLIST);
+		return emails;
+	}
 }
diff --git a/src/java/azkaban/project/FileProjectManager.java b/src/java/azkaban/project/FileProjectManager.java
index ee86bab..8eed304 100644
--- a/src/java/azkaban/project/FileProjectManager.java
+++ b/src/java/azkaban/project/FileProjectManager.java
@@ -223,6 +223,7 @@ public class FileProjectManager implements ProjectManager {
 		List<String> errors = new ArrayList<String>();
 		DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
 		loader.loadProjectFlow(dir);
+		errors.addAll(loader.getErrors());
 		Map<String, Flow> flows = loader.getFlowMap();
 
 		File projectPath = new File(projectDirectory, projectName);
@@ -253,7 +254,9 @@ public class FileProjectManager implements ProjectManager {
 		dir.renameTo(destDirectory);
 
 		// We install only if the project is not forced install or has no errors
-		if (force || errors.isEmpty()) {
+		//if (force || errors.isEmpty()) {
+		// We don't do force install any more
+		if (errors.isEmpty()) {
 			// We synchronize on project so that we don't collide when
 			// uploading.
 			synchronized (project) {
diff --git a/src/java/azkaban/utils/DirectoryFlowLoader.java b/src/java/azkaban/utils/DirectoryFlowLoader.java
index acc0b9e..d376522 100644
--- a/src/java/azkaban/utils/DirectoryFlowLoader.java
+++ b/src/java/azkaban/utils/DirectoryFlowLoader.java
@@ -41,6 +41,10 @@ public class DirectoryFlowLoader {
 		return flowMap;
 	}
 	
+	public Set<String> getErrors() {
+		return errors;
+	}
+	
 	public void loadProjectFlow(File baseDirectory) {
 		propsList = new ArrayList<Props>();
 		flowPropsList = new ArrayList<FlowProps>();
@@ -59,6 +63,7 @@ public class DirectoryFlowLoader {
 
 		// Create the flows.
 		buildFlowsFromDependencies();
+
 	}
 	
 	private void loadProjectFromDir(String base, File dir) {
@@ -99,8 +104,11 @@ public class DirectoryFlowLoader {
 						prop.setSource(relative);
 						
 						Node node = new Node(jobName);
-						String type = prop.getString("type", "none");
-						errors.add("Job doesn't have type set '" + jobName + "'.");
+						String type = prop.getString("type", null);
+						if(type == null) {
+							errors.add("Job doesn't have type set '" + jobName + "'.");
+						}
+						
 						node.setType(type);
 						
 						node.setJobSource(relative);
diff --git a/src/java/azkaban/webapp/AzkabanExecutorServer.java b/src/java/azkaban/webapp/AzkabanExecutorServer.java
index e850dd8..f55d511 100644
--- a/src/java/azkaban/webapp/AzkabanExecutorServer.java
+++ b/src/java/azkaban/webapp/AzkabanExecutorServer.java
@@ -39,6 +39,7 @@ import org.mortbay.jetty.servlet.Context;
 import org.mortbay.jetty.servlet.ServletHolder;
 import org.mortbay.thread.QueuedThreadPool;
 
+import azkaban.utils.Mailman;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.FlowRunnerManager;
@@ -67,6 +68,8 @@ public class AzkabanExecutorServer {
 	private Props props;
 	private File tempDir;
 	private Server server;
+	
+	private final Mailman mailer;
 
 	/**
 	 * Constructor
@@ -89,8 +92,14 @@ public class AzkabanExecutorServer {
 		ServletHolder executorHolder = new ServletHolder(new ExecutorServlet(sharedToken));
 		root.addServlet(executorHolder, "/executor");
 		root.setAttribute(AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY, this);
-		runnerManager = new FlowRunnerManager(props);
+        mailer = new Mailman(props.getString("mail.host", "localhost"),
+                props.getString("mail.user", ""),
+                props.getString("mail.password", ""),
+                props.getString("mail.sender", ""));
+        
+		runnerManager = new FlowRunnerManager(props, mailer);
 		
+
 		try {
 			server.start();
 		} 
diff --git a/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
index cc32319..9a16b9e 100644
--- a/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
@@ -56,6 +56,7 @@
 							<th>Flow</th>
 							<th>Project</th>
 							<th>User</th>
+							<th>Submitted By</th>
 							<th class="date">First Scheduled to Run</th>
 							<th class="date">Next Execution Time</th>
 							<th class="date">Repeats Every</th>
@@ -76,6 +77,7 @@
 								<a href="${context}/manager?project=${sched.projectId}">${sched.projectId}</a>
 							</td>
 							<td>${sched.user}</td>
+							<td>${sched.userSubmit}</td>
 							<td>$utils.formatDateTime(${sched.firstSchedTime})</td>
 							<td>$utils.formatDateTime(${sched.nextExecTime})</td>
 							<td>$utils.formatPeriod(${sched.period})</td>