azkaban-aplcache
Changes
.classpath 1(+1 -0)
conf/azkaban.properties 7(+7 -0)
lib/httpclient-4.2.1.jar 0(+0 -0)
src/java/azkaban/executor/AzkabanExecutorServer.java 226(+226 -0)
src/web/css/azkaban.css 6(+6 -0)
Details
.classpath 1(+1 -0)
diff --git a/.classpath b/.classpath
index 9368e98..93e98de 100644
--- a/.classpath
+++ b/.classpath
@@ -20,5 +20,6 @@
<classpathentry kind="lib" path="lib/velocity-1.7.jar"/>
<classpathentry kind="lib" path="lib/ehcache-core-2.5.1.jar"/>
<classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>
+ <classpathentry kind="lib" path="lib/httpclient-4.2.1.jar"/>
<classpathentry kind="output" path="dist"/>
</classpath>
conf/azkaban.properties 7(+7 -0)
diff --git a/conf/azkaban.properties b/conf/azkaban.properties
index 0eabd49..5ddba5f 100644
--- a/conf/azkaban.properties
+++ b/conf/azkaban.properties
@@ -27,3 +27,10 @@ jetty.password=password
jetty.keypassword=password
jetty.truststore=keystore
jetty.trustpassword=password
+
+# Azkaban Executor settings
+executor.maxThreads=50
+executor.port=12321
+executor.shared.token=abcdefg
+
+
lib/httpclient-4.2.1.jar 0(+0 -0)
diff --git a/lib/httpclient-4.2.1.jar b/lib/httpclient-4.2.1.jar
new file mode 100644
index 0000000..1d52333
Binary files /dev/null and b/lib/httpclient-4.2.1.jar differ
src/java/azkaban/executor/AzkabanExecutorServer.java 226(+226 -0)
diff --git a/src/java/azkaban/executor/AzkabanExecutorServer.java b/src/java/azkaban/executor/AzkabanExecutorServer.java
new file mode 100644
index 0000000..f781e97
--- /dev/null
+++ b/src/java/azkaban/executor/AzkabanExecutorServer.java
@@ -0,0 +1,226 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.TimeZone;
+
+import org.apache.log4j.Logger;
+import org.joda.time.DateTimeZone;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.ServletHolder;
+import org.mortbay.thread.QueuedThreadPool;
+
+import azkaban.utils.Props;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+
+public class AzkabanExecutorServer {
+ private static final Logger logger = Logger
+ .getLogger(AzkabanExecutorServer.class);
+
+ public static final String AZKABAN_HOME = "AZKABAN_HOME";
+ public static final String DEFAULT_CONF_PATH = "conf";
+ public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
+
+ private static final String DEFAULT_TIMEZONE_ID = "default.timezone.id";
+ private static final int DEFAULT_PORT_NUMBER = 8081;
+ private static final int DEFAULT_THREAD_NUMBER = 50;
+
+ private static AzkabanExecutorServer app;
+
+ private Props props;
+ private File tempDir;
+ private Server server;
+
+ /**
+ * Constructor
+ *
+ * @throws Exception
+ */
+ public AzkabanExecutorServer(Props props) throws Exception {
+ this.props = props;
+
+ int portNumber = props.getInt("executor.port", DEFAULT_PORT_NUMBER);
+ int maxThreads = props.getInt("executor.maxThreads",
+ DEFAULT_THREAD_NUMBER);
+
+ Server server = new Server(portNumber);
+ QueuedThreadPool httpThreadPool = new QueuedThreadPool(maxThreads);
+ server.setThreadPool(httpThreadPool);
+
+ Context root = new Context(server, "/", Context.SESSIONS);
+ String sharedToken = props.getString("executor.shared.token", "");
+ ServletHolder executorHolder = new ServletHolder(new ExecutorServlet(sharedToken));
+ root.addServlet(executorHolder, "/");
+
+ server.start();
+ logger.info("Azkaban Executor Server started on port " + portNumber);
+
+ tempDir = new File(props.getString("azkaban.temp.dir", "temp"));
+ }
+
+ public void stopServer() throws Exception {
+ server.stop();
+ server.destroy();
+ }
+
+ /**
+ * Returns the global azkaban properties
+ *
+ * @return
+ */
+ public Props getAzkabanProps() {
+ return props;
+ }
+
+ /**
+ * Azkaban using Jetty
+ *
+ * @param args
+ * @throws IOException
+ */
+ public static void main(String[] args) throws Exception {
+ OptionParser parser = new OptionParser();
+
+ OptionSpec<String> configDirectory = parser
+ .acceptsAll(Arrays.asList("c", "conf"),
+ "The conf directory for Azkaban.").withRequiredArg()
+ .describedAs("conf").ofType(String.class);
+
+ logger.error("Starting Jetty Azkaban Executor...");
+
+ // Grabbing the azkaban settings from the conf directory.
+ Props azkabanSettings = null;
+ OptionSet options = parser.parse(args);
+ if (options.has(configDirectory)) {
+ String path = options.valueOf(configDirectory);
+ logger.info("Loading azkaban settings file from " + path);
+ File file = new File(path, AZKABAN_PROPERTIES_FILE);
+ if (!file.exists() || file.isDirectory() || !file.canRead()) {
+ logger.error("Cannot read file " + file);
+ }
+
+ azkabanSettings = loadAzkabanConfiguration(file.getPath());
+ } else {
+ logger.info("Conf parameter not set, attempting to get value from AZKABAN_HOME env.");
+ azkabanSettings = loadConfigurationFromAzkabanHome();
+ }
+
+ if (azkabanSettings == null) {
+ logger.error("Azkaban Properties not loaded.");
+ logger.error("Exiting Azkaban Executor Server...");
+ return;
+ }
+
+ // Setup time zone
+ if (azkabanSettings.containsKey(DEFAULT_TIMEZONE_ID)) {
+ String timezone = azkabanSettings.getString(DEFAULT_TIMEZONE_ID);
+ TimeZone.setDefault(TimeZone.getTimeZone(timezone));
+ DateTimeZone.setDefault(DateTimeZone.forID(timezone));
+
+ logger.info("Setting timezone to " + timezone);
+ }
+
+ app = new AzkabanExecutorServer(azkabanSettings);
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+
+ public void run() {
+ logger.info("Shutting down http server...");
+ try {
+ app.stopServer();
+ } catch (Exception e) {
+ logger.error("Error while shutting down http server.", e);
+ }
+ logger.info("kk thx bye.");
+ }
+ });
+ }
+
+ /**
+ * Loads the Azkaban property file from the AZKABAN_HOME conf directory
+ *
+ * @return
+ */
+ private static Props loadConfigurationFromAzkabanHome() {
+ String azkabanHome = System.getenv("AZKABAN_HOME");
+
+ if (azkabanHome == null) {
+ logger.error("AZKABAN_HOME not set. Will try default.");
+ return null;
+ }
+
+ if (!new File(azkabanHome).isDirectory()
+ || !new File(azkabanHome).canRead()) {
+ logger.error(azkabanHome + " is not a readable directory.");
+ return null;
+ }
+
+ File confPath = new File(azkabanHome, DEFAULT_CONF_PATH);
+ if (!confPath.exists() || !confPath.isDirectory()
+ || !confPath.canRead()) {
+ logger.error(azkabanHome
+ + " does not contain a readable conf directory.");
+ return null;
+ }
+
+ File confFile = new File(confPath, AZKABAN_PROPERTIES_FILE);
+ if (!confFile.exists() || confFile.isDirectory() || !confPath.canRead()) {
+ logger.error(confFile + " does not contain a readable azkaban.properties file.");
+ return null;
+ }
+
+ return loadAzkabanConfiguration(confFile.getPath());
+ }
+
+ /**
+ * Returns the set temp dir
+ *
+ * @return
+ */
+ public File getTempDirectory() {
+ return tempDir;
+ }
+
+ /**
+ * Loads the Azkaban conf file int a Props object
+ *
+ * @param path
+ * @return
+ */
+ private static Props loadAzkabanConfiguration(String path) {
+ try {
+ return new Props(null, path);
+ } catch (FileNotFoundException e) {
+ logger.error("File not found. Could not load azkaban config file "
+ + path);
+ } catch (IOException e) {
+ logger.error("File found, but error reading. Could not load azkaban config file "
+ + path);
+ }
+
+ return null;
+ }
+}
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
new file mode 100644
index 0000000..580d95d
--- /dev/null
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -0,0 +1,20 @@
+package azkaban.executor;
+
+import java.util.HashMap;
+
+import azkaban.flow.Flow;
+import azkaban.utils.Props;
+
+public class ExecutableFlow {
+ private Flow flow;
+ private HashMap<String, Props> sourceProps = new HashMap<String, Props>();
+
+ public ExecutableFlow(Flow flow, HashMap<String,Props> sourceProps) {
+ this.flow = flow;
+ this.sourceProps = sourceProps;
+ }
+
+ public void run() {
+
+ }
+}
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 30f7582..a099c60 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -1,5 +1,21 @@
package azkaban.executor;
-public interface ExecutorManager {
+import azkaban.utils.Props;
+public class ExecutorManager {
+ private String token;
+
+ public ExecutorManager(Props props) {
+ token = props.getString("executor.shared.token", "");
+ }
+
+ public void executeJob() {
+
+ }
+
+ private class ExecutorThread extends Thread {
+ public void run() {
+
+ }
+ }
}
diff --git a/src/java/azkaban/executor/ExecutorServlet.java b/src/java/azkaban/executor/ExecutorServlet.java
new file mode 100644
index 0000000..3d57999
--- /dev/null
+++ b/src/java/azkaban/executor/ExecutorServlet.java
@@ -0,0 +1,47 @@
+package azkaban.executor;
+
+import java.io.IOException;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.log4j.Logger;
+
+public class ExecutorServlet extends HttpServlet {
+ private static final Logger logger = Logger.getLogger(ExecutorServlet.class.getName());
+ private String sharedToken;
+
+ public ExecutorServlet(String token) {
+ super();
+ sharedToken = token;
+ }
+
+ @Override
+ public void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+ String token = getParam(req, "sharedToken");
+
+ }
+
+ @Override
+ public void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+
+ }
+
+ /**
+ * Duplicated code with AbstractAzkabanServlet, but ne
+ */
+ public boolean hasParam(HttpServletRequest request, String param) {
+ return request.getParameter(param) != null;
+ }
+
+ public String getParam(HttpServletRequest request, String name)
+ throws ServletException {
+ String p = request.getParameter(name);
+ if (p == null)
+ throw new ServletException("Missing required parameter '" + name
+ + "'.");
+ else
+ return p;
+ }
+}
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index 7e93afe..d56eae7 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -79,6 +79,8 @@ public class AzkabanWebServer {
public static final String DEFAULT_CONF_PATH = "conf";
public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
+ private static AzkabanWebServer app;
+
private static final String DEFAULT_TIMEZONE_ID = "default.timezone.id";
//private static final int DEFAULT_PORT_NUMBER = 8081;
private static final int DEFAULT_SSL_PORT_NUMBER = 8443;
@@ -122,6 +124,8 @@ public class AzkabanWebServer {
String timezone = props.getString(DEFAULT_TIMEZONE_ID);
TimeZone.setDefault(TimeZone.getTimeZone(timezone));
DateTimeZone.setDefault(DateTimeZone.forID(timezone));
+
+ logger.info("Setting timezone to " + timezone);
}
}
@@ -291,7 +295,7 @@ public class AzkabanWebServer {
logger.error("Exiting Azkaban...");
return;
}
- AzkabanWebServer app = new AzkabanWebServer(azkabanSettings);
+ app = new AzkabanWebServer(azkabanSettings);
//int portNumber = azkabanSettings.getInt("jetty.port",DEFAULT_PORT_NUMBER);
int sslPortNumber = azkabanSettings.getInt("jetty.ssl.port",DEFAULT_SSL_PORT_NUMBER);
diff --git a/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java b/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
index 143b9ae..c591351 100644
--- a/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
@@ -11,16 +11,13 @@ import azkaban.webapp.session.Session;
public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
@Override
- protected void handleGet(HttpServletRequest req, HttpServletResponse resp,
- Session session) throws ServletException, IOException {
- // TODO Auto-generated method stub
+ protected void handleGet(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
}
@Override
- protected void handlePost(HttpServletRequest req, HttpServletResponse resp,
- Session session) throws ServletException, IOException {
- // TODO Auto-generated method stub
+ protected void handlePost(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+
}
diff --git a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
index 5458d33..c7da369 100644
--- a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
@@ -54,7 +54,6 @@ public abstract class LoginAbstractAzkabanServlet extends
return;
}
-
if (session != null) {
logger.info("Found session " + session.getUser());
handleGet(req, resp, session);
diff --git a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
new file mode 100644
index 0000000..5ba18ad
--- /dev/null
+++ b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
@@ -0,0 +1,86 @@
+<!DOCTYPE html>
+<html>
+ <head>
+#parse( "azkaban/webapp/servlet/velocity/style.vm" )
+ <script type="text/javascript" src="${context}/js/jquery/jquery.js"></script>
+ <script type="text/javascript" src="${context}/js/jqueryui/jquery-ui.custom.min.js"></script>
+ <script type="text/javascript" src="${context}/js/namespace.js"></script>
+ <script type="text/javascript" src="${context}/js/underscore-1.2.1-min.js"></script>
+ <script type="text/javascript" src="${context}/js/backbone-0.5.3-min.js"></script>
+ <script type="text/javascript" src="${context}/js/jquery.simplemodal.js"></script>
+ <script type="text/javascript" src="${context}/js/jquery.contextMenu.js"></script>
+ <script type="text/javascript" src="${context}/js/azkaban.nav.js"></script>
+ <script type="text/javascript" src="${context}/js/azkaban.layout.js"></script>
+ <script type="text/javascript" src="${context}/js/azkaban.flow.view.js"></script>
+ <script type="text/javascript" src="${context}/js/svgNavigate.js"></script>
+ <script type="text/javascript">
+ var contextURL = "${context}";
+ var currentTime = ${currentTime};
+ var timezone = "${timezone}";
+ var errorMessage = "${error_message}";
+ var successMessage = "${success_message}";
+
+ var projectName = "${project.name}";
+ var flowName = "${flowid}";
+ </script>
+ <link rel="stylesheet" type="text/css" href="${context}/css/jquery.contextMenu.custom.css" />
+ </head>
+ <body>
+#set($current_page="all")
+#parse( "azkaban/webapp/servlet/velocity/nav.vm" )
+ <div class="messaging"><p id="messageClose">X</p><p id="message"></p></div>
+ <div class="content">
+#if($errorMsg)
+ <div class="box-error-message">$errorMsg</div>
+#else
+#if($error_message != "null")
+ <div class="box-error-message">$error_message</div>
+#elseif($success_message != "null")
+ <div class="box-success-message">$success_message</div>
+#end
+
+ <div id="all-jobs-content">
+ <div class="section-hd">
+ <h2><a href="${context}/manager?project=${project.name}&flow=${flowid}">Flow <span>$flowid</span></a></h2>
+ <div class="section-sub-hd">
+ <h4><a href="${context}/manager?project=${project.name}">Project <span>$project.name</span></a></h4>
+ </div>
+ </div>
+
+ <div id="headertabs" class="headertabs">
+ <ul>
+ <li><a id="graphViewLink" href="#graph">Graph</a></li>
+ <li class="lidivider">|</li>
+ <li><a id="jobslistViewLink" href="#jobslist">Job List</a></li>
+ </ul>
+ </div>
+ <div id="graphView">
+ <div class="relative">
+ <div id="jobList">
+ <div id="filterList">
+ <input id="filter" placeholder=" Job Filter" />
+ </div>
+ <div id="list">
+ </div>
+ <div id="resetPanZoomBtn" class="btn5" >Reset Pan Zoom</div>
+ </div>
+ <div id="svgDiv" >
+ <svg id="svgGraph" xmlns="http://www.w3.org/2000/svg" version="1.1" shape-rendering="optimize-speed" text-rendering="optimize-speed" >
+ </svg>
+ </div>
+ </div>
+ </div>
+ <div id="jobListView">
+ <p>This is my joblist view</p>
+ </div>
+ </div>
+#end
+ <ul id="jobMenu" class="contextMenu">
+ <li class="open"><a href="#open">Open...</a></li>
+ <li class="openwindow"><a href="#openwindow">Open in New Window...</a></li>
+ </ul>
+
+ </div>
+ </body>
+</html>
+
diff --git a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
index 5ba18ad..a806c9f 100644
--- a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
@@ -45,6 +45,9 @@
<div class="section-sub-hd">
<h4><a href="${context}/manager?project=${project.name}">Project <span>$project.name</span></a></h4>
</div>
+
+ <a id="execute-btn" class="btn1" href="#">Execute Now</a>
+ <a id="execute-btn" class="btn2" href="#">Execute Custom</a>
</div>
<div id="headertabs" class="headertabs">
src/web/css/azkaban.css 6(+6 -0)
diff --git a/src/web/css/azkaban.css b/src/web/css/azkaban.css
index 0cad317..e7c81f7 100644
--- a/src/web/css/azkaban.css
+++ b/src/web/css/azkaban.css
@@ -352,6 +352,12 @@ tr:hover td {
float: right;
margin-right: 25px;
}
+.section-hd .btn2,
+.section-ft .btn2 {
+ float: right;
+ margin-right: 25px;
+}
+
/* blue */
.btn2 {