azkaban-aplcache

Commiting old changes to azkaban.

10/8/2012 9:30:10 PM

Details

build.xml 7(+7 -0)

diff --git a/build.xml b/build.xml
index df62df6..222687b 100644
--- a/build.xml
+++ b/build.xml
@@ -33,6 +33,13 @@
 		<delete dir="${dist.classes.dir}" />
 		<mkdir dir="${dist.classes.dir}" />
 		
+		<!-- copy non-java files to classes dir to load from classpath -->
+		<copy todir="${dist.classes.dir}">
+			<fileset dir="${java.src.dir}">
+				<exclude name="**/*.java" />
+			</fileset>
+		</copy>
+		
 		<javac fork="true" destdir="${dist.classes.dir}"
 			target="1.6" debug="true" deprecation="false" failonerror="true">
 			<src path="${java.src.dir}" />
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index e2191ec..5fb5355 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -12,6 +12,7 @@ import azkaban.flow.Edge;
 import azkaban.flow.Flow;
 import azkaban.flow.FlowProps;
 import azkaban.flow.Node;
+import azkaban.utils.Props;
 
 public class ExecutableFlow {
 	private String executionId;
@@ -41,6 +42,7 @@ public class ExecutableFlow {
 	
 	private Integer pipelineLevel = null;
 	private Map<String, String> flowParameters = new HashMap<String, String>();
+	private Props globalProps;
 	
 	public enum FailureAction {
 		FINISH_CURRENTLY_RUNNING,
@@ -62,6 +64,14 @@ public class ExecutableFlow {
 		this.setFlow(flow);
 	}
 	
+	public void setGlobalProps(Props props) {
+		globalProps = props;
+	}
+	
+	public Props getGlobalProps() {
+		return globalProps;
+	}
+	
 	public ExecutableFlow() {
 	}
 	
diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index 10a9e94..26d74f6 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -303,7 +303,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 		String source = node.getJobPropsSource();
 		String propsSource = node.getPropsSource();
 
-		Props parentProps = propsSource == null ? null : sharedProps
+		Props parentProps = propsSource == null ? flow.getGlobalProps() : sharedProps
 				.get(propsSource);
 
 		// We add the previous job output and put into this props.
@@ -344,6 +344,11 @@ public class FlowRunner extends EventHandler implements Runnable {
 
 				props.setParent(inherits);
 			}
+			else {
+				String source = fprops.getSource();
+				Props props = sharedProps.get(source);
+				props.setParent(flow.getGlobalProps());
+			}
 		}
 	}
 
diff --git a/src/java/azkaban/executor/FlowRunnerManager.java b/src/java/azkaban/executor/FlowRunnerManager.java
index 6f4e91e..37048f0 100644
--- a/src/java/azkaban/executor/FlowRunnerManager.java
+++ b/src/java/azkaban/executor/FlowRunnerManager.java
@@ -54,7 +54,9 @@ public class FlowRunnerManager {
 	private String clientHostname;
 	private String clientPortNumber;
 	
-	public FlowRunnerManager(Props props, Mailman mailer) {
+	private Props globalProps;
+	
+	public FlowRunnerManager(Props props, Props globalProps, Mailman mailer) {
 		this.mailer = mailer;
 //		this.defaultFailureEmail = props.getString("job.failure.email");
 //		this.defaultSuccessEmail = props.getString("job.success.email");
@@ -78,7 +80,8 @@ public class FlowRunnerManager {
 		File dir = new File(path);
 		ExecutableFlow flow = ExecutableFlowLoader.loadExecutableFlowFromDir(dir);
 		flow.setExecutionPath(path);
-
+		flow.setGlobalProps(globalProps);
+		
 		FlowRunner runner = new FlowRunner(flow);
 		runningFlows.put(id, runner);
 		runner.addListener(eventListener);
diff --git a/src/java/azkaban/executor/JobRunner.java b/src/java/azkaban/executor/JobRunner.java
index 127d6b7..9c22d94 100644
--- a/src/java/azkaban/executor/JobRunner.java
+++ b/src/java/azkaban/executor/JobRunner.java
@@ -34,7 +34,7 @@ public class JobRunner extends EventHandler implements Runnable {
 	private ExecutableNode node;
 	private File workingDir;
 	
-	private Logger logger;
+	private Logger logger = null;
 	private Layout loggerLayout = DEFAULT_LAYOUT;
 	private Appender jobAppender;
 	
@@ -45,10 +45,7 @@ public class JobRunner extends EventHandler implements Runnable {
 	public JobRunner(ExecutableNode node, Props props, File workingDir) {
 		this.props = props;
 		this.node = node;
-		this.node.setStatus(Status.WAITING);
 		this.workingDir = workingDir;
-
-		createLogger();
 	}
 
 	public ExecutableNode getNode() {
@@ -77,34 +74,40 @@ public class JobRunner extends EventHandler implements Runnable {
 	}
 
 	private void closeLogger() {
-		logger.removeAppender(jobAppender);
-		jobAppender.close();
+		if (jobAppender != null) {
+			logger.removeAppender(jobAppender);
+			jobAppender.close();
+		}
 	}
 	
 	@Override
 	public void run() {
 		if (node.getStatus() == Status.DISABLED) {
+			node.setStatus(Status.SKIPPED);
 			this.fireEventListeners(Event.create(this, Type.JOB_SUCCEEDED));
 			return;
 		} else if (node.getStatus() == Status.KILLED) {
 			this.fireEventListeners(Event.create(this, Type.JOB_KILLED));
 			return;
 		}
+		
+		createLogger();
+		this.node.setStatus(Status.WAITING);
 		node.setStartTime(System.currentTimeMillis());
-		logger.info("Starting job " + node.getId() + " at " + node.getStartTime());
+		logInfo("Starting job " + node.getId() + " at " + node.getStartTime());
 		node.setStatus(Status.RUNNING);
 		this.fireEventListeners(Event.create(this, Type.JOB_STARTED));
 		
 		boolean succeeded = true;
-		synchronized(this) {
-			try {
-				wait(5000);
-			}
-			catch (InterruptedException e) {
-				logger.info("Job cancelled.");
-				succeeded = false;
-			}
-		}
+//		synchronized(this) {
+//			try {
+//				wait(5000);
+//			}
+//			catch (InterruptedException e) {
+//				logger.info("Job cancelled.");
+//				succeeded = false;
+//			}
+//		}
 
 
 		// Run Job
@@ -113,14 +116,14 @@ public class JobRunner extends EventHandler implements Runnable {
 		props.put(AbstractProcessJob.WORKING_DIR, workingDir.getAbsolutePath());
 		JobWrappingFactory factory  = JobWrappingFactory.getJobWrappingFactory();
         job = factory.buildJobExecutor(props, logger);
-//
-//        try {
-//                job.run();
-//        } catch (Exception e) {
-//                succeeded = false;
-//                //logger.error("job run failed!");
-//                e.printStackTrace();
-//        }
+
+        try {
+                job.run();
+        } catch (Exception e) {
+                succeeded = false;
+                logError("Job run failed!");
+                e.printStackTrace();
+        }
 		
 		node.setEndTime(System.currentTimeMillis());
 		if (succeeded) {
@@ -133,21 +136,21 @@ public class JobRunner extends EventHandler implements Runnable {
 			node.setStatus(Status.FAILED);
 			this.fireEventListeners(Event.create(this, Type.JOB_FAILED));
 		}
-		logger.info("Finishing job " + node.getId() + " at " + node.getEndTime());
+		logInfo("Finishing job " + node.getId() + " at " + node.getEndTime());
 		closeLogger();
 	}
 
 	public synchronized void cancel() {
 		// Cancel code here
 		if(job == null) {
-            logger.error("Job doesn't exisit!");
+			logError("Job doesn't exisit!");
             return;
 		}
 
 		try {
             job.cancel();
 		} catch (Exception e) {
-            logger.error("Failed trying to cancel job!");
+			logError("Failed trying to cancel job!");
             e.printStackTrace();
 		}
 
@@ -170,4 +173,17 @@ public class JobRunner extends EventHandler implements Runnable {
 		emails = this.props.getStringList(EMAILLIST);
 		return emails;
 	}
+	
+	private void logError(String message) {
+		if (logger != null) {
+			logger.error(message);
+		}
+	}
+	
+	private void logInfo(String message) {
+		if (logger != null) {
+			logger.info(message);
+		}
+	}
+	
 }
diff --git a/src/java/azkaban/utils/Mailman.java b/src/java/azkaban/utils/Mailman.java
index 1770009..2553a21 100644
--- a/src/java/azkaban/utils/Mailman.java
+++ b/src/java/azkaban/utils/Mailman.java
@@ -63,29 +63,29 @@ public class Mailman {
 		Session session = Session.getInstance(props, null);
 		session.setDebug(true);
 		
-//		//email message
-//		Message msg = new MimeMessage(session);
-//		msg.setFrom(new InternetAddress(fromAddress));
-//		for(String str : toAddress) {
-//			msg.addRecipients(Message.RecipientType.TO, InternetAddress.parse(str));
-//		}
-//		
-//		msg.setSubject(subject);
-//		msg.setText(body);
-//		
-//		//transport
-//		SMTPTransport t = (SMTPTransport)session.getTransport(protocol);
-//		
-//		try {
-//			t.connect(_mailHost, _mailUser, _mailPassword);
-//			t.sendMessage(msg, msg.getAllRecipients());
-//		}
-//		catch (Exception e) {
-//			logger.error(e);
-//		}
-//		finally {
-//			t.close();
-//		}
+		//email message
+		Message msg = new MimeMessage(session);
+		msg.setFrom(new InternetAddress(fromAddress));
+		for(String str : toAddress) {
+			msg.addRecipients(Message.RecipientType.TO, InternetAddress.parse(str));
+		}
+		
+		msg.setSubject(subject);
+		msg.setText(body);
+		
+		//transport
+		SMTPTransport t = (SMTPTransport)session.getTransport(protocol);
+		
+		try {
+			t.connect(_mailHost, _mailUser, _mailPassword);
+			t.sendMessage(msg, msg.getAllRecipients());
+		}
+		catch (Exception e) {
+			logger.error(e);
+		}
+		finally {
+			t.close();
+		}
 	}
 
 	public void sendEmailIfPossible(String fromAddress, List<String> toAddress,
diff --git a/src/java/azkaban/webapp/AzkabanExecutorServer.java b/src/java/azkaban/webapp/AzkabanExecutorServer.java
index 5d14aa0..c7b1d6f 100644
--- a/src/java/azkaban/webapp/AzkabanExecutorServer.java
+++ b/src/java/azkaban/webapp/AzkabanExecutorServer.java
@@ -99,8 +99,10 @@ public class AzkabanExecutorServer {
 				props.getString("mail.password", ""),
 				props.getString("mail.sender", ""));
 
-		runnerManager = new FlowRunnerManager(props, mailer);
-
+		String globalPropsPath = props.getString("project.global.properties");
+		Props globalProps = new Props(null, globalPropsPath);
+		runnerManager = new FlowRunnerManager(props, globalProps, mailer);
+		
 		try {
 			server.start();
 		}