azkaban-uncached
Changes
src/java/azkaban/jmx/JmxExecutorManager.java 12(+12 -0)
Details
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index b9aa2ec..25bc8e5 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -388,12 +388,24 @@ public class FlowRunnerManager implements EventListener {
}
}
+ int numJobThreads = numJobThreadPerFlow;
+ if(options.getFlowParameters().containsKey("flow.num.job.threads")) {
+ try{
+ int numJobs = Integer.valueOf(options.getFlowParameters().get("flow.num.job.threads"));
+ if(numJobs > 0 && numJobs <= numJobThreads) {
+ numJobThreads = numJobs;
+ }
+ } catch (Exception e) {
+ throw new ExecutorManagerException("Failed to set the number of job threads " + options.getFlowParameters().get("flow.num.job.threads") + " for flow " + execId, e);
+ }
+ }
+
FlowRunner runner = new FlowRunner(flow, executorLoader, projectLoader, jobtypeManager);
runner.setFlowWatcher(watcher)
.setJobLogSettings(jobLogChunkSize, jobLogNumFiles)
.setValidateProxyUser(validateProxyUser)
.setGlobalProps(globalProps)
- .setNumJobThreads(numJobThreadPerFlow)
+ .setNumJobThreads(numJobThreads)
.addListener(this);
// Check again.
@@ -604,6 +616,10 @@ public class FlowRunnerManager implements EventListener {
public int getNumExecutingFlows() {
return runningFlows.size();
}
+
+ public String getRunningFlowIds() {
+ return runningFlows.keySet().toString();
+ }
public int getNumExecutingJobs() {
int jobCount = 0;
@@ -613,5 +629,7 @@ public class FlowRunnerManager implements EventListener {
return jobCount;
}
+
+
}
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 5ca87eb..5105cd8 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -95,6 +95,10 @@ public class ExecutorManager {
return executingManager.getState();
}
+ public String getExecutorThreadStage() {
+ return executingManager.getStage();
+ }
+
public boolean isThreadActive() {
return executingManager.isAlive();
}
@@ -170,6 +174,14 @@ public class ExecutorManager {
return flows;
}
+ public String getRunningFlowIds() {
+ List<Integer> allIds = new ArrayList<Integer>();
+ for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+ allIds.add(ref.getSecond().getExecutionId());
+ }
+ return allIds.toString();
+ }
+
public List<ExecutableFlow> getRecentlyFinishedFlows() {
return new ArrayList<ExecutableFlow>(recentlyFinished.values());
}
@@ -548,11 +560,16 @@ public class ExecutorManager {
private class ExecutingManagerUpdaterThread extends Thread {
private boolean shutdown = false;
+ private String stage;
public ExecutingManagerUpdaterThread() {
this.setName("ExecutorManagerUpdaterThread");
}
+ public String getStage() {
+ return stage;
+ }
+
// 10 mins recently finished threshold.
private long recentlyFinishedLifetimeMs = 600000;
private int waitTimeIdleMs = 2000;
@@ -572,6 +589,8 @@ public class ExecutorManager {
try {
lastThreadCheckTime = System.currentTimeMillis();
+ stage = "Starting update all flows.";
+
Map<ConnectionInfo, List<ExecutableFlow>> exFlowMap = getFlowToExecutorMap();
ArrayList<ExecutableFlow> finishedFlows = new ArrayList<ExecutableFlow>();
ArrayList<ExecutableFlow> finalizeFlows = new ArrayList<ExecutableFlow>();
@@ -581,6 +600,10 @@ public class ExecutorManager {
List<Long> updateTimesList = new ArrayList<Long>();
List<Integer> executionIdsList = new ArrayList<Integer>();
+ ConnectionInfo connection = entry.getKey();
+
+ stage = "Starting update flows on " + connection.getHost() + ":" + connection.getPort();
+
// We pack the parameters of the same host together before we query.
fillUpdateTimeAndExecId(entry.getValue(), executionIdsList, updateTimesList);
@@ -591,7 +614,7 @@ public class ExecutorManager {
ConnectorParams.EXEC_ID_LIST_PARAM,
JSONUtils.toJSON(executionIdsList));
- ConnectionInfo connection = entry.getKey();
+
Map<String, Object> results = null;
try {
results = callExecutorServer(connection.getHost(), connection.getPort(), ConnectorParams.UPDATE_ACTION, null, null, executionIds, updateTimes);
@@ -599,6 +622,9 @@ public class ExecutorManager {
logger.error(e);
for (ExecutableFlow flow: entry.getValue()) {
Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(flow.getExecutionId());
+
+ stage = "Failed to get update. Doing some clean up for flow " + pair.getSecond().getExecutionId();
+
if (pair != null) {
ExecutionReference ref = pair.getFirst();
int numErrors = ref.getNumErrors();
@@ -621,6 +647,9 @@ public class ExecutorManager {
for (Map<String,Object> updateMap: executionUpdates) {
try {
ExecutableFlow flow = updateExecution(updateMap);
+
+ stage = "Updated flow " + flow.getExecutionId();
+
if (isFinished(flow)) {
finishedFlows.add(flow);
finalizeFlows.add(flow);
@@ -638,12 +667,16 @@ public class ExecutorManager {
}
}
+ stage = "Evicting old recently finished flows.";
+
evictOldRecentlyFinished(recentlyFinishedLifetimeMs);
// Add new finished
for (ExecutableFlow flow: finishedFlows) {
recentlyFinished.put(flow.getExecutionId(), flow);
}
+ stage = "Finalizing " + finalizeFlows.size() + " error flows.";
+
// Kill error flows
for (ExecutableFlow flow: finalizeFlows) {
finalizeFlows(flow);
@@ -978,4 +1011,8 @@ public class ExecutorManager {
cleanOldExecutionLogs(DateTime.now().getMillis() - executionLogsRetentionMs);
}
}
+
+
+
+
}
src/java/azkaban/jmx/JmxExecutorManager.java 12(+12 -0)
diff --git a/src/java/azkaban/jmx/JmxExecutorManager.java b/src/java/azkaban/jmx/JmxExecutorManager.java
index 123340d..37f52f8 100644
--- a/src/java/azkaban/jmx/JmxExecutorManager.java
+++ b/src/java/azkaban/jmx/JmxExecutorManager.java
@@ -21,6 +21,11 @@ public class JmxExecutorManager implements JmxExecutorManagerMBean {
public String getExecutorThreadState() {
return manager.getExecutorThreadState().toString();
}
+
+ @Override
+ public String getExecutorThreadStage() {
+ return manager.getExecutorThreadStage();
+ }
@Override
public boolean isThreadActive() {
@@ -36,4 +41,11 @@ public class JmxExecutorManager implements JmxExecutorManagerMBean {
public List<String> getPrimaryExecutorHostPorts() {
return new ArrayList<String>(manager.getPrimaryServerHosts());
}
+
+ @Override
+ public String getRunningFlows() {
+ return manager.getRunningFlowIds();
+ }
+
+
}
diff --git a/src/java/azkaban/jmx/JmxExecutorManagerMBean.java b/src/java/azkaban/jmx/JmxExecutorManagerMBean.java
index b4a3888..b29d00a 100644
--- a/src/java/azkaban/jmx/JmxExecutorManagerMBean.java
+++ b/src/java/azkaban/jmx/JmxExecutorManagerMBean.java
@@ -6,8 +6,14 @@ public interface JmxExecutorManagerMBean {
@DisplayName("OPERATION: getNumRunningFlows")
public int getNumRunningFlows();
+ @DisplayName("OPERATION: getRunningFlows")
+ public String getRunningFlows();
+
@DisplayName("OPERATION: getExecutorThreadState")
public String getExecutorThreadState();
+
+ @DisplayName("OPERATION: getExecutorThreadStage")
+ public String getExecutorThreadStage();
@DisplayName("OPERATION: isThreadActive")
public boolean isThreadActive();
diff --git a/src/java/azkaban/jmx/JmxFlowRunnerManager.java b/src/java/azkaban/jmx/JmxFlowRunnerManager.java
index f4f59d3..3541140 100644
--- a/src/java/azkaban/jmx/JmxFlowRunnerManager.java
+++ b/src/java/azkaban/jmx/JmxFlowRunnerManager.java
@@ -54,4 +54,9 @@ public class JmxFlowRunnerManager implements JmxFlowRunnerManagerMBean {
return manager.getNumExecutingJobs();
}
+ @Override
+ public String getRunningFlows() {
+ return manager.getRunningFlowIds();
+ }
+
}
diff --git a/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java b/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java
index 47c6a02..ed509ef 100644
--- a/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java
+++ b/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java
@@ -25,6 +25,9 @@ public interface JmxFlowRunnerManagerMBean {
@DisplayName("OPERATION: getNumExecutingFlows")
public int getNumExecutingFlows();
+ @DisplayName("OPERATION: getRunningFlows")
+ public String getRunningFlows();
+
@DisplayName("OPERATION: getTotalNumRunningJobs")
public int countTotalNumRunningJobs();
}