azkaban-uncached

added more jmx status to track the executorManager update thread allow

7/30/2013 7:58:46 PM

Details

diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index b9aa2ec..25bc8e5 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -388,12 +388,24 @@ public class FlowRunnerManager implements EventListener {
 			}
 		}
 
+		int numJobThreads = numJobThreadPerFlow;
+		if(options.getFlowParameters().containsKey("flow.num.job.threads")) {
+			try{
+				int numJobs = Integer.valueOf(options.getFlowParameters().get("flow.num.job.threads"));
+				if(numJobs > 0 && numJobs <= numJobThreads) {
+					numJobThreads = numJobs;
+				}
+			} catch (Exception e) {
+				throw new ExecutorManagerException("Failed to set the number of job threads " + options.getFlowParameters().get("flow.num.job.threads") + " for flow " + execId, e);
+			}
+		}
+		
 		FlowRunner runner = new FlowRunner(flow, executorLoader, projectLoader, jobtypeManager);
 		runner.setFlowWatcher(watcher)
 			.setJobLogSettings(jobLogChunkSize, jobLogNumFiles)
 			.setValidateProxyUser(validateProxyUser)
 			.setGlobalProps(globalProps)
-			.setNumJobThreads(numJobThreadPerFlow)
+			.setNumJobThreads(numJobThreads)
 			.addListener(this);
 		
 		// Check again.
@@ -604,6 +616,10 @@ public class FlowRunnerManager implements EventListener {
 	public int getNumExecutingFlows() {
 		return runningFlows.size();
 	}
+	
+	public String getRunningFlowIds() {
+		return runningFlows.keySet().toString();
+	}
 
 	public int getNumExecutingJobs() {
 		int jobCount = 0;
@@ -613,5 +629,7 @@ public class FlowRunnerManager implements EventListener {
 		
 		return jobCount;
 	}
+
+	
 	
 }
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 5ca87eb..5105cd8 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -95,6 +95,10 @@ public class ExecutorManager {
 		return executingManager.getState();
 	}
 	
+	public String getExecutorThreadStage() {
+		return executingManager.getStage();
+	}
+	
 	public boolean isThreadActive() {
 		return executingManager.isAlive();
 	}
@@ -170,6 +174,14 @@ public class ExecutorManager {
 		return flows;
 	}
 	
+	public String getRunningFlowIds() {
+		List<Integer> allIds = new ArrayList<Integer>();
+		for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+			allIds.add(ref.getSecond().getExecutionId());
+		}
+		return allIds.toString();
+	}
+	
 	public List<ExecutableFlow> getRecentlyFinishedFlows() {
 		return new ArrayList<ExecutableFlow>(recentlyFinished.values());
 	}
