Details
diff --git a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
index f6742c1..bb9a7e1 100644
--- a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
+++ b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
@@ -38,6 +38,7 @@ import azkaban.storage.StorageImplementationType;
import azkaban.trigger.JdbcTriggerImpl;
import azkaban.trigger.TriggerLoader;
import azkaban.utils.Props;
+import com.codahale.metrics.MetricRegistry;
import com.google.inject.AbstractModule;
import com.google.inject.Inject;
import com.google.inject.Provides;
@@ -83,6 +84,7 @@ public class AzkabanCommonModule extends AbstractModule {
bind(DataSource.class).to(AzkabanDataSource.class);
bind(ExecutorManager.class).in(Scopes.SINGLETON);
bind(AlerterHolder.class).in(Scopes.SINGLETON);
+ bind(MetricRegistry.class).in(Scopes.SINGLETON);
}
public Class<? extends Storage> resolveStorageClassType() {
diff --git a/azkaban-common/src/main/java/azkaban/database/AbstractJdbcLoader.java b/azkaban-common/src/main/java/azkaban/database/AbstractJdbcLoader.java
index a2e432e..6bf3915 100644
--- a/azkaban-common/src/main/java/azkaban/database/AbstractJdbcLoader.java
+++ b/azkaban-common/src/main/java/azkaban/database/AbstractJdbcLoader.java
@@ -27,14 +27,16 @@ import org.apache.commons.dbutils.QueryRunner;
public abstract class AbstractJdbcLoader {
private final AzkabanDataSource dataSource;
+ private final CommonMetrics commonMetrics;
- public AbstractJdbcLoader(final Props props) {
+ public AbstractJdbcLoader(final Props props, final CommonMetrics commonMetrics) {
this.dataSource = DataSourceUtils.getDataSource(props);
+ this.commonMetrics = commonMetrics;
}
protected Connection getDBConnection(final boolean autoCommit) throws IOException {
Connection connection = null;
- CommonMetrics.INSTANCE.markDBConnection();
+ this.commonMetrics.markDBConnection();
final long startMs = System.currentTimeMillis();
try {
connection = this.dataSource.getConnection();
@@ -43,7 +45,7 @@ public abstract class AbstractJdbcLoader {
DbUtils.closeQuietly(connection);
throw new IOException("Error getting DB connection.", e);
}
- CommonMetrics.INSTANCE.setDBConnectionTime(System.currentTimeMillis() - startMs);
+ this.commonMetrics.setDBConnectionTime(System.currentTimeMillis() - startMs);
return connection;
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 76df007..3efc7c9 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -119,15 +119,18 @@ public class ExecutorManager extends EventHandler implements
File cacheDir;
private final Props azkProps;
+ private final CommonMetrics commonMetrics;
private List<String> filterList;
private Map<String, Integer> comparatorWeightsMap;
private long lastSuccessfulExecutorInfoRefresh;
private ExecutorService executorInforRefresherService;
@Inject
- public ExecutorManager(Props azkProps, ExecutorLoader loader, AlerterHolder alerterHolder) throws ExecutorManagerException {
+ public ExecutorManager(Props azkProps, ExecutorLoader loader, AlerterHolder alerterHolder,
+ CommonMetrics commonMetrics) throws ExecutorManagerException {
this.alerterHolder = alerterHolder;
this.azkProps = azkProps;
+ this.commonMetrics = commonMetrics;
this.executorLoader = loader;
this.setupExecutors();
this.loadRunningFlows();
@@ -1530,7 +1533,7 @@ public class ExecutorManager extends EventHandler implements
Status newStatus = flow.getStatus();
if(oldStatus != newStatus && newStatus == Status.FAILED) {
- CommonMetrics.INSTANCE.markFlowFail();
+ this.commonMetrics.markFlowFail();
}
ExecutionOptions options = flow.getExecutionOptions();
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 1f31a0a..2f7b9cf 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -16,6 +16,7 @@
package azkaban.executor;
+import azkaban.metrics.CommonMetrics;
import com.google.inject.Inject;
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
@@ -61,8 +62,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
private EncodingType defaultEncodingType = EncodingType.GZIP;
@Inject
- public JdbcExecutorLoader(Props props) {
- super(props);
+ public JdbcExecutorLoader(Props props, CommonMetrics commonMetrics) {
+ super(props, commonMetrics);
}
public EncodingType getDefaultEncodingType() {
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
index a055a07..02a9abe 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
@@ -48,12 +48,15 @@ public class ProcessJob extends AbstractProcessJob {
public static final String KRB5CCNAME = "KRB5CCNAME";
private static final Duration KILL_TIME = Duration.ofSeconds(30);
private static final String MEMCHECK_ENABLED = "memCheck.enabled";
+ private final CommonMetrics commonMetrics;
private volatile AzkabanProcess process;
private volatile boolean killed = false;
public ProcessJob(final String jobId, final Props sysProps,
final Props jobProps, final Logger log) {
super(jobId, sysProps, jobProps, log);
+ // TODO: reallocf fully guicify CommonMetrics through ProcessJob dependents
+ this.commonMetrics = SERVICE_PROVIDER.getInstance(CommonMetrics.class);
}
/**
@@ -140,7 +143,7 @@ public class ProcessJob extends AbstractProcessJob {
if (isMemGranted) {
info(String.format("Memory granted for job %s", getId()));
if (attempt > 1) {
- CommonMetrics.INSTANCE.decrementOOMJobWaitCount();
+ this.commonMetrics.decrementOOMJobWaitCount();
}
break;
}
@@ -150,7 +153,7 @@ public class ProcessJob extends AbstractProcessJob {
Constants.MEMORY_CHECK_INTERVAL_MS), attempt,
Constants.MEMORY_CHECK_RETRY_LIMIT));
if (attempt == 1) {
- CommonMetrics.INSTANCE.incrementOOMJobWaitCount();
+ this.commonMetrics.incrementOOMJobWaitCount();
}
synchronized (this) {
try {
@@ -161,7 +164,7 @@ public class ProcessJob extends AbstractProcessJob {
}
}
if (this.killed) {
- CommonMetrics.INSTANCE.decrementOOMJobWaitCount();
+ this.commonMetrics.decrementOOMJobWaitCount();
info(String.format("Job %s was killed while waiting for memory check retry", getId()));
return;
}
@@ -169,7 +172,7 @@ public class ProcessJob extends AbstractProcessJob {
}
if (!isMemGranted) {
- CommonMetrics.INSTANCE.decrementOOMJobWaitCount();
+ this.commonMetrics.decrementOOMJobWaitCount();
handleError(oomMsg, null);
}
}
diff --git a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
index 5532c0a..ee7857e 100644
--- a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
+++ b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
@@ -18,6 +18,8 @@ package azkaban.metrics;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -25,8 +27,8 @@ import java.util.concurrent.atomic.AtomicLong;
* which are accessed in both web and exec modules. That said, these metrics will be
* exposed in both Web server and executor.
*/
-public enum CommonMetrics {
- INSTANCE;
+@Singleton
+public class CommonMetrics {
private final AtomicLong dbConnectionTime = new AtomicLong(0L);
private final AtomicLong OOMWaitingJobCount = new AtomicLong(0L);
@@ -34,8 +36,9 @@ public enum CommonMetrics {
private Meter dbConnectionMeter;
private Meter flowFailMeter;
- CommonMetrics() {
- this.registry = MetricsManager.INSTANCE.getRegistry();
+ @Inject
+ public CommonMetrics(final MetricRegistry metricsRegistry) {
+ this.registry = metricsRegistry;
setupAllMetrics();
}
diff --git a/azkaban-common/src/main/java/azkaban/metrics/MetricsManager.java b/azkaban-common/src/main/java/azkaban/metrics/MetricsManager.java
index d978591..65f0bdf 100644
--- a/azkaban-common/src/main/java/azkaban/metrics/MetricsManager.java
+++ b/azkaban-common/src/main/java/azkaban/metrics/MetricsManager.java
@@ -25,6 +25,8 @@ import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
import java.lang.reflect.Constructor;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
@@ -35,17 +37,19 @@ import org.apache.log4j.Logger;
* this class. Also, web servers and executors can call {@link #startReporting(String, Props)} to
* start reporting AZ metrics to remote metrics server.
*/
-public enum MetricsManager {
- INSTANCE;
+@Singleton
+public class MetricsManager {
private static final Logger logger = Logger.getLogger(MetricsManager.class);
- private final MetricRegistry registry = new MetricRegistry();
+ private final MetricRegistry registry;
private ConsoleReporter consoleReporter = null;
/**
* Constructor is eaagerly called when this class is loaded.
*/
- private MetricsManager() {
+ @Inject
+ public MetricsManager(MetricRegistry registry) {
+ this.registry = registry;
this.registry.register("MEMORY_Gauge", new MemoryUsageGaugeSet());
this.registry.register("GC_Gauge", new GarbageCollectorMetricSet());
this.registry.register("Thread_State_Gauge", new ThreadStatesGaugeSet());
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
index a707ac2..a54095a 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
@@ -18,6 +18,7 @@ package azkaban.project;
import azkaban.database.AbstractJdbcLoader;
import azkaban.flow.Flow;
+import azkaban.metrics.CommonMetrics;
import azkaban.project.ProjectLogEvent.EventType;
import azkaban.user.Permission;
import azkaban.user.User;
@@ -64,8 +65,8 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
private EncodingType defaultEncodingType = EncodingType.GZIP;
@Inject
- public JdbcProjectLoader(final Props props) {
- super(props);
+ public JdbcProjectLoader(final Props props, final CommonMetrics commonMetrics) {
+ super(props, commonMetrics);
this.tempDir = new File(props.getString("project.temp.dir", "temp"));
if (!this.tempDir.exists()) {
this.tempDir.mkdirs();
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index f944421..3014a6c 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -16,6 +16,8 @@
package azkaban.executor;
+import azkaban.metrics.CommonMetrics;
+import com.codahale.metrics.MetricRegistry;
import java.io.IOException;
import java.util.Arrays;
import java.util.ArrayList;
@@ -66,7 +68,8 @@ public class ExecutorManagerTest {
loader.addExecutor("localhost", 12345);
loader.addExecutor("localhost", 12346);
- return new ExecutorManager(props, loader, new AlerterHolder(props));
+ return new ExecutorManager(props, loader, new AlerterHolder(props),
+ new CommonMetrics(new MetricRegistry()));
}
/*
@@ -80,7 +83,8 @@ public class ExecutorManagerTest {
ExecutorLoader loader = new MockExecutorLoader();
@SuppressWarnings("unused")
ExecutorManager manager =
- new ExecutorManager(props, loader, new AlerterHolder(props));
+ new ExecutorManager(props, loader, new AlerterHolder(props),
+ new CommonMetrics(new MetricRegistry()));
}
/*
@@ -93,7 +97,8 @@ public class ExecutorManagerTest {
ExecutorLoader loader = new MockExecutorLoader();
ExecutorManager manager =
- new ExecutorManager(props, loader, new AlerterHolder(props));
+ new ExecutorManager(props, loader, new AlerterHolder(props),
+ new CommonMetrics(new MetricRegistry()));
Set<Executor> activeExecutors =
new HashSet(manager.getAllActiveExecutors());
@@ -117,7 +122,8 @@ public class ExecutorManagerTest {
Executor executor2 = loader.addExecutor("localhost", 12346);
ExecutorManager manager =
- new ExecutorManager(props, loader, new AlerterHolder(props));
+ new ExecutorManager(props, loader, new AlerterHolder(props),
+ new CommonMetrics(new MetricRegistry()));
Set<Executor> activeExecutors =
new HashSet(manager.getAllActiveExecutors());
Assert.assertArrayEquals(activeExecutors.toArray(), new Executor[] {
@@ -135,7 +141,8 @@ public class ExecutorManagerTest {
Executor executor1 = loader.addExecutor("localhost", 12345);
ExecutorManager manager =
- new ExecutorManager(props, loader, new AlerterHolder(props));
+ new ExecutorManager(props, loader, new AlerterHolder(props),
+ new CommonMetrics(new MetricRegistry()));
Assert.assertArrayEquals(manager.getAllActiveExecutors().toArray(),
new Executor[] { executor1 });
@@ -162,7 +169,8 @@ public class ExecutorManagerTest {
Executor executor1 = loader.addExecutor("localhost", 12345);
ExecutorManager manager =
- new ExecutorManager(props, loader, new AlerterHolder(props));
+ new ExecutorManager(props, loader, new AlerterHolder(props),
+ new CommonMetrics(new MetricRegistry()));
Set<Executor> activeExecutors =
new HashSet(manager.getAllActiveExecutors());
Assert.assertArrayEquals(activeExecutors.toArray(),
@@ -333,7 +341,8 @@ public class ExecutorManagerTest {
executors.add(executor2);
when(loader.fetchActiveExecutors()).thenReturn(executors);
- manager = new ExecutorManager(props, loader, new AlerterHolder(props));
+ manager = new ExecutorManager(props, loader, new AlerterHolder(props),
+ new CommonMetrics(new MetricRegistry()));
flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index 6c1138a..2a498f7 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -16,6 +16,8 @@
package azkaban.executor;
+import azkaban.metrics.CommonMetrics;
+import com.codahale.metrics.MetricRegistry;
import java.io.File;
import java.io.IOException;
import java.sql.Connection;
@@ -1055,7 +1057,7 @@ public class JdbcExecutorLoaderTest {
props.put("mysql.password", password);
props.put("mysql.numconnections", numConnections);
- return new JdbcExecutorLoader(props);
+ return new JdbcExecutorLoader(props, new CommonMetrics(new MetricRegistry()));
}
private boolean isTestSetup() {
diff --git a/azkaban-common/src/test/java/azkaban/jobtype/JobTypeManagerTest.java b/azkaban-common/src/test/java/azkaban/jobtype/JobTypeManagerTest.java
index fb632a8..4ebea5e 100644
--- a/azkaban-common/src/test/java/azkaban/jobtype/JobTypeManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/jobtype/JobTypeManagerTest.java
@@ -16,6 +16,7 @@
package azkaban.jobtype;
+import static azkaban.test.Utils.initServiceProvider;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@@ -53,6 +54,9 @@ public class JobTypeManagerTest {
@Before
public void setUp() throws Exception {
+ // TODO: reallocf Remove initServiceProvider when ProcessJob fully guiced
+ initServiceProvider();
+
final File jobTypeDir = this.temp.newFolder(TEST_PLUGIN_DIR);
this.testPluginDirPath = jobTypeDir.getCanonicalPath();
diff --git a/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java b/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
index 0a29dee..910b1e0 100644
--- a/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
+++ b/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
@@ -18,6 +18,7 @@ package azkaban.metrics;
import static org.junit.Assert.assertEquals;
+import com.codahale.metrics.MetricRegistry;
import org.junit.Before;
import org.junit.Test;
@@ -29,14 +30,9 @@ public class CommonMetricsTest {
@Before
public void setUp() {
- // Use of global state can cause problems e.g.
- // The state is shared among tests.
- // e.g. we can't run a variant of the testOOMWaitingJobMetrics twice since it relies on the initial state of
- // the registry.
- // This can also cause problem when we run tests in parallel in the future.
- // todo HappyRay: move MetricsManager, CommonMetrics to use Juice.
- this.testUtil = new MetricsTestUtility(MetricsManager.INSTANCE.getRegistry());
- this.metrics = CommonMetrics.INSTANCE;
+ final MetricRegistry metricRegistry = new MetricRegistry();
+ this.testUtil = new MetricsTestUtility(metricRegistry);
+ this.metrics = new CommonMetrics(metricRegistry);
}
@Test
diff --git a/azkaban-common/src/test/java/azkaban/metrics/MetricsTestUtility.java b/azkaban-common/src/test/java/azkaban/metrics/MetricsTestUtility.java
index e270d2e..2a7c50e 100644
--- a/azkaban-common/src/test/java/azkaban/metrics/MetricsTestUtility.java
+++ b/azkaban-common/src/test/java/azkaban/metrics/MetricsTestUtility.java
@@ -24,8 +24,6 @@ import com.codahale.metrics.MetricRegistry;
*/
public class MetricsTestUtility {
- // todo HappyRay: move singletons to Juice.
- // This can cause problems when we run tests in parallel in the future.
private final MetricRegistry registry;
public MetricsTestUtility(final MetricRegistry registry) {
diff --git a/azkaban-common/src/test/java/azkaban/project/JdbcProjectLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/JdbcProjectLoaderTest.java
index 71d952c..9117c94 100644
--- a/azkaban-common/src/test/java/azkaban/project/JdbcProjectLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/JdbcProjectLoaderTest.java
@@ -20,12 +20,14 @@ import azkaban.database.DataSourceUtils;
import azkaban.flow.Edge;
import azkaban.flow.Flow;
import azkaban.flow.Node;
+import azkaban.metrics.CommonMetrics;
import azkaban.project.ProjectLogEvent.EventType;
import azkaban.user.Permission;
import azkaban.user.User;
import azkaban.utils.Pair;
import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
+import com.codahale.metrics.MetricRegistry;
import java.io.File;
import java.sql.Connection;
import java.sql.ResultSet;
@@ -653,7 +655,7 @@ public class JdbcProjectLoaderTest {
props.put("mysql.password", password);
props.put("mysql.numconnections", numConnections);
- return new JdbcProjectLoader(props);
+ return new JdbcProjectLoader(props, new CommonMetrics(new MetricRegistry()));
}
private boolean isTestSetup() {
diff --git a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
index 6ec2cd7..8667451 100644
--- a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
+++ b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
@@ -21,8 +21,10 @@ import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.MockExecutorLoader;
+import azkaban.metrics.CommonMetrics;
import azkaban.trigger.builtin.CreateTriggerAction;
import azkaban.utils.Props;
+import com.codahale.metrics.MetricRegistry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -46,7 +48,7 @@ public class TriggerManagerDeadlockTest {
props.put("executor.port", 12321);
this.execLoader = new MockExecutorLoader();
final ExecutorManager executorManager = new ExecutorManager(props, this.execLoader,
- new AlerterHolder(props));
+ new AlerterHolder(props), new CommonMetrics(new MetricRegistry()));
this.triggerManager = new TriggerManager(props, this.loader, executorManager);
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 0b25874..e34081a 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -88,6 +88,7 @@ public class AzkabanExecutorServer {
private final ExecutorLoader executionLoader;
private final FlowRunnerManager runnerManager;
+ private final MetricsManager metricsManager;
private final Props props;
private final Server server;
@@ -97,10 +98,11 @@ public class AzkabanExecutorServer {
@Inject
public AzkabanExecutorServer(final Props props,
final ExecutorLoader executionLoader,
- final FlowRunnerManager runnerManager) throws Exception {
+ final FlowRunnerManager runnerManager, final MetricsManager metricsManager) throws Exception {
this.props = props;
this.executionLoader = executionLoader;
this.runnerManager = runnerManager;
+ this.metricsManager = metricsManager;
this.server = createJettyServer(props);
@@ -274,7 +276,7 @@ public class AzkabanExecutorServer {
ExecMetrics.INSTANCE.addFlowRunnerManagerMetrics(getFlowRunnerManager());
logger.info("starting reporting Executor Metrics");
- MetricsManager.INSTANCE.startReporting("AZ-EXEC", this.props);
+ this.metricsManager.startReporting("AZ-EXEC", this.props);
}
private void insertExecutorEntryIntoDB() throws ExecutorManagerException {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecMetrics.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecMetrics.java
index 9d6798b..5fffaf9 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecMetrics.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecMetrics.java
@@ -16,6 +16,8 @@
package azkaban.execapp;
+import static azkaban.ServiceProvider.SERVICE_PROVIDER;
+
import azkaban.metrics.MetricsManager;
import azkaban.metrics.MetricsUtility;
import com.codahale.metrics.MetricRegistry;
@@ -29,7 +31,8 @@ public enum ExecMetrics {
private final MetricRegistry registry;
ExecMetrics() {
- this.registry = MetricsManager.INSTANCE.getRegistry();
+ // TODO: reallocf make guicy
+ this.registry = SERVICE_PROVIDER.getInstance(MetricRegistry.class);
setupStaticMetrics();
}
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
index c7008ca..bd19b14 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -143,6 +143,8 @@ public class AzkabanWebServer extends AzkabanServer {
private final ScheduleManager scheduleManager;
private final TriggerManager triggerManager;
private final ClassLoader baseClassLoader;
+ private final MetricRegistry registry;
+ private final MetricsManager metricsManager;
private final Props props;
private final SessionCache sessionCache;
private final List<ObjectName> registeredMBeans = new ArrayList<>();
@@ -172,6 +174,8 @@ public class AzkabanWebServer extends AzkabanServer {
this.executorManager = SERVICE_PROVIDER.getInstance(ExecutorManager.class);
this.projectManager = SERVICE_PROVIDER.getInstance(ProjectManager.class);
this.triggerManager = SERVICE_PROVIDER.getInstance(TriggerManager.class);
+ this.metricsManager = SERVICE_PROVIDER.getInstance(MetricsManager.class);
+ this.registry = SERVICE_PROVIDER.getInstance(MetricRegistry.class);
loadBuiltinCheckersAndActions();
@@ -705,21 +709,21 @@ public class AzkabanWebServer extends AzkabanServer {
private void startWebMetrics() throws Exception {
- final MetricRegistry registry = MetricsManager.INSTANCE.getRegistry();
-
// The number of idle threads in Jetty thread pool
MetricsUtility
- .addGauge("JETTY-NumIdleThreads", registry, this.queuedThreadPool::getIdleThreads);
+ .addGauge("JETTY-NumIdleThreads", this.registry, this.queuedThreadPool::getIdleThreads);
// The number of threads in Jetty thread pool. The formula is:
// threads = idleThreads + busyThreads
- MetricsUtility.addGauge("JETTY-NumTotalThreads", registry, this.queuedThreadPool::getThreads);
+ MetricsUtility
+ .addGauge("JETTY-NumTotalThreads", this.registry, this.queuedThreadPool::getThreads);
// The number of requests queued in the Jetty thread pool.
- MetricsUtility.addGauge("JETTY-NumQueueSize", registry, this.queuedThreadPool::getQueueSize);
+ MetricsUtility
+ .addGauge("JETTY-NumQueueSize", this.registry, this.queuedThreadPool::getQueueSize);
MetricsUtility
- .addGauge("WEB-NumQueuedFlows", registry, this.executorManager::getQueuedFlowSize);
+ .addGauge("WEB-NumQueuedFlows", this.registry, this.executorManager::getQueuedFlowSize);
/**
* TODO: Currently {@link ExecutorManager#getRunningFlows()} includes both running and non-dispatched flows.
* Originally we would like to do a subtraction between getRunningFlows and {@link ExecutorManager#getQueuedFlowSize()},
@@ -728,11 +732,11 @@ public class AzkabanWebServer extends AzkabanServer {
* a thread safe subtraction. We need to fix this in the future.
*/
MetricsUtility
- .addGauge("WEB-NumRunningFlows", registry,
+ .addGauge("WEB-NumRunningFlows", this.registry,
() -> this.executorManager.getRunningFlows().size());
logger.info("starting reporting Web Server Metrics");
- MetricsManager.INSTANCE.startReporting("AZ-WEB", this.props);
+ this.metricsManager.startReporting("AZ-WEB", this.props);
}
private UserManager loadUserManager(final Props props) {
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index 6479c8d..3e51b3a 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -16,6 +16,8 @@
package azkaban.webapp.servlet;
+import static azkaban.ServiceProvider.SERVICE_PROVIDER;
+
import azkaban.Constants;
import azkaban.executor.ConnectorParams;
import azkaban.executor.ExecutableFlow;
@@ -66,6 +68,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
private static final Logger LOGGER =
Logger.getLogger(ExecutorServlet.class.getName());
private static final long serialVersionUID = 1L;
+ private WebMetrics webMetrics;
private ProjectManager projectManager;
private ExecutorManagerAdapter executorManager;
private ScheduleManager scheduleManager;
@@ -81,6 +84,8 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
this.executorManager = server.getExecutorManager();
this.scheduleManager = server.getScheduleManager();
this.velocityHelper = new ExecutorVelocityHelper();
+ // TODO: reallocf fully guicify
+ this.webMetrics = SERVICE_PROVIDER.getInstance(WebMetrics.class);
}
@Override
@@ -517,7 +522,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
* However, Timer will result in too many accompanying metrics (e.g., min, max, 99th quantile)
* regarding one metrics. We decided to use gauge to do that and monitor how it behaves.
*/
- WebMetrics.INSTANCE.setFetchLogLatency(System.currentTimeMillis() - startMs);
+ this.webMetrics.setFetchLogLatency(System.currentTimeMillis() - startMs);
}
/**
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
index f917316..cffcc99 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
@@ -16,6 +16,8 @@
package azkaban.webapp.servlet;
+import static azkaban.ServiceProvider.SERVICE_PROVIDER;
+
import azkaban.project.Project;
import azkaban.server.session.Session;
import azkaban.user.Permission;
@@ -80,6 +82,9 @@ public abstract class LoginAbstractAzkabanServlet extends
private boolean shouldLogRawUserAgent = false;
+ // TODO: reallocf properly guicify WebMetrics
+ private WebMetrics webMetrics = SERVICE_PROVIDER.getInstance(WebMetrics.class);
+
@Override
public void init(final ServletConfig config) throws ServletException {
super.init(config);
@@ -99,7 +104,7 @@ public abstract class LoginAbstractAzkabanServlet extends
protected void doGet(final HttpServletRequest req, final HttpServletResponse resp)
throws ServletException, IOException {
- WebMetrics.INSTANCE.markWebGetCall();
+ webMetrics.markWebGetCall();
// Set session id
final Session session = getSessionFromRequest(req);
logRequest(req, session);
@@ -274,7 +279,7 @@ public abstract class LoginAbstractAzkabanServlet extends
protected void doPost(final HttpServletRequest req, final HttpServletResponse resp)
throws ServletException, IOException {
Session session = getSessionFromRequest(req);
- WebMetrics.INSTANCE.markWebPostCall();
+ webMetrics.markWebPostCall();
logRequest(req, session);
// Handle Multipart differently from other post messages
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/WebMetrics.java b/azkaban-web-server/src/main/java/azkaban/webapp/WebMetrics.java
index 2c8c695..9b11bce 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/WebMetrics.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/WebMetrics.java
@@ -16,10 +16,11 @@
package azkaban.webapp;
-import azkaban.metrics.MetricsManager;
import azkaban.metrics.MetricsUtility;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
import java.util.concurrent.atomic.AtomicLong;
@@ -27,8 +28,8 @@ import java.util.concurrent.atomic.AtomicLong;
* This singleton class WebMetrics is in charge of collecting varieties of metrics
* from azkaban-web-server modules.
*/
-public enum WebMetrics {
- INSTANCE;
+@Singleton
+public class WebMetrics {
private final MetricRegistry registry;
@@ -38,8 +39,9 @@ public enum WebMetrics {
// How long does user log fetch take when user call fetch-log api.
private final AtomicLong logFetchLatency = new AtomicLong(0L);
- WebMetrics() {
- this.registry = MetricsManager.INSTANCE.getRegistry();
+ @Inject
+ WebMetrics(final MetricRegistry registry) {
+ this.registry = registry;
this.webGetCall = MetricsUtility.addMeter("Web-Get-Call-Meter", this.registry);
this.webPostCall = MetricsUtility.addMeter("Web-Post-Call-Meter", this.registry);
MetricsUtility.addGauge("fetchLogLatency", this.registry, this.logFetchLatency::get);
diff --git a/azkaban-web-server/src/test/java/azkaban/webapp/servlet/LoginAbstractAzkabanServletTest.java b/azkaban-web-server/src/test/java/azkaban/webapp/servlet/LoginAbstractAzkabanServletTest.java
index f1d2f60..57ec350 100644
--- a/azkaban-web-server/src/test/java/azkaban/webapp/servlet/LoginAbstractAzkabanServletTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/webapp/servlet/LoginAbstractAzkabanServletTest.java
@@ -17,6 +17,7 @@
package azkaban.webapp.servlet;
+import static azkaban.test.Utils.initServiceProvider;
import static junit.framework.TestCase.assertEquals;
import static junit.framework.TestCase.assertNotSame;
import static org.mockito.Mockito.mock;
@@ -29,10 +30,17 @@ import java.io.StringWriter;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.junit.Before;
import org.junit.Test;
public class LoginAbstractAzkabanServletTest {
+ @Before
+ public void setUp() {
+ // TODO: reallocf remove setUp() when LoginAbstractServlet fully guiced
+ initServiceProvider();
+ }
+
private HttpServletResponse getResponse(final StringWriter stringWriter) {
final HttpServletResponse resp = mock(HttpServletResponse.class);
final PrintWriter writer = new PrintWriter(stringWriter);
diff --git a/azkaban-web-server/src/test/java/azkaban/webapp/WebMetricsTest.java b/azkaban-web-server/src/test/java/azkaban/webapp/WebMetricsTest.java
index 1130b3e..3e502fa 100644
--- a/azkaban-web-server/src/test/java/azkaban/webapp/WebMetricsTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/webapp/WebMetricsTest.java
@@ -18,8 +18,8 @@ package azkaban.webapp;
import static org.junit.Assert.assertEquals;
-import azkaban.metrics.MetricsManager;
import azkaban.metrics.MetricsTestUtility;
+import com.codahale.metrics.MetricRegistry;
import org.junit.Before;
import org.junit.Test;
@@ -31,9 +31,9 @@ public class WebMetricsTest {
@Before
public void setUp() {
- // todo HappyRay: move MetricsManager, WebMetrics to use Juice.
- this.testUtil = new MetricsTestUtility(MetricsManager.INSTANCE.getRegistry());
- this.metrics = WebMetrics.INSTANCE;
+ final MetricRegistry metricRegistry = new MetricRegistry();
+ this.testUtil = new MetricsTestUtility(metricRegistry);
+ this.metrics = new WebMetrics(metricRegistry);
}
@Test