azkaban-uncached

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java b/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
index d6a9ab4..5eb19a4 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
@@ -32,8 +32,7 @@ public interface ConnectorParams {
   public static final String LOG_ACTION = "log";
   public static final String ATTACHMENTS_ACTION = "attachments";
   public static final String METADATA_ACTION = "metadata";
-  public static final String RELOAD_JOBTYPE_PLUGINS_ACTION =
-      "reloadJobTypePlugins";
+  public static final String RELOAD_JOBTYPE_PLUGINS_ACTION = "reloadJobTypePlugins";
 
   public static final String MODIFY_EXECUTION_ACTION = "modifyExecution";
   public static final String MODIFY_EXECUTION_ACTION_TYPE = "modifyType";
@@ -80,21 +79,27 @@ public interface ConnectorParams {
   public static final String JMX_GET_MBEANS = "getMBeans";
   public static final String JMX_GET_MBEAN_INFO = "getMBeanInfo";
   public static final String JMX_GET_MBEAN_ATTRIBUTE = "getAttribute";
-  public static final String JMX_GET_ALL_MBEAN_ATTRIBUTES =
-      "getAllMBeanAttributes";
+  public static final String JMX_GET_ALL_MBEAN_ATTRIBUTES = "getAllMBeanAttributes";
   public static final String JMX_ATTRIBUTE = "attribute";
   public static final String JMX_MBEAN = "mBean";
 
-  public static final String JMX_GET_ALL_EXECUTOR_ATTRIBUTES =
-      "getAllExecutorAttributes";
+  public static final String JMX_GET_ALL_EXECUTOR_ATTRIBUTES = "getAllExecutorAttributes";
   public static final String JMX_HOSTPORT = "hostPort";
 
   public static final String STATS_GET_ALLMETRICSNAME = "getAllMetricNames";
   public static final String STATS_GET_METRICHISTORY = "metricHistory";
   public static final String STATS_SET_REPORTINGINTERVAL = "changeMetricInterval";
-  public static final String STATS_MAP_METRICNAMEPARAM =  "metricName";
-  public static final String STATS_MAP_STARTDATE =  "from";
-  public static final String STATS_MAP_ENDDATE =  "to";
-    public static final String STATS_MAP_REPORTINGINTERVAL =  "interval";
+  public static final String STATS_SET_CLEANINGINTERVAL = "changeCleaningInterval";
+  public static final String STATS_SET_MAXREPORTERPOINTS = "changeEmitterPoints";
+  public static final String STATS_SET_ENABLEMETRICS = "enableMetrics";
+  public static final String STATS_SET_DISBLEMETRICS = "disableMetrics";
+  public static final String STATS_MAP_METRICNAMEPARAM = "metricName";
+  public static final String STATS_MAP_METRICRETRIVALMODE = "useStats";
+  public static final String STATS_MAP_STARTDATE = "from";
+  public static final String STATS_MAP_ENDDATE = "to";
+  public static final String STATS_MAP_REPORTINGINTERVAL = "interval";
+  public static final String STATS_MAP_CLEANINGINTERVAL = "interval";
+  public static final String STATS_MAP_EMITTERNUMINSTANCES = "numInstances";
+
 
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 8cdccbc..1c6a4f9 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -754,10 +754,6 @@ public class ExecutorManager extends EventHandler implements
     @SuppressWarnings("unchecked")
     Map<String, Object> jsonResponse =
         (Map<String, Object>) JSONUtils.parseJSONFromString(response);
-    String error = (String) jsonResponse.get(ConnectorParams.RESPONSE_ERROR);
-    if (error != null) {
-      throw new IOException(error);
-    }
 
     return jsonResponse;
   }
diff --git a/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java
index 1b5f195..5eb1890 100644
--- a/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java
+++ b/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java
@@ -44,4 +44,9 @@ public class GangliaMetricEmitter implements IMetricEmitter {
       }
     }
   }
