azkaban-uncached
Details
README.md 4(+2 -2)
diff --git a/README.md b/README.md
index 7375cbe..d0a972f 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,5 @@
-# Azkaban2
+## Azkaban2
-For all Azkaban Plugins documentation, please go to
+For Azkaban documentation, please go to
[Azkaban Project Site](http://azkaban.github.io/azkaban2/)
There is a google groups: [Azkaban Group](https://groups.google.com/forum/?fromgroups#!forum/azkaban-dev)
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index d707c1f..09bf00a 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -389,12 +389,24 @@ public class FlowRunnerManager implements EventListener {
}
}
+ // Optional per-flow override of the job thread count, read from the
+ // "flow.num.job.threads" flow parameter. It can only lower the configured
+ // numJobThreadPerFlow: values <= 0 or above that limit are ignored.
+ int numJobThreads = numJobThreadPerFlow;
+ if(options.getFlowParameters().containsKey("flow.num.job.threads")) {
+ // Look the value up once; it is needed for parsing and for the error message.
+ String requestedThreads = options.getFlowParameters().get("flow.num.job.threads");
+ try{
+ // parseInt yields the primitive directly, without Integer boxing.
+ int numJobs = Integer.parseInt(requestedThreads);
+ if(numJobs > 0 && numJobs <= numJobThreads) {
+ numJobThreads = numJobs;
+ }
+ } catch (NumberFormatException e) {
+ // A malformed (or null) number is the only failure parsing can produce.
+ throw new ExecutorManagerException("Failed to set the number of job threads " + requestedThreads + " for flow " + execId, e);
+ }
+ }
+
FlowRunner runner = new FlowRunner(flow, executorLoader, projectLoader, jobtypeManager);
runner.setFlowWatcher(watcher)
.setJobLogSettings(jobLogChunkSize, jobLogNumFiles)
.setValidateProxyUser(validateProxyUser)
.setGlobalProps(globalProps)
- .setNumJobThreads(numJobThreadPerFlow)
+ .setNumJobThreads(numJobThreads)
.addListener(this);
// Check again.
@@ -634,6 +646,12 @@ public class FlowRunnerManager implements EventListener {
public int getNumExecutingFlows() {
return runningFlows.size();
}
+
+ /**
+  * Returns the keys of runningFlows, sorted ascending and rendered in
+  * List.toString() form (for example "[3, 7, 12]").
+  */
+ public String getRunningFlowIds() {
+ List<Integer> sortedIds = new ArrayList<Integer>();
+ sortedIds.addAll(runningFlows.keySet());
+ Collections.sort(sortedIds);
+ return String.valueOf(sortedIds);
+ }
public int getNumExecutingJobs() {
int jobCount = 0;
@@ -643,5 +661,7 @@ public class FlowRunnerManager implements EventListener {
return jobCount;
}
+
+
}
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 7e1f2fc..4f41afc 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -76,6 +76,7 @@ public class ExecutorManager implements ExecutorManagerAdapter {
private long lastCleanerThreadCheckTime = -1;
private long lastThreadCheckTime = -1;
+ // Last stage reported by the flow-update loop. volatile because the loop writes
+ // it while getExecutorThreadStage() (exposed over JMX) reads it, presumably
+ // from a different thread.
+ private volatile String updaterStage = "not started";
private Map<String, Alerter> alerters;
@@ -253,6 +254,10 @@ public class ExecutorManager implements ExecutorManagerAdapter {
return executingManager.getState();
}
+ /**
+  * Returns the current value of updaterStage, a human-readable description of
+  * the step the flow-update loop most recently reported.
+  */
+ public String getExecutorThreadStage() {
+ return updaterStage;
+ }
+
@Override
public boolean isExecutorManagerThreadActive() {
return executingManager.isAlive();
@@ -336,7 +341,15 @@ public class ExecutorManager implements ExecutorManagerAdapter {
return flows;
}
- @Override
+ /**
+  * Returns the execution ids of every flow held in runningFlows, sorted
+  * ascending and rendered in List.toString() form (for example "[3, 7, 12]").
+  */
+ public String getRunningFlowIds() {
+ List<Integer> allIds = new ArrayList<Integer>();
+ for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+ allIds.add(ref.getSecond().getExecutionId());
+ }
+ Collections.sort(allIds);
+ return allIds.toString();
+ }
+
+ @Override
public List<ExecutableFlow> getRecentlyFinishedFlows() {
return new ArrayList<ExecutableFlow>(recentlyFinished.values());
}
@@ -779,8 +792,10 @@ public class ExecutorManager implements ExecutorManagerAdapter {
while(!shutdown) {
try {
lastThreadCheckTime = System.currentTimeMillis();
-
+
// loadRunningFlows();
+
+ updaterStage = "Starting update all flows.";
Map<ConnectionInfo, List<ExecutableFlow>> exFlowMap = getFlowToExecutorMap();
ArrayList<ExecutableFlow> finishedFlows = new ArrayList<ExecutableFlow>();
@@ -791,6 +806,10 @@ public class ExecutorManager implements ExecutorManagerAdapter {
List<Long> updateTimesList = new ArrayList<Long>();
List<Integer> executionIdsList = new ArrayList<Integer>();
+ ConnectionInfo connection = entry.getKey();
+
+ updaterStage = "Starting update flows on " + connection.getHost() + ":" + connection.getPort();
+
// We pack the parameters of the same host together before we query.
fillUpdateTimeAndExecId(entry.getValue(), executionIdsList, updateTimesList);
@@ -801,7 +820,7 @@ public class ExecutorManager implements ExecutorManagerAdapter {
ConnectorParams.EXEC_ID_LIST_PARAM,
JSONUtils.toJSON(executionIdsList));
- ConnectionInfo connection = entry.getKey();
+
Map<String, Object> results = null;
try {
results = callExecutorServer(connection.getHost(), connection.getPort(), ConnectorParams.UPDATE_ACTION, null, null, executionIds, updateTimes);
@@ -809,6 +828,9 @@ public class ExecutorManager implements ExecutorManagerAdapter {
logger.error(e);
for (ExecutableFlow flow: entry.getValue()) {
Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(flow.getExecutionId());
+
+ // Take the id from flow, not pair: pair may be null (it is only null-checked below).
+ updaterStage = "Failed to get update. Doing some clean up for flow " + flow.getExecutionId();
+
if (pair != null) {
ExecutionReference ref = pair.getFirst();
int numErrors = ref.getNumErrors();
@@ -831,6 +853,9 @@ public class ExecutorManager implements ExecutorManagerAdapter {
for (Map<String,Object> updateMap: executionUpdates) {
try {
ExecutableFlow flow = updateExecution(updateMap);
+
+ updaterStage = "Updated flow " + flow.getExecutionId();
+
if (isFinished(flow)) {
finishedFlows.add(flow);
finalizeFlows.add(flow);
@@ -848,6 +873,8 @@ public class ExecutorManager implements ExecutorManagerAdapter {
}
}
+ updaterStage = "Evicting old recently finished flows.";
+
evictOldRecentlyFinished(recentlyFinishedLifetimeMs);
// Add new finished
for (ExecutableFlow flow: finishedFlows) {
@@ -857,12 +884,16 @@ public class ExecutorManager implements ExecutorManagerAdapter {
recentlyFinished.put(flow.getExecutionId(), flow);
}
+ updaterStage = "Finalizing " + finalizeFlows.size() + " error flows.";
+
// Kill error flows
for (ExecutableFlow flow: finalizeFlows) {
finalizeFlows(flow);
}
}
+ updaterStage = "Updated all active flows. Waiting for next round.";
+
synchronized(this) {
try {
if (runningFlows.size() > 0) {
@@ -1259,4 +1290,8 @@ public class ExecutorManager implements ExecutorManagerAdapter {
cleanOldExecutionLogs(DateTime.now().getMillis() - executionLogsRetentionMs);
}
}
+
+
+
+
}
src/java/azkaban/jmx/JmxExecutorManager.java 12(+12 -0)
diff --git a/src/java/azkaban/jmx/JmxExecutorManager.java b/src/java/azkaban/jmx/JmxExecutorManager.java
index 123340d..37f52f8 100644
--- a/src/java/azkaban/jmx/JmxExecutorManager.java
+++ b/src/java/azkaban/jmx/JmxExecutorManager.java
@@ -21,6 +21,11 @@ public class JmxExecutorManager implements JmxExecutorManagerMBean {
public String getExecutorThreadState() {
return manager.getExecutorThreadState().toString();
}
+
+ /** Returns the stage string reported by manager.getExecutorThreadStage(). */
+ @Override
+ public String getExecutorThreadStage() {
+ return manager.getExecutorThreadStage();
+ }
@Override
public boolean isThreadActive() {
@@ -36,4 +41,11 @@ public class JmxExecutorManager implements JmxExecutorManagerMBean {
public List<String> getPrimaryExecutorHostPorts() {
return new ArrayList<String>(manager.getPrimaryServerHosts());
}
+
+ /** Returns the running flow ids as formatted by manager.getRunningFlowIds(). */
+ @Override
+ public String getRunningFlows() {
+ return manager.getRunningFlowIds();
+ }
+
+
}
diff --git a/src/java/azkaban/jmx/JmxExecutorManagerMBean.java b/src/java/azkaban/jmx/JmxExecutorManagerMBean.java
index b4a3888..b29d00a 100644
--- a/src/java/azkaban/jmx/JmxExecutorManagerMBean.java
+++ b/src/java/azkaban/jmx/JmxExecutorManagerMBean.java
@@ -6,8 +6,14 @@ public interface JmxExecutorManagerMBean {
@DisplayName("OPERATION: getNumRunningFlows")
public int getNumRunningFlows();
+ /** Returns the ids of the currently running flows, rendered as one string. */
+ @DisplayName("OPERATION: getRunningFlows")
+ public String getRunningFlows();
+
@DisplayName("OPERATION: getExecutorThreadState")
public String getExecutorThreadState();
+
+ /** Returns a description of the stage the executor manager last reported. */
+ @DisplayName("OPERATION: getExecutorThreadStage")
+ public String getExecutorThreadStage();
@DisplayName("OPERATION: isThreadActive")
public boolean isThreadActive();
diff --git a/src/java/azkaban/jmx/JmxFlowRunnerManager.java b/src/java/azkaban/jmx/JmxFlowRunnerManager.java
index f4f59d3..3541140 100644
--- a/src/java/azkaban/jmx/JmxFlowRunnerManager.java
+++ b/src/java/azkaban/jmx/JmxFlowRunnerManager.java
@@ -54,4 +54,9 @@ public class JmxFlowRunnerManager implements JmxFlowRunnerManagerMBean {
return manager.getNumExecutingJobs();
}
+ /** Returns the running flow ids as formatted by manager.getRunningFlowIds(). */
+ @Override
+ public String getRunningFlows() {
+ return manager.getRunningFlowIds();
+ }
+
}
diff --git a/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java b/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java
index 47c6a02..ed509ef 100644
--- a/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java
+++ b/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java
@@ -25,6 +25,9 @@ public interface JmxFlowRunnerManagerMBean {
@DisplayName("OPERATION: getNumExecutingFlows")
public int getNumExecutingFlows();
+ /** Returns the ids of the currently running flows, rendered as one string. */
+ @DisplayName("OPERATION: getRunningFlows")
+ public String getRunningFlows();
+
@DisplayName("OPERATION: getTotalNumRunningJobs")
public int countTotalNumRunningJobs();
}
diff --git a/src/java/azkaban/jmx/JmxScheduler.java b/src/java/azkaban/jmx/JmxScheduler.java
index 73bcf98..8efc576 100644
--- a/src/java/azkaban/jmx/JmxScheduler.java
+++ b/src/java/azkaban/jmx/JmxScheduler.java
@@ -28,4 +28,9 @@ public class JmxScheduler implements JmxSchedulerMBean {
public Boolean isThreadActive() {
return manager.isThreadActive();
}
+
+ /**
+  * Returns the stage string reported by manager.getThreadStage().
+  * NOTE(review): the ScheduleManager hunks of this change only show
+  * getThreadStage() as commented-out code; confirm the method really exists on
+  * manager's type, otherwise this override does not compile.
+  */
+ @Override
+ public String getScheduleThreadStage() {
+ return manager.getThreadStage();
+ }
}
\ No newline at end of file
diff --git a/src/java/azkaban/jmx/JmxSchedulerMBean.java b/src/java/azkaban/jmx/JmxSchedulerMBean.java
index 19a8b70..4de54ef 100644
--- a/src/java/azkaban/jmx/JmxSchedulerMBean.java
+++ b/src/java/azkaban/jmx/JmxSchedulerMBean.java
@@ -4,6 +4,9 @@ public interface JmxSchedulerMBean {
@DisplayName("OPERATION: getScheduleThreadState")
String getScheduleThreadState();
+ /** Returns a description of the stage the scheduler last reported. */
+ @DisplayName("OPERATION: getScheduleThreadStage")
+ String getScheduleThreadStage();
+
@DisplayName("OPERATION: getNextScheduleTime")
Long getNextScheduleTime();
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index fcf079d..29dbdeb 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -73,7 +73,13 @@ public class ScheduleManager implements TriggerAgent {
private final ScheduleRunner runner;
// Used for mbeans to query Scheduler status
-
+// NOTE(review): the declarations below are disabled. They come from a merge whose
+// conflict markers were commented out instead of being resolved. In the visible
+// hunks runnerStage is referenced only by commented-out code, while lastCheckTime
+// and nextWakupTime are still assigned further down, so those two are presumably
+// declared elsewhere - confirm before restoring or deleting anything here.
+// private long lastCheckTime = -1;
+// private long nextWakupTime = -1;
+// private String runnerStage = "not started";
/**
* Give the schedule manager a loader class that will properly load the
@@ -483,6 +489,8 @@ public class ScheduleManager implements TriggerAgent {
synchronized (this) {
try {
lastCheckTime = System.currentTimeMillis();
+
+// runnerStage = "Starting schedule scan.";
// TODO clear up the exception handling
Schedule s = schedules.peek();
@@ -491,6 +499,7 @@ public class ScheduleManager implements TriggerAgent {
// there's something to do. Most likely there will not be.
try {
logger.info("Nothing scheduled to run. Checking again soon.");
+// runnerStage = "Waiting for next round scan.";
nextWakupTime = System.currentTimeMillis() + TIMEOUT_MS;
this.wait(TIMEOUT_MS);
} catch (InterruptedException e) {
@@ -502,6 +511,8 @@ public class ScheduleManager implements TriggerAgent {
// Run flow. The invocation of flows should be quick.
Schedule runningSched = schedules.poll();
+// runnerStage = "Ready to run schedule " + runningSched.toString();
+
logger.info("Scheduler ready to run " + runningSched.toString());
// Execute the flow here
try {
@@ -517,7 +528,7 @@ public class ScheduleManager implements TriggerAgent {
logger.error("Flow " + runningSched.getScheduleName() + " cannot be found in project " + project.getName());
throw new RuntimeException("Error finding the scheduled flow. " + runningSched.getScheduleName());
}
-
+
// Create ExecutableFlow
ExecutableFlow exflow = new ExecutableFlow(flow);
System.out.println("ScheduleManager: creating schedule: " +runningSched.getScheduleId());
@@ -539,6 +550,8 @@ public class ScheduleManager implements TriggerAgent {
flowOptions.setSuccessEmails(flow.getSuccessEmails());
}
+// runnerStage = "Submitting flow " + exflow.getFlowId();
+
try {
executorManager.submitExecutableFlow(exflow, s.getSubmitUser());
logger.info("Scheduler has invoked " + exflow.getExecutionId());
@@ -550,6 +563,7 @@ public class ScheduleManager implements TriggerAgent {
e.printStackTrace();
throw new ScheduleManagerException("Scheduler invoked flow " + exflow.getExecutionId() + " has failed.", e);
}
+
// SlaOptions slaOptions = runningSched.getSlaOptions();
// if(slaOptions != null) {
// logger.info("Submitting SLA checkings for " + runningSched.getFlowName());
@@ -581,6 +595,7 @@ public class ScheduleManager implements TriggerAgent {
logger.info("Scheduler failed to run job. " + e.getMessage() + e.getCause());
}
+// runnerStage = "Done running schedule for " + runningSched.toString();
removeRunnerSchedule(runningSched);
// Immediately reschedule if it's possible. Let
@@ -594,6 +609,7 @@ public class ScheduleManager implements TriggerAgent {
removeSchedule(runningSched);
}
} else {
+// runnerStage = "Waiting for next round scan.";
// wait until flow run
long millisWait = Math.max(0, s.getNextExecTime() - (new DateTime()).getMillis());
try {
@@ -668,8 +684,15 @@ public class ScheduleManager implements TriggerAgent {
}
}
+// NOTE(review): getThreadStage() is disabled below, left over from a merge whose
+// conflict markers were commented out instead of being resolved.
+// JmxScheduler.getScheduleThreadStage() calls manager.getThreadStage(); if that
+// manager is this class, this getter and the runnerStage field must be restored.
+// public String getThreadStage() {
+// return runnerStage;
+// }
-
-
-
}