azkaban-uncached
Changes
src/java/azkaban/executor/ExecutorManager.java 45(+22 -23)
Details
src/java/azkaban/executor/ExecutorManager.java 45(+22 -23)
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index f1c369b..eeaec11 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -351,33 +351,32 @@ public class ExecutorManager {
ExecutionOptions options = exflow.getExecutionOptions();
+ if (options.getDisabledJobs() != null) {
+ // Disable jobs
+ for(String disabledId : options.getDisabledJobs()) {
+ ExecutableNode node = exflow.getExecutableNode(disabledId);
+ node.setStatus(Status.DISABLED);
+ }
+ }
+
String message = "";
- if (options != null) {
- if (options.getDisabledJobs() != null) {
- // Disable jobs
- for(String disabledId : options.getDisabledJobs()) {
- ExecutableNode node = exflow.getExecutableNode(disabledId);
- node.setStatus(Status.DISABLED);
- }
+ if (!running.isEmpty()) {
+ if (options.getConcurrentOption().equals("pipeline")) {
+ Collections.sort(running);
+ Integer runningExecId = running.get(running.size() - 1);
+
+ options.setPipelineExecutionId(runningExecId);
+ message = "Flow " + flowId + " is already running with exec id " + runningExecId +". Pipelining level " + options.getPipelineLevel() + ". ";
}
-
- if (!running.isEmpty()) {
- if (options.getConcurrentOption().equals("pipeline")) {
- Collections.sort(running);
- Integer runningExecId = running.get(running.size() - 1);
-
- options.setPipelineExecutionId(runningExecId);
- message = "Flow " + flowId + " is already running with exec id " + runningExecId +". Pipelining level " + options.getPipelineLevel() + ". ";
- }
- else if (options.getConcurrentOption().equals("skip")) {
- throw new ExecutorManagerException("Flow " + flowId + " is already running. Skipping execution.");
- }
- else {
- // The settings is to run anyways.
- message = "Flow " + flowId + " is already running with exec id " + StringUtils.join(running, ",") +". Will execute concurrently. ";
- }
+ else if (options.getConcurrentOption().equals("skip")) {
+ throw new ExecutorManagerException("Flow " + flowId + " is already running. Skipping execution.");
+ }
+ else {
+ // The settings is to run anyways.
+ message = "Flow " + flowId + " is already running with exec id " + StringUtils.join(running, ",") +". Will execute concurrently. ";
}
}
+
// The exflow id is set by the loader. So it's unavailable until after this call.
executorLoader.uploadExecutableFlow(exflow);
diff --git a/src/java/azkaban/webapp/servlet/JMXHttpServlet.java b/src/java/azkaban/webapp/servlet/JMXHttpServlet.java
index 78aec2f..75982d1 100644
--- a/src/java/azkaban/webapp/servlet/JMXHttpServlet.java
+++ b/src/java/azkaban/webapp/servlet/JMXHttpServlet.java
@@ -162,13 +162,18 @@ public class JMXHttpServlet extends LoginAbstractAzkabanServlet implements Conne
Map<String, Object> executorMBeans = new HashMap<String,Object>();
Set<String> primaryServerHosts = executorManager.getPrimaryServerHosts();
for (String hostPort: executorManager.getAllActiveExecutorServerHosts()) {
- Map<String, Object> mbeans = executorManager.callExecutorJMX(hostPort, JMX_GET_MBEANS, null);
-
- if (primaryServerHosts.contains(hostPort)) {
- executorMBeans.put(hostPort, mbeans.get("mbeans"));
+ try {
+ Map<String, Object> mbeans = executorManager.callExecutorJMX(hostPort, JMX_GET_MBEANS, null);
+
+ if (primaryServerHosts.contains(hostPort)) {
+ executorMBeans.put(hostPort, mbeans.get("mbeans"));
+ }
+ else {
+ executorMBeans.put(hostPort, mbeans.get("mbeans"));
+ }
}
- else {
- executorMBeans.put(hostPort, mbeans.get("mbeans"));
+ catch (IOException e) {
+ logger.error("Cannot contact executor " + hostPort, e);
}
}