Details
diff --git a/azkaban-common/src/main/java/azkaban/scheduler/ScheduleManager.java b/azkaban-common/src/main/java/azkaban/scheduler/ScheduleManager.java
index 023d1ed..60c96c3 100644
--- a/azkaban-common/src/main/java/azkaban/scheduler/ScheduleManager.java
+++ b/azkaban-common/src/main/java/azkaban/scheduler/ScheduleManager.java
@@ -22,6 +22,7 @@ import azkaban.trigger.TriggerAgent;
import azkaban.trigger.TriggerStatus;
import azkaban.utils.Pair;
import azkaban.utils.Props;
+import com.google.inject.Inject;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
@@ -41,7 +42,8 @@ import org.joda.time.format.DateTimeFormatter;
* TODO kunkun-tang: When new AZ quartz Scheduler comes, we will remove this class.
*/
public class ScheduleManager implements TriggerAgent {
- public static final String triggerSource = "SimpleTimeTrigger";
+
+ public static final String SIMPLE_TIME_TRIGGER = "SimpleTimeTrigger";
private static final Logger logger = Logger.getLogger(ScheduleManager.class);
private final DateTimeFormatter _dateFormat = DateTimeFormat
.forPattern("MM-dd-yyyy HH:mm:ss:SSS");
@@ -57,6 +59,7 @@ public class ScheduleManager implements TriggerAgent {
* schedule.
*
*/
+ @Inject
public ScheduleManager(final ScheduleLoader loader) {
this.loader = loader;
}
@@ -241,6 +244,6 @@ public class ScheduleManager implements TriggerAgent {
@Override
public String getTriggerSource() {
- return triggerSource;
+ return SIMPLE_TIME_TRIGGER;
}
}
diff --git a/azkaban-common/src/main/java/azkaban/scheduler/TriggerBasedScheduleLoader.java b/azkaban-common/src/main/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
index ab0bdb8..95003cc 100644
--- a/azkaban-common/src/main/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
+++ b/azkaban-common/src/main/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
@@ -26,6 +26,7 @@ import azkaban.trigger.TriggerManagerAdapter;
import azkaban.trigger.TriggerManagerException;
import azkaban.trigger.builtin.BasicTimeChecker;
import azkaban.trigger.builtin.ExecuteFlowAction;
+import com.google.inject.Inject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -43,10 +44,10 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
private long lastUpdateTime = -1;
- public TriggerBasedScheduleLoader(final TriggerManager triggerManager,
- final String triggerSource) {
+ @Inject
+ public TriggerBasedScheduleLoader(final TriggerManager triggerManager) {
this.triggerManager = triggerManager;
- this.triggerSource = triggerSource;
+ this.triggerSource = ScheduleManager.SIMPLE_TIME_TRIGGER;
}
private Trigger scheduleToTrigger(final Schedule s) {
diff --git a/azkaban-common/src/main/java/azkaban/server/session/SessionCache.java b/azkaban-common/src/main/java/azkaban/server/session/SessionCache.java
index 7351f7d..8c29a0d 100644
--- a/azkaban-common/src/main/java/azkaban/server/session/SessionCache.java
+++ b/azkaban-common/src/main/java/azkaban/server/session/SessionCache.java
@@ -19,6 +19,7 @@ package azkaban.server.session;
import azkaban.utils.Props;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
+import com.google.inject.Inject;
import java.util.concurrent.TimeUnit;
/**
@@ -40,6 +41,7 @@ public class SessionCache {
/**
* Constructor taking global props.
*/
+ @Inject
public SessionCache(final Props props) {
this.cache = CacheBuilder.newBuilder()
.maximumSize(props.getInt("max.num.sessions", MAX_NUM_SESSIONS))
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
index bd19b14..745e46d 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -29,9 +29,7 @@ import azkaban.jmx.JmxTriggerManager;
import azkaban.metrics.MetricsManager;
import azkaban.metrics.MetricsUtility;
import azkaban.project.ProjectManager;
-import azkaban.scheduler.ScheduleLoader;
import azkaban.scheduler.ScheduleManager;
-import azkaban.scheduler.TriggerBasedScheduleLoader;
import azkaban.server.AzkabanServer;
import azkaban.server.session.SessionCache;
import azkaban.trigger.TriggerManager;
@@ -44,7 +42,6 @@ import azkaban.trigger.builtin.KillExecutionAction;
import azkaban.trigger.builtin.SlaAlertAction;
import azkaban.trigger.builtin.SlaChecker;
import azkaban.user.UserManager;
-import azkaban.user.XmlUserManager;
import azkaban.utils.FileIOUtils;
import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
@@ -91,9 +88,6 @@ import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.apache.log4j.jmx.HierarchyDynamicMBean;
import org.apache.velocity.app.VelocityEngine;
-import org.apache.velocity.runtime.log.Log4JLogChute;
-import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
-import org.apache.velocity.runtime.resource.loader.JarResourceLoader;
import org.joda.time.DateTimeZone;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
@@ -130,8 +124,6 @@ public class AzkabanWebServer extends AzkabanServer {
private static final Logger logger = Logger.getLogger(AzkabanWebServer.class);
private static final int MAX_FORM_CONTENT_SIZE = 10 * 1024 * 1024;
private static final String DEFAULT_TIMEZONE_ID = "default.timezone.id";
- private static final String VELOCITY_DEV_MODE_PARAM = "velocity.dev.mode";
- private static final String USER_MANAGER_CLASS_PARAM = "user.manager.class";
private static final String DEFAULT_STATIC_DIR = "";
private static AzkabanWebServer app;
private final VelocityEngine velocityEngine;
@@ -142,53 +134,50 @@ public class AzkabanWebServer extends AzkabanServer {
private final ExecutorManager executorManager;
private final ScheduleManager scheduleManager;
private final TriggerManager triggerManager;
- private final ClassLoader baseClassLoader;
private final MetricRegistry registry;
private final MetricsManager metricsManager;
private final Props props;
private final SessionCache sessionCache;
private final List<ObjectName> registeredMBeans = new ArrayList<>();
+
//queuedThreadPool is mainly used to monitor jetty threadpool.
private QueuedThreadPool queuedThreadPool;
private Map<String, TriggerPlugin> triggerPlugins;
private MBeanServer mbeanServer;
- /**
- * Constructor usually called by tomcat AzkabanServletContext to create the
- * initial server
- */
- public AzkabanWebServer() throws Exception {
- this(null, loadConfigurationFromAzkabanHome());
- }
-
@Inject
- public AzkabanWebServer(final Server server, final Props props) throws Exception {
- this.props = requireNonNull(props);
- this.server = server;
-
- this.velocityEngine = configureVelocityEngine(props.getBoolean(VELOCITY_DEV_MODE_PARAM, false));
- this.sessionCache = new SessionCache(props);
- this.userManager = loadUserManager(props);
-
- // TODO remove hack. Move injection to constructor
- this.executorManager = SERVICE_PROVIDER.getInstance(ExecutorManager.class);
- this.projectManager = SERVICE_PROVIDER.getInstance(ProjectManager.class);
- this.triggerManager = SERVICE_PROVIDER.getInstance(TriggerManager.class);
- this.metricsManager = SERVICE_PROVIDER.getInstance(MetricsManager.class);
- this.registry = SERVICE_PROVIDER.getInstance(MetricRegistry.class);
+ public AzkabanWebServer(final Props props,
+ final Server server,
+ final ExecutorManager executorManager,
+ final ProjectManager projectManager,
+ final TriggerManager triggerManager,
+ final MetricsManager metricsManager,
+ final MetricRegistry metricRegistry,
+ final SessionCache sessionCache,
+ final UserManager userManager,
+ final ScheduleManager scheduleManager,
+ final VelocityEngine velocityEngine) {
+ this.props = requireNonNull(props, "props is null.");
+ this.server = requireNonNull(server, "server is null.");
+ this.executorManager = requireNonNull(executorManager, "executorManager is null.");
+ this.projectManager = requireNonNull(projectManager, "projectManager is null.");
+ this.triggerManager = requireNonNull(triggerManager, "triggerManager is null.");
+ this.metricsManager = requireNonNull(metricsManager, "metricsManager is null.");
+ this.registry = requireNonNull(metricRegistry, "metricRegistry is null.");
+ this.sessionCache = requireNonNull(sessionCache, "sessionCache is null.");
+ this.userManager = requireNonNull(userManager, "userManager is null.");
+ this.scheduleManager = requireNonNull(scheduleManager, "scheduleManager is null.");
+ this.velocityEngine = requireNonNull(velocityEngine, "velocityEngine is null.");
loadBuiltinCheckersAndActions();
// load all trigger agents here
- this.scheduleManager = loadScheduleManager(this.triggerManager);
final String triggerPluginDir =
props.getString("trigger.plugin.dir", "plugins/triggers");
loadPluginCheckersAndActions(triggerPluginDir);
- this.baseClassLoader = this.getClassLoader();
-
// Setup time zone
if (props.containsKey(DEFAULT_TIMEZONE_ID)) {
final String timezone = props.getString(DEFAULT_TIMEZONE_ID);
@@ -739,32 +728,6 @@ public class AzkabanWebServer extends AzkabanServer {
this.metricsManager.startReporting("AZ-WEB", this.props);
}
- private UserManager loadUserManager(final Props props) {
- final Class<?> userManagerClass = props.getClass(USER_MANAGER_CLASS_PARAM, null);
- final UserManager manager;
- if (userManagerClass != null && userManagerClass.getConstructors().length > 0) {
- logger.info("Loading user manager class " + userManagerClass.getName());
- try {
- final Constructor<?> userManagerConstructor = userManagerClass.getConstructor(Props.class);
- manager = (UserManager) userManagerConstructor.newInstance(props);
- } catch (final Exception e) {
- logger.error("Could not instantiate UserManager " + userManagerClass.getName());
- throw new RuntimeException(e);
- }
- } else {
- manager = new XmlUserManager(props);
- }
- return manager;
- }
-
- private ScheduleManager loadScheduleManager(final TriggerManager tm)
- throws Exception {
- logger.info("Loading trigger based scheduler");
- final ScheduleLoader loader =
- new TriggerBasedScheduleLoader(tm, ScheduleManager.triggerSource);
- return new ScheduleManager(loader);
- }
-
private void loadBuiltinCheckersAndActions() {
logger.info("Loading built-in checker and action types");
ExecuteFlowAction.setExecutorManager(this.executorManager);
@@ -791,7 +754,7 @@ public class AzkabanWebServer extends AzkabanServer {
return;
}
- final ClassLoader parentLoader = this.getClassLoader();
+ final ClassLoader parentLoader = getClass().getClassLoader();
final File[] pluginDirs = triggerPluginPath.listFiles();
final ArrayList<String> jarPaths = new ArrayList<>();
for (final File pluginDir : pluginDirs) {
@@ -944,44 +907,6 @@ public class AzkabanWebServer extends AzkabanServer {
}
/**
- * Creates and configures the velocity engine.
- */
- private VelocityEngine configureVelocityEngine(final boolean devMode) {
- final VelocityEngine engine = new VelocityEngine();
- engine.setProperty("resource.loader", "classpath, jar");
- engine.setProperty("classpath.resource.loader.class",
- ClasspathResourceLoader.class.getName());
- engine.setProperty("classpath.resource.loader.cache", !devMode);
- engine.setProperty("classpath.resource.loader.modificationCheckInterval",
- 5L);
- engine.setProperty("jar.resource.loader.class",
- JarResourceLoader.class.getName());
- engine.setProperty("jar.resource.loader.cache", !devMode);
- engine.setProperty("resource.manager.logwhenfound", false);
- engine.setProperty("input.encoding", "UTF-8");
- engine.setProperty("output.encoding", "UTF-8");
- engine.setProperty("directive.set.null.allowed", true);
- engine.setProperty("resource.manager.logwhenfound", false);
- engine.setProperty("velocimacro.permissions.allow.inline", true);
- engine.setProperty("velocimacro.library.autoreload", devMode);
- engine.setProperty("velocimacro.library",
- "/azkaban/webapp/servlet/velocity/macros.vm");
- engine.setProperty(
- "velocimacro.permissions.allow.inline.to.replace.global", true);
- engine.setProperty("velocimacro.arguments.strict", true);
- engine.setProperty("runtime.log.invalid.references", devMode);
- engine.setProperty("runtime.log.logsystem.class", Log4JLogChute.class);
- engine.setProperty("runtime.log.logsystem.log4j.logger",
- Logger.getLogger("org.apache.velocity.Logger"));
- engine.setProperty("parser.pool.size", 3);
- return engine;
- }
-
- public ClassLoader getClassLoader() {
- return this.baseClassLoader;
- }
-
- /**
* Returns the global azkaban properties
*/
@Override
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServerModule.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServerModule.java
index de4effa..4641c83 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServerModule.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServerModule.java
@@ -17,8 +17,22 @@
package azkaban.webapp;
+import azkaban.scheduler.ScheduleLoader;
+import azkaban.scheduler.TriggerBasedScheduleLoader;
+import azkaban.user.UserManager;
+import azkaban.user.XmlUserManager;
+import azkaban.utils.Props;
import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.Provides;
import com.google.inject.Scopes;
+import com.google.inject.Singleton;
+import java.lang.reflect.Constructor;
+import org.apache.log4j.Logger;
+import org.apache.velocity.app.VelocityEngine;
+import org.apache.velocity.runtime.log.Log4JLogChute;
+import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
+import org.apache.velocity.runtime.resource.loader.JarResourceLoader;
import org.mortbay.jetty.Server;
/**
@@ -28,9 +42,71 @@ import org.mortbay.jetty.Server;
*/
public class AzkabanWebServerModule extends AbstractModule {
+ private static final Logger log = Logger.getLogger(AzkabanWebServerModule.class);
+ private static final String USER_MANAGER_CLASS_PARAM = "user.manager.class";
+ private static final String VELOCITY_DEV_MODE_PARAM = "velocity.dev.mode";
+
@Override
protected void configure() {
bind(Server.class).toProvider(WebServerProvider.class);
bind(AzkabanWebServer.class).in(Scopes.SINGLETON);
+ bind(ScheduleLoader.class).to(TriggerBasedScheduleLoader.class);
+ }
+
+ @Inject
+ @Singleton
+ @Provides
+ public UserManager createUserManager(final Props props) {
+ final Class<?> userManagerClass = props.getClass(USER_MANAGER_CLASS_PARAM, null);
+ final UserManager manager;
+ if (userManagerClass != null && userManagerClass.getConstructors().length > 0) {
+ log.info("Loading user manager class " + userManagerClass.getName());
+ try {
+ final Constructor<?> userManagerConstructor = userManagerClass.getConstructor(Props.class);
+ manager = (UserManager) userManagerConstructor.newInstance(props);
+ } catch (final Exception e) {
+ log.error("Could not instantiate UserManager " + userManagerClass.getName());
+ throw new RuntimeException(e);
+ }
+ } else {
+ manager = new XmlUserManager(props);
+ }
+ return manager;
+ }
+
+ @Inject
+ @Singleton
+ @Provides
+ public VelocityEngine createVelocityEngine(final Props props) {
+ final boolean devMode = props.getBoolean(VELOCITY_DEV_MODE_PARAM, false);
+
+ final VelocityEngine engine = new VelocityEngine();
+ engine.setProperty("resource.loader", "classpath, jar");
+ engine.setProperty("classpath.resource.loader.class",
+ ClasspathResourceLoader.class.getName());
+ engine.setProperty("classpath.resource.loader.cache", !devMode);
+ engine.setProperty("classpath.resource.loader.modificationCheckInterval",
+ 5L);
+ engine.setProperty("jar.resource.loader.class",
+ JarResourceLoader.class.getName());
+ engine.setProperty("jar.resource.loader.cache", !devMode);
+ engine.setProperty("resource.manager.logwhenfound", false);
+ engine.setProperty("input.encoding", "UTF-8");
+ engine.setProperty("output.encoding", "UTF-8");
+ engine.setProperty("directive.set.null.allowed", true);
+ engine.setProperty("resource.manager.logwhenfound", false);
+ engine.setProperty("velocimacro.permissions.allow.inline", true);
+ engine.setProperty("velocimacro.library.autoreload", devMode);
+ engine.setProperty("velocimacro.library",
+ "/azkaban/webapp/servlet/velocity/macros.vm");
+ engine.setProperty(
+ "velocimacro.permissions.allow.inline.to.replace.global", true);
+ engine.setProperty("velocimacro.arguments.strict", true);
+ engine.setProperty("runtime.log.invalid.references", devMode);
+ engine.setProperty("runtime.log.logsystem.class", Log4JLogChute.class);
+ engine.setProperty("runtime.log.logsystem.log4j.logger",
+ Logger.getLogger("org.apache.velocity.Logger"));
+ engine.setProperty("parser.pool.size", 3);
+ return engine;
}
}
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
index 381cf77..190ba1d 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
@@ -16,6 +16,8 @@
package azkaban.webapp.servlet;
+import static azkaban.ServiceProvider.SERVICE_PROVIDER;
+
import azkaban.server.AzkabanServer;
import azkaban.server.HttpRequestUtils;
import azkaban.server.session.Session;
@@ -45,11 +47,6 @@ import org.joda.time.DateTime;
*/
public abstract class AbstractAzkabanServlet extends HttpServlet {
- public static final String DEFAULT_LOG_URL_PREFIX =
- "predefined_log_url_prefix";
- public static final String LOG_URL_PREFIX = "log_url_prefix";
- public static final String HTML_TYPE = "text/html";
- public static final String XML_MIME_TYPE = "application/xhtml+xml";
public static final String JSON_MIME_TYPE = "application/json";
public static final String jarVersion = AbstractAzkabanServlet.class.getPackage()
.getImplementationVersion();
@@ -69,22 +66,6 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
private List<ViewerPlugin> viewerPlugins;
private List<TriggerPlugin> triggerPlugins;
- /**
- * Retrieve the Azkaban application
- */
- public static AzkabanWebServer getApp(final ServletConfig config) {
- final AzkabanWebServer app =
- (AzkabanWebServer) config.getServletContext().getAttribute(
- AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY);
-
- if (app == null) {
- throw new IllegalStateException(
- "No batch application is defined in the servlet context!");
- } else {
- return app;
- }
- }
-
public static String createJsonResponse(final String status, final String message,
final String action, final Map<String, Object> params) {
final HashMap<String, Object> response = new HashMap<>();
@@ -111,9 +92,7 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
@Override
public void init(final ServletConfig config) throws ServletException {
- this.application =
- (AzkabanServer) config.getServletContext().getAttribute(
- AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY);
+ this.application = SERVICE_PROVIDER.getInstance(AzkabanWebServer.class);
if (this.application == null) {
throw new IllegalStateException(