azkaban-uncached

Merge pull request #376 from logiclord/master Adding metric

1/20/2015 4:52:43 PM

Changes

build.gradle 1(+1 -0)

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java b/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
index ddd8bba..8bc725f 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
@@ -32,8 +32,7 @@ public interface ConnectorParams {
   public static final String LOG_ACTION = "log";
   public static final String ATTACHMENTS_ACTION = "attachments";
   public static final String METADATA_ACTION = "metadata";
-  public static final String RELOAD_JOBTYPE_PLUGINS_ACTION =
-      "reloadJobTypePlugins";
+  public static final String RELOAD_JOBTYPE_PLUGINS_ACTION = "reloadJobTypePlugins";
 
   public static final String MODIFY_EXECUTION_ACTION = "modifyExecution";
   public static final String MODIFY_EXECUTION_ACTION_TYPE = "modifyType";
@@ -80,12 +79,33 @@ public interface ConnectorParams {
   public static final String JMX_GET_MBEANS = "getMBeans";
   public static final String JMX_GET_MBEAN_INFO = "getMBeanInfo";
   public static final String JMX_GET_MBEAN_ATTRIBUTE = "getAttribute";
-  public static final String JMX_GET_ALL_MBEAN_ATTRIBUTES =
-      "getAllMBeanAttributes";
+  public static final String JMX_GET_ALL_MBEAN_ATTRIBUTES = "getAllMBeanAttributes";
   public static final String JMX_ATTRIBUTE = "attribute";
   public static final String JMX_MBEAN = "mBean";
 
-  public static final String JMX_GET_ALL_EXECUTOR_ATTRIBUTES =
-      "getAllExecutorAttributes";
+  public static final String JMX_GET_ALL_EXECUTOR_ATTRIBUTES = "getAllExecutorAttributes";
   public static final String JMX_HOSTPORT = "hostPort";
+
+  public static final String STATS_GET_ALLMETRICSNAME = "getAllMetricNames";
+  public static final String STATS_GET_METRICHISTORY = "getMetricHistory";
+  public static final String STATS_SET_REPORTINGINTERVAL = "changeMetricInterval";
+  public static final String STATS_SET_CLEANINGINTERVAL = "changeCleaningInterval";
+  public static final String STATS_SET_MAXREPORTERPOINTS = "changeEmitterPoints";
+  public static final String STATS_SET_ENABLEMETRICS = "enableMetrics";
+  public static final String STATS_SET_DISABLEMETRICS = "disableMetrics";
+  public static final String STATS_MAP_METRICNAMEPARAM = "metricName";
+
+  /**
+   * useStats param is used to filter datapoints on /stats graph by using standard deviation and means
+   * By default, we consider only top/bottom 5% datapoints
+   */
+
+  public static final String STATS_MAP_METRICRETRIEVALMODE = "useStats";
+  public static final String STATS_MAP_STARTDATE = "from";
+  public static final String STATS_MAP_ENDDATE = "to";
+  public static final String STATS_MAP_REPORTINGINTERVAL = "interval";
+  public static final String STATS_MAP_CLEANINGINTERVAL = "interval";
+  public static final String STATS_MAP_EMITTERNUMINSTANCES = "numInstances";
+
+
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index e3521db..5f7bcd5 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -38,7 +38,6 @@ import org.apache.http.client.utils.URIBuilder;
 import org.apache.http.impl.client.BasicResponseHandler;
 import org.apache.http.impl.client.DefaultHttpClient;
 import org.apache.log4j.Logger;
-
 import org.joda.time.DateTime;
 
 import azkaban.alert.Alerter;
@@ -718,6 +717,53 @@ public class ExecutorManager extends EventHandler implements
     return jsonResponse;
   }
 
+  /**
+   * Manage servlet call for stats servlet in Azkaban execution server
+   * {@inheritDoc}
+   * @see azkaban.executor.ExecutorManagerAdapter#callExecutorStats(java.lang.String, azkaban.utils.Pair[])
+   */
+  @Override
+  public Map<String, Object> callExecutorStats(String action, Pair<String, String>... params) throws IOException {
+
+    URIBuilder builder = new URIBuilder();
+    builder.setScheme("http").setHost(executorHost).setPort(executorPort).setPath("/stats");
+
+    builder.setParameter(ConnectorParams.ACTION_PARAM, action);
+
+    if (params != null) {
+      for (Pair<String, String> pair : params) {
+        builder.setParameter(pair.getFirst(), pair.getSecond());
+      }
+    }
+
+    URI uri = null;
+    try {
+      uri = builder.build();
+    } catch (URISyntaxException e) {
+      throw new IOException(e);
+    }
+
+    ResponseHandler<String> responseHandler = new BasicResponseHandler();
+
+    HttpClient httpclient = new DefaultHttpClient();
+    HttpGet httpget = new HttpGet(uri);
+    String response = null;
+    try {
+      response = httpclient.execute(httpget, responseHandler);
+    } catch (IOException e) {
+      throw e;
+    } finally {
+      httpclient.getConnectionManager().shutdown();
+    }
+
+    @SuppressWarnings("unchecked")
+    Map<String, Object> jsonResponse =
+        (Map<String, Object>) JSONUtils.parseJSONFromString(response);
+
+    return jsonResponse;
+  }
+
+
   @Override
   public Map<String, Object> callExecutorJMX(String hostPort, String action,
       String mBean) throws IOException {
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
index c7e8913..10379ec 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
@@ -25,6 +25,7 @@ import java.util.Set;
 import azkaban.project.Project;
 import azkaban.utils.FileIOUtils.JobMetaData;
 import azkaban.utils.FileIOUtils.LogData;
+import azkaban.utils.Pair;
 
 public interface ExecutorManagerAdapter {
 
@@ -164,6 +165,22 @@ public interface ExecutorManagerAdapter {
   public String submitExecutableFlow(ExecutableFlow exflow, String userId)
       throws ExecutorManagerException;
 
+  /**
+   * Manage servlet call for stats servlet in Azkaban execution server
+   * Action can take any of the following values
+   * <ul>
+   * <li>{@link azkaban.executor.ConnectorParams#STATS_SET_REPORTINGINTERVAL}<li>
+   * <li>{@link azkaban.executor.ConnectorParams#STATS_SET_CLEANINGINTERVAL}<li>
+   * <li>{@link azkaban.executor.ConnectorParams#STATS_SET_MAXREPORTERPOINTS}<li>
+   * <li>{@link azkaban.executor.ConnectorParams#STATS_GET_ALLMETRICSNAME}<li>
+   * <li>{@link azkaban.executor.ConnectorParams#STATS_GET_METRICHISTORY}<li>
+   * <li>{@link azkaban.executor.ConnectorParams#STATS_SET_ENABLEMETRICS}<li>
+   * <li>{@link azkaban.executor.ConnectorParams#STATS_SET_DISABLEMETRICS}<li>
+   * </ul>
+   */
+  public Map<String, Object> callExecutorStats(String action,
+      Pair<String, String>... params) throws IOException;
+
   public Map<String, Object> callExecutorJMX(String hostPort, String action,
       String mBean) throws IOException;
 
diff --git a/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java b/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java
new file mode 100644
index 0000000..5a5a967
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metric;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Abstract class for Metric
+ * @param <T> Type of Value of a given metric
+ */
+public abstract class AbstractMetric<T> implements IMetric<T>, Cloneable{
+  protected static final Logger logger = Logger.getLogger(MetricReportManager.class);
+  protected String name;
+  protected T value;
+  protected String type;
+  protected MetricReportManager metricManager;
+
+  /**
+   * @param metricName Name of metric
+   * @param metricType Metric type. For display purposes.
+   * @param initialValue Initial Value of a metric
+   * @param manager Metric Manager whom a metric will report to
+   */
+  protected AbstractMetric(String metricName, String metricType, T initialValue, MetricReportManager manager) {
+    name = metricName;
+    type = metricType;
+    value = initialValue;
+    metricManager = manager;
+  }
+
+  /**
+   * {@inheritDoc}
+   * @see azkaban.metric.IMetric#getName()
+   */
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * {@inheritDoc}
+   * @see azkaban.metric.IMetric#getValueType()
+   */
+  public String getValueType() {
+    return type;
+  }
+
+  /**
+   * {@inheritDoc}
+   * @see azkaban.metric.IMetric#updateMetricManager(azkaban.metric.MetricReportManager)
+   */
+  public void updateMetricManager(final MetricReportManager manager) {
+    metricManager = manager;
+  }
+
+  /**
+   * {@inheritDoc}
+   * @throws CloneNotSupportedException
+   * @see azkaban.metric.IMetric#getSnapshot()
+   */
+  @SuppressWarnings("unchecked")
+  public IMetric<T> getSnapshot() throws CloneNotSupportedException{
+    return (IMetric<T>) this.clone();
+  }
+
+  /**
+   * {@inheritDoc}
+   * @see azkaban.metric.IMetric#getValue()
+   */
+  public T getValue() {
+    return value;
+  }
+
+  /**
+   * Method used to notify manager for a tracking event.
+   * Metric is free to call this method as per implementation.
+   * Timer based or Azkaban events are the most common implementation
+   * {@inheritDoc}
+   * @see azkaban.metric.IMetric#notifyManager()
+   */
+  public void notifyManager() {
+    logger.debug(String.format("Notifying Manager for %s", this.getClass().getName()));
+    try {
+      metricManager.reportMetric(this);
+    } catch (Throwable ex) {
+      logger.error(String.format("Metric Manager is not set for %s metric", this.getClass().getName()), ex);
+    }
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java
new file mode 100644
index 0000000..5e954b7
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metric;
+
+import org.apache.commons.collections.bag.SynchronizedBag;
+
+import azkaban.utils.Props;
+
+
+/**
+ * MetricEmitter implementation to report metric to a ganglia gmetric process
+ */
+public class GangliaMetricEmitter implements IMetricEmitter {
+  private static final String GANGLIA_METRIC_REPORTER_PATH = "azkaban.metric.ganglia.path";
+  private String gmetricPath;
+
+  /**
+   * @param azkProps Azkaban Properties
+   */
+  public GangliaMetricEmitter(Props azkProps) {
+    gmetricPath = azkProps.get(GANGLIA_METRIC_REPORTER_PATH);
+  }
+
+  private String buildCommand(IMetric<?> metric) {
+    String cmd = null;
+
+    synchronized (metric) {
+      cmd =
+          String.format("%s -t %s -n %s -v %s", gmetricPath, metric.getValueType(), metric.getName(), metric.getValue()
+              .toString());
+    }
+
+    return cmd;
+  }
+
+  /**
+   * Report metric by executing command line interface of gmetrics
+   * {@inheritDoc}
+   * @see azkaban.metric.IMetricEmitter#reportMetric(azkaban.metric.IMetric)
+   */
+  @Override
+  public void reportMetric(final IMetric<?> metric) throws MetricException {
+    String gangliaCommand = buildCommand(metric);
+
+    if (gangliaCommand != null) {
+      // executes shell command to report metric to ganglia dashboard
+      try {
+        Process emission = Runtime.getRuntime().exec(gangliaCommand);
+        int exitCode;
+        exitCode = emission.waitFor();
+        if (exitCode != 0) {
+          throw new MetricException("Failed to report metric using gmetric");
+        }
+      } catch (Exception e) {
+        throw new MetricException("Failed to report metric using gmetric");
+      }
+    } else {
+      throw new MetricException("Failed to build ganglia Command");
+    }
+  }
+
+  @Override
+  public void purgeAllData() throws MetricException {
+
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/metric/IMetric.java b/azkaban-common/src/main/java/azkaban/metric/IMetric.java
new file mode 100644
index 0000000..188b399
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metric/IMetric.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metric;
+
+/**
+ * Interface of any Metric
+ * @param <T> Type of Value of a given metric
+ */
+public interface IMetric<T> {
+  String getName();
+  String getValueType();
+  void updateMetricManager(final MetricReportManager manager);
+  void notifyManager();
+  T getValue();
+  IMetric<T> getSnapshot() throws CloneNotSupportedException;
+}
diff --git a/azkaban-common/src/main/java/azkaban/metric/IMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/IMetricEmitter.java
new file mode 100644
index 0000000..bdc874b
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metric/IMetricEmitter.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metric;
+
+/**
+ * Interface for metric emitters
+ */
+public interface IMetricEmitter {
+  void reportMetric(final IMetric<?> metric) throws MetricException;
+  void purgeAllData() throws MetricException;
+}
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryHistoryNode.java b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryHistoryNode.java
new file mode 100644
index 0000000..b8f1f6e
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryHistoryNode.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metric.inmemoryemitter;
+
+import java.util.Date;
+
+/**
+ * A snapshot of metric's value
+ */
+public class InMemoryHistoryNode {
+  private Object value;
+  private Date date;
+
+  /**
+   * Takes snapshot of the metric with a given value
+   * @param val
+   */
+  public InMemoryHistoryNode(final Object val) {
+    value = val;
+    date = new Date();
+  }
+
+  public Object getValue() {
+    return value;
+  }
+
+  public Date getTimestamp() {
+    return date;
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
new file mode 100644
index 0000000..e93d11b
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
@@ -0,0 +1,235 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metric.inmemoryemitter;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+
+import azkaban.metric.IMetric;
+import azkaban.metric.IMetricEmitter;
+import azkaban.metric.MetricException;
+import azkaban.utils.Props;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+
+
+/**
+ * Metric Emitter which maintains in memory snapshots of the metrics
+ * This is also the default metric emitter and used by /stats servlet
+ */
+public class InMemoryMetricEmitter implements IMetricEmitter {
+  protected static final Logger logger = Logger.getLogger(InMemoryMetricEmitter.class);
+
+  /**
+   * Data structure to keep track of snapshots
+   */
+  protected Map<String, LinkedList<InMemoryHistoryNode>> historyListMapping;
+  private static final String INMEMORY_METRIC_REPORTER_WINDOW = "azkaban.metric.inmemory.interval";
+  private static final String INMEMORY_METRIC_NUM_INSTANCES = "azkaban.metric.inmemory.maxinstances";
+  private static final String INMEMORY_METRIC_STANDARDDEVIATION_FACTOR =
+      "azkaban.metric.inmemory.standardDeviationFactor";
+
+  private double standardDeviationFactor;
+  /**
+   * Interval (in millisecond) from today for which we should maintain the in memory snapshots
+   */
+  private long timeWindow;
+  /**
+   * Maximum number of snapshots that should be displayed on /stats servlet
+   */
+  private long numInstances;
+
+  /**
+   * @param azkProps Azkaban Properties
+   */
+  public InMemoryMetricEmitter(Props azkProps) {
+    historyListMapping = new HashMap<String, LinkedList<InMemoryHistoryNode>>();
+    timeWindow = azkProps.getLong(INMEMORY_METRIC_REPORTER_WINDOW, 60 * 60 * 24 * 7 * 1000);
+    numInstances = azkProps.getLong(INMEMORY_METRIC_NUM_INSTANCES, 50);
+    standardDeviationFactor = azkProps.getDouble(INMEMORY_METRIC_STANDARDDEVIATION_FACTOR, 2);
+  }
+
+  /**
+   * Update reporting interval
+   * @param val interval in milli seconds
+   */
+  public synchronized void setReportingInterval(long val) {
+    timeWindow = val;
+  }
+
+  /**
+   * Set number of /stats servlet display points
+   * @param num
+   */
+  public void setReportingInstances(long num) {
+    numInstances = num;
+  }
+
+  /**
+   * Ingest metric in snapshot data structure while maintaining interval
+   * {@inheritDoc}
+   * @see azkaban.metric.IMetricEmitter#reportMetric(azkaban.metric.IMetric)
+   */
+  @Override
+  public void reportMetric(final IMetric<?> metric) throws MetricException {
+    String metricName = metric.getName();
+    if (!historyListMapping.containsKey(metricName)) {
+      logger.info("First time capturing metric: " + metricName);
+      historyListMapping.put(metricName, new LinkedList<InMemoryHistoryNode>());
+    }
+    synchronized (historyListMapping.get(metricName)) {
+      logger.debug("Ingesting metric: " + metricName);
+      historyListMapping.get(metricName).add(new InMemoryHistoryNode(metric.getValue()));
+      cleanUsingTime(metricName, historyListMapping.get(metricName).peekLast().getTimestamp());
+    }
+  }
+
+  /**
+   * Get snapshots for a given metric at a given time
+   * @param metricName name of the metric
+   * @param from Start date
+   * @param to end date
+   * @param useStats get statistically significant points only
+   * @return List of snapshots
+   */
+  public List<InMemoryHistoryNode> getMetrics(final String metricName, final Date from, final Date to,
+      final Boolean useStats) throws ClassCastException {
+    LinkedList<InMemoryHistoryNode> selectedLists = new LinkedList<InMemoryHistoryNode>();
+    if (historyListMapping.containsKey(metricName)) {
+
+      logger.debug("selecting snapshots within time frame");
+      synchronized (historyListMapping.get(metricName)) {
+        for (InMemoryHistoryNode node : historyListMapping.get(metricName)) {
+          if (node.getTimestamp().after(from) && node.getTimestamp().before(to)) {
+            selectedLists.add(node);
+          }
+          if (node.getTimestamp().after(to)) {
+            break;
+          }
+        }
+      }
+
+      // selecting nodes if num of nodes > numInstances
+      if (useStats) {
+        statBasedSelectMetricHistory(selectedLists);
+      } else {
+        generalSelectMetricHistory(selectedLists);
+      }
+    }
+    cleanUsingTime(metricName, new Date());
+    return selectedLists;
+  }
+
+  /**
+   * filter snapshots using statistically significant points only
+   * @param selectedLists list of snapshots
+   */
+  private void statBasedSelectMetricHistory(final LinkedList<InMemoryHistoryNode> selectedLists)
+      throws ClassCastException {
+    logger.debug("selecting snapshots which are far away from mean value");
+    DescriptiveStatistics descStats = getDescriptiveStatistics(selectedLists);
+    Double mean = descStats.getMean();
+    Double std = descStats.getStandardDeviation();
+
+    Iterator<InMemoryHistoryNode> ite = selectedLists.iterator();
+    while (ite.hasNext()) {
+      InMemoryHistoryNode currentNode = ite.next();
+      double value = ((Number) currentNode.getValue()).doubleValue();
+      // remove all elements which lies in 95% value band
+      if (value < mean + standardDeviationFactor * std && value > mean - standardDeviationFactor * std) {
+        ite.remove();
+      }
+    }
+  }
+
+  private DescriptiveStatistics getDescriptiveStatistics(final LinkedList<InMemoryHistoryNode> selectedLists)
+      throws ClassCastException {
+    DescriptiveStatistics descStats = new DescriptiveStatistics();
+    for (InMemoryHistoryNode node : selectedLists) {
+      descStats.addValue(((Number) node.getValue()).doubleValue());
+    }
+    return descStats;
+  }
+
+  /**
+   * filter snapshots by evenly selecting points across the interval
+   * @param selectedLists list of snapshots
+   */
+  private void generalSelectMetricHistory(final LinkedList<InMemoryHistoryNode> selectedLists) {
+    logger.debug("selecting snapshots evenly from across the time interval");
+    if (selectedLists.size() > numInstances) {
+      double step = (double) selectedLists.size() / numInstances;
+      long nextIndex = 0, currentIndex = 0, numSelectedInstances = 1;
+      Iterator<InMemoryHistoryNode> ite = selectedLists.iterator();
+      while (ite.hasNext()) {
+        ite.next();
+        if (currentIndex == nextIndex) {
+          nextIndex = (long) Math.floor(numSelectedInstances * step + 0.5);
+          numSelectedInstances++;
+        } else {
+          ite.remove();
+        }
+        currentIndex++;
+      }
+    }
+  }
+
+  /**
+   * Remove snapshots to maintain reporting interval
+   * @param metricName Name of the metric
+   * @param firstAllowedDate End date of the interval
+   */
+  private void cleanUsingTime(final String metricName, final Date firstAllowedDate) {
+    if (historyListMapping.containsKey(metricName) && historyListMapping.get(metricName) != null) {
+      synchronized (historyListMapping.get(metricName)) {
+
+        InMemoryHistoryNode firstNode = historyListMapping.get(metricName).peekFirst();
+        long localCopyOfTimeWindow = 0;
+
+        // go ahead for clean up using latest possible value of interval
+        // any interval change will not affect on going clean up
+        synchronized (this) {
+          localCopyOfTimeWindow = timeWindow;
+        }
+
+        // removing objects older than Interval time from firstAllowedDate
+        while (firstNode != null
+            && TimeUnit.MILLISECONDS.toMillis(firstAllowedDate.getTime() - firstNode.getTimestamp().getTime()) > localCopyOfTimeWindow) {
+          historyListMapping.get(metricName).removeFirst();
+          firstNode = historyListMapping.get(metricName).peekFirst();
+        }
+      }
+    }
+  }
+
+  /**
+   * Clear snapshot data structure
+   * {@inheritDoc}
+   * @see azkaban.metric.IMetricEmitter#purgeAllData()
+   */
+  @Override
+  public void purgeAllData() throws MetricException {
+    historyListMapping.clear();
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/metric/MetricException.java b/azkaban-common/src/main/java/azkaban/metric/MetricException.java
new file mode 100644
index 0000000..6decb4a
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metric/MetricException.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metric;
+
+/**
+ * Exception for Azkaban's Metric Component
+ */
+public class MetricException extends Exception {
+  private static final long serialVersionUID = 1L;
+
+  public MetricException(String message) {
+    super(message);
+  }
+
+  public MetricException(Throwable cause) {
+    super(cause);
+  }
+
+  public MetricException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java b/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
new file mode 100644
index 0000000..4834361
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
@@ -0,0 +1,228 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metric;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.log4j.Logger;
+
+
+/**
+ * Manager for access or updating metric related functionality of Azkaban
+ * MetricManager is responsible all handling all action requests from statsServlet in Exec server
+ * <p> Metric Manager 'has a' relationship with :-
+ * <ul>
+ * <li>all the metric Azkaban is tracking</li>
+ * <li>all the emitters Azkaban is supposed to report metrics</li>
+ * </ul></p>
+ */
+public class MetricReportManager {
+  /**
+   * Maximum number of metrics reporting threads
+   */
+  private static final int MAX_EMITTER_THREADS = 4;
+  private static final Logger logger = Logger.getLogger(MetricReportManager.class);
+
+  /**
+   * List of all the metrics that Azkaban is tracking
+   * Manager is not concerned with type of metric as long as it honors IMetric contracts
+   */
+  private List<IMetric<?>> metrics;
+
+  /**
+   * List of all the emitter listening all the metrics
+   * Manager is not concerned with how emitter is reporting value.
+   * Manager is only responsible to notify all emitters whenever an IMetric wants to be notified
+   */
+  private List<IMetricEmitter> metricEmitters;
+  private ExecutorService executorService;
+  // Singleton variable
+  private static volatile MetricReportManager instance = null;
+  private static volatile boolean isManagerEnabled;
+
+  private MetricReportManager() {
+    logger.debug("Instantiating Metric Manager");
+    executorService = Executors.newFixedThreadPool(MAX_EMITTER_THREADS);
+    metrics = new ArrayList<IMetric<?>>();
+    metricEmitters = new LinkedList<IMetricEmitter>();
+    enableManager();
+  }
+
+  /**
+   * @return true, if we have Instantiated and enabled metric manager from Azkaban exec server
+   */
+  public static boolean isAvailable() {
+    return isInstantiated() && isManagerEnabled;
+  }
+
+  /**
+   * @return true, if we have Instantiated metric manager from Azkaban exec server
+   */
+  public static boolean isInstantiated() {
+    return instance != null;
+  }
+
+  /**
+   * Get a singleton object for Metric Manager
+   */
+  public static MetricReportManager getInstance() {
+    if (instance == null) {
+      synchronized (MetricReportManager.class) {
+        if (instance == null) {
+          logger.info("Instantiating MetricReportManager");
+          instance = new MetricReportManager();
+        }
+      }
+    }
+    return instance;
+  }
+
+  /***
+   * each element of metrics List is responsible to call this method and report metrics
+   * @param metric
+   */
+  public void reportMetric(final IMetric<?> metric) {
+    if (metric != null && isAvailable()) {
+      try {
+        final IMetric<?> metricSnapshot;
+        // take snapshot
+        synchronized (metric) {
+          metricSnapshot = metric.getSnapshot();
+        }
+        logger.debug(String.format("Submitting %s metric for metric emission pool", metricSnapshot.getName()));
+        // report to all emitters
+        for (final IMetricEmitter metricEmitter : metricEmitters) {
+          executorService.submit(new Runnable() {
+            @Override
+            public void run() {
+              try {
+                metricEmitter.reportMetric(metricSnapshot);
+              } catch (Exception ex) {
+                logger.error(String.format("Failed to report %s metric due to ", metricSnapshot.getName()), ex);
+              }
+            }
+          });
+        }
+      } catch (CloneNotSupportedException ex) {
+        logger.error(String.format("Failed to take snapshot for %s metric", metric.getClass().getName()), ex);
+      }
+    }
+  }
+
+  /**
+   * Add a metric emitter to report metric
+   * @param emitter
+   */
+  public void addMetricEmitter(final IMetricEmitter emitter) {
+    metricEmitters.add(emitter);
+  }
+
+  /**
+   * remove a metric emitter
+   * @param emitter
+   */
+  public void removeMetricEmitter(final IMetricEmitter emitter) {
+    metricEmitters.remove(emitter);
+  }
+
+  /**
+   * Get all the metric emitters
+   * @return
+   */
+  public List<IMetricEmitter> getMetricEmitters() {
+    return metricEmitters;
+  }
+
+  /**
+   * Add a metric to be managed by Metric Manager
+   * @param metric
+   */
+  public void addMetric(final IMetric<?> metric) {
+    // metric null or already present
+    if(metric == null)
+      throw new IllegalArgumentException("Cannot add a null metric");
+
+    if (getMetricFromName(metric.getName()) == null) {
+      logger.debug(String.format("Adding %s metric in Metric Manager", metric.getName()));
+      metrics.add(metric);
+      metric.updateMetricManager(this);
+    } else {
+      logger.error("Failed to add metric");
+    }
+  }
+
+  /**
+   * Get metric object for a given metric name
+   * @param name metricName
+   * @return metric Object, if found. Otherwise null.
+   */
+  public IMetric<?> getMetricFromName(final String name) {
+    IMetric<?> metric = null;
+    if (name != null) {
+      for (IMetric<?> currentMetric : metrics) {
+        if (currentMetric.getName().equals(name)) {
+          metric = currentMetric;
+          break;
+        }
+      }
+    }
+    return metric;
+  }
+
+  /**
+   * Get all the emitters
+   * @return
+   */
+  public List<IMetric<?>> getAllMetrics() {
+    return metrics;
+  }
+
+  public void enableManager() {
+    logger.info("Enabling Metric Manager");
+    isManagerEnabled = true;
+  }
+
+  /**
+   * Disable Metric Manager and ask all emitters to purge all available data.
+   */
+  public void disableManager() {
+    logger.info("Disabling Metric Manager");
+    if (isManagerEnabled) {
+      isManagerEnabled = false;
+      for (IMetricEmitter emitter : metricEmitters) {
+        try {
+          emitter.purgeAllData();
+        } catch (MetricException ex) {
+          logger.error("Failed to purge data ", ex);
+        }
+      }
+    }
+  }
+
+  /**
+   * Shutdown execution service
+   * {@inheritDoc}
+   * @see java.lang.Object#finalize()
+   */
+  protected void finalize() {
+    executorService.shutdown();
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java b/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
new file mode 100644
index 0000000..cf1211d
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metric;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+/**
+ * Metrics tracked after every interval using timer
+ * @param <T> Type of Value of a given metric
+ */
+public abstract class TimeBasedReportingMetric<T> extends AbstractMetric<T> {
+  private Timer timer;
+  protected long MAX_MILISEC_INTERVAL = 60 * 60 * 1000;
+  protected long MIN_MILISEC_INTERVAL = 3 * 1000;
+
+  /**
+   * @param metricName Name of metric
+   * @param metricType Metric type. For display purposes.
+   * @param initialValue Initial Value of a metric
+   * @param manager Metric Manager whom a metric will report to
+   * @param interval Time interval for metric tracking
+   * @throws MetricException
+   */
+  public TimeBasedReportingMetric(String metricName, String metricType, T initialValue, MetricReportManager manager,
+      long interval) throws MetricException {
+    super(metricName, metricType, initialValue, manager);
+    if(validateInterval(interval)) {
+      throw new MetricException("Invalid interval: Cannot instantiate timer");
+    }
+    timer = new Timer();
+    timer.schedule(getTimerTask(), interval, interval);
+  }
+
+  /**
+   * Get a TimerTask to reschedule Timer
+   * @return An anonymous TimerTask class
+   */
+  private TimerTask getTimerTask() {
+    final TimeBasedReportingMetric<T> lockObject = this;
+    TimerTask recurringReporting = new TimerTask() {
+      @Override
+      public void run() {
+        synchronized (lockObject) {
+          preTrackingEventMethod();
+          notifyManager();
+          postTrackingEventMethod();
+        }
+      }
+    };
+    return recurringReporting;
+  }
+
+  /**
+   * Method to change tracking interval
+   * @param interval
+   * @throws MetricException
+   */
+  public void updateInterval(final long interval) throws MetricException {
+    if(validateInterval(interval)) {
+      throw new MetricException("Invalid interval: Cannot update timer");
+    }
+    logger.debug(String.format("Updating tracking interval to %d milisecond for %s metric", interval, getName()));
+    timer.cancel();
+    timer = new Timer();
+    timer.schedule(getTimerTask(), interval, interval);
+  }
+
+  private boolean validateInterval(final long interval) {
+    return interval >= MIN_MILISEC_INTERVAL && interval <= MAX_MILISEC_INTERVAL;
+  }
+
+  /**
+   * This method is responsible for making any last minute update to value, if any
+   */
+  protected abstract void preTrackingEventMethod();
+
+  /**
+   * This method is responsible for making any post processing after tracking
+   */
+  protected abstract void postTrackingEventMethod();
+
+}
diff --git a/azkaban-common/src/test/java/azkaban/metric/FakeMetric.java b/azkaban-common/src/test/java/azkaban/metric/FakeMetric.java
new file mode 100644
index 0000000..4043cb7
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/metric/FakeMetric.java
@@ -0,0 +1,54 @@
+package azkaban.metric;
+
+/**
+ * Dummy Metric to test Azkaban Metrics
+ */
+public class FakeMetric extends AbstractMetric<Integer>{
+
+  public FakeMetric(MetricReportManager manager) {
+    super("FakeMetric", "int", 4, manager);
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((metricManager == null) ? 0 : metricManager.hashCode());
+    result = prime * result + ((name == null) ? 0 : name.hashCode());
+    result = prime * result + ((type == null) ? 0 : type.hashCode());
+    result = prime * result + ((value == null) ? 0 : value.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (!(obj instanceof FakeMetric))
+      return false;
+    FakeMetric other = (FakeMetric) obj;
+    if (metricManager == null) {
+      if (other.metricManager != null)
+        return false;
+    } else if (!metricManager.equals(other.metricManager))
+      return false;
+    if (name == null) {
+      if (other.name != null)
+        return false;
+    } else if (!name.equals(other.name))
+      return false;
+    if (type == null) {
+      if (other.type != null)
+        return false;
+    } else if (!type.equals(other.type))
+      return false;
+    if (value == null) {
+      if (other.value != null)
+        return false;
+    } else if (!value.equals(other.value))
+      return false;
+    return true;
+  }
+}
diff --git a/azkaban-common/src/test/java/azkaban/metric/MetricManagerTest.java b/azkaban-common/src/test/java/azkaban/metric/MetricManagerTest.java
new file mode 100644
index 0000000..45bfc5f
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/metric/MetricManagerTest.java
@@ -0,0 +1,90 @@
+package azkaban.metric;
+
+import java.util.Date;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.metric.inmemoryemitter.InMemoryHistoryNode;
+import azkaban.metric.inmemoryemitter.InMemoryMetricEmitter;
+import azkaban.utils.Props;
+import static org.junit.Assert.*;
+
+/**
+ * Azkaban Metric Manager Tests
+ */
+public class MetricManagerTest {
+  MetricReportManager manager;
+  FakeMetric metric;
+  InMemoryMetricEmitter emitter;
+
+  @Before
+  public void setUp() throws Exception {
+    manager = MetricReportManager.getInstance();
+    metric = new FakeMetric(manager);
+    manager.addMetric(metric);
+    emitter = new InMemoryMetricEmitter(new Props());
+    manager.addMetricEmitter(emitter);
+  }
+
+  /**
+   * Test enable disable and status methods
+   */
+  @Test
+  public void managerStatusTest() {
+    assertNotNull("Singleton Failed to instantiate", manager);
+    assertTrue("Failed to enable metric manager", MetricReportManager.isAvailable());
+    manager.disableManager();
+    assertFalse("Failed to disable metric manager", MetricReportManager.isAvailable());
+    manager.enableManager();
+    assertTrue("Failed to enable metric manager", MetricReportManager.isAvailable());
+  }
+
+  /**
+   * Test adding and accessing metric methods
+   */
+  @Test
+  public void managerMetricMaintenanceTest() {
+    assertEquals("Failed to add metric", manager.getAllMetrics().size(), 1);
+    assertTrue("Failed to add metric", manager.getAllMetrics().contains(metric));
+    assertEquals("Failed to get metric by Name", manager.getMetricFromName("FakeMetric"), metric);
+  }
+
+  /**
+   * Test adding, removing and accessing metric emitter.
+   */
+  @Test
+  public void managerEmitterMaintenanceTest() {
+    assertTrue("Failed to add Emitter", manager.getMetricEmitters().contains(emitter));
+
+    int originalSize = manager.getMetricEmitters().size();
+    manager.removeMetricEmitter(emitter);
+    assertEquals("Failed to remove emitter", manager.getMetricEmitters().size(), originalSize - 1);
+    manager.addMetricEmitter(emitter);
+  }
+
+  /**
+   * Test metric reporting methods, including InMemoryMetricEmitter methods
+   * @throws Exception
+   */
+  @Test
+  public void managerEmitterHandlingTest() throws Exception {
+    emitter.purgeAllData();
+    Date from = new Date();
+    metric.notifyManager();
+
+    synchronized (this) {
+      try {
+        wait(2000);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+    Date to = new Date();
+    List<InMemoryHistoryNode> nodes = emitter.getMetrics("FakeMetric", from, to, false);
+
+    assertEquals("Failed to report metric", 1, nodes.size());
+    assertEquals("Failed to report metric", nodes.get(0).getValue(), 4);
+  }
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 89351ee..d68bd16 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -29,9 +29,7 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import org.apache.log4j.Logger;
-
 import org.joda.time.DateTimeZone;
-
 import org.mortbay.jetty.Connector;
 import org.mortbay.jetty.Server;
 import org.mortbay.jetty.servlet.Context;
@@ -41,7 +39,16 @@ import org.mortbay.thread.QueuedThreadPool;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.JdbcExecutorLoader;
 import azkaban.execapp.jmx.JmxFlowRunnerManager;
+import azkaban.execapp.metric.NumFailedFlowMetric;
+import azkaban.execapp.metric.NumFailedJobMetric;
+import azkaban.execapp.metric.NumQueuedFlowMetric;
+import azkaban.execapp.metric.NumRunningFlowMetric;
+import azkaban.execapp.metric.NumRunningJobMetric;
 import azkaban.jmx.JmxJettyServer;
+import azkaban.metric.IMetricEmitter;
+import azkaban.metric.MetricException;
+import azkaban.metric.MetricReportManager;
+import azkaban.metric.inmemoryemitter.InMemoryMetricEmitter;
 import azkaban.project.JdbcProjectLoader;
 import azkaban.project.ProjectLoader;
 import azkaban.server.AzkabanServer;
@@ -49,17 +56,17 @@ import azkaban.server.ServerConstants;
 import azkaban.utils.Props;
 import azkaban.utils.Utils;
 
+
 public class AzkabanExecutorServer {
-  private static final Logger logger = Logger
-      .getLogger(AzkabanExecutorServer.class);
+  private static final Logger logger = Logger.getLogger(AzkabanExecutorServer.class);
   private static final int MAX_FORM_CONTENT_SIZE = 10 * 1024 * 1024;
 
   public static final String AZKABAN_HOME = "AZKABAN_HOME";
   public static final String DEFAULT_CONF_PATH = "conf";
   public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
-  public static final String AZKABAN_PRIVATE_PROPERTIES_FILE =
-      "azkaban.private.properties";
+  public static final String AZKABAN_PRIVATE_PROPERTIES_FILE = "azkaban.private.properties";
   public static final String JOBTYPE_PLUGIN_DIR = "azkaban.jobtype.plugin.dir";
+  public static final String METRIC_INTERVAL = "executor.metric.milisecinterval.";
   public static final int DEFAULT_PORT_NUMBER = 12321;
 
   private static final String DEFAULT_TIMEZONE_ID = "default.timezone.id";
@@ -104,16 +111,16 @@ public class AzkabanExecutorServer {
 
     root.addServlet(new ServletHolder(new ExecutorServlet()), "/executor");
     root.addServlet(new ServletHolder(new JMXHttpServlet()), "/jmx");
-    root.setAttribute(
-        ServerConstants.AZKABAN_SERVLET_CONTEXT_KEY, this);
+    root.addServlet(new ServletHolder(new StatsServlet()), "/stats");
+
+    root.setAttribute(ServerConstants.AZKABAN_SERVLET_CONTEXT_KEY, this);
 
     executionLoader = createExecLoader(props);
     projectLoader = createProjectLoader(props);
-    runnerManager =
-        new FlowRunnerManager(props, executionLoader, projectLoader, this
-            .getClass().getClassLoader());
+    runnerManager = new FlowRunnerManager(props, executionLoader, projectLoader, this.getClass().getClassLoader());
 
     configureMBeanServer();
+    configureMetricReports();
 
     try {
       server.start();
@@ -125,6 +132,44 @@ public class AzkabanExecutorServer {
     logger.info("Azkaban Executor Server started on port " + portNumber);
   }
 
+  /**
+   * Configure Metric Reporting as per azkaban.properties settings
+   * @throws MetricException
+   */
+  private void configureMetricReports() throws MetricException {
+    Props props = getAzkabanProps();
+    if (props != null && props.getBoolean("executor.metric.reports", false)) {
+      logger.info("Starting to configure Metric Reports");
+      MetricReportManager metricManager = MetricReportManager.getInstance();
+      IMetricEmitter metricEmitter = new InMemoryMetricEmitter(props);
+      metricManager.addMetricEmitter(metricEmitter);
+
+      logger.info("Adding number of failed flow metric");
+      metricManager.addMetric(new NumFailedFlowMetric(metricManager, props.getInt(METRIC_INTERVAL
+          + NumFailedFlowMetric.NUM_FAILED_FLOW_METRIC_NAME, props.getInt(METRIC_INTERVAL + "default"))));
+
+      logger.info("Adding number of failed jobs metric");
+      metricManager.addMetric(new NumFailedJobMetric(metricManager, props.getInt(METRIC_INTERVAL
+          + NumFailedJobMetric.NUM_FAILED_JOB_METRIC_NAME, props.getInt(METRIC_INTERVAL + "default"))));
+
+      logger.info("Adding number of running Jobs metric");
+      metricManager.addMetric(new NumRunningJobMetric(metricManager, props.getInt(METRIC_INTERVAL
+          + NumRunningJobMetric.NUM_RUNNING_JOB_METRIC_NAME, props.getInt(METRIC_INTERVAL + "default"))));
+
+      logger.info("Adding number of running flows metric");
+      metricManager.addMetric(new NumRunningFlowMetric(runnerManager, metricManager, props.getInt(
+          METRIC_INTERVAL  + NumRunningFlowMetric.NUM_RUNNING_FLOW_METRIC_NAME,
+          props.getInt(METRIC_INTERVAL + "default"))));
+
+      logger.info("Adding number of queued flows metric");
+      metricManager.addMetric(new NumQueuedFlowMetric(runnerManager, metricManager, props.getInt(
+          METRIC_INTERVAL + NumQueuedFlowMetric.NUM_QUEUED_FLOW_METRIC_NAME,
+          props.getInt(METRIC_INTERVAL + "default"))));
+
+      logger.info("Completed configuring Metric Reports");
+    }
+  }
+
   private ExecutorLoader createExecLoader(Props props) {
     return new JdbcExecutorLoader(props);
   }
@@ -161,7 +206,7 @@ public class AzkabanExecutorServer {
 
   /**
    * Returns the currently executing executor server, if one exists.
-   * 
+   *
    * @return
    */
   public static AzkabanExecutorServer getApp() {
@@ -224,16 +269,14 @@ public class AzkabanExecutorServer {
       return null;
     }
 
-    if (!new File(azkabanHome).isDirectory()
-        || !new File(azkabanHome).canRead()) {
+    if (!new File(azkabanHome).isDirectory() || !new File(azkabanHome).canRead()) {
       logger.error(azkabanHome + " is not a readable directory.");
       return null;
     }
 
     File confPath = new File(azkabanHome, DEFAULT_CONF_PATH);
     if (!confPath.exists() || !confPath.isDirectory() || !confPath.canRead()) {
-      logger
-          .error(azkabanHome + " does not contain a readable conf directory.");
+      logger.error(azkabanHome + " does not contain a readable conf directory.");
       return null;
     }
 
@@ -251,8 +294,7 @@ public class AzkabanExecutorServer {
    * @return
    */
   private static Props loadAzkabanConfigurationFromDirectory(File dir) {
-    File azkabanPrivatePropsFile =
-        new File(dir, AZKABAN_PRIVATE_PROPERTIES_FILE);
+    File azkabanPrivatePropsFile = new File(dir, AZKABAN_PRIVATE_PROPERTIES_FILE);
     File azkabanPropsFile = new File(dir, AZKABAN_PROPERTIES_FILE);
 
     Props props = null;
@@ -270,9 +312,7 @@ public class AzkabanExecutorServer {
     } catch (FileNotFoundException e) {
       logger.error("File not found. Could not load azkaban config file", e);
     } catch (IOException e) {
-      logger.error(
-          "File found, but error reading. Could not load azkaban config file",
-          e);
+      logger.error("File found, but error reading. Could not load azkaban config file", e);
     }
 
     return props;
@@ -306,8 +346,7 @@ public class AzkabanExecutorServer {
       logger.info("Bean " + mbeanClass.getCanonicalName() + " registered.");
       registeredMBeans.add(mbeanName);
     } catch (Exception e) {
-      logger.error("Error registering mbean " + mbeanClass.getCanonicalName(),
-          e);
+      logger.error("Error registering mbean " + mbeanClass.getCanonicalName(), e);
     }
 
   }
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
index f73a30a..e98f5f2 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
@@ -42,6 +42,8 @@ import azkaban.event.Event.Type;
 import azkaban.event.EventHandler;
 import azkaban.event.EventListener;
 import azkaban.execapp.event.FlowWatcher;
+import azkaban.execapp.metric.NumFailedJobMetric;
+import azkaban.execapp.metric.NumRunningJobMetric;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableFlowBase;
 import azkaban.executor.ExecutableNode;
@@ -52,19 +54,20 @@ import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.Status;
 import azkaban.flow.FlowProps;
 import azkaban.jobtype.JobTypeManager;
+import azkaban.metric.MetricReportManager;
 import azkaban.project.ProjectLoader;
 import azkaban.project.ProjectManagerException;
 import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
 import azkaban.utils.SwapQueue;
 
+
 /**
  * Class that handles the running of a ExecutableFlow DAG
- * 
+ *
  */
 public class FlowRunner extends EventHandler implements Runnable {
-  private static final Layout DEFAULT_LAYOUT = new PatternLayout(
-      "%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
+  private static final Layout DEFAULT_LAYOUT = new PatternLayout("%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
   // We check update every 5 minutes, just in case things get stuck. But for the
   // most part, we'll be idling.
   private static final long CHECK_WAIT_MS = 5 * 60 * 1000;
@@ -93,8 +96,7 @@ public class FlowRunner extends EventHandler implements Runnable {
   private final JobTypeManager jobtypeManager;
 
   private JobRunnerEventListener listener = new JobRunnerEventListener();
-  private Set<JobRunner> activeJobRunners = Collections
-      .newSetFromMap(new ConcurrentHashMap<JobRunner, Boolean>());
+  private Set<JobRunner> activeJobRunners = Collections.newSetFromMap(new ConcurrentHashMap<JobRunner, Boolean>());
 
   // Thread safe swap queue for finishedExecutions.
   private SwapQueue<ExecutableNode> finishedNodes;
@@ -122,23 +124,22 @@ public class FlowRunner extends EventHandler implements Runnable {
 
   /**
    * Constructor. This will create its own ExecutorService for thread pools
-   * 
+   *
    * @param flow
    * @param executorLoader
    * @param projectLoader
    * @param jobtypeManager
    * @throws ExecutorManagerException
    */
-  public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader,
-      ProjectLoader projectLoader, JobTypeManager jobtypeManager)
-      throws ExecutorManagerException {
+  public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader,
+      JobTypeManager jobtypeManager) throws ExecutorManagerException {
     this(flow, executorLoader, projectLoader, jobtypeManager, null);
   }
 
   /**
    * Constructor. If executorService is null, then it will create it's own for
    * thread pools.
-   * 
+   *
    * @param flow
    * @param executorLoader
    * @param projectLoader
@@ -146,9 +147,8 @@ public class FlowRunner extends EventHandler implements Runnable {
    * @param executorService
    * @throws ExecutorManagerException
    */
-  public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader,
-      ProjectLoader projectLoader, JobTypeManager jobtypeManager,
-      ExecutorService executorService) throws ExecutorManagerException {
+  public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader,
+      JobTypeManager jobtypeManager, ExecutorService executorService) throws ExecutorManagerException {
     this.execId = flow.getExecutionId();
     this.flow = flow;
     this.executorLoader = executorLoader;
@@ -210,23 +210,18 @@ public class FlowRunner extends EventHandler implements Runnable {
       runFlow();
     } catch (Throwable t) {
       if (logger != null) {
-        logger
-            .error(
-                "An error has occurred during the running of the flow. Quiting.",
-                t);
+        logger.error("An error has occurred during the running of the flow. Quiting.", t);
       }
       flow.setStatus(Status.FAILED);
     } finally {
       if (watcher != null) {
         logger.info("Watcher is attached. Stopping watcher.");
         watcher.stopWatcher();
-        logger
-            .info("Watcher cancelled status is " + watcher.isWatchCancelled());
+        logger.info("Watcher cancelled status is " + watcher.isWatchCancelled());
       }
 
       flow.setEndTime(System.currentTimeMillis());
-      logger.info("Setting end time for flow " + execId + " to "
-          + System.currentTimeMillis());
+      logger.info("Setting end time for flow " + execId + " to " + System.currentTimeMillis());
       closeLogger();
 
       updateFlow();
@@ -251,8 +246,7 @@ public class FlowRunner extends EventHandler implements Runnable {
     }
 
     // If there are flow overrides, we apply them now.
-    Map<String, String> flowParam =
-        flow.getExecutionOptions().getFlowParameters();
+    Map<String, String> flowParam = flow.getExecutionOptions().getFlowParameters();
     if (flowParam != null && !flowParam.isEmpty()) {
       commonFlowProps = new Props(commonFlowProps, flowParam);
     }
@@ -265,11 +259,9 @@ public class FlowRunner extends EventHandler implements Runnable {
       this.watcher.setLogger(logger);
     }
 
-    logger.info("Running execid:" + execId + " flow:" + flowId + " project:"
-        + projectId + " version:" + version);
+    logger.info("Running execid:" + execId + " flow:" + flowId + " project:" + projectId + " version:" + version);
     if (pipelineExecId != null) {
-      logger.info("Running simulateously with " + pipelineExecId
-          + ". Pipelining level " + pipelineLevel);
+      logger.info("Running simulateously with " + pipelineExecId + ". Pipelining level " + pipelineLevel);
     }
 
     // The current thread is used for interrupting blocks
@@ -279,10 +271,8 @@ public class FlowRunner extends EventHandler implements Runnable {
 
   private void updateFlowReference() throws ExecutorManagerException {
     logger.info("Update active reference");
-    if (!executorLoader.updateExecutableReference(execId,
-        System.currentTimeMillis())) {
-      throw new ExecutorManagerException(
-          "The executor reference doesn't exist. May have been killed prematurely.");
+    if (!executorLoader.updateExecutableReference(execId, System.currentTimeMillis())) {
+      throw new ExecutorManagerException("The executor reference doesn't exist. May have been killed prematurely.");
     }
   }
 
@@ -356,7 +346,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 
   /**
    * Main method that executes the jobs.
-   * 
+   *
    * @throws Exception
    */
   private void runFlow() throws Exception {
@@ -405,8 +395,7 @@ public class FlowRunner extends EventHandler implements Runnable {
     resetFailedState(this.flow, retryJobs);
 
     for (ExecutableNode node : retryJobs) {
-      if (node.getStatus() == Status.READY
-          || node.getStatus() == Status.DISABLED) {
+      if (node.getStatus() == Status.READY || node.getStatus() == Status.DISABLED) {
         runReadyJob(node);
       } else if (node.getStatus() == Status.SUCCEEDED) {
         for (String outNodeId : node.getOutNodes()) {
@@ -475,8 +464,7 @@ public class FlowRunner extends EventHandler implements Runnable {
     // Instant kill or skip if necessary.
     boolean jobsRun = false;
     for (ExecutableNode node : nodesToCheck) {
-      if (Status.isStatusFinished(node.getStatus())
-          || Status.isStatusRunning(node.getStatus())) {
+      if (Status.isStatusFinished(node.getStatus()) || Status.isStatusRunning(node.getStatus())) {
         // Really shouldn't get in here.
         continue;
       }
@@ -493,8 +481,7 @@ public class FlowRunner extends EventHandler implements Runnable {
   }
 
   private boolean runReadyJob(ExecutableNode node) throws IOException {
-    if (Status.isStatusFinished(node.getStatus())
-        || Status.isStatusRunning(node.getStatus())) {
+    if (Status.isStatusFinished(node.getStatus()) || Status.isStatusRunning(node.getStatus())) {
       return false;
     }
 
@@ -504,8 +491,7 @@ public class FlowRunner extends EventHandler implements Runnable {
     }
 
     if (nextNodeStatus == Status.CANCELLED) {
-      logger.info("Cancelling '" + node.getNestedId()
-          + "' due to prior errors.");
+      logger.info("Cancelling '" + node.getNestedId() + "' due to prior errors.");
       node.cancelNode(System.currentTimeMillis());
       finishExecutableNode(node);
     } else if (nextNodeStatus == Status.SKIPPED) {
@@ -537,8 +523,8 @@ public class FlowRunner extends EventHandler implements Runnable {
     }
 
     if (node.getRetries() > node.getAttempt()) {
-      logger.info("Job '" + node.getId() + "' will be retried. Attempt "
-          + node.getAttempt() + " of " + node.getRetries());
+      logger.info("Job '" + node.getId() + "' will be retried. Attempt " + node.getAttempt() + " of "
+          + node.getRetries());
       node.setDelayedExecution(node.getRetryBackoff());
       node.resetForRetry();
       return true;
@@ -579,8 +565,7 @@ public class FlowRunner extends EventHandler implements Runnable {
     for (String end : flow.getEndNodes()) {
       ExecutableNode node = flow.getExecutableNode(end);
 
-      if (node.getStatus() == Status.KILLED
-          || node.getStatus() == Status.FAILED
+      if (node.getStatus() == Status.KILLED || node.getStatus() == Status.FAILED
           || node.getStatus() == Status.CANCELLED) {
         succeeded = false;
       }
@@ -602,22 +587,19 @@ public class FlowRunner extends EventHandler implements Runnable {
     flow.setUpdateTime(System.currentTimeMillis());
     long durationSec = (flow.getEndTime() - flow.getStartTime()) / 1000;
     switch (flow.getStatus()) {
-    case FAILED_FINISHING:
-      logger.info("Setting flow '" + id + "' status to FAILED in "
-          + durationSec + " seconds");
-      flow.setStatus(Status.FAILED);
-      break;
-    case FAILED:
-    case KILLED:
-    case CANCELLED:
-    case FAILED_SUCCEEDED:
-      logger.info("Flow '" + id + "' is set to " + flow.getStatus().toString()
-          + " in " + durationSec + " seconds");
-      break;
-    default:
-      flow.setStatus(Status.SUCCEEDED);
-      logger.info("Flow '" + id + "' is set to " + flow.getStatus().toString()
-          + " in " + durationSec + " seconds");
+      case FAILED_FINISHING:
+        logger.info("Setting flow '" + id + "' status to FAILED in " + durationSec + " seconds");
+        flow.setStatus(Status.FAILED);
+        break;
+      case FAILED:
+      case KILLED:
+      case CANCELLED:
+      case FAILED_SUCCEEDED:
+        logger.info("Flow '" + id + "' is set to " + flow.getStatus().toString() + " in " + durationSec + " seconds");
+        break;
+      default:
+        flow.setStatus(Status.SUCCEEDED);
+        logger.info("Flow '" + id + "' is set to " + flow.getStatus().toString() + " in " + durationSec + " seconds");
     }
 
     // If the finalized flow is actually the top level flow, than we finish
@@ -678,13 +660,10 @@ public class FlowRunner extends EventHandler implements Runnable {
 
     // load the override props if any
     try {
-      props =
-          projectLoader.fetchProjectProperty(flow.getProjectId(),
-              flow.getVersion(), node.getId() + ".jor");
+      props = projectLoader.fetchProjectProperty(flow.getProjectId(), flow.getVersion(), node.getId() + ".jor");
     } catch (ProjectManagerException e) {
       e.printStackTrace();
-      logger.error("Error loading job override property for job "
-          + node.getId());
+      logger.error("Error loading job override property for job " + node.getId());
     }
 
     File path = new File(execDir, source);
@@ -694,8 +673,7 @@ public class FlowRunner extends EventHandler implements Runnable {
         props = new Props(null, path);
       } catch (IOException e) {
         e.printStackTrace();
-        logger.error("Error loading job file " + source + " for job "
-            + node.getId());
+        logger.error("Error loading job file " + source + " for job " + node.getId());
       }
     }
     // setting this fake source as this will be used to determine the location
@@ -725,15 +703,14 @@ public class FlowRunner extends EventHandler implements Runnable {
   /**
    * Determines what the state of the next node should be. Returns null if the
    * node should not be run.
-   * 
+   *
    * @param node
    * @return
    */
   public Status getImpliedStatus(ExecutableNode node) {
     // If it's running or finished with 'SUCCEEDED', than don't even
     // bother starting this job.
-    if (Status.isStatusRunning(node.getStatus())
-        || node.getStatus() == Status.SUCCEEDED) {
+    if (Status.isStatusRunning(node.getStatus()) || node.getStatus() == Status.SUCCEEDED) {
       return null;
     }
 
@@ -748,8 +725,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 
       if (!Status.isStatusFinished(depStatus)) {
         return null;
-      } else if (depStatus == Status.FAILED || depStatus == Status.CANCELLED
-          || depStatus == Status.KILLED) {
+      } else if (depStatus == Status.FAILED || depStatus == Status.CANCELLED || depStatus == Status.KILLED) {
         // We propagate failures as KILLED states.
         shouldKill = true;
       }
@@ -757,16 +733,14 @@ public class FlowRunner extends EventHandler implements Runnable {
 
     // If it's disabled but ready to run, we want to make sure it continues
     // being disabled.
-    if (node.getStatus() == Status.DISABLED
-        || node.getStatus() == Status.SKIPPED) {
+    if (node.getStatus() == Status.DISABLED || node.getStatus() == Status.SKIPPED) {
       return Status.SKIPPED;
     }
 
     // If the flow has failed, and we want to finish only the currently running
     // jobs, we just
     // kill everything else. We also kill, if the flow has been cancelled.
-    if (flowFailed
-        && failureAction == ExecutionOptions.FailureAction.FINISH_CURRENTLY_RUNNING) {
+    if (flowFailed && failureAction == ExecutionOptions.FailureAction.FINISH_CURRENTLY_RUNNING) {
       return Status.CANCELLED;
     } else if (shouldKill || isKilled()) {
       return Status.CANCELLED;
@@ -780,8 +754,7 @@ public class FlowRunner extends EventHandler implements Runnable {
     Props previousOutput = null;
     // Iterate the in nodes again and create the dependencies
     for (String dependency : node.getInNodes()) {
-      Props output =
-          node.getParentFlow().getExecutableNode(dependency).getOutputProps();
+      Props output = node.getParentFlow().getExecutableNode(dependency).getOutputProps();
       if (output != null) {
         output = Props.clone(output);
         output.setParent(previousOutput);
@@ -796,9 +769,7 @@ public class FlowRunner extends EventHandler implements Runnable {
     // Load job file.
     File path = new File(execDir, node.getJobSource());
 
-    JobRunner jobRunner =
-        new JobRunner(node, path.getParentFile(), executorLoader,
-            jobtypeManager);
+    JobRunner jobRunner = new JobRunner(node, path.getParentFile(), executorLoader, jobtypeManager);
     if (watcher != null) {
       jobRunner.setPipeline(watcher, pipelineLevel);
     }
@@ -810,9 +781,31 @@ public class FlowRunner extends EventHandler implements Runnable {
     jobRunner.setLogSettings(logger, jobLogFileSize, jobLogNumFiles);
     jobRunner.addListener(listener);
 
+    configureJobLevelMetrics(jobRunner);
+
     return jobRunner;
   }
 
+  /**
+   * Configure Azkaban metrics tracking for a new jobRunner instance
+   * @param jobRunner
+   */
+  private void configureJobLevelMetrics(JobRunner jobRunner) {
+    logger.info("Configuring Azkaban metrics tracking for jobrunner object");
+    if (MetricReportManager.isAvailable()) {
+      MetricReportManager metricManager = MetricReportManager.getInstance();
+
+      //Adding NumRunningJobMetric listener
+      jobRunner.addListener((NumRunningJobMetric) metricManager
+          .getMetricFromName(NumRunningJobMetric.NUM_RUNNING_JOB_METRIC_NAME));
+
+      //Adding NumFailedJobMetric listener
+      jobRunner.addListener((NumFailedJobMetric) metricManager
+          .getMetricFromName(NumFailedJobMetric.NUM_FAILED_JOB_METRIC_NAME));
+
+    }
+  }
+
   public void pause(String user) {
     synchronized (mainSyncObj) {
       if (!flowFinished) {
@@ -872,8 +865,7 @@ public class FlowRunner extends EventHandler implements Runnable {
       if (watcher != null) {
         logger.info("Watcher is attached. Stopping watcher.");
         watcher.stopWatcher();
-        logger
-            .info("Watcher cancelled status is " + watcher.isWatchCancelled());
+        logger.info("Watcher cancelled status is " + watcher.isWatchCancelled());
       }
 
       logger.info("Killing " + activeJobRunners.size() + " jobs.");
@@ -891,8 +883,7 @@ public class FlowRunner extends EventHandler implements Runnable {
     }
   }
 
-  private void resetFailedState(ExecutableFlowBase flow,
-      List<ExecutableNode> nodesToRetry) {
+  private void resetFailedState(ExecutableFlowBase flow, List<ExecutableNode> nodesToRetry) {
     // bottom up
     LinkedList<ExecutableNode> queue = new LinkedList<ExecutableNode>();
     for (String id : flow.getEndNodes()) {
@@ -921,24 +912,24 @@ public class FlowRunner extends EventHandler implements Runnable {
       } else if (node instanceof ExecutableFlowBase) {
         ExecutableFlowBase base = (ExecutableFlowBase) node;
         switch (base.getStatus()) {
-        case CANCELLED:
-          node.setStatus(Status.READY);
-          node.setEndTime(-1);
-          node.setStartTime(-1);
-          node.setUpdateTime(currentTime);
-          // Break out of the switch. We'll reset the flow just like a normal
-          // node
-          break;
-        case KILLED:
-        case FAILED:
-        case FAILED_FINISHING:
-          resetFailedState(base, nodesToRetry);
-          continue;
-        default:
-          // Continue the while loop. If the job is in a finished state that's
-          // not
-          // a failure, we don't want to reset the job.
-          continue;
+          case CANCELLED:
+            node.setStatus(Status.READY);
+            node.setEndTime(-1);
+            node.setStartTime(-1);
+            node.setUpdateTime(currentTime);
+            // Break out of the switch. We'll reset the flow just like a normal
+            // node
+            break;
+          case KILLED:
+          case FAILED:
+          case FAILED_FINISHING:
+            resetFailedState(base, nodesToRetry);
+            continue;
+          default:
+            // Continue the while loop. If the job is in a finished state that's
+            // not
+            // a failure, we don't want to reset the job.
+            continue;
         }
       } else if (node.getStatus() == Status.CANCELLED) {
         // Not a flow, but killed
@@ -946,16 +937,13 @@ public class FlowRunner extends EventHandler implements Runnable {
         node.setStartTime(-1);
         node.setEndTime(-1);
         node.setUpdateTime(currentTime);
-      } else if (node.getStatus() == Status.FAILED
-          || node.getStatus() == Status.KILLED) {
+      } else if (node.getStatus() == Status.FAILED || node.getStatus() == Status.KILLED) {
         node.resetForRetry();
         nodesToRetry.add(node);
       }
 
-      if (!(node instanceof ExecutableFlowBase)
-          && node.getStatus() != oldStatus) {
-        logger.info("Resetting job '" + node.getNestedId() + "' from "
-            + oldStatus + " to " + node.getStatus());
+      if (!(node instanceof ExecutableFlowBase) && node.getStatus() != oldStatus) {
+        logger.info("Resetting job '" + node.getNestedId() + "' from " + oldStatus + " to " + node.getStatus());
       }
 
       for (String inId : node.getInNodes()) {
@@ -977,16 +965,14 @@ public class FlowRunner extends EventHandler implements Runnable {
       // start node has not.
       for (String id : flow.getStartNodes()) {
         ExecutableNode node = flow.getExecutableNode(id);
-        if (node.getStatus() == Status.READY
-            || node.getStatus() == Status.DISABLED) {
+        if (node.getStatus() == Status.READY || node.getStatus() == Status.DISABLED) {
           nodesToRetry.add(node);
         }
       }
     }
     flow.setUpdateTime(System.currentTimeMillis());
     flow.setEndTime(-1);
-    logger.info("Resetting flow '" + flow.getNestedId() + "' from "
-        + oldFlowState + " to " + flow.getStatus());
+    logger.info("Resetting flow '" + flow.getNestedId() + "' from " + oldFlowState + " to " + flow.getStatus());
   }
 
   private void interrupt() {
@@ -1007,14 +993,13 @@ public class FlowRunner extends EventHandler implements Runnable {
         ExecutableNode node = runner.getNode();
         long seconds = (node.getEndTime() - node.getStartTime()) / 1000;
         synchronized (mainSyncObj) {
-          logger.info("Job " + node.getNestedId() + " finished with status "
-              + node.getStatus() + " in " + seconds + " seconds");
+          logger.info("Job " + node.getNestedId() + " finished with status " + node.getStatus() + " in " + seconds
+              + " seconds");
 
           // Cancellation is handled in the main thread, but if the flow is
           // paused, the main thread is paused too.
           // This unpauses the flow for cancellation.
-          if (flowPaused && node.getStatus() == Status.FAILED
-              && failureAction == FailureAction.CANCEL_ALL) {
+          if (flowPaused && node.getStatus() == Status.FAILED && failureAction == FailureAction.CANCEL_ALL) {
             flowPaused = false;
           }
 
@@ -1057,8 +1042,7 @@ public class FlowRunner extends EventHandler implements Runnable {
     ExecutableNode node = flow.getExecutableNodePath(jobId);
     File path = new File(execDir, node.getJobSource());
 
-    String attachmentFileName =
-        JobRunner.createAttachmentFileName(node, attempt);
+    String attachmentFileName = JobRunner.createAttachmentFileName(node, attempt);
     File attachmentFile = new File(path.getParentFile(), attachmentFileName);
     if (!attachmentFile.exists()) {
       return null;
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 54d82e8..d052d99 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -42,12 +42,15 @@ import azkaban.event.EventListener;
 import azkaban.execapp.event.FlowWatcher;
 import azkaban.execapp.event.LocalFlowWatcher;
 import azkaban.execapp.event.RemoteFlowWatcher;
+import azkaban.execapp.metric.NumFailedFlowMetric;
+import azkaban.execapp.metric.NumFailedJobMetric;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.jobtype.JobTypeManagerException;
+import azkaban.metric.MetricReportManager;
 import azkaban.project.ProjectLoader;
 import azkaban.utils.FileIOUtils;
 import azkaban.utils.FileIOUtils.JobMetaData;
@@ -60,26 +63,26 @@ import azkaban.utils.TrackingThreadPool;
 
 /**
  * Execution manager for the server side execution.
- * 
+ *
  * When a flow is submitted to FlowRunnerManager, it is the
  * {@link Status.PREPARING} status. When a flow is about to be executed by
  * FlowRunner, its status is updated to {@link Status.RUNNING}
- * 
+ *
  * Two main data structures are used in this class to maintain flows.
- * 
+ *
  * runningFlows: this is used as a bookkeeping for submitted flows in
  * FlowRunnerManager. It has nothing to do with the executor service that is
  * used to execute the flows. This bookkeeping is used at the time of canceling
  * or killing a flow. The flows in this data structure is removed in the
  * handleEvent method.
- * 
+ *
  * submittedFlows: this is used to keep track the execution of the flows, so it
  * has the mapping between a Future<?> and an execution id. This would allow us
  * to find out the execution ids of the flows that are in the Status.PREPARING
  * status. The entries in this map is removed once the flow execution is
  * completed.
- * 
- * 
+ *
+ *
  */
 public class FlowRunnerManager implements EventListener,
     ThreadPoolExecutingListener {
@@ -483,6 +486,8 @@ public class FlowRunnerManager implements EventListener,
         .setValidateProxyUser(validateProxyUser)
         .setNumJobThreads(numJobThreads).addListener(this);
 
+    configureFlowLevelMetrics(runner);
+
     // Check again.
     if (runningFlows.containsKey(execId)) {
       throw new ExecutorManagerException("Execution " + execId
@@ -508,6 +513,21 @@ public class FlowRunnerManager implements EventListener,
     }
   }
 
+  /**
+   * Configure Azkaban metrics tracking for a new flowRunner instance
+   * @param flowRunner
+   */
+  private void configureFlowLevelMetrics(FlowRunner flowRunner) {
+    logger.info("Configuring Azkaban metrics tracking for flow runner object");
+
+    if (MetricReportManager.isAvailable()) {
+      MetricReportManager metricManager = MetricReportManager.getInstance();
+      //Adding NumFailedFlow Metric listener
+      flowRunner.addListener((NumFailedFlowMetric) metricManager
+          .getMetricFromName(NumFailedFlowMetric.NUM_FAILED_FLOW_METRIC_NAME));
+    }
+  }
+
   private void setupFlow(ExecutableFlow flow) throws ExecutorManagerException {
     int execId = flow.getExecutionId();
     File execPath = new File(executionDirectory, String.valueOf(execId));
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java
new file mode 100644
index 0000000..b77c74f
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp.metric;
+
+import azkaban.event.Event;
+import azkaban.event.Event.Type;
+import azkaban.event.EventListener;
+import azkaban.execapp.FlowRunner;
+import azkaban.executor.Status;
+import azkaban.metric.MetricException;
+import azkaban.metric.MetricReportManager;
+import azkaban.metric.TimeBasedReportingMetric;
+
+/**
+ * Metric to keep track of number of failed flows in between the tracking events
+ */
+public class NumFailedFlowMetric extends TimeBasedReportingMetric<Integer> implements EventListener {
+  public static final String NUM_FAILED_FLOW_METRIC_NAME = "NumFailedFlowMetric";
+  private static final String NUM_FAILED_FLOW_METRIC_TYPE = "uint16";
+
+  public NumFailedFlowMetric(MetricReportManager manager, long interval) throws MetricException {
+    super(NUM_FAILED_FLOW_METRIC_NAME, NUM_FAILED_FLOW_METRIC_TYPE, 0, manager, interval);
+    logger.debug("Instantiated NumFailedJobMetric");
+  }
+
+  /**
+  * Listen for events to maintain correct value of number of failed flows
+  * {@inheritDoc}
+  * @see azkaban.event.EventListener#handleEvent(azkaban.event.Event)
+  */
+  @Override
+  public synchronized void handleEvent(Event event) {
+    if (event.getType() == Type.FLOW_FINISHED) {
+      FlowRunner runner = (FlowRunner) event.getRunner();
+      if (runner != null && runner.getExecutableFlow().getStatus().equals(Status.FAILED)) {
+        value = value + 1;
+      }
+    }
+  }
+
+  @Override
+  protected void preTrackingEventMethod() {
+    // Nothing to finalize before tracking event
+  }
+
+  @Override
+  protected synchronized void postTrackingEventMethod() {
+    value = 0;
+  }
+
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
new file mode 100644
index 0000000..bb463d4
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp.metric;
+
+import azkaban.event.Event;
+import azkaban.event.Event.Type;
+import azkaban.event.EventListener;
+import azkaban.execapp.JobRunner;
+import azkaban.executor.Status;
+import azkaban.metric.MetricException;
+import azkaban.metric.MetricReportManager;
+import azkaban.metric.TimeBasedReportingMetric;
+
+/**
+ * Metric to keep track of number of failed jobs in between the tracking events
+ */
+public class NumFailedJobMetric extends TimeBasedReportingMetric<Integer> implements EventListener {
+  public static final String NUM_FAILED_JOB_METRIC_NAME = "NumFailedJobMetric";
+  private static final String NUM_FAILED_JOB_METRIC_TYPE = "uint16";
+
+  public NumFailedJobMetric(MetricReportManager manager, long interval) throws MetricException {
+    super(NUM_FAILED_JOB_METRIC_NAME, NUM_FAILED_JOB_METRIC_TYPE, 0, manager, interval);
+    logger.debug("Instantiated NumFailedJobMetric");
+  }
+
+  /**
+   * Listen for events to maintain correct value of number of failed jobs
+   * {@inheritDoc}
+   * @see azkaban.event.EventListener#handleEvent(azkaban.event.Event)
+   */
+  @Override
+  public synchronized void handleEvent(Event event) {
+    JobRunner runner = (JobRunner) event.getRunner();
+    if (event.getType() == Type.JOB_FINISHED && runner.getStatus().equals(Status.FAILED)) {
+      value = value + 1;
+    }
+  }
+
+  @Override
+  protected void preTrackingEventMethod() {
+    // Nothing to finalize before tracking event
+  }
+
+  @Override
+  protected synchronized void postTrackingEventMethod() {
+    value = 0;
+  }
+
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumQueuedFlowMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumQueuedFlowMetric.java
new file mode 100644
index 0000000..c44917a
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumQueuedFlowMetric.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp.metric;
+
+import azkaban.execapp.FlowRunnerManager;
+import azkaban.metric.MetricException;
+import azkaban.metric.MetricReportManager;
+import azkaban.metric.TimeBasedReportingMetric;
+
+/**
+ * Metric to keep track of number of queued flows in Azkaban exec server
+ */
+public class NumQueuedFlowMetric extends TimeBasedReportingMetric<Integer> {
+  public static final String NUM_QUEUED_FLOW_METRIC_NAME = "NumQueuedFlowMetric";
+  private static final String NUM_QUEUED_FLOW_METRIC_TYPE = "uint16";
+
+  private FlowRunnerManager flowManager;
+
+  /**
+   * @param flowRunnerManager Flow runner manager
+   * @param manager metric report manager
+   * @param interval reporting interval
+   * @throws MetricException
+   */
+  public NumQueuedFlowMetric(FlowRunnerManager flowRunnerManager, MetricReportManager manager, long interval) throws MetricException {
+    super(NUM_QUEUED_FLOW_METRIC_NAME, NUM_QUEUED_FLOW_METRIC_TYPE, 0, manager, interval);
+    logger.debug("Instantiated NumQueuedFlowMetric");
+    flowManager = flowRunnerManager;
+  }
+
+  /**
+   * Update value using flow manager
+   * {@inheritDoc}
+   * @see azkaban.metric.TimeBasedReportingMetric#preTrackingEventMethod()
+   */
+  @Override
+  protected synchronized void preTrackingEventMethod() {
+    value = flowManager.getNumQueuedFlows();
+  }
+
+  @Override
+  protected void postTrackingEventMethod() {
+    // nothing to post process
+  }
+
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java
new file mode 100644
index 0000000..d7d09cb
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp.metric;
+
+import azkaban.execapp.FlowRunnerManager;
+import azkaban.metric.MetricException;
+import azkaban.metric.MetricReportManager;
+import azkaban.metric.TimeBasedReportingMetric;
+
+/**
+ * Metric to keep track of number of running flows in Azkaban exec server
+ */
+public class NumRunningFlowMetric extends TimeBasedReportingMetric<Integer> {
+  public static final String NUM_RUNNING_FLOW_METRIC_NAME = "NumRunningFlowMetric";
+  private static final String NUM_RUNNING_FLOW_METRIC_TYPE = "uint16";
+
+  private FlowRunnerManager flowManager;
+
+  /**
+   * @param flowRunnerManager Flow runner manager
+   * @param manager metric report manager
+   * @param interval reporting interval
+   * @throws MetricException
+   */
+  public NumRunningFlowMetric(FlowRunnerManager flowRunnerManager, MetricReportManager manager, long interval) throws MetricException {
+    super(NUM_RUNNING_FLOW_METRIC_NAME, NUM_RUNNING_FLOW_METRIC_TYPE, 0, manager, interval);
+    logger.debug("Instantiated NumRunningFlowMetric");
+    flowManager = flowRunnerManager;
+  }
+
+  /**
+   * Update value using flow manager
+   * {@inheritDoc}
+   * @see azkaban.metric.TimeBasedReportingMetric#preTrackingEventMethod()
+   */
+  @Override
+  protected synchronized void preTrackingEventMethod() {
+    value = flowManager.getNumRunningFlows();
+  }
+
+  @Override
+  protected void postTrackingEventMethod() {
+    // nothing to post process
+  }
+
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
new file mode 100644
index 0000000..84ebfd4
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp.metric;
+
+import azkaban.event.Event;
+import azkaban.event.Event.Type;
+import azkaban.event.EventListener;
+import azkaban.metric.MetricException;
+import azkaban.metric.MetricReportManager;
+import azkaban.metric.TimeBasedReportingMetric;
+
+/**
+ * Metric to keep track of number of running jobs in Azkaban exec server
+ */
+public class NumRunningJobMetric extends TimeBasedReportingMetric<Integer> implements EventListener {
+  public static final String NUM_RUNNING_JOB_METRIC_NAME = "NumRunningJobMetric";
+  private static final String NUM_RUNNING_JOB_METRIC_TYPE = "uint16";
+
+  /**
+   * @param manager metric manager
+   * @param interval reporting interval
+   * @throws MetricException
+   */
+  public NumRunningJobMetric(MetricReportManager manager, long interval) throws MetricException {
+    super(NUM_RUNNING_JOB_METRIC_NAME, NUM_RUNNING_JOB_METRIC_TYPE, 0, manager, interval);
+    logger.debug("Instantiated NumRunningJobMetric");
+  }
+
+  /**
+   * Listen for events to maintain correct value of number of running jobs
+   * {@inheritDoc}
+   * @see azkaban.event.EventListener#handleEvent(azkaban.event.Event)
+   */
+  @Override
+  public synchronized void handleEvent(Event event) {
+    if (event.getType() == Type.JOB_STARTED) {
+      value = value + 1;
+    } else if (event.getType() == Type.JOB_FINISHED) {
+      value = value - 1;
+    }
+  }
+
+  @Override
+  protected void preTrackingEventMethod() {
+    // nothing to finalize value is already updated
+  }
+
+  @Override
+  protected void postTrackingEventMethod() {
+    // nothing to post process
+  }
+
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
new file mode 100644
index 0000000..152a3aa
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
@@ -0,0 +1,265 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp;
+
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.log4j.Logger;
+
+import azkaban.executor.ConnectorParams;
+import azkaban.metric.IMetric;
+import azkaban.metric.IMetricEmitter;
+import azkaban.metric.MetricReportManager;
+import azkaban.metric.TimeBasedReportingMetric;
+import azkaban.metric.inmemoryemitter.InMemoryHistoryNode;
+import azkaban.metric.inmemoryemitter.InMemoryMetricEmitter;
+import azkaban.server.HttpRequestUtils;
+import azkaban.server.ServerConstants;
+import azkaban.utils.JSONUtils;
+
+
+/**
+ * Servlet to communicate with Azkaban exec server
+ * This servlet get requests from stats servlet in Azkaban Web server
+ */
+public class StatsServlet extends HttpServlet implements ConnectorParams {
+  private static final long serialVersionUID = 2L;
+  private static final Logger logger = Logger.getLogger(StatsServlet.class);
+
+  public boolean hasParam(HttpServletRequest request, String param) {
+    return HttpRequestUtils.hasParam(request, param);
+  }
+
+  public String getParam(HttpServletRequest request, String name) throws ServletException {
+    return HttpRequestUtils.getParam(request, name);
+  }
+
+  public Boolean getBooleanParam(HttpServletRequest request, String name) throws ServletException {
+    return HttpRequestUtils.getBooleanParam(request, name);
+  }
+
+  public long getLongParam(HttpServletRequest request, String name) throws ServletException {
+    return HttpRequestUtils.getLongParam(request, name);
+  }
+
+  /**
+   * Handle all get request to Stats Servlet
+   * {@inheritDoc}
+   * @see javax.servlet.http.HttpServlet#doGet(javax.servlet.http.HttpServletRequest, javax.servlet.http.HttpServletResponse)
+   */
+  protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+    Map<String, Object> ret = new HashMap<String, Object>();
+
+    if (hasParam(req, ACTION_PARAM)) {
+      String action = getParam(req, ACTION_PARAM);
+      if (action.equals(STATS_SET_REPORTINGINTERVAL)) {
+        handleChangeMetricInterval(req, ret);
+      } else if (action.equals(STATS_SET_CLEANINGINTERVAL)) {
+        handleChangeCleaningInterval(req, ret);
+      } else if (action.equals(STATS_SET_MAXREPORTERPOINTS)) {
+        handleChangeEmitterPoints(req, ret);
+      } else if (action.equals(STATS_GET_ALLMETRICSNAME)) {
+        handleGetAllMMetricsName(req, ret);
+      } else if (action.equals(STATS_GET_METRICHISTORY)) {
+        handleGetMetricHistory(req, ret);
+      } else if (action.equals(STATS_SET_ENABLEMETRICS)) {
+        handleChangeManagerStatusRequest(req, ret, true);
+      } else if (action.equals(STATS_SET_DISABLEMETRICS)) {
+        handleChangeManagerStatusRequest(req, ret, false);
+      } else {
+        ret.put(RESPONSE_ERROR, "Invalid action");
+      }
+    }
+
+    JSONUtils.toJSON(ret, resp.getOutputStream(), true);
+  }
+
+  /**
+   * enable or disable metric Manager
+   * A disable will also purge all data from all metric emitters
+   */
+  private void handleChangeManagerStatusRequest(HttpServletRequest req, Map<String, Object> ret,
+      boolean enableMetricManager) {
+    try {
+      logger.info("Updating metric manager status");
+      if ((enableMetricManager && MetricReportManager.isInstantiated()) || MetricReportManager.isAvailable()) {
+        MetricReportManager metricManager = MetricReportManager.getInstance();
+        if (enableMetricManager) {
+          metricManager.enableManager();
+        } else {
+          metricManager.disableManager();
+        }
+        ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
+      } else {
+        ret.put(RESPONSE_ERROR, "MetricManager is not available");
+      }
+    } catch (Exception e) {
+      logger.error(e);
+      ret.put(RESPONSE_ERROR, e.getMessage());
+    }
+  }
+
+  /**
+   * Update number of display snapshots for /stats graphs
+   */
+  private void handleChangeEmitterPoints(HttpServletRequest req, Map<String, Object> ret) {
+    try {
+      long numInstance = getLongParam(req, STATS_MAP_EMITTERNUMINSTANCES);
+      if (MetricReportManager.isAvailable()) {
+        MetricReportManager metricManager = MetricReportManager.getInstance();
+        InMemoryMetricEmitter memoryEmitter = extractInMemoryMetricEmitter(metricManager);
+        memoryEmitter.setReportingInstances(numInstance);
+        ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
+      } else {
+        ret.put(RESPONSE_ERROR, "MetricManager is not available");
+      }
+    } catch (Exception e) {
+      logger.error(e);
+      ret.put(RESPONSE_ERROR, e.getMessage());
+    }
+  }
+
+  /**
+   * Update InMemoryMetricEmitter interval to maintain metric snapshots
+   */
+  private void handleChangeCleaningInterval(HttpServletRequest req, Map<String, Object> ret) {
+    try {
+      long newInterval = getLongParam(req, STATS_MAP_CLEANINGINTERVAL);
+      if (MetricReportManager.isAvailable()) {
+        MetricReportManager metricManager = MetricReportManager.getInstance();
+        InMemoryMetricEmitter memoryEmitter = extractInMemoryMetricEmitter(metricManager);
+        memoryEmitter.setReportingInterval(newInterval);
+        ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
+      } else {
+        ret.put(RESPONSE_ERROR, "MetricManager is not available");
+      }
+    } catch (Exception e) {
+      logger.error(e);
+      ret.put(RESPONSE_ERROR, e.getMessage());
+    }
+  }
+
+  /**
+   * Get metric snapshots for a metric and date specification
+   * @throws ServletException
+   */
+  private void handleGetMetricHistory(HttpServletRequest req, Map<String, Object> ret) throws ServletException {
+    if (MetricReportManager.isAvailable()) {
+      MetricReportManager metricManager = MetricReportManager.getInstance();
+      InMemoryMetricEmitter memoryEmitter = extractInMemoryMetricEmitter(metricManager);
+
+      // if we have a memory emitter
+      if (memoryEmitter != null) {
+        try {
+          List<InMemoryHistoryNode> result =
+              memoryEmitter.getMetrics(getParam(req, STATS_MAP_METRICNAMEPARAM),
+                  parseDate(getParam(req, STATS_MAP_STARTDATE)), parseDate(getParam(req, STATS_MAP_ENDDATE)),
+                  getBooleanParam(req, STATS_MAP_METRICRETRIEVALMODE));
+
+          if (result != null && result.size() > 0) {
+            ret.put("data", result);
+          } else {
+            ret.put(RESPONSE_ERROR, "No metric stats available");
+          }
+
+        } catch (ParseException ex) {
+          ret.put(RESPONSE_ERROR, "Invalid Date filter");
+        }
+      } else {
+        ret.put(RESPONSE_ERROR, "InMemoryMetricEmitter not instantiated");
+      }
+    } else {
+      ret.put(RESPONSE_ERROR, "MetricReportManager is not available");
+    }
+  }
+
+  /**
+   * Get InMemoryMetricEmitter, if available else null
+   */
+  private InMemoryMetricEmitter extractInMemoryMetricEmitter(MetricReportManager metricManager) {
+    InMemoryMetricEmitter memoryEmitter = null;
+    for (IMetricEmitter emitter : metricManager.getMetricEmitters()) {
+      if (emitter instanceof InMemoryMetricEmitter) {
+        memoryEmitter = (InMemoryMetricEmitter) emitter;
+        break;
+      }
+    }
+    return memoryEmitter;
+  }
+
+  /**
+   * Get all the metrics tracked by metric manager
+   */
+  private void handleGetAllMMetricsName(HttpServletRequest req, Map<String, Object> ret) {
+    if (MetricReportManager.isAvailable()) {
+      MetricReportManager metricManager = MetricReportManager.getInstance();
+      List<IMetric<?>> result = metricManager.getAllMetrics();
+      if (result.size() == 0) {
+        ret.put(RESPONSE_ERROR, "No Metric being tracked");
+      } else {
+        List<String> metricNames = new LinkedList<String>();
+        for(IMetric<?> metric: result) {
+          metricNames.add(metric.getName());
+        }
+        ret.put("data", metricNames);
+      }
+    } else {
+      ret.put(RESPONSE_ERROR, "MetricReportManager is not available");
+    }
+  }
+
+  /**
+   * Update tracking interval for a given metrics
+   * @throws ServletException
+   */
+  private void handleChangeMetricInterval(HttpServletRequest req, Map<String, Object> ret) throws ServletException {
+    try {
+      String metricName = getParam(req, STATS_MAP_METRICNAMEPARAM);
+      long newInterval = getLongParam(req, STATS_MAP_REPORTINGINTERVAL);
+      if (MetricReportManager.isAvailable()) {
+        MetricReportManager metricManager = MetricReportManager.getInstance();
+        TimeBasedReportingMetric<?> metric = (TimeBasedReportingMetric<?>) metricManager.getMetricFromName(metricName);
+        metric.updateInterval(newInterval);
+        ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
+      } else {
+        ret.put(RESPONSE_ERROR, "MetricManager is not available");
+      }
+    } catch (Exception e) {
+      logger.error(e);
+      ret.put(RESPONSE_ERROR, e.getMessage());
+    }
+  }
+
+  private Date parseDate(String date) throws ParseException {
+    DateFormat format = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss zzz");
+    return format.parse(date);
+  }
+}
diff --git a/azkaban-execserver/src/package/conf/azkaban.properties b/azkaban-execserver/src/package/conf/azkaban.properties
index fd27529..79e20e1 100644
--- a/azkaban-execserver/src/package/conf/azkaban.properties
+++ b/azkaban-execserver/src/package/conf/azkaban.properties
@@ -24,3 +24,7 @@ executor.flow.threads=30
 # JMX stats
 jetty.connector.stats=true
 executor.connector.stats=true
+
+# uncomment to enable inmemory stats for azkaban
+#executor.metric.reports=true
+#executor.metric.milisecinterval.default=60000
\ No newline at end of file
diff --git a/azkaban-soloserver/src/package/conf/azkaban.properties b/azkaban-soloserver/src/package/conf/azkaban.properties
index 386cd43..325fd12 100644
--- a/azkaban-soloserver/src/package/conf/azkaban.properties
+++ b/azkaban-soloserver/src/package/conf/azkaban.properties
@@ -46,3 +46,7 @@ lockdown.create.projects=false
 # JMX stats
 jetty.connector.stats=true
 executor.connector.stats=true
+
+# uncomment to enable inmemory stats for azkaban
+#executor.metric.reports=true
+#executor.metric.milisecinterval.default=60000
\ No newline at end of file
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java
index b4140cb..92e8fd3 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -41,9 +41,7 @@ import org.apache.velocity.app.VelocityEngine;
 import org.apache.velocity.runtime.log.Log4JLogChute;
 import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
 import org.apache.velocity.runtime.resource.loader.JarResourceLoader;
-
 import org.joda.time.DateTimeZone;
-
 import org.mortbay.jetty.Connector;
 import org.mortbay.jetty.Server;
 import org.mortbay.jetty.bio.SocketConnector;
@@ -96,6 +94,7 @@ import azkaban.webapp.servlet.ScheduleServlet;
 import azkaban.webapp.servlet.HistoryServlet;
 import azkaban.webapp.servlet.ProjectServlet;
 import azkaban.webapp.servlet.ProjectManagerServlet;
+import azkaban.webapp.servlet.StatsServlet;
 import azkaban.webapp.servlet.TriggerManagerServlet;
 import azkaban.webapp.plugin.TriggerPlugin;
 import azkaban.webapp.plugin.ViewerPlugin;
@@ -772,6 +771,7 @@ public class AzkabanWebServer extends AzkabanServer {
     root.addServlet(new ServletHolder(new ScheduleServlet()), "/schedule");
     root.addServlet(new ServletHolder(new JMXHttpServlet()), "/jmx");
     root.addServlet(new ServletHolder(new TriggerManagerServlet()), "/triggers");
+    root.addServlet(new ServletHolder(new StatsServlet()), "/stats");
 
     ServletHolder restliHolder = new ServletHolder(new RestliServlet());
     restliHolder.setInitParameter("resourcePackages", "azkaban.restli");
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
new file mode 100644
index 0000000..6d14839
--- /dev/null
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.webapp.servlet;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import azkaban.executor.ConnectorParams;
+import azkaban.executor.ExecutorManager;
+import azkaban.server.session.Session;
+import azkaban.user.Permission;
+import azkaban.user.Role;
+import azkaban.user.User;
+import azkaban.user.UserManager;
+import azkaban.utils.Pair;
+import azkaban.webapp.AzkabanWebServer;
+
+/**
+ * User facing servlet for Azkaban default metric display
+ */
+public class StatsServlet extends LoginAbstractAzkabanServlet {
+  private static final long serialVersionUID = 1L;
+  private UserManager userManager;
+  private ExecutorManager execManager;
+
+  @Override
+  public void init(ServletConfig config) throws ServletException {
+    super.init(config);
+    AzkabanWebServer server = (AzkabanWebServer) getApplication();
+    userManager = server.getUserManager();
+    execManager = server.getExecutorManager();
+  }
+
+  @Override
+  protected void handleGet(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException,
+      IOException {
+    if (hasParam(req, ConnectorParams.ACTION_PARAM)) {
+      handleAJAXAction(req, resp, session);
+    } else {
+      handleStatePageLoad(req, resp, session);
+    }
+  }
+
+  private void handleAJAXAction(HttpServletRequest req, HttpServletResponse resp, Session session)
+      throws ServletException, IOException {
+    HashMap<String, Object> ret = new HashMap<String, Object>();
+    String actionName = getParam(req, ConnectorParams.ACTION_PARAM);
+
+    if (actionName.equals(ConnectorParams.STATS_GET_METRICHISTORY)) {
+      handleGetMetricHistory(req, ret, session.getUser());
+    } else if (actionName.equals(ConnectorParams.STATS_SET_REPORTINGINTERVAL)) {
+      handleChangeConfigurationRequest(ConnectorParams.STATS_SET_REPORTINGINTERVAL, req, ret);
+    } else if (actionName.equals(ConnectorParams.STATS_SET_CLEANINGINTERVAL)) {
+      handleChangeConfigurationRequest(ConnectorParams.STATS_SET_CLEANINGINTERVAL, req, ret);
+    } else if (actionName.equals(ConnectorParams.STATS_SET_MAXREPORTERPOINTS)) {
+      handleChangeConfigurationRequest(ConnectorParams.STATS_SET_MAXREPORTERPOINTS, req, ret);
+    } else if (actionName.equals(ConnectorParams.STATS_SET_ENABLEMETRICS)) {
+      handleChangeConfigurationRequest(ConnectorParams.STATS_SET_ENABLEMETRICS, req, ret);
+    } else if (actionName.equals(ConnectorParams.STATS_SET_DISABLEMETRICS)) {
+      handleChangeConfigurationRequest(ConnectorParams.STATS_SET_DISABLEMETRICS, req, ret);
+    }
+
+    writeJSON(resp, ret);
+  }
+
+  /**
+   * Generic method to facilitate actionName action using Azkaban exec server
+   * @param actionName  Name of the action
+   */
+  private void handleChangeConfigurationRequest(String actionName, HttpServletRequest req, HashMap<String, Object> ret)
+      throws ServletException, IOException {
+    Map<String, Object> result = execManager.callExecutorStats(actionName, getAllParams(req));
+    if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
+      ret.put(ConnectorParams.RESPONSE_ERROR, result.get(ConnectorParams.RESPONSE_ERROR).toString());
+    } else {
+      ret.put(ConnectorParams.STATUS_PARAM, result.get(ConnectorParams.STATUS_PARAM));
+    }
+  }
+
+  /**
+   * Get metric snapshots for a metric and date specification
+   * @throws ServletException
+   */
+  private void handleGetMetricHistory(HttpServletRequest req, HashMap<String, Object> ret, User user)
+      throws IOException, ServletException {
+    Map<String, Object> result =
+        execManager.callExecutorStats(ConnectorParams.STATS_GET_METRICHISTORY, getAllParams(req));
+    if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
+      ret.put(ConnectorParams.RESPONSE_ERROR, result.get(ConnectorParams.RESPONSE_ERROR).toString());
+    } else {
+      ret.put("data", result.get("data"));
+    }
+  }
+
+  /**
+   *
+   */
+  private void handleStatePageLoad(HttpServletRequest req, HttpServletResponse resp, Session session)
+      throws ServletException {
+    Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/statsPage.vm");
+    if (!hasPermission(session.getUser(), Permission.Type.METRICS)) {
+      page.add("errorMsg", "User " + session.getUser().getUserId() + " has no permission.");
+      page.render();
+      return;
+    }
+
+    try {
+      Map<String, Object> result =
+          execManager.callExecutorStats(ConnectorParams.STATS_GET_ALLMETRICSNAME, (Pair<String, String>[]) null);
+      if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
+        page.add("errorMsg", result.get(ConnectorParams.RESPONSE_ERROR).toString());
+      } else {
+        page.add("metricList", result.get("data"));
+      }
+    } catch (IOException e) {
+      page.add("errorMsg", "Failed to get a response from Azkaban exec server");
+    }
+
+    page.render();
+  }
+
+  @Override
+  protected void handlePost(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException,
+      IOException {
+  }
+
+  protected boolean hasPermission(User user, Permission.Type type) {
+    for (String roleName : user.getRoles()) {
+      Role role = userManager.getRole(roleName);
+      if (role.getPermission().isPermissionSet(type) || role.getPermission().isPermissionSet(Permission.Type.ADMIN)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Parse all Http request params
+   * @return
+   */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  private Pair<String, String>[] getAllParams(HttpServletRequest req) {
+    List<Pair<String, String>> allParams = new LinkedList<Pair<String, String>>();
+
+    Iterator it = req.getParameterMap().entrySet().iterator();
+    while (it.hasNext()) {
+      Map.Entry pairs = (Map.Entry) it.next();
+      for (Object value : (String[]) pairs.getValue()) {
+        allParams.add(new Pair<String, String>((String) pairs.getKey(), (String) value));
+      }
+    }
+
+    return allParams.toArray(new Pair[allParams.size()]);
+  }
+}
diff --git a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
new file mode 100644
index 0000000..4596ac2
--- /dev/null
+++ b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
@@ -0,0 +1,122 @@
+#*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+*#
+
+<!DOCTYPE html>
+<html lang="en">
+   <head>
+      #parse("azkaban/webapp/servlet/velocity/style.vm")
+      #parse("azkaban/webapp/servlet/velocity/javascript.vm")
+      <link rel="stylesheet" type="text/css" href="${context}/css/bootstrap-datetimepicker.css" />
+      <script type="text/javascript" src="${context}/js/raphael.min.js"></script>
+      <script type="text/javascript" src="${context}/js/morris.min.js"></script>
+      <script type="text/javascript" src="${context}/js/moment.min.js"></script>
+      <script type="text/javascript" src="${context}/js/bootstrap-datetimepicker.min.js"></script>
+      <script type="text/javascript">
+         var contextURL = "${context}";
+         var currentTime = ${currentTime};
+         var timezone = "${timezone}";
+
+         function refreshMetricChart() {
+               var requestURL = '/stats';
+               var requestData = {
+                 'action': 'getMetricHistory',
+                 'from': new Date($('#datetimebegin').val()).toUTCString(),
+                 'to'  : new Date($('#datetimeend').val()).toUTCString(),
+                 'metricName': $('#metricName').val(),
+                 'useStats': $("#useStats").is(':checked')
+               };
+               var successHandler = function(responseData) {
+                 if(responseData.error != null) {
+                   $('#reportedMetric').html(responseData.error);
+                 } else {
+                   var graphDiv = document.createElement('div');
+                   $('#reportedMetric').html(graphDiv);
+
+                   Morris.Line({
+                                 element: graphDiv,
+                                 data: responseData.data,
+                                 xkey: 'timestamp',
+                                 ykeys: ['value'],
+                                 labels: [$('#metricName').val()]
+                               });
+                 }
+               };
+               $.get(requestURL, requestData, successHandler, 'json');
+         }
+
+         $(document).ready(function () {
+             $('#datetimebegin').datetimepicker();
+             $('#datetimeend').datetimepicker();
+             $('#datetimebegin').on('change.dp', function(e) {
+                 $('#datetimeend').data('DateTimePicker').setStartDate(e.date);
+               });
+             $('#datetimeend').on('change.dp', function(e) {
+                 $('#datetimebegin').data('DateTimePicker').setEndDate(e.date);
+               });
+             $('#retrieve').click(refreshMetricChart);
+         });
+
+      </script>
+   </head>
+   <body>
+      #set ($current_page="Statistics")
+      #parse ("azkaban/webapp/servlet/velocity/nav.vm")
+      #if ($errorMsg)
+      #parse ("azkaban/webapp/servlet/velocity/errormsg.vm")
+      #else
+      ## Page header.
+      <div class="az-page-header">
+         <div class="container-full">
+            <div class="row">
+               <div class="header-title" style="width: 17%;">
+                  <h1><a href="${context}/stats">Statistics</a></h1>
+               </div>
+               <div class="header-control" style="width: 900px; padding-top: 5px;">
+                  <form id="metric-form" method="get">
+                     <label for="metricLabel" >Metric</label>
+                     #if (!$metricList.isEmpty())
+                     <select id="metricName"  name="metricName" style="width:200px">
+                        #foreach ($metric in $metricList)
+                        <option value="${metric}" style="width:200px">${metric}</option>
+                        #end
+                     </select>
+                     #end
+                     <label for="datetimebegin" >Between</label>
+                     <input type="text" id="datetimebegin" value="" class="ui-datetime-container" style="width:150px">
+                     <label for="datetimeend" >and</label>
+                     <input type="text" id="datetimeend" value="" class="ui-datetime-container" style="width:150px">
+                     <input type="checkbox" name="useStats" id="useStats" value="true"> useStats
+                     <input type="button" id="retrieve" value="Retrieve" class="btn btn-success" >
+               </div>
+            </div>
+            </form>
+         </div>
+      </div>
+      </div>
+      </div>
+      <div class="container-full">
+         #parse ("azkaban/webapp/servlet/velocity/alerts.vm")
+         <div class="row">
+            <div id="reportedMetric" style="padding: 60px 10px 10px 10px;height: 750px;">
+            </div>
+         </div>
+         <!-- /row -->
+         #parse ("azkaban/webapp/servlet/velocity/invalidsessionmodal.vm")
+      </div>
+      <!-- /container-full -->
+      #end
+   </body>
+   <html>
\ No newline at end of file

build.gradle 1(+1 -0)

diff --git a/build.gradle b/build.gradle
index 6fd240d..2ad1434 100644
--- a/build.gradle
+++ b/build.gradle
@@ -101,6 +101,7 @@ project(':azkaban-common') {
     compile('net.sf.jopt-simple:jopt-simple:4.3')
     compile('org.apache.commons:commons-email:1.2')
     compile('org.apache.commons:commons-jexl:2.1.1')
+    compile('org.apache.commons:commons-math3:3.0')
     compile('org.apache.httpcomponents:httpclient:4.2.1')
     compile('org.apache.httpcomponents:httpcore:4.2.1')
     compile('org.apache.velocity:velocity:1.7')