azkaban-aplcache
Changes
src/java/azkaban/executor/FlowRunner.java 12(+11 -1)
src/java/azkaban/executor/FlowRunnerManager.java 82(+62 -20)
Details
src/java/azkaban/executor/FlowRunner.java 12(+11 -1)
diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index c26d050..dd4bae4 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -63,6 +63,7 @@ public class FlowRunner extends EventHandler implements Runnable {
private Thread currentThread;
private Set<String> emailAddress;
+ private List<String> jobsFinished;
public enum FailedFlowOptions {
FINISH_RUNNING_JOBS, KILL_ALL
@@ -77,6 +78,7 @@ public class FlowRunner extends EventHandler implements Runnable {
this.runningJobs = new ConcurrentHashMap<String, JobRunner>();
this.listener = new JobRunnerEventListener(this);
this.emailAddress = new HashSet<String>();
+ this.jobsFinished = new ArrayList<String>();
createLogger();
}
@@ -89,6 +91,10 @@ public class FlowRunner extends EventHandler implements Runnable {
return emailAddress;
}
+ public List<String> getJobsFinished() {
+ return jobsFinished;
+ }
+
private void createLogger() {
// Create logger
String loggerName = System.currentTimeMillis() + "."
@@ -442,19 +448,23 @@ public class FlowRunner extends EventHandler implements Runnable {
System.out.println("Event " + jobID + " "
+ event.getType().toString());
- emailAddress.addAll(runner.getNotifyEmails());
+
// On Job success, we add the output props and then set up the next
// run.
if (event.getType() == Type.JOB_SUCCEEDED) {
logger.info("Job Succeeded " + jobID + " in "
+ (node.getEndTime() - node.getStartTime()) + " ms");
+ emailAddress.addAll(runner.getNotifyEmails());
+ jobsFinished.add(jobID);
Props props = runner.getOutputProps();
outputProps.put(jobID, props);
flowRunner.handleSucceededJob(runner.getNode());
} else if (event.getType() == Type.JOB_FAILED) {
logger.info("Job Failed " + jobID + " in "
+ (node.getEndTime() - node.getStartTime()) + " ms");
+ emailAddress.addAll(runner.getNotifyEmails());
+ jobsFinished.add(jobID);
logger.info(jobID + " FAILED");
flowRunner.handleFailedJob(runner.getNode());
}
src/java/azkaban/executor/FlowRunnerManager.java 82(+62 -20)
diff --git a/src/java/azkaban/executor/FlowRunnerManager.java b/src/java/azkaban/executor/FlowRunnerManager.java
index 2d5a7ae..6f4e91e 100644
--- a/src/java/azkaban/executor/FlowRunnerManager.java
+++ b/src/java/azkaban/executor/FlowRunnerManager.java
@@ -15,6 +15,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import javax.servlet.ServletRequest;
+
import org.apache.log4j.Logger;
import org.joda.time.Duration;
import org.joda.time.format.PeriodFormat;
@@ -46,15 +48,20 @@ public class FlowRunnerManager {
private FlowRunnerEventListener eventListener;
private Mailman mailer;
- private String defaultFailureEmail;
- private String defaultSuccessEmail;
+ //private String defaultFailureEmail;
+ //private String defaultSuccessEmail;
private String senderAddress;
+ private String clientHostname;
+ private String clientPortNumber;
public FlowRunnerManager(Props props, Mailman mailer) {
this.mailer = mailer;
// this.defaultFailureEmail = props.getString("job.failure.email");
// this.defaultSuccessEmail = props.getString("job.success.email");
this.senderAddress = props.getString("mail.sender");
+ this.clientHostname = props.getString("jetty.hostname", "localhost");
+ this.clientPortNumber = Utils.nonNull(props.getString("jetty.ssl.port"));
+
basePath = new File(props.getString("execution.directory"));
numThreads = props.getInt("executor.flow.threads", DEFAULT_NUM_EXECUTING_FLOWS);
executorService = Executors.newFixedThreadPool(numThreads);
@@ -150,13 +157,13 @@ public class FlowRunnerManager {
FlowRunner runner = (FlowRunner)event.getRunner();
ExecutableFlow flow = runner.getFlow();
- List<String> emailList = new ArrayList<String>(runner.getEmails());
+
System.out.println("Event " + flow.getExecutionId() + " " + flow.getFlowId() + " " + event.getType());
if (event.getType() == Type.FLOW_FINISHED) {
if(flow.getStatus() == Status.SUCCEEDED)
- sendSuccessEmail(flow, emailList);
- else sendErrorEmail(flow, emailList);
+ sendSuccessEmail(runner);
+ else sendErrorEmail(runner);
logger.info("Flow " + flow.getExecutionId() + " has finished.");
runningFlows.remove(flow.getExecutionId());
@@ -165,21 +172,49 @@ public class FlowRunnerManager {
}
}
+ private List<String> getLogURLs(FlowRunner runner)
+ {
+ List<String> logURLs = new ArrayList<String>();
+
+ String flowID = runner.getFlow().getFlowId();
+ String execID = runner.getFlow().getExecutionId();
+ List<String> jobIDs = runner.getJobsFinished();
+
+ //first construct log URL;
+ String logURL = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execID + "#log";
+ logURLs.add(logURL);
+ //then the individual jobs log URL that actually ran
+ for(String jobID : jobIDs) {
+ String jobLog = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execID + "&flow=" + flowID + "&job=" + jobID;
+ logURLs.add(jobLog);
+ }
+
+ return logURLs;
+ }
+
/*
* Wrap a single exception with the name of the scheduled job
*/
- private void sendErrorEmail(ExecutableFlow flow, List<String> emailList) {
-
+ private void sendErrorEmail(FlowRunner runner) {
+ ExecutableFlow flow = runner.getFlow();
+ List<String> emailList = new ArrayList<String>(runner.getEmails());
if(emailList != null && !emailList.isEmpty() && mailer != null) {
+
+
+
+
try {
+
+ String subject = "Flow '" + flow.getFlowId() + "' has completed on " + InetAddress.getLocalHost().getHostName() + "!";
+ String body = "The Flow '" + flow.getFlowId() + "' failed. \n See logs below: \n" ;
+ for(String URL : getLogURLs(runner)) {
+ body += (URL + "\n");
+ }
+
mailer.sendEmailIfPossible(senderAddress,
emailList,
- "Flow '" + flow.getFlowId() + "' has completed on "
- + InetAddress.getLocalHost().getHostName()
- + "!",
- "The Flow '"
- + flow.getFlowId()
- + "' failed.");
+ subject,
+ body);
} catch(UnknownHostException uhe) {
logger.error(uhe);
}
@@ -190,18 +225,24 @@ public class FlowRunnerManager {
}
- private void sendSuccessEmail(ExecutableFlow flow, List<String> emailList) {
+ private void sendSuccessEmail(FlowRunner runner) {
+
+ ExecutableFlow flow = runner.getFlow();
+ List<String> emailList = new ArrayList<String>(runner.getEmails());
if(emailList != null && !emailList.isEmpty() && mailer != null) {
try {
+
+ String subject = "Flow '" + flow.getFlowId() + "' has completed on " + InetAddress.getLocalHost().getHostName() + "!";
+ String body = "The Flow '" + flow.getFlowId() + "' succeeded. \n See logs below: \n" ;
+ for(String URL : getLogURLs(runner)) {
+ body += (URL + "\n");
+ }
+
mailer.sendEmailIfPossible(senderAddress,
emailList,
- "Flow '" + flow.getFlowId() + "' has completed on "
- + InetAddress.getLocalHost().getHostName()
- + "!",
- "The Flow '"
- + flow.getFlowId()
- + "' was successful.");
+ subject,
+ body);
} catch(UnknownHostException uhe) {
logger.error(uhe);
}
@@ -210,4 +251,5 @@ public class FlowRunnerManager {
}
}
}
+
}