azkaban-developers

Refactor AzkabanExecutorServer (#1113) `AzkabanExecServer`

8/23/2017 11:21:30 AM

Details

diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
index fc47a15..9a1ba5c 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
@@ -22,7 +22,6 @@ import azkaban.executor.JdbcExecutorLoader;
 import com.google.inject.AbstractModule;
 import com.google.inject.Scopes;
 
-
 /**
  * This Guice module is currently a one place container for all bindings in the current module. This
  * is intended to help during the migration process to Guice. Once this class starts growing we can
@@ -32,6 +31,7 @@ public class AzkabanExecServerModule extends AbstractModule {
 
   @Override
   protected void configure() {
+    install(new ExecJettyServerModule());
     bind(ExecutorLoader.class).to(JdbcExecutorLoader.class);
     bind(AzkabanExecutorServer.class).in(Scopes.SINGLETON);
     bind(TriggerManager.class).in(Scopes.SINGLETON);
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 8d28cef..72281da 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -17,7 +17,10 @@
 package azkaban.execapp;
 
 import static azkaban.Constants.AZKABAN_EXECUTOR_PORT_FILENAME;
+import static azkaban.Constants.ConfigurationKeys;
 import static azkaban.ServiceProvider.SERVICE_PROVIDER;
+import static azkaban.execapp.ExecJettyServerModule.EXEC_JETTY_SERVER;
+import static azkaban.execapp.ExecJettyServerModule.EXEC_ROOT_CONTEXT;
 import static com.google.common.base.Preconditions.checkState;
 import static java.util.Objects.requireNonNull;
 
@@ -61,6 +64,7 @@ import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.TimeZone;
+import javax.inject.Named;
 import javax.management.MBeanInfo;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -70,19 +74,14 @@ import org.joda.time.DateTimeZone;
 import org.mortbay.jetty.Connector;
 import org.mortbay.jetty.Server;
 import org.mortbay.jetty.servlet.Context;
-import org.mortbay.jetty.servlet.ServletHolder;
-import org.mortbay.thread.QueuedThreadPool;
 
 public class AzkabanExecutorServer {
 
   public static final String JOBTYPE_PLUGIN_DIR = "azkaban.jobtype.plugin.dir";
   public static final String METRIC_INTERVAL = "executor.metric.milisecinterval.";
-  public static final int DEFAULT_HEADER_BUFFER_SIZE = 4096;
   private static final String CUSTOM_JMX_ATTRIBUTE_PROCESSOR_PROPERTY = "jmx.attribute.processor.class";
   private static final Logger logger = Logger.getLogger(AzkabanExecutorServer.class);
-  private static final int MAX_FORM_CONTENT_SIZE = 10 * 1024 * 1024;
   private static final String DEFAULT_TIMEZONE_ID = "default.timezone.id";
-  private static final int DEFAULT_THREAD_NUMBER = 50;
 
   private static AzkabanExecutorServer app;
 
@@ -92,6 +91,7 @@ public class AzkabanExecutorServer {
   private final MetricsManager metricsManager;
   private final Props props;
   private final Server server;
+  private final Context root;
 
   private final ArrayList<ObjectName> registeredMBeans = new ArrayList<>();
   private MBeanServer mbeanServer;
@@ -99,41 +99,19 @@ public class AzkabanExecutorServer {
   @Inject
   public AzkabanExecutorServer(final Props props,
       final ExecutorLoader executionLoader,
-      final FlowRunnerManager runnerManager, final MetricsManager metricsManager,
-      final ExecMetrics execMetrics) throws Exception {
+      final FlowRunnerManager runnerManager,
+      final MetricsManager metricsManager,
+      final ExecMetrics execMetrics,
+      @Named(EXEC_JETTY_SERVER) final Server server,
+      @Named(EXEC_ROOT_CONTEXT) final Context root) throws Exception {
     this.props = props;
     this.executionLoader = executionLoader;
     this.runnerManager = runnerManager;
+
     this.metricsManager = metricsManager;
     this.execMetrics = execMetrics;
-
-    this.server = createJettyServer(props);
-
-    JmxJobMBeanManager.getInstance().initialize(props);
-
-    // make sure this happens before
-    configureJobCallback(props);
-
-    configureMBeanServer();
-    configureMetricReports();
-
-    loadCustomJMXAttributeProcessor(props);
-
-    try {
-      this.server.start();
-    } catch (final Exception e) {
-      logger.error(e);
-      Utils.croak(e.getMessage(), 1);
-    }
-
-    insertExecutorEntryIntoDB();
-    dumpPortToFile();
-
-    logger.info("Started Executor Server on " + getExecutorHostPort());
-
-    if (props.getBoolean(Constants.ConfigurationKeys.IS_METRICS_ENABLED, false)) {
-      startExecMetrics();
-    }
+    this.server = server;
+    this.root = root;
   }
 
   /**
@@ -168,6 +146,7 @@ public class AzkabanExecutorServer {
   }
 
   public static void launch(final AzkabanExecutorServer azkabanExecutorServer) throws Exception {
+    azkabanExecutorServer.start();
     setupTimeZone(azkabanExecutorServer.getAzkabanProps());
     app = azkabanExecutorServer;
 
@@ -237,43 +216,34 @@ public class AzkabanExecutorServer {
     }
   }
 
-  private Server createJettyServer(final Props props) {
-    final int maxThreads = props.getInt("executor.maxThreads", DEFAULT_THREAD_NUMBER);
-
-    /*
-     * Default to a port number 0 (zero)
-     * The Jetty server automatically finds an unused port when the port number is set to zero
-     * TODO: This is using a highly outdated version of jetty [year 2010]. needs to be updated.
-     */
-    final Server server = new Server(props.getInt("executor.port", 0));
-    final QueuedThreadPool httpThreadPool = new QueuedThreadPool(maxThreads);
-    server.setThreadPool(httpThreadPool);
-
-    final boolean isStatsOn = props.getBoolean("executor.connector.stats", true);
-    logger.info("Setting up connector with stats on: " + isStatsOn);
-
-    for (final Connector connector : server.getConnectors()) {
-      connector.setStatsOn(isStatsOn);
-      logger.info(String.format(
-          "Jetty connector name: %s, default header buffer size: %d",
-          connector.getName(), connector.getHeaderBufferSize()));
-      connector.setHeaderBufferSize(props.getInt("jetty.headerBufferSize",
-          DEFAULT_HEADER_BUFFER_SIZE));
-      logger.info(String.format(
-          "Jetty connector name: %s, (if) new header buffer size: %d",
-          connector.getName(), connector.getHeaderBufferSize()));
+  private void start() throws Exception {
+    this.root.setAttribute(Constants.AZKABAN_SERVLET_CONTEXT_KEY, this);
+
+    JmxJobMBeanManager.getInstance().initialize(this.props);
+
+    // make sure this happens before
+    configureJobCallback(this.props);
+
+    configureMBeanServer();
+    configureMetricReports();
+
+    loadCustomJMXAttributeProcessor(this.props);
+
+    try {
+      this.server.start();
+    } catch (final Exception e) {
+      logger.error(e);
+      Utils.croak(e.getMessage(), 1);
     }
 
-    final Context root = new Context(server, "/", Context.SESSIONS);
-    root.setMaxFormContentSize(MAX_FORM_CONTENT_SIZE);
+    insertExecutorEntryIntoDB();
+    dumpPortToFile();
 
-    root.addServlet(new ServletHolder(new ExecutorServlet()), "/executor");
-    root.addServlet(new ServletHolder(new JMXHttpServlet()), "/jmx");
-    root.addServlet(new ServletHolder(new StatsServlet()), "/stats");
-    root.addServlet(new ServletHolder(new ServerStatisticsServlet()), "/serverStatistics");
+    logger.info("Started Executor Server on " + getExecutorHostPort());
 
-    root.setAttribute(Constants.AZKABAN_SERVLET_CONTEXT_KEY, this);
-    return server;
+    if (this.props.getBoolean(ConfigurationKeys.IS_METRICS_ENABLED, false)) {
+      startExecMetrics();
+    }
   }
 
   private void startExecMetrics() throws Exception {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecJettyServerModule.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecJettyServerModule.java
new file mode 100644
index 0000000..f449281
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecJettyServerModule.java
@@ -0,0 +1,77 @@
+package azkaban.execapp;
+
+import azkaban.utils.Props;
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import javax.inject.Named;
+import javax.inject.Singleton;
+import org.apache.log4j.Logger;
+import org.mortbay.jetty.Connector;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.ServletHolder;
+import org.mortbay.thread.QueuedThreadPool;
+
+public class ExecJettyServerModule extends AbstractModule {
+
+  public static final String EXEC_JETTY_SERVER = "ExecServer";
+  public static final String EXEC_ROOT_CONTEXT = "root";
+
+  private static final int DEFAULT_THREAD_NUMBER = 50;
+  private static final int DEFAULT_HEADER_BUFFER_SIZE = 4096;
+  private static final int MAX_FORM_CONTENT_SIZE = 10 * 1024 * 1024;
+
+  private static final Logger logger = Logger.getLogger(ExecJettyServerModule.class);
+
+  @Override
+  protected void configure() {
+  }
+
+  @Provides
+  @Named(EXEC_JETTY_SERVER)
+  @Singleton
+  private Server createJettyServer(final Props props) {
+    final int maxThreads = props.getInt("executor.maxThreads", DEFAULT_THREAD_NUMBER);
+
+    /*
+     * Default to a port number 0 (zero)
+     * The Jetty server automatically finds an unused port when the port number is set to zero
+     * TODO: This is using a highly outdated version of jetty [year 2010]. needs to be updated.
+     */
+    final Server server = new Server(props.getInt("executor.port", 0));
+    final QueuedThreadPool httpThreadPool = new QueuedThreadPool(maxThreads);
+    server.setThreadPool(httpThreadPool);
+
+    final boolean isStatsOn = props.getBoolean("executor.connector.stats", true);
+    logger.info("Setting up connector with stats on: " + isStatsOn);
+
+    for (final Connector connector : server.getConnectors()) {
+      connector.setStatsOn(isStatsOn);
+      logger.info(String.format(
+          "Jetty connector name: %s, default header buffer size: %d",
+          connector.getName(), connector.getHeaderBufferSize()));
+      connector.setHeaderBufferSize(props.getInt("jetty.headerBufferSize",
+          DEFAULT_HEADER_BUFFER_SIZE));
+      logger.info(String.format(
+          "Jetty connector name: %s, (if) new header buffer size: %d",
+          connector.getName(), connector.getHeaderBufferSize()));
+    }
+
+    return server;
+  }
+
+  @Provides
+  @Named(EXEC_ROOT_CONTEXT)
+  @Singleton
+  private Context createRootContext(@Named(EXEC_JETTY_SERVER) final Server server) {
+    final Context root = new Context(server, "/", Context.SESSIONS);
+    root.setMaxFormContentSize(MAX_FORM_CONTENT_SIZE);
+
+    root.addServlet(new ServletHolder(new ExecutorServlet()), "/executor");
+    root.addServlet(new ServletHolder(new JMXHttpServlet()), "/jmx");
+    root.addServlet(new ServletHolder(new StatsServlet()), "/stats");
+    root.addServlet(new ServletHolder(new ServerStatisticsServlet()), "/serverStatistics");
+    return root;
+  }
+
+}