azkaban-aplcache
Changes
src/java/azkaban/executor/FlowRunner.java 11(+11 -0)
src/java/azkaban/executor/JobRunner.java 19(+15 -4)
Details
src/java/azkaban/executor/FlowRunner.java 11(+11 -0)
diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index 09628b4..c26d050 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -5,8 +5,10 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -59,6 +61,8 @@ public class FlowRunner extends EventHandler implements Runnable {
private Appender flowAppender;
private Thread currentThread;
+
+ private Set<String> emailAddress;
public enum FailedFlowOptions {
FINISH_RUNNING_JOBS, KILL_ALL
@@ -72,6 +76,7 @@ public class FlowRunner extends EventHandler implements Runnable {
this.executorService = Executors.newFixedThreadPool(numThreads);
this.runningJobs = new ConcurrentHashMap<String, JobRunner>();
this.listener = new JobRunnerEventListener(this);
+ this.emailAddress = new HashSet<String>();
createLogger();
}
@@ -80,6 +85,10 @@ public class FlowRunner extends EventHandler implements Runnable {
return flow;
}
+ public Set<String> getEmails() {
+ return emailAddress;
+ }
+
private void createLogger() {
// Create logger
String loggerName = System.currentTimeMillis() + "."
@@ -433,6 +442,8 @@ public class FlowRunner extends EventHandler implements Runnable {
System.out.println("Event " + jobID + " "
+ event.getType().toString());
+ emailAddress.addAll(runner.getNotifyEmails());
+
// On Job success, we add the output props and then set up the next
// run.
if (event.getType() == Type.JOB_SUCCEEDED) {
diff --git a/src/java/azkaban/executor/FlowRunnerManager.java b/src/java/azkaban/executor/FlowRunnerManager.java
index 28d78cf..2d5a7ae 100644
--- a/src/java/azkaban/executor/FlowRunnerManager.java
+++ b/src/java/azkaban/executor/FlowRunnerManager.java
@@ -1,6 +1,14 @@
package azkaban.executor;
import java.io.File;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -8,11 +16,16 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.log4j.Logger;
+import org.joda.time.Duration;
+import org.joda.time.format.PeriodFormat;
+import azkaban.utils.Utils;
+import azkaban.executor.ExecutableFlow.Status;
import azkaban.executor.event.Event;
import azkaban.executor.event.Event.Type;
import azkaban.executor.event.EventListener;
import azkaban.utils.ExecutableFlowLoader;
+import azkaban.utils.Mailman;
import azkaban.utils.Props;
/**
@@ -31,8 +44,17 @@ public class FlowRunnerManager {
private ExecutorService executorService;
private SubmitterThread submitterThread;
private FlowRunnerEventListener eventListener;
+
+ private Mailman mailer;
+ private String defaultFailureEmail;
+ private String defaultSuccessEmail;
+ private String senderAddress;
- public FlowRunnerManager(Props props) {
+ public FlowRunnerManager(Props props, Mailman mailer) {
+ this.mailer = mailer;
+// this.defaultFailureEmail = props.getString("job.failure.email");
+// this.defaultSuccessEmail = props.getString("job.success.email");
+ this.senderAddress = props.getString("mail.sender");
basePath = new File(props.getString("execution.directory"));
numThreads = props.getInt("executor.flow.threads", DEFAULT_NUM_EXECUTING_FLOWS);
executorService = Executors.newFixedThreadPool(numThreads);
@@ -128,11 +150,64 @@ public class FlowRunnerManager {
FlowRunner runner = (FlowRunner)event.getRunner();
ExecutableFlow flow = runner.getFlow();
+ List<String> emailList = new ArrayList<String>(runner.getEmails());
+
System.out.println("Event " + flow.getExecutionId() + " " + flow.getFlowId() + " " + event.getType());
if (event.getType() == Type.FLOW_FINISHED) {
+ if(flow.getStatus() == Status.SUCCEEDED)
+ sendSuccessEmail(flow, emailList);
+ else sendErrorEmail(flow, emailList);
+
logger.info("Flow " + flow.getExecutionId() + " has finished.");
runningFlows.remove(flow.getExecutionId());
+
}
}
}
+
+ /*
+ * Wrap a single exception with the name of the scheduled job
+ */
+ private void sendErrorEmail(ExecutableFlow flow, List<String> emailList) {
+
+ if(emailList != null && !emailList.isEmpty() && mailer != null) {
+ try {
+ mailer.sendEmailIfPossible(senderAddress,
+ emailList,
+ "Flow '" + flow.getFlowId() + "' has completed on "
+ + InetAddress.getLocalHost().getHostName()
+ + "!",
+ "The Flow '"
+ + flow.getFlowId()
+ + "' failed.");
+ } catch(UnknownHostException uhe) {
+ logger.error(uhe);
+ }
+ catch (Exception e) {
+ logger.error(e);
+ }
+ }
+ }
+
+
+ private void sendSuccessEmail(ExecutableFlow flow, List<String> emailList) {
+
+ if(emailList != null && !emailList.isEmpty() && mailer != null) {
+ try {
+ mailer.sendEmailIfPossible(senderAddress,
+ emailList,
+ "Flow '" + flow.getFlowId() + "' has completed on "
+ + InetAddress.getLocalHost().getHostName()
+ + "!",
+ "The Flow '"
+ + flow.getFlowId()
+ + "' was successful.");
+ } catch(UnknownHostException uhe) {
+ logger.error(uhe);
+ }
+ catch (Exception e) {
+ logger.error(e);
+ }
+ }
+ }
}
src/java/azkaban/executor/JobRunner.java 19(+15 -4)
diff --git a/src/java/azkaban/executor/JobRunner.java b/src/java/azkaban/executor/JobRunner.java
index 11acd34..ac6bcc9 100644
--- a/src/java/azkaban/executor/JobRunner.java
+++ b/src/java/azkaban/executor/JobRunner.java
@@ -2,6 +2,9 @@ package azkaban.executor;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
import org.apache.log4j.Appender;
import org.apache.log4j.FileAppender;
@@ -23,7 +26,9 @@ import azkaban.utils.Props;
public class JobRunner extends EventHandler implements Runnable {
private static final Layout DEFAULT_LAYOUT = new PatternLayout(
"%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
-
+
+ private static final String EMAILLIST = "notify.emails";
+
private Props props;
private Props outputProps;
private ExecutableNode node;
@@ -101,9 +106,9 @@ public class JobRunner extends EventHandler implements Runnable {
}
}
-/*
+
// Run Job
- boolean succeeded = true;
+ //boolean succeeded = true;
props.put(AbstractProcessJob.WORKING_DIR, workingDir.getAbsolutePath());
JobWrappingFactory factory = JobWrappingFactory.getJobWrappingFactory();
@@ -116,7 +121,7 @@ public class JobRunner extends EventHandler implements Runnable {
//logger.error("job run failed!");
e.printStackTrace();
}
- */
+
node.setEndTime(System.currentTimeMillis());
if (succeeded) {
node.setStatus(Status.SUCCEEDED);
@@ -159,4 +164,10 @@ public class JobRunner extends EventHandler implements Runnable {
public Props getOutputProps() {
return outputProps;
}
+
+ public List<String> getNotifyEmails() {
+ List<String> emails = new ArrayList<String>();
+ emails = this.props.getStringList(EMAILLIST);
+ return emails;
+ }
}
diff --git a/src/java/azkaban/webapp/AzkabanExecutorServer.java b/src/java/azkaban/webapp/AzkabanExecutorServer.java
index e850dd8..f55d511 100644
--- a/src/java/azkaban/webapp/AzkabanExecutorServer.java
+++ b/src/java/azkaban/webapp/AzkabanExecutorServer.java
@@ -39,6 +39,7 @@ import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
import org.mortbay.thread.QueuedThreadPool;
+import azkaban.utils.Mailman;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.FlowRunnerManager;
@@ -67,6 +68,8 @@ public class AzkabanExecutorServer {
private Props props;
private File tempDir;
private Server server;
+
+ private final Mailman mailer;
/**
* Constructor
@@ -89,8 +92,14 @@ public class AzkabanExecutorServer {
ServletHolder executorHolder = new ServletHolder(new ExecutorServlet(sharedToken));
root.addServlet(executorHolder, "/executor");
root.setAttribute(AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY, this);
- runnerManager = new FlowRunnerManager(props);
+ mailer = new Mailman(props.getString("mail.host", "localhost"),
+ props.getString("mail.user", ""),
+ props.getString("mail.password", ""),
+ props.getString("mail.sender", ""));
+
+ runnerManager = new FlowRunnerManager(props, mailer);
+
try {
server.start();
}
diff --git a/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
index cc32319..9a16b9e 100644
--- a/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
@@ -56,6 +56,7 @@
<th>Flow</th>
<th>Project</th>
<th>User</th>
+ <th>Submitted By</th>
<th class="date">First Scheduled to Run</th>
<th class="date">Next Execution Time</th>
<th class="date">Repeats Every</th>
@@ -76,6 +77,7 @@
<a href="${context}/manager?project=${sched.projectId}">${sched.projectId}</a>
</td>
<td>${sched.user}</td>
+ <td>${sched.userSubmit}</td>
<td>$utils.formatDateTime(${sched.firstSchedTime})</td>
<td>$utils.formatDateTime(${sched.nextExecTime})</td>
<td>$utils.formatPeriod(${sched.period})</td>