azkaban-memoizeit

Details

diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 5f65731..7d68d80 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -94,7 +94,6 @@ public class FlowRunner extends EventHandler implements Runnable {
 		this.flow = flow;
 		this.executorLoader = executorLoader;
 		this.projectLoader = projectLoader;
-		this.executorService = Executors.newFixedThreadPool(numJobThreads);
 		this.execDir = new File(flow.getExecutionPath());
 		this.jobtypeManager = jobtypeManager;
 
@@ -115,6 +114,11 @@ public class FlowRunner extends EventHandler implements Runnable {
 		return this;
 	}
 	
+	public FlowRunner setNumJobThreads(int jobs) {
+		numJobThreads = jobs;
+		return this;
+	}
+	
 	public FlowRunner setJobLogSettings(String jobLogFileSize, int jobLogNumFiles) {
 		this.jobLogFileSize = jobLogFileSize;
 		this.jobLogNumFiles = jobLogNumFiles;
@@ -133,6 +137,9 @@ public class FlowRunner extends EventHandler implements Runnable {
 	
 	public void run() {
 		try {
+			if (this.executorService == null) {
+				this.executorService = Executors.newFixedThreadPool(numJobThreads);
+			}
 			setupFlowExecution();
 			flow.setStartTime(System.currentTimeMillis());
 			
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index f7ce2c7..b9aa2ec 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -74,6 +74,7 @@ public class FlowRunnerManager implements EventListener {
 	private ExecutorService executorService;
 	private SubmitterThread submitterThread;
 	private CleanerThread cleanerThread;
+	private int numJobThreadPerFlow = 10;
 	
 	private ExecutorLoader executorLoader;
 	private ProjectLoader projectLoader;
@@ -118,6 +119,7 @@ public class FlowRunnerManager implements EventListener {
 		
 		//azkaban.temp.dir
 		numThreads = props.getInt("executor.flow.threads", DEFAULT_NUM_EXECUTING_FLOWS);
+		numJobThreadPerFlow = props.getInt("flow.num.job.threads", numJobThreadPerFlow);
 		executorService = Executors.newFixedThreadPool(numThreads);
 		
 		this.executorLoader = executorLoader;
@@ -387,11 +389,12 @@ public class FlowRunnerManager implements EventListener {
 		}
 
 		FlowRunner runner = new FlowRunner(flow, executorLoader, projectLoader, jobtypeManager);
-		runner.setFlowWatcher(watcher);
-		runner.setJobLogSettings(jobLogChunkSize, jobLogNumFiles);
-		runner.setValidateProxyUser(validateProxyUser);
-		runner.setGlobalProps(globalProps);
-		runner.addListener(this);
+		runner.setFlowWatcher(watcher)
+			.setJobLogSettings(jobLogChunkSize, jobLogNumFiles)
+			.setValidateProxyUser(validateProxyUser)
+			.setGlobalProps(globalProps)
+			.setNumJobThreads(numJobThreadPerFlow)
+			.addListener(this);
 		
 		// Check again.
 		if (runningFlows.containsKey(execId)) {