diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 5f65731..7d68d80 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -94,7 +94,6 @@ public class FlowRunner extends EventHandler implements Runnable {
this.flow = flow;
this.executorLoader = executorLoader;
this.projectLoader = projectLoader;
- this.executorService = Executors.newFixedThreadPool(numJobThreads);
this.execDir = new File(flow.getExecutionPath());
this.jobtypeManager = jobtypeManager;
@@ -115,6 +114,11 @@ public class FlowRunner extends EventHandler implements Runnable {
return this;
}
+ public FlowRunner setNumJobThreads(int jobs) {
+ numJobThreads = jobs;
+ return this;
+ }
+
public FlowRunner setJobLogSettings(String jobLogFileSize, int jobLogNumFiles) {
this.jobLogFileSize = jobLogFileSize;
this.jobLogNumFiles = jobLogNumFiles;
@@ -133,6 +137,9 @@ public class FlowRunner extends EventHandler implements Runnable {
public void run() {
try {
+ if (this.executorService == null) {
+ this.executorService = Executors.newFixedThreadPool(numJobThreads);
+ }
setupFlowExecution();
flow.setStartTime(System.currentTimeMillis());
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index f7ce2c7..b9aa2ec 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -74,6 +74,7 @@ public class FlowRunnerManager implements EventListener {
private ExecutorService executorService;
private SubmitterThread submitterThread;
private CleanerThread cleanerThread;
+ private int numJobThreadPerFlow = 10;
private ExecutorLoader executorLoader;
private ProjectLoader projectLoader;
@@ -118,6 +119,7 @@ public class FlowRunnerManager implements EventListener {
//azkaban.temp.dir
numThreads = props.getInt("executor.flow.threads", DEFAULT_NUM_EXECUTING_FLOWS);
+ numJobThreadPerFlow = props.getInt("flow.num.job.threads", numJobThreadPerFlow);
executorService = Executors.newFixedThreadPool(numThreads);
this.executorLoader = executorLoader;
@@ -387,11 +389,12 @@ public class FlowRunnerManager implements EventListener {
}
FlowRunner runner = new FlowRunner(flow, executorLoader, projectLoader, jobtypeManager);
- runner.setFlowWatcher(watcher);
- runner.setJobLogSettings(jobLogChunkSize, jobLogNumFiles);
- runner.setValidateProxyUser(validateProxyUser);
- runner.setGlobalProps(globalProps);
- runner.addListener(this);
+ runner.setFlowWatcher(watcher)
+ .setJobLogSettings(jobLogChunkSize, jobLogNumFiles)
+ .setValidateProxyUser(validateProxyUser)
+ .setGlobalProps(globalProps)
+ .setNumJobThreads(numJobThreadPerFlow)
+ .addListener(this);
// Check again.
if (runningFlows.containsKey(execId)) {