azkaban-uncached

Removing Sysprops from flowrunner.

1/17/2013 6:50:11 PM

Details

diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 6712ba8..c3449d8 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -314,7 +314,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 		}
 		
 		// should have one prop with system secrets, the other user level props
-		JobRunner jobRunner = new JobRunner(node, new Props(), prop, path.getParentFile(), executorLoader, jobtypeManager);
+		JobRunner jobRunner = new JobRunner(node, prop, path.getParentFile(), executorLoader, jobtypeManager);
 		jobRunner.addListener(listener);
 
 		return jobRunner;
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 66a8a76..c9567d9 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -43,7 +43,6 @@ public class JobRunner extends EventHandler implements Runnable {
 	private static final Layout DEFAULT_LAYOUT = new PatternLayout("%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
 
 	private ExecutorLoader loader;
-	private Props sysProps;
 	private Props props;
 	private Props outputProps;
 	private ExecutableNode node;
@@ -62,8 +61,7 @@ public class JobRunner extends EventHandler implements Runnable {
 	
 	private final JobTypeManager jobtypeManager;
 
-	public JobRunner(ExecutableNode node, Props sysProps, Props props, File workingDir, ExecutorLoader loader, JobTypeManager jobtypeManager) {
-		this.sysProps = sysProps;
+	public JobRunner(ExecutableNode node, Props props, File workingDir, ExecutorLoader loader, JobTypeManager jobtypeManager) {
 		this.props = props;
 		this.node = node;
 		this.workingDir = workingDir;
@@ -199,9 +197,12 @@ public class JobRunner extends EventHandler implements Runnable {
 			logInfo("Starting job " + node.getJobId() + " at " + node.getStartTime());
 			node.setStatus(Status.RUNNING);
 			props.put(AbstractProcessJob.JOB_FULLPATH, props.getSource());
-			props.put(AbstractProcessJob.WORKING_DIR, workingDir.getAbsolutePath());
+			// Ability to specify working directory
+			if (!props.containsKey(AbstractProcessJob.WORKING_DIR)) {
+				props.put(AbstractProcessJob.WORKING_DIR, workingDir.getAbsolutePath());
+			}
 			//job = JobWrappingFactory.getJobWrappingFactory().buildJobExecutor(node.getJobId(), props, logger);
-			job = jobtypeManager.buildJobExecutor(node.getJobId(), sysProps, props, logger);
+			job = jobtypeManager.buildJobExecutor(node.getJobId(), props, logger);
 		}
 		
 		return true;
diff --git a/src/java/azkaban/jobExecutor/AbstractProcessJob.java b/src/java/azkaban/jobExecutor/AbstractProcessJob.java
index 4bf08a6..106c218 100644
--- a/src/java/azkaban/jobExecutor/AbstractProcessJob.java
+++ b/src/java/azkaban/jobExecutor/AbstractProcessJob.java
@@ -100,8 +100,7 @@ public abstract class AbstractProcessJob extends AbstractJob {
 		jobProps.put(ENV_PREFIX + JOB_NAME_ENV, getId());
 
 		files[1] = createOutputPropsFile(getId(), _cwd);
-		jobProps.put(ENV_PREFIX + JOB_OUTPUT_PROP_FILE,
-				files[1].getAbsolutePath());
+		jobProps.put(ENV_PREFIX + JOB_OUTPUT_PROP_FILE, files[1].getAbsolutePath());
 
 		return files;
 	}
diff --git a/src/java/azkaban/jobtype/JobTypeManager.java b/src/java/azkaban/jobtype/JobTypeManager.java
index 845f3e9..0090839 100644
--- a/src/java/azkaban/jobtype/JobTypeManager.java
+++ b/src/java/azkaban/jobtype/JobTypeManager.java
@@ -179,7 +179,7 @@ public class JobTypeManager
 	}
 	
 	@SuppressWarnings("unchecked")
-	public void loadJob(File dir, File jobConfFile, Props commonConf, Props commonSysConf) throws JobTypeManagerException{
+	private void loadJob(File dir, File jobConfFile, Props commonConf, Props commonSysConf) throws JobTypeManagerException{
 		Props conf = null;
 		Props sysConf = null;
 		File confFile = findFilefromDir(dir, jobtypeConfFile);
@@ -244,7 +244,7 @@ public class JobTypeManager
 		
 	}
 	
-	public Job buildJobExecutor(String jobId, Props sysProps, Props jobProps, Logger logger) throws JobTypeManagerException
+	public Job buildJobExecutor(String jobId, Props jobProps, Logger logger) throws JobTypeManagerException
 	{
 
 		Job job;
@@ -264,9 +264,16 @@ public class JobTypeManager
 						String.format("Could not construct job[%s] of type[%s].", jobProps, jobType));
 			}
 			
-			Props sysConf = jobtypeSysProps.containsKey(jobType) ? new Props(sysProps, jobtypeSysProps.get(jobType)) : sysProps;
+			Props sysConf = jobtypeSysProps.get(jobType);
+			
+			// THIS IS WRONG!!! We're just overriding values!
 			Props jobConf = jobtypeJobProps.containsKey(jobType) ? new Props(jobProps, jobtypeJobProps.get(jobType)) : jobProps;
-			sysConf = PropsUtils.resolveProps(sysConf);
+			if (sysConf != null) {
+				sysConf = PropsUtils.resolveProps(sysConf);
+			}
+			else {
+				sysConf = new Props();
+			}
 			jobConf = PropsUtils.resolveProps(jobConf);
 			
 //			logger.info("sysConf is " + sysConf);
diff --git a/unit/java/azkaban/test/execapp/JobRunnerTest.java b/unit/java/azkaban/test/execapp/JobRunnerTest.java
index 4009f87..74138c5 100644
--- a/unit/java/azkaban/test/execapp/JobRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/JobRunnerTest.java
@@ -261,7 +261,7 @@ public class JobRunnerTest {
 		
 		Props props = createProps(time, fail);
 		
-		JobRunner runner = new JobRunner(node, props, props, workingDir, loader, jobtypeManager);
+		JobRunner runner = new JobRunner(node, props, workingDir, loader, jobtypeManager);
 		runner.addListener(listener);
 		return runner;
 	}