Details
diff --git a/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java b/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java
index 6c2524d..b9ec1e7 100644
--- a/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java
+++ b/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java
@@ -26,11 +26,11 @@ public abstract class AbstractMetric<T> implements IMetric<T> {
protected String type;
protected MetricReportManager metricManager;
- public AbstractMetric(String metricName, String metricType, T initialValue) {
+ public AbstractMetric(String metricName, String metricType, T initialValue, MetricReportManager manager) {
name = metricName;
type = metricType;
value = initialValue;
- metricManager = null;
+ metricManager = manager;
}
public String getName() {
@@ -41,7 +41,7 @@ public abstract class AbstractMetric<T> implements IMetric<T> {
return type;
}
- public void setMetricManager(final MetricReportManager manager) {
+ public void updateMetricManager(final MetricReportManager manager) {
metricManager = manager;
}
@@ -49,7 +49,7 @@ public abstract class AbstractMetric<T> implements IMetric<T> {
return value;
}
- protected void notifyManager() {
+ public synchronized void notifyManager() {
logger.debug(String.format("Notifying Manager for %s", this.getClass().getName()));
try {
metricManager.reportMetric(this);
diff --git a/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java
index 7226612..1b5f195 100644
--- a/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java
+++ b/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java
@@ -21,19 +21,15 @@ import azkaban.utils.Props;
public class GangliaMetricEmitter implements IMetricEmitter {
private static final String GANGLIA_METRIC_REPORTER_PATH = "azkaban.metric.ganglia.path";
- private static final String GANGLIA_METRIC_REPORTER_GROUP = "azkaban.metric.ganglia.group";
private String gmetricPath;
- private String gangliaGroup;
public GangliaMetricEmitter(Props azkProps) {
gmetricPath = azkProps.get(GANGLIA_METRIC_REPORTER_PATH);
- gangliaGroup = azkProps.get(GANGLIA_METRIC_REPORTER_GROUP);
}
private String buildCommand(IMetric<?> metric) {
- return String.format("%s -t %s -g \"%s\" -n %s -v %s", gmetricPath, metric.getValueType(), gangliaGroup,
- metric.getName(), metric.getValue().toString());
+ return String.format("%s -t %s -n %s -v %s", gmetricPath, metric.getValueType(), metric.getName(), metric.getValue().toString());
}
@Override
diff --git a/azkaban-common/src/main/java/azkaban/metric/IMetric.java b/azkaban-common/src/main/java/azkaban/metric/IMetric.java
index 7422ba4..ff6d4cd 100644
--- a/azkaban-common/src/main/java/azkaban/metric/IMetric.java
+++ b/azkaban-common/src/main/java/azkaban/metric/IMetric.java
@@ -19,7 +19,7 @@ package azkaban.metric;
public interface IMetric<T> {
String getName();
String getValueType();
- void setMetricManager(final MetricReportManager manager);
- void UpdateValueAndNotifyManager();
+ void updateMetricManager(final MetricReportManager manager);
+ void notifyManager();
T getValue();
}
diff --git a/azkaban-common/src/main/java/azkaban/metric/InMemoryHistoryNode.java b/azkaban-common/src/main/java/azkaban/metric/InMemoryHistoryNode.java
new file mode 100644
index 0000000..54e9655
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metric/InMemoryHistoryNode.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metric;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+public class InMemoryHistoryNode {
+ private Object value;
+ private Date date;
+
+ public InMemoryHistoryNode(Object val) {
+ value = val;
+ date = new Date();
+ }
+
+ public Object getValue() {
+ return value;
+ }
+
+ public Date getTimestamp() {
+ return date;
+ }
+
+ public String formattedTimestamp() {
+ DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ return dateFormat.format(date);
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/metric/InMemoryMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/InMemoryMetricEmitter.java
new file mode 100644
index 0000000..0df5876
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metric/InMemoryMetricEmitter.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metric;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import azkaban.utils.Props;
+
+
+public class InMemoryMetricEmitter implements IMetricEmitter {
+ Map<String, LinkedList<InMemoryHistoryNode>> historyListMapping;
+ private static final String INMEMORY_METRIC_REPORTER_WINDOW = "azkaban.metric.inmemory.interval";
+ private static final String INMEMORY_METRIC_NUM_INSTANCES = "azkaban.metric.inmemory.maxinstances";
+
+ long Interval;
+ long numInstances;
+
+ public InMemoryMetricEmitter(Props azkProps) {
+ historyListMapping = new HashMap<String, LinkedList<InMemoryHistoryNode>>();
+ Interval = azkProps.getLong(INMEMORY_METRIC_REPORTER_WINDOW, 60 * 60 * 24 * 7);
+ numInstances = azkProps.getLong(INMEMORY_METRIC_NUM_INSTANCES, 10000);
+ }
+
+ @Override
+ public void reportMetric(IMetric<?> metric) throws Exception {
+ String metricName = metric.getName();
+ if (!historyListMapping.containsKey(metricName)) {
+ historyListMapping.put(metricName, new LinkedList<InMemoryHistoryNode>());
+ }
+ synchronized (historyListMapping.get(metricName)) {
+ historyListMapping.get(metricName).add(new InMemoryHistoryNode(metric.getValue()));
+ cleanUsingTime(metricName, historyListMapping.get(metricName).peekLast().getTimestamp());
+ }
+ }
+
+ public List<InMemoryHistoryNode> getDrawMetric(String metricName, Date from, Date to) {
+ LinkedList<InMemoryHistoryNode> selectedLists = new LinkedList<InMemoryHistoryNode>();
+ if (historyListMapping.containsKey(metricName)) {
+
+ // selecting nodes within time frame
+ synchronized (historyListMapping.get(metricName)) {
+ for (InMemoryHistoryNode node : historyListMapping.get(metricName)) {
+ if (node.getTimestamp().after(from) && node.getTimestamp().before(to)) {
+ selectedLists.add(node);
+ }
+ if (node.getTimestamp().after(to)) {
+ break;
+ }
+ }
+ }
+
+ // selecting nodes if num of nodes > numInstances
+ if (selectedLists.size() > numInstances) {
+ double step = (double) selectedLists.size() / numInstances;
+ long nextIndex = 0, currentIndex = 0, numSelectedInstances = 1;
+ Iterator<InMemoryHistoryNode> ite = selectedLists.iterator();
+
+ while(ite.hasNext()) {
+ ite.next();
+ if (currentIndex == nextIndex) {
+ nextIndex = (long) Math.floor(numSelectedInstances * step + 0.5);
+ numSelectedInstances++;
+ } else {
+ ite.remove();
+ }
+ currentIndex++;
+ }
+ }
+ }
+ cleanUsingTime(metricName, new Date());
+ return selectedLists;
+ }
+
+ private void cleanUsingTime(String metricName, Date firstAllowedDate) {
+ if (historyListMapping.containsKey(metricName) && historyListMapping.get(metricName) != null) {
+ synchronized (historyListMapping.get(metricName)) {
+
+ InMemoryHistoryNode firstNode = historyListMapping.get(metricName).peekFirst();
+ // removing objects older than Interval time from firstAllowedDate
+ while (firstNode != null
+ && TimeUnit.MILLISECONDS.toSeconds(firstAllowedDate.getTime() - firstNode.getTimestamp().getTime()) > Interval) {
+ historyListMapping.get(metricName).removeFirst();
+ firstNode = historyListMapping.get(metricName).peekFirst();
+ }
+ }
+ }
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java b/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
index 3352f4f..9d411eb 100644
--- a/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
+++ b/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
@@ -17,6 +17,7 @@
package azkaban.metric;
import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -29,7 +30,7 @@ public class MetricReportManager {
private static final Logger logger = Logger.getLogger(MetricReportManager.class);
private List<IMetric<?>> metrics;
- private IMetricEmitter metricEmitter;
+ private List<IMetricEmitter> metricEmitters;
private ExecutorService executorService;
private static volatile MetricReportManager instance = null;
@@ -37,6 +38,7 @@ public class MetricReportManager {
private MetricReportManager() {
executorService = Executors.newFixedThreadPool(MAX_EMITTER_THREADS);
metrics = new ArrayList<IMetric<?>>();
+ metricEmitters = new LinkedList<IMetricEmitter>();
}
public static boolean isInstantiated() {
@@ -58,35 +60,44 @@ public class MetricReportManager {
// each element of metrics List is responsible to call this method and report metrics
public void reportMetric(final IMetric<?> metric) {
if (metric != null) {
- // TODO: change to debug level
- logger.info(String.format("Submitting %s metric for metric emission pool", metric.getName()));
- executorService.submit(new Runnable() {
- @Override
- public void run() {
- try {
- metricEmitter.reportMetric(metric);
- } catch (Exception ex) {
- logger.error(String.format("Failed to report %s metric due to %s", metric.getName(), ex.toString()));
- }
+
+ // Report metric to all the emitters
+ synchronized (metric) {
+ logger.debug(String.format("Submitting %s metric for metric emission pool", metric.getName()));
+ for (final IMetricEmitter metricEmitter : metricEmitters) {
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ metricEmitter.reportMetric(metric);
+ } catch (Exception ex) {
+ logger.error(String.format("Failed to report %s metric due to %s", metric.getName(), ex.toString()));
+ }
+ }
+ });
}
- });
+ }
}
}
- public void setMetricEmitter(final IMetricEmitter emitter) {
- metricEmitter = emitter;
+ public void addMetricEmitter(final IMetricEmitter emitter) {
+ metricEmitters.add(emitter);
+ }
+
+ public void removeMetricEmitter(final IMetricEmitter emitter) {
+ metricEmitters.remove(emitter);
}
- public IMetricEmitter getMetricEmitter() {
- return metricEmitter;
+ public List<IMetricEmitter> getMetricEmitters() {
+ return metricEmitters;
}
- public void AddMetric(final IMetric<?> metric) {
+ public void addMetric(final IMetric<?> metric) {
// metric null or already present
if (metric != null && getMetricFromName(metric.getName()) == null) {
logger.debug(String.format("Adding %s metric in Metric Manager", metric.getName()));
metrics.add(metric);
- metric.setMetricManager(this);
+ metric.updateMetricManager(this);
}
}
@@ -103,6 +114,10 @@ public class MetricReportManager {
return metric;
}
+ public List<IMetric<?>> getAllMetrics() {
+ return metrics;
+ }
+
protected void finalize() {
executorService.shutdown();
}
diff --git a/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java b/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
new file mode 100644
index 0000000..985dd74
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metric;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+
+public abstract class TimeBasedReportingMetric<T> extends AbstractMetric<T> {
+ private Timer timer = new Timer();
+ private TimerTask recurringReporting = new TimerTask() {
+ @Override
+ public void run() {
+ finalizeValue();
+ notifyManager();
+ }
+ };
+
+ public TimeBasedReportingMetric(String metricName, String metricType, T initialValue, MetricReportManager manager,
+ long interval) {
+ super(metricName, metricType, initialValue, manager);
+ timer.schedule(recurringReporting, interval, interval);
+ }
+
+ public void updateInterval(long interval) {
+ timer.cancel();
+ timer.schedule(recurringReporting, interval, interval);
+ }
+
+ /**
+ * This method is responsible for making a final update to value, if any
+ */
+ protected abstract void finalizeValue();
+
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index aff7250..493d745 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -44,6 +44,7 @@ import azkaban.execapp.metric.NumRunningJobMetric;
import azkaban.jmx.JmxJettyServer;
import azkaban.metric.GangliaMetricEmitter;
import azkaban.metric.IMetricEmitter;
+import azkaban.metric.InMemoryMetricEmitter;
import azkaban.metric.MetricReportManager;
import azkaban.project.JdbcProjectLoader;
import azkaban.project.ProjectLoader;
@@ -137,11 +138,18 @@ public class AzkabanExecutorServer {
if (props.getBoolean("executor.metric.reports", false)) {
logger.info("Starting to configure Metric Reports");
MetricReportManager metricManager = MetricReportManager.getInstance();
- IMetricEmitter metricEmitter = new GangliaMetricEmitter(props);
- metricManager.setMetricEmitter(metricEmitter);
+ IMetricEmitter metricEmitter = new InMemoryMetricEmitter(props);
+ metricManager.addMetricEmitter(metricEmitter);
+
+ // Adding number of Jobs
+ metricManager.addMetric(new NumRunningJobMetric(metricManager, props.getInt("executor.metric.interval."
+ + NumRunningJobMetric.NUM_RUNNING_JOB_METRIC_NAME, props.getInt("executor.metric.interval.default"))));
+
+ // Adding number of flows
+ metricManager.addMetric(new NumRunningFlowMetric(runnerManager, metricManager, props.getInt(
+ "executor.metric.interval." + NumRunningFlowMetric.NUM_RUNNING_FLOW_METRIC_NAME,
+ props.getInt("executor.metric.interval.default"))));
- metricManager.AddMetric(new NumRunningJobMetric());
- metricManager.AddMetric(new NumRunningFlowMetric(runnerManager));
logger.info("Completed configuring Metric Reports");
}
}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java
index 0db4363..f0b45ed 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java
@@ -16,38 +16,26 @@
package azkaban.execapp.metric;
-import java.util.Timer;
-import java.util.TimerTask;
import azkaban.execapp.FlowRunnerManager;
-import azkaban.metric.AbstractMetric;
+import azkaban.metric.MetricReportManager;
+import azkaban.metric.TimeBasedReportingMetric;
-public class NumRunningFlowMetric extends AbstractMetric<Integer> {
+public class NumRunningFlowMetric extends TimeBasedReportingMetric<Integer> {
public static final String NUM_RUNNING_FLOW_METRIC_NAME = "NumRunningFlowMetric";
- public static final String NUM_RUNNING_FLOW_METRIC_TYPE = "uint16";
- private static final int NUM_RUNNING_FLOW_INTERVAL = 5 * 1000; //milliseconds TODO: increase frequency
+ private static final String NUM_RUNNING_FLOW_METRIC_TYPE = "uint16";
private FlowRunnerManager flowManager;
- private Timer timer = new Timer();
- public NumRunningFlowMetric(FlowRunnerManager flowRunnerManager) {
- super(NUM_RUNNING_FLOW_METRIC_NAME, NUM_RUNNING_FLOW_METRIC_TYPE, 0);
+ public NumRunningFlowMetric(FlowRunnerManager flowRunnerManager, MetricReportManager manager, long interval) {
+ super(NUM_RUNNING_FLOW_METRIC_NAME, NUM_RUNNING_FLOW_METRIC_TYPE, 0, manager, interval);
logger.debug("Instantiated NumRunningFlowMetric");
flowManager = flowRunnerManager;
-
- // schedule timer to trigger UpdateValueAndNotifyManager
- timer.schedule(new TimerTask() {
-
- @Override
- public void run() {
- UpdateValueAndNotifyManager();
- }
- }, NUM_RUNNING_FLOW_INTERVAL, NUM_RUNNING_FLOW_INTERVAL);
-
}
- public synchronized void UpdateValueAndNotifyManager() {
+ @Override
+ protected synchronized void finalizeValue() {
value = flowManager.getNumRunningFlows();
- notifyManager();
}
+
}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
index e6f64fe..66a4e38 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
@@ -19,29 +19,31 @@ package azkaban.execapp.metric;
import azkaban.event.Event;
import azkaban.event.Event.Type;
import azkaban.event.EventListener;
-import azkaban.metric.AbstractMetric;
+import azkaban.metric.MetricReportManager;
+import azkaban.metric.TimeBasedReportingMetric;
-public class NumRunningJobMetric extends AbstractMetric<Integer> implements EventListener {
+
+public class NumRunningJobMetric extends TimeBasedReportingMetric<Integer> implements EventListener {
public static final String NUM_RUNNING_JOB_METRIC_NAME = "NumRunningJobMetric";
- public static final String NUM_RUNNING_JOB_METRIC_TYPE = "uint16";
+ private static final String NUM_RUNNING_JOB_METRIC_TYPE = "uint16";
- public NumRunningJobMetric() {
- super(NUM_RUNNING_JOB_METRIC_NAME, NUM_RUNNING_JOB_METRIC_TYPE , 0);
+ public NumRunningJobMetric(MetricReportManager manager, long interval) {
+ super(NUM_RUNNING_JOB_METRIC_NAME, NUM_RUNNING_JOB_METRIC_TYPE, 0, manager, interval);
logger.debug("Instantiated NumRunningJobMetric");
}
@Override
- public void UpdateValueAndNotifyManager() {
- notifyManager();
- }
-
- @Override
public synchronized void handleEvent(Event event) {
if (event.getType() == Type.JOB_STARTED) {
value = value + 1;
} else if (event.getType() == Type.JOB_FINISHED) {
value = value - 1;
}
- UpdateValueAndNotifyManager();
}
+
+ @Override
+ protected synchronized void finalizeValue() {
+ // nothing to finalize value is already updated
+ }
+
}
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java
index b4140cb..92e8fd3 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -41,9 +41,7 @@ import org.apache.velocity.app.VelocityEngine;
import org.apache.velocity.runtime.log.Log4JLogChute;
import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
import org.apache.velocity.runtime.resource.loader.JarResourceLoader;
-
import org.joda.time.DateTimeZone;
-
import org.mortbay.jetty.Connector;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.bio.SocketConnector;
@@ -96,6 +94,7 @@ import azkaban.webapp.servlet.ScheduleServlet;
import azkaban.webapp.servlet.HistoryServlet;
import azkaban.webapp.servlet.ProjectServlet;
import azkaban.webapp.servlet.ProjectManagerServlet;
+import azkaban.webapp.servlet.StatsServlet;
import azkaban.webapp.servlet.TriggerManagerServlet;
import azkaban.webapp.plugin.TriggerPlugin;
import azkaban.webapp.plugin.ViewerPlugin;
@@ -772,6 +771,7 @@ public class AzkabanWebServer extends AzkabanServer {
root.addServlet(new ServletHolder(new ScheduleServlet()), "/schedule");
root.addServlet(new ServletHolder(new JMXHttpServlet()), "/jmx");
root.addServlet(new ServletHolder(new TriggerManagerServlet()), "/triggers");
+ root.addServlet(new ServletHolder(new StatsServlet()), "/stats");
ServletHolder restliHolder = new ServletHolder(new RestliServlet());
restliHolder.setInitParameter("resourcePackages", "azkaban.restli");
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
new file mode 100644
index 0000000..7a33a20
--- /dev/null
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
@@ -0,0 +1,146 @@
+package azkaban.webapp.servlet;
+
+import java.io.IOException;
+import java.security.InvalidParameterException;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import azkaban.metric.IMetric;
+import azkaban.metric.IMetricEmitter;
+import azkaban.metric.InMemoryHistoryNode;
+import azkaban.metric.InMemoryMetricEmitter;
+import azkaban.metric.MetricReportManager;
+import azkaban.metric.TimeBasedReportingMetric;
+import azkaban.server.session.Session;
+import azkaban.user.Permission;
+import azkaban.user.Role;
+import azkaban.user.User;
+import azkaban.user.UserManager;
+import azkaban.webapp.AzkabanWebServer;
+
+
+public class StatsServlet extends LoginAbstractAzkabanServlet {
+ private UserManager userManager;
+
+ @Override
+ public void init(ServletConfig config) throws ServletException {
+ super.init(config);
+ AzkabanWebServer server = (AzkabanWebServer) getApplication();
+ userManager = server.getUserManager();
+ }
+
+ @Override
+ protected void handleGet(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException,
+ IOException {
+ if (hasParam(req, "ajax")) {
+ handleAJAXAction(req, resp, session);
+ } else if (hasParam(req, "action")) {
+ String action = getParam(req, "action");
+ if (action.equals("changeMetricInterval")) {
+ handleChangeMetricInterval(req, resp, session);
+ }
+ } else {
+ handleStatePageLoad(req, resp, session);
+ }
+ }
+
+ private void handleChangeMetricInterval(HttpServletRequest req, HttpServletResponse resp, Session session)
+ throws ServletException {
+ String metricName = getParam(req, "metricName");
+ long newInterval = getLongParam(req, "interval");
+ if(MetricReportManager.isInstantiated()) {
+ MetricReportManager metricManager = MetricReportManager.getInstance();
+ TimeBasedReportingMetric<?> metric = (TimeBasedReportingMetric<?>) metricManager.getMetricFromName(metricName);
+ metric.updateInterval(newInterval);
+ }
+ }
+
+ private void handleAJAXAction(HttpServletRequest req, HttpServletResponse resp, Session session)
+ throws ServletException, IOException {
+ HashMap<String, Object> ret = new HashMap<String, Object>();
+ String ajaxName = getParam(req, "ajax");
+
+ if (ajaxName.equals("metricHistory")) {
+ getMetricHistory(req, ret, session.getUser());
+ }
+
+ if (ret != null) {
+ this.writeJSON(resp, ret);
+ }
+ }
+
+ private void getMetricHistory(HttpServletRequest req, HashMap<String, Object> ret, User user) throws ServletException {
+ if (MetricReportManager.isInstantiated()) {
+ MetricReportManager metricManager = MetricReportManager.getInstance();
+ InMemoryMetricEmitter memoryEmitter = null;
+
+ for (IMetricEmitter emitter : metricManager.getMetricEmitters()) {
+ if (emitter instanceof InMemoryMetricEmitter) {
+ memoryEmitter = (InMemoryMetricEmitter) emitter;
+ break;
+ }
+ }
+
+ // if we have a memory emitter
+ if (memoryEmitter != null) {
+ try {
+ List<InMemoryHistoryNode> result =
+ memoryEmitter.getDrawMetric(getParam(req, "metricName"), parseDate(getParam(req, "from")),
+ parseDate(getParam(req, "to")));
+ if (result.size() > 0) {
+ ret.put("data", result);
+ } else {
+ ret.put("error", "No metric stats available");
+ }
+
+ } catch (ParseException ex) {
+ ret.put("error", "Invalid Date filter");
+ }
+ }
+ }
+ }
+
+ private void handleStatePageLoad(HttpServletRequest req, HttpServletResponse resp, Session session)
+ throws ServletException {
+ Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/statsPage.vm");
+ MetricReportManager metricManager = MetricReportManager.getInstance();
+ if (!hasPermission(session.getUser(), Permission.Type.METRICS)) {
+ page.add("errorMsg", "User " + session.getUser().getUserId() + " has no permission.");
+ page.render();
+ return;
+ }
+ page.add("metricList", metricManager.getAllMetrics());
+ page.render();
+ }
+
+ @Override
+ protected void handlePost(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException,
+ IOException {
+ }
+
+ protected boolean hasPermission(User user, Permission.Type type) {
+ for (String roleName : user.getRoles()) {
+ Role role = userManager.getRole(roleName);
+ if (role.getPermission().isPermissionSet(type) || role.getPermission().isPermissionSet(Permission.Type.ADMIN)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ private Date parseDate(String date) throws ParseException {
+ DateFormat format = new SimpleDateFormat("MM/dd/yyyy HH:mm a");
+ return format.parse(date);
+ }
+}
diff --git a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
new file mode 100644
index 0000000..1ca3a73
--- /dev/null
+++ b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
@@ -0,0 +1,136 @@
+#*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+*#
+
+<!DOCTYPE html>
+<html lang="en">
+ <head>
+
+#parse("azkaban/webapp/servlet/velocity/style.vm")
+#parse("azkaban/webapp/servlet/velocity/javascript.vm")
+
+ <link rel="stylesheet" type="text/css" href="${context}/css/bootstrap-datetimepicker.css" />
+
+ <script type="text/javascript" src="${context}/js/raphael.min.js"></script>
+ <script type="text/javascript" src="${context}/js/morris.min.js"></script>
+ <script type="text/javascript" src="${context}/js/moment.min.js"></script>
+ <script type="text/javascript" src="${context}/js/bootstrap-datetimepicker.min.js"></script>
+ <script type="text/javascript">
+ var contextURL = "${context}";
+ var currentTime = ${currentTime};
+ var timezone = "${timezone}";
+
+ function refreshMetricChart() {
+ var requestURL = '/stats';
+ var requestData = {
+ 'ajax': 'metricHistory',
+ 'from': $('#datetimebegin').val(),
+ 'to' : $('#datetimeend').val(),
+ 'metricName': $('#metricName').val()
+ };
+ var successHandler = function(responseData) {
+ if(responseData.error != null) {
+ $('#reportedMetric').html(responseData.error);
+ } else {
+ var graphDiv = document.createElement('div');
+ $('#reportedMetric').html(graphDiv);
+
+ Morris.Line({
+ element: graphDiv,
+ data: responseData.data,
+ xkey: 'timestamp',
+ ykeys: ['value'],
+ labels: [$('#metricName').val()]
+ });
+ }
+ };
+ $.get(requestURL, requestData, successHandler, 'json');
+ }
+
+ $(document).ready(function () {
+ $('#datetimebegin').datetimepicker();
+ $('#datetimeend').datetimepicker();
+ $('#datetimebegin').on('change.dp', function(e) {
+ $('#datetimeend').data('DateTimePicker').setStartDate(e.date);
+ });
+ $('#datetimeend').on('change.dp', function(e) {
+ $('#datetimebegin').data('DateTimePicker').setEndDate(e.date);
+ });
+ $('#retrieve').click(refreshMetricChart);
+ });
+
+ </script>
+ </head>
+ <body>
+
+#set ($current_page="Statistics")
+#parse ("azkaban/webapp/servlet/velocity/nav.vm")
+
+#if ($errorMsg)
+ #parse ("azkaban/webapp/servlet/velocity/errormsg.vm")
+#else
+
+ ## Page header.
+
+ <div class="az-page-header">
+ <div class="container-full">
+ <div class="row">
+ <div class="header-title">
+ <h1><a href="${context}/stats">Statistics</a></h1>
+ </div>
+ <div class="header-control" style="width:900px">
+
+ <form id="metric-form" method="get">
+ <label for="metricLabel" >Metric</label>
+ #if (!$metricList.isEmpty())
+ <select id="metricName" name="metricName" style="width:200px">
+ #foreach ($metric in $metricList)
+ <option value="${metric.name}" style="width:200px">${metric.name}</option>
+ #end
+ </select>
+ #end
+ <label for="datetimebegin" >Between</label>
+
+ <input type="text" id="datetimebegin" value="" class="ui-datetime-container" style="width:200px">
+
+ <label for="datetimeend" >and</label>
+
+ <input type="text" id="datetimeend" value="" class="ui-datetime-container" style="width:200px">
+
+ <input type="button" id="retrieve" value="Retrieve" class="btn btn-success" >
+ </div>
+ </div>
+ </form>
+ </div>
+ </div>
+ </div>
+ </div>
+
+ <div class="container-full">
+
+ #parse ("azkaban/webapp/servlet/velocity/alerts.vm")
+
+ <div class="row">
+ <div id="reportedMetric" style="padding: 60px 10px 10px 10px;height: 750px;">
+ </div>
+ </div><!-- /row -->
+
+
+
+ #parse ("azkaban/webapp/servlet/velocity/invalidsessionmodal.vm")
+ </div><!-- /container-full -->
+#end
+ </body>
+<html>