azkaban-aplcache
Changes
src/java/azkaban/executor/FlowRunner.java 21(+21 -0)
src/java/azkaban/executor/FlowRunnerManager.java 119(+118 -1)
src/java/azkaban/executor/JobRunner.java 19(+15 -4)
Details
src/java/azkaban/executor/FlowRunner.java 21(+21 -0)
diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index e27a65d..e0865a3 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -5,8 +5,10 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -59,6 +61,9 @@ public class FlowRunner extends EventHandler implements Runnable {
private Appender flowAppender;
private Thread currentThread;
+
+ private Set<String> emailAddress;
+ private List<String> jobsFinished;
public enum FailedFlowOptions {
FINISH_RUNNING_JOBS, KILL_ALL
@@ -72,6 +77,8 @@ public class FlowRunner extends EventHandler implements Runnable {
this.executorService = Executors.newFixedThreadPool(numThreads);
this.runningJobs = new ConcurrentHashMap<String, JobRunner>();
this.listener = new JobRunnerEventListener(this);
+ this.emailAddress = new HashSet<String>();
+ this.jobsFinished = new ArrayList<String>();
createLogger();
}
@@ -80,6 +87,14 @@ public class FlowRunner extends EventHandler implements Runnable {
return flow;
}
+ /**
+  * Returns the notification addresses gathered from the jobs of this flow
+  * that have succeeded or failed so far. The returned set is the runner's
+  * own instance, not a copy.
+  */
+ public Set<String> getEmails() {
+     return this.emailAddress;
+ }
+
+ /**
+  * Returns the ids of the jobs of this flow that have succeeded or failed
+  * so far, in the order their events were handled. The returned list is
+  * the runner's own instance, not a copy.
+  */
+ public List<String> getJobsFinished() {
+     return this.jobsFinished;
+ }
+
private void createLogger() {
// Create logger
String loggerName = System.currentTimeMillis() + "." + flow.getExecutionId();
@@ -432,17 +447,23 @@ public class FlowRunner extends EventHandler implements Runnable {
System.out.println("Event " + jobID + " "
+ event.getType().toString());
+
+
// On Job success, we add the output props and then set up the next
// run.
if (event.getType() == Type.JOB_SUCCEEDED) {
logger.info("Job Succeeded " + jobID + " in "
+ (node.getEndTime() - node.getStartTime()) + " ms");
+ emailAddress.addAll(runner.getNotifyEmails());
+ jobsFinished.add(jobID);
Props props = runner.getOutputProps();
outputProps.put(jobID, props);
flowRunner.handleSucceededJob(runner.getNode());
} else if (event.getType() == Type.JOB_FAILED) {
logger.info("Job Failed " + jobID + " in "
+ (node.getEndTime() - node.getStartTime()) + " ms");
+ emailAddress.addAll(runner.getNotifyEmails());
+ jobsFinished.add(jobID);
logger.info(jobID + " FAILED");
flowRunner.handleFailedJob(runner.getNode());
}
src/java/azkaban/executor/FlowRunnerManager.java 119(+118 -1)
diff --git a/src/java/azkaban/executor/FlowRunnerManager.java b/src/java/azkaban/executor/FlowRunnerManager.java
index 28d78cf..6f4e91e 100644
--- a/src/java/azkaban/executor/FlowRunnerManager.java
+++ b/src/java/azkaban/executor/FlowRunnerManager.java
@@ -1,18 +1,33 @@
package azkaban.executor;
import java.io.File;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import javax.servlet.ServletRequest;
+
import org.apache.log4j.Logger;
+import org.joda.time.Duration;
+import org.joda.time.format.PeriodFormat;
+import azkaban.utils.Utils;
+import azkaban.executor.ExecutableFlow.Status;
import azkaban.executor.event.Event;
import azkaban.executor.event.Event.Type;
import azkaban.executor.event.EventListener;
import azkaban.utils.ExecutableFlowLoader;
+import azkaban.utils.Mailman;
import azkaban.utils.Props;
/**
@@ -31,8 +46,22 @@ public class FlowRunnerManager {
private ExecutorService executorService;
private SubmitterThread submitterThread;
private FlowRunnerEventListener eventListener;
+
+ private Mailman mailer;
+ //private String defaultFailureEmail;
+ //private String defaultSuccessEmail;
+ private String senderAddress;
+ private String clientHostname;
+ private String clientPortNumber;
- public FlowRunnerManager(Props props) {
+ public FlowRunnerManager(Props props, Mailman mailer) {
+ this.mailer = mailer;
+// this.defaultFailureEmail = props.getString("job.failure.email");
+// this.defaultSuccessEmail = props.getString("job.success.email");
+ this.senderAddress = props.getString("mail.sender");
+ this.clientHostname = props.getString("jetty.hostname", "localhost");
+ this.clientPortNumber = Utils.nonNull(props.getString("jetty.ssl.port"));
+
basePath = new File(props.getString("execution.directory"));
numThreads = props.getInt("executor.flow.threads", DEFAULT_NUM_EXECUTING_FLOWS);
executorService = Executors.newFixedThreadPool(numThreads);
@@ -128,11 +157,99 @@ public class FlowRunnerManager {
FlowRunner runner = (FlowRunner)event.getRunner();
ExecutableFlow flow = runner.getFlow();
+
+
System.out.println("Event " + flow.getExecutionId() + " " + flow.getFlowId() + " " + event.getType());
if (event.getType() == Type.FLOW_FINISHED) {
+ if(flow.getStatus() == Status.SUCCEEDED)
+ sendSuccessEmail(runner);
+ else sendErrorEmail(runner);
+
logger.info("Flow " + flow.getExecutionId() + " has finished.");
runningFlows.remove(flow.getExecutionId());
+
}
}
}
+
+ /**
+  * Builds the links listed in a flow notification mail.
+  * The first entry points at the log of the execution itself; it is
+  * followed by one entry per finished job, in the order reported by
+  * runner.getJobsFinished().
+  *
+  * @param runner the runner whose flow and finished jobs are linked
+  * @return the execution log URL followed by the per-job log URLs
+  */
+ private List<String> getLogURLs(FlowRunner runner) {
+     final ExecutableFlow flow = runner.getFlow();
+     final String flowID = flow.getFlowId();
+     final String execID = flow.getExecutionId();
+
+     // Every link shares the same executor endpoint and execid parameter.
+     final String executionLink = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execID;
+
+     final List<String> links = new ArrayList<String>();
+     links.add(executionLink + "#log");
+     for (String finishedJob : runner.getJobsFinished()) {
+         links.add(executionLink + "&flow=" + flowID + "&job=" + finishedJob);
+     }
+     return links;
+ }
+
+ /**
+  * Mails a failure notice for a finished flow to the addresses collected
+  * from its jobs. Does nothing when no address was collected or when no
+  * mailer is configured.
+  *
+  * @param runner the runner of the flow that did not succeed
+  */
+ private void sendErrorEmail(FlowRunner runner) {
+     sendFlowEmail(runner, "failed");
+ }
+
+ /**
+  * Mails a success notice for a finished flow to the addresses collected
+  * from its jobs. Does nothing when no address was collected or when no
+  * mailer is configured.
+  *
+  * @param runner the runner of the flow that succeeded
+  */
+ private void sendSuccessEmail(FlowRunner runner) {
+     sendFlowEmail(runner, "succeeded");
+ }
+
+ /**
+  * Shared implementation of the success and failure notices; the two mails
+  * differ only in the outcome word used in the body.
+  *
+  * @param runner  the runner of the finished flow
+  * @param outcome the word inserted into the body, e.g. "failed"
+  */
+ private void sendFlowEmail(FlowRunner runner, String outcome) {
+     ExecutableFlow flow = runner.getFlow();
+     // Copy of the collected addresses; a freshly built list is never null,
+     // so only emptiness has to be checked.
+     List<String> emailList = new ArrayList<String>(runner.getEmails());
+     if (emailList.isEmpty() || mailer == null) {
+         return;
+     }
+
+     try {
+         String flowId = flow.getFlowId();
+         String subject = "Flow '" + flowId + "' has completed on " + InetAddress.getLocalHost().getHostName() + "!";
+
+         StringBuilder body = new StringBuilder();
+         body.append("The Flow '").append(flowId).append("' ").append(outcome).append(". \n See logs below: \n");
+         for (String url : getLogURLs(runner)) {
+             body.append(url).append("\n");
+         }
+
+         mailer.sendEmailIfPossible(senderAddress,
+                 emailList,
+                 subject,
+                 body.toString());
+     } catch (Exception e) {
+         // Also covers UnknownHostException from the local host lookup.
+         // A notification problem must not break flow clean-up, so it is
+         // only logged; passing the throwable keeps the stack trace.
+         logger.error("Could not send the '" + outcome + "' notification email", e);
+     }
+ }
+
}
src/java/azkaban/executor/JobRunner.java 19(+15 -4)
diff --git a/src/java/azkaban/executor/JobRunner.java b/src/java/azkaban/executor/JobRunner.java
index 11acd34..ac6bcc9 100644
--- a/src/java/azkaban/executor/JobRunner.java
+++ b/src/java/azkaban/executor/JobRunner.java
@@ -2,6 +2,9 @@ package azkaban.executor;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
import org.apache.log4j.Appender;
import org.apache.log4j.FileAppender;
@@ -23,7 +26,9 @@ import azkaban.utils.Props;
public class JobRunner extends EventHandler implements Runnable {
private static final Layout DEFAULT_LAYOUT = new PatternLayout(
"%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
-
+
+ private static final String EMAILLIST = "notify.emails";
+
private Props props;
private Props outputProps;
private ExecutableNode node;
@@ -101,9 +106,9 @@ public class JobRunner extends EventHandler implements Runnable {
}
}
-/*
+
// Run Job
- boolean succeeded = true;
+ //boolean succeeded = true;
props.put(AbstractProcessJob.WORKING_DIR, workingDir.getAbsolutePath());
JobWrappingFactory factory = JobWrappingFactory.getJobWrappingFactory();
@@ -116,7 +121,7 @@ public class JobRunner extends EventHandler implements Runnable {
//logger.error("job run failed!");
e.printStackTrace();
}
- */
+
node.setEndTime(System.currentTimeMillis());
if (succeeded) {
node.setStatus(Status.SUCCEEDED);
@@ -159,4 +164,10 @@ public class JobRunner extends EventHandler implements Runnable {
public Props getOutputProps() {
return outputProps;
}
+
+ /**
+  * Returns the addresses configured for this job under the
+  * "notify.emails" property.
+  *
+  * @return the list produced by the job props for that key
+  */
+ public List<String> getNotifyEmails() {
+     // NOTE(review): what happens when the key is absent depends on
+     // Props.getStringList, which is not visible here; confirm that jobs
+     // without notify.emails do not make this call fail.
+     return this.props.getStringList(EMAILLIST);
+ }
}
diff --git a/src/java/azkaban/project/FileProjectManager.java b/src/java/azkaban/project/FileProjectManager.java
index ee86bab..8eed304 100644
--- a/src/java/azkaban/project/FileProjectManager.java
+++ b/src/java/azkaban/project/FileProjectManager.java
@@ -223,6 +223,7 @@ public class FileProjectManager implements ProjectManager {
List<String> errors = new ArrayList<String>();
DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
loader.loadProjectFlow(dir);
+ errors.addAll(loader.getErrors());
Map<String, Flow> flows = loader.getFlowMap();
File projectPath = new File(projectDirectory, projectName);
@@ -253,7 +254,9 @@ public class FileProjectManager implements ProjectManager {
dir.renameTo(destDirectory);
// We install only if the project is not forced install or has no errors
- if (force || errors.isEmpty()) {
+ //if (force || errors.isEmpty()) {
+ // We don't do force install any more
+ if (errors.isEmpty()) {
// We synchronize on project so that we don't collide when
// uploading.
synchronized (project) {
diff --git a/src/java/azkaban/utils/DirectoryFlowLoader.java b/src/java/azkaban/utils/DirectoryFlowLoader.java
index acc0b9e..d376522 100644
--- a/src/java/azkaban/utils/DirectoryFlowLoader.java
+++ b/src/java/azkaban/utils/DirectoryFlowLoader.java
@@ -41,6 +41,10 @@ public class DirectoryFlowLoader {
return flowMap;
}
+ /**
+  * Returns the error messages recorded by this loader, such as the one
+  * added for a job that has no type set. The returned set is the loader's
+  * own instance, not a copy.
+  */
+ public Set<String> getErrors() {
+     return this.errors;
+ }
+
public void loadProjectFlow(File baseDirectory) {
propsList = new ArrayList<Props>();
flowPropsList = new ArrayList<FlowProps>();
@@ -59,6 +63,7 @@ public class DirectoryFlowLoader {
// Create the flows.
buildFlowsFromDependencies();
+
}
private void loadProjectFromDir(String base, File dir) {
@@ -99,8 +104,11 @@ public class DirectoryFlowLoader {
prop.setSource(relative);
Node node = new Node(jobName);
- String type = prop.getString("type", "none");
- errors.add("Job doesn't have type set '" + jobName + "'.");
+ String type = prop.getString("type", null);
+ if(type == null) {
+ errors.add("Job doesn't have type set '" + jobName + "'.");
+ }
+
node.setType(type);
node.setJobSource(relative);
diff --git a/src/java/azkaban/webapp/AzkabanExecutorServer.java b/src/java/azkaban/webapp/AzkabanExecutorServer.java
index e850dd8..f55d511 100644
--- a/src/java/azkaban/webapp/AzkabanExecutorServer.java
+++ b/src/java/azkaban/webapp/AzkabanExecutorServer.java
@@ -39,6 +39,7 @@ import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
import org.mortbay.thread.QueuedThreadPool;
+import azkaban.utils.Mailman;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.FlowRunnerManager;
@@ -67,6 +68,8 @@ public class AzkabanExecutorServer {
private Props props;
private File tempDir;
private Server server;
+
+ private final Mailman mailer;
/**
* Constructor
@@ -89,8 +92,14 @@ public class AzkabanExecutorServer {
ServletHolder executorHolder = new ServletHolder(new ExecutorServlet(sharedToken));
root.addServlet(executorHolder, "/executor");
root.setAttribute(AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY, this);
- runnerManager = new FlowRunnerManager(props);
+ mailer = new Mailman(props.getString("mail.host", "localhost"),
+ props.getString("mail.user", ""),
+ props.getString("mail.password", ""),
+ props.getString("mail.sender", ""));
+
+ runnerManager = new FlowRunnerManager(props, mailer);
+
try {
server.start();
}
diff --git a/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
index cc32319..9a16b9e 100644
--- a/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
@@ -56,6 +56,7 @@
<th>Flow</th>
<th>Project</th>
<th>User</th>
+ <th>Submitted By</th>
<th class="date">First Scheduled to Run</th>
<th class="date">Next Execution Time</th>
<th class="date">Repeats Every</th>
@@ -76,6 +77,7 @@
<a href="${context}/manager?project=${sched.projectId}">${sched.projectId}</a>
</td>
<td>${sched.user}</td>
+ <td>${sched.userSubmit}</td>
<td>$utils.formatDateTime(${sched.firstSchedTime})</td>
<td>$utils.formatDateTime(${sched.nextExecTime})</td>
<td>$utils.formatPeriod(${sched.period})</td>