azkaban-uncached

improved jmx monitoring on flowrunner, executormanager,

7/31/2013 4:56:27 PM

Details

diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index 25bc8e5..3c8cbdb 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -618,7 +618,9 @@ public class FlowRunnerManager implements EventListener {
 	}
 	
 	public String getRunningFlowIds() {
-		return runningFlows.keySet().toString();
+		List<Integer> ids = new ArrayList<Integer>(runningFlows.keySet());
+		Collections.sort(ids);
+		return ids.toString();
 	}
 
 	public int getNumExecutingJobs() {
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 5105cd8..42b4b41 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -67,6 +67,7 @@ public class ExecutorManager {
 	private long lastCleanerThreadCheckTime = -1;
 	
 	private long lastThreadCheckTime = -1;
+	private String updaterStage = "not started";
 	
 	public ExecutorManager(Props props, ExecutorLoader loader) throws ExecutorManagerException {
 		this.executorLoader = loader;
@@ -96,7 +97,7 @@ public class ExecutorManager {
 	}
 	
 	public String getExecutorThreadStage() {
-		return executingManager.getStage();
+		return updaterStage;
 	}
 	
 	public boolean isThreadActive() {
@@ -179,6 +180,7 @@ public class ExecutorManager {
 		for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
 			allIds.add(ref.getSecond().getExecutionId());
 		}
+		Collections.sort(allIds);
 		return allIds.toString();
 	}
 	
@@ -560,16 +562,11 @@ public class ExecutorManager {
 	
 	private class ExecutingManagerUpdaterThread extends Thread {
 		private boolean shutdown = false;
-		private String stage;
 
 		public ExecutingManagerUpdaterThread() {
 			this.setName("ExecutorManagerUpdaterThread");
 		}
 		
-		public String getStage() {
-			return stage;
-		}
-
 		// 10 mins recently finished threshold.
 		private long recentlyFinishedLifetimeMs = 600000;
 		private int waitTimeIdleMs = 2000;
@@ -589,7 +586,7 @@ public class ExecutorManager {
 				try {
 					lastThreadCheckTime = System.currentTimeMillis();
 					
-					stage = "Starting update all flows.";
+					updaterStage = "Starting update all flows.";
 					
 					Map<ConnectionInfo, List<ExecutableFlow>> exFlowMap = getFlowToExecutorMap();
 					ArrayList<ExecutableFlow> finishedFlows = new ArrayList<ExecutableFlow>();
@@ -602,7 +599,7 @@ public class ExecutorManager {
 						
 							ConnectionInfo connection = entry.getKey();
 							
-							stage = "Starting update flows on " + connection.getHost() + ":" + connection.getPort();
+							updaterStage = "Starting update flows on " + connection.getHost() + ":" + connection.getPort();
 							
 							// We pack the parameters of the same host together before we query.
 							fillUpdateTimeAndExecId(entry.getValue(), executionIdsList, updateTimesList);
@@ -623,7 +620,7 @@ public class ExecutorManager {
 								for (ExecutableFlow flow: entry.getValue()) {
 									Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(flow.getExecutionId());
 									
-									stage = "Failed to get update. Doing some clean up for flow " + pair.getSecond().getExecutionId();
+									updaterStage = "Failed to get update. Doing some clean up for flow " + pair.getSecond().getExecutionId();
 									
 									if (pair != null) {
 										ExecutionReference ref = pair.getFirst();
@@ -648,7 +645,7 @@ public class ExecutorManager {
 									try {
 										ExecutableFlow flow = updateExecution(updateMap);
 										
-										stage = "Updated flow " + flow.getExecutionId();
+										updaterStage = "Updated flow " + flow.getExecutionId();
 										
 										if (isFinished(flow)) {
 											finishedFlows.add(flow);
@@ -667,7 +664,7 @@ public class ExecutorManager {
 							}
 						}
 	
-						stage = "Evicting old recently finished flows.";
+						updaterStage = "Evicting old recently finished flows.";
 						
 						evictOldRecentlyFinished(recentlyFinishedLifetimeMs);
 						// Add new finished
@@ -675,7 +672,7 @@ public class ExecutorManager {
 							recentlyFinished.put(flow.getExecutionId(), flow);
 						}
 						
-						stage = "Finalizing " + finalizeFlows.size() + " error flows.";
+						updaterStage = "Finalizing " + finalizeFlows.size() + " error flows.";
 						
 						// Kill error flows
 						for (ExecutableFlow flow: finalizeFlows) {
@@ -683,6 +680,8 @@ public class ExecutorManager {
 						}
 					}
 					
+					updaterStage = "Updated all active flows. Waiting for next round.";
+					
 					synchronized(this) {
 						try {
 							if (runningFlows.size() > 0) {
diff --git a/src/java/azkaban/jmx/JmxScheduler.java b/src/java/azkaban/jmx/JmxScheduler.java
index 73bcf98..8efc576 100644
--- a/src/java/azkaban/jmx/JmxScheduler.java
+++ b/src/java/azkaban/jmx/JmxScheduler.java
@@ -28,4 +28,9 @@ public class JmxScheduler implements JmxSchedulerMBean {
 	public Boolean isThreadActive() {
 		return manager.isThreadActive();
 	}
+
+	@Override
+	public String getScheduleThreadStage() {
+		return manager.getThreadStage();
+	}
 }
\ No newline at end of file
diff --git a/src/java/azkaban/jmx/JmxSchedulerMBean.java b/src/java/azkaban/jmx/JmxSchedulerMBean.java
index 19a8b70..4de54ef 100644
--- a/src/java/azkaban/jmx/JmxSchedulerMBean.java
+++ b/src/java/azkaban/jmx/JmxSchedulerMBean.java
@@ -4,6 +4,9 @@ public interface JmxSchedulerMBean {
 	@DisplayName("OPERATION: getScheduleThreadState")
 	String getScheduleThreadState();
 	
+	@DisplayName("OPERATION: getScheduleThreadStage")
+	String getScheduleThreadStage();
+	
 	@DisplayName("OPERATION: getNextScheduleTime")
 	Long getNextScheduleTime();
 	
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 5688f66..468a052 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -69,6 +69,7 @@ public class ScheduleManager {
 	// Used for mbeans to query Scheduler status
 	private long lastCheckTime = -1;
 	private long nextWakupTime = -1;
+	private String runnerStage = "not started";
 
 	/**
 	 * Give the schedule manager a loader class that will properly load the
@@ -333,6 +334,8 @@ public class ScheduleManager {
 				synchronized (this) {
 					try {
 						lastCheckTime = System.currentTimeMillis();
+						
+						runnerStage = "Starting schedule scan.";
 						// TODO clear up the exception handling
 						Schedule s = schedules.peek();
 
@@ -341,6 +344,7 @@ public class ScheduleManager {
 							// there's something to do. Most likely there will not be.
 							try {
 								logger.info("Nothing scheduled to run. Checking again soon.");
+								runnerStage = "Waiting for next round scan.";
 								nextWakupTime = System.currentTimeMillis() + TIMEOUT_MS;
 								this.wait(TIMEOUT_MS);
 							} catch (InterruptedException e) {
@@ -352,6 +356,8 @@ public class ScheduleManager {
 								// Run flow. The invocation of flows should be quick.
 								Schedule runningSched = schedules.poll();
 
+								runnerStage = "Ready to run schedule " + runningSched.toString();
+								
 								logger.info("Scheduler ready to run " + runningSched.toString());
 								// Execute the flow here
 								try {
@@ -367,7 +373,7 @@ public class ScheduleManager {
 										logger.error("Flow " + runningSched.getScheduleName() + " cannot be found in project " + project.getName());
 										throw new RuntimeException("Error finding the scheduled flow. " + runningSched.getScheduleName());
 									}
-
+									
 									// Create ExecutableFlow
 									ExecutableFlow exflow = new ExecutableFlow(flow);
 									exflow.setSubmitUser(runningSched.getSubmitUser());
@@ -387,6 +393,8 @@ public class ScheduleManager {
 										flowOptions.setSuccessEmails(flow.getSuccessEmails());
 									}
 									
+									runnerStage = "Submitting flow " + exflow.getFlowId();
+									
 									try {
 										executorManager.submitExecutableFlow(exflow);
 										logger.info("Scheduler has invoked " + exflow.getExecutionId());
@@ -402,6 +410,7 @@ public class ScheduleManager {
 									SlaOptions slaOptions = runningSched.getSlaOptions();
 									if(slaOptions != null) {
 										logger.info("Submitting SLA checkings for " + runningSched.getFlowName());
+										runnerStage = "Submitting SLA checkings for " + runningSched.getFlowName();
 										// submit flow slas
 										List<SlaSetting> jobsettings = new ArrayList<SlaSetting>();
 										for(SlaSetting set : slaOptions.getSettings()) {
@@ -431,6 +440,7 @@ public class ScheduleManager {
 									logger.info("Scheduler failed to run job. " + e.getMessage() + e.getCause());
 								}
 
+								runnerStage = "Done running schedule for " + runningSched.toString();
 								removeRunnerSchedule(runningSched);
 
 								// Immediately reschedule if it's possible. Let
@@ -444,6 +454,7 @@ public class ScheduleManager {
 									removeSchedule(runningSched.getProjectId(), runningSched.getFlowName());
 								}								
 							} else {
+								runnerStage = "Waiting for next round scan.";
 								// wait until flow run
 								long millisWait = Math.max(0, s.getNextExecTime() - (new DateTime()).getMillis());
 								try {
@@ -501,4 +512,9 @@ public class ScheduleManager {
 	public boolean isThreadActive() {
 		return runner.isAlive();
 	}
+
+	public String getThreadStage() {
+		return runnerStage;
+	}
+	
 }