@@ -548,11 +560,16 @@ public class ExecutorManager {
 	
 	private class ExecutingManagerUpdaterThread extends Thread {
 		private boolean shutdown = false;
+		private String stage;
 
 		public ExecutingManagerUpdaterThread() {
 			this.setName("ExecutorManagerUpdaterThread");
 		}
 		
+		public String getStage() {
+			return stage;
+		}
+
 		// 10 mins recently finished threshold.
 		private long recentlyFinishedLifetimeMs = 600000;
 		private int waitTimeIdleMs = 2000;
@@ -572,6 +589,8 @@ public class ExecutorManager {
 				try {
 					lastThreadCheckTime = System.currentTimeMillis();
 					
+					stage = "Starting update all flows.";
+					
 					Map<ConnectionInfo, List<ExecutableFlow>> exFlowMap = getFlowToExecutorMap();
 					ArrayList<ExecutableFlow> finishedFlows = new ArrayList<ExecutableFlow>();
 					ArrayList<ExecutableFlow> finalizeFlows = new ArrayList<ExecutableFlow>();
@@ -581,6 +600,10 @@ public class ExecutorManager {
 							List<Long> updateTimesList = new ArrayList<Long>();
 							List<Integer> executionIdsList = new ArrayList<Integer>();
 						
+							ConnectionInfo connection = entry.getKey();
+							
+							stage = "Starting update flows on " + connection.getHost() + ":" + connection.getPort();
+							
 							// We pack the parameters of the same host together before we query.
 							fillUpdateTimeAndExecId(entry.getValue(), executionIdsList, updateTimesList);
 							
@@ -591,7 +614,7 @@ public class ExecutorManager {
 									ConnectorParams.EXEC_ID_LIST_PARAM, 
 									JSONUtils.toJSON(executionIdsList));
 							
-							ConnectionInfo connection = entry.getKey();
+							
 							Map<String, Object> results = null;
 							try {
 								results = callExecutorServer(connection.getHost(), connection.getPort(), ConnectorParams.UPDATE_ACTION, null, null, executionIds, updateTimes);
@@ -599,6 +622,9 @@ public class ExecutorManager {
 								logger.error(e);
 								for (ExecutableFlow flow: entry.getValue()) {
 									Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(flow.getExecutionId());
+									
+									stage = "Failed to get update. Doing some clean up for flow " + pair.getSecond().getExecutionId();
+									
 									if (pair != null) {
 										ExecutionReference ref = pair.getFirst();
 										int numErrors = ref.getNumErrors();
@@ -621,6 +647,9 @@ public class ExecutorManager {
 								for (Map<String,Object> updateMap: executionUpdates) {
 									try {
 										ExecutableFlow flow = updateExecution(updateMap);
+										
+										stage = "Updated flow " + flow.getExecutionId();
+										
 										if (isFinished(flow)) {
 											finishedFlows.add(flow);
 											finalizeFlows.add(flow);
@@ -638,12 +667,16 @@ public class ExecutorManager {
 							}
 						}
 	
+						stage = "Evicting old recently finished flows.";
+						
 						evictOldRecentlyFinished(recentlyFinishedLifetimeMs);
 						// Add new finished
 						for (ExecutableFlow flow: finishedFlows) {
 							recentlyFinished.put(flow.getExecutionId(), flow);
 						}
 						
+						stage = "Finalizing " + finalizeFlows.size() + " error flows.";
+						
 						// Kill error flows
 						for (ExecutableFlow flow: finalizeFlows) {
 							finalizeFlows(flow);
@@ -978,4 +1011,8 @@ public class ExecutorManager {
 			cleanOldExecutionLogs(DateTime.now().getMillis() - executionLogsRetentionMs);
 		}
 	}
+
+	
+
+	
 }
diff --git a/src/java/azkaban/jmx/JmxExecutorManager.java b/src/java/azkaban/jmx/JmxExecutorManager.java
index 123340d..37f52f8 100644
--- a/src/java/azkaban/jmx/JmxExecutorManager.java
+++ b/src/java/azkaban/jmx/JmxExecutorManager.java
@@ -21,6 +21,11 @@ public class JmxExecutorManager implements JmxExecutorManagerMBean {
 	public String getExecutorThreadState() {
 		return manager.getExecutorThreadState().toString();
 	}
+	
+	@Override
+	public String getExecutorThreadStage() {
+		return manager.getExecutorThreadStage();
+	}
 
 	@Override
 	public boolean isThreadActive() {
@@ -36,4 +41,11 @@ public class JmxExecutorManager implements JmxExecutorManagerMBean {
 	public List<String> getPrimaryExecutorHostPorts() {
 		return new ArrayList<String>(manager.getPrimaryServerHosts());
 	}
+
+	@Override
+	public String getRunningFlows() {
+		return manager.getRunningFlowIds();
+	}
+
+	
 }
diff --git a/src/java/azkaban/jmx/JmxExecutorManagerMBean.java b/src/java/azkaban/jmx/JmxExecutorManagerMBean.java
index b4a3888..b29d00a 100644
--- a/src/java/azkaban/jmx/JmxExecutorManagerMBean.java
+++ b/src/java/azkaban/jmx/JmxExecutorManagerMBean.java
@@ -6,8 +6,14 @@ public interface JmxExecutorManagerMBean {
 	@DisplayName("OPERATION: getNumRunningFlows")
 	public int getNumRunningFlows();
 	
+	@DisplayName("OPERATION: getRunningFlows")
+	public String getRunningFlows();
+	
 	@DisplayName("OPERATION: getExecutorThreadState")
 	public String getExecutorThreadState();
+	
+	@DisplayName("OPERATION: getExecutorThreadStage")
+	public String getExecutorThreadStage();
 
 	@DisplayName("OPERATION: isThreadActive")
 	public boolean isThreadActive();
diff --git a/src/java/azkaban/jmx/JmxFlowRunnerManager.java b/src/java/azkaban/jmx/JmxFlowRunnerManager.java
index f4f59d3..3541140 100644
--- a/src/java/azkaban/jmx/JmxFlowRunnerManager.java
+++ b/src/java/azkaban/jmx/JmxFlowRunnerManager.java
@@ -54,4 +54,9 @@ public class JmxFlowRunnerManager implements JmxFlowRunnerManagerMBean {
 		return manager.getNumExecutingJobs();
 	}
 
+	@Override
+	public String getRunningFlows() {
+		return manager.getRunningFlowIds();
+	}
+
 }
diff --git a/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java b/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java
index 47c6a02..ed509ef 100644
--- a/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java
+++ b/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java
@@ -25,6 +25,9 @@ public interface JmxFlowRunnerManagerMBean {
 	@DisplayName("OPERATION: getNumExecutingFlows")
 	public int getNumExecutingFlows();
 	
+	@DisplayName("OPERATION: getRunningFlows")
+	public String getRunningFlows();
+	
 	@DisplayName("OPERATION: getTotalNumRunningJobs")
 	public int countTotalNumRunningJobs();
 }