azkaban-aplcache
Changes
azkaban-web-server/build.gradle 3(+3 -0)
Details
diff --git a/azkaban-common/src/main/java/azkaban/database/AbstractJdbcLoader.java b/azkaban-common/src/main/java/azkaban/database/AbstractJdbcLoader.java
index b626fab..626a9c6 100644
--- a/azkaban-common/src/main/java/azkaban/database/AbstractJdbcLoader.java
+++ b/azkaban-common/src/main/java/azkaban/database/AbstractJdbcLoader.java
@@ -16,6 +16,9 @@
package azkaban.database;
+import azkaban.metrics.CommonMetrics;
+import azkaban.utils.Props;
+
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
@@ -26,7 +29,6 @@ import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
-import azkaban.utils.Props;
public abstract class AbstractJdbcLoader {
/**
@@ -65,6 +67,7 @@ public abstract class AbstractJdbcLoader {
protected Connection getDBConnection(boolean autoCommit) throws IOException {
Connection connection = null;
+ CommonMetrics.INSTANCE.markDBConnection();
try {
connection = dataSource.getConnection();
connection.setAutoCommit(autoCommit);
diff --git a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
new file mode 100644
index 0000000..6959c89
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metrics;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+
+/**
+ * Process-wide holder for the metrics that are used by both the web and exec modules,
+ * and are therefore exposed by both the web server and the executor.
+ * Implemented as a single-element enum so that exactly one instance exists; the instance
+ * registers its metrics with the registry obtained from {@link MetricsManager} when it is
+ * constructed.
+ */
+public enum CommonMetrics {
+  INSTANCE;
+
+  private final MetricRegistry registry;
+  private final Meter dbConnectionMeter;
+
+  CommonMetrics() {
+    this.registry = MetricsManager.INSTANCE.getRegistry();
+    this.dbConnectionMeter = MetricsUtility.addMeter("DB-Connection-meter", this.registry);
+  }
+
+  /**
+   * Records one occurrence of a DB connection event on the DB connection meter.
+   *
+   * <p>This method is intentionally not synchronized: the call is forwarded straight to
+   * {@link Meter#mark()}, and the metrics library is relied upon to cope with concurrent
+   * callers.
+   */
+  public void markDBConnection() {
+    dbConnectionMeter.mark();
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/metrics/MetricsUtility.java b/azkaban-common/src/main/java/azkaban/metrics/MetricsUtility.java
new file mode 100644
index 0000000..12398e3
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metrics/MetricsUtility.java
@@ -0,0 +1,36 @@
+package azkaban.metrics;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Static helpers that metrics holder classes such as {@link azkaban.metrics.CommonMetrics}
+ * share for registering metrics with a {@link MetricRegistry}.
+ */
+public final class MetricsUtility {
+
+  private MetricsUtility() {
+    // Static helpers only; this class is never instantiated.
+  }
+
+  /**
+   * Obtains the {@link Meter} registered under {@code name} and additionally registers a
+   * {@link Gauge} named {@code name + "-gauge"} that reports the meter's one-minute rate.
+   *
+   * @param name name of the meter; the companion gauge is named {@code name + "-gauge"}
+   * @param registry registry that holds both the meter and the gauge
+   * @return the meter obtained from the registry for {@code name}
+   */
+  public static Meter addMeter(String name, MetricRegistry registry) {
+    final Meter meter = registry.meter(name);
+    final Gauge<Double> oneMinuteRate = meter::getOneMinuteRate;
+    registry.register(name + "-gauge", oneMinuteRate);
+    return meter;
+  }
+
+  /**
+   * Registers a {@link Gauge} under {@code name} whose reading is the current value of the
+   * supplied {@link AtomicLong}.
+   *
+   * @param name name under which the gauge is registered
+   * @param value number whose current value the gauge reports
+   * @param registry registry that holds the gauge
+   */
+  public static void addLongGauge(String name, AtomicLong value, MetricRegistry registry) {
+    final Gauge<Long> currentValue = value::get;
+    registry.register(name, currentValue);
+  }
+}
diff --git a/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java b/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
new file mode 100644
index 0000000..5d6a868
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metrics;
+
+import azkaban.metrics.MetricsTestUtility.DummyReporter;
+
+import java.util.concurrent.TimeUnit;
+import java.time.Duration;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CommonMetricsTest {
+
+  private DummyReporter dr;
+
+  /** Starts a reporter on the MetricsManager registry that reports every 2 milliseconds. */
+  @Before
+  public void setup() {
+    final long periodMillis = Duration.ofMillis(2).toMillis();
+    dr = new DummyReporter(MetricsManager.INSTANCE.getRegistry());
+    dr.start(periodMillis, TimeUnit.MILLISECONDS);
+  }
+
+  /** Stops the reporter, if one was created, and drops the reference to it. */
+  @After
+  public void shutdown() {
+    if (dr != null) {
+      dr.stop();
+    }
+    dr = null;
+  }
+
+  /** Checks that marking DB connections is reflected in the "DB-Connection-meter" meter. */
+  @Test
+  public void testMarkDBConnectionMetrics() {
+    final Runnable markConnection = CommonMetrics.INSTANCE::markDBConnection;
+    MetricsTestUtility.testMeter("DB-Connection-meter", dr, markConnection);
+  }
+}
diff --git a/azkaban-common/src/test/java/azkaban/metrics/MetricsTestUtility.java b/azkaban-common/src/test/java/azkaban/metrics/MetricsTestUtility.java
new file mode 100644
index 0000000..706b52e
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/metrics/MetricsTestUtility.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metrics;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.Timer;
+
+import org.junit.Assert;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+/**
+ * Helpers for tests that verify Dropwizard metrics.
+ */
+public class MetricsTestUtility {
+
+  /**
+   * A {@link ScheduledReporter} that stores the most recently reported gauge values and meter
+   * counts in maps so that test methods can read them back.
+   *
+   * <p>Once started, {@link #report} runs on the reporter's own scheduling thread while the
+   * test thread reads the stored values, so both maps are concurrent maps.
+   */
+  public static class DummyReporter extends ScheduledReporter {
+
+    // Gauge name -> string form of the last reported gauge value.
+    private final Map<String, String> gaugeMap = new ConcurrentHashMap<>();
+
+    // Meter name -> last reported meter count.
+    private final Map<String, Long> meterMap = new ConcurrentHashMap<>();
+
+    public DummyReporter(MetricRegistry registry) {
+      super(registry, "dummy-reporter", MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Copies the current gauge values and meter counts into the maps; counters, histograms
+     * and timers are ignored.
+     */
+    @Override
+    public void report(SortedMap<String, Gauge> gauges,
+        SortedMap<String, Counter> counters,
+        SortedMap<String, Histogram> histograms,
+        SortedMap<String, Meter> meters,
+        SortedMap<String, Timer> timers) {
+
+      for (Map.Entry<String, Gauge> entry : gauges.entrySet()) {
+        // String.valueOf keeps a gauge that reports null from aborting the whole report.
+        gaugeMap.put(entry.getKey(), String.valueOf(entry.getValue().getValue()));
+      }
+
+      for (Map.Entry<String, Meter> entry : meters.entrySet()) {
+        meterMap.put(entry.getKey(), entry.getValue().getCount());
+      }
+    }
+
+    /** Returns the last reported value of the gauge, or {@code null} if none was reported. */
+    private String getGauge(String key) {
+      return gaugeMap.get(key);
+    }
+
+    /** Returns the last reported count of the meter, or 0 if none was reported. */
+    private long getMeter(String key) {
+      return meterMap.getOrDefault(key, 0L);
+    }
+  }
+
+  /**
+   * Asserts that each invocation of {@code runnable} raises the reported count of the meter
+   * named {@code meterName} by exactly one.
+   *
+   * @param meterName name of the meter as registered in the registry
+   * @param dr a started reporter attached to the registry that holds the meter
+   * @param runnable action that is expected to mark the meter once per call
+   */
+  public static void testMeter(String meterName, DummyReporter dr, Runnable runnable) {
+
+    // Each sleep gives the reporter time to publish a fresh snapshot before it is read.
+    sleepMillis(20);
+    long currMeter = dr.getMeter(meterName);
+    runnable.run();
+    sleepMillis(20);
+    Assert.assertEquals(currMeter + 1, dr.getMeter(meterName));
+
+    runnable.run();
+    runnable.run();
+    sleepMillis(20);
+    Assert.assertEquals(currMeter + 3, dr.getMeter(meterName));
+  }
+
+  /**
+   * Asserts that values passed to {@code func} show up as the reported value of the gauge
+   * named {@code gaugeName}.
+   *
+   * @param gaugeName name of the gauge as registered in the registry
+   * @param dr a started reporter attached to the registry that holds the gauge
+   * @param func setter that is expected to update the value backing the gauge
+   */
+  public static void testGauge(String gaugeName, DummyReporter dr, Consumer<Long> func) {
+    func.accept(1L);
+    sleepMillis(20);
+    Assert.assertEquals("1", dr.getGauge(gaugeName));
+    func.accept(99L);
+    sleepMillis(20);
+    Assert.assertEquals("99", dr.getGauge(gaugeName));
+  }
+
+
+  /**
+   * Sleeps for the given number of milliseconds. If the thread is interrupted the sleep ends
+   * early and the interrupt flag is restored for the caller.
+   */
+  private static void sleepMillis(int numMilli) {
+    try {
+      Thread.sleep(numMilli);
+    } catch (InterruptedException e) {
+      // Re-assert the interrupt instead of swallowing it so the test runner can observe it.
+      Thread.currentThread().interrupt();
+    }
+  }
+}
azkaban-web-server/build.gradle 3(+3 -0)
diff --git a/azkaban-web-server/build.gradle b/azkaban-web-server/build.gradle
index 23e59f0..bce9503 100644
--- a/azkaban-web-server/build.gradle
+++ b/azkaban-web-server/build.gradle
@@ -58,6 +58,9 @@ dependencies {
testCompile('org.hamcrest:hamcrest-all:1.3')
testCompile('org.mockito:mockito-all:1.10.19')
+ //AZ web module tests need to access classes defined in azkaban-common test module
+ testCompile project(':azkaban-common').sourceSets.test.output
+
generateRestli('com.linkedin.pegasus:generator:' + pegasusVersion)
generateRestli('com.linkedin.pegasus:restli-tools:' + pegasusVersion)
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index 5476470..225df57 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -16,22 +16,6 @@
package azkaban.webapp.servlet;
-import azkaban.executor.ExecutorManager;
-import azkaban.utils.FlowUtils;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import javax.servlet.ServletConfig;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.commons.lang.StringEscapeUtils;
-import org.apache.log4j.Logger;
-
import azkaban.constants.ServerProperties;
import azkaban.executor.ConnectorParams;
import azkaban.executor.ExecutableFlow;
@@ -57,11 +41,28 @@ import azkaban.user.User;
import azkaban.user.UserManager;
import azkaban.utils.ExternalLinkUtils;
import azkaban.utils.FileIOUtils.LogData;
+import azkaban.utils.FlowUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
import azkaban.webapp.AzkabanWebServer;
import azkaban.webapp.plugin.PluginRegistry;
import azkaban.webapp.plugin.ViewerPlugin;
+import azkaban.webapp.WebMetrics;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.log4j.Logger;
+
public class ExecutorServlet extends LoginAbstractAzkabanServlet {
private static final Logger LOGGER =
@@ -489,6 +490,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
private void ajaxFetchExecFlowLogs(HttpServletRequest req,
HttpServletResponse resp, HashMap<String, Object> ret, User user,
ExecutableFlow exFlow) throws ServletException {
+ long startMs = System.currentTimeMillis();
Project project =
getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.READ);
if (project == null) {
@@ -515,6 +517,14 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
} catch (ExecutorManagerException e) {
throw new ServletException(e);
}
+
+ /*
+ * We originally considered using Dropwizard's Timer API {@link com.codahale.metrics.Timer}
+ * to measure the duration.
+ * However, a Timer produces too many accompanying metrics (e.g., min, max, 99th percentile)
+ * for a single measurement, so we decided to use a gauge instead and monitor how it behaves.
+ */
+ WebMetrics.INSTANCE.setFetchLogLatency(System.currentTimeMillis() - startMs);
}
/**
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
index f2a4603..3980d3f 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
@@ -16,6 +16,7 @@
package azkaban.webapp.servlet;
+import azkaban.webapp.WebMetrics;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
@@ -99,6 +100,8 @@ public abstract class LoginAbstractAzkabanServlet extends
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
+
+ WebMetrics.INSTANCE.markWebGetCall();
// Set session id
Session session = getSessionFromRequest(req);
logRequest(req, session);
@@ -133,7 +136,7 @@ public abstract class LoginAbstractAzkabanServlet extends
/**
* Log out request - the format should be close to Apache access log format
- *
+ *
* @param req
* @param session
*/
@@ -167,7 +170,7 @@ public abstract class LoginAbstractAzkabanServlet extends
buf.append("not-browser");
}
}
-
+
logger.info(buf.toString());
}
@@ -276,7 +279,7 @@ public abstract class LoginAbstractAzkabanServlet extends
protected void doPost(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
Session session = getSessionFromRequest(req);
-
+ WebMetrics.INSTANCE.markWebPostCall();
logRequest(req, session);
// Handle Multipart differently from other post messages
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/WebMetrics.java b/azkaban-web-server/src/main/java/azkaban/webapp/WebMetrics.java
new file mode 100644
index 0000000..b0aeabe
--- /dev/null
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/WebMetrics.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.webapp;
+
+import azkaban.metrics.MetricsManager;
+import azkaban.metrics.MetricsUtility;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * Process-wide holder for the metrics collected from the azkaban-web-server module.
+ * Implemented as a single-element enum so that exactly one instance exists; the instance
+ * registers its metrics with the registry obtained from {@link MetricsManager} when it is
+ * constructed.
+ */
+public enum WebMetrics {
+  INSTANCE;
+
+  private final MetricRegistry registry;
+
+  private final Meter webGetCall;
+  private final Meter webPostCall;
+
+  // Most recent duration, in milliseconds, recorded for a call to the fetch-log API.
+  private final AtomicLong logFetchLatency = new AtomicLong(0L);
+
+  WebMetrics() {
+    this.registry = MetricsManager.INSTANCE.getRegistry();
+    this.webGetCall = MetricsUtility.addMeter("Web-Get-Call-Meter", this.registry);
+    this.webPostCall = MetricsUtility.addMeter("Web-Post-Call-Meter", this.registry);
+    MetricsUtility.addLongGauge("fetchLogLatency", this.logFetchLatency, this.registry);
+  }
+
+  /**
+   * Records one web GET call on the GET-call meter.
+   *
+   * <p>This method is intentionally not synchronized: the call is forwarded straight to
+   * {@link Meter#mark()}, and the metrics library is relied upon to cope with concurrent
+   * callers.
+   */
+  public void markWebGetCall() {
+    webGetCall.mark();
+  }
+
+  /**
+   * Records one web POST call on the POST-call meter.
+   */
+  public void markWebPostCall() {
+    webPostCall.mark();
+  }
+
+  /**
+   * Stores the latest fetch-log duration, replacing the previously stored value; the
+   * "fetchLogLatency" gauge reads this value.
+   *
+   * @param milliseconds duration of the fetch-log call in milliseconds
+   */
+  public void setFetchLogLatency(long milliseconds) {
+    logFetchLatency.set(milliseconds);
+  }
+}
diff --git a/azkaban-web-server/src/test/java/azkaban/webapp/WebMetricsTest.java b/azkaban-web-server/src/test/java/azkaban/webapp/WebMetricsTest.java
new file mode 100644
index 0000000..46a7cb3
--- /dev/null
+++ b/azkaban-web-server/src/test/java/azkaban/webapp/WebMetricsTest.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.webapp;
+
+import azkaban.metrics.MetricsManager;
+import azkaban.metrics.MetricsTestUtility.DummyReporter;
+import azkaban.metrics.MetricsTestUtility;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class WebMetricsTest {
+
+  private DummyReporter dr;
+
+  /** Starts a reporter on the MetricsManager registry that reports every 2 milliseconds. */
+  @Before
+  public void setup() {
+    final long periodMillis = Duration.ofMillis(2).toMillis();
+    dr = new DummyReporter(MetricsManager.INSTANCE.getRegistry());
+    dr.start(periodMillis, TimeUnit.MILLISECONDS);
+  }
+
+  /** Stops the reporter, if one was created, and drops the reference to it. */
+  @After
+  public void shutdown() {
+    if (dr != null) {
+      dr.stop();
+    }
+    dr = null;
+  }
+
+  /** Checks that values set as fetch-log latency are reported by the "fetchLogLatency" gauge. */
+  @Test
+  public void testLogFetchLatencyMetrics() {
+    final Consumer<Long> setLatency = WebMetrics.INSTANCE::setFetchLogLatency;
+    MetricsTestUtility.testGauge("fetchLogLatency", dr, setLatency);
+  }
+
+  /** Checks that marking POST calls is reflected in the "Web-Post-Call-Meter" meter. */
+  @Test
+  public void testWebPostCallMeter() {
+    final Runnable markPost = WebMetrics.INSTANCE::markWebPostCall;
+    MetricsTestUtility.testMeter("Web-Post-Call-Meter", dr, markPost);
+  }
+
+  /** Checks that marking GET calls is reflected in the "Web-Get-Call-Meter" meter. */
+  @Test
+  public void testWebGetCallMeter() {
+    final Runnable markGet = WebMetrics.INSTANCE::markWebGetCall;
+    MetricsTestUtility.testMeter("Web-Get-Call-Meter", dr, markGet);
+  }
+}