azkaban-aplcache

Details

.classpath 1(+1 -0)

diff --git a/.classpath b/.classpath
index 9368e98..93e98de 100644
--- a/.classpath
+++ b/.classpath
@@ -20,5 +20,6 @@
 	<classpathentry kind="lib" path="lib/velocity-1.7.jar"/>
 	<classpathentry kind="lib" path="lib/ehcache-core-2.5.1.jar"/>
 	<classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>
+	<classpathentry kind="lib" path="lib/httpclient-4.2.1.jar"/>
 	<classpathentry kind="output" path="dist"/>
 </classpath>
diff --git a/conf/azkaban.properties b/conf/azkaban.properties
index 0eabd49..5ddba5f 100644
--- a/conf/azkaban.properties
+++ b/conf/azkaban.properties
@@ -27,3 +27,10 @@ jetty.password=password
 jetty.keypassword=password
 jetty.truststore=keystore
 jetty.trustpassword=password
+
+# Azkaban Executor settings
+executor.maxThreads=50
+executor.port=12321
+executor.shared.token=abcdefg
+
+
diff --git a/lib/httpclient-4.2.1.jar b/lib/httpclient-4.2.1.jar
new file mode 100644
index 0000000..1d52333
Binary files /dev/null and b/lib/httpclient-4.2.1.jar differ
diff --git a/src/java/azkaban/executor/AzkabanExecutorServer.java b/src/java/azkaban/executor/AzkabanExecutorServer.java
new file mode 100644
index 0000000..f781e97
--- /dev/null
+++ b/src/java/azkaban/executor/AzkabanExecutorServer.java
@@ -0,0 +1,226 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.TimeZone;
+
+import org.apache.log4j.Logger;
+import org.joda.time.DateTimeZone;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.ServletHolder;
+import org.mortbay.thread.QueuedThreadPool;
+
+import azkaban.utils.Props;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+
+public class AzkabanExecutorServer {
+	private static final Logger logger = Logger
+			.getLogger(AzkabanExecutorServer.class);
+
+	public static final String AZKABAN_HOME = "AZKABAN_HOME";
+	public static final String DEFAULT_CONF_PATH = "conf";
+	public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
+
+	private static final String DEFAULT_TIMEZONE_ID = "default.timezone.id";
+	private static final int DEFAULT_PORT_NUMBER = 8081;
+	private static final int DEFAULT_THREAD_NUMBER = 50;
+
+	private static AzkabanExecutorServer app;
+
+	private Props props;
+	private File tempDir;
+	private Server server;
+
+	/**
+	 * Constructor
+	 * 
+	 * @throws Exception
+	 */
+	public AzkabanExecutorServer(Props props) throws Exception {
+		this.props = props;
+
+		int portNumber = props.getInt("executor.port", DEFAULT_PORT_NUMBER);
+		int maxThreads = props.getInt("executor.maxThreads",
+				DEFAULT_THREAD_NUMBER);
+
+		Server server = new Server(portNumber);
+		QueuedThreadPool httpThreadPool = new QueuedThreadPool(maxThreads);
+		server.setThreadPool(httpThreadPool);
+
+        Context root = new Context(server, "/", Context.SESSIONS);
+		String sharedToken = props.getString("executor.shared.token", "");
+		ServletHolder executorHolder = new ServletHolder(new ExecutorServlet(sharedToken));
+		root.addServlet(executorHolder, "/");
+
+		server.start();
+		logger.info("Azkaban Executor Server started on port " + portNumber);
+
+		tempDir = new File(props.getString("azkaban.temp.dir", "temp"));
+	}
+
+	public void stopServer() throws Exception {
+		server.stop();
+		server.destroy();
+	}
+
+	/**
+	 * Returns the global azkaban properties
+	 * 
+	 * @return
+	 */
+	public Props getAzkabanProps() {
+		return props;
+	}
+
+	/**
+	 * Azkaban using Jetty
+	 * 
+	 * @param args
+	 * @throws IOException
+	 */
+	public static void main(String[] args) throws Exception {
+		OptionParser parser = new OptionParser();
+
+		OptionSpec<String> configDirectory = parser
+				.acceptsAll(Arrays.asList("c", "conf"),
+						"The conf directory for Azkaban.").withRequiredArg()
+				.describedAs("conf").ofType(String.class);
+
+		logger.error("Starting Jetty Azkaban Executor...");
+
+		// Grabbing the azkaban settings from the conf directory.
+		Props azkabanSettings = null;
+		OptionSet options = parser.parse(args);
+		if (options.has(configDirectory)) {
+			String path = options.valueOf(configDirectory);
+			logger.info("Loading azkaban settings file from " + path);
+			File file = new File(path, AZKABAN_PROPERTIES_FILE);
+			if (!file.exists() || file.isDirectory() || !file.canRead()) {
+				logger.error("Cannot read file " + file);
+			}
+
+			azkabanSettings = loadAzkabanConfiguration(file.getPath());
+		} else {
+			logger.info("Conf parameter not set, attempting to get value from AZKABAN_HOME env.");
+			azkabanSettings = loadConfigurationFromAzkabanHome();
+		}
+
+		if (azkabanSettings == null) {
+			logger.error("Azkaban Properties not loaded.");
+			logger.error("Exiting Azkaban Executor Server...");
+			return;
+		}
+
+		// Setup time zone
+		if (azkabanSettings.containsKey(DEFAULT_TIMEZONE_ID)) {
+			String timezone = azkabanSettings.getString(DEFAULT_TIMEZONE_ID);
+			TimeZone.setDefault(TimeZone.getTimeZone(timezone));
+			DateTimeZone.setDefault(DateTimeZone.forID(timezone));
+
+			logger.info("Setting timezone to " + timezone);
+		}
+
+		app = new AzkabanExecutorServer(azkabanSettings);
+
+		Runtime.getRuntime().addShutdownHook(new Thread() {
+
+			public void run() {
+				logger.info("Shutting down http server...");
+				try {
+					app.stopServer();
+				} catch (Exception e) {
+					logger.error("Error while shutting down http server.", e);
+				}
+				logger.info("kk thx bye.");
+			}
+		});
+	}
+
+	/**
+	 * Loads the Azkaban property file from the AZKABAN_HOME conf directory
+	 * 
+	 * @return
+	 */
+	private static Props loadConfigurationFromAzkabanHome() {
+		String azkabanHome = System.getenv("AZKABAN_HOME");
+
+		if (azkabanHome == null) {
+			logger.error("AZKABAN_HOME not set. Will try default.");
+			return null;
+		}
+
+		if (!new File(azkabanHome).isDirectory()
+				|| !new File(azkabanHome).canRead()) {
+			logger.error(azkabanHome + " is not a readable directory.");
+			return null;
+		}
+
+		File confPath = new File(azkabanHome, DEFAULT_CONF_PATH);
+		if (!confPath.exists() || !confPath.isDirectory()
+				|| !confPath.canRead()) {
+			logger.error(azkabanHome
+					+ " does not contain a readable conf directory.");
+			return null;
+		}
+
+		File confFile = new File(confPath, AZKABAN_PROPERTIES_FILE);
+		if (!confFile.exists() || confFile.isDirectory() || !confPath.canRead()) {
+			logger.error(confFile + " does not contain a readable azkaban.properties file.");
+			return null;
+		}
+
+		return loadAzkabanConfiguration(confFile.getPath());
+	}
+
+	/**
+	 * Returns the set temp dir
+	 * 
+	 * @return
+	 */
+	public File getTempDirectory() {
+		return tempDir;
+	}
+
+	/**
+	 * Loads the Azkaban conf file int a Props object
+	 * 
+	 * @param path
+	 * @return
+	 */
+	private static Props loadAzkabanConfiguration(String path) {
+		try {
+			return new Props(null, path);
+		} catch (FileNotFoundException e) {
+			logger.error("File not found. Could not load azkaban config file "
+					+ path);
+		} catch (IOException e) {
+			logger.error("File found, but error reading. Could not load azkaban config file "
+					+ path);
+		}
+
+		return null;
+	}
+}
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
new file mode 100644
index 0000000..580d95d
--- /dev/null
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -0,0 +1,20 @@
+package azkaban.executor;
+
+import java.util.HashMap;
+
+import azkaban.flow.Flow;
+import azkaban.utils.Props;
+
+public class ExecutableFlow {
+	private Flow flow;
+	private HashMap<String, Props> sourceProps = new HashMap<String, Props>();
+	
+	public ExecutableFlow(Flow flow, HashMap<String,Props> sourceProps) {
+		this.flow = flow;
+		this.sourceProps = sourceProps;
+	}
+	
+	public void run() {
+		
+	}
+}
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 30f7582..a099c60 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -1,5 +1,21 @@
 package azkaban.executor;
 
