azkaban-memoizeit

Adding InMemmoryMetricEmitter

12/12/2014 11:00:30 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java b/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java
index 6c2524d..b9ec1e7 100644
--- a/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java
+++ b/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java
@@ -26,11 +26,11 @@ public abstract class AbstractMetric<T> implements IMetric<T> {
   protected String type;
   protected MetricReportManager metricManager;
 
-  public AbstractMetric(String metricName, String metricType, T initialValue) {
+  public AbstractMetric(String metricName, String metricType, T initialValue, MetricReportManager manager) {
     name = metricName;
     type = metricType;
     value = initialValue;
-    metricManager = null;
+    metricManager = manager;
   }
 
   public String getName() {
@@ -41,7 +41,7 @@ public abstract class AbstractMetric<T> implements IMetric<T> {
     return type;
   }
 
-  public void setMetricManager(final MetricReportManager manager) {
+  public void updateMetricManager(final MetricReportManager manager) {
     metricManager = manager;
   }
 
@@ -49,7 +49,7 @@ public abstract class AbstractMetric<T> implements IMetric<T> {
     return value;
   }
 
-  protected void notifyManager() {
+  public synchronized void notifyManager() {
     logger.debug(String.format("Notifying Manager for %s", this.getClass().getName()));
     try {
       metricManager.reportMetric(this);
diff --git a/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java
index 7226612..1b5f195 100644
--- a/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java
+++ b/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java
@@ -21,19 +21,15 @@ import azkaban.utils.Props;
 
 public class GangliaMetricEmitter implements IMetricEmitter {
   private static final String GANGLIA_METRIC_REPORTER_PATH = "azkaban.metric.ganglia.path";
-  private static final String GANGLIA_METRIC_REPORTER_GROUP = "azkaban.metric.ganglia.group";
 
   private String gmetricPath;
-  private String gangliaGroup;
 
   public GangliaMetricEmitter(Props azkProps) {
     gmetricPath = azkProps.get(GANGLIA_METRIC_REPORTER_PATH);
-    gangliaGroup = azkProps.get(GANGLIA_METRIC_REPORTER_GROUP);
   }
 
   private String buildCommand(IMetric<?> metric) {
-    return String.format("%s -t %s -g \"%s\" -n %s -v %s", gmetricPath, metric.getValueType(), gangliaGroup,
-        metric.getName(), metric.getValue().toString());
+    return String.format("%s -t %s -n %s -v %s", gmetricPath, metric.getValueType(), metric.getName(), metric.getValue().toString());
   }
 
   @Override
diff --git a/azkaban-common/src/main/java/azkaban/metric/IMetric.java b/azkaban-common/src/main/java/azkaban/metric/IMetric.java
index 7422ba4..ff6d4cd 100644
--- a/azkaban-common/src/main/java/azkaban/metric/IMetric.java
+++ b/azkaban-common/src/main/java/azkaban/metric/IMetric.java
@@ -19,7 +19,7 @@ package azkaban.metric;
 public interface IMetric<T> {
   String getName();
   String getValueType();
-  void setMetricManager(final MetricReportManager manager);
-  void UpdateValueAndNotifyManager();
+  void updateMetricManager(final MetricReportManager manager);
+  void notifyManager();
   T getValue();
 }
diff --git a/azkaban-common/src/main/java/azkaban/metric/InMemoryHistoryNode.java b/azkaban-common/src/main/java/azkaban/metric/InMemoryHistoryNode.java
new file mode 100644
index 0000000..54e9655
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metric/InMemoryHistoryNode.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metric;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+public class InMemoryHistoryNode {
+  private Object value;
+  private Date date;
+
+  public InMemoryHistoryNode(Object val) {
+    value = val;
+    date = new Date();
+  }
+
+  public Object getValue() {
+    return value;
+  }
+
+  public Date getTimestamp() {
+    return date;
+  }
+
+  public String formattedTimestamp() {
+    DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    return dateFormat.format(date);
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/metric/InMemoryMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/InMemoryMetricEmitter.java
new file mode 100644
index 0000000..0df5876
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metric/InMemoryMetricEmitter.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metric;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import azkaban.utils.Props;
+
+
+public class InMemoryMetricEmitter implements IMetricEmitter {
+  Map<String, LinkedList<InMemoryHistoryNode>> historyListMapping;
+  private static final String INMEMORY_METRIC_REPORTER_WINDOW = "azkaban.metric.inmemory.interval";
+  private static final String INMEMORY_METRIC_NUM_INSTANCES = "azkaban.metric.inmemory.maxinstances";
+
+  long Interval;
+  long numInstances;
+
+  public InMemoryMetricEmitter(Props azkProps) {
+    historyListMapping = new HashMap<String, LinkedList<InMemoryHistoryNode>>();
+    Interval = azkProps.getLong(INMEMORY_METRIC_REPORTER_WINDOW, 60 * 60 * 24 * 7);
+    numInstances = azkProps.getLong(INMEMORY_METRIC_NUM_INSTANCES, 10000);
+  }
+
+  @Override
+  public void reportMetric(IMetric<?> metric) throws Exception {
+    String metricName = metric.getName();
+    if (!historyListMapping.containsKey(metricName)) {
+      historyListMapping.put(metricName, new LinkedList<InMemoryHistoryNode>());
+    }
+    synchronized (historyListMapping.get(metricName)) {
+      historyListMapping.get(metricName).add(new InMemoryHistoryNode(metric.getValue()));
+      cleanUsingTime(metricName, historyListMapping.get(metricName).peekLast().getTimestamp());
+    }
+  }
+
+  public List<InMemoryHistoryNode> getDrawMetric(String metricName, Date from, Date to) {
+    LinkedList<InMemoryHistoryNode> selectedLists = new LinkedList<InMemoryHistoryNode>();
+    if (historyListMapping.containsKey(metricName)) {
+
+      // selecting nodes within time frame
+      synchronized (historyListMapping.get(metricName)) {
+        for (InMemoryHistoryNode node : historyListMapping.get(metricName)) {
+          if (node.getTimestamp().after(from) && node.getTimestamp().before(to)) {
+            selectedLists.add(node);
+          }
+          if (node.getTimestamp().after(to)) {
+            break;
+          }
+        }
+      }
+
+      // selecting nodes if num of nodes > numInstances
+      if (selectedLists.size() > numInstances) {
+        double step = (double) selectedLists.size() / numInstances;
+        long nextIndex = 0, currentIndex = 0, numSelectedInstances = 1;
+        Iterator<InMemoryHistoryNode> ite = selectedLists.iterator();
+
+        while(ite.hasNext()) {
+          ite.next();
+          if (currentIndex == nextIndex) {
+            nextIndex = (long) Math.floor(numSelectedInstances * step + 0.5);
+            numSelectedInstances++;
+          } else {
+            ite.remove();
+          }
+          currentIndex++;
+        }
+      }
+    }
+    cleanUsingTime(metricName, new Date());
+    return selectedLists;
+  }
+
+  private void cleanUsingTime(String metricName, Date firstAllowedDate) {
+    if (historyListMapping.containsKey(metricName) && historyListMapping.get(metricName) != null) {
+      synchronized (historyListMapping.get(metricName)) {
+
+        InMemoryHistoryNode firstNode = historyListMapping.get(metricName).peekFirst();
+        // removing objects older than Interval time from firstAllowedDate
+        while (firstNode != null
+            && TimeUnit.MILLISECONDS.toSeconds(firstAllowedDate.getTime() - firstNode.getTimestamp().getTime()) > Interval) {
+          historyListMapping.get(metricName).removeFirst();
+          firstNode = historyListMapping.get(metricName).peekFirst();
+        }
+      }
+    }
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java b/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
index 3352f4f..9d411eb 100644
--- a/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
+++ b/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
@@ -17,6 +17,7 @@
 package azkaban.metric;
 
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -29,7 +30,7 @@ public class MetricReportManager {
   private static final Logger logger = Logger.getLogger(MetricReportManager.class);
 
   private List<IMetric<?>> metrics;
-  private IMetricEmitter metricEmitter;
+  private List<IMetricEmitter> metricEmitters;
   private ExecutorService executorService;
   private static volatile MetricReportManager instance = null;
 
@@ -37,6 +38,7 @@ public class MetricReportManager {
   private MetricReportManager() {
     executorService = Executors.newFixedThreadPool(MAX_EMITTER_THREADS);
     metrics = new ArrayList<IMetric<?>>();
+    metricEmitters = new LinkedList<IMetricEmitter>();
   }
 
   public static boolean isInstantiated() {
@@ -58,35 +60,44 @@ public class MetricReportManager {
   // each element of metrics List is responsible to call this method and report metrics
   public void reportMetric(final IMetric<?> metric) {
     if (metric != null) {
-      // TODO: change to debug level
-      logger.info(String.format("Submitting %s metric for metric emission pool", metric.getName()));
-      executorService.submit(new Runnable() {
-        @Override
-        public void run() {
-          try {
-            metricEmitter.reportMetric(metric);
-          } catch (Exception ex) {
-            logger.error(String.format("Failed to report %s metric due to %s", metric.getName(), ex.toString()));
-          }
+
+      // Report metric to all the emitters
+      synchronized (metric) {
+        logger.debug(String.format("Submitting %s metric for metric emission pool", metric.getName()));
+        for (final IMetricEmitter metricEmitter : metricEmitters) {
+          executorService.submit(new Runnable() {
+            @Override
+            public void run() {
+              try {
+                metricEmitter.reportMetric(metric);
+              } catch (Exception ex) {
+                logger.error(String.format("Failed to report %s metric due to %s", metric.getName(), ex.toString()));
+              }
+            }
+          });
         }
-      });
+      }
     }
   }
 
-  public void setMetricEmitter(final IMetricEmitter emitter) {
-    metricEmitter = emitter;
+  public void addMetricEmitter(final IMetricEmitter emitter) {
+    metricEmitters.add(emitter);
+  }
+
+  public void removeMetricEmitter(final IMetricEmitter emitter) {
+    metricEmitters.remove(emitter);
   }
 
-  public IMetricEmitter getMetricEmitter() {
-    return metricEmitter;
+  public List<IMetricEmitter> getMetricEmitters() {
+    return metricEmitters;
   }
 
-  public void AddMetric(final IMetric<?> metric) {
+  public void addMetric(final IMetric<?> metric) {
     // metric null or already present
     if (metric != null && getMetricFromName(metric.getName()) == null) {
       logger.debug(String.format("Adding %s metric in Metric Manager", metric.getName()));
       metrics.add(metric);
-      metric.setMetricManager(this);
+      metric.updateMetricManager(this);
     }
   }
 
@@ -103,6 +114,10 @@ public class MetricReportManager {
     return metric;
   }
 
+  public List<IMetric<?>> getAllMetrics() {
+    return metrics;
+  }
+
   protected void finalize() {
     executorService.shutdown();
   }
diff --git a/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java b/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
new file mode 100644
index 0000000..985dd74
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metric;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+
+public abstract class TimeBasedReportingMetric<T> extends AbstractMetric<T> {
+  private Timer timer = new Timer();
+  private TimerTask recurringReporting = new TimerTask() {
+    @Override
+    public void run() {
+      finalizeValue();
+      notifyManager();
+    }
+  };
+
+  public TimeBasedReportingMetric(String metricName, String metricType, T initialValue, MetricReportManager manager,
+      long interval) {
+    super(metricName, metricType, initialValue, manager);
+    timer.schedule(recurringReporting, interval, interval);
+  }
+
+  public void updateInterval(long interval) {
+    timer.cancel();
+    timer.schedule(recurringReporting, interval, interval);
+  }
+
+  /**
+   * This method is responsible for making a final update to value, if any
+   */
+  protected abstract void finalizeValue();
+
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index aff7250..493d745 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -44,6 +44,7 @@ import azkaban.execapp.metric.NumRunningJobMetric;
 import azkaban.jmx.JmxJettyServer;
 import azkaban.metric.GangliaMetricEmitter;
 import azkaban.metric.IMetricEmitter;
+import azkaban.metric.InMemoryMetricEmitter;
 import azkaban.metric.MetricReportManager;
 import azkaban.project.JdbcProjectLoader;
 import azkaban.project.ProjectLoader;
@@ -137,11 +138,18 @@ public class AzkabanExecutorServer {
     if (props.getBoolean("executor.metric.reports", false)) {
       logger.info("Starting to configure Metric Reports");
       MetricReportManager metricManager = MetricReportManager.getInstance();
-      IMetricEmitter metricEmitter = new GangliaMetricEmitter(props);
-      metricManager.setMetricEmitter(metricEmitter);
+      IMetricEmitter metricEmitter = new InMemoryMetricEmitter(props);
+      metricManager.addMetricEmitter(metricEmitter);
+
+      // Adding number of Jobs
+      metricManager.addMetric(new NumRunningJobMetric(metricManager, props.getInt("executor.metric.interval."
+          + NumRunningJobMetric.NUM_RUNNING_JOB_METRIC_NAME, props.getInt("executor.metric.interval.default"))));
+
+      // Adding number of flows
+      metricManager.addMetric(new NumRunningFlowMetric(runnerManager, metricManager, props.getInt(
+          "executor.metric.interval." + NumRunningFlowMetric.NUM_RUNNING_FLOW_METRIC_NAME,
+          props.getInt("executor.metric.interval.default"))));
 
-      metricManager.AddMetric(new NumRunningJobMetric());
-      metricManager.AddMetric(new NumRunningFlowMetric(runnerManager));
       logger.info("Completed configuring Metric Reports");
     }
   }
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java
index 0db4363..f0b45ed 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java
@@ -16,38 +16,26 @@
 
 package azkaban.execapp.metric;
 
-import java.util.Timer;
-import java.util.TimerTask;
 import azkaban.execapp.FlowRunnerManager;
-import azkaban.metric.AbstractMetric;
+import azkaban.metric.MetricReportManager;
+import azkaban.metric.TimeBasedReportingMetric;
 
 
-public class NumRunningFlowMetric extends AbstractMetric<Integer> {
+public class NumRunningFlowMetric extends TimeBasedReportingMetric<Integer> {
   public static final String NUM_RUNNING_FLOW_METRIC_NAME = "NumRunningFlowMetric";
-  public static final String NUM_RUNNING_FLOW_METRIC_TYPE = "uint16";
-  private static final int NUM_RUNNING_FLOW_INTERVAL = 5 * 1000; //milliseconds TODO: increase frequency
+  private static final String NUM_RUNNING_FLOW_METRIC_TYPE = "uint16";
 
   private FlowRunnerManager flowManager;
-  private Timer timer = new Timer();
 
-  public NumRunningFlowMetric(FlowRunnerManager flowRunnerManager) {
-    super(NUM_RUNNING_FLOW_METRIC_NAME, NUM_RUNNING_FLOW_METRIC_TYPE, 0);
+  public NumRunningFlowMetric(FlowRunnerManager flowRunnerManager, MetricReportManager manager, long interval) {
+    super(NUM_RUNNING_FLOW_METRIC_NAME, NUM_RUNNING_FLOW_METRIC_TYPE, 0, manager, interval);
     logger.debug("Instantiated NumRunningFlowMetric");
     flowManager = flowRunnerManager;
-
-    // schedule timer to trigger UpdateValueAndNotifyManager
-    timer.schedule(new TimerTask() {
-
-      @Override
-      public void run() {
-        UpdateValueAndNotifyManager();
-      }
-    }, NUM_RUNNING_FLOW_INTERVAL, NUM_RUNNING_FLOW_INTERVAL);
-
   }
 
-  public synchronized void UpdateValueAndNotifyManager() {
+  @Override
+  protected synchronized void finalizeValue() {
     value = flowManager.getNumRunningFlows();
-    notifyManager();
   }
+
 }
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
index e6f64fe..66a4e38 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
@@ -19,29 +19,31 @@ package azkaban.execapp.metric;
 import azkaban.event.Event;
 import azkaban.event.Event.Type;
 import azkaban.event.EventListener;
-import azkaban.metric.AbstractMetric;
+import azkaban.metric.MetricReportManager;
+import azkaban.metric.TimeBasedReportingMetric;
 
-public class NumRunningJobMetric extends AbstractMetric<Integer> implements EventListener {
+
+public class NumRunningJobMetric extends TimeBasedReportingMetric<Integer> implements EventListener {
   public static final String NUM_RUNNING_JOB_METRIC_NAME = "NumRunningJobMetric";
-  public static final String NUM_RUNNING_JOB_METRIC_TYPE = "uint16";
+  private static final String NUM_RUNNING_JOB_METRIC_TYPE = "uint16";
 
-  public NumRunningJobMetric() {
-    super(NUM_RUNNING_JOB_METRIC_NAME, NUM_RUNNING_JOB_METRIC_TYPE , 0);
+  public NumRunningJobMetric(MetricReportManager manager, long interval) {
+    super(NUM_RUNNING_JOB_METRIC_NAME, NUM_RUNNING_JOB_METRIC_TYPE, 0, manager, interval);
     logger.debug("Instantiated NumRunningJobMetric");
   }
 
   @Override
-  public void UpdateValueAndNotifyManager() {
-    notifyManager();
-  }
-
-  @Override
   public synchronized void handleEvent(Event event) {
     if (event.getType() == Type.JOB_STARTED) {
       value = value + 1;
     } else if (event.getType() == Type.JOB_FINISHED) {
       value = value - 1;
     }
-    UpdateValueAndNotifyManager();
   }
+
+  @Override
+  protected synchronized void finalizeValue() {
+    // nothing to finalize value is already updated
+  }
+
 }
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java
index b4140cb..92e8fd3 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -41,9 +41,7 @@ import org.apache.velocity.app.VelocityEngine;
 import org.apache.velocity.runtime.log.Log4JLogChute;
 import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
 import org.apache.velocity.runtime.resource.loader.JarResourceLoader;
-
 import org.joda.time.DateTimeZone;
-
 import org.mortbay.jetty.Connector;
 import org.mortbay.jetty.Server;
 import org.mortbay.jetty.bio.SocketConnector;
@@ -96,6 +94,7 @@ import azkaban.webapp.servlet.ScheduleServlet;
 import azkaban.webapp.servlet.HistoryServlet;
 import azkaban.webapp.servlet.ProjectServlet;
 import azkaban.webapp.servlet.ProjectManagerServlet;
+import azkaban.webapp.servlet.StatsServlet;
 import azkaban.webapp.servlet.TriggerManagerServlet;
 import azkaban.webapp.plugin.TriggerPlugin;
 import azkaban.webapp.plugin.ViewerPlugin;
@@ -772,6 +771,7 @@ public class AzkabanWebServer extends AzkabanServer {
     root.addServlet(new ServletHolder(new ScheduleServlet()), "/schedule");
     root.addServlet(new ServletHolder(new JMXHttpServlet()), "/jmx");
     root.addServlet(new ServletHolder(new TriggerManagerServlet()), "/triggers");
+    root.addServlet(new ServletHolder(new StatsServlet()), "/stats");
 
     ServletHolder restliHolder = new ServletHolder(new RestliServlet());
     restliHolder.setInitParameter("resourcePackages", "azkaban.restli");
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
new file mode 100644
index 0000000..7a33a20
--- /dev/null
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
@@ -0,0 +1,146 @@
+package azkaban.webapp.servlet;
+
+import java.io.IOException;
+import java.security.InvalidParameterException;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import azkaban.metric.IMetric;
+import azkaban.metric.IMetricEmitter;
+import azkaban.metric.InMemoryHistoryNode;
+import azkaban.metric.InMemoryMetricEmitter;
+import azkaban.metric.MetricReportManager;
+import azkaban.metric.TimeBasedReportingMetric;
+import azkaban.server.session.Session;
+import azkaban.user.Permission;
+import azkaban.user.Role;
+import azkaban.user.User;
+import azkaban.user.UserManager;
+import azkaban.webapp.AzkabanWebServer;
+
+
+public class StatsServlet extends LoginAbstractAzkabanServlet {
+  private UserManager userManager;
+
+  @Override
+  public void init(ServletConfig config) throws ServletException {
+    super.init(config);
+    AzkabanWebServer server = (AzkabanWebServer) getApplication();
+    userManager = server.getUserManager();
+  }
+
+  @Override
+  protected void handleGet(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException,
+      IOException {
+    if (hasParam(req, "ajax")) {
+      handleAJAXAction(req, resp, session);
+    } else if (hasParam(req, "action")) {
+      String action = getParam(req, "action");
+      if (action.equals("changeMetricInterval")) {
+        handleChangeMetricInterval(req, resp, session);
+      }
+    } else {
+      handleStatePageLoad(req, resp, session);
+    }
+  }
+
+  private void handleChangeMetricInterval(HttpServletRequest req, HttpServletResponse resp, Session session)
+      throws ServletException {
+    String metricName = getParam(req, "metricName");
+    long newInterval = getLongParam(req, "interval");
+    if(MetricReportManager.isInstantiated()) {
+      MetricReportManager metricManager = MetricReportManager.getInstance();
+      TimeBasedReportingMetric<?> metric = (TimeBasedReportingMetric<?>) metricManager.getMetricFromName(metricName);
+      metric.updateInterval(newInterval);
+    }
+  }
+
+  private void handleAJAXAction(HttpServletRequest req, HttpServletResponse resp, Session session)
+      throws ServletException, IOException {
+    HashMap<String, Object> ret = new HashMap<String, Object>();
+    String ajaxName = getParam(req, "ajax");
+
+    if (ajaxName.equals("metricHistory")) {
+      getMetricHistory(req, ret, session.getUser());
+    }
+
+    if (ret != null) {
+      this.writeJSON(resp, ret);
+    }
+  }
+
+  private void getMetricHistory(HttpServletRequest req, HashMap<String, Object> ret, User user) throws ServletException {
+    if (MetricReportManager.isInstantiated()) {
+      MetricReportManager metricManager = MetricReportManager.getInstance();
+      InMemoryMetricEmitter memoryEmitter = null;
+
+      for (IMetricEmitter emitter : metricManager.getMetricEmitters()) {
+        if (emitter instanceof InMemoryMetricEmitter) {
+          memoryEmitter = (InMemoryMetricEmitter) emitter;
+          break;
+        }
+      }
+
+      // if we have a memory emitter
+      if (memoryEmitter != null) {
+        try {
+          List<InMemoryHistoryNode> result =
+              memoryEmitter.getDrawMetric(getParam(req, "metricName"), parseDate(getParam(req, "from")),
+                  parseDate(getParam(req, "to")));
+          if (result.size() > 0) {
+            ret.put("data", result);
+          } else {
+            ret.put("error", "No metric stats available");
+          }
+
+        } catch (ParseException ex) {
+          ret.put("error", "Invalid Date filter");
+        }
+      }
+    }
+  }
+
+  private void handleStatePageLoad(HttpServletRequest req, HttpServletResponse resp, Session session)
+      throws ServletException {
+    Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/statsPage.vm");
+    MetricReportManager metricManager = MetricReportManager.getInstance();
+    if (!hasPermission(session.getUser(), Permission.Type.METRICS)) {
+      page.add("errorMsg", "User " + session.getUser().getUserId() + " has no permission.");
+      page.render();
+      return;
+    }
+    page.add("metricList", metricManager.getAllMetrics());
+    page.render();
+  }
+
+  @Override
+  protected void handlePost(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException,
+      IOException {
+  }
+
+  protected boolean hasPermission(User user, Permission.Type type) {
+    for (String roleName : user.getRoles()) {
+      Role role = userManager.getRole(roleName);
+      if (role.getPermission().isPermissionSet(type) || role.getPermission().isPermissionSet(Permission.Type.ADMIN)) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  private Date parseDate(String date) throws ParseException {
+    DateFormat format = new SimpleDateFormat("MM/dd/yyyy HH:mm a");
+    return format.parse(date);
+  }
+}
diff --git a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
new file mode 100644
index 0000000..1ca3a73
--- /dev/null
+++ b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
@@ -0,0 +1,136 @@
+#*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+*#
+
+<!DOCTYPE html>
+<html lang="en">
+  <head>
+
+#parse("azkaban/webapp/servlet/velocity/style.vm")
+#parse("azkaban/webapp/servlet/velocity/javascript.vm")
+
+    <link rel="stylesheet" type="text/css" href="${context}/css/bootstrap-datetimepicker.css" />
+
+    <script type="text/javascript" src="${context}/js/raphael.min.js"></script>
+    <script type="text/javascript" src="${context}/js/morris.min.js"></script>
+    <script type="text/javascript" src="${context}/js/moment.min.js"></script>
+    <script type="text/javascript" src="${context}/js/bootstrap-datetimepicker.min.js"></script>
+    <script type="text/javascript">
+      var contextURL = "${context}";
+      var currentTime = ${currentTime};
+      var timezone = "${timezone}";
+
+      function refreshMetricChart() {
+            var requestURL = '/stats';
+            var requestData = {
+              'ajax': 'metricHistory',
+              'from': $('#datetimebegin').val(),
+              'to'  : $('#datetimeend').val(),
+              'metricName': $('#metricName').val()
+            };
+            var successHandler = function(responseData) {
+              if(responseData.error != null) {
+                $('#reportedMetric').html(responseData.error);
+              } else {
+                var graphDiv = document.createElement('div');
+                $('#reportedMetric').html(graphDiv);
+
+                Morris.Line({
+                              element: graphDiv,
+                              data: responseData.data,
+                              xkey: 'timestamp',
+                              ykeys: ['value'],
+                              labels: [$('#metricName').val()]
+                            });
+              }
+            };
+            $.get(requestURL, requestData, successHandler, 'json');
+      }
+
+      $(document).ready(function () {
+          $('#datetimebegin').datetimepicker();
+          $('#datetimeend').datetimepicker();
+          $('#datetimebegin').on('change.dp', function(e) {
+              $('#datetimeend').data('DateTimePicker').setStartDate(e.date);
+            });
+          $('#datetimeend').on('change.dp', function(e) {
+              $('#datetimebegin').data('DateTimePicker').setEndDate(e.date);
+            });
+          $('#retrieve').click(refreshMetricChart);
+      });
+
+    </script>
+  </head>
+  <body>
+
+#set ($current_page="Statistics")
+#parse ("azkaban/webapp/servlet/velocity/nav.vm")
+
+#if ($errorMsg)
+  #parse ("azkaban/webapp/servlet/velocity/errormsg.vm")
+#else
+
+  ## Page header.
+
+    <div class="az-page-header">
+      <div class="container-full">
+        <div class="row">
+          <div class="header-title">
+            <h1><a href="${context}/stats">Statistics</a></h1>
+          </div>
+          <div class="header-control" style="width:900px">
+
+            <form id="metric-form" method="get">
+                        <label for="metricLabel" >Metric</label>
+              #if (!$metricList.isEmpty())
+                 <select id="metricName"  name="metricName" style="width:200px">
+                    #foreach ($metric in $metricList)
+                              <option value="${metric.name}" style="width:200px">${metric.name}</option>
+                     #end
+                 </select>
+              #end
+                   <label for="datetimebegin" >Between</label>
+
+                    <input type="text" id="datetimebegin" value="" class="ui-datetime-container" style="width:200px">
+
+                   <label for="datetimeend" >and</label>
+
+                     <input type="text" id="datetimeend" value="" class="ui-datetime-container" style="width:200px">
+
+                    <input type="button" id="retrieve" value="Retrieve" class="btn btn-success" >
+                </div>
+              </div>
+            </form>
+          </div>
+        </div>
+      </div>
+    </div>
+
+    <div class="container-full">
+
+  #parse ("azkaban/webapp/servlet/velocity/alerts.vm")
+
+      <div class="row">
+        <div id="reportedMetric" style="padding: 60px 10px 10px 10px;height: 750px;">
+        </div>
+      </div><!-- /row -->
+
+
+
+  #parse ("azkaban/webapp/servlet/velocity/invalidsessionmodal.vm")
+    </div><!-- /container-full -->
+#end
+  </body>
+<html>