azkaban-uncached
Changes
src/java/azkaban/execapp/JobRunner.java 11(+6 -5)
src/java/azkaban/jobtype/JobTypeManager.java 15(+11 -4)
Details
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 6712ba8..c3449d8 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -314,7 +314,7 @@ public class FlowRunner extends EventHandler implements Runnable {
}
// should have one prop with system secrets, the other user level props
- JobRunner jobRunner = new JobRunner(node, new Props(), prop, path.getParentFile(), executorLoader, jobtypeManager);
+ JobRunner jobRunner = new JobRunner(node, prop, path.getParentFile(), executorLoader, jobtypeManager);
jobRunner.addListener(listener);
return jobRunner;
src/java/azkaban/execapp/JobRunner.java 11(+6 -5)
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 66a8a76..c9567d9 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -43,7 +43,6 @@ public class JobRunner extends EventHandler implements Runnable {
private static final Layout DEFAULT_LAYOUT = new PatternLayout("%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
private ExecutorLoader loader;
- private Props sysProps;
private Props props;
private Props outputProps;
private ExecutableNode node;
@@ -62,8 +61,7 @@ public class JobRunner extends EventHandler implements Runnable {
private final JobTypeManager jobtypeManager;
- public JobRunner(ExecutableNode node, Props sysProps, Props props, File workingDir, ExecutorLoader loader, JobTypeManager jobtypeManager) {
- this.sysProps = sysProps;
+ public JobRunner(ExecutableNode node, Props props, File workingDir, ExecutorLoader loader, JobTypeManager jobtypeManager) {
this.props = props;
this.node = node;
this.workingDir = workingDir;
@@ -199,9 +197,12 @@ public class JobRunner extends EventHandler implements Runnable {
logInfo("Starting job " + node.getJobId() + " at " + node.getStartTime());
node.setStatus(Status.RUNNING);
props.put(AbstractProcessJob.JOB_FULLPATH, props.getSource());
- props.put(AbstractProcessJob.WORKING_DIR, workingDir.getAbsolutePath());
+ // Ability to specify working directory
+ if (!props.containsKey(AbstractProcessJob.WORKING_DIR)) {
+ props.put(AbstractProcessJob.WORKING_DIR, workingDir.getAbsolutePath());
+ }
//job = JobWrappingFactory.getJobWrappingFactory().buildJobExecutor(node.getJobId(), props, logger);
- job = jobtypeManager.buildJobExecutor(node.getJobId(), sysProps, props, logger);
+ job = jobtypeManager.buildJobExecutor(node.getJobId(), props, logger);
}
return true;
diff --git a/src/java/azkaban/jobExecutor/AbstractProcessJob.java b/src/java/azkaban/jobExecutor/AbstractProcessJob.java
index 4bf08a6..106c218 100644
--- a/src/java/azkaban/jobExecutor/AbstractProcessJob.java
+++ b/src/java/azkaban/jobExecutor/AbstractProcessJob.java
@@ -100,8 +100,7 @@ public abstract class AbstractProcessJob extends AbstractJob {
jobProps.put(ENV_PREFIX + JOB_NAME_ENV, getId());
files[1] = createOutputPropsFile(getId(), _cwd);
- jobProps.put(ENV_PREFIX + JOB_OUTPUT_PROP_FILE,
- files[1].getAbsolutePath());
+ jobProps.put(ENV_PREFIX + JOB_OUTPUT_PROP_FILE, files[1].getAbsolutePath());
return files;
}
src/java/azkaban/jobtype/JobTypeManager.java 15(+11 -4)
diff --git a/src/java/azkaban/jobtype/JobTypeManager.java b/src/java/azkaban/jobtype/JobTypeManager.java
index 845f3e9..0090839 100644
--- a/src/java/azkaban/jobtype/JobTypeManager.java
+++ b/src/java/azkaban/jobtype/JobTypeManager.java
@@ -179,7 +179,7 @@ public class JobTypeManager
}
@SuppressWarnings("unchecked")
- public void loadJob(File dir, File jobConfFile, Props commonConf, Props commonSysConf) throws JobTypeManagerException{
+ private void loadJob(File dir, File jobConfFile, Props commonConf, Props commonSysConf) throws JobTypeManagerException{
Props conf = null;
Props sysConf = null;
File confFile = findFilefromDir(dir, jobtypeConfFile);
@@ -244,7 +244,7 @@ public class JobTypeManager
}
- public Job buildJobExecutor(String jobId, Props sysProps, Props jobProps, Logger logger) throws JobTypeManagerException
+ public Job buildJobExecutor(String jobId, Props jobProps, Logger logger) throws JobTypeManagerException
{
Job job;
@@ -264,9 +264,16 @@ public class JobTypeManager
String.format("Could not construct job[%s] of type[%s].", jobProps, jobType));
}
- Props sysConf = jobtypeSysProps.containsKey(jobType) ? new Props(sysProps, jobtypeSysProps.get(jobType)) : sysProps;
+ Props sysConf = jobtypeSysProps.get(jobType);
+
+ // THIS IS WRONG!!! We're just overriding values!
Props jobConf = jobtypeJobProps.containsKey(jobType) ? new Props(jobProps, jobtypeJobProps.get(jobType)) : jobProps;
- sysConf = PropsUtils.resolveProps(sysConf);
+ if (sysConf != null) {
+ sysConf = PropsUtils.resolveProps(sysConf);
+ }
+ else {
+ sysConf = new Props();
+ }
jobConf = PropsUtils.resolveProps(jobConf);
// logger.info("sysConf is " + sysConf);
diff --git a/unit/java/azkaban/test/execapp/JobRunnerTest.java b/unit/java/azkaban/test/execapp/JobRunnerTest.java
index 4009f87..74138c5 100644
--- a/unit/java/azkaban/test/execapp/JobRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/JobRunnerTest.java
@@ -261,7 +261,7 @@ public class JobRunnerTest {
Props props = createProps(time, fail);
- JobRunner runner = new JobRunner(node, props, props, workingDir, loader, jobtypeManager);
+ JobRunner runner = new JobRunner(node, props, workingDir, loader, jobtypeManager);
runner.addListener(listener);
return runner;
}