azkaban-aplcache

Refactoring Web Server and related code. (#1314) The motivation

8/9/2017 1:01:32 AM

Details

diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
index aa4ca2f..dbc8c9b 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -122,6 +122,8 @@ public class AzkabanWebServer extends AzkabanServer {
   private static final int MAX_FORM_CONTENT_SIZE = 10 * 1024 * 1024;
   private static final String DEFAULT_TIMEZONE_ID = "default.timezone.id";
   private static final String DEFAULT_STATIC_DIR = "";
+
+  @Deprecated
   private static AzkabanWebServer app;
   private final VelocityEngine velocityEngine;
 
@@ -136,8 +138,6 @@ public class AzkabanWebServer extends AzkabanServer {
   private final SessionCache sessionCache;
   private final List<ObjectName> registeredMBeans = new ArrayList<>();
 
-  //queuedThreadPool is mainly used to monitor jetty threadpool.
-  private QueuedThreadPool queuedThreadPool;
   private Map<String, TriggerPlugin> triggerPlugins;
   private MBeanServer mbeanServer;
 
@@ -184,13 +184,11 @@ public class AzkabanWebServer extends AzkabanServer {
     configureMBeanServer();
   }
 
+  @Deprecated
   public static AzkabanWebServer getInstance() {
     return app;
   }
 
-  /**
-   * Azkaban using Jetty
-   */
   public static void main(final String[] args) throws Exception {
     // Redirect all std out and err messages into log4j
     StdOutErrRedirect.redirectOutAndErrToLog();
@@ -216,7 +214,7 @@ public class AzkabanWebServer extends AzkabanServer {
     app = webServer;
 
     // TODO refactor code into ServerProvider
-    prepareAndStartServer(webServer.getServerProps(), app.server);
+    webServer.prepareAndStartServer();
 
     Runtime.getRuntime().addShutdownHook(new Thread() {
 
@@ -230,14 +228,14 @@ public class AzkabanWebServer extends AzkabanServer {
 
         logger.info("Shutting down http server...");
         try {
-          app.close();
+          webServer.close();
         } catch (final Exception e) {
           logger.error("Error while shutting down http server.", e);
         }
         logger.info("kk thx bye.");
       }
 
-      public void logTopMemoryConsumers() throws Exception, IOException {
+      public void logTopMemoryConsumers() throws Exception {
         if (new File("/bin/bash").exists() && new File("/bin/ps").exists()
             && new File("/usr/bin/head").exists()) {
           logger.info("logging top memeory consumer");
@@ -261,98 +259,6 @@ public class AzkabanWebServer extends AzkabanServer {
     });
   }
 
-  private static void prepareAndStartServer(final Props azkabanSettings, final Server server)
-      throws Exception {
-    validateDatabaseVersion(azkabanSettings);
-    configureRoutes(server, azkabanSettings);
-
-    if (azkabanSettings.getBoolean(Constants.ConfigurationKeys.IS_METRICS_ENABLED, false)) {
-      app.startWebMetrics();
-    }
-    try {
-      server.start();
-      logger.info("Server started");
-    } catch (final Exception e) {
-      logger.warn(e);
-      Utils.croak(e.getMessage(), 1);
-    }
-  }
-
-  private static void validateDatabaseVersion(final Props azkabanSettings)
-      throws IOException, SQLException {
-    final boolean checkDB = azkabanSettings
-        .getBoolean(AzkabanDatabaseSetup.DATABASE_CHECK_VERSION, false);
-    if (checkDB) {
-      final AzkabanDatabaseSetup setup = new AzkabanDatabaseSetup(azkabanSettings);
-      setup.loadTableInfo();
-      if (setup.needsUpdating()) {
-        logger.error("Database is out of date.");
-        setup.printUpgradePlan();
-
-        logger.error("Exiting with error.");
-        System.exit(-1);
-      }
-    }
-  }
-
-  private static void configureRoutes(final Server server, final Props azkabanSettings)
-      throws TriggerManagerException {
-    final int maxThreads = azkabanSettings
-        .getInt("jetty.maxThreads", Constants.DEFAULT_JETTY_MAX_THREAD_COUNT);
-
-    final QueuedThreadPool httpThreadPool = new QueuedThreadPool(maxThreads);
-    app.setThreadPool(httpThreadPool);
-    server.setThreadPool(httpThreadPool);
-
-    final String staticDir =
-        azkabanSettings.getString("web.resource.dir", DEFAULT_STATIC_DIR);
-    logger.info("Setting up web resource dir " + staticDir);
-    final Context root = new Context(server, "/", Context.SESSIONS);
-    root.setMaxFormContentSize(MAX_FORM_CONTENT_SIZE);
-
-    final String defaultServletPath =
-        azkabanSettings.getString("azkaban.default.servlet.path", "/index");
-    root.setResourceBase(staticDir);
-    final ServletHolder indexRedirect =
-        new ServletHolder(new IndexRedirectServlet(defaultServletPath));
-    root.addServlet(indexRedirect, "/");
-    final ServletHolder index = new ServletHolder(new ProjectServlet());
-    root.addServlet(index, "/index");
-
-    final ServletHolder staticServlet = new ServletHolder(new DefaultServlet());
-    root.addServlet(staticServlet, "/css/*");
-    root.addServlet(staticServlet, "/js/*");
-    root.addServlet(staticServlet, "/images/*");
-    root.addServlet(staticServlet, "/fonts/*");
-    root.addServlet(staticServlet, "/favicon.ico");
-
-    root.addServlet(new ServletHolder(new ProjectManagerServlet()), "/manager");
-    root.addServlet(new ServletHolder(new ExecutorServlet()), "/executor");
-    root.addServlet(new ServletHolder(new HistoryServlet()), "/history");
-    root.addServlet(new ServletHolder(new ScheduleServlet()), "/schedule");
-    root.addServlet(new ServletHolder(new JMXHttpServlet()), "/jmx");
-    root.addServlet(new ServletHolder(new TriggerManagerServlet()), "/triggers");
-    root.addServlet(new ServletHolder(new StatsServlet()), "/stats");
-
-    final ServletHolder restliHolder = new ServletHolder(new RestliServlet());
-    restliHolder.setInitParameter("resourcePackages", "azkaban.restli");
-    root.addServlet(restliHolder, "/restli/*");
-
-    final String viewerPluginDir =
-        azkabanSettings.getString("viewer.plugin.dir", "plugins/viewer");
-    loadViewerPlugins(root, viewerPluginDir, app.getVelocityEngine());
-
-    // triggerplugin
-    final Map<String, TriggerPlugin> triggerPlugins =
-        new TriggerPluginLoader(azkabanSettings).loadTriggerPlugins(root);
-    app.setTriggerPlugins(triggerPlugins);
-    // always have basic time trigger
-    // TODO: find something else to do the job
-    app.getTriggerManager().start();
-
-    root.setAttribute(Constants.AZKABAN_SERVLET_CONTEXT_KEY, app);
-  }
-
   private static void loadViewerPlugins(final Context root, final String pluginPath,
       final VelocityEngine ve) {
     final File viewerPluginPath = new File(pluginPath);
@@ -518,45 +424,115 @@ public class AzkabanWebServer extends AzkabanServer {
     ve.addProperty("jar.resource.loader.path", jarResourcePath);
   }
 
-  /**
-   * Loads the Azkaban property file from the AZKABAN_HOME conf directory
-   */
-  private static Props loadConfigurationFromAzkabanHome() {
-    final String azkabanHome = System.getenv("AZKABAN_HOME");
+  private void validateDatabaseVersion()
+      throws IOException, SQLException {
+    final boolean checkDB = this.props
+        .getBoolean(AzkabanDatabaseSetup.DATABASE_CHECK_VERSION, false);
+    if (checkDB) {
+      final AzkabanDatabaseSetup setup = new AzkabanDatabaseSetup(this.props);
+      setup.loadTableInfo();
+      if (setup.needsUpdating()) {
+        logger.error("Database is out of date.");
+        setup.printUpgradePlan();
 
-    if (azkabanHome == null) {
-      logger.error("AZKABAN_HOME not set. Will try default.");
-      return null;
+        logger.error("Exiting with error.");
+        System.exit(-1);
+      }
     }
+  }
 
-    if (!new File(azkabanHome).isDirectory()
-        || !new File(azkabanHome).canRead()) {
-      logger.error(azkabanHome + " is not a readable directory.");
-      return null;
-    }
+  private void configureRoutes() throws TriggerManagerException {
+    final String staticDir =
+        this.props.getString("web.resource.dir", DEFAULT_STATIC_DIR);
+    logger.info("Setting up web resource dir " + staticDir);
+    final Context root = new Context(this.server, "/", Context.SESSIONS);
+    root.setMaxFormContentSize(MAX_FORM_CONTENT_SIZE);
 
-    final File confPath = new File(azkabanHome, DEFAULT_CONF_PATH);
-    if (!confPath.exists() || !confPath.isDirectory() || !confPath.canRead()) {
-      logger
-          .error(azkabanHome + " does not contain a readable conf directory.");
-      return null;
-    }
+    final String defaultServletPath =
+        this.props.getString("azkaban.default.servlet.path", "/index");
+    root.setResourceBase(staticDir);
+    final ServletHolder indexRedirect =
+        new ServletHolder(new IndexRedirectServlet(defaultServletPath));
+    root.addServlet(indexRedirect, "/");
+    final ServletHolder index = new ServletHolder(new ProjectServlet());
+    root.addServlet(index, "/index");
+
+    final ServletHolder staticServlet = new ServletHolder(new DefaultServlet());
+    root.addServlet(staticServlet, "/css/*");
+    root.addServlet(staticServlet, "/js/*");
+    root.addServlet(staticServlet, "/images/*");
+    root.addServlet(staticServlet, "/fonts/*");
+    root.addServlet(staticServlet, "/favicon.ico");
+
+    root.addServlet(new ServletHolder(new ProjectManagerServlet()), "/manager");
+    root.addServlet(new ServletHolder(new ExecutorServlet()), "/executor");
+    root.addServlet(new ServletHolder(new HistoryServlet()), "/history");
+    root.addServlet(new ServletHolder(new ScheduleServlet()), "/schedule");
+    root.addServlet(new ServletHolder(new JMXHttpServlet()), "/jmx");
+    root.addServlet(new ServletHolder(new TriggerManagerServlet()), "/triggers");
+    root.addServlet(new ServletHolder(new StatsServlet()), "/stats");
+
+    final ServletHolder restliHolder = new ServletHolder(new RestliServlet());
+    restliHolder.setInitParameter("resourcePackages", "azkaban.restli");
+    root.addServlet(restliHolder, "/restli/*");
 
-    return loadAzkabanConfigurationFromDirectory(confPath);
+    final String viewerPluginDir =
+        this.props.getString("viewer.plugin.dir", "plugins/viewer");
+    loadViewerPlugins(root, viewerPluginDir, getVelocityEngine());
+
+    // Trigger Plugin Loader
+    final TriggerPluginLoader triggerPluginLoader = new TriggerPluginLoader(this.props);
+
+    final Map<String, TriggerPlugin> triggerPlugins = triggerPluginLoader.loadTriggerPlugins(root);
+    setTriggerPlugins(triggerPlugins);
+    // always have basic time trigger
+    // TODO: find something else to do the job
+    getTriggerManager().start();
+
+    root.setAttribute(Constants.AZKABAN_SERVLET_CONTEXT_KEY, this);
   }
 
-  private void startWebMetrics() throws Exception {
+  private void prepareAndStartServer()
+      throws Exception {
+    validateDatabaseVersion();
+    createThreadPool();
+    configureRoutes();
+
+    if (this.props.getBoolean(Constants.ConfigurationKeys.IS_METRICS_ENABLED, false)) {
+      startWebMetrics();
+    }
+    try {
+      this.server.start();
+      logger.info("Server started");
+    } catch (final Exception e) {
+      logger.warn(e);
+      Utils.croak(e.getMessage(), 1);
+    }
+  }
+
+  private void createThreadPool() {
+    final int maxThreads = this.props
+        .getInt("jetty.maxThreads", Constants.DEFAULT_JETTY_MAX_THREAD_COUNT);
 
+    final QueuedThreadPool httpThreadPool = new QueuedThreadPool(maxThreads);
+    this.server.setThreadPool(httpThreadPool);
+    addThreadPoolGauges(httpThreadPool);
+  }
+
+  private void addThreadPoolGauges(final QueuedThreadPool threadPool) {
     // The number of idle threads in Jetty thread pool
-    this.metricsManager.addGauge("JETTY-NumIdleThreads", this.queuedThreadPool::getIdleThreads);
+    this.metricsManager.addGauge("JETTY-NumIdleThreads", threadPool::getIdleThreads);
 
     // The number of threads in Jetty thread pool. The formula is:
     // threads = idleThreads + busyThreads
-    this.metricsManager.addGauge("JETTY-NumTotalThreads", this.queuedThreadPool::getThreads);
+    this.metricsManager.addGauge("JETTY-NumTotalThreads", threadPool::getThreads);
 
     // The number of requests queued in the Jetty thread pool.
-    this.metricsManager.addGauge("JETTY-NumQueueSize", this.queuedThreadPool::getQueueSize);
+    this.metricsManager.addGauge("JETTY-NumQueueSize", threadPool::getQueueSize);
+  }
 
+
+  private void startWebMetrics() throws Exception {
     this.metricsManager.addGauge("WEB-NumQueuedFlows", this.executorManager::getQueuedFlowSize);
     /*
      * TODO: Currently {@link ExecutorManager#getRunningFlows()} includes both running and non-dispatched flows.
@@ -649,9 +625,8 @@ public class AzkabanWebServer extends AzkabanServer {
 
     registerMbean("jetty", new JmxJettyServer(this.server));
     registerMbean("triggerManager", new JmxTriggerManager(this.triggerManager));
-    if (this.executorManager instanceof ExecutorManager) {
-      registerMbean("executorManager", new JmxExecutorManager(
-          (ExecutorManager) this.executorManager));
+    if (this.executorManager != null) {
+      registerMbean("executorManager", new JmxExecutorManager(this.executorManager));
     }
 
     // Register Log4J loggers as JMX beans so the log level can be
@@ -727,8 +702,4 @@ public class AzkabanWebServer extends AzkabanServer {
       return null;
     }
   }
-
-  private void setThreadPool(final QueuedThreadPool queuedThreadPool) {
-    this.queuedThreadPool = queuedThreadPool;
-  }
 }