azkaban-memoizeit
Changes
src/java/azkaban/executor/JobRunner.java 101(+47 -54)
src/java/azkaban/jobExecutor/JavaJobRunnerMain.java 495(+262 -233)
src/java/azkaban/jobExecutor/JavaProcessJob.java 28(+14 -14)
src/java/azkaban/jobExecutor/Job.java 78(+40 -38)
src/java/azkaban/jobExecutor/NoopJob.java 73(+34 -39)
src/java/azkaban/jobExecutor/PigProcessJob.java 34(+17 -17)
src/java/azkaban/jobExecutor/ProcessJob.java 605(+299 -306)
src/java/azkaban/jobExecutor/PythonJob.java 24(+9 -15)
src/java/azkaban/jobExecutor/RubyJob.java 21(+9 -12)
src/java/azkaban/jobExecutor/ScriptJob.java 29(+14 -15)
src/java/azkaban/jobExecutor/SecurePigWrapper.java 106(+55 -51)
unit/executions/exectest1/exec1.flow 154(+154 -0)
unit/executions/exectest1/job1.job 4(+4 -0)
unit/executions/exectest1/job10.job 5(+5 -0)
unit/executions/exectest1/job2.job 5(+5 -0)
unit/executions/exectest1/job3.job 5(+5 -0)
unit/executions/exectest1/job4.job 5(+5 -0)
unit/executions/exectest1/job5.job 5(+5 -0)
unit/executions/exectest1/job6.job 5(+5 -0)
unit/executions/exectest1/job7.job 5(+5 -0)
unit/executions/exectest1/job8.job 5(+5 -0)
unit/executions/exectest1/job9.job 5(+5 -0)
unit/java/azkaban/test/executor/JobRunnerTest.java 249(+249 -0)
Details
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 5fb5355..7d0efba 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -22,7 +22,7 @@ public class ExecutableFlow {
private long lastCheckedTime;
private HashMap<String, FlowProps> flowProps = new HashMap<String, FlowProps>();
- private HashMap<String, ExecutableNode> executableNodes = new HashMap<String, ExecutableNode>();;
+ private HashMap<String, ExecutableNode> executableNodes = new HashMap<String, ExecutableNode>();
private ArrayList<String> startNodes;
private ArrayList<String> endNodes;
@@ -452,8 +452,7 @@ public class ExecutableFlow {
this.flow = flow;
}
- private ExecutableNode() {
-
+ public ExecutableNode() {
}
public String getId() {
diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index 9fc657a..d479d83 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -31,6 +31,7 @@ import azkaban.executor.event.Event.Type;
import azkaban.executor.event.EventHandler;
import azkaban.executor.event.EventListener;
import azkaban.flow.FlowProps;
+import azkaban.jobExecutor.utils.JobWrappingFactory;
import azkaban.utils.ExecutableFlowLoader;
import azkaban.utils.Props;
@@ -61,7 +62,6 @@ public class FlowRunner extends EventHandler implements Runnable {
private Thread currentThread;
- private Set<String> emailAddress;
private List<String> jobsFinished;
public enum FailedFlowOptions {
@@ -69,14 +69,13 @@ public class FlowRunner extends EventHandler implements Runnable {
}
private FailedFlowOptions failedOptions = FailedFlowOptions.FINISH_RUNNING_JOBS;
-
+
public FlowRunner(ExecutableFlow flow) {
this.flow = flow;
this.basePath = new File(flow.getExecutionPath());
this.executorService = Executors.newFixedThreadPool(numThreads);
this.runningJobs = new ConcurrentHashMap<String, JobRunner>();
this.listener = new JobRunnerEventListener(this);
- this.emailAddress = new HashSet<String>();
this.jobsFinished = new ArrayList<String>();
createLogger();
@@ -85,10 +84,6 @@ public class FlowRunner extends EventHandler implements Runnable {
public ExecutableFlow getFlow() {
return flow;
}
-
- public Set<String> getEmails() {
- return emailAddress;
- }
public List<String> getJobsFinished() {
return jobsFinished;
@@ -459,7 +454,7 @@ public class FlowRunner extends EventHandler implements Runnable {
if (event.getType() == Type.JOB_SUCCEEDED) {
logger.info("Job Succeeded " + jobID + " in "
+ (node.getEndTime() - node.getStartTime()) + " ms");
- emailAddress.addAll(runner.getNotifyEmails());
+
jobsFinished.add(jobID);
Props props = runner.getOutputProps();
outputProps.put(jobID, props);
@@ -468,7 +463,7 @@ public class FlowRunner extends EventHandler implements Runnable {
logger.info("Job Failed " + jobID + " in "
+ (node.getEndTime() - node.getStartTime()) + " ms");
- emailAddress.addAll(runner.getNotifyEmails());
+
jobsFinished.add(jobID);
logger.info(jobID + " FAILED");
flowRunner.handleFailedJob(runner.getNode());
diff --git a/src/java/azkaban/executor/FlowRunnerManager.java b/src/java/azkaban/executor/FlowRunnerManager.java
index 37048f0..2e41f11 100644
--- a/src/java/azkaban/executor/FlowRunnerManager.java
+++ b/src/java/azkaban/executor/FlowRunnerManager.java
@@ -4,22 +4,14 @@ import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.Vector;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
-import javax.servlet.ServletRequest;
-
import org.apache.log4j.Logger;
-import org.joda.time.Duration;
-import org.joda.time.format.PeriodFormat;
import azkaban.utils.Utils;
import azkaban.executor.ExecutableFlow.Status;
@@ -48,8 +40,6 @@ public class FlowRunnerManager {
private FlowRunnerEventListener eventListener;
private Mailman mailer;
- //private String defaultFailureEmail;
- //private String defaultSuccessEmail;
private String senderAddress;
private String clientHostname;
private String clientPortNumber;
@@ -58,8 +48,7 @@ public class FlowRunnerManager {
public FlowRunnerManager(Props props, Props globalProps, Mailman mailer) {
this.mailer = mailer;
-// this.defaultFailureEmail = props.getString("job.failure.email");
-// this.defaultSuccessEmail = props.getString("job.success.email");
+
this.senderAddress = props.getString("mail.sender");
this.clientHostname = props.getString("jetty.hostname", "localhost");
this.clientPortNumber = Utils.nonNull(props.getString("jetty.ssl.port"));
@@ -200,7 +189,7 @@ public class FlowRunnerManager {
*/
private void sendErrorEmail(FlowRunner runner) {
ExecutableFlow flow = runner.getFlow();
- List<String> emailList = new ArrayList<String>(runner.getEmails());
+ List<String> emailList = flow.getFailureEmails();
if(emailList != null && !emailList.isEmpty() && mailer != null) {
@@ -231,7 +220,8 @@ public class FlowRunnerManager {
private void sendSuccessEmail(FlowRunner runner) {
ExecutableFlow flow = runner.getFlow();
- List<String> emailList = new ArrayList<String>(runner.getEmails());
+
+ List<String> emailList = flow.getSuccessEmails();
if(emailList != null && !emailList.isEmpty() && mailer != null) {
try {
src/java/azkaban/executor/JobRunner.java 101(+47 -54)
diff --git a/src/java/azkaban/executor/JobRunner.java b/src/java/azkaban/executor/JobRunner.java
index 9c22d94..71e4e87 100644
--- a/src/java/azkaban/executor/JobRunner.java
+++ b/src/java/azkaban/executor/JobRunner.java
@@ -3,7 +3,6 @@ package azkaban.executor;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import org.apache.log4j.Appender;
@@ -12,7 +11,6 @@ import org.apache.log4j.Layout;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
-
import azkaban.executor.ExecutableFlow.ExecutableNode;
import azkaban.executor.ExecutableFlow.Status;
import azkaban.executor.event.Event;
@@ -26,19 +24,19 @@ import azkaban.utils.Props;
public class JobRunner extends EventHandler implements Runnable {
private static final Layout DEFAULT_LAYOUT = new PatternLayout(
"%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
-
- private static final String EMAILLIST = "notify.emails";
-
+
private Props props;
private Props outputProps;
private ExecutableNode node;
private File workingDir;
-
+
private Logger logger = null;
private Layout loggerLayout = DEFAULT_LAYOUT;
private Appender jobAppender;
+ private File logFile;
private Job job;
+ private String executionId = null;
private static final Object logCreatorLock = new Object();
@@ -46,23 +44,35 @@ public class JobRunner extends EventHandler implements Runnable {
this.props = props;
this.node = node;
this.workingDir = workingDir;
+ this.executionId = node.getFlow().getExecutionId();
}
+ public JobRunner(String executionId, ExecutableNode node, Props props, File workingDir) {
+ this.props = props;
+ this.node = node;
+ this.workingDir = workingDir;
+ this.executionId = executionId;
+ }
+
public ExecutableNode getNode() {
return node;
}
-
+
+ public String getLogFilePath() {
+ return logFile == null ? null : logFile.getPath();
+ }
+
private void createLogger() {
// Create logger
- synchronized(logCreatorLock) {
- String loggerName = System.currentTimeMillis() + "." + node.getFlow().getExecutionId() + "." + node.getId();
+ synchronized (logCreatorLock) {
+ String loggerName = System.currentTimeMillis() + "." + executionId + "." + node.getId();
logger = Logger.getLogger(loggerName);
-
+
// Create file appender
- String logName = "_job." + node.getFlow().getExecutionId() + "." + node.getId() + ".log";
- File logFile = new File(workingDir, logName);
+ String logName = "_job." + executionId + "." + node.getId() + ".log";
+ logFile = new File(workingDir, logName);
String absolutePath = logFile.getAbsolutePath();
-
+
jobAppender = null;
try {
jobAppender = new FileAppender(loggerLayout, absolutePath, false);
@@ -79,52 +89,41 @@ public class JobRunner extends EventHandler implements Runnable {
jobAppender.close();
}
}
-
+
@Override
public void run() {
+ node.setStartTime(System.currentTimeMillis());
if (node.getStatus() == Status.DISABLED) {
node.setStatus(Status.SKIPPED);
+ node.setEndTime(System.currentTimeMillis());
this.fireEventListeners(Event.create(this, Type.JOB_SUCCEEDED));
return;
} else if (node.getStatus() == Status.KILLED) {
+ node.setEndTime(System.currentTimeMillis());
this.fireEventListeners(Event.create(this, Type.JOB_KILLED));
return;
}
-
+
createLogger();
this.node.setStatus(Status.WAITING);
- node.setStartTime(System.currentTimeMillis());
+
logInfo("Starting job " + node.getId() + " at " + node.getStartTime());
node.setStatus(Status.RUNNING);
this.fireEventListeners(Event.create(this, Type.JOB_STARTED));
-
+
boolean succeeded = true;
-// synchronized(this) {
-// try {
-// wait(5000);
-// }
-// catch (InterruptedException e) {
-// logger.info("Job cancelled.");
-// succeeded = false;
-// }
-// }
+ props.put(AbstractProcessJob.WORKING_DIR, workingDir.getAbsolutePath());
+ job = JobWrappingFactory.getJobWrappingFactory().buildJobExecutor(node.getId(), props, logger);
- // Run Job
- //boolean succeeded = true;
+ try {
+ job.run();
+ } catch (Throwable e) {
+ succeeded = false;
+ logError("Job run failed!");
+ e.printStackTrace();
+ }
- props.put(AbstractProcessJob.WORKING_DIR, workingDir.getAbsolutePath());
- JobWrappingFactory factory = JobWrappingFactory.getJobWrappingFactory();
- job = factory.buildJobExecutor(props, logger);
-
- try {
- job.run();
- } catch (Exception e) {
- succeeded = false;
- logError("Job run failed!");
- e.printStackTrace();
- }
-
node.setEndTime(System.currentTimeMillis());
if (succeeded) {
node.setStatus(Status.SUCCEEDED);
@@ -142,21 +141,21 @@ public class JobRunner extends EventHandler implements Runnable {
public synchronized void cancel() {
// Cancel code here
- if(job == null) {
- logError("Job doesn't exisit!");
- return;
+ if (job == null) {
+ logError("Job doesn't exist!");
+ return;
}
try {
- job.cancel();
+ job.cancel();
} catch (Exception e) {
logError("Failed trying to cancel job!");
- e.printStackTrace();
+ e.printStackTrace();
}
// will just interrupt, I guess, until the code is finished.
this.notifyAll();
-
+
node.setStatus(Status.KILLED);
}
@@ -168,22 +167,16 @@ public class JobRunner extends EventHandler implements Runnable {
return outputProps;
}
- public List<String> getNotifyEmails() {
- List<String> emails = new ArrayList<String>();
- emails = this.props.getStringList(EMAILLIST);
- return emails;
- }
-
private void logError(String message) {
if (logger != null) {
logger.error(message);
}
}
-
+
private void logInfo(String message) {
if (logger != null) {
logger.info(message);
}
}
-
+
}
diff --git a/src/java/azkaban/jobExecutor/AbstractProcessJob.java b/src/java/azkaban/jobExecutor/AbstractProcessJob.java
index 26ff388..5f03d7f 100644
--- a/src/java/azkaban/jobExecutor/AbstractProcessJob.java
+++ b/src/java/azkaban/jobExecutor/AbstractProcessJob.java
@@ -59,8 +59,8 @@ public abstract class AbstractProcessJob extends AbstractJob {
private volatile Props generatedPropeties;
- protected AbstractProcessJob(final Props props, final Logger log) {
- super(props.getString(JOB_ID, "unkownjob"), log);
+ protected AbstractProcessJob(String jobid, final Props props, final Logger log) {
+ super(jobid, log);
_props = props;
_jobPath = props.getString(JOB_FULLPATH, new File(".").getAbsolutePath());
diff --git a/src/java/azkaban/jobExecutor/JavaJob.java b/src/java/azkaban/jobExecutor/JavaJob.java
index c951de1..6533875 100644
--- a/src/java/azkaban/jobExecutor/JavaJob.java
+++ b/src/java/azkaban/jobExecutor/JavaJob.java
@@ -42,8 +42,8 @@ public class JavaJob extends JavaProcessJob {
private Object _javaObject = null;
private String props;
- public JavaJob(Props props, Logger log) {
- super(props, log);
+ public JavaJob(String jobid, Props props, Logger log) {
+ super(jobid, props, log);
}
@Override
src/java/azkaban/jobExecutor/JavaJobRunnerMain.java 495(+262 -233)
diff --git a/src/java/azkaban/jobExecutor/JavaJobRunnerMain.java b/src/java/azkaban/jobExecutor/JavaJobRunnerMain.java
index eaff543..0ee1126 100644
--- a/src/java/azkaban/jobExecutor/JavaJobRunnerMain.java
+++ b/src/java/azkaban/jobExecutor/JavaJobRunnerMain.java
@@ -34,246 +34,275 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
public class JavaJobRunnerMain {
- public static final String JOB_CLASS = "job.class";
- public static final String DEFAULT_RUN_METHOD = "run";
- public static final String DEFAULT_CANCEL_METHOD = "cancel";
-
- // This is the Job interface method to get the properties generated by the job.
- public static final String GET_GENERATED_PROPERTIES_METHOD = "getJobGeneratedProperties";
-
- public static final String CANCEL_METHOD_PARAM = "method.cancel";
- public static final String RUN_METHOD_PARAM = "method.run";
- public static final String PROPS_CLASS = "azkaban.utils.Props";
-
- private static final Layout DEFAULT_LAYOUT = new PatternLayout("%p %m\n");
-
- public final Logger _logger;
-
- public String _cancelMethod;
- public String _jobName;
- public Object _javaObject;
- private boolean _isFinished = false;
-
- public static void main(String[] args) throws Exception {
- @SuppressWarnings("unused")
- JavaJobRunnerMain wrapper = new JavaJobRunnerMain();
- }
-
- public JavaJobRunnerMain() throws Exception {
- Runtime.getRuntime().addShutdownHook(new Thread() {
- public void run() {
- cancelJob();
- }
- }
- );
-
- try {
- _jobName = System.getenv(ProcessJob.JOB_NAME_ENV);
- String propsFile = System.getenv(ProcessJob.JOB_PROP_ENV);
-
- _logger = Logger.getRootLogger();
- _logger.removeAllAppenders();
- ConsoleAppender appender = new ConsoleAppender(DEFAULT_LAYOUT);
- appender.activateOptions();
- _logger.addAppender(appender);
-
- Properties prop = new Properties();
- prop.load(new BufferedReader(new FileReader(propsFile)));
-
- _logger.info("Running job " + _jobName);
- String className = prop.getProperty(JOB_CLASS);
- if(className == null) {
- throw new Exception("Class name is not set.");
- }
- _logger.info("Class name " + className);
-
- // Create the object using proxy
- if (SecurityUtils.shouldProxy(prop)) {
- _javaObject = getObjectAsProxyUser(prop, _logger, _jobName, className);
- }
- else {
- _javaObject = getObject(_jobName, className, prop);
- }
- if(_javaObject == null) {
- _logger.info("Could not create java object to run job: " + className);
- throw new Exception("Could not create running object");
- }
-
- _cancelMethod = prop.getProperty(CANCEL_METHOD_PARAM, DEFAULT_CANCEL_METHOD);
-
- final String runMethod = prop.getProperty(RUN_METHOD_PARAM, DEFAULT_RUN_METHOD);
- _logger.info("Invoking method " + runMethod);
-
- if(SecurityUtils.shouldProxy(prop)) {
- _logger.info("Proxying enabled.");
- runMethodAsProxyUser(prop, _javaObject, runMethod);
- } else {
- _logger.info("Proxy check failed, not proxying run.");
- runMethod(_javaObject, runMethod);
- }
- _isFinished = true;
-
- // Get the generated properties and store them to disk, to be read by ProcessJob.
- try {
- final Method generatedPropertiesMethod = _javaObject.getClass().getMethod(GET_GENERATED_PROPERTIES_METHOD, new Class<?>[]{});
- Props outputGendProps = (Props) generatedPropertiesMethod.invoke(_javaObject, new Object[] {});
- outputGeneratedProperties(outputGendProps);
- } catch (NoSuchMethodException e) {
- _logger.info(
- String.format(
- "Apparently there isn't a method[%s] on object[%s], using empty Props object instead.",
- GET_GENERATED_PROPERTIES_METHOD,
- _javaObject
- )
- );
- outputGeneratedProperties(new Props());
- }
- } catch (Exception e) {
- _isFinished = true;
- throw e;
- }
- }
-
- private void runMethodAsProxyUser(Properties prop, final Object obj, final String runMethod) throws IOException, InterruptedException {
- SecurityUtils.getProxiedUser(prop, _logger, new Configuration()).doAs(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- runMethod(obj, runMethod);
- return null;
- }
- });
- }
-
-
-
- private void runMethod(Object obj, String runMethod) throws IllegalAccessException, InvocationTargetException, NoSuchMethodException {
- obj.getClass().getMethod(runMethod, new Class<?>[] {}).invoke(obj);
- }
-
- private void outputGeneratedProperties(Props outputProperties)
- {
- _logger.info("Outputting generated properties to " + ProcessJob.JOB_OUTPUT_PROP_FILE);
- if (outputProperties == null) {
- _logger.info(" no gend props");
- return;
- }
- for (String key : outputProperties.getKeySet()) {
- _logger.info(" gend prop " + key + " value:" + outputProperties.get(key));
- }
- String outputFileStr = System.getenv(ProcessJob.JOB_OUTPUT_PROP_FILE);
- if (outputFileStr == null) {
- return;
- }
-
- Map<String, String> properties = new LinkedHashMap<String, String>();
- for (String key : outputProperties.getKeySet()) {
- properties.put(key, outputProperties.get(key));
- }
-
- OutputStream writer = null;
- try {
- writer = new BufferedOutputStream(new FileOutputStream(outputFileStr));
- // Manually serialize into JSON instead of adding org.json to external classpath
- writer.write("{\n".getBytes());
- for (Map.Entry<String, String> entry : properties.entrySet()) {
- writer.write(String.format(
- " '%s':'%s',\n",
- entry.getKey().replace("'", "\\'"),
- entry.getValue().replace("'", "\\'")
- ).getBytes());
- }
- writer.write("}".getBytes());
- }
- catch (Exception e) {
- new RuntimeException("Unable to store output properties to: " + outputFileStr);
- }
- finally {
- try {
+ public static final String JOB_CLASS = "job.class";
+ public static final String DEFAULT_RUN_METHOD = "run";
+ public static final String DEFAULT_CANCEL_METHOD = "cancel";
+
+ // This is the Job interface method to get the properties generated by the
+ // job.
+ public static final String GET_GENERATED_PROPERTIES_METHOD = "getJobGeneratedProperties";
+
+ public static final String CANCEL_METHOD_PARAM = "method.cancel";
+ public static final String RUN_METHOD_PARAM = "method.run";
+ public static final String[] PROPS_CLASSES = new String[] { "azkaban.utils.Props", "azkaban.common.utils.Props" };
+
+ private static final Layout DEFAULT_LAYOUT = new PatternLayout("%p %m\n");
+
+ public final Logger _logger;
+
+ public String _cancelMethod;
+ public String _jobName;
+ public Object _javaObject;
+ private boolean _isFinished = false;
+
+ public static void main(String[] args) throws Exception {
+ @SuppressWarnings("unused")
+ JavaJobRunnerMain wrapper = new JavaJobRunnerMain();
+ }
+
+ public JavaJobRunnerMain() throws Exception {
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ public void run() {
+ cancelJob();
+ }
+ });
+
+ try {
+ _jobName = System.getenv(ProcessJob.JOB_NAME_ENV);
+ String propsFile = System.getenv(ProcessJob.JOB_PROP_ENV);
+
+ _logger = Logger.getRootLogger();
+ _logger.removeAllAppenders();
+ ConsoleAppender appender = new ConsoleAppender(DEFAULT_LAYOUT);
+ appender.activateOptions();
+ _logger.addAppender(appender);
+
+ Properties prop = new Properties();
+ prop.load(new BufferedReader(new FileReader(propsFile)));
+
+ _logger.info("Running job " + _jobName);
+ String className = prop.getProperty(JOB_CLASS);
+ if (className == null) {
+ throw new Exception("Class name is not set.");
+ }
+ _logger.info("Class name " + className);
+
+ // Create the object using proxy
+ if (SecurityUtils.shouldProxy(prop)) {
+ _javaObject = getObjectAsProxyUser(prop, _logger, _jobName, className);
+ } else {
+ _javaObject = getObject(_jobName, className, prop, _logger);
+ }
+ if (_javaObject == null) {
+ _logger.info("Could not create java object to run job: " + className);
+ throw new Exception("Could not create running object");
+ }
+
+ _cancelMethod = prop.getProperty(CANCEL_METHOD_PARAM, DEFAULT_CANCEL_METHOD);
+
+ final String runMethod = prop.getProperty(RUN_METHOD_PARAM, DEFAULT_RUN_METHOD);
+ _logger.info("Invoking method " + runMethod);
+
+ if (SecurityUtils.shouldProxy(prop)) {
+ _logger.info("Proxying enabled.");
+ runMethodAsProxyUser(prop, _javaObject, runMethod);
+ } else {
+ _logger.info("Proxy check failed, not proxying run.");
+ runMethod(_javaObject, runMethod);
+ }
+ _isFinished = true;
+
+ // Get the generated properties and store them to disk, to be read
+ // by ProcessJob.
+ try {
+ final Method generatedPropertiesMethod = _javaObject.getClass().getMethod(
+ GET_GENERATED_PROPERTIES_METHOD, new Class<?>[] {});
+ Props outputGendProps = (Props) generatedPropertiesMethod.invoke(_javaObject, new Object[] {});
+ outputGeneratedProperties(outputGendProps);
+ } catch (NoSuchMethodException e) {
+ _logger.info(String.format(
+ "Apparently there isn't a method[%s] on object[%s], using empty Props object instead.",
+ GET_GENERATED_PROPERTIES_METHOD, _javaObject));
+ outputGeneratedProperties(new Props());
+ }
+ } catch (Exception e) {
+ _isFinished = true;
+ throw e;
+ }
+ }
+
+ private void runMethodAsProxyUser(Properties prop, final Object obj, final String runMethod) throws IOException,
+ InterruptedException {
+ SecurityUtils.getProxiedUser(prop, _logger, new Configuration()).doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ runMethod(obj, runMethod);
+ return null;
+ }
+ });
+ }
+
+ private void runMethod(Object obj, String runMethod) throws IllegalAccessException, InvocationTargetException,
+ NoSuchMethodException {
+ obj.getClass().getMethod(runMethod, new Class<?>[] {}).invoke(obj);
+ }
+
+ private void outputGeneratedProperties(Props outputProperties) {
+ _logger.info("Outputting generated properties to " + ProcessJob.JOB_OUTPUT_PROP_FILE);
+
+ if (outputProperties == null) {
+ _logger.info(" no gend props");
+ return;
+ }
+ for (String key : outputProperties.getKeySet()) {
+ _logger.info(" gend prop " + key + " value:" + outputProperties.get(key));
+ }
+
+ String outputFileStr = System.getenv(ProcessJob.JOB_OUTPUT_PROP_FILE);
+ if (outputFileStr == null) {
+ return;
+ }
+
+ Map<String, String> properties = new LinkedHashMap<String, String>();
+ for (String key : outputProperties.getKeySet()) {
+ properties.put(key, outputProperties.get(key));
+ }
+
+ OutputStream writer = null;
+ try {
+ writer = new BufferedOutputStream(new FileOutputStream(outputFileStr));
+ // Manually serialize into JSON instead of adding org.json to
+ // external classpath
+ writer.write("{\n".getBytes());
+ for (Map.Entry<String, String> entry : properties.entrySet()) {
+ writer.write(String.format(" '%s':'%s',\n", entry.getKey().replace("'", "\\'"),
+ entry.getValue().replace("'", "\\'")).getBytes());
+ }
+ writer.write("}".getBytes());
+ } catch (Exception e) {
+ new RuntimeException("Unable to store output properties to: " + outputFileStr);
+ } finally {
+ try {
writer.close();
- } catch (IOException e) {
+ } catch (IOException e) {
+ }
+ }
+ }
+
+ public void cancelJob() {
+ if (_isFinished) {
+ return;
+ }
+ _logger.info("Attempting to call cancel on this job");
+ if (_javaObject != null) {
+ Method method = null;
+
+ try {
+ method = _javaObject.getClass().getMethod(_cancelMethod);
+ } catch (SecurityException e) {
+ } catch (NoSuchMethodException e) {
+ }
+
+ if (method != null)
+ try {
+ method.invoke(_javaObject);
+ } catch (Exception e) {
+ if (_logger != null) {
+ _logger.error("Cancel method failed! ", e);
+ }
+ }
+ else {
+ throw new RuntimeException("Job " + _jobName + " does not have cancel method " + _cancelMethod);
+ }
+ }
+ }
+
+ private static Object getObjectAsProxyUser(final Properties prop, final Logger logger, final String jobName,
+ final String className) throws Exception {
+
+ Object obj = SecurityUtils.getProxiedUser(prop, logger, new Configuration()).doAs(
+ new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ return getObject(jobName, className, prop, logger);
+ }
+ });
+
+ return obj;
+ }
+
+ private static Object getObject(String jobName, String className, Properties properties, Logger logger)
+ throws Exception {
+
+ Class<?> runningClass = JavaJobRunnerMain.class.getClassLoader().loadClass(className);
+
+ if (runningClass == null) {
+ throw new Exception("Class " + className + " was not found. Cannot run job.");
+ }
+
+ Class<?> propsClass = null;
+ for (String propClassName : PROPS_CLASSES) {
+ propsClass = JavaJobRunnerMain.class.getClassLoader().loadClass(propClassName);
+ if (propsClass != null) {
+ break;
+ }
+ }
+
+ Object obj = null;
+ if (propsClass != null && getConstructor(runningClass, String.class, propsClass) != null) {
+ // Create props class
+ Constructor<?> propsCon = getConstructor(propsClass, propsClass, Properties[].class);
+ Object props = propsCon.newInstance(null, new Properties[] { properties });
+
+ Constructor<?> con = getConstructor(runningClass, String.class, propsClass);
+ logger.info("Constructor found " + con.toGenericString());
+ obj = con.newInstance(jobName, props);
+ } else if (getConstructor(runningClass, String.class, Properties.class) != null) {
+
+ Constructor<?> con = getConstructor(runningClass, String.class, Properties.class);
+ logger.info("Constructor found " + con.toGenericString());
+ obj = con.newInstance(jobName, properties);
+ } else if (getConstructor(runningClass, String.class, Map.class) != null) {
+ Constructor<?> con = getConstructor(runningClass, String.class, Map.class);
+ logger.info("Constructor found " + con.toGenericString());
+
+ @SuppressWarnings("rawtypes")
+ HashMap map = new HashMap();
+ for (Map.Entry<Object, Object> entry : properties.entrySet()) {
+ map.put(entry.getKey(), entry.getValue());
+ }
+ obj = con.newInstance(jobName, map);
+ } else if (getConstructor(runningClass, String.class) != null) {
+ Constructor<?> con = getConstructor(runningClass, String.class);
+ logger.info("Constructor found " + con.toGenericString());
+ obj = con.newInstance(jobName);
+ } else if (getConstructor(runningClass) != null) {
+ Constructor<?> con = getConstructor(runningClass);
+ logger.info("Constructor found " + con.toGenericString());
+ obj = con.newInstance();
+ } else {
+ logger.error("Constructor not found. Listing available Constructors.");
+ for (Constructor<?> c : runningClass.getConstructors()) {
+ logger.info(c.toGenericString());
}
- }
- }
-
- public void cancelJob() {
- if (_isFinished) {
- return;
- }
- _logger.info("Attempting to call cancel on this job");
- if (_javaObject != null) {
- Method method = null;
-
- try {
- method = _javaObject.getClass().getMethod(_cancelMethod);
- } catch(SecurityException e) {
- } catch(NoSuchMethodException e) {
- }
-
- if (method != null)
- try {
- method.invoke(_javaObject);
- } catch(Exception e) {
- if (_logger != null) {
- _logger.error("Cancel method failed! ", e);
- }
- }
- else {
- throw new RuntimeException("Job " + _jobName + " does not have cancel method " + _cancelMethod);
- }
- }
- }
-
- private static Object getObjectAsProxyUser(final Properties prop, final Logger logger, final String jobName, final String className) throws Exception{
- Object obj = SecurityUtils.getProxiedUser(prop, logger, new Configuration()).doAs(new PrivilegedExceptionAction<Object>() {
- @Override
- public Object run() throws Exception {
- return getObject(jobName, className, prop);
- }
- });
-
- return obj;
- }
-
- private static Object getObject(String jobName, String className, Properties properties)
- throws Exception {
- Class<?> runningClass = JavaJobRunnerMain.class.getClassLoader().loadClass(className);
-
- if(runningClass == null) {
- throw new Exception("Class " + className + " was not found. Cannot run job.");
- }
-
- Class<?> propsClass = JavaJobRunnerMain.class.getClassLoader().loadClass(PROPS_CLASS);
-
- Object obj = null;
- if(propsClass != null && getConstructor(runningClass, String.class, propsClass) != null) {
- // This case covers the use of azkaban.common.utils.Props with the
- Constructor<?> con = getConstructor(propsClass, propsClass, Properties[].class);
- Object props = con.newInstance(null, new Properties[] { properties });
- obj = getConstructor(runningClass, String.class, propsClass).newInstance(jobName, props);
- } else if(getConstructor(runningClass, String.class, Properties.class) != null) {
- obj = getConstructor(runningClass, String.class, Properties.class).newInstance(jobName,
- properties);
- } else if(getConstructor(runningClass, String.class) != null) {
- obj = getConstructor(runningClass, String.class).newInstance(jobName);
- } else if(getConstructor(runningClass) != null) {
- obj = getConstructor(runningClass).newInstance();
- }
- return obj;
- }
-
- private static Constructor<?> getConstructor(Class<?> c, Class<?>... args) {
- try {
- Constructor<?> cons = c.getConstructor(args);
- return cons;
- } catch(NoSuchMethodException e) {
- return null;
- }
- }
+ }
+ return obj;
+ }
+
+ private static Constructor<?> getConstructor(Class<?> c, Class<?>... args) {
+ try {
+ Constructor<?> cons = c.getConstructor(args);
+ return cons;
+ } catch (NoSuchMethodException e) {
+ return null;
+ }
+ }
}
src/java/azkaban/jobExecutor/JavaProcessJob.java 28(+14 -14)
diff --git a/src/java/azkaban/jobExecutor/JavaProcessJob.java b/src/java/azkaban/jobExecutor/JavaProcessJob.java
index bd41935..db9b52c 100644
--- a/src/java/azkaban/jobExecutor/JavaProcessJob.java
+++ b/src/java/azkaban/jobExecutor/JavaProcessJob.java
@@ -36,15 +36,15 @@ public class JavaProcessJob extends ProcessJob {
public static final String MAX_MEMORY_SIZE = "Xmx";
public static final String MAIN_ARGS = "main.args";
public static final String JVM_PARAMS = "jvm.args";
- public static final String GLOBAL_JVM_PARAMS = "global.jvm.args";
-
+ public static final String GLOBAL_JVM_PARAMS = "global.jvm.args";
+
public static final String DEFAULT_INITIAL_MEMORY_SIZE = "64M";
public static final String DEFAULT_MAX_MEMORY_SIZE = "256M";
public static String JAVA_COMMAND = "java";
- public JavaProcessJob(Props prop, Logger logger) {
- super(prop, logger);
+ public JavaProcessJob(String jobid, Props prop, Logger logger) {
+ super(jobid, prop, logger);
}
@Override
@@ -98,8 +98,8 @@ public class JavaProcessJob extends ProcessJob {
for (File file : parent.listFiles()) {
if (file.getName().endsWith(".jar")) {
- //log.info("Adding to classpath:" + file.getName());
- classpathList.add(file.getName());
+ // log.info("Adding to classpath:" + file.getName());
+ classpathList.add(file.getName());
}
}
}
@@ -123,15 +123,15 @@ public class JavaProcessJob extends ProcessJob {
return getProps().getString(MAIN_ARGS, "");
}
- protected String getJVMArguments() {
- String globalJVMArgs = getProps().getString(GLOBAL_JVM_PARAMS, null);
-
- if (globalJVMArgs == null) {
- return getProps().getString(JVM_PARAMS, "");
- }
+ protected String getJVMArguments() {
+ String globalJVMArgs = getProps().getString(GLOBAL_JVM_PARAMS, null);
- return globalJVMArgs + " " + getProps().getString(JVM_PARAMS, "");
- }
+ if (globalJVMArgs == null) {
+ return getProps().getString(JVM_PARAMS, "");
+ }
+
+ return globalJVMArgs + " " + getProps().getString(JVM_PARAMS, "");
+ }
protected String createArguments(List<String> arguments, String separator) {
if (arguments != null && arguments.size() > 0) {
src/java/azkaban/jobExecutor/Job.java 78(+40 -38)
diff --git a/src/java/azkaban/jobExecutor/Job.java b/src/java/azkaban/jobExecutor/Job.java
index a7c59e5..3649095 100644
--- a/src/java/azkaban/jobExecutor/Job.java
+++ b/src/java/azkaban/jobExecutor/Job.java
@@ -18,8 +18,6 @@ package azkaban.jobExecutor;
import azkaban.utils.Props;
-
-
/**
* This interface defines a Raw Job interface. Each job defines
* <ul>
@@ -33,43 +31,47 @@ import azkaban.utils.Props;
public interface Job {
- /**
- * Returns a unique(should be checked in xml) string name/id for the Job.
- *
- * @return
- */
- public String getId();
+ /**
+ * Returns a unique(should be checked in xml) string name/id for the Job.
+ *
+ * @return
+ */
+ public String getId();
+
+ /**
+ * Run the job. In general this method can only be run once. Must either
+ * succeed or throw an exception.
+ */
+ public void run() throws Exception;
+
+ /**
+ * Best effort attempt to cancel the job.
+ *
+ * @throws Exception
+ * If cancel fails
+ */
+ public void cancel() throws Exception;
- /**
- * Run the job. In general this method can only be run once. Must either
- * succeed or throw an exception.
- */
- public void run() throws Exception;
+ /**
+ * Returns a progress report between [0 - 1.0] to indicate the percentage
+ * complete
+ *
+ * @throws Exception
+ * If getting progress fails
+ */
+ public double getProgress() throws Exception;
- /**
- * Best effort attempt to cancel the job.
- *
- * @throws Exception If cancel fails
- */
- public void cancel() throws Exception;
+ /**
+ * Get the generated properties from this job.
+ *
+ * @return
+ */
+ public Props getJobGeneratedProperties();
- /**
- * Returns a progress report between [0 - 1.0] to indicate the percentage
- * complete
- *
- * @throws Exception If getting progress fails
- */
- public double getProgress() throws Exception;
-
- /**
- * Get the generated properties from this job.
- * @return
- */
- public Props getJobGeneratedProperties();
-
- /**
- * Determine if the job was cancelled.
- * @return
- */
- public boolean isCanceled();
+ /**
+ * Determine if the job was cancelled.
+ *
+ * @return
+ */
+ public boolean isCanceled();
}
diff --git a/src/java/azkaban/jobExecutor/LongArgJob.java b/src/java/azkaban/jobExecutor/LongArgJob.java
index 3bee6d5..7306a4e 100644
--- a/src/java/azkaban/jobExecutor/LongArgJob.java
+++ b/src/java/azkaban/jobExecutor/LongArgJob.java
@@ -40,13 +40,13 @@ public abstract class LongArgJob extends AbstractProcessJob {
private final AzkabanProcessBuilder builder;
private volatile AzkabanProcess process;
- public LongArgJob(String[] command, Props prop, Logger log) {
- this(command, prop, log, new HashSet<String>(0));
+ public LongArgJob(String jobid, String[] command, Props prop, Logger log) {
+ this(jobid, command, prop, log, new HashSet<String>(0));
}
- public LongArgJob(String[] command, Props prop, Logger log, Set<String> suppressedKeys) {
+ public LongArgJob(String jobid, String[] command, Props prop, Logger log, Set<String> suppressedKeys) {
//super(command, desc);
- super(prop, log);
+ super(jobid, prop, log);
//String cwd = descriptor.getProps().getString(WORKING_DIR, new File(descriptor.getFullPath()).getParent());
this.builder = new AzkabanProcessBuilder(command).
src/java/azkaban/jobExecutor/NoopJob.java 73(+34 -39)
diff --git a/src/java/azkaban/jobExecutor/NoopJob.java b/src/java/azkaban/jobExecutor/NoopJob.java
index e364b2f..2b7b09b 100644
--- a/src/java/azkaban/jobExecutor/NoopJob.java
+++ b/src/java/azkaban/jobExecutor/NoopJob.java
@@ -22,43 +22,38 @@ import azkaban.utils.Props;
/**
*
*/
-public class NoopJob implements Job
-{
- public NoopJob(Props props, Logger log)
- {
-
- }
-
- @Override
- public String getId()
- {
- return "Azkaban!! -- " + getClass().getName();
- }
-
- @Override
- public void run() throws Exception
- {
- }
-
- @Override
- public void cancel() throws Exception
- {
- }
-
- @Override
- public double getProgress() throws Exception
- {
- return 0;
- }
-
- @Override
- public Props getJobGeneratedProperties()
- {
- return new Props();
- }
-
- @Override
- public boolean isCanceled() {
- return false;
- }
+public class NoopJob implements Job {
+ private String jobId;
+
+ public NoopJob(String jobid, Props props, Logger log) {
+ this.jobId = jobid;
+ }
+
+ @Override
+ public String getId() {
+ return this.jobId;
+ }
+
+ @Override
+ public void run() throws Exception {
+ }
+
+ @Override
+ public void cancel() throws Exception {
+ }
+
+ @Override
+ public double getProgress() throws Exception {
+ return 0;
+ }
+
+ @Override
+ public Props getJobGeneratedProperties() {
+ return new Props();
+ }
+
+ @Override
+ public boolean isCanceled() {
+ return false;
+ }
}
src/java/azkaban/jobExecutor/PigProcessJob.java 34(+17 -17)
diff --git a/src/java/azkaban/jobExecutor/PigProcessJob.java b/src/java/azkaban/jobExecutor/PigProcessJob.java
index 3a5a15d..b0d8ae8 100644
--- a/src/java/azkaban/jobExecutor/PigProcessJob.java
+++ b/src/java/azkaban/jobExecutor/PigProcessJob.java
@@ -46,8 +46,8 @@ public class PigProcessJob extends JavaProcessJob {
public static final String PIG_JAVA_CLASS = "org.apache.pig.Main";
public static final String SECURE_PIG_WRAPPER = "azkaban.jobExecutor.SecurePigWrapper";
- public PigProcessJob(Props props, Logger log) {
- super(props, log);
+ public PigProcessJob(String jobid, Props props, Logger log) {
+ super(jobid, props, log);
}
@Override
@@ -160,20 +160,20 @@ public class PigProcessJob extends JavaProcessJob {
private static String getSourcePathFromClass(Class containedClass) {
- File file = new File(containedClass.getProtectionDomain().getCodeSource().getLocation().getPath());
-
- if (!file.isDirectory() && file.getName().endsWith(".class")) {
- String name = containedClass.getName();
- StringTokenizer tokenizer = new StringTokenizer(name, ".");
- while(tokenizer.hasMoreTokens()) {
- tokenizer.nextElement();
- file = file.getParentFile();
- }
-
- return file.getPath();
- }
- else {
- return containedClass.getProtectionDomain().getCodeSource().getLocation().getPath();
- }
+ File file = new File(containedClass.getProtectionDomain().getCodeSource().getLocation().getPath());
+
+ if (!file.isDirectory() && file.getName().endsWith(".class")) {
+ String name = containedClass.getName();
+ StringTokenizer tokenizer = new StringTokenizer(name, ".");
+ while (tokenizer.hasMoreTokens()) {
+ tokenizer.nextElement();
+ file = file.getParentFile();
+ }
+
+ return file.getPath();
+ } else {
+ return containedClass.getProtectionDomain().getCodeSource()
+ .getLocation().getPath();
+ }
}
}
src/java/azkaban/jobExecutor/ProcessJob.java 605(+299 -306)
diff --git a/src/java/azkaban/jobExecutor/ProcessJob.java b/src/java/azkaban/jobExecutor/ProcessJob.java
index 06050c8..42a1925 100644
--- a/src/java/azkaban/jobExecutor/ProcessJob.java
+++ b/src/java/azkaban/jobExecutor/ProcessJob.java
@@ -37,311 +37,304 @@ import azkaban.utils.Props;
*/
public class ProcessJob extends AbstractProcessJob implements Job {
- public static final String COMMAND = "command";
- public static final int CLEAN_UP_TIME_MS = 1000;
-
- private volatile Process _process;
- private volatile boolean _isComplete;
- private volatile boolean _isCancelled;
-
- public ProcessJob(final Props props, final Logger log) {
- super(props, log);
- }
-
- @Override
- public void run() {
- synchronized (this) {
- _isCancelled = false;
- }
- resolveProps();
-
- // Sets a list of all the commands that need to be run.
- List<String> commands = getCommandList();
- info(commands.size() + " commands to execute.");
-
- File[] propFiles = initPropsFiles();
-
- // System.err.println("in process job outputFile=" +propFiles[1]);
-
- // For each of the jobs, set up a process and run them.
- for (String command : commands) {
- info("Executing command: " + command);
- String[] cmdPieces = partitionCommandLine(command);
-
- ProcessBuilder builder = new ProcessBuilder(cmdPieces);
-
- builder.directory(new File(getCwd()));
- builder.environment().putAll(getEnvironmentVariables());
-
- try {
- _process = builder.start();
- } catch (IOException e) {
- for (File file : propFiles) {
- if (file != null && file.exists()) {
- file.delete();
- }
- }
- throw new RuntimeException(e);
- }
- LoggingGobbler outputGobbler = new LoggingGobbler(
- new InputStreamReader(_process.getInputStream()),
- Level.INFO);
- LoggingGobbler errorGobbler = new LoggingGobbler(
- new InputStreamReader(_process.getErrorStream()),
- Level.ERROR);
-
- int processId = getProcessId();
- if (processId == 0) {
- info("Spawned thread. Unknowned processId");
- } else {
- info("Spawned thread with processId " + processId);
- }
- outputGobbler.start();
- errorGobbler.start();
- int exitCode = -999;
- try {
- exitCode = _process.waitFor();
-
- _isComplete = true;
- if (exitCode != 0) {
- for (File file : propFiles) {
- if (file != null && file.exists()) {
- file.delete();
- }
- }
- throw new RuntimeException(
- "Processes ended with exit code " + exitCode + ".");
- }
-
- // try to wait for everything to get logged out before exiting
- outputGobbler.join(1000);
- errorGobbler.join(1000);
- } catch (InterruptedException e) {
- } finally {
- outputGobbler.close();
- errorGobbler.close();
- }
- }
-
- // Get the output properties from this job.
- generateProperties(propFiles[1]);
-
- for (File file : propFiles) {
- if (file != null && file.exists()) {
- file.delete();
- }
- }
-
- }
-
- protected List<String> getCommandList() {
- List<String> commands = new ArrayList<String>();
- commands.add(_props.getString(COMMAND));
- for (int i = 1; _props.containsKey(COMMAND + "." + i); i++) {
- commands.add(_props.getString(COMMAND + "." + i));
- }
-
- return commands;
- }
-
- @Override
- public void cancel() throws Exception {
- if (_process != null) {
- int processId = getProcessId();
- if (processId != 0) {
- warn("Attempting to kill the process " + processId);
- try {
- Runtime.getRuntime().exec("kill " + processId);
- synchronized (this) {
- wait(CLEAN_UP_TIME_MS);
- }
- } catch (InterruptedException e) {
- // Do nothing. We don't really care.
- }
- if (!_isComplete) {
- error("After "
- + CLEAN_UP_TIME_MS
- + " ms, the job hasn't terminated. Will force terminate the job.");
- }
- } else {
- info("Cound not get process id");
- }
-
- if (!_isComplete) {
- warn("Force kill the process");
- _process.destroy();
- }
- synchronized (this) {
- _isCancelled = true;
- }
- }
- }
-
- public int getProcessId() {
- int processId = 0;
-
- try {
- Field f = _process.getClass().getDeclaredField("pid");
- f.setAccessible(true);
-
- processId = f.getInt(_process);
- } catch (Throwable e) {
- }
-
- return processId;
- }
-
- @Override
- public double getProgress() {
- return _isComplete ? 1.0 : 0.0;
- }
-
- private class LoggingGobbler extends Thread {
-
- private final BufferedReader _inputReader;
- private final Level _loggingLevel;
-
- public LoggingGobbler(final InputStreamReader inputReader,
- final Level level) {
- _inputReader = new BufferedReader(inputReader);
- _loggingLevel = level;
- }
-
- public void close() {
- if (_inputReader != null) {
- try {
- _inputReader.close();
- } catch (IOException e) {
- error("Error cleaning up logging stream reader:", e);
- }
- }
- }
-
- @Override
- public void run() {
- try {
- while (!Thread.currentThread().isInterrupted()) {
- String line = _inputReader.readLine();
- if (line == null) {
- return;
- }
-
- logMessage(line);
- }
- } catch (IOException e) {
- error("Error reading from logging stream:", e);
- }
- }
-
- private void logMessage(final String message) {
- if (message.startsWith(Level.DEBUG.toString())) {
- String newMsg = message.substring(Level.DEBUG.toString()
- .length());
- getLog().debug(newMsg);
- } else if (message.startsWith(Level.ERROR.toString())) {
- String newMsg = message.substring(Level.ERROR.toString()
- .length());
- getLog().error(newMsg);
- } else if (message.startsWith(Level.INFO.toString())) {
- String newMsg = message.substring(Level.INFO.toString()
- .length());
- getLog().info(newMsg);
- } else if (message.startsWith(Level.WARN.toString())) {
- String newMsg = message.substring(Level.WARN.toString()
- .length());
- getLog().warn(newMsg);
- } else if (message.startsWith(Level.FATAL.toString())) {
- String newMsg = message.substring(Level.FATAL.toString()
- .length());
- getLog().fatal(newMsg);
- } else if (message.startsWith(Level.TRACE.toString())) {
- String newMsg = message.substring(Level.TRACE.toString()
- .length());
- getLog().trace(newMsg);
- } else {
- getLog().log(_loggingLevel, message);
- }
-
- }
- }
-
- @Override
- public Props getProps() {
- return _props;
- }
-
- public String getPath() {
- return _jobPath;
- }
-
- public String getJobName() {
- return getId();
- }
-
-
- /**
- * Splits the command into a unix like command line structure. Quotes and
- * single quotes are treated as nested strings.
- *
- * @param command
- * @return
- */
- public static String[] partitionCommandLine(final String command) {
- ArrayList<String> commands = new ArrayList<String>();
-
- int index = 0;
-
- StringBuffer buffer = new StringBuffer(command.length());
-
- boolean isApos = false;
- boolean isQuote = false;
- while (index < command.length()) {
- char c = command.charAt(index);
-
- switch (c) {
- case ' ':
- if (!isQuote && !isApos) {
- String arg = buffer.toString();
- buffer = new StringBuffer(command.length() - index);
- if (arg.length() > 0) {
- commands.add(arg);
- }
- } else {
- buffer.append(c);
- }
- break;
- case '\'':
- if (!isQuote) {
- isApos = !isApos;
- } else {
- buffer.append(c);
- }
- break;
- case '"':
- if (!isApos) {
- isQuote = !isQuote;
- } else {
- buffer.append(c);
- }
- break;
- default:
- buffer.append(c);
- }
-
- index++;
- }
-
- if (buffer.length() > 0) {
- String arg = buffer.toString();
- commands.add(arg);
- }
-
- return commands.toArray(new String[commands.size()]);
- }
-
- @Override
- public synchronized boolean isCanceled() {
- return _isCancelled;
- }
+ public static final String COMMAND = "command";
+ public static final int CLEAN_UP_TIME_MS = 1000;
+
+ private volatile Process _process;
+ private volatile boolean _isComplete;
+ private volatile boolean _isCancelled;
+
+ public ProcessJob(final String jobId, final Props props, final Logger log) {
+ super(jobId, props, log);
+ }
+
+ @Override
+ public void run() {
+ synchronized (this) {
+ _isCancelled = false;
+ }
+ resolveProps();
+
+ // Sets a list of all the commands that need to be run.
+ List<String> commands = getCommandList();
+ info(commands.size() + " commands to execute.");
+
+ File[] propFiles = initPropsFiles();
+
+ // System.err.println("in process job outputFile=" +propFiles[1]);
+
+ // For each of the jobs, set up a process and run them.
+ for (String command : commands) {
+ info("Executing command: " + command);
+ String[] cmdPieces = partitionCommandLine(command);
+
+ ProcessBuilder builder = new ProcessBuilder(cmdPieces);
+
+ builder.directory(new File(getCwd()));
+ builder.environment().putAll(getEnvironmentVariables());
+
+ try {
+ _process = builder.start();
+ } catch (IOException e) {
+ for (File file : propFiles) {
+ if (file != null && file.exists()) {
+ file.delete();
+ }
+ }
+ throw new RuntimeException(e);
+ }
+ LoggingGobbler outputGobbler = new LoggingGobbler(
+ new InputStreamReader(_process.getInputStream()),
+ Level.INFO);
+ LoggingGobbler errorGobbler = new LoggingGobbler(
+ new InputStreamReader(_process.getErrorStream()),
+ Level.ERROR);
+
+ int processId = getProcessId();
+ if (processId == 0) {
+ info("Spawned thread. Unknowned processId");
+ } else {
+ info("Spawned thread with processId " + processId);
+ }
+ outputGobbler.start();
+ errorGobbler.start();
+ int exitCode = -999;
+ try {
+ exitCode = _process.waitFor();
+
+ _isComplete = true;
+ if (exitCode != 0) {
+ for (File file : propFiles) {
+ if (file != null && file.exists()) {
+ file.delete();
+ }
+ }
+ throw new RuntimeException(
+ "Processes ended with exit code " + exitCode + ".");
+ }
+
+ // try to wait for everything to get logged out before exiting
+ outputGobbler.join(1000);
+ errorGobbler.join(1000);
+ } catch (InterruptedException e) {
+ } finally {
+ outputGobbler.close();
+ errorGobbler.close();
+ }
+ }
+
+ // Get the output properties from this job.
+ generateProperties(propFiles[1]);
+
+ for (File file : propFiles) {
+ if (file != null && file.exists()) {
+ file.delete();
+ }
+ }
+
+ }
+
+ protected List<String> getCommandList() {
+ List<String> commands = new ArrayList<String>();
+ commands.add(_props.getString(COMMAND));
+ for (int i = 1; _props.containsKey(COMMAND + "." + i); i++) {
+ commands.add(_props.getString(COMMAND + "." + i));
+ }
+
+ return commands;
+ }
+
+ @Override
+ public void cancel() throws Exception {
+ if (_process != null) {
+ int processId = getProcessId();
+ if (processId != 0) {
+ warn("Attempting to kill the process " + processId);
+ try {
+ Runtime.getRuntime().exec("kill " + processId);
+ synchronized (this) {
+ wait(CLEAN_UP_TIME_MS);
+ }
+ } catch (InterruptedException e) {
+ // Do nothing. We don't really care.
+ }
+ if (!_isComplete) {
+ error("After "
+ + CLEAN_UP_TIME_MS
+ + " ms, the job hasn't terminated. Will force terminate the job.");
+ }
+ } else {
+ info("Could not get process id");
+ }
+
+ if (!_isComplete) {
+ warn("Force kill the process");
+ _process.destroy();
+ }
+ synchronized (this) {
+ _isCancelled = true;
+ }
+ }
+ }
+
+ public int getProcessId() {
+ int processId = 0;
+
+ try {
+ Field f = _process.getClass().getDeclaredField("pid");
+ f.setAccessible(true);
+
+ processId = f.getInt(_process);
+ } catch (Throwable e) {
+ }
+
+ return processId;
+ }
+
+ @Override
+ public double getProgress() {
+ return _isComplete ? 1.0 : 0.0;
+ }
+
+ private class LoggingGobbler extends Thread {
+
+ private final BufferedReader _inputReader;
+ private final Level _loggingLevel;
+
+ public LoggingGobbler(final InputStreamReader inputReader,
+ final Level level) {
+ _inputReader = new BufferedReader(inputReader);
+ _loggingLevel = level;
+ }
+
+ public void close() {
+ if (_inputReader != null) {
+ try {
+ _inputReader.close();
+ } catch (IOException e) {
+ error("Error cleaning up logging stream reader:", e);
+ }
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (!Thread.currentThread().isInterrupted()) {
+ String line = _inputReader.readLine();
+ if (line == null) {
+ return;
+ }
+
+ logMessage(line);
+ }
+ } catch (IOException e) {
+ error("Error reading from logging stream:", e);
+ }
+ }
+
+ private void logMessage(final String message) {
+ if (message.startsWith(Level.DEBUG.toString())) {
+ String newMsg = message.substring(Level.DEBUG.toString().length());
+ getLog().debug(newMsg);
+ } else if (message.startsWith(Level.ERROR.toString())) {
+ String newMsg = message.substring(Level.ERROR.toString().length());
+ getLog().error(newMsg);
+ } else if (message.startsWith(Level.INFO.toString())) {
+ String newMsg = message.substring(Level.INFO.toString().length());
+ getLog().info(newMsg);
+ } else if (message.startsWith(Level.WARN.toString())) {
+ String newMsg = message.substring(Level.WARN.toString().length());
+ getLog().warn(newMsg);
+ } else if (message.startsWith(Level.FATAL.toString())) {
+ String newMsg = message.substring(Level.FATAL.toString().length());
+ getLog().fatal(newMsg);
+ } else if (message.startsWith(Level.TRACE.toString())) {
+ String newMsg = message.substring(Level.TRACE.toString().length());
+ getLog().trace(newMsg);
+ } else {
+ getLog().log(_loggingLevel, message);
+ }
+
+ }
+ }
+
+ @Override
+ public Props getProps() {
+ return _props;
+ }
+
+ public String getPath() {
+ return _jobPath;
+ }
+
+ public String getJobName() {
+ return getId();
+ }
+
+ /**
+ * Splits the command into a unix like command line structure. Quotes and
+ * single quotes are treated as nested strings.
+ *
+ * @param command
+ * @return
+ */
+ public static String[] partitionCommandLine(final String command) {
+ ArrayList<String> commands = new ArrayList<String>();
+
+ int index = 0;
+
+ StringBuffer buffer = new StringBuffer(command.length());
+
+ boolean isApos = false;
+ boolean isQuote = false;
+ while (index < command.length()) {
+ char c = command.charAt(index);
+
+ switch (c) {
+ case ' ':
+ if (!isQuote && !isApos) {
+ String arg = buffer.toString();
+ buffer = new StringBuffer(command.length() - index);
+ if (arg.length() > 0) {
+ commands.add(arg);
+ }
+ } else {
+ buffer.append(c);
+ }
+ break;
+ case '\'':
+ if (!isQuote) {
+ isApos = !isApos;
+ } else {
+ buffer.append(c);
+ }
+ break;
+ case '"':
+ if (!isApos) {
+ isQuote = !isQuote;
+ } else {
+ buffer.append(c);
+ }
+ break;
+ default:
+ buffer.append(c);
+ }
+
+ index++;
+ }
+
+ if (buffer.length() > 0) {
+ String arg = buffer.toString();
+ commands.add(arg);
+ }
+
+ return commands.toArray(new String[commands.size()]);
+ }
+
+ @Override
+ public synchronized boolean isCanceled() {
+ return _isCancelled;
+ }
}
src/java/azkaban/jobExecutor/PythonJob.java 24(+9 -15)
diff --git a/src/java/azkaban/jobExecutor/PythonJob.java b/src/java/azkaban/jobExecutor/PythonJob.java
index ad83c38..2cb44c3 100644
--- a/src/java/azkaban/jobExecutor/PythonJob.java
+++ b/src/java/azkaban/jobExecutor/PythonJob.java
@@ -21,23 +21,17 @@ import com.google.common.collect.ImmutableSet;
import azkaban.utils.Props;
-
-
public class PythonJob extends LongArgJob {
-
- private static final String PYTHON_BINARY_KEY = "python";
- private static final String SCRIPT_KEY = "script";
-
-
- public PythonJob(Props props, Logger log) {
- super(new String[]{props.getString(PYTHON_BINARY_KEY, "python"),
- props.getString(SCRIPT_KEY)},
- props,
- log,
- ImmutableSet.of(PYTHON_BINARY_KEY, SCRIPT_KEY, JOB_TYPE));
- }
+ private static final String PYTHON_BINARY_KEY = "python";
+ private static final String SCRIPT_KEY = "script";
+ public PythonJob(String jobid, Props props, Logger log) {
+ super(jobid,
+ new String[] { props.getString(PYTHON_BINARY_KEY, "python"),props.getString(SCRIPT_KEY) },
+ props,
+ log,
+ ImmutableSet.of(PYTHON_BINARY_KEY, SCRIPT_KEY, JOB_TYPE));
+ }
-
}
src/java/azkaban/jobExecutor/RubyJob.java 21(+9 -12)
diff --git a/src/java/azkaban/jobExecutor/RubyJob.java b/src/java/azkaban/jobExecutor/RubyJob.java
index 7f7febc..a65c272 100644
--- a/src/java/azkaban/jobExecutor/RubyJob.java
+++ b/src/java/azkaban/jobExecutor/RubyJob.java
@@ -21,20 +21,17 @@ import azkaban.utils.Props;
import com.google.common.collect.ImmutableSet;
-
public class RubyJob extends LongArgJob {
- private static final String RUBY_BINARY_KEY = "ruby";
- private static final String SCRIPT_KEY = "script";
+ private static final String RUBY_BINARY_KEY = "ruby";
+ private static final String SCRIPT_KEY = "script";
- public RubyJob(Props props, Logger log) {
- super(new String[]{props.getString(RUBY_BINARY_KEY, "ruby"),
- props.getString(SCRIPT_KEY)},
- props,
- log,
- ImmutableSet.of(RUBY_BINARY_KEY, SCRIPT_KEY, JOB_TYPE));
- }
+ public RubyJob(String jobid, Props props, Logger log) {
+ super(jobid,
+ new String[] { props.getString(RUBY_BINARY_KEY, "ruby"), props.getString(SCRIPT_KEY) },
+ props,
+ log,
+ ImmutableSet.of(RUBY_BINARY_KEY, SCRIPT_KEY, JOB_TYPE));
+ }
-
-
}
src/java/azkaban/jobExecutor/ScriptJob.java 29(+14 -15)
diff --git a/src/java/azkaban/jobExecutor/ScriptJob.java b/src/java/azkaban/jobExecutor/ScriptJob.java
index 1879afe..1a8f16b 100644
--- a/src/java/azkaban/jobExecutor/ScriptJob.java
+++ b/src/java/azkaban/jobExecutor/ScriptJob.java
@@ -22,25 +22,24 @@ import com.google.common.collect.ImmutableSet;
import azkaban.utils.Props;
/**
- * A script job issues a command of the form
- * [EXECUTABLE] [SCRIPT] --key1 val1 ... --key2 val2
- * executable -- the interpretor command to execute
- * script -- the script to pass in (requried)
+ * A script job issues a command of the form [EXECUTABLE] [SCRIPT] --key1 val1
+ * ... --key2 val2 executable -- the interpretor command to execute script --
+ * the script to pass in (requried)
*
* @author jkreps
- *
+ *
*/
public class ScriptJob extends LongArgJob {
- private static final String DEFAULT_EXECUTABLE_KEY = "executable";
- private static final String SCRIPT_KEY = "script";
-
- public ScriptJob(Props props, Logger log) {
- super(new String[] {props.getString(DEFAULT_EXECUTABLE_KEY), props.getString(SCRIPT_KEY)},
- props,
- log,
- ImmutableSet.of(DEFAULT_EXECUTABLE_KEY, SCRIPT_KEY, JOB_TYPE));
- }
-
+ private static final String DEFAULT_EXECUTABLE_KEY = "executable";
+ private static final String SCRIPT_KEY = "script";
+
+ public ScriptJob(String jobid, Props props, Logger log) {
+ super(jobid,
+ new String[] { props.getString(DEFAULT_EXECUTABLE_KEY),props.getString(SCRIPT_KEY) },
+ props,
+ log,
+ ImmutableSet.of(DEFAULT_EXECUTABLE_KEY, SCRIPT_KEY, JOB_TYPE));
+ }
}
src/java/azkaban/jobExecutor/SecurePigWrapper.java 106(+55 -51)
diff --git a/src/java/azkaban/jobExecutor/SecurePigWrapper.java b/src/java/azkaban/jobExecutor/SecurePigWrapper.java
index c210b40..0103ddb 100644
--- a/src/java/azkaban/jobExecutor/SecurePigWrapper.java
+++ b/src/java/azkaban/jobExecutor/SecurePigWrapper.java
@@ -35,62 +35,66 @@ import static azkaban.jobExecutor.utils.SecurityUtils.getProxiedUser;
public class SecurePigWrapper {
- public static final String OBTAIN_BINARY_TOKEN = "obtain.binary.token";
- public static final String MAPREDUCE_JOB_CREDENTIALS_BINARY = "mapreduce.job.credentials.binary";
+ public static final String OBTAIN_BINARY_TOKEN = "obtain.binary.token";
+ public static final String MAPREDUCE_JOB_CREDENTIALS_BINARY = "mapreduce.job.credentials.binary";
- public static void main(final String[] args) throws IOException, InterruptedException {
- final Logger logger = Logger.getRootLogger();
- final Properties p = System.getProperties();
- final Configuration conf = new Configuration();
+ public static void main(final String[] args) throws IOException, InterruptedException {
+ final Logger logger = Logger.getRootLogger();
+ final Properties p = System.getProperties();
+ final Configuration conf = new Configuration();
- getProxiedUser(p, logger, conf).doAs(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- prefetchToken();
- org.apache.pig.Main.main(args);
- return null;
- }
+ getProxiedUser(p, logger, conf).doAs(
+ new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ prefetchToken();
+ org.apache.pig.Main.main(args);
+ return null;
+ }
- // For Pig jobs that need to do extra communication with the JobTracker,
- // it's necessary to pre-fetch a token and include it in the credentials
- // cache
- private void prefetchToken() throws InterruptedException, IOException {
- String shouldPrefetch = p.getProperty(OBTAIN_BINARY_TOKEN);
- if(shouldPrefetch != null && shouldPrefetch.equals("true") ) {
- logger.info("Pre-fetching token");
- Job job = new Job(conf, "totally phony, extremely fake, not real job");
+ // For Pig jobs that need to do extra communication with the
+ // JobTracker,
+ // it's necessary to pre-fetch a token and include it in the
+ // credentials
+ // cache
+ private void prefetchToken() throws InterruptedException,
+ IOException {
+ String shouldPrefetch = p.getProperty(OBTAIN_BINARY_TOKEN);
+ if (shouldPrefetch != null && shouldPrefetch.equals("true")) {
+ logger.info("Pre-fetching token");
+ Job job = new Job(conf,"totally phony, extremely fake, not real job");
- JobConf jc = new JobConf(conf);
- JobClient jobClient = new JobClient(jc);
- logger.info("Pre-fetching: Got new JobClient: " + jc);
- Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(new Text("hi"));
- job.getCredentials().addToken(new Text("howdy"), mrdt);
+ JobConf jc = new JobConf(conf);
+ JobClient jobClient = new JobClient(jc);
+ logger.info("Pre-fetching: Got new JobClient: " + jc);
+ Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(new Text("hi"));
+ job.getCredentials().addToken(new Text("howdy"), mrdt);
- File temp = File.createTempFile("mr-azkaban", ".token");
- temp.deleteOnExit();
+ File temp = File.createTempFile("mr-azkaban", ".token");
+ temp.deleteOnExit();
- FileOutputStream fos = null;
- DataOutputStream dos = null;
- try {
- fos = new FileOutputStream(temp);
- dos = new DataOutputStream(fos);
- job.getCredentials().writeTokenStorageToStream(dos);
- } finally {
- if(dos != null) {
- dos.close();
- }
- if(fos != null) {
- fos.close();
- }
- }
- logger.info("Setting " + MAPREDUCE_JOB_CREDENTIALS_BINARY + " to " + temp.getAbsolutePath());
- System.setProperty(MAPREDUCE_JOB_CREDENTIALS_BINARY, temp.getAbsolutePath());
- } else {
- logger.info("Not pre-fetching token");
- }
- }
- });
+ FileOutputStream fos = null;
+ DataOutputStream dos = null;
+ try {
+ fos = new FileOutputStream(temp);
+ dos = new DataOutputStream(fos);
+ job.getCredentials().writeTokenStorageToStream(
+ dos);
+ } finally {
+ if (dos != null) {
+ dos.close();
+ }
+ if (fos != null) {
+ fos.close();
+ }
+ }
+ logger.info("Setting " + MAPREDUCE_JOB_CREDENTIALS_BINARY + " to " + temp.getAbsolutePath());
+ System.setProperty(MAPREDUCE_JOB_CREDENTIALS_BINARY, temp.getAbsolutePath());
+ } else {
+ logger.info("Not pre-fetching token");
+ }
+ }
+ });
- }
+ }
}
-
diff --git a/src/java/azkaban/jobExecutor/utils/JobWrappingFactory.java b/src/java/azkaban/jobExecutor/utils/JobWrappingFactory.java
index b93196c..014c231 100644
--- a/src/java/azkaban/jobExecutor/utils/JobWrappingFactory.java
+++ b/src/java/azkaban/jobExecutor/utils/JobWrappingFactory.java
@@ -28,7 +28,6 @@ import azkaban.jobExecutor.ScriptJob;
import azkaban.utils.Props;
import azkaban.utils.Utils;
import azkaban.jobExecutor.utils.JobExecutionException;
-import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
@@ -42,8 +41,7 @@ public class JobWrappingFactory
//private String _defaultType;
private Map<String, Class<? extends Job>> _jobToClass;
- private JobWrappingFactory(final Map<String, Class<? extends Job>> jobTypeToClassMap
- )
+ protected JobWrappingFactory(final Map<String, Class<? extends Job>> jobTypeToClassMap)
{
//this._defaultType = defaultType;
this._jobToClass = jobTypeToClassMap;
@@ -71,7 +69,7 @@ public class JobWrappingFactory
_jobToClass = newJobExecutors;
}
- public Job buildJobExecutor(Props props, Logger logger)
+ public Job buildJobExecutor(String jobId, Props props, Logger logger)
{
Job job;
@@ -93,7 +91,7 @@ public class JobWrappingFactory
));
}
- job = (Job)Utils.callConstructor(executorClass, props, logger);
+ job = (Job)Utils.callConstructor(executorClass, jobId, props, logger);
}
catch (Exception e) {
unit/executions/exectest1/exec1.flow 154(+154 -0)
diff --git a/unit/executions/exectest1/exec1.flow b/unit/executions/exectest1/exec1.flow
new file mode 100644
index 0000000..4684cd6
--- /dev/null
+++ b/unit/executions/exectest1/exec1.flow
@@ -0,0 +1,154 @@
+{
+ "id" : "derived-member-data",
+ "success.email" : [],
+ "edges" : [ {
+ "source" : "job1",
+ "target" : "job2"
+ }, {
+ "source" : "job2",
+ "target" : "job3"
+ },{
+ "source" : "job2",
+ "target" : "job4"
+ }, {
+ "source" : "job3",
+ "target" : "job5"
+ },{
+ "source" : "job4",
+ "target" : "job5"
+ },{
+ "source" : "job5",
+ "target" : "job7"
+ },{
+ "source" : "job1",
+ "target" : "job6"
+ },{
+ "source" : "job6",
+ "target" : "job7"
+ },{
+ "source" : "job7",
+ "target" : "job8"
+ },{
+ "source" : "job7",
+ "target" : "job9"
+ },
+ {
+ "source" : "job8",
+ "target" : "job10"
+ },
+ {
+ "source" : "job9",
+ "target" : "job10"
+ }
+ ],
+ "failure.email" : [],
+ "nodes" : [ {
+ "propSource" : "prop2.properties",
+ "id" : "job1",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job1.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job2",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job2.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job3",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job3.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job4",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job4.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job5",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job5.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job6",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job6.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job7",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job7.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job8",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job8.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job9",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job9.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job10",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job10.job",
+ "expectedRuntime" : 1
+ }
+ ],
+ "layedout" : false,
+ "type" : "flow",
+ "props" : [ {
+ "inherits" : "prop1.properties",
+ "source" : "prop2.properties"
+ },{
+ "source" : "prop1.properties"
+ }]
+}
\ No newline at end of file
unit/executions/exectest1/job1.job 4(+4 -0)
diff --git a/unit/executions/exectest1/job1.job b/unit/executions/exectest1/job1.job
new file mode 100644
index 0000000..0a60dc4
--- /dev/null
+++ b/unit/executions/exectest1/job1.job
@@ -0,0 +1,4 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=1
+fail=false
unit/executions/exectest1/job10.job 5(+5 -0)
diff --git a/unit/executions/exectest1/job10.job b/unit/executions/exectest1/job10.job
new file mode 100644
index 0000000..218f774
--- /dev/null
+++ b/unit/executions/exectest1/job10.job
@@ -0,0 +1,5 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+dependencies=job8,job9
+seconds=5
+fail=false
unit/executions/exectest1/job2.job 5(+5 -0)
diff --git a/unit/executions/exectest1/job2.job b/unit/executions/exectest1/job2.job
new file mode 100644
index 0000000..3c918c8
--- /dev/null
+++ b/unit/executions/exectest1/job2.job
@@ -0,0 +1,5 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+dependencies=job1
+seconds=2
+fail=false
unit/executions/exectest1/job3.job 5(+5 -0)
diff --git a/unit/executions/exectest1/job3.job b/unit/executions/exectest1/job3.job
new file mode 100644
index 0000000..b26a76c
--- /dev/null
+++ b/unit/executions/exectest1/job3.job
@@ -0,0 +1,5 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+dependencies=job2
+seconds=3
+fail=false
unit/executions/exectest1/job4.job 5(+5 -0)
diff --git a/unit/executions/exectest1/job4.job b/unit/executions/exectest1/job4.job
new file mode 100644
index 0000000..1cbac6f
--- /dev/null
+++ b/unit/executions/exectest1/job4.job
@@ -0,0 +1,5 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+dependencies=job2
+seconds=4
+fail=false
unit/executions/exectest1/job5.job 5(+5 -0)
diff --git a/unit/executions/exectest1/job5.job b/unit/executions/exectest1/job5.job
new file mode 100644
index 0000000..8dd934d
--- /dev/null
+++ b/unit/executions/exectest1/job5.job
@@ -0,0 +1,5 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+dependencies=job3,job4
+seconds=5
+fail=false
unit/executions/exectest1/job6.job 5(+5 -0)
diff --git a/unit/executions/exectest1/job6.job b/unit/executions/exectest1/job6.job
new file mode 100644
index 0000000..fc4474d
--- /dev/null
+++ b/unit/executions/exectest1/job6.job
@@ -0,0 +1,5 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+dependencies=job1
+seconds=1
+fail=false
unit/executions/exectest1/job7.job 5(+5 -0)
diff --git a/unit/executions/exectest1/job7.job b/unit/executions/exectest1/job7.job
new file mode 100644
index 0000000..d01cf79
--- /dev/null
+++ b/unit/executions/exectest1/job7.job
@@ -0,0 +1,5 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+dependencies=job5,job6
+seconds=2
+fail=false
unit/executions/exectest1/job8.job 5(+5 -0)
diff --git a/unit/executions/exectest1/job8.job b/unit/executions/exectest1/job8.job
new file mode 100644
index 0000000..643598c
--- /dev/null
+++ b/unit/executions/exectest1/job8.job
@@ -0,0 +1,5 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+dependencies=job7
+seconds=3
+fail=false
unit/executions/exectest1/job9.job 5(+5 -0)
diff --git a/unit/executions/exectest1/job9.job b/unit/executions/exectest1/job9.job
new file mode 100644
index 0000000..5d6dda9
--- /dev/null
+++ b/unit/executions/exectest1/job9.job
@@ -0,0 +1,5 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+dependencies=job7
+seconds=4
+fail=false
diff --git a/unit/executions/exectest1/prop1.properties b/unit/executions/exectest1/prop1.properties
new file mode 100644
index 0000000..fb37c8b
--- /dev/null
+++ b/unit/executions/exectest1/prop1.properties
@@ -0,0 +1,2 @@
+a=0
+c=0
\ No newline at end of file
diff --git a/unit/executions/exectest1/prop2.properties b/unit/executions/exectest1/prop2.properties
new file mode 100644
index 0000000..86fbde0
--- /dev/null
+++ b/unit/executions/exectest1/prop2.properties
@@ -0,0 +1,2 @@
+a=1
+b=2
\ No newline at end of file
diff --git a/unit/java/azkaban/test/executor/EventCollectorListener.java b/unit/java/azkaban/test/executor/EventCollectorListener.java
new file mode 100644
index 0000000..6e00681
--- /dev/null
+++ b/unit/java/azkaban/test/executor/EventCollectorListener.java
@@ -0,0 +1,53 @@
+package azkaban.test.executor;
+
+import java.util.ArrayList;
+
+import azkaban.executor.event.Event;
+import azkaban.executor.event.Event.Type;
+import azkaban.executor.event.EventListener;
+
+public class EventCollectorListener implements EventListener {
+ private ArrayList<Event> eventList = new ArrayList<Event>();
+
+ @Override
+ public void handleEvent(Event event) {
+ eventList.add(event);
+ }
+
+ public ArrayList<Event> getEventList() {
+ return eventList;
+ }
+
+ public boolean checkOrdering() {
+ long time = 0;
+ for (Event event: eventList) {
+ if (time > event.getTime()) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ public void checkEventExists(Type[] types) {
+ int index = 0;
+ for (Event event: eventList) {
+ if (event.getRunner() == null) {
+ continue;
+ }
+
+ if (index >= types.length) {
+ throw new RuntimeException("More events than expected. Got " + event.getType());
+ }
+ Type type = types[index++];
+
+ if (type != event.getType()) {
+ throw new RuntimeException("Got " + event.getType() + ", expected " + type + " index:" + index);
+ }
+ }
+
+ if (types.length != index) {
+ throw new RuntimeException("Not enough events.");
+ }
+ }
+}
diff --git a/unit/java/azkaban/test/executor/FlowRunnerTest.java b/unit/java/azkaban/test/executor/FlowRunnerTest.java
new file mode 100644
index 0000000..f6d6786
--- /dev/null
+++ b/unit/java/azkaban/test/executor/FlowRunnerTest.java
@@ -0,0 +1,69 @@
+package azkaban.test.executor;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+
+import junit.framework.Assert;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.FlowRunner;
+import azkaban.flow.Flow;
+import azkaban.utils.JSONUtils;
+
+public class FlowRunnerTest {
+ private File workingDir;
+ private Logger logger = Logger.getLogger(FlowRunnerTest.class);
+
+ public FlowRunnerTest() {
+
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ System.out.println("Create temp dir");
+ workingDir = new File("_AzkabanTestDir_" + System.currentTimeMillis());
+ if (workingDir.exists()) {
+ FileUtils.deleteDirectory(workingDir);
+ }
+ workingDir.mkdirs();
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ System.out.println("Teardown temp dir");
+ if (workingDir != null) {
+ FileUtils.deleteDirectory(workingDir);
+ workingDir = null;
+ }
+ }
+
+ @Test
+ public void exec1() throws Exception {
+ File testDir = new File("unit/executions/exectest1");
+ ExecutableFlow exFlow = prepareExecDir(testDir, "exec1");
+
+ FlowRunner runner = new FlowRunner(exFlow);
+
+
+ }
+
+ private ExecutableFlow prepareExecDir(File execDir, String execName) throws IOException {
+ FileUtils.copyDirectory(execDir, workingDir);
+
+ File jsonFlowFile = new File(workingDir, execName + ".flow");
+ HashMap<String, Object> flowObj = (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
+
+ Flow flow = Flow.flowFromObject(flowObj);
+ ExecutableFlow execFlow = new ExecutableFlow(execName, flow);
+ execFlow.setExecutionPath(workingDir.getPath());
+ return execFlow;
+ }
+
+}
unit/java/azkaban/test/executor/JobRunnerTest.java 249(+249 -0)
diff --git a/unit/java/azkaban/test/executor/JobRunnerTest.java b/unit/java/azkaban/test/executor/JobRunnerTest.java
new file mode 100644
index 0000000..7804d21
--- /dev/null
+++ b/unit/java/azkaban/test/executor/JobRunnerTest.java
@@ -0,0 +1,249 @@
+package azkaban.test.executor;
+
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.executor.ExecutableFlow.ExecutableNode;
+import azkaban.executor.ExecutableFlow.Status;
+import azkaban.executor.event.Event;
+import azkaban.executor.event.Event.Type;
+import azkaban.executor.JobRunner;
+import azkaban.jobExecutor.JavaJob;
+import azkaban.jobExecutor.ProcessJob;
+import azkaban.utils.Props;
+
+public class JobRunnerTest {
+ private File workingDir;
+
+ public JobRunnerTest() {
+
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ System.out.println("Create temp dir");
+ workingDir = new File("_AzkabanTestDir_" + System.currentTimeMillis());
+ if (workingDir.exists()) {
+ FileUtils.deleteDirectory(workingDir);
+ }
+ workingDir.mkdirs();
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ System.out.println("Teardown temp dir");
+ if (workingDir != null) {
+ FileUtils.deleteDirectory(workingDir);
+ workingDir = null;
+ }
+ }
+
+ @Test
+ public void testBasicRun() {
+ EventCollectorListener eventCollector = new EventCollectorListener();
+ Props props = createProps(1, false);
+ ExecutableNode node = new ExecutableNode();
+ node.setId("myjobid");
+
+ eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED));
+ JobRunner runner = new JobRunner("myexecutionid", node, props, workingDir);
+ runner.addListener(eventCollector);
+ Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED || runner.getStatus() != Status.FAILED);
+
+ runner.run();
+ eventCollector.handleEvent(Event.create(null, Event.Type.JOB_SUCCEEDED));
+
+ Assert.assertTrue(runner.getStatus() == node.getStatus());
+ Assert.assertTrue(node.getStatus() == Status.SUCCEEDED);
+ Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
+ Assert.assertTrue( node.getEndTime() - node.getStartTime() > 1000);
+
+ File logFile = new File(runner.getLogFilePath());
+ Props outputProps = runner.getOutputProps();
+ Assert.assertTrue(outputProps != null);
+ Assert.assertTrue(outputProps.getKeySet().isEmpty());
+ Assert.assertTrue(logFile.exists());
+
+ Assert.assertTrue(eventCollector.checkOrdering());
+ try {
+ eventCollector.checkEventExists(new Type[] {Type.JOB_STARTED, Type.JOB_SUCCEEDED});
+ }
+ catch (Exception e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testFailedRun() {
+ EventCollectorListener eventCollector = new EventCollectorListener();
+ Props props = createProps(1, true);
+ ExecutableNode node = new ExecutableNode();
+ node.setId("myjobid");
+
+ JobRunner runner = new JobRunner("myexecutionid", node, props, workingDir);
+ runner.addListener(eventCollector);
+ Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED || runner.getStatus() != Status.FAILED);
+ eventCollector.handleEvent(Event.create(null, Type.JOB_STARTED));
+ runner.run();
+ eventCollector.handleEvent(Event.create(null, Type.JOB_FAILED));
+
+ Assert.assertTrue(runner.getStatus() == node.getStatus());
+ Assert.assertTrue(node.getStatus() == Status.FAILED);
+ Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
+ Assert.assertTrue(node.getEndTime() - node.getStartTime() > 1000);
+
+ File logFile = new File(runner.getLogFilePath());
+ Props outputProps = runner.getOutputProps();
+ Assert.assertTrue(outputProps == null);
+ Assert.assertTrue(logFile.exists());
+ Assert.assertTrue(eventCollector.checkOrdering());
+
+ try {
+ eventCollector.checkEventExists(new Type[] {Type.JOB_STARTED, Type.JOB_FAILED});
+ }
+ catch (Exception e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testDisabledRun() {
+ EventCollectorListener eventCollector = new EventCollectorListener();
+ Props props = createProps(1, true);
+ ExecutableNode node = new ExecutableNode();
+ node.setId("myjobid");
+
+ node.setStatus(Status.DISABLED);
+ JobRunner runner = new JobRunner("myexecutionid", node, props, workingDir);
+ runner.addListener(eventCollector);
+
+ // Should be disabled.
+ Assert.assertTrue(runner.getStatus() == Status.DISABLED);
+ eventCollector.handleEvent(Event.create(null, Type.JOB_STARTED));
+ runner.run();
+ eventCollector.handleEvent(Event.create(null, Type.JOB_SUCCEEDED));
+
+ Assert.assertTrue(runner.getStatus() == node.getStatus());
+ Assert.assertTrue(node.getStatus() == Status.SKIPPED);
+ Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
+ // Give it 10 ms to fail.
+ Assert.assertTrue( node.getEndTime() - node.getStartTime() < 10);
+
+ // Log file and output files should not exist.
+ Props outputProps = runner.getOutputProps();
+ Assert.assertTrue(outputProps == null);
+ Assert.assertTrue(runner.getLogFilePath() == null);
+ Assert.assertTrue(eventCollector.checkOrdering());
+
+ try {
+ eventCollector.checkEventExists(new Type[] {Type.JOB_SUCCEEDED});
+ }
+ catch (Exception e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testPreKilledRun() {
+ EventCollectorListener eventCollector = new EventCollectorListener();
+ Props props = createProps(1, true);
+ ExecutableNode node = new ExecutableNode();
+ node.setId("myjobid");
+
+ node.setStatus(Status.KILLED);
+ JobRunner runner = new JobRunner("myexecutionid", node, props, workingDir);
+ runner.addListener(eventCollector);
+
+ // Should be killed.
+ Assert.assertTrue(runner.getStatus() == Status.KILLED);
+ eventCollector.handleEvent(Event.create(null, Type.JOB_STARTED));
+ runner.run();
+ eventCollector.handleEvent(Event.create(null, Type.JOB_KILLED));
+
+ // Should just skip the run and not change
+ Assert.assertTrue(runner.getStatus() == node.getStatus());
+ Assert.assertTrue(node.getStatus() == Status.KILLED);
+ Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
+ // Give it 10 ms to fail.
+ Assert.assertTrue( node.getEndTime() - node.getStartTime() < 10);
+
+ // Log file and output files should not exist.
+ Props outputProps = runner.getOutputProps();
+ Assert.assertTrue(outputProps == null);
+ Assert.assertTrue(runner.getLogFilePath() == null);
+
+ try {
+ eventCollector.checkEventExists(new Type[] {Type.JOB_KILLED});
+ }
+ catch (Exception e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testCancelRun() {
+ EventCollectorListener eventCollector = new EventCollectorListener();
+ Props props = createProps(5, true);
+ ExecutableNode node = new ExecutableNode();
+ node.setId("myjobid");
+
+ JobRunner runner = new JobRunner("myexecutionid", node, props, workingDir);
+ runner.addListener(eventCollector);
+ Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED || runner.getStatus() != Status.FAILED);
+
+ eventCollector.handleEvent(Event.create(null, Type.JOB_STARTED));
+ Thread thread = new Thread(runner);
+ thread.run();
+
+ eventCollector.handleEvent(Event.create(null, Type.JOB_KILLED));
+ synchronized(this) {
+ try {
+ wait(1000);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ runner.cancel();
+ }
+
+ Assert.assertTrue(runner.getStatus() == node.getStatus());
+ Assert.assertTrue(node.getStatus() == Status.KILLED);
+ Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
+ // Give it 10 ms to fail.
+ Assert.assertTrue(node.getEndTime() - node.getStartTime() < 3000);
+
+ // Log file and output files should not exist.
+ File logFile = new File(runner.getLogFilePath());
+ Props outputProps = runner.getOutputProps();
+ Assert.assertTrue(outputProps == null);
+ Assert.assertTrue(logFile.exists());
+ Assert.assertTrue(eventCollector.checkOrdering());
+
+ try {
+ eventCollector.checkEventExists(new Type[] {Type.JOB_STARTED, Type.JOB_FAILED});
+ }
+ catch (Exception e) {
+ System.out.println(e.getMessage());
+
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ private Props createProps( int sleepSec, boolean fail) {
+ Props props = new Props();
+ props.put("type", "java");
+ props.put(JavaJob.JOB_CLASS, "azkaban.test.executor.SleepJavaJob");
+ props.put("seconds", 1);
+ props.put(ProcessJob.WORKING_DIR, workingDir.getPath());
+ props.put("fail", String.valueOf(fail));
+
+ return props;
+ }
+}
\ No newline at end of file
diff --git a/unit/java/azkaban/test/executor/SleepJavaJob.java b/unit/java/azkaban/test/executor/SleepJavaJob.java
new file mode 100644
index 0000000..1ff0ec8
--- /dev/null
+++ b/unit/java/azkaban/test/executor/SleepJavaJob.java
@@ -0,0 +1,36 @@
+package azkaban.test.executor;
+
+import java.util.Map;
+
+import azkaban.utils.Props;
+
+public class SleepJavaJob {
+ private Props props;
+ @SuppressWarnings("unchecked")
+ public SleepJavaJob(String id, Map<String, String> parameters) {
+ props = new Props(null, parameters);
+
+ System.out.println("Properly created");
+ }
+
+ public void run() throws Exception {
+ int sec = props.getInt("seconds");
+ boolean fail = props.getBoolean("fail", false);
+ synchronized(this) {
+ try {
+ this.wait(sec*1000);
+ } catch (InterruptedException e) {
+ System.out.println("Interrupted");
+ }
+ }
+
+ if (fail) {
+ throw new Exception("I failed because I had to.");
+ }
+ }
+
+ public void cancel() throws Exception {
+ System.out.println("Cancelled called");
+ this.notifyAll();
+ }
+}
diff --git a/unit/java/azkaban/test/jobExecutor/JavaJobTest.java b/unit/java/azkaban/test/jobExecutor/JavaJobTest.java
index d9edc3d..ffe68d7 100644
--- a/unit/java/azkaban/test/jobExecutor/JavaJobTest.java
+++ b/unit/java/azkaban/test/jobExecutor/JavaJobTest.java
@@ -95,7 +95,7 @@ public class JavaJobTest
//
// EasyMock.replay(descriptor);
- job = new JavaJob(props, log);
+ job = new JavaJob("jestJava", props, log);
// EasyMock.verify(descriptor);
}
@@ -112,6 +112,18 @@ public class JavaJobTest
}
@Test
+ public void testJavaJobHashmap() {
+ /* initialize the Props */
+ props.put(JavaJob.JOB_CLASS, "azkaban.test.executor.SleepJavaJob");
+ props.put("seconds", 1);
+ props.put(ProcessJob.WORKING_DIR, ".");
+ props.put("input", inputFile);
+ props.put("output", outputFile);
+ props.put("classpath", classPaths);
+ job.run();
+ }
+
+ @Test
public void testFailedJavaJob() {
props.put(JavaJob.JOB_CLASS, "azkaban.test.jobExecutor.WordCountLocal");
props.put(ProcessJob.WORKING_DIR, ".");
diff --git a/unit/java/azkaban/test/jobExecutor/ProcessJobTest.java b/unit/java/azkaban/test/jobExecutor/ProcessJobTest.java
index 861601c..1ec8c5b 100644
--- a/unit/java/azkaban/test/jobExecutor/ProcessJobTest.java
+++ b/unit/java/azkaban/test/jobExecutor/ProcessJobTest.java
@@ -34,7 +34,7 @@ public class ProcessJobTest
//
// EasyMock.replay(props);
- job = new ProcessJob(props, log);
+ job = new ProcessJob("TestProcess", props, log);
}
diff --git a/unit/java/azkaban/test/jobExecutor/PythonJobTest.java b/unit/java/azkaban/test/jobExecutor/PythonJobTest.java
index f86ed90..0881eff 100644
--- a/unit/java/azkaban/test/jobExecutor/PythonJobTest.java
+++ b/unit/java/azkaban/test/jobExecutor/PythonJobTest.java
@@ -90,7 +90,7 @@ public class PythonJobTest
// EasyMock.expect(descriptor.getProps()).andReturn(props).times(3);
// EasyMock.expect(descriptor.getFullPath()).andReturn(".").times(1);
// EasyMock.replay(descriptor);
- job = new PythonJob(props, log);
+ job = new PythonJob("TestProcess", props, log);
// EasyMock.verify(descriptor);
try
{