azkaban-aplcache
Changes
build.xml 7(+7 -0)
src/java/azkaban/executor/JobRunner.java 70(+43 -27)
src/java/azkaban/utils/Mailman.java 46(+23 -23)
Details
build.xml 7(+7 -0)
diff --git a/build.xml b/build.xml
index df62df6..222687b 100644
--- a/build.xml
+++ b/build.xml
@@ -33,6 +33,13 @@
<delete dir="${dist.classes.dir}" />
<mkdir dir="${dist.classes.dir}" />
+ <!-- copy non-java files to classes dir to load from classpath -->
+ <copy todir="${dist.classes.dir}">
+ <fileset dir="${java.src.dir}">
+ <exclude name="**/*.java" />
+ </fileset>
+ </copy>
+
<javac fork="true" destdir="${dist.classes.dir}"
target="1.6" debug="true" deprecation="false" failonerror="true">
<src path="${java.src.dir}" />
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index e2191ec..5fb5355 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -12,6 +12,7 @@ import azkaban.flow.Edge;
import azkaban.flow.Flow;
import azkaban.flow.FlowProps;
import azkaban.flow.Node;
+import azkaban.utils.Props;
public class ExecutableFlow {
private String executionId;
@@ -41,6 +42,7 @@ public class ExecutableFlow {
private Integer pipelineLevel = null;
private Map<String, String> flowParameters = new HashMap<String, String>();
+ private Props globalProps;
public enum FailureAction {
FINISH_CURRENTLY_RUNNING,
@@ -62,6 +64,14 @@ public class ExecutableFlow {
this.setFlow(flow);
}
+ public void setGlobalProps(Props props) {
+ globalProps = props;
+ }
+
+ public Props getGlobalProps() {
+ return globalProps;
+ }
+
public ExecutableFlow() {
}
diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index 10a9e94..26d74f6 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -303,7 +303,7 @@ public class FlowRunner extends EventHandler implements Runnable {
String source = node.getJobPropsSource();
String propsSource = node.getPropsSource();
- Props parentProps = propsSource == null ? null : sharedProps
+ Props parentProps = propsSource == null ? flow.getGlobalProps() : sharedProps
.get(propsSource);
// We add the previous job output and put into this props.
@@ -344,6 +344,11 @@ public class FlowRunner extends EventHandler implements Runnable {
props.setParent(inherits);
}
+ else {
+ String source = fprops.getSource();
+ Props props = sharedProps.get(source);
+ props.setParent(flow.getGlobalProps());
+ }
}
}
diff --git a/src/java/azkaban/executor/FlowRunnerManager.java b/src/java/azkaban/executor/FlowRunnerManager.java
index 6f4e91e..37048f0 100644
--- a/src/java/azkaban/executor/FlowRunnerManager.java
+++ b/src/java/azkaban/executor/FlowRunnerManager.java
@@ -54,7 +54,9 @@ public class FlowRunnerManager {
private String clientHostname;
private String clientPortNumber;
- public FlowRunnerManager(Props props, Mailman mailer) {
+ private Props globalProps;
+
+ public FlowRunnerManager(Props props, Props globalProps, Mailman mailer) {
this.mailer = mailer;
// this.defaultFailureEmail = props.getString("job.failure.email");
// this.defaultSuccessEmail = props.getString("job.success.email");
@@ -78,7 +80,8 @@ public class FlowRunnerManager {
File dir = new File(path);
ExecutableFlow flow = ExecutableFlowLoader.loadExecutableFlowFromDir(dir);
flow.setExecutionPath(path);
-
+ flow.setGlobalProps(globalProps);
+
FlowRunner runner = new FlowRunner(flow);
runningFlows.put(id, runner);
runner.addListener(eventListener);
src/java/azkaban/executor/JobRunner.java 70(+43 -27)
diff --git a/src/java/azkaban/executor/JobRunner.java b/src/java/azkaban/executor/JobRunner.java
index 127d6b7..9c22d94 100644
--- a/src/java/azkaban/executor/JobRunner.java
+++ b/src/java/azkaban/executor/JobRunner.java
@@ -34,7 +34,7 @@ public class JobRunner extends EventHandler implements Runnable {
private ExecutableNode node;
private File workingDir;
- private Logger logger;
+ private Logger logger = null;
private Layout loggerLayout = DEFAULT_LAYOUT;
private Appender jobAppender;
@@ -45,10 +45,7 @@ public class JobRunner extends EventHandler implements Runnable {
public JobRunner(ExecutableNode node, Props props, File workingDir) {
this.props = props;
this.node = node;
- this.node.setStatus(Status.WAITING);
this.workingDir = workingDir;
-
- createLogger();
}
public ExecutableNode getNode() {
@@ -77,34 +74,40 @@ public class JobRunner extends EventHandler implements Runnable {
}
private void closeLogger() {
- logger.removeAppender(jobAppender);
- jobAppender.close();
+ if (jobAppender != null) {
+ logger.removeAppender(jobAppender);
+ jobAppender.close();
+ }
}
@Override
public void run() {
if (node.getStatus() == Status.DISABLED) {
+ node.setStatus(Status.SKIPPED);
this.fireEventListeners(Event.create(this, Type.JOB_SUCCEEDED));
return;
} else if (node.getStatus() == Status.KILLED) {
this.fireEventListeners(Event.create(this, Type.JOB_KILLED));
return;
}
+
+ createLogger();
+ this.node.setStatus(Status.WAITING);
node.setStartTime(System.currentTimeMillis());
- logger.info("Starting job " + node.getId() + " at " + node.getStartTime());
+ logInfo("Starting job " + node.getId() + " at " + node.getStartTime());
node.setStatus(Status.RUNNING);
this.fireEventListeners(Event.create(this, Type.JOB_STARTED));
boolean succeeded = true;
- synchronized(this) {
- try {
- wait(5000);
- }
- catch (InterruptedException e) {
- logger.info("Job cancelled.");
- succeeded = false;
- }
- }
+// synchronized(this) {
+// try {
+// wait(5000);
+// }
+// catch (InterruptedException e) {
+// logger.info("Job cancelled.");
+// succeeded = false;
+// }
+// }
// Run Job
@@ -113,14 +116,14 @@ public class JobRunner extends EventHandler implements Runnable {
props.put(AbstractProcessJob.WORKING_DIR, workingDir.getAbsolutePath());
JobWrappingFactory factory = JobWrappingFactory.getJobWrappingFactory();
job = factory.buildJobExecutor(props, logger);
-//
-// try {
-// job.run();
-// } catch (Exception e) {
-// succeeded = false;
-// //logger.error("job run failed!");
-// e.printStackTrace();
-// }
+
+ try {
+ job.run();
+ } catch (Exception e) {
+ succeeded = false;
+ logError("Job run failed!");
+ e.printStackTrace();
+ }
node.setEndTime(System.currentTimeMillis());
if (succeeded) {
@@ -133,21 +136,21 @@ public class JobRunner extends EventHandler implements Runnable {
node.setStatus(Status.FAILED);
this.fireEventListeners(Event.create(this, Type.JOB_FAILED));
}
- logger.info("Finishing job " + node.getId() + " at " + node.getEndTime());
+ logInfo("Finishing job " + node.getId() + " at " + node.getEndTime());
closeLogger();
}
public synchronized void cancel() {
// Cancel code here
if(job == null) {
- logger.error("Job doesn't exisit!");
+ logError("Job doesn't exisit!");
return;
}
try {
job.cancel();
} catch (Exception e) {
- logger.error("Failed trying to cancel job!");
+ logError("Failed trying to cancel job!");
e.printStackTrace();
}
@@ -170,4 +173,17 @@ public class JobRunner extends EventHandler implements Runnable {
emails = this.props.getStringList(EMAILLIST);
return emails;
}
+
+ private void logError(String message) {
+ if (logger != null) {
+ logger.error(message);
+ }
+ }
+
+ private void logInfo(String message) {
+ if (logger != null) {
+ logger.info(message);
+ }
+ }
+
}
src/java/azkaban/utils/Mailman.java 46(+23 -23)
diff --git a/src/java/azkaban/utils/Mailman.java b/src/java/azkaban/utils/Mailman.java
index 1770009..2553a21 100644
--- a/src/java/azkaban/utils/Mailman.java
+++ b/src/java/azkaban/utils/Mailman.java
@@ -63,29 +63,29 @@ public class Mailman {
Session session = Session.getInstance(props, null);
session.setDebug(true);
-// //email message
-// Message msg = new MimeMessage(session);
-// msg.setFrom(new InternetAddress(fromAddress));
-// for(String str : toAddress) {
-// msg.addRecipients(Message.RecipientType.TO, InternetAddress.parse(str));
-// }
-//
-// msg.setSubject(subject);
-// msg.setText(body);
-//
-// //transport
-// SMTPTransport t = (SMTPTransport)session.getTransport(protocol);
-//
-// try {
-// t.connect(_mailHost, _mailUser, _mailPassword);
-// t.sendMessage(msg, msg.getAllRecipients());
-// }
-// catch (Exception e) {
-// logger.error(e);
-// }
-// finally {
-// t.close();
-// }
+ //email message
+ Message msg = new MimeMessage(session);
+ msg.setFrom(new InternetAddress(fromAddress));
+ for(String str : toAddress) {
+ msg.addRecipients(Message.RecipientType.TO, InternetAddress.parse(str));
+ }
+
+ msg.setSubject(subject);
+ msg.setText(body);
+
+ //transport
+ SMTPTransport t = (SMTPTransport)session.getTransport(protocol);
+
+ try {
+ t.connect(_mailHost, _mailUser, _mailPassword);
+ t.sendMessage(msg, msg.getAllRecipients());
+ }
+ catch (Exception e) {
+ logger.error(e);
+ }
+ finally {
+ t.close();
+ }
}
public void sendEmailIfPossible(String fromAddress, List<String> toAddress,
diff --git a/src/java/azkaban/webapp/AzkabanExecutorServer.java b/src/java/azkaban/webapp/AzkabanExecutorServer.java
index 5d14aa0..c7b1d6f 100644
--- a/src/java/azkaban/webapp/AzkabanExecutorServer.java
+++ b/src/java/azkaban/webapp/AzkabanExecutorServer.java
@@ -99,8 +99,10 @@ public class AzkabanExecutorServer {
props.getString("mail.password", ""),
props.getString("mail.sender", ""));
- runnerManager = new FlowRunnerManager(props, mailer);
-
+ String globalPropsPath = props.getString("project.global.properties");
+ Props globalProps = new Props(null, globalPropsPath);
+ runnerManager = new FlowRunnerManager(props, globalProps, mailer);
+
try {
server.start();
}