diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
index fc47a15..9a1ba5c 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
@@ -22,7 +22,6 @@ import azkaban.executor.JdbcExecutorLoader;
import com.google.inject.AbstractModule;
import com.google.inject.Scopes;
-
/**
* This Guice module is currently a one place container for all bindings in the current module. This
* is intended to help during the migration process to Guice. Once this class starts growing we can
@@ -32,6 +31,7 @@ public class AzkabanExecServerModule extends AbstractModule {
@Override
protected void configure() {
+ install(new ExecJettyServerModule());
bind(ExecutorLoader.class).to(JdbcExecutorLoader.class);
bind(AzkabanExecutorServer.class).in(Scopes.SINGLETON);
bind(TriggerManager.class).in(Scopes.SINGLETON);
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 8d28cef..72281da 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -17,7 +17,10 @@
package azkaban.execapp;
import static azkaban.Constants.AZKABAN_EXECUTOR_PORT_FILENAME;
+import static azkaban.Constants.ConfigurationKeys;
import static azkaban.ServiceProvider.SERVICE_PROVIDER;
+import static azkaban.execapp.ExecJettyServerModule.EXEC_JETTY_SERVER;
+import static azkaban.execapp.ExecJettyServerModule.EXEC_ROOT_CONTEXT;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
@@ -61,6 +64,7 @@ import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.TimeZone;
+import javax.inject.Named;
import javax.management.MBeanInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -70,19 +74,14 @@ import org.joda.time.DateTimeZone;
import org.mortbay.jetty.Connector;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
-import org.mortbay.jetty.servlet.ServletHolder;
-import org.mortbay.thread.QueuedThreadPool;
public class AzkabanExecutorServer {
public static final String JOBTYPE_PLUGIN_DIR = "azkaban.jobtype.plugin.dir";
public static final String METRIC_INTERVAL = "executor.metric.milisecinterval.";
- public static final int DEFAULT_HEADER_BUFFER_SIZE = 4096;
private static final String CUSTOM_JMX_ATTRIBUTE_PROCESSOR_PROPERTY = "jmx.attribute.processor.class";
private static final Logger logger = Logger.getLogger(AzkabanExecutorServer.class);
- private static final int MAX_FORM_CONTENT_SIZE = 10 * 1024 * 1024;
private static final String DEFAULT_TIMEZONE_ID = "default.timezone.id";
- private static final int DEFAULT_THREAD_NUMBER = 50;
private static AzkabanExecutorServer app;
@@ -92,6 +91,7 @@ public class AzkabanExecutorServer {
private final MetricsManager metricsManager;
private final Props props;
private final Server server;
+ private final Context root;
private final ArrayList<ObjectName> registeredMBeans = new ArrayList<>();
private MBeanServer mbeanServer;
@@ -99,41 +99,19 @@ public class AzkabanExecutorServer {
@Inject
public AzkabanExecutorServer(final Props props,
final ExecutorLoader executionLoader,
- final FlowRunnerManager runnerManager, final MetricsManager metricsManager,
- final ExecMetrics execMetrics) throws Exception {
+ final FlowRunnerManager runnerManager,
+ final MetricsManager metricsManager,
+ final ExecMetrics execMetrics,
+ @Named(EXEC_JETTY_SERVER) final Server server,
+ @Named(EXEC_ROOT_CONTEXT) final Context root) throws Exception {
this.props = props;
this.executionLoader = executionLoader;
this.runnerManager = runnerManager;
+
this.metricsManager = metricsManager;
this.execMetrics = execMetrics;
-
- this.server = createJettyServer(props);
-
- JmxJobMBeanManager.getInstance().initialize(props);
-
- // make sure this happens before
- configureJobCallback(props);
-
- configureMBeanServer();
- configureMetricReports();
-
- loadCustomJMXAttributeProcessor(props);
-
- try {
- this.server.start();
- } catch (final Exception e) {
- logger.error(e);
- Utils.croak(e.getMessage(), 1);
- }
-
- insertExecutorEntryIntoDB();
- dumpPortToFile();
-
- logger.info("Started Executor Server on " + getExecutorHostPort());
-
- if (props.getBoolean(Constants.ConfigurationKeys.IS_METRICS_ENABLED, false)) {
- startExecMetrics();
- }
+ this.server = server;
+ this.root = root;
}
/**
@@ -168,6 +146,7 @@ public class AzkabanExecutorServer {
}
public static void launch(final AzkabanExecutorServer azkabanExecutorServer) throws Exception {
+ azkabanExecutorServer.start();
setupTimeZone(azkabanExecutorServer.getAzkabanProps());
app = azkabanExecutorServer;
@@ -237,43 +216,34 @@ public class AzkabanExecutorServer {
}
}
- private Server createJettyServer(final Props props) {
- final int maxThreads = props.getInt("executor.maxThreads", DEFAULT_THREAD_NUMBER);
-
- /*
- * Default to a port number 0 (zero)
- * The Jetty server automatically finds an unused port when the port number is set to zero
- * TODO: This is using a highly outdated version of jetty [year 2010]. needs to be updated.
- */
- final Server server = new Server(props.getInt("executor.port", 0));
- final QueuedThreadPool httpThreadPool = new QueuedThreadPool(maxThreads);
- server.setThreadPool(httpThreadPool);
-
- final boolean isStatsOn = props.getBoolean("executor.connector.stats", true);
- logger.info("Setting up connector with stats on: " + isStatsOn);
-
- for (final Connector connector : server.getConnectors()) {
- connector.setStatsOn(isStatsOn);
- logger.info(String.format(
- "Jetty connector name: %s, default header buffer size: %d",
- connector.getName(), connector.getHeaderBufferSize()));
- connector.setHeaderBufferSize(props.getInt("jetty.headerBufferSize",
- DEFAULT_HEADER_BUFFER_SIZE));
- logger.info(String.format(
- "Jetty connector name: %s, (if) new header buffer size: %d",
- connector.getName(), connector.getHeaderBufferSize()));
+ private void start() throws Exception {
+ this.root.setAttribute(Constants.AZKABAN_SERVLET_CONTEXT_KEY, this);
+
+ JmxJobMBeanManager.getInstance().initialize(this.props);
+
+ // make sure this happens before
+ configureJobCallback(this.props);
+
+ configureMBeanServer();
+ configureMetricReports();
+
+ loadCustomJMXAttributeProcessor(this.props);
+
+ try {
+ this.server.start();
+ } catch (final Exception e) {
+ logger.error(e);
+ Utils.croak(e.getMessage(), 1);
}
- final Context root = new Context(server, "/", Context.SESSIONS);
- root.setMaxFormContentSize(MAX_FORM_CONTENT_SIZE);
+ insertExecutorEntryIntoDB();
+ dumpPortToFile();
- root.addServlet(new ServletHolder(new ExecutorServlet()), "/executor");
- root.addServlet(new ServletHolder(new JMXHttpServlet()), "/jmx");
- root.addServlet(new ServletHolder(new StatsServlet()), "/stats");
- root.addServlet(new ServletHolder(new ServerStatisticsServlet()), "/serverStatistics");
+ logger.info("Started Executor Server on " + getExecutorHostPort());
- root.setAttribute(Constants.AZKABAN_SERVLET_CONTEXT_KEY, this);
- return server;
+ if (this.props.getBoolean(ConfigurationKeys.IS_METRICS_ENABLED, false)) {
+ startExecMetrics();
+ }
}
private void startExecMetrics() throws Exception {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecJettyServerModule.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecJettyServerModule.java
new file mode 100644
index 0000000..f449281
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecJettyServerModule.java
@@ -0,0 +1,77 @@
+package azkaban.execapp;
+
+import azkaban.utils.Props;
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import javax.inject.Named;
+import javax.inject.Singleton;
+import org.apache.log4j.Logger;
+import org.mortbay.jetty.Connector;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.ServletHolder;
+import org.mortbay.thread.QueuedThreadPool;
+
+public class ExecJettyServerModule extends AbstractModule {
+
+ public static final String EXEC_JETTY_SERVER = "ExecServer";
+ public static final String EXEC_ROOT_CONTEXT = "root";
+
+ private static final int DEFAULT_THREAD_NUMBER = 50;
+ private static final int DEFAULT_HEADER_BUFFER_SIZE = 4096;
+ private static final int MAX_FORM_CONTENT_SIZE = 10 * 1024 * 1024;
+
+ private static final Logger logger = Logger.getLogger(ExecJettyServerModule.class);
+
+ @Override
+ protected void configure() {
+ }
+
+ @Provides
+ @Named(EXEC_JETTY_SERVER)
+ @Singleton
+ private Server createJettyServer(final Props props) {
+ final int maxThreads = props.getInt("executor.maxThreads", DEFAULT_THREAD_NUMBER);
+
+ /*
+ * Default to a port number 0 (zero)
+ * The Jetty server automatically finds an unused port when the port number is set to zero
+ * TODO: This is using a highly outdated version of jetty [year 2010]. needs to be updated.
+ */
+ final Server server = new Server(props.getInt("executor.port", 0));
+ final QueuedThreadPool httpThreadPool = new QueuedThreadPool(maxThreads);
+ server.setThreadPool(httpThreadPool);
+
+ final boolean isStatsOn = props.getBoolean("executor.connector.stats", true);
+ logger.info("Setting up connector with stats on: " + isStatsOn);
+
+ for (final Connector connector : server.getConnectors()) {
+ connector.setStatsOn(isStatsOn);
+ logger.info(String.format(
+ "Jetty connector name: %s, default header buffer size: %d",
+ connector.getName(), connector.getHeaderBufferSize()));
+ connector.setHeaderBufferSize(props.getInt("jetty.headerBufferSize",
+ DEFAULT_HEADER_BUFFER_SIZE));
+ logger.info(String.format(
+ "Jetty connector name: %s, (if) new header buffer size: %d",
+ connector.getName(), connector.getHeaderBufferSize()));
+ }
+
+ return server;
+ }
+
+ @Provides
+ @Named(EXEC_ROOT_CONTEXT)
+ @Singleton
+ private Context createRootContext(@Named(EXEC_JETTY_SERVER) final Server server) {
+ final Context root = new Context(server, "/", Context.SESSIONS);
+ root.setMaxFormContentSize(MAX_FORM_CONTENT_SIZE);
+
+ root.addServlet(new ServletHolder(new ExecutorServlet()), "/executor");
+ root.addServlet(new ServletHolder(new JMXHttpServlet()), "/jmx");
+ root.addServlet(new ServletHolder(new StatsServlet()), "/stats");
+ root.addServlet(new ServletHolder(new ServerStatisticsServlet()), "/serverStatistics");
+ return root;
+ }
+
+}