azkaban-uncached
Changes
README.md 4(+2 -2)
src/java/azkaban/executor/ExecutorMailer.java 50(+36 -14)
src/java/azkaban/jmx/JmxExecutorManager.java 12(+12 -0)
src/java/azkaban/utils/EmailMessage.java 18(+16 -2)
Details
README.md 4(+2 -2)
diff --git a/README.md b/README.md
index 7375cbe..d0a972f 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,5 @@
-# Azkaban2
+## Azkaban2
-For all Azkaban Plugins documentation, please go to
+For Azkaban documentation, please go to
[Azkaban Project Site](http://azkaban.github.io/azkaban2/)
There is a google groups: [Azkaban Group](https://groups.google.com/forum/?fromgroups#!forum/azkaban-dev)
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index d707c1f..09bf00a 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -389,12 +389,24 @@ public class FlowRunnerManager implements EventListener {
}
}
+ int numJobThreads = numJobThreadPerFlow;
+ if(options.getFlowParameters().containsKey("flow.num.job.threads")) {
+ try{
+ int numJobs = Integer.valueOf(options.getFlowParameters().get("flow.num.job.threads"));
+ if(numJobs > 0 && numJobs <= numJobThreads) {
+ numJobThreads = numJobs;
+ }
+ } catch (Exception e) {
+ throw new ExecutorManagerException("Failed to set the number of job threads " + options.getFlowParameters().get("flow.num.job.threads") + " for flow " + execId, e);
+ }
+ }
+
FlowRunner runner = new FlowRunner(flow, executorLoader, projectLoader, jobtypeManager);
runner.setFlowWatcher(watcher)
.setJobLogSettings(jobLogChunkSize, jobLogNumFiles)
.setValidateProxyUser(validateProxyUser)
.setGlobalProps(globalProps)
- .setNumJobThreads(numJobThreadPerFlow)
+ .setNumJobThreads(numJobThreads)
.addListener(this);
// Check again.
@@ -634,6 +646,12 @@ public class FlowRunnerManager implements EventListener {
public int getNumExecutingFlows() {
return runningFlows.size();
}
+
+ public String getRunningFlowIds() {
+ List<Integer> ids = new ArrayList<Integer>(runningFlows.keySet());
+ Collections.sort(ids);
+ return ids.toString();
+ }
public int getNumExecutingJobs() {
int jobCount = 0;
@@ -643,5 +661,7 @@ public class FlowRunnerManager implements EventListener {
return jobCount;
}
+
+
}
diff --git a/src/java/azkaban/executor/ExecutionOptions.java b/src/java/azkaban/executor/ExecutionOptions.java
index a91a6d0..687dd71 100644
--- a/src/java/azkaban/executor/ExecutionOptions.java
+++ b/src/java/azkaban/executor/ExecutionOptions.java
@@ -40,7 +40,7 @@ public class ExecutionOptions {
private Set<String> initiallyDisabledJobs = new HashSet<String>();
public void setFlowParameters(Map<String,String> flowParam) {
- flowParameters.get(flowParam);
+ flowParameters.putAll(flowParam);
}
public Map<String,String> getFlowParameters() {
src/java/azkaban/executor/ExecutorMailer.java 50(+36 -14)
diff --git a/src/java/azkaban/executor/ExecutorMailer.java b/src/java/azkaban/executor/ExecutorMailer.java
index c6c6f41..c7338cc 100644
--- a/src/java/azkaban/executor/ExecutorMailer.java
+++ b/src/java/azkaban/executor/ExecutorMailer.java
@@ -18,9 +18,22 @@ public class ExecutorMailer extends AbstractMailer {
private boolean testMode = false;
+ private int mailTimeout;
+ private int connectionTimeout;
+
public ExecutorMailer(Props props) {
- super(props);
+ this.azkabanName = props.getString("azkaban.name", "azkaban");
+ this.mailHost = props.getString("mail.host", "localhost");
+ this.mailUser = props.getString("mail.user", "");
+ this.mailPassword = props.getString("mail.password", "");
+ this.mailSender = props.getString("mail.sender", "");
+ this.mailTimeout = props.getInt("mail.timeout.millis", 10000);
+ this.connectionTimeout = props.getInt("mail.connection.timeout.millis", 10000);
+
+ this.clientHostname = props.getString("jetty.hostname", "localhost");
+ this.clientPortNumber = Utils.nonNull(props.getString("jetty.ssl.port"));
+
testMode = props.getBoolean("test.mode", false);
}
@@ -30,10 +43,13 @@ public class ExecutorMailer extends AbstractMailer {
int execId = flow.getExecutionId();
if (emailList != null && !emailList.isEmpty()) {
- EmailMessage message = super.createEmailMessage(
- "Flow '" + flow.getFlowId() + "' has failed on " + getAzkabanName(),
- "text/html",
- emailList);
+ EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
+ message.setFromAddress(mailSender);
+ message.addAllToAddress(emailList);
+ message.setMimeType("text/html");
+ message.setTimeout(mailTimeout);
+ message.setConnectionTimeout(connectionTimeout);
+ message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
message.println("<h2 style=\"color:#FF0000\"> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has encountered a failure on " + getAzkabanName() + "</h2>");
@@ -83,10 +99,13 @@ public class ExecutorMailer extends AbstractMailer {
int execId = flow.getExecutionId();
if (emailList != null && !emailList.isEmpty()) {
- EmailMessage message = super.createEmailMessage(
- "Flow '" + flow.getFlowId() + "' has failed on " + getAzkabanName(),
- "text/html",
- emailList);
+ EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
+ message.setFromAddress(mailSender);
+ message.addAllToAddress(emailList);
+ message.setMimeType("text/html");
+ message.setTimeout(mailTimeout);
+ message.setConnectionTimeout(connectionTimeout);
+ message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
message.println("<h2 style=\"color:#FF0000\"> Execution '" + execId + "' of flow '" + flow.getFlowId() + "' has failed on " + getAzkabanName() + "</h2>");
message.println("<table>");
@@ -129,10 +148,13 @@ public class ExecutorMailer extends AbstractMailer {
int execId = flow.getExecutionId();
if (emailList != null && !emailList.isEmpty()) {
- EmailMessage message = super.createEmailMessage(
- "Flow '" + flow.getFlowId() + "' has succeeded on " + getAzkabanName(),
- "text/html",
- emailList);
+ EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
+ message.setFromAddress(mailSender);
+ message.addAllToAddress(emailList);
+ message.setMimeType("text/html");
+ message.setTimeout(mailTimeout);
+ message.setConnectionTimeout(connectionTimeout);
+ message.setSubject("Flow '" + flow.getFlowId() + "' has succeeded on " + azkabanName);
message.println("<h2> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has succeeded on " + getAzkabanName() + "</h2>");
message.println("<table>");
@@ -164,4 +186,4 @@ public class ExecutorMailer extends AbstractMailer {
return failedJobs;
}
-}
\ No newline at end of file
+}
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 527956d..eab7ade 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -69,6 +69,7 @@ public class ExecutorManager {
private long lastCleanerThreadCheckTime = -1;
private long lastThreadCheckTime = -1;
+ private String updaterStage = "not started";
public ExecutorManager(Props props, ExecutorLoader loader) throws ExecutorManagerException {
this.executorLoader = loader;
@@ -97,6 +98,10 @@ public class ExecutorManager {
return executingManager.getState();
}
+ public String getExecutorThreadStage() {
+ return updaterStage;
+ }
+
public boolean isThreadActive() {
return executingManager.isAlive();
}
@@ -172,6 +177,15 @@ public class ExecutorManager {
return flows;
}
+ public String getRunningFlowIds() {
+ List<Integer> allIds = new ArrayList<Integer>();
+ for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+ allIds.add(ref.getSecond().getExecutionId());
+ }
+ Collections.sort(allIds);
+ return allIds.toString();
+ }
+
public List<ExecutableFlow> getRecentlyFinishedFlows() {
return new ArrayList<ExecutableFlow>(recentlyFinished.values());
}
@@ -593,6 +607,8 @@ public class ExecutorManager {
try {
lastThreadCheckTime = System.currentTimeMillis();
+ updaterStage = "Starting update all flows.";
+
Map<ConnectionInfo, List<ExecutableFlow>> exFlowMap = getFlowToExecutorMap();
ArrayList<ExecutableFlow> finishedFlows = new ArrayList<ExecutableFlow>();
ArrayList<ExecutableFlow> finalizeFlows = new ArrayList<ExecutableFlow>();
@@ -602,6 +618,10 @@ public class ExecutorManager {
List<Long> updateTimesList = new ArrayList<Long>();
List<Integer> executionIdsList = new ArrayList<Integer>();
+ ConnectionInfo connection = entry.getKey();
+
+ updaterStage = "Starting update flows on " + connection.getHost() + ":" + connection.getPort();
+
// We pack the parameters of the same host together before we query.
fillUpdateTimeAndExecId(entry.getValue(), executionIdsList, updateTimesList);
@@ -612,7 +632,7 @@ public class ExecutorManager {
ConnectorParams.EXEC_ID_LIST_PARAM,
JSONUtils.toJSON(executionIdsList));
- ConnectionInfo connection = entry.getKey();
+
Map<String, Object> results = null;
try {
results = callExecutorServer(connection.getHost(), connection.getPort(), ConnectorParams.UPDATE_ACTION, null, null, executionIds, updateTimes);
@@ -620,6 +640,9 @@ public class ExecutorManager {
logger.error(e);
for (ExecutableFlow flow: entry.getValue()) {
Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(flow.getExecutionId());
+
+ updaterStage = "Failed to get update. Doing some clean up for flow " + pair.getSecond().getExecutionId();
+
if (pair != null) {
ExecutionReference ref = pair.getFirst();
int numErrors = ref.getNumErrors();
@@ -642,6 +665,9 @@ public class ExecutorManager {
for (Map<String,Object> updateMap: executionUpdates) {
try {
ExecutableFlow flow = updateExecution(updateMap);
+
+ updaterStage = "Updated flow " + flow.getExecutionId();
+
if (isFinished(flow)) {
finishedFlows.add(flow);
finalizeFlows.add(flow);
@@ -659,6 +685,8 @@ public class ExecutorManager {
}
}
+ updaterStage = "Evicting old recently finished flows.";
+
evictOldRecentlyFinished(recentlyFinishedLifetimeMs);
// Add new finished
for (ExecutableFlow flow: finishedFlows) {
@@ -668,12 +696,16 @@ public class ExecutorManager {
recentlyFinished.put(flow.getExecutionId(), flow);
}
+ updaterStage = "Finalizing " + finalizeFlows.size() + " error flows.";
+
// Kill error flows
for (ExecutableFlow flow: finalizeFlows) {
finalizeFlows(flow);
}
}
+ updaterStage = "Updated all active flows. Waiting for next round.";
+
synchronized(this) {
try {
if (runningFlows.size() > 0) {
@@ -688,7 +720,7 @@ public class ExecutorManager {
}
catch (Exception e) {
logger.error(e);
- }
+ }
}
}
}
@@ -696,6 +728,7 @@ public class ExecutorManager {
private void finalizeFlows(ExecutableFlow flow) {
int execId = flow.getExecutionId();
+ updaterStage = "finalizing flow " + execId;
// First we check if the execution in the datastore is complete
try {
ExecutableFlow dsFlow;
@@ -703,15 +736,18 @@ public class ExecutorManager {
dsFlow = flow;
}
else {
+ updaterStage = "finalizing flow " + execId + " loading from db";
dsFlow = executorLoader.fetchExecutableFlow(execId);
// If it's marked finished, we're good. If not, we fail everything and then mark it finished.
if (!isFinished(dsFlow)) {
+ updaterStage = "finalizing flow " + execId + " failing the flow";
failEverything(dsFlow);
executorLoader.updateExecutableFlow(dsFlow);
}
}
+ updaterStage = "finalizing flow " + execId + " deleting active reference";
// Delete the executing reference.
if (flow.getEndTime() == -1) {
flow.setEndTime(System.currentTimeMillis());
@@ -719,6 +755,7 @@ public class ExecutorManager {
}
executorLoader.removeActiveExecutableReference(execId);
+ updaterStage = "finalizing flow " + execId + " cleaning from memory";
runningFlows.remove(execId);
recentlyFinished.put(execId, dsFlow);
} catch (ExecutorManagerException e) {
@@ -728,6 +765,7 @@ public class ExecutorManager {
// TODO append to the flow log that we forced killed this flow because the target no longer had
// the reference.
+ updaterStage = "finalizing flow " + execId + " alerting and emailing";
ExecutionOptions options = flow.getExecutionOptions();
// But we can definitely email them.
if(flow.getStatus() == Status.FAILED || flow.getStatus() == Status.KILLED)
@@ -1006,4 +1044,8 @@ public class ExecutorManager {
cleanOldExecutionLogs(DateTime.now().getMillis() - executionLogsRetentionMs);
}
}
+
+
+
+
}
src/java/azkaban/jmx/JmxExecutorManager.java 12(+12 -0)
diff --git a/src/java/azkaban/jmx/JmxExecutorManager.java b/src/java/azkaban/jmx/JmxExecutorManager.java
index 123340d..37f52f8 100644
--- a/src/java/azkaban/jmx/JmxExecutorManager.java
+++ b/src/java/azkaban/jmx/JmxExecutorManager.java
@@ -21,6 +21,11 @@ public class JmxExecutorManager implements JmxExecutorManagerMBean {
public String getExecutorThreadState() {
return manager.getExecutorThreadState().toString();
}
+
+ @Override
+ public String getExecutorThreadStage() {
+ return manager.getExecutorThreadStage();
+ }
@Override
public boolean isThreadActive() {
@@ -36,4 +41,11 @@ public class JmxExecutorManager implements JmxExecutorManagerMBean {
public List<String> getPrimaryExecutorHostPorts() {
return new ArrayList<String>(manager.getPrimaryServerHosts());
}
+
+ @Override
+ public String getRunningFlows() {
+ return manager.getRunningFlowIds();
+ }
+
+
}
diff --git a/src/java/azkaban/jmx/JmxExecutorManagerMBean.java b/src/java/azkaban/jmx/JmxExecutorManagerMBean.java
index b4a3888..b29d00a 100644
--- a/src/java/azkaban/jmx/JmxExecutorManagerMBean.java
+++ b/src/java/azkaban/jmx/JmxExecutorManagerMBean.java
@@ -6,8 +6,14 @@ public interface JmxExecutorManagerMBean {
@DisplayName("OPERATION: getNumRunningFlows")
public int getNumRunningFlows();
+ @DisplayName("OPERATION: getRunningFlows")
+ public String getRunningFlows();
+
@DisplayName("OPERATION: getExecutorThreadState")
public String getExecutorThreadState();
+
+ @DisplayName("OPERATION: getExecutorThreadStage")
+ public String getExecutorThreadStage();
@DisplayName("OPERATION: isThreadActive")
public boolean isThreadActive();
diff --git a/src/java/azkaban/jmx/JmxFlowRunnerManager.java b/src/java/azkaban/jmx/JmxFlowRunnerManager.java
index f4f59d3..3541140 100644
--- a/src/java/azkaban/jmx/JmxFlowRunnerManager.java
+++ b/src/java/azkaban/jmx/JmxFlowRunnerManager.java
@@ -54,4 +54,9 @@ public class JmxFlowRunnerManager implements JmxFlowRunnerManagerMBean {
return manager.getNumExecutingJobs();
}
+ @Override
+ public String getRunningFlows() {
+ return manager.getRunningFlowIds();
+ }
+
}
diff --git a/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java b/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java
index 47c6a02..ed509ef 100644
--- a/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java
+++ b/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java
@@ -25,6 +25,9 @@ public interface JmxFlowRunnerManagerMBean {
@DisplayName("OPERATION: getNumExecutingFlows")
public int getNumExecutingFlows();
+ @DisplayName("OPERATION: getRunningFlows")
+ public String getRunningFlows();
+
@DisplayName("OPERATION: getTotalNumRunningJobs")
public int countTotalNumRunningJobs();
}
diff --git a/src/java/azkaban/jmx/JmxScheduler.java b/src/java/azkaban/jmx/JmxScheduler.java
index 73bcf98..8efc576 100644
--- a/src/java/azkaban/jmx/JmxScheduler.java
+++ b/src/java/azkaban/jmx/JmxScheduler.java
@@ -28,4 +28,9 @@ public class JmxScheduler implements JmxSchedulerMBean {
public Boolean isThreadActive() {
return manager.isThreadActive();
}
+
+ @Override
+ public String getScheduleThreadStage() {
+ return manager.getThreadStage();
+ }
}
\ No newline at end of file
diff --git a/src/java/azkaban/jmx/JmxSchedulerMBean.java b/src/java/azkaban/jmx/JmxSchedulerMBean.java
index 19a8b70..4de54ef 100644
--- a/src/java/azkaban/jmx/JmxSchedulerMBean.java
+++ b/src/java/azkaban/jmx/JmxSchedulerMBean.java
@@ -4,6 +4,9 @@ public interface JmxSchedulerMBean {
@DisplayName("OPERATION: getScheduleThreadState")
String getScheduleThreadState();
+ @DisplayName("OPERATION: getScheduleThreadStage")
+ String getScheduleThreadStage();
+
@DisplayName("OPERATION: getNextScheduleTime")
Long getNextScheduleTime();
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index c60f143..3e9c173 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -70,6 +70,7 @@ public class ScheduleManager {
// Used for mbeans to query Scheduler status
private long lastCheckTime = -1;
private long nextWakupTime = -1;
+ private String runnerStage = "not started";
/**
* Give the schedule manager a loader class that will properly load the
@@ -372,6 +373,8 @@ public class ScheduleManager {
synchronized (this) {
try {
lastCheckTime = System.currentTimeMillis();
+
+ runnerStage = "Starting schedule scan.";
// TODO clear up the exception handling
Schedule s = schedules.peek();
@@ -380,6 +383,7 @@ public class ScheduleManager {
// there's something to do. Most likely there will not be.
try {
logger.info("Nothing scheduled to run. Checking again soon.");
+ runnerStage = "Waiting for next round scan.";
nextWakupTime = System.currentTimeMillis() + TIMEOUT_MS;
this.wait(TIMEOUT_MS);
} catch (InterruptedException e) {
@@ -391,6 +395,8 @@ public class ScheduleManager {
// Run flow. The invocation of flows should be quick.
Schedule runningSched = schedules.poll();
+ runnerStage = "Ready to run schedule " + runningSched.toString();
+
logger.info("Scheduler ready to run " + runningSched.toString());
// Execute the flow here
try {
@@ -406,7 +412,7 @@ public class ScheduleManager {
logger.error("Flow " + runningSched.getScheduleName() + " cannot be found in project " + project.getName());
throw new RuntimeException("Error finding the scheduled flow. " + runningSched.getScheduleName());
}
-
+
// Create ExecutableFlow
ExecutableFlow exflow = new ExecutableFlow(flow);
System.out.println("ScheduleManager: creating schedule: " +runningSched.getScheduleId());
@@ -428,6 +434,8 @@ public class ScheduleManager {
flowOptions.setSuccessEmails(flow.getSuccessEmails());
}
+ runnerStage = "Submitting flow " + exflow.getFlowId();
+
try {
executorManager.submitExecutableFlow(exflow);
logger.info("Scheduler has invoked " + exflow.getExecutionId());
@@ -443,6 +451,7 @@ public class ScheduleManager {
SlaOptions slaOptions = runningSched.getSlaOptions();
if(slaOptions != null) {
logger.info("Submitting SLA checkings for " + runningSched.getFlowName());
+ runnerStage = "Submitting SLA checkings for " + runningSched.getFlowName();
// submit flow slas
List<SlaSetting> jobsettings = new ArrayList<SlaSetting>();
for(SlaSetting set : slaOptions.getSettings()) {
@@ -472,6 +481,7 @@ public class ScheduleManager {
logger.info("Scheduler failed to run job. " + e.getMessage() + e.getCause());
}
+ runnerStage = "Done running schedule for " + runningSched.toString();
removeRunnerSchedule(runningSched);
// Immediately reschedule if it's possible. Let
@@ -485,6 +495,7 @@ public class ScheduleManager {
removeSchedule(runningSched);
}
} else {
+ runnerStage = "Waiting for next round scan.";
// wait until flow run
long millisWait = Math.max(0, s.getNextExecTime() - (new DateTime()).getMillis());
try {
@@ -542,4 +553,9 @@ public class ScheduleManager {
public boolean isThreadActive() {
return runner.isAlive();
}
+
+ public String getThreadStage() {
+ return runnerStage;
+ }
+
}
src/java/azkaban/utils/EmailMessage.java 18(+16 -2)
diff --git a/src/java/azkaban/utils/EmailMessage.java b/src/java/azkaban/utils/EmailMessage.java
index 06ca9bc..3bb0e04 100644
--- a/src/java/azkaban/utils/EmailMessage.java
+++ b/src/java/azkaban/utils/EmailMessage.java
@@ -32,7 +32,9 @@ public class EmailMessage {
private String _fromAddress;
private String _mimeType = "text/plain";
private StringBuffer _body = new StringBuffer();
-
+ private int _mailTimeout = 10000;
+ private int _connectionTimeout = 10000;
+
private ArrayList<BodyPart> _attachments = new ArrayList<BodyPart>();
public EmailMessage() {
@@ -44,7 +46,17 @@ public class EmailMessage {
_mailHost = host;
_mailPassword = password;
}
-
+
+ public EmailMessage setTimeout(int timeoutMillis) {
+ _mailTimeout = timeoutMillis;
+ return this;
+ }
+
+ public EmailMessage setConnectionTimeout(int timeoutMillis) {
+ _connectionTimeout = timeoutMillis;
+ return this;
+ }
+
public EmailMessage setMailHost(String host) {
_mailHost = host;
return this;
@@ -136,6 +148,8 @@ public class EmailMessage {
props.put("mail."+protocol+".auth", "true");
props.put("mail.user", _mailUser);
props.put("mail.password", _mailPassword);
+ props.put("mail."+protocol+".timeout", _mailTimeout);
+ props.put("mail."+protocol+".connectiontimeout", _connectionTimeout);
Session session = Session.getInstance(props, null);
Message message = new MimeMessage(session);