azkaban-aplcache

Details

diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index c26d050..dd4bae4 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -63,6 +63,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private Thread currentThread;
 	
 	private Set<String> emailAddress;
+	private List<String> jobsFinished;
 
 	public enum FailedFlowOptions {
 		FINISH_RUNNING_JOBS, KILL_ALL
@@ -77,6 +78,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 		this.runningJobs = new ConcurrentHashMap<String, JobRunner>();
 		this.listener = new JobRunnerEventListener(this);
 		this.emailAddress = new HashSet<String>();
+		this.jobsFinished = new ArrayList<String>();
 
 		createLogger();
 	}
@@ -89,6 +91,10 @@ public class FlowRunner extends EventHandler implements Runnable {
 		return emailAddress;
 	}
 	
+	public List<String> getJobsFinished() {
+		return jobsFinished;
+	}
+	
 	private void createLogger() {
 		// Create logger
 		String loggerName = System.currentTimeMillis() + "."
@@ -442,19 +448,23 @@ public class FlowRunner extends EventHandler implements Runnable {
 			System.out.println("Event " + jobID + " "
 					+ event.getType().toString());
 
-			emailAddress.addAll(runner.getNotifyEmails());
+			
 			
 			// On Job success, we add the output props and then set up the next
 			// run.
 			if (event.getType() == Type.JOB_SUCCEEDED) {
 				logger.info("Job Succeeded " + jobID + " in "
 						+ (node.getEndTime() - node.getStartTime()) + " ms");
+				emailAddress.addAll(runner.getNotifyEmails());
+				jobsFinished.add(jobID);
 				Props props = runner.getOutputProps();
 				outputProps.put(jobID, props);
 				flowRunner.handleSucceededJob(runner.getNode());
 			} else if (event.getType() == Type.JOB_FAILED) {
 				logger.info("Job Failed " + jobID + " in "
 						+ (node.getEndTime() - node.getStartTime()) + " ms");
+				emailAddress.addAll(runner.getNotifyEmails());
+				jobsFinished.add(jobID);
 				logger.info(jobID + " FAILED");
 				flowRunner.handleFailedJob(runner.getNode());
 			}
diff --git a/src/java/azkaban/executor/FlowRunnerManager.java b/src/java/azkaban/executor/FlowRunnerManager.java
index 2d5a7ae..6f4e91e 100644
--- a/src/java/azkaban/executor/FlowRunnerManager.java
+++ b/src/java/azkaban/executor/FlowRunnerManager.java
@@ -15,6 +15,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import javax.servlet.ServletRequest;
+
 import org.apache.log4j.Logger;
 import org.joda.time.Duration;
 import org.joda.time.format.PeriodFormat;
@@ -46,15 +48,20 @@ public class FlowRunnerManager {
 	private FlowRunnerEventListener eventListener;
 
 	private Mailman mailer;
-	private String defaultFailureEmail;
-	private String defaultSuccessEmail;
+	//private String defaultFailureEmail;
+	//private String defaultSuccessEmail;
 	private String senderAddress;
+	private String clientHostname;
+	private String clientPortNumber;
 	
 	public FlowRunnerManager(Props props, Mailman mailer) {
 		this.mailer = mailer;
 //		this.defaultFailureEmail = props.getString("job.failure.email");
 //		this.defaultSuccessEmail = props.getString("job.success.email");
 		this.senderAddress = props.getString("mail.sender");
+		this.clientHostname = props.getString("jetty.hostname", "localhost");
+		this.clientPortNumber = Utils.nonNull(props.getString("jetty.ssl.port"));
+		
 		basePath = new File(props.getString("execution.directory"));
 		numThreads = props.getInt("executor.flow.threads", DEFAULT_NUM_EXECUTING_FLOWS);
 		executorService = Executors.newFixedThreadPool(numThreads);
@@ -150,13 +157,13 @@ public class FlowRunnerManager {
 			FlowRunner runner = (FlowRunner)event.getRunner();
 			ExecutableFlow flow = runner.getFlow();
 			
-			List<String> emailList = new ArrayList<String>(runner.getEmails());
+			
 			
 			System.out.println("Event " + flow.getExecutionId() + " " + flow.getFlowId() + " " + event.getType());
 			if (event.getType() == Type.FLOW_FINISHED) {
 				if(flow.getStatus() == Status.SUCCEEDED)
-					sendSuccessEmail(flow, emailList);
-				else sendErrorEmail(flow, emailList);
+					sendSuccessEmail(runner);
+				else sendErrorEmail(runner);
 				
 				logger.info("Flow " + flow.getExecutionId() + " has finished.");
 				runningFlows.remove(flow.getExecutionId());
@@ -165,21 +172,49 @@ public class FlowRunnerManager {
 		}
 	}
 	
+	private List<String> getLogURLs(FlowRunner runner)
+	{
+		List<String> logURLs = new ArrayList<String>();
+		
+		String flowID = runner.getFlow().getFlowId();
+		String execID = runner.getFlow().getExecutionId();
+		List<String> jobIDs = runner.getJobsFinished();
+		
+		//first construct log URL;
+		String logURL = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execID + "#log";
+		logURLs.add(logURL);
+		//then the individual jobs log URL that actually ran
+		for(String jobID : jobIDs) {
+			String jobLog = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execID + "&flow=" + flowID + "&job=" + jobID;
+			logURLs.add(jobLog);
+		}
+		
+		return logURLs;
+	}
+	
     /*
      * Wrap a single exception with the name of the scheduled job
      */
-    private void sendErrorEmail(ExecutableFlow flow, List<String> emailList) {
-        
+    private void sendErrorEmail(FlowRunner runner) {
+    	ExecutableFlow flow = runner.getFlow();
+    	List<String> emailList = new ArrayList<String>(runner.getEmails());
         if(emailList != null && !emailList.isEmpty() && mailer != null) {
+        	
+        	
+        	
+        	
             try {
+            	
+            	String subject = "Flow '" + flow.getFlowId() + "' has completed on " + InetAddress.getLocalHost().getHostName() + "!";
+            	String body = "The Flow '" + flow.getFlowId() + "' failed. \n See logs below: \n" ;
+            	for(String URL : getLogURLs(runner)) {
+            		body += (URL + "\n");
+            	}
+            	
                 mailer.sendEmailIfPossible(senderAddress,
                                              emailList,
-                                             "Flow '" + flow.getFlowId() + "' has completed on "
-                                                     + InetAddress.getLocalHost().getHostName()
-                                                     + "!",
-                                             "The Flow '"
-                                                     + flow.getFlowId()
-                                                     + "' failed.");
+                                             subject,
+                                             body);
             } catch(UnknownHostException uhe) {
                 logger.error(uhe);
             }
@@ -190,18 +225,24 @@ public class FlowRunnerManager {
     }
     
 
-    private void sendSuccessEmail(ExecutableFlow flow, List<String> emailList) {
+    private void sendSuccessEmail(FlowRunner runner) {
+    	
+    	ExecutableFlow flow = runner.getFlow();
+    	List<String> emailList = new ArrayList<String>(runner.getEmails());
         
         if(emailList != null && !emailList.isEmpty() && mailer != null) {
             try {
+            	
+            	String subject = "Flow '" + flow.getFlowId() + "' has completed on " + InetAddress.getLocalHost().getHostName() + "!";
+            	String body = "The Flow '" + flow.getFlowId() + "' succeeded. \n See logs below: \n" ;
+            	for(String URL : getLogURLs(runner)) {
+            		body += (URL + "\n");
+            	}
+            	
                 mailer.sendEmailIfPossible(senderAddress,
                                              emailList,
-                                             "Flow '" + flow.getFlowId() + "' has completed on "
-                                                     + InetAddress.getLocalHost().getHostName()
-                                                     + "!",
-                                             "The Flow '"
-                                                     + flow.getFlowId()
-                                                     + "' was successful.");
+                                             subject,
+                                             body);
             } catch(UnknownHostException uhe) {
                 logger.error(uhe);
             }
@@ -210,4 +251,5 @@ public class FlowRunnerManager {
             }
         }
     }
+    
 }