diff --git a/azkaban-common/src/main/java/azkaban/server/AzkabanServer.java b/azkaban-common/src/main/java/azkaban/server/AzkabanServer.java
index c0621a0..adec51b 100644
--- a/azkaban-common/src/main/java/azkaban/server/AzkabanServer.java
+++ b/azkaban-common/src/main/java/azkaban/server/AzkabanServer.java
@@ -33,6 +33,9 @@ import azkaban.user.UserManager;
import azkaban.utils.Props;
import azkaban.server.session.SessionCache;
+import static azkaban.Constants.*;
+
+
public abstract class AzkabanServer {
private static final Logger logger = Logger.getLogger(AzkabanServer.class);
private static Props azkabanProperties = null;
@@ -47,12 +50,11 @@ public abstract class AzkabanServer {
}
public static Props loadProps(String[] args, OptionParser parser) {
- ;
- OptionSpec<String> configDirectory =
- parser
- .acceptsAll(Arrays.asList("c", "conf"),
- "The conf directory for Azkaban.").withRequiredArg()
- .describedAs("conf").ofType(String.class);
+ OptionSpec<String> configDirectory = parser.acceptsAll(
+ Arrays.asList("c", "conf"), "The conf directory for Azkaban.")
+ .withRequiredArg()
+ .describedAs("conf")
+ .ofType(String.class);
// Grabbing the azkaban settings from the conf directory.
Props azkabanSettings = null;
@@ -75,12 +77,27 @@ public abstract class AzkabanServer {
azkabanSettings = loadConfigurationFromAzkabanHome();
}
+ if (azkabanSettings != null) {
+ updateDerivedConfigs(azkabanSettings);
+ }
return azkabanSettings;
}
- private static Props loadAzkabanConfigurationFromDirectory(File dir) {
- File azkabanPrivatePropsFile =
- new File(dir, Constants.AZKABAN_PRIVATE_PROPERTIES_FILE);
+ private static void updateDerivedConfigs(Props azkabanSettings) {
+ final boolean isSslEnabled = azkabanSettings.getBoolean("jetty.use.ssl", true);
+ final int port = isSslEnabled
+ ? azkabanSettings.getInt("jetty.ssl.port", DEFAULT_SSL_PORT_NUMBER)
+ : azkabanSettings.getInt("jetty.port", DEFAULT_PORT_NUMBER);
+
+ // setting stats configuration for connectors
+ final String hostname = azkabanSettings.getString("jetty.hostname", "localhost");
+ azkabanSettings.put("server.hostname", hostname);
+ azkabanSettings.put("server.port", port);
+ azkabanSettings.put("server.useSSL", String.valueOf(isSslEnabled));
+ }
+
+ public static Props loadAzkabanConfigurationFromDirectory(File dir) {
+ File azkabanPrivatePropsFile = new File(dir, Constants.AZKABAN_PRIVATE_PROPERTIES_FILE);
File azkabanPropsFile = new File(dir, Constants.AZKABAN_PROPERTIES_FILE);
Props props = null;
@@ -98,18 +115,15 @@ public abstract class AzkabanServer {
} catch (FileNotFoundException e) {
logger.error("File not found. Could not load azkaban config file", e);
} catch (IOException e) {
- logger.error(
- "File found, but error reading. Could not load azkaban config file",
- e);
+ logger.error("File found, but error reading. Could not load azkaban config file", e);
}
-
return props;
}
/**
* Loads the Azkaban property file from the AZKABAN_HOME conf directory
*
- * @return
+ * @return Props instance
*/
private static Props loadConfigurationFromAzkabanHome() {
String azkabanHome = System.getenv("AZKABAN_HOME");
@@ -118,17 +132,14 @@ public abstract class AzkabanServer {
logger.error("AZKABAN_HOME not set. Will try default.");
return null;
}
-
- if (!new File(azkabanHome).isDirectory()
- || !new File(azkabanHome).canRead()) {
+ if (!new File(azkabanHome).isDirectory() || !new File(azkabanHome).canRead()) {
logger.error(azkabanHome + " is not a readable directory.");
return null;
}
File confPath = new File(azkabanHome, Constants.DEFAULT_CONF_PATH);
if (!confPath.exists() || !confPath.isDirectory() || !confPath.canRead()) {
- logger
- .error(azkabanHome + " does not contain a readable conf directory.");
+ logger.error(azkabanHome + " does not contain a readable conf directory.");
return null;
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index c1bdf14..a6f6bff 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -29,7 +29,6 @@ import org.mortbay.thread.QueuedThreadPool;
import java.io.BufferedWriter;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
@@ -419,72 +418,10 @@ public class AzkabanExecutorServer {
});
}
- /**
- * Loads the Azkaban property file from the AZKABAN_HOME conf directory
- *
- * @return
- */
- /* package */static Props loadConfigurationFromAzkabanHome() {
- String azkabanHome = System.getenv("AZKABAN_HOME");
-
- if (azkabanHome == null) {
- logger.error("AZKABAN_HOME not set. Will try default.");
- return null;
- }
-
- if (!new File(azkabanHome).isDirectory()
- || !new File(azkabanHome).canRead()) {
- logger.error(azkabanHome + " is not a readable directory.");
- return null;
- }
-
- File confPath = new File(azkabanHome, Constants.DEFAULT_CONF_PATH);
- if (!confPath.exists() || !confPath.isDirectory() || !confPath.canRead()) {
- logger
- .error(azkabanHome + " does not contain a readable conf directory.");
- return null;
- }
-
- return loadAzkabanConfigurationFromDirectory(confPath);
- }
-
public FlowRunnerManager getFlowRunnerManager() {
return runnerManager;
}
- /**
- * Loads the Azkaban conf file int a Props object
- *
- * @return
- */
- private static Props loadAzkabanConfigurationFromDirectory(File dir) {
- File azkabanPrivatePropsFile =
- new File(dir, Constants.AZKABAN_PRIVATE_PROPERTIES_FILE);
- File azkabanPropsFile = new File(dir, Constants.AZKABAN_PROPERTIES_FILE);
-
- Props props = null;
- try {
- // This is purely optional
- if (azkabanPrivatePropsFile.exists() && azkabanPrivatePropsFile.isFile()) {
- logger.info("Loading azkaban private properties file");
- props = new Props(null, azkabanPrivatePropsFile);
- }
-
- if (azkabanPropsFile.exists() && azkabanPropsFile.isFile()) {
- logger.info("Loading azkaban properties file");
- props = new Props(props, azkabanPropsFile);
- }
- } catch (FileNotFoundException e) {
- logger.error("File not found. Could not load azkaban config file", e);
- } catch (IOException e) {
- logger.error(
- "File found, but error reading. Could not load azkaban config file",
- e);
- }
-
- return props;
- }
-
private void configureMBeanServer() {
logger.info("Registering MBeans...");
mbeanServer = ManagementFactory.getPlatformMBeanServer();
@@ -632,5 +569,6 @@ public class AzkabanExecutorServer {
server.destroy();
SystemMemoryInfo.shutdown();
getFlowRunnerManager().shutdownNow();
+ close();
}
}
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
index 3b650e6..2ec0fdc 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -21,7 +21,6 @@ import com.codahale.metrics.MetricRegistry;
import com.google.inject.Guice;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
@@ -30,6 +29,7 @@ import java.lang.reflect.Constructor;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
+import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -140,17 +140,11 @@ public class AzkabanWebServer extends AzkabanServer {
private static final Logger logger = Logger.getLogger(AzkabanWebServer.class);
public static final String DEFAULT_CONF_PATH = "conf";
- public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
- public static final String AZKABAN_PRIVATE_PROPERTIES_FILE =
- "azkaban.private.properties";
-
private static final int MAX_FORM_CONTENT_SIZE = 10 * 1024 * 1024;
private static final int MAX_HEADER_BUFFER_SIZE = 10 * 1024 * 1024;
private static AzkabanWebServer app;
private static final String DEFAULT_TIMEZONE_ID = "default.timezone.id";
- private static final int DEFAULT_PORT_NUMBER = 8081;
- private static final int DEFAULT_SSL_PORT_NUMBER = 8443;
private static final int DEFAULT_THREAD_NUMBER = 20;
private static final String VELOCITY_DEV_MODE_PARAM = "velocity.dev.mode";
private static final String USER_MANAGER_CLASS_PARAM = "user.manager.class";
@@ -174,7 +168,6 @@ public class AzkabanWebServer extends AzkabanServer {
private final Props props;
private final SessionCache sessionCache;
- private final File tempDir;
private final List<ObjectName> registeredMBeans = new ArrayList<>();
private Map<String, TriggerPlugin> triggerPlugins;
@@ -217,7 +210,7 @@ public class AzkabanWebServer extends AzkabanServer {
loadBuiltinCheckersAndActions();
// load all trigger agents here
- scheduleManager = loadScheduleManager(triggerManager, props);
+ scheduleManager = loadScheduleManager(triggerManager);
String triggerPluginDir =
props.getString("trigger.plugin.dir", "plugins/triggers");
@@ -226,8 +219,6 @@ public class AzkabanWebServer extends AzkabanServer {
baseClassLoader = this.getClassLoader();
- tempDir = new File(props.getString("azkaban.temp.dir", "temp"));
-
// Setup time zone
if (props.containsKey(DEFAULT_TIMEZONE_ID)) {
String timezone = props.getString(DEFAULT_TIMEZONE_ID);
@@ -292,11 +283,10 @@ public class AzkabanWebServer extends AzkabanServer {
private ExecutorManager loadExecutorManager(Props props) throws Exception {
JdbcExecutorLoader loader = new JdbcExecutorLoader(props);
- ExecutorManager execManager = new ExecutorManager(props, loader, alerters);
- return execManager;
+ return new ExecutorManager(props, loader, alerters);
}
- private ScheduleManager loadScheduleManager(TriggerManager tm, Props props)
+ private ScheduleManager loadScheduleManager(TriggerManager tm)
throws Exception {
logger.info("Loading trigger based scheduler");
ScheduleLoader loader =
@@ -312,31 +302,24 @@ public class AzkabanWebServer extends AzkabanServer {
private void loadBuiltinCheckersAndActions() {
logger.info("Loading built-in checker and action types");
- if (triggerManager instanceof TriggerManager) {
- SlaChecker.setExecutorManager(executorManager);
- ExecuteFlowAction.setExecutorManager(executorManager);
- ExecuteFlowAction.setProjectManager(projectManager);
- ExecuteFlowAction.setTriggerManager(triggerManager);
- KillExecutionAction.setExecutorManager(executorManager);
- SlaAlertAction.setExecutorManager(executorManager);
- SlaAlertAction.setAlerters(alerters);
- SlaAlertAction.setExecutorManager(executorManager);
- CreateTriggerAction.setTriggerManager(triggerManager);
- ExecutionChecker.setExecutorManager(executorManager);
- }
- triggerManager.registerCheckerType(BasicTimeChecker.type,
- BasicTimeChecker.class);
+ SlaChecker.setExecutorManager(executorManager);
+ ExecuteFlowAction.setExecutorManager(executorManager);
+ ExecuteFlowAction.setProjectManager(projectManager);
+ ExecuteFlowAction.setTriggerManager(triggerManager);
+ KillExecutionAction.setExecutorManager(executorManager);
+ SlaAlertAction.setExecutorManager(executorManager);
+ SlaAlertAction.setAlerters(alerters);
+ SlaAlertAction.setExecutorManager(executorManager);
+ CreateTriggerAction.setTriggerManager(triggerManager);
+ ExecutionChecker.setExecutorManager(executorManager);
+
+ triggerManager.registerCheckerType(BasicTimeChecker.type, BasicTimeChecker.class);
triggerManager.registerCheckerType(SlaChecker.type, SlaChecker.class);
- triggerManager.registerCheckerType(ExecutionChecker.type,
- ExecutionChecker.class);
- triggerManager.registerActionType(ExecuteFlowAction.type,
- ExecuteFlowAction.class);
- triggerManager.registerActionType(KillExecutionAction.type,
- KillExecutionAction.class);
- triggerManager
- .registerActionType(SlaAlertAction.type, SlaAlertAction.class);
- triggerManager.registerActionType(CreateTriggerAction.type,
- CreateTriggerAction.class);
+ triggerManager.registerCheckerType(ExecutionChecker.type, ExecutionChecker.class);
+ triggerManager.registerActionType(ExecuteFlowAction.type, ExecuteFlowAction.class);
+ triggerManager.registerActionType(KillExecutionAction.type, KillExecutionAction.class);
+ triggerManager.registerActionType(SlaAlertAction.type, SlaAlertAction.class);
+ triggerManager.registerActionType(CreateTriggerAction.type, CreateTriggerAction.class);
}
private Map<String, Alerter> loadAlerters(Props props) {
@@ -618,25 +601,14 @@ public class AzkabanWebServer extends AzkabanServer {
return velocityEngine;
}
- /**
- *
- * @return
- */
public UserManager getUserManager() {
return userManager;
}
- /**
- *
- * @return
- */
public ProjectManager getProjectManager() {
return projectManager;
}
- /**
- *
- */
public ExecutorManager getExecutorManager() {
return executorManager;
}
@@ -712,15 +684,63 @@ public class AzkabanWebServer extends AzkabanServer {
Props azkabanSettings = AzkabanServer.loadProps(args);
if (azkabanSettings == null) {
- logger.error("Azkaban Properties not loaded.");
- logger.error("Exiting Azkaban...");
- return;
+ logger.error("Azkaban Properties not loaded. Exiting..");
+ System.exit(1);
}
- int maxThreads =
- azkabanSettings.getInt("jetty.maxThreads", DEFAULT_THREAD_NUMBER);
- boolean isStatsOn =
- azkabanSettings.getBoolean("jetty.connector.stats", true);
+ final Server server = createServer(azkabanSettings);
+
+ app = new AzkabanWebServer(server, azkabanSettings);
+
+ prepareAndStartServer(azkabanSettings, server);
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+
+ public void run() {
+ try {
+ logTopMemoryConsumers();
+ } catch (Exception e) {
+ logger.info(("Exception when logging top memory consumers"), e);
+ }
+
+ logger.info("Shutting down http server...");
+ try {
+ app.close();
+ server.stop();
+ server.destroy();
+ } catch (Exception e) {
+ logger.error("Error while shutting down http server.", e);
+ }
+ logger.info("kk thx bye.");
+ }
+
+ public void logTopMemoryConsumers() throws Exception, IOException {
+ if (new File("/bin/bash").exists() && new File("/bin/ps").exists()
+ && new File("/usr/bin/head").exists()) {
+ logger.info("logging top memeory consumer");
+
+ java.lang.ProcessBuilder processBuilder =
+ new java.lang.ProcessBuilder("/bin/bash", "-c",
+ "/bin/ps aux --sort -rss | /usr/bin/head");
+ Process p = processBuilder.start();
+ p.waitFor();
+
+ InputStream is = p.getInputStream();
+ java.io.BufferedReader reader =
+ new java.io.BufferedReader(new InputStreamReader(is));
+ String line = null;
+ while ((line = reader.readLine()) != null) {
+ logger.info(line);
+ }
+ is.close();
+ }
+ }
+ });
+ }
+
+ private static Server createServer(Props azkabanSettings) {
+ final int maxThreads = azkabanSettings.getInt("jetty.maxThreads", DEFAULT_THREAD_NUMBER);
+ boolean isStatsOn = azkabanSettings.getBoolean("jetty.connector.stats", true);
logger.info("Setting up connector with stats on: " + isStatsOn);
boolean ssl;
@@ -728,7 +748,7 @@ public class AzkabanWebServer extends AzkabanServer {
final Server server = new Server();
if (azkabanSettings.getBoolean("jetty.use.ssl", true)) {
int sslPortNumber =
- azkabanSettings.getInt("jetty.ssl.port", DEFAULT_SSL_PORT_NUMBER);
+ azkabanSettings.getInt("jetty.ssl.port", Constants.DEFAULT_SSL_PORT_NUMBER);
port = sslPortNumber;
ssl = true;
logger.info("Setting up Jetty Https Server with port:" + sslPortNumber
@@ -756,7 +776,7 @@ public class AzkabanWebServer extends AzkabanServer {
server.addConnector(secureConnector);
} else {
ssl = false;
- port = azkabanSettings.getInt("jetty.port", DEFAULT_PORT_NUMBER);
+ port = azkabanSettings.getInt("jetty.port", Constants.DEFAULT_PORT_NUMBER);
SocketConnector connector = new SocketConnector();
connector.setPort(port);
connector.setHeaderBufferSize(MAX_HEADER_BUFFER_SIZE);
@@ -768,16 +788,28 @@ public class AzkabanWebServer extends AzkabanServer {
connector.setStatsOn(isStatsOn);
}
- String hostname = azkabanSettings.getString("jetty.hostname", "localhost");
- azkabanSettings.put("server.hostname", hostname);
- azkabanSettings.put("server.port", port);
- azkabanSettings.put("server.useSSL", String.valueOf(ssl));
+ logger.info(String.format("Starting %sserver on port: %d", ssl ? "SSL " : "", port));
+ return server;
+ }
- app = new AzkabanWebServer(server, azkabanSettings);
+ private static void prepareAndStartServer(Props azkabanSettings, Server server) throws Exception {
+ validateDatabaseVersion(azkabanSettings);
+ configureRoutes(server, azkabanSettings);
- boolean checkDB =
- azkabanSettings.getBoolean(AzkabanDatabaseSetup.DATABASE_CHECK_VERSION,
- false);
+ if (azkabanSettings.getBoolean(Constants.ConfigurationKeys.IS_METRICS_ENABLED, false)) {
+ app.startWebMetrics();
+ }
+ try {
+ server.start();
+ logger.info("Server started");
+ } catch (Exception e) {
+ logger.warn(e);
+ Utils.croak(e.getMessage(), 1);
+ }
+ }
+
+ private static void validateDatabaseVersion(Props azkabanSettings) throws IOException, SQLException {
+ boolean checkDB = azkabanSettings.getBoolean(AzkabanDatabaseSetup.DATABASE_CHECK_VERSION, false);
if (checkDB) {
AzkabanDatabaseSetup setup = new AzkabanDatabaseSetup(azkabanSettings);
setup.loadTableInfo();
@@ -789,6 +821,10 @@ public class AzkabanWebServer extends AzkabanServer {
System.exit(-1);
}
}
+ }
+
+ private static void configureRoutes(Server server, Props azkabanSettings) throws TriggerManagerException {
+ final int maxThreads = azkabanSettings.getInt("jetty.maxThreads", DEFAULT_THREAD_NUMBER);
QueuedThreadPool httpThreadPool = new QueuedThreadPool(maxThreads);
app.setThreadPool(httpThreadPool);
@@ -843,62 +879,6 @@ public class AzkabanWebServer extends AzkabanServer {
app.getTriggerManager().start();
root.setAttribute(Constants.AZKABAN_SERVLET_CONTEXT_KEY, app);
-
-
- if (azkabanSettings.getBoolean(Constants.ConfigurationKeys.IS_METRICS_ENABLED, false)) {
- app.startWebMetrics();
- }
- try {
- server.start();
- } catch (Exception e) {
- logger.warn(e);
- Utils.croak(e.getMessage(), 1);
- }
-
- Runtime.getRuntime().addShutdownHook(new Thread() {
-
- public void run() {
- try {
- logTopMemoryConsumers();
- } catch (Exception e) {
- logger.info(("Exception when logging top memory consumers"), e);
- }
-
- logger.info("Shutting down http server...");
- try {
- app.close();
- server.stop();
- server.destroy();
- } catch (Exception e) {
- logger.error("Error while shutting down http server.", e);
- }
- logger.info("kk thx bye.");
- }
-
- public void logTopMemoryConsumers() throws Exception, IOException {
- if (new File("/bin/bash").exists() && new File("/bin/ps").exists()
- && new File("/usr/bin/head").exists()) {
- logger.info("logging top memeory consumer");
-
- java.lang.ProcessBuilder processBuilder =
- new java.lang.ProcessBuilder("/bin/bash", "-c",
- "/bin/ps aux --sort -rss | /usr/bin/head");
- Process p = processBuilder.start();
- p.waitFor();
-
- InputStream is = p.getInputStream();
- java.io.BufferedReader reader =
- new java.io.BufferedReader(new InputStreamReader(is));
- String line = null;
- while ((line = reader.readLine()) != null) {
- logger.info(line);
- }
- is.close();
- }
- }
- });
- logger.info("Server running on " + (ssl ? "ssl" : "") + " port " + port
- + ".");
}
private static Map<String, TriggerPlugin> loadTriggerPlugins(Context root,
@@ -1242,43 +1222,6 @@ public class AzkabanWebServer extends AzkabanServer {
return loadAzkabanConfigurationFromDirectory(confPath);
}
- /**
- * Returns the set temp dir
- *
- * @return
- */
- public File getTempDirectory() {
- return tempDir;
- }
-
- private static Props loadAzkabanConfigurationFromDirectory(File dir) {
- File azkabanPrivatePropsFile =
- new File(dir, AZKABAN_PRIVATE_PROPERTIES_FILE);
- File azkabanPropsFile = new File(dir, AZKABAN_PROPERTIES_FILE);
-
- Props props = null;
- try {
- // This is purely optional
- if (azkabanPrivatePropsFile.exists() && azkabanPrivatePropsFile.isFile()) {
- logger.info("Loading azkaban private properties file");
- props = new Props(null, azkabanPrivatePropsFile);
- }
-
- if (azkabanPropsFile.exists() && azkabanPropsFile.isFile()) {
- logger.info("Loading azkaban properties file");
- props = new Props(props, azkabanPropsFile);
- }
- } catch (FileNotFoundException e) {
- logger.error("File not found. Could not load azkaban config file", e);
- } catch (IOException e) {
- logger.error(
- "File found, but error reading. Could not load azkaban config file",
- e);
- }
-
- return props;
- }
-
private void configureMBeanServer() {
logger.info("Registering MBeans...");
mbeanServer = ManagementFactory.getPlatformMBeanServer();