azkaban-uncached

Merge branch 'release-2.1' of github.com:azkaban/azkaban2

9/5/2013 7:51:39 PM

Details

README.md 4(+2 -2)

diff --git a/README.md b/README.md
index 7375cbe..d0a972f 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,5 @@
-# Azkaban2
+## Azkaban2
 
-For all Azkaban Plugins documentation, please go to
+For Azkaban documentation, please go to
 [Azkaban Project Site](http://azkaban.github.io/azkaban2/)
 There is a google groups: [Azkaban Group](https://groups.google.com/forum/?fromgroups#!forum/azkaban-dev)
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index d707c1f..09bf00a 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -389,12 +389,24 @@ public class FlowRunnerManager implements EventListener {
 			}
 		}
 
+		int numJobThreads = numJobThreadPerFlow;
+		if(options.getFlowParameters().containsKey("flow.num.job.threads")) {
+			try{
+				int numJobs = Integer.valueOf(options.getFlowParameters().get("flow.num.job.threads"));
+				if(numJobs > 0 && numJobs <= numJobThreads) {
+					numJobThreads = numJobs;
+				}
+			} catch (Exception e) {
+				throw new ExecutorManagerException("Failed to set the number of job threads " + options.getFlowParameters().get("flow.num.job.threads") + " for flow " + execId, e);
+			}
+		}
+		
 		FlowRunner runner = new FlowRunner(flow, executorLoader, projectLoader, jobtypeManager);
 		runner.setFlowWatcher(watcher)
 			.setJobLogSettings(jobLogChunkSize, jobLogNumFiles)
 			.setValidateProxyUser(validateProxyUser)
 			.setGlobalProps(globalProps)
-			.setNumJobThreads(numJobThreadPerFlow)
+			.setNumJobThreads(numJobThreads)
 			.addListener(this);
 		
 		// Check again.
@@ -634,6 +646,12 @@ public class FlowRunnerManager implements EventListener {
 	public int getNumExecutingFlows() {
 		return runningFlows.size();
 	}
+	
+	public String getRunningFlowIds() {
+		List<Integer> ids = new ArrayList<Integer>(runningFlows.keySet());
+		Collections.sort(ids);
+		return ids.toString();
+	}
 
 	public int getNumExecutingJobs() {
 		int jobCount = 0;
@@ -643,5 +661,7 @@ public class FlowRunnerManager implements EventListener {
 		
 		return jobCount;
 	}
+
+	
 	
 }
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 7e1f2fc..4f41afc 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -76,6 +76,7 @@ public class ExecutorManager implements ExecutorManagerAdapter {
 	private long lastCleanerThreadCheckTime = -1;
 	
 	private long lastThreadCheckTime = -1;
+	private volatile String updaterStage = "not started"; // volatile: assigned inside the update loop and read by getExecutorThreadStage(), presumably from another (JMX) thread
 	
 	private Map<String, Alerter> alerters;
 	
@@ -253,6 +254,10 @@ public class ExecutorManager implements ExecutorManagerAdapter {
 		return executingManager.getState();
 	}
 	
+	public String getExecutorThreadStage() {
+		return updaterStage;
+	}
+	
 	@Override
 	public boolean isExecutorManagerThreadActive() {
 		return executingManager.isAlive();
@@ -336,7 +341,15 @@ public class ExecutorManager implements ExecutorManagerAdapter {
 		return flows;
 	}
 	
-	@Override
+	public String getRunningFlowIds() {
+		List<Integer> allIds = new ArrayList<Integer>();
+		for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+			allIds.add(ref.getSecond().getExecutionId());
+		}
+		Collections.sort(allIds);
+		return allIds.toString();
+	}
+	
 	public List<ExecutableFlow> getRecentlyFinishedFlows() {
 		return new ArrayList<ExecutableFlow>(recentlyFinished.values());
 	}
@@ -779,8 +792,10 @@ public class ExecutorManager implements ExecutorManagerAdapter {
 			while(!shutdown) {
 				try {
 					lastThreadCheckTime = System.currentTimeMillis();
-					
+
 //					loadRunningFlows();
+
+					updaterStage = "Starting update all flows.";
 					
 					Map<ConnectionInfo, List<ExecutableFlow>> exFlowMap = getFlowToExecutorMap();
 					ArrayList<ExecutableFlow> finishedFlows = new ArrayList<ExecutableFlow>();
@@ -791,6 +806,10 @@ public class ExecutorManager implements ExecutorManagerAdapter {
 							List<Long> updateTimesList = new ArrayList<Long>();
 							List<Integer> executionIdsList = new ArrayList<Integer>();
 						
+							ConnectionInfo connection = entry.getKey();
+							
+							updaterStage = "Starting update flows on " + connection.getHost() + ":" + connection.getPort();
+							
 							// We pack the parameters of the same host together before we query.
 							fillUpdateTimeAndExecId(entry.getValue(), executionIdsList, updateTimesList);
 							
@@ -801,7 +820,7 @@ public class ExecutorManager implements ExecutorManagerAdapter {
 									ConnectorParams.EXEC_ID_LIST_PARAM, 
 									JSONUtils.toJSON(executionIdsList));
 							
-							ConnectionInfo connection = entry.getKey();
+							
 							Map<String, Object> results = null;
 							try {
 								results = callExecutorServer(connection.getHost(), connection.getPort(), ConnectorParams.UPDATE_ACTION, null, null, executionIds, updateTimes);
@@ -809,6 +828,9 @@ public class ExecutorManager implements ExecutorManagerAdapter {
 								logger.error(e);
 								for (ExecutableFlow flow: entry.getValue()) {
 									Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(flow.getExecutionId());
+									
+									updaterStage = "Failed to get update. Doing some clean up for flow " + flow.getExecutionId(); // use flow's id: pair may be null here (it is only null-checked just below)
+									
 									if (pair != null) {
 										ExecutionReference ref = pair.getFirst();
 										int numErrors = ref.getNumErrors();
@@ -831,6 +853,9 @@ public class ExecutorManager implements ExecutorManagerAdapter {
 								for (Map<String,Object> updateMap: executionUpdates) {
 									try {
 										ExecutableFlow flow = updateExecution(updateMap);
+										
+										updaterStage = "Updated flow " + flow.getExecutionId();
+										
 										if (isFinished(flow)) {
 											finishedFlows.add(flow);
 											finalizeFlows.add(flow);
@@ -848,6 +873,8 @@ public class ExecutorManager implements ExecutorManagerAdapter {
 							}
 						}
 	
+						updaterStage = "Evicting old recently finished flows.";
+						
 						evictOldRecentlyFinished(recentlyFinishedLifetimeMs);
 						// Add new finished
 						for (ExecutableFlow flow: finishedFlows) {
@@ -857,12 +884,16 @@ public class ExecutorManager implements ExecutorManagerAdapter {
 							recentlyFinished.put(flow.getExecutionId(), flow);
 						}
 						
+						updaterStage = "Finalizing " + finalizeFlows.size() + " error flows.";
+						
 						// Kill error flows
 						for (ExecutableFlow flow: finalizeFlows) {
 							finalizeFlows(flow);
 						}
 					}
 					
+					updaterStage = "Updated all active flows. Waiting for next round.";
+					
 					synchronized(this) {
 						try {
 							if (runningFlows.size() > 0) {
@@ -1259,4 +1290,8 @@ public class ExecutorManager implements ExecutorManagerAdapter {
 			cleanOldExecutionLogs(DateTime.now().getMillis() - executionLogsRetentionMs);
 		}
 	}
+
+	
+
+	
 }
diff --git a/src/java/azkaban/jmx/JmxExecutorManager.java b/src/java/azkaban/jmx/JmxExecutorManager.java
index 123340d..37f52f8 100644
--- a/src/java/azkaban/jmx/JmxExecutorManager.java
+++ b/src/java/azkaban/jmx/JmxExecutorManager.java
@@ -21,6 +21,11 @@ public class JmxExecutorManager implements JmxExecutorManagerMBean {
 	public String getExecutorThreadState() {
 		return manager.getExecutorThreadState().toString();
 	}
+	
+	@Override
+	public String getExecutorThreadStage() {
+		return manager.getExecutorThreadStage();
+	}
 
 	@Override
 	public boolean isThreadActive() {
@@ -36,4 +41,11 @@ public class JmxExecutorManager implements JmxExecutorManagerMBean {
 	public List<String> getPrimaryExecutorHostPorts() {
 		return new ArrayList<String>(manager.getPrimaryServerHosts());
 	}
+
+	@Override
+	public String getRunningFlows() {
+		return manager.getRunningFlowIds();
+	}
+
+	
 }
diff --git a/src/java/azkaban/jmx/JmxExecutorManagerMBean.java b/src/java/azkaban/jmx/JmxExecutorManagerMBean.java
index b4a3888..b29d00a 100644
--- a/src/java/azkaban/jmx/JmxExecutorManagerMBean.java
+++ b/src/java/azkaban/jmx/JmxExecutorManagerMBean.java
@@ -6,8 +6,14 @@ public interface JmxExecutorManagerMBean {
 	@DisplayName("OPERATION: getNumRunningFlows")
 	public int getNumRunningFlows();
 	
+	@DisplayName("OPERATION: getRunningFlows")
+	public String getRunningFlows();
+	
 	@DisplayName("OPERATION: getExecutorThreadState")
 	public String getExecutorThreadState();
+	
+	@DisplayName("OPERATION: getExecutorThreadStage")
+	public String getExecutorThreadStage();
 
 	@DisplayName("OPERATION: isThreadActive")
 	public boolean isThreadActive();
diff --git a/src/java/azkaban/jmx/JmxFlowRunnerManager.java b/src/java/azkaban/jmx/JmxFlowRunnerManager.java
index f4f59d3..3541140 100644
--- a/src/java/azkaban/jmx/JmxFlowRunnerManager.java
+++ b/src/java/azkaban/jmx/JmxFlowRunnerManager.java
@@ -54,4 +54,9 @@ public class JmxFlowRunnerManager implements JmxFlowRunnerManagerMBean {
 		return manager.getNumExecutingJobs();
 	}
 
+	@Override
+	public String getRunningFlows() {
+		return manager.getRunningFlowIds();
+	}
+
 }
diff --git a/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java b/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java
index 47c6a02..ed509ef 100644
--- a/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java
+++ b/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java
@@ -25,6 +25,9 @@ public interface JmxFlowRunnerManagerMBean {
 	@DisplayName("OPERATION: getNumExecutingFlows")
 	public int getNumExecutingFlows();
 	
+	@DisplayName("OPERATION: getRunningFlows")
+	public String getRunningFlows();
+	
 	@DisplayName("OPERATION: getTotalNumRunningJobs")
 	public int countTotalNumRunningJobs();
 }
diff --git a/src/java/azkaban/jmx/JmxScheduler.java b/src/java/azkaban/jmx/JmxScheduler.java
index 73bcf98..8efc576 100644
--- a/src/java/azkaban/jmx/JmxScheduler.java
+++ b/src/java/azkaban/jmx/JmxScheduler.java
@@ -28,4 +28,9 @@ public class JmxScheduler implements JmxSchedulerMBean {
 	public Boolean isThreadActive() {
 		return manager.isThreadActive();
 	}
+
+	@Override
+	public String getScheduleThreadStage() {
+		return manager.getThreadStage();
+	}
 }
\ No newline at end of file
diff --git a/src/java/azkaban/jmx/JmxSchedulerMBean.java b/src/java/azkaban/jmx/JmxSchedulerMBean.java
index 19a8b70..4de54ef 100644
--- a/src/java/azkaban/jmx/JmxSchedulerMBean.java
+++ b/src/java/azkaban/jmx/JmxSchedulerMBean.java
@@ -4,6 +4,9 @@ public interface JmxSchedulerMBean {
 	@DisplayName("OPERATION: getScheduleThreadState")
 	String getScheduleThreadState();
 	
+	@DisplayName("OPERATION: getScheduleThreadStage")
+	String getScheduleThreadStage();
+	
 	@DisplayName("OPERATION: getNextScheduleTime")
 	Long getNextScheduleTime();
 	
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index fcf079d..29dbdeb 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -73,7 +73,13 @@ public class ScheduleManager implements TriggerAgent {
 	private final ScheduleRunner runner;
 	
 	// Used for mbeans to query Scheduler status
-	
+//<<<<<<< HEAD
+//	
+//=======
+//	private long lastCheckTime = -1;
+//	private long nextWakupTime = -1;
+//	private String runnerStage = "not started";
+//>>>>>>> 10830aeb8ac819473873cac3bb4e07b4aeda67e8
 
 	/**
 	 * Give the schedule manager a loader class that will properly load the
@@ -483,6 +489,8 @@ public class ScheduleManager implements TriggerAgent {
 				synchronized (this) {
 					try {
 						lastCheckTime = System.currentTimeMillis();
+						
+//						runnerStage = "Starting schedule scan.";
 						// TODO clear up the exception handling
 						Schedule s = schedules.peek();
 
@@ -491,6 +499,7 @@ public class ScheduleManager implements TriggerAgent {
 							// there's something to do. Most likely there will not be.
 							try {
 								logger.info("Nothing scheduled to run. Checking again soon.");
+//								runnerStage = "Waiting for next round scan.";
 								nextWakupTime = System.currentTimeMillis() + TIMEOUT_MS;
 								this.wait(TIMEOUT_MS);
 							} catch (InterruptedException e) {
@@ -502,6 +511,8 @@ public class ScheduleManager implements TriggerAgent {
 								// Run flow. The invocation of flows should be quick.
 								Schedule runningSched = schedules.poll();
 
+//								runnerStage = "Ready to run schedule " + runningSched.toString();
+								
 								logger.info("Scheduler ready to run " + runningSched.toString());
 								// Execute the flow here
 								try {
@@ -517,7 +528,7 @@ public class ScheduleManager implements TriggerAgent {
 										logger.error("Flow " + runningSched.getScheduleName() + " cannot be found in project " + project.getName());
 										throw new RuntimeException("Error finding the scheduled flow. " + runningSched.getScheduleName());
 									}
-
+									
 									// Create ExecutableFlow
 									ExecutableFlow exflow = new ExecutableFlow(flow);
 									System.out.println("ScheduleManager: creating schedule: " +runningSched.getScheduleId());
@@ -539,6 +550,8 @@ public class ScheduleManager implements TriggerAgent {
 										flowOptions.setSuccessEmails(flow.getSuccessEmails());
 									}
 									
+//									runnerStage = "Submitting flow " + exflow.getFlowId();
+									
 									try {
 										executorManager.submitExecutableFlow(exflow, s.getSubmitUser());
 										logger.info("Scheduler has invoked " + exflow.getExecutionId());
@@ -550,6 +563,7 @@ public class ScheduleManager implements TriggerAgent {
 										e.printStackTrace();
 										throw new ScheduleManagerException("Scheduler invoked flow " + exflow.getExecutionId() + " has failed.", e);
 									}
+
 //									SlaOptions slaOptions = runningSched.getSlaOptions();
 //									if(slaOptions != null) {
 //										logger.info("Submitting SLA checkings for " + runningSched.getFlowName());
@@ -581,6 +595,7 @@ public class ScheduleManager implements TriggerAgent {
 									logger.info("Scheduler failed to run job. " + e.getMessage() + e.getCause());
 								}
 
+//								runnerStage = "Done running schedule for " + runningSched.toString();
 								removeRunnerSchedule(runningSched);
 
 								// Immediately reschedule if it's possible. Let
@@ -594,6 +609,7 @@ public class ScheduleManager implements TriggerAgent {
 									removeSchedule(runningSched);
 								}								
 							} else {
+//								runnerStage = "Waiting for next round scan.";
 								// wait until flow run
 								long millisWait = Math.max(0, s.getNextExecTime() - (new DateTime()).getMillis());
 								try {
@@ -668,8 +684,15 @@ public class ScheduleManager implements TriggerAgent {
 		}
 	}
 
+//<<<<<<< HEAD
+//	
+//
+//	
+//
+//=======
+//	public String getThreadStage() {
+//		return runnerStage;
+//	}
 	
-
-	
-
+//>>>>>>> 10830aeb8ac819473873cac3bb4e07b4aeda67e8
 }