azkaban-uncached
Changes
src/java/azkaban/executor/ExecutorManager.java 23(+11 -12)
Details
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index 25bc8e5..3c8cbdb 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -618,7 +618,9 @@ public class FlowRunnerManager implements EventListener {
}
public String getRunningFlowIds() {
- return runningFlows.keySet().toString();
+ List<Integer> ids = new ArrayList<Integer>(runningFlows.keySet());
+ Collections.sort(ids);
+ return ids.toString();
}
public int getNumExecutingJobs() {
src/java/azkaban/executor/ExecutorManager.java 23(+11 -12)
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 5105cd8..42b4b41 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -67,6 +67,7 @@ public class ExecutorManager {
private long lastCleanerThreadCheckTime = -1;
private long lastThreadCheckTime = -1;
+ private volatile String updaterStage = "not started"; // volatile: set by the updater thread, read by getExecutorThreadStage()
public ExecutorManager(Props props, ExecutorLoader loader) throws ExecutorManagerException {
this.executorLoader = loader;
@@ -96,7 +97,7 @@ public class ExecutorManager {
}
public String getExecutorThreadStage() {
- return executingManager.getStage();
+ return updaterStage;
}
public boolean isThreadActive() {
@@ -179,6 +180,7 @@ public class ExecutorManager {
for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
allIds.add(ref.getSecond().getExecutionId());
}
+ Collections.sort(allIds);
return allIds.toString();
}
@@ -560,16 +562,11 @@ public class ExecutorManager {
private class ExecutingManagerUpdaterThread extends Thread {
private boolean shutdown = false;
- private String stage;
public ExecutingManagerUpdaterThread() {
this.setName("ExecutorManagerUpdaterThread");
}
- public String getStage() {
- return stage;
- }
-
// 10 mins recently finished threshold.
private long recentlyFinishedLifetimeMs = 600000;
private int waitTimeIdleMs = 2000;
@@ -589,7 +586,7 @@ public class ExecutorManager {
try {
lastThreadCheckTime = System.currentTimeMillis();
- stage = "Starting update all flows.";
+ updaterStage = "Starting update all flows.";
Map<ConnectionInfo, List<ExecutableFlow>> exFlowMap = getFlowToExecutorMap();
ArrayList<ExecutableFlow> finishedFlows = new ArrayList<ExecutableFlow>();
@@ -602,7 +599,7 @@ public class ExecutorManager {
ConnectionInfo connection = entry.getKey();
- stage = "Starting update flows on " + connection.getHost() + ":" + connection.getPort();
+ updaterStage = "Starting update flows on " + connection.getHost() + ":" + connection.getPort();
// We pack the parameters of the same host together before we query.
fillUpdateTimeAndExecId(entry.getValue(), executionIdsList, updateTimesList);
@@ -623,7 +620,7 @@ public class ExecutorManager {
for (ExecutableFlow flow: entry.getValue()) {
Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(flow.getExecutionId());
- stage = "Failed to get update. Doing some clean up for flow " + pair.getSecond().getExecutionId();
+ updaterStage = "Failed to get update. Doing some clean up for flow " + flow.getExecutionId(); // pair may be null (checked just below), so take the id from flow
if (pair != null) {
ExecutionReference ref = pair.getFirst();
@@ -648,7 +645,7 @@ public class ExecutorManager {
try {
ExecutableFlow flow = updateExecution(updateMap);
- stage = "Updated flow " + flow.getExecutionId();
+ updaterStage = "Updated flow " + flow.getExecutionId();
if (isFinished(flow)) {
finishedFlows.add(flow);
@@ -667,7 +664,7 @@ public class ExecutorManager {
}
}
- stage = "Evicting old recently finished flows.";
+ updaterStage = "Evicting old recently finished flows.";
evictOldRecentlyFinished(recentlyFinishedLifetimeMs);
// Add new finished
@@ -675,7 +672,7 @@ public class ExecutorManager {
recentlyFinished.put(flow.getExecutionId(), flow);
}
- stage = "Finalizing " + finalizeFlows.size() + " error flows.";
+ updaterStage = "Finalizing " + finalizeFlows.size() + " error flows.";
// Kill error flows
for (ExecutableFlow flow: finalizeFlows) {
@@ -683,6 +680,8 @@ public class ExecutorManager {
}
}
+ updaterStage = "Updated all active flows. Waiting for next round.";
+
synchronized(this) {
try {
if (runningFlows.size() > 0) {
diff --git a/src/java/azkaban/jmx/JmxScheduler.java b/src/java/azkaban/jmx/JmxScheduler.java
index 73bcf98..8efc576 100644
--- a/src/java/azkaban/jmx/JmxScheduler.java
+++ b/src/java/azkaban/jmx/JmxScheduler.java
@@ -28,4 +28,9 @@ public class JmxScheduler implements JmxSchedulerMBean {
public Boolean isThreadActive() {
return manager.isThreadActive();
}
+
+ @Override
+ public String getScheduleThreadStage() {
+ return manager.getThreadStage();
+ }
}
\ No newline at end of file
diff --git a/src/java/azkaban/jmx/JmxSchedulerMBean.java b/src/java/azkaban/jmx/JmxSchedulerMBean.java
index 19a8b70..4de54ef 100644
--- a/src/java/azkaban/jmx/JmxSchedulerMBean.java
+++ b/src/java/azkaban/jmx/JmxSchedulerMBean.java
@@ -4,6 +4,9 @@ public interface JmxSchedulerMBean {
@DisplayName("OPERATION: getScheduleThreadState")
String getScheduleThreadState();
+ @DisplayName("OPERATION: getScheduleThreadStage")
+ String getScheduleThreadStage();
+
@DisplayName("OPERATION: getNextScheduleTime")
Long getNextScheduleTime();
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 5688f66..468a052 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -69,6 +69,7 @@ public class ScheduleManager {
// Used for mbeans to query Scheduler status
private long lastCheckTime = -1;
private long nextWakupTime = -1;
+ private volatile String runnerStage = "not started"; // volatile: read without synchronization by getThreadStage()
/**
* Give the schedule manager a loader class that will properly load the
@@ -333,6 +334,8 @@ public class ScheduleManager {
synchronized (this) {
try {
lastCheckTime = System.currentTimeMillis();
+
+ runnerStage = "Starting schedule scan.";
// TODO clear up the exception handling
Schedule s = schedules.peek();
@@ -341,6 +344,7 @@ public class ScheduleManager {
// there's something to do. Most likely there will not be.
try {
logger.info("Nothing scheduled to run. Checking again soon.");
+ runnerStage = "Waiting for next round scan.";
nextWakupTime = System.currentTimeMillis() + TIMEOUT_MS;
this.wait(TIMEOUT_MS);
} catch (InterruptedException e) {
@@ -352,6 +356,8 @@ public class ScheduleManager {
// Run flow. The invocation of flows should be quick.
Schedule runningSched = schedules.poll();
+ runnerStage = "Ready to run schedule " + runningSched.toString();
+
logger.info("Scheduler ready to run " + runningSched.toString());
// Execute the flow here
try {
@@ -367,7 +373,7 @@ public class ScheduleManager {
logger.error("Flow " + runningSched.getScheduleName() + " cannot be found in project " + project.getName());
throw new RuntimeException("Error finding the scheduled flow. " + runningSched.getScheduleName());
}
-
+
// Create ExecutableFlow
ExecutableFlow exflow = new ExecutableFlow(flow);
exflow.setSubmitUser(runningSched.getSubmitUser());
@@ -387,6 +393,8 @@ public class ScheduleManager {
flowOptions.setSuccessEmails(flow.getSuccessEmails());
}
+ runnerStage = "Submitting flow " + exflow.getFlowId();
+
try {
executorManager.submitExecutableFlow(exflow);
logger.info("Scheduler has invoked " + exflow.getExecutionId());
@@ -402,6 +410,7 @@ public class ScheduleManager {
SlaOptions slaOptions = runningSched.getSlaOptions();
if(slaOptions != null) {
logger.info("Submitting SLA checkings for " + runningSched.getFlowName());
+ runnerStage = "Submitting SLA checkings for " + runningSched.getFlowName();
// submit flow slas
List<SlaSetting> jobsettings = new ArrayList<SlaSetting>();
for(SlaSetting set : slaOptions.getSettings()) {
@@ -431,6 +440,7 @@ public class ScheduleManager {
logger.info("Scheduler failed to run job. " + e.getMessage() + e.getCause());
}
+ runnerStage = "Done running schedule for " + runningSched.toString();
removeRunnerSchedule(runningSched);
// Immediately reschedule if it's possible. Let
@@ -444,6 +454,7 @@ public class ScheduleManager {
removeSchedule(runningSched.getProjectId(), runningSched.getFlowName());
}
} else {
+ runnerStage = "Waiting for next round scan.";
// wait until flow run
long millisWait = Math.max(0, s.getNextExecTime() - (new DateTime()).getMillis());
try {
@@ -501,4 +512,9 @@ public class ScheduleManager {
public boolean isThreadActive() {
return runner.isAlive();
}
+
+ public String getThreadStage() {
+ return runnerStage;
+ }
+
}