-public interface ExecutorManager {
+import azkaban.utils.Props;
 
+public class ExecutorManager {
+	private String token;
+	
+	public ExecutorManager(Props props) {
+		token = props.getString("executor.shared.token", "");
+	}
+	
+	public void executeJob() {
+		
+	}
+	
+	private class ExecutorThread extends Thread {
+		public void run() {
+			
+		}
+	}
 }
diff --git a/src/java/azkaban/executor/ExecutorServlet.java b/src/java/azkaban/executor/ExecutorServlet.java
new file mode 100644
index 0000000..3d57999
--- /dev/null
+++ b/src/java/azkaban/executor/ExecutorServlet.java
@@ -0,0 +1,47 @@
+package azkaban.executor;
+
+import java.io.IOException;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.log4j.Logger;
+
+public class ExecutorServlet extends HttpServlet {
+	private static final Logger logger = Logger.getLogger(ExecutorServlet.class.getName());
+	private String sharedToken;
+	
+	public ExecutorServlet(String token) {
+		super();
+		sharedToken = token;
+	}
+	
+	@Override
+	public void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+		String token = getParam(req, "sharedToken");
+
+	}
+	
+	@Override
+	public void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+		
+	}
+	
+	/**
+	 * Duplicated code with AbstractAzkabanServlet, but ne
+	 */
+	public boolean hasParam(HttpServletRequest request, String param) {
+		return request.getParameter(param) != null;
+	}
+
+	public String getParam(HttpServletRequest request, String name)
+			throws ServletException {
+		String p = request.getParameter(name);
+		if (p == null)
+			throw new ServletException("Missing required parameter '" + name
+					+ "'.");
+		else
+			return p;
+	}
+}
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index 7e93afe..d56eae7 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -79,6 +79,8 @@ public class AzkabanWebServer {
     public static final String DEFAULT_CONF_PATH = "conf";
     public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
 
+    private static AzkabanWebServer app;
+    
     private static final String DEFAULT_TIMEZONE_ID = "default.timezone.id";
     //private static final int DEFAULT_PORT_NUMBER = 8081;
     private static final int DEFAULT_SSL_PORT_NUMBER = 8443;
@@ -122,6 +124,8 @@ public class AzkabanWebServer {
             String timezone = props.getString(DEFAULT_TIMEZONE_ID);
             TimeZone.setDefault(TimeZone.getTimeZone(timezone));
             DateTimeZone.setDefault(DateTimeZone.forID(timezone));
+            
+			logger.info("Setting timezone to " + timezone);
         }
 
     }
@@ -291,7 +295,7 @@ public class AzkabanWebServer {
             logger.error("Exiting Azkaban...");
             return;
         }
-        AzkabanWebServer app = new AzkabanWebServer(azkabanSettings);
+        app = new AzkabanWebServer(azkabanSettings);
 
         //int portNumber = azkabanSettings.getInt("jetty.port",DEFAULT_PORT_NUMBER);
         int sslPortNumber = azkabanSettings.getInt("jetty.ssl.port",DEFAULT_SSL_PORT_NUMBER);
diff --git a/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java b/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
index 143b9ae..c591351 100644
--- a/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
@@ -11,16 +11,13 @@ import azkaban.webapp.session.Session;
 public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
 
 	@Override
-	protected void handleGet(HttpServletRequest req, HttpServletResponse resp,
-			Session session) throws ServletException, IOException {
-		// TODO Auto-generated method stub
+	protected void handleGet(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
 		
 	}
 
 	@Override
-	protected void handlePost(HttpServletRequest req, HttpServletResponse resp,
-			Session session) throws ServletException, IOException {
-		// TODO Auto-generated method stub
+	protected void handlePost(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+
 		
 	}
 
diff --git a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
index 5458d33..c7da369 100644
--- a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
@@ -54,7 +54,6 @@ public abstract class LoginAbstractAzkabanServlet extends
 			return;
 		}
 
-		
 		if (session != null) {
 			logger.info("Found session " + session.getUser());
 			handleGet(req, resp, session);
diff --git a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
new file mode 100644
index 0000000..5ba18ad
--- /dev/null
+++ b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
@@ -0,0 +1,86 @@
+<!DOCTYPE html> 
+<html>
+	<head>
+#parse( "azkaban/webapp/servlet/velocity/style.vm" )
+		<script type="text/javascript" src="${context}/js/jquery/jquery.js"></script>    
+		<script type="text/javascript" src="${context}/js/jqueryui/jquery-ui.custom.min.js"></script>   
+		<script type="text/javascript" src="${context}/js/namespace.js"></script>
+		<script type="text/javascript" src="${context}/js/underscore-1.2.1-min.js"></script>
+		<script type="text/javascript" src="${context}/js/backbone-0.5.3-min.js"></script>
+		<script type="text/javascript" src="${context}/js/jquery.simplemodal.js"></script>
+		<script type="text/javascript" src="${context}/js/jquery.contextMenu.js"></script>
+		<script type="text/javascript" src="${context}/js/azkaban.nav.js"></script>
+		<script type="text/javascript" src="${context}/js/azkaban.layout.js"></script>
+		<script type="text/javascript" src="${context}/js/azkaban.flow.view.js"></script>
+		<script type="text/javascript" src="${context}/js/svgNavigate.js"></script>
+		<script type="text/javascript">
+			var contextURL = "${context}";
+			var currentTime = ${currentTime};
+			var timezone = "${timezone}";
+			var errorMessage = "${error_message}";
+			var successMessage = "${success_message}";
+			
+			var projectName = "${project.name}";
+			var flowName = "${flowid}";
+		</script>
+		<link rel="stylesheet" type="text/css" href="${context}/css/jquery.contextMenu.custom.css" /> 
+	</head>
+	<body>
+#set($current_page="all")
+#parse( "azkaban/webapp/servlet/velocity/nav.vm" )
+		<div class="messaging"><p id="messageClose">X</p><p id="message"></p></div>  
+		<div class="content">
+#if($errorMsg)
+				<div class="box-error-message">$errorMsg</div>
+#else
+#if($error_message != "null")
+				<div class="box-error-message">$error_message</div>
+#elseif($success_message != "null")
+				<div class="box-success-message">$success_message</div>
+#end
+
+				<div id="all-jobs-content">
+					<div class="section-hd">
+						<h2><a href="${context}/manager?project=${project.name}&flow=${flowid}">Flow <span>$flowid</span></a></h2>
+						<div class="section-sub-hd">
+							<h4><a href="${context}/manager?project=${project.name}">Project <span>$project.name</span></a></h4>
+						</div>
+					</div>
+					
+					<div id="headertabs" class="headertabs">
+						<ul>
+							<li><a id="graphViewLink" href="#graph">Graph</a></li>
+							<li class="lidivider">|</li>
+							<li><a id="jobslistViewLink" href="#jobslist">Job List</a></li>
+						</ul>
+					</div>
+					<div id="graphView">
+						<div class="relative">
+							<div id="jobList">
+								<div id="filterList">
+									<input id="filter" placeholder="  Job Filter" />
+								</div>
+								<div id="list">
+								</div>
+								<div id="resetPanZoomBtn" class="btn5" >Reset Pan Zoom</div>
+							</div>
+							<div id="svgDiv" >
+								<svg id="svgGraph" xmlns="http://www.w3.org/2000/svg" version="1.1" shape-rendering="optimize-speed" text-rendering="optimize-speed" >
+								</svg>
+							</div>
+						</div>
+					</div>
+					<div id="jobListView">
+					<p>This is my joblist view</p>
+					</div>
+				</div>
+#end
+		<ul id="jobMenu" class="contextMenu">  
+			<li class="open"><a href="#open">Open...</a></li>
+			<li class="openwindow"><a href="#openwindow">Open in New Window...</a></li>
+		</ul>
+
+		</div>
+	</body>
+</html>
+
diff --git a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
index 5ba18ad..a806c9f 100644
--- a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
@@ -45,6 +45,9 @@
 						<div class="section-sub-hd">
 							<h4><a href="${context}/manager?project=${project.name}">Project <span>$project.name</span></a></h4>
 						</div>
+						
+						<a id="execute-btn" class="btn1" href="#">Execute Now</a>
+						<a id="execute-btn" class="btn2" href="#">Execute Custom</a>
 					</div>
 					
 					<div id="headertabs" class="headertabs">
diff --git a/src/web/css/azkaban.css b/src/web/css/azkaban.css
index 0cad317..e7c81f7 100644
--- a/src/web/css/azkaban.css
+++ b/src/web/css/azkaban.css
@@ -352,6 +352,12 @@ tr:hover td {
   float: right;
   margin-right: 25px;
 }
+.section-hd .btn2,
+.section-ft .btn2 {
+  float: right;
+  margin-right: 25px;
+}
+
 
 /* blue */
 .btn2 {