azkaban-uncached

Merge branch 'release-2.1' of github.com:azkaban/azkaban2

4/9/2013 3:23:47 PM

Details

diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index f1c369b..eeaec11 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -351,33 +351,32 @@ public class ExecutorManager {
 
 			ExecutionOptions options = exflow.getExecutionOptions();
 			
+			if (options.getDisabledJobs() != null) {
+				// Disable jobs
+				for(String disabledId : options.getDisabledJobs()) {
+					ExecutableNode node = exflow.getExecutableNode(disabledId);
+					node.setStatus(Status.DISABLED);
+				}
+			}
+			
 			String message = "";
-			if (options != null) {
-				if (options.getDisabledJobs() != null) {
-					// Disable jobs
-					for(String disabledId : options.getDisabledJobs()) {
-						ExecutableNode node = exflow.getExecutableNode(disabledId);
-						node.setStatus(Status.DISABLED);
-					}
+			if (!running.isEmpty()) {
+				if (options.getConcurrentOption().equals("pipeline")) {
+					Collections.sort(running);
+					Integer runningExecId = running.get(running.size() - 1);
+					
+					options.setPipelineExecutionId(runningExecId);
+					message = "Flow " + flowId + " is already running with exec id " + runningExecId +". Pipelining level " + options.getPipelineLevel() + ". ";
 				}
-
-				if (!running.isEmpty()) {
-					if (options.getConcurrentOption().equals("pipeline")) {
-						Collections.sort(running);
-						Integer runningExecId = running.get(running.size() - 1);
-						
-						options.setPipelineExecutionId(runningExecId);
-						message = "Flow " + flowId + " is already running with exec id " + runningExecId +". Pipelining level " + options.getPipelineLevel() + ". ";
-					}
-					else if (options.getConcurrentOption().equals("skip")) {
-						throw new ExecutorManagerException("Flow " + flowId + " is already running. Skipping execution.");
-					}
-					else {
-						// The settings is to run anyways.
-						message = "Flow " + flowId + " is already running with exec id " + StringUtils.join(running, ",") +". Will execute concurrently. ";
-					}
+				else if (options.getConcurrentOption().equals("skip")) {
+					throw new ExecutorManagerException("Flow " + flowId + " is already running. Skipping execution.");
+				}
+				else {
+					// The settings is to run anyways.
+					message = "Flow " + flowId + " is already running with exec id " + StringUtils.join(running, ",") +". Will execute concurrently. ";
 				}
 			}
+			
 			// The exflow id is set by the loader. So it's unavailable until after this call.
 			executorLoader.uploadExecutableFlow(exflow);
 			
diff --git a/src/java/azkaban/webapp/servlet/JMXHttpServlet.java b/src/java/azkaban/webapp/servlet/JMXHttpServlet.java
index 78aec2f..75982d1 100644
--- a/src/java/azkaban/webapp/servlet/JMXHttpServlet.java
+++ b/src/java/azkaban/webapp/servlet/JMXHttpServlet.java
@@ -162,13 +162,18 @@ public class JMXHttpServlet extends LoginAbstractAzkabanServlet implements Conne
 		Map<String, Object> executorMBeans = new HashMap<String,Object>();
 		Set<String> primaryServerHosts = executorManager.getPrimaryServerHosts();
 		for (String hostPort: executorManager.getAllActiveExecutorServerHosts()) {
-			Map<String, Object> mbeans = executorManager.callExecutorJMX(hostPort, JMX_GET_MBEANS, null);
-
-			if (primaryServerHosts.contains(hostPort)) {
-				executorMBeans.put(hostPort, mbeans.get("mbeans"));
+			try {
+				Map<String, Object> mbeans = executorManager.callExecutorJMX(hostPort, JMX_GET_MBEANS, null);
+	
+				if (primaryServerHosts.contains(hostPort)) {
+					executorMBeans.put(hostPort, mbeans.get("mbeans"));
+				}
+				else {
+					executorMBeans.put(hostPort, mbeans.get("mbeans"));
+				}
 			}
-			else {
-				executorMBeans.put(hostPort, mbeans.get("mbeans"));
+			catch (IOException e) {
+				logger.error("Cannot contact executor " + hostPort, e);
 			}
 		}