+
+  @Override
+  public void purgeAllData() throws Exception {
+
+  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/metric/IMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/IMetricEmitter.java
index 963aa7c..f485d23 100644
--- a/azkaban-common/src/main/java/azkaban/metric/IMetricEmitter.java
+++ b/azkaban-common/src/main/java/azkaban/metric/IMetricEmitter.java
@@ -18,4 +18,5 @@ package azkaban.metric;
 
 public interface IMetricEmitter {
   void reportMetric(final IMetric<?> metric) throws Exception;
+  void purgeAllData() throws Exception;
 }
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryHistoryStatistics.java b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryHistoryStatistics.java
new file mode 100644
index 0000000..adc617e
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryHistoryStatistics.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metric.inmemoryemitter;
+
+import java.util.List;
+
+public final class InMemoryHistoryStatistics {
+
+  /**
+   * Returns the average
+   */
+  public static double mean(List<InMemoryHistoryNode> data) {
+    double total = 0.0;
+    for (InMemoryHistoryNode node : data) {
+      total = total + ((Number)node.getValue()).doubleValue() ;
+    }
+    return total / data.size();
+  }
+
+  /**
+   * Returns the sample standard deviation
+   */
+  public static double sdev(List<InMemoryHistoryNode> data) {
+    return Math.sqrt(variance(data));
+  }
+
+  /**
+   * Returns the sample variance
+   */
+  public static double variance(List<InMemoryHistoryNode> data) {
+    double mu = mean(data);
+    double sumsq = 0.0;
+    for (InMemoryHistoryNode node : data) {
+      sumsq += sqr(mu - ((Number)node.getValue()).doubleValue());
+    }
+    return sumsq / data.size();
+  }
+
+  public static double sqr(double x) {
+    return x * x;
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java b/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
index 9d411eb..3444e7b 100644
--- a/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
+++ b/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
@@ -33,12 +33,14 @@ public class MetricReportManager {
   private List<IMetricEmitter> metricEmitters;
   private ExecutorService executorService;
   private static volatile MetricReportManager instance = null;
+  boolean isManagerEnabled;
 
   // For singleton
   private MetricReportManager() {
     executorService = Executors.newFixedThreadPool(MAX_EMITTER_THREADS);
     metrics = new ArrayList<IMetric<?>>();
     metricEmitters = new LinkedList<IMetricEmitter>();
+    enableManager();
   }
 
   public static boolean isInstantiated() {
@@ -59,7 +61,7 @@ public class MetricReportManager {
 
   // each element of metrics List is responsible to call this method and report metrics
   public void reportMetric(final IMetric<?> metric) {
-    if (metric != null) {
+    if (metric != null && isManagerEnabled) {
 
       // Report metric to all the emitters
       synchronized (metric) {
@@ -118,6 +120,23 @@ public class MetricReportManager {
     return metrics;
   }
 
+  public void enableManager() {
+    isManagerEnabled = true;
+  }
+
+  public void disableManager() {
+    if(isManagerEnabled) {
+      isManagerEnabled = false;
+      for(IMetricEmitter emitter: metricEmitters) {
+        try {
+          emitter.purgeAllData();
+        } catch (Exception ex) {
+          logger.error("Failed to purge data "  + ex.toString());
+        }
+      }
+    }
+  }
+
   protected void finalize() {
     executorService.shutdown();
   }
diff --git a/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java b/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
index 985dd74..b8c8a98 100644
--- a/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
+++ b/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
@@ -21,24 +21,30 @@ import java.util.TimerTask;
 
 
 public abstract class TimeBasedReportingMetric<T> extends AbstractMetric<T> {
-  private Timer timer = new Timer();
-  private TimerTask recurringReporting = new TimerTask() {
-    @Override
-    public void run() {
-      finalizeValue();
-      notifyManager();
-    }
-  };
+  private Timer timer;
 
   public TimeBasedReportingMetric(String metricName, String metricType, T initialValue, MetricReportManager manager,
       long interval) {
     super(metricName, metricType, initialValue, manager);
-    timer.schedule(recurringReporting, interval, interval);
+    timer = new Timer();
+    timer.schedule(getTimerTask(), interval, interval);
+  }
+
+  private TimerTask getTimerTask() {
+    TimerTask recurringReporting = new TimerTask() {
+      @Override
+      public void run() {
+        finalizeValue();
+        notifyManager();
+      }
+    };
+    return recurringReporting;
   }
 
   public void updateInterval(long interval) {
     timer.cancel();
-    timer.schedule(recurringReporting, interval, interval);
+    timer = new Timer();
+    timer.schedule(getTimerTask(), interval, interval);
   }
 
   /**
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 2d08ed2..151f0b7 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -43,8 +43,8 @@ import azkaban.execapp.metric.NumRunningFlowMetric;
 import azkaban.execapp.metric.NumRunningJobMetric;
 import azkaban.jmx.JmxJettyServer;
 import azkaban.metric.IMetricEmitter;
-import azkaban.metric.InMemoryMetricEmitter;
 import azkaban.metric.MetricReportManager;
+import azkaban.metric.inmemoryemitter.InMemoryMetricEmitter;
 import azkaban.project.JdbcProjectLoader;
 import azkaban.project.ProjectLoader;
 import azkaban.server.AzkabanServer;
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
index 1d1a569..d31605d 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
@@ -36,10 +36,10 @@ import org.apache.log4j.Logger;
 import azkaban.executor.ConnectorParams;
 import azkaban.metric.IMetric;
 import azkaban.metric.IMetricEmitter;
-import azkaban.metric.InMemoryHistoryNode;
-import azkaban.metric.InMemoryMetricEmitter;
 import azkaban.metric.MetricReportManager;
 import azkaban.metric.TimeBasedReportingMetric;
+import azkaban.metric.inmemoryemitter.InMemoryHistoryNode;
+import azkaban.metric.inmemoryemitter.InMemoryMetricEmitter;
 import azkaban.server.HttpRequestUtils;
 import azkaban.server.ServerConstants;
 import azkaban.utils.JSONUtils;
@@ -63,6 +63,10 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
     return HttpRequestUtils.getParam(request, name);
   }
 
+  public Boolean getBooleanParam(HttpServletRequest request, String name) throws ServletException {
+    return HttpRequestUtils.getBooleanParam(request, name);
+  }
+
   public long getLongParam(HttpServletRequest request, String name) throws ServletException {
     return HttpRequestUtils.getLongParam(request, name);
   }
@@ -74,34 +78,83 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
       String action = getParam(req, ACTION_PARAM);
       if (action.equals(STATS_SET_REPORTINGINTERVAL)) {
         handleChangeMetricInterval(req, ret);
+      } else if (action.equals(STATS_SET_CLEANINGINTERVAL)) {
+        handleChangeCleaningInterval(req, ret);
+      } else if (action.equals(STATS_SET_MAXREPORTERPOINTS)) {
+        handleChangeEmitterPoints(req, ret);
       } else if (action.equals(STATS_GET_ALLMETRICSNAME)) {
         handleGetAllMMetricsName(req, ret);
       } else if (action.equals(STATS_GET_METRICHISTORY)) {
         handleGetMetricHistory(req, ret);
+      } else if (action.equals(STATS_SET_ENABLEMETRICS)) {
+        handleChangeManagerStatusRequest(req, ret, true);
+      } else if (action.equals(STATS_SET_DISBLEMETRICS)) {
+        handleChangeManagerStatusRequest(req, ret, false);
       }
     }
 
     JSONUtils.toJSON(ret, resp.getOutputStream(), true);
   }
 
+  private void handleChangeManagerStatusRequest(HttpServletRequest req, Map<String, Object> ret, boolean enable) {
+    try {
+      if (MetricReportManager.isInstantiated()) {
+        MetricReportManager metricManager = MetricReportManager.getInstance();
+        if (enable) {
+          metricManager.enableManager();
+        } else {
+          metricManager.disableManager();
+        }
+      }
+      ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
+    } catch (Exception e) {
+      logger.error(e);
+      ret.put(RESPONSE_ERROR, e.getMessage());
+    }
+  }
+
+  private void handleChangeEmitterPoints(HttpServletRequest req, Map<String, Object> ret) {
+    try {
+      long numInstance = getLongParam(req, STATS_MAP_EMITTERNUMINSTANCES);
+      if (MetricReportManager.isInstantiated()) {
+        MetricReportManager metricManager = MetricReportManager.getInstance();
+        InMemoryMetricEmitter memoryEmitter = extractInMemoryMetricEmitter(metricManager);
+        memoryEmitter.setReportingInstances(numInstance);
+      }
+      ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
+    } catch (Exception e) {
+      logger.error(e);
+      ret.put(RESPONSE_ERROR, e.getMessage());
+    }
+  }
+
+  private void handleChangeCleaningInterval(HttpServletRequest req, Map<String, Object> ret) {
+    try {
+      long newInterval = getLongParam(req, STATS_MAP_CLEANINGINTERVAL);
+      if (MetricReportManager.isInstantiated()) {
+        MetricReportManager metricManager = MetricReportManager.getInstance();
+        InMemoryMetricEmitter memoryEmitter = extractInMemoryMetricEmitter(metricManager);
+        memoryEmitter.setReportingInterval(newInterval);
+      }
+      ret.put(STATUS_PARAM, RESPONSE_SUCCESS);
+    } catch (Exception e) {
+      logger.error(e);
+      ret.put(RESPONSE_ERROR, e.getMessage());
+    }
+  }
+
   private void handleGetMetricHistory(HttpServletRequest req, Map<String, Object> ret) throws ServletException {
     if (MetricReportManager.isInstantiated()) {
       MetricReportManager metricManager = MetricReportManager.getInstance();
-      InMemoryMetricEmitter memoryEmitter = null;
-
-      for (IMetricEmitter emitter : metricManager.getMetricEmitters()) {
-        if (emitter instanceof InMemoryMetricEmitter) {
-          memoryEmitter = (InMemoryMetricEmitter) emitter;
-          break;
-        }
-      }
+      InMemoryMetricEmitter memoryEmitter = extractInMemoryMetricEmitter(metricManager);
 
       // if we have a memory emitter
       if (memoryEmitter != null) {
         try {
           List<InMemoryHistoryNode> result =
               memoryEmitter.getDrawMetric(getParam(req, STATS_MAP_METRICNAMEPARAM),
-                  parseDate(getParam(req, STATS_MAP_STARTDATE)), parseDate(getParam(req, STATS_MAP_ENDDATE)));
+                  parseDate(getParam(req, STATS_MAP_STARTDATE)), parseDate(getParam(req, STATS_MAP_ENDDATE)),
+                  getBooleanParam(req, STATS_MAP_METRICRETRIVALMODE));
 
           if (result != null && result.size() > 0) {
             ret.put("data", result);
@@ -120,11 +173,22 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
     }
   }
 
+  private InMemoryMetricEmitter extractInMemoryMetricEmitter(MetricReportManager metricManager) {
+    InMemoryMetricEmitter memoryEmitter = null;
+    for (IMetricEmitter emitter : metricManager.getMetricEmitters()) {
+      if (emitter instanceof InMemoryMetricEmitter) {
+        memoryEmitter = (InMemoryMetricEmitter) emitter;
+        break;
+      }
+    }
+    return memoryEmitter;
+  }
+
   private void handleGetAllMMetricsName(HttpServletRequest req, Map<String, Object> ret) {
     if (MetricReportManager.isInstantiated()) {
       MetricReportManager metricManager = MetricReportManager.getInstance();
       List<IMetric<?>> result = metricManager.getAllMetrics();
-      if(result.size() == 0) {
+      if (result.size() == 0) {
         ret.put(RESPONSE_ERROR, "No Metric being tracked");
       } else {
         ret.put("data", result);
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
index bacdf4c..3c54e93 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
@@ -1,3 +1,19 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
 package azkaban.webapp.servlet;
 
 import java.io.IOException;
@@ -56,38 +72,38 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
     if (actionName.equals(ConnectorParams.STATS_GET_METRICHISTORY)) {
       handleGetMetricHistory(req, ret, session.getUser());
     } else if (actionName.equals(ConnectorParams.STATS_SET_REPORTINGINTERVAL)) {
-      handleChangeMetricInterval(req, ret, session);
+      handleChangeConfigurationRequest(ConnectorParams.STATS_SET_REPORTINGINTERVAL, req, ret);
+    } else if (actionName.equals(ConnectorParams.STATS_SET_CLEANINGINTERVAL)) {
+      handleChangeConfigurationRequest(ConnectorParams.STATS_SET_CLEANINGINTERVAL, req, ret);
+    } else if (actionName.equals(ConnectorParams.STATS_SET_MAXREPORTERPOINTS)) {
+      handleChangeConfigurationRequest(ConnectorParams.STATS_SET_MAXREPORTERPOINTS, req, ret);
+    } else if (actionName.equals(ConnectorParams.STATS_SET_ENABLEMETRICS)) {
+      handleChangeConfigurationRequest(ConnectorParams.STATS_SET_ENABLEMETRICS, req, ret);
+    } else if (actionName.equals(ConnectorParams.STATS_SET_DISBLEMETRICS)) {
+      handleChangeConfigurationRequest(ConnectorParams.STATS_SET_DISBLEMETRICS, req, ret);
     }
 
     writeJSON(resp, ret);
   }
 
-  private void handleChangeMetricInterval(HttpServletRequest req, HashMap<String, Object> ret, Session session)
+  private void handleChangeConfigurationRequest(String actionName, HttpServletRequest req, HashMap<String, Object> ret)
       throws ServletException, IOException {
-    try {
-      Map<String, Object> result =
-          execManager.callExecutorStats(ConnectorParams.STATS_GET_ALLMETRICSNAME, getAllParams(req));
-
-      if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
-        throw new Exception(result.get(ConnectorParams.RESPONSE_ERROR).toString());
-      }
+    Map<String, Object> result = execManager.callExecutorStats(actionName, getAllParams(req));
+    if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
+      ret.put(ConnectorParams.RESPONSE_ERROR, result.get(ConnectorParams.RESPONSE_ERROR).toString());
+    } else {
       ret.put(ConnectorParams.STATUS_PARAM, result.get(ConnectorParams.STATUS_PARAM));
-    } catch (Exception e) {
-      ret.put(ConnectorParams.RESPONSE_ERROR, e.toString());
     }
   }
 
   private void handleGetMetricHistory(HttpServletRequest req, HashMap<String, Object> ret, User user)
       throws IOException, ServletException {
-    try {
-      Map<String, Object> result =
-          execManager.callExecutorStats(ConnectorParams.STATS_GET_METRICHISTORY, getAllParams(req));
-      if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
-        throw new Exception(result.get(ConnectorParams.RESPONSE_ERROR).toString());
-      }
+    Map<String, Object> result =
+        execManager.callExecutorStats(ConnectorParams.STATS_GET_METRICHISTORY, getAllParams(req));
+    if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
+      ret.put(ConnectorParams.RESPONSE_ERROR, result.get(ConnectorParams.RESPONSE_ERROR).toString());
+    } else {
       ret.put("data", result.get("data"));
-    } catch (Exception e) {
-      ret.put(ConnectorParams.RESPONSE_ERROR, e.toString());
     }
   }
 
@@ -99,16 +115,19 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
       page.render();
       return;
     }
+
     try {
       Map<String, Object> result =
           execManager.callExecutorStats(ConnectorParams.STATS_GET_ALLMETRICSNAME, (Pair<String, String>[]) null);
       if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
-        throw new Exception(result.get(ConnectorParams.RESPONSE_ERROR).toString());
+        page.add("errorMsg", result.get(ConnectorParams.RESPONSE_ERROR).toString());
+      } else {
+        page.add("metricList", result.get("data"));
       }
-      page.add("metricList", result.get("data"));
-    } catch (Exception e) {
-      page.add("errorMsg", e.toString());
+    } catch (IOException e) {
+      page.add("errorMsg", "Failed to get a response from Azkaban exec serve");
     }
+
     page.render();
   }
 
@@ -128,16 +147,16 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
     return false;
   }
 
-  @SuppressWarnings("unchecked")
+  @SuppressWarnings({ "unchecked", "rawtypes" })
   private Pair<String, String>[] getAllParams(HttpServletRequest req) {
     List<Pair<String, String>> allParams = new LinkedList<Pair<String, String>>();
 
     Iterator it = req.getParameterMap().entrySet().iterator();
     while (it.hasNext()) {
-        Map.Entry pairs = (Map.Entry)it.next();
-        for(Object value : (String [])pairs.getValue()) {
-          allParams.add(new Pair<String, String>((String) pairs.getKey(), (String) value));
-        }
+      Map.Entry pairs = (Map.Entry) it.next();
+      for (Object value : (String[]) pairs.getValue()) {
+        allParams.add(new Pair<String, String>((String) pairs.getKey(), (String) value));
+      }
     }
 
     return allParams.toArray(new Pair[allParams.size()]);
diff --git a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
index 8ab8b72..ffecbcc 100644
--- a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
+++ b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
@@ -38,7 +38,8 @@
               'action': 'metricHistory',
               'from': $('#datetimebegin').val(),
               'to'  : $('#datetimeend').val(),
-              'metricName': $('#metricName').val()
+              'metricName': $('#metricName').val(),
+              'useStats': $("#useStats").is(':checked')
             };
             var successHandler = function(responseData) {
               if(responseData.error != null) {
@@ -87,10 +88,10 @@
     <div class="az-page-header">
       <div class="container-full">
         <div class="row">
-          <div class="header-title">
+          <div class="header-title" style="width: 17%;">
             <h1><a href="${context}/stats">Statistics</a></h1>
           </div>
-          <div class="header-control" style="width:900px">
+          <div class="header-control" style="width: 900px; padding-top: 5px;">
 
             <form id="metric-form" method="get">
                         <label for="metricLabel" >Metric</label>
@@ -103,12 +104,12 @@
               #end
                    <label for="datetimebegin" >Between</label>
 
-                    <input type="text" id="datetimebegin" value="" class="ui-datetime-container" style="width:200px">
+                    <input type="text" id="datetimebegin" value="" class="ui-datetime-container" style="width:150px">
 
                    <label for="datetimeend" >and</label>
 
-                     <input type="text" id="datetimeend" value="" class="ui-datetime-container" style="width:200px">
-
+                     <input type="text" id="datetimeend" value="" class="ui-datetime-container" style="width:150px">
+                    <input type="checkbox" name="useStats" id="useStats" value="true"> useStats
                     <input type="button" id="retrieve" value="Retrieve" class="btn btn-success" >
                 </div>
               </div>