azkaban-uncached
Changes
src/java/azkaban/jobExecutor/JavaProcessJob.java 22(+10 -12)
src/java/azkaban/jobExecutor/LongArgJob.java 194(+100 -94)
Details
diff --git a/src/java/azkaban/jobExecutor/AbstractProcessJob.java b/src/java/azkaban/jobExecutor/AbstractProcessJob.java
index f6a49c1..4bf08a6 100644
--- a/src/java/azkaban/jobExecutor/AbstractProcessJob.java
+++ b/src/java/azkaban/jobExecutor/AbstractProcessJob.java
@@ -37,7 +37,6 @@ import azkaban.utils.PropsUtils;
*
*/
public abstract class AbstractProcessJob extends AbstractJob {
-
private final Logger log;
public static final String ENV_PREFIX = "env.";
public static final String ENV_PREFIX_UCASE = "ENV.";
@@ -66,11 +65,6 @@ public abstract class AbstractProcessJob extends AbstractJob {
this.log = log;
}
- @Deprecated
- public Props getProps() {
- return jobProps;
- }
-
public Props getJobProps() {
return jobProps;
}
@@ -117,14 +111,14 @@ public abstract class AbstractProcessJob extends AbstractJob {
}
public Map<String, String> getEnvironmentVariables() {
- Props props = getProps();
+ Props props = getJobProps();
Map<String, String> envMap = props.getMapByPrefix(ENV_PREFIX);
envMap.putAll(props.getMapByPrefix(ENV_PREFIX_UCASE));
return envMap;
}
public String getWorkingDirectory() {
- String workingDir = getProps().getString(WORKING_DIR, _jobPath);
+ String workingDir = getJobProps().getString(WORKING_DIR, _jobPath);
if (workingDir == null) {
return "";
}
src/java/azkaban/jobExecutor/JavaProcessJob.java 22(+10 -12)
diff --git a/src/java/azkaban/jobExecutor/JavaProcessJob.java b/src/java/azkaban/jobExecutor/JavaProcessJob.java
index 0009e32..5749a96 100644
--- a/src/java/azkaban/jobExecutor/JavaProcessJob.java
+++ b/src/java/azkaban/jobExecutor/JavaProcessJob.java
@@ -25,7 +25,6 @@ import org.apache.log4j.Logger;
import azkaban.utils.Props;
public class JavaProcessJob extends ProcessJob {
-
public static final String CLASSPATH = "classpath";
public static final String GLOBAL_CLASSPATH = "global.classpaths";
public static final String JAVA_CLASS = "java.class";
@@ -64,7 +63,7 @@ public class JavaProcessJob extends ProcessJob {
}
protected String getJavaClass() {
- return getProps().getString(JAVA_CLASS);
+ return getJobProps().getString(JAVA_CLASS);
}
protected String getClassPathParam() {
@@ -77,12 +76,12 @@ public class JavaProcessJob extends ProcessJob {
}
protected List<String> getClassPaths() {
- List<String> classPaths = getProps().getStringList(CLASSPATH, null, ",");
+ List<String> classPaths = getJobProps().getStringList(CLASSPATH, null, ",");
ArrayList<String> classpathList = new ArrayList<String>();
// Adding global properties used system wide.
- if (getProps().containsKey(GLOBAL_CLASSPATH)) {
- List<String> globalClasspath = getProps().getStringList(GLOBAL_CLASSPATH);
+ if (getJobProps().containsKey(GLOBAL_CLASSPATH)) {
+ List<String> globalClasspath = getJobProps().getStringList(GLOBAL_CLASSPATH);
for (String global: globalClasspath) {
getLog().info("Adding to global classpath:" + global);
classpathList.add(global);
@@ -110,26 +109,25 @@ public class JavaProcessJob extends ProcessJob {
}
protected String getInitialMemorySize() {
- return getProps().getString(INITIAL_MEMORY_SIZE,
- DEFAULT_INITIAL_MEMORY_SIZE);
+ return getJobProps().getString(INITIAL_MEMORY_SIZE, DEFAULT_INITIAL_MEMORY_SIZE);
}
protected String getMaxMemorySize() {
- return getProps().getString(MAX_MEMORY_SIZE, DEFAULT_MAX_MEMORY_SIZE);
+ return getJobProps().getString(MAX_MEMORY_SIZE, DEFAULT_MAX_MEMORY_SIZE);
}
protected String getMainArguments() {
- return getProps().getString(MAIN_ARGS, "");
+ return getJobProps().getString(MAIN_ARGS, "");
}
protected String getJVMArguments() {
- String globalJVMArgs = getProps().getString(GLOBAL_JVM_PARAMS, null);
+ String globalJVMArgs = getJobProps().getString(GLOBAL_JVM_PARAMS, null);
if (globalJVMArgs == null) {
- return getProps().getString(JVM_PARAMS, "");
+ return getJobProps().getString(JVM_PARAMS, "");
}
- return globalJVMArgs + " " + getProps().getString(JVM_PARAMS, "");
+ return globalJVMArgs + " " + getJobProps().getString(JVM_PARAMS, "");
}
protected String createArguments(List<String> arguments, String separator) {
src/java/azkaban/jobExecutor/LongArgJob.java 194(+100 -94)
diff --git a/src/java/azkaban/jobExecutor/LongArgJob.java b/src/java/azkaban/jobExecutor/LongArgJob.java
index 4d28f08..f646866 100644
--- a/src/java/azkaban/jobExecutor/LongArgJob.java
+++ b/src/java/azkaban/jobExecutor/LongArgJob.java
@@ -23,110 +23,116 @@ import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import azkaban.utils.Props;
-import azkaban.utils.UndefinedPropertyException;
import azkaban.jobExecutor.utils.process.AzkabanProcess;
import azkaban.jobExecutor.utils.process.AzkabanProcessBuilder;
-
/**
- * A job that passes all the job properties as command line arguments in "long" format,
- * e.g. --key1 value1 --key2 value2 ...
+ * A job that passes all the job properties as command line arguments in "long"
+ * format, e.g. --key1 value1 --key2 value2 ...
*
*/
public abstract class LongArgJob extends AbstractProcessJob {
-
- private static final long KILL_TIME_MS = 5000;
- private final AzkabanProcessBuilder builder;
- private volatile AzkabanProcess process;
-
- public LongArgJob(String jobid, String[] command, Props sysProps, Props jobProps, Logger log) {
- this(jobid, command, sysProps, jobProps, log, new HashSet<String>(0));
- }
-
- public LongArgJob(String jobid, String[] command, Props sysProps, Props jobProp, Logger log, Set<String> suppressedKeys) {
- //super(command, desc);
- super(jobid, sysProps, jobProp, log);
- //String cwd = descriptor.getProps().getString(WORKING_DIR, new File(descriptor.getFullPath()).getParent());
-
- this.builder = new AzkabanProcessBuilder(command).
- setEnv(getProps().getMapByPrefix(ENV_PREFIX)).
- setWorkingDir(getCwd()).
- setLogger(getLog());
- appendProps(suppressedKeys);
- }
-
- public void run() throws Exception {
-
+
+ private static final long KILL_TIME_MS = 5000;
+ private final AzkabanProcessBuilder builder;
+ private volatile AzkabanProcess process;
+
+ public LongArgJob(String jobid, String[] command, Props sysProps, Props jobProps, Logger log) {
+ this(jobid, command, sysProps, jobProps, log, new HashSet<String>(0));
+ }
+
+ public LongArgJob(String jobid, String[] command, Props sysProps, Props jobProp, Logger log, Set<String> suppressedKeys) {
+ // super(command, desc);
+ super(jobid, sysProps, jobProp, log);
+ // String cwd = descriptor.getProps().getString(WORKING_DIR, new
+ // File(descriptor.getFullPath()).getParent());
+
+ this.builder = new AzkabanProcessBuilder(command)
+ .setEnv(getJobProps()
+ .getMapByPrefix(ENV_PREFIX))
+ .setWorkingDir(getCwd())
+ .setLogger(getLog());
+ appendProps(suppressedKeys);
+ }
+
+ public void run() throws Exception {
try {
resolveProps();
- }
- catch (Exception e) {
+ } catch (Exception e) {
error("Bad property definition! " + e.getMessage());
-
+ }
+
+ long startMs = System.currentTimeMillis();
+ info("Command: " + builder.getCommandString());
+ if (builder.getEnv().size() > 0) {
+ info("Environment variables: " + builder.getEnv());
+ }
+ info("Working directory: " + builder.getWorkingDir());
+
+ File[] propFiles = initPropsFiles();
+ // System.err.println("outputfile=" + propFiles[1]);
+
+ boolean success = false;
+ this.process = builder.build();
+ try {
+ this.process.run();
+ success = true;
+ } catch (Exception e) {
+ for (File file : propFiles) {
+ if (file != null && file.exists()) {
+ file.delete();
+ }
+ }
+ throw new RuntimeException(e);
+ } finally {
+ this.process = null;
+ info("Process completed " + (success ? "successfully" : "unsuccessfully") + " in "
+ + ((System.currentTimeMillis() - startMs) / 1000) + " seconds.");
+ }
+
+ // Get the output properties from this job.
+ generateProperties(propFiles[1]);
+
+ for (File file : propFiles) {
+ if (file != null && file.exists()) {
+ file.delete();
+ }
+ }
+ }
+
+ /**
+ * This gives access to the process builder used to construct the process.
+ * An overriding class can use this to add to the command being executed.
+ */
+ protected AzkabanProcessBuilder getBuilder() {
+ return this.builder;
+ }
+
+ @Override
+ public void cancel() throws InterruptedException {
+ if (process == null) {
+ throw new IllegalStateException("Not started.");
}
- long startMs = System.currentTimeMillis();
- info("Command: " + builder.getCommandString());
- if(builder.getEnv().size() > 0)
- info("Environment variables: " + builder.getEnv());
- info("Working directory: " + builder.getWorkingDir());
-
- File [] propFiles = initPropsFiles();
- //System.err.println("outputfile=" + propFiles[1]);
-
- boolean success = false;
- this.process = builder.build();
- try {
- this.process.run();
- success = true;
- }
- catch (Exception e) {
- for (File file: propFiles) if (file != null && file.exists()) file.delete();
- throw new RuntimeException (e);
- }
- finally {
- this.process = null;
- info("Process completed " + (success? "successfully" : "unsuccessfully") + " in " + ((System.currentTimeMillis() - startMs) / 1000) + " seconds.");
- }
-
- // Get the output properties from this job.
- generateProperties(propFiles[1]);
-
- for (File file: propFiles)
- if (file != null && file.exists()) file.delete();
- }
-
-
-
- /**
- * This gives access to the process builder used to construct the process. An overriding class can use this to
- * add to the command being executed.
- */
- protected AzkabanProcessBuilder getBuilder() {
- return this.builder;
- }
-
- @Override
- public void cancel() throws InterruptedException {
- if(process == null)
- throw new IllegalStateException("Not started.");
- boolean killed = process.softKill(KILL_TIME_MS, TimeUnit.MILLISECONDS);
- if(!killed) {
- warn("Kill with signal TERM failed. Killing with KILL signal.");
- process.hardKill();
- }
- }
-
- @Override
- public double getProgress() {
- return process != null && process.isComplete()? 1.0 : 0.0;
- }
-
- private void appendProps(Set<String> suppressed) {
- AzkabanProcessBuilder builder = this.getBuilder();
- Props props = getProps();
- for(String key: props.getKeySet())
- if(!suppressed.contains(key))
- builder.addArg("--" + key, props.get(key));
- }
+ boolean killed = process.softKill(KILL_TIME_MS, TimeUnit.MILLISECONDS);
+ if (!killed) {
+ warn("Kill with signal TERM failed. Killing with KILL signal.");
+ process.hardKill();
+ }
+ }
+
+ @Override
+ public double getProgress() {
+ return process != null && process.isComplete() ? 1.0 : 0.0;
+ }
+
+ private void appendProps(Set<String> suppressed) {
+ AzkabanProcessBuilder builder = this.getBuilder();
+ Props props = getJobProps();
+ for (String key : props.getKeySet()) {
+ if (!suppressed.contains(key)) {
+ builder.addArg("--" + key, props.get(key));
+ }
+ }
+ }
}
diff --git a/src/java/azkaban/jobExecutor/ProcessJob.java b/src/java/azkaban/jobExecutor/ProcessJob.java
index 50233c0..b3da1dd 100644
--- a/src/java/azkaban/jobExecutor/ProcessJob.java
+++ b/src/java/azkaban/jobExecutor/ProcessJob.java
@@ -125,19 +125,10 @@ public class ProcessJob extends AbstractProcessJob {
return process.getProcessId();
}
- @Override
- public Props getProps() {
- return jobProps;
- }
-
public String getPath() {
return _jobPath == null ? "" : _jobPath;
}
- public String getJobName() {
- return getId();
- }
-
/**
* Splits the command into a unix like command line structure. Quotes and
* single quotes are treated as nested strings.