azkaban-memoizeit

Details

diff --git a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
index 066d3a5..4512a6c 100644
--- a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
+++ b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
@@ -44,7 +44,9 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
   Map<String, LinkedList<InMemoryHistoryNode>> historyListMapping;
   private static final String INMEMORY_METRIC_REPORTER_WINDOW = "azkaban.metric.inmemory.interval";
   private static final String INMEMORY_METRIC_NUM_INSTANCES = "azkaban.metric.inmemory.maxinstances";
+  private static final String INMEMORY_METRIC_DEVIAION_FACTOR = "azkaban.metric.inmemory.statisticalDeviationFactor";
 
+  double statisticalDeviationFactor;
   /**
    * Interval (in millisecond) from today for which we should maintain the in memory snapshots
    */
@@ -61,6 +63,7 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
     historyListMapping = new HashMap<String, LinkedList<InMemoryHistoryNode>>();
     interval = azkProps.getLong(INMEMORY_METRIC_REPORTER_WINDOW, 60 * 60 * 24 * 7 * 1000);
     numInstances = azkProps.getLong(INMEMORY_METRIC_NUM_INSTANCES, 50);
+    statisticalDeviationFactor = azkProps.getDouble(INMEMORY_METRIC_DEVIAION_FACTOR, 2);
   }
 
   /**
@@ -148,7 +151,7 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
       InMemoryHistoryNode currentNode = ite.next();
       double value = ((Number) currentNode.getValue()).doubleValue();
       // remove all elements which lies in 95% value band
-      if (value > mean + 2 * std && value < mean + 2 * std) {
+      if (value > mean + statisticalDeviationFactor * std && value < mean - statisticalDeviationFactor * std) {
         ite.remove();
       }
     }
diff --git a/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java b/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
index e58023e..db6db1b 100644
--- a/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
+++ b/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
@@ -48,8 +48,9 @@ public abstract class TimeBasedReportingMetric<T> extends AbstractMetric<T> {
     TimerTask recurringReporting = new TimerTask() {
       @Override
       public void run() {
-        finalizeValue();
+        preTrackingEventMethod();
         notifyManager();
+        postTrackingEventMethod();
       }
     };
     return recurringReporting;
@@ -67,8 +68,13 @@ public abstract class TimeBasedReportingMetric<T> extends AbstractMetric<T> {
   }
 
   /**
-   * This method is responsible for making a final update to value, if any
+   * This method is responsible for making any last minute update to value, if any
    */
-  protected abstract void finalizeValue();
+  protected abstract void preTrackingEventMethod();
+
+  /**
+   * This method is responsible for making any post processing after tracking
+   */
+  protected abstract void postTrackingEventMethod();
 
 }
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index aa35498..367da00 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -39,6 +39,9 @@ import org.mortbay.thread.QueuedThreadPool;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.JdbcExecutorLoader;
 import azkaban.execapp.jmx.JmxFlowRunnerManager;
+import azkaban.execapp.metric.NumFailedFlowMetric;
+import azkaban.execapp.metric.NumFailedJobMetric;
+import azkaban.execapp.metric.NumQueuedFlowMetric;
 import azkaban.execapp.metric.NumRunningFlowMetric;
 import azkaban.execapp.metric.NumRunningJobMetric;
 import azkaban.jmx.JmxJettyServer;
@@ -138,15 +141,28 @@ public class AzkabanExecutorServer {
       IMetricEmitter metricEmitter = new InMemoryMetricEmitter(props);
       metricManager.addMetricEmitter(metricEmitter);
 
-      logger.info("Adding number of Jobs metric");
+      logger.info("Adding number of failed flow metric");
+      metricManager.addMetric(new NumFailedFlowMetric(metricManager, props.getInt("executor.metric.interval."
+          + NumFailedFlowMetric.NUM_FAILED_FLOW_METRIC_NAME, props.getInt("executor.metric.interval.default"))));
+
+      logger.info("Adding number of failed jobs metric");
+      metricManager.addMetric(new NumFailedJobMetric(metricManager, props.getInt("executor.metric.interval."
+          + NumFailedJobMetric.NUM_FAILED_JOB_METRIC_NAME, props.getInt("executor.metric.interval.default"))));
+
+      logger.info("Adding number of running Jobs metric");
       metricManager.addMetric(new NumRunningJobMetric(metricManager, props.getInt("executor.metric.interval."
           + NumRunningJobMetric.NUM_RUNNING_JOB_METRIC_NAME, props.getInt("executor.metric.interval.default"))));
 
-      logger.info("Adding number of flows metric");
+      logger.info("Adding number of running flows metric");
       metricManager.addMetric(new NumRunningFlowMetric(runnerManager, metricManager, props.getInt(
           "executor.metric.interval." + NumRunningFlowMetric.NUM_RUNNING_FLOW_METRIC_NAME,
           props.getInt("executor.metric.interval.default"))));
 
+      logger.info("Adding number of queued flows metric");
+      metricManager.addMetric(new NumQueuedFlowMetric(runnerManager, metricManager, props.getInt(
+          "executor.metric.interval." + NumQueuedFlowMetric.NUM_QUEUED_FLOW_METRIC_NAME,
+          props.getInt("executor.metric.interval.default"))));
+
       logger.info("Completed configuring Metric Reports");
     }
   }
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
index d3611b4..226a68a 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
@@ -42,6 +42,7 @@ import azkaban.event.Event.Type;
 import azkaban.event.EventHandler;
 import azkaban.event.EventListener;
 import azkaban.execapp.event.FlowWatcher;
+import azkaban.execapp.metric.NumFailedJobMetric;
 import azkaban.execapp.metric.NumRunningJobMetric;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableFlowBase;
@@ -53,7 +54,6 @@ import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.Status;
 import azkaban.flow.FlowProps;
 import azkaban.jobtype.JobTypeManager;
-import azkaban.metric.IMetric;
 import azkaban.metric.MetricReportManager;
 import azkaban.project.ProjectLoader;
 import azkaban.project.ProjectManagerException;
@@ -61,13 +61,13 @@ import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
 import azkaban.utils.SwapQueue;
 
+
 /**
  * Class that handles the running of a ExecutableFlow DAG
  *
  */
 public class FlowRunner extends EventHandler implements Runnable {
-  private static final Layout DEFAULT_LAYOUT = new PatternLayout(
-      "%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
+  private static final Layout DEFAULT_LAYOUT = new PatternLayout("%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
   // We check update every 5 minutes, just in case things get stuck. But for the
   // most part, we'll be idling.
   private static final long CHECK_WAIT_MS = 5 * 60 * 1000;
@@ -96,8 +96,7 @@ public class FlowRunner extends EventHandler implements Runnable {
   private final JobTypeManager jobtypeManager;
 
   private JobRunnerEventListener listener = new JobRunnerEventListener();
-  private Set<JobRunner> activeJobRunners = Collections
-      .newSetFromMap(new ConcurrentHashMap<JobRunner, Boolean>());
+  private Set<JobRunner> activeJobRunners = Collections.newSetFromMap(new ConcurrentHashMap<JobRunner, Boolean>());
 
   // Thread safe swap queue for finishedExecutions.
   private SwapQueue<ExecutableNode> finishedNodes;
@@ -132,9 +131,8 @@ public class FlowRunner extends EventHandler implements Runnable {
    * @param jobtypeManager
    * @throws ExecutorManagerException
    */
-  public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader,
-      ProjectLoader projectLoader, JobTypeManager jobtypeManager)
-      throws ExecutorManagerException {
+  public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader,
+      JobTypeManager jobtypeManager) throws ExecutorManagerException {
     this(flow, executorLoader, projectLoader, jobtypeManager, null);
   }
 
@@ -149,9 +147,8 @@ public class FlowRunner extends EventHandler implements Runnable {
    * @param executorService
    * @throws ExecutorManagerException
    */
-  public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader,
-      ProjectLoader projectLoader, JobTypeManager jobtypeManager,
-      ExecutorService executorService) throws ExecutorManagerException {
+  public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader,
+      JobTypeManager jobtypeManager, ExecutorService executorService) throws ExecutorManagerException {
     this.execId = flow.getExecutionId();
     this.flow = flow;
     this.executorLoader = executorLoader;
@@ -213,23 +210,18 @@ public class FlowRunner extends EventHandler implements Runnable {
       runFlow();
     } catch (Throwable t) {
       if (logger != null) {
-        logger
-            .error(
-                "An error has occurred during the running of the flow. Quiting.",
-                t);
+        logger.error("An error has occurred during the running of the flow. Quiting.", t);
       }
       flow.setStatus(Status.FAILED);
     } finally {
       if (watcher != null) {
         logger.info("Watcher is attached. Stopping watcher.");
         watcher.stopWatcher();
-        logger
-            .info("Watcher cancelled status is " + watcher.isWatchCancelled());
+        logger.info("Watcher cancelled status is " + watcher.isWatchCancelled());
       }
 
       flow.setEndTime(System.currentTimeMillis());
-      logger.info("Setting end time for flow " + execId + " to "
-          + System.currentTimeMillis());
+      logger.info("Setting end time for flow " + execId + " to " + System.currentTimeMillis());
       closeLogger();
 
       updateFlow();
@@ -254,8 +246,7 @@ public class FlowRunner extends EventHandler implements Runnable {
     }
 
     // If there are flow overrides, we apply them now.
-    Map<String, String> flowParam =
-        flow.getExecutionOptions().getFlowParameters();
+    Map<String, String> flowParam = flow.getExecutionOptions().getFlowParameters();
     if (flowParam != null && !flowParam.isEmpty()) {
       commonFlowProps = new Props(commonFlowProps, flowParam);
     }
@@ -268,11 +259,9 @@ public class FlowRunner extends EventHandler implements Runnable {
       this.watcher.setLogger(logger);
     }
 
-    logger.info("Running execid:" + execId + " flow:" + flowId + " project:"
-        + projectId + " version:" + version);
+    logger.info("Running execid:" + execId + " flow:" + flowId + " project:" + projectId + " version:" + version);
     if (pipelineExecId != null) {
-      logger.info("Running simulateously with " + pipelineExecId
-          + ". Pipelining level " + pipelineLevel);
+      logger.info("Running simulateously with " + pipelineExecId + ". Pipelining level " + pipelineLevel);
     }
 
     // The current thread is used for interrupting blocks
@@ -282,10 +271,8 @@ public class FlowRunner extends EventHandler implements Runnable {
 
   private void updateFlowReference() throws ExecutorManagerException {
     logger.info("Update active reference");
-    if (!executorLoader.updateExecutableReference(execId,
-        System.currentTimeMillis())) {
-      throw new ExecutorManagerException(
-          "The executor reference doesn't exist. May have been killed prematurely.");
+    if (!executorLoader.updateExecutableReference(execId, System.currentTimeMillis())) {
+      throw new ExecutorManagerException("The executor reference doesn't exist. May have been killed prematurely.");
     }
   }
 
@@ -408,8 +395,7 @@ public class FlowRunner extends EventHandler implements Runnable {
     resetFailedState(this.flow, retryJobs);
 
     for (ExecutableNode node : retryJobs) {
-      if (node.getStatus() == Status.READY
-          || node.getStatus() == Status.DISABLED) {
+      if (node.getStatus() == Status.READY || node.getStatus() == Status.DISABLED) {
         runReadyJob(node);
       } else if (node.getStatus() == Status.SUCCEEDED) {
         for (String outNodeId : node.getOutNodes()) {
@@ -478,8 +464,7 @@ public class FlowRunner extends EventHandler implements Runnable {
     // Instant kill or skip if necessary.
     boolean jobsRun = false;
     for (ExecutableNode node : nodesToCheck) {
-      if (Status.isStatusFinished(node.getStatus())
-          || Status.isStatusRunning(node.getStatus())) {
+      if (Status.isStatusFinished(node.getStatus()) || Status.isStatusRunning(node.getStatus())) {
         // Really shouldn't get in here.
         continue;
       }
@@ -496,8 +481,7 @@ public class FlowRunner extends EventHandler implements Runnable {
   }
 
   private boolean runReadyJob(ExecutableNode node) throws IOException {
-    if (Status.isStatusFinished(node.getStatus())
-        || Status.isStatusRunning(node.getStatus())) {
+    if (Status.isStatusFinished(node.getStatus()) || Status.isStatusRunning(node.getStatus())) {
       return false;
     }
 
@@ -507,8 +491,7 @@ public class FlowRunner extends EventHandler implements Runnable {
     }
 
     if (nextNodeStatus == Status.CANCELLED) {
-      logger.info("Cancelling '" + node.getNestedId()
-          + "' due to prior errors.");
+      logger.info("Cancelling '" + node.getNestedId() + "' due to prior errors.");
       node.cancelNode(System.currentTimeMillis());
       finishExecutableNode(node);
     } else if (nextNodeStatus == Status.SKIPPED) {
@@ -540,8 +523,8 @@ public class FlowRunner extends EventHandler implements Runnable {
     }
 
     if (node.getRetries() > node.getAttempt()) {
-      logger.info("Job '" + node.getId() + "' will be retried. Attempt "
-          + node.getAttempt() + " of " + node.getRetries());
+      logger.info("Job '" + node.getId() + "' will be retried. Attempt " + node.getAttempt() + " of "
+          + node.getRetries());
       node.setDelayedExecution(node.getRetryBackoff());
       node.resetForRetry();
       return true;
@@ -582,8 +565,7 @@ public class FlowRunner extends EventHandler implements Runnable {
     for (String end : flow.getEndNodes()) {
       ExecutableNode node = flow.getExecutableNode(end);
 
-      if (node.getStatus() == Status.KILLED
-          || node.getStatus() == Status.FAILED
+      if (node.getStatus() == Status.KILLED || node.getStatus() == Status.FAILED
           || node.getStatus() == Status.CANCELLED) {
         succeeded = false;
       }
@@ -605,22 +587,19 @@ public class FlowRunner extends EventHandler implements Runnable {
     flow.setUpdateTime(System.currentTimeMillis());
     long durationSec = (flow.getEndTime() - flow.getStartTime()) / 1000;
     switch (flow.getStatus()) {
-    case FAILED_FINISHING:
-      logger.info("Setting flow '" + id + "' status to FAILED in "
-          + durationSec + " seconds");
-      flow.setStatus(Status.FAILED);
-      break;
-    case FAILED:
-    case KILLED:
-    case CANCELLED:
-    case FAILED_SUCCEEDED:
-      logger.info("Flow '" + id + "' is set to " + flow.getStatus().toString()
-          + " in " + durationSec + " seconds");
-      break;
-    default:
-      flow.setStatus(Status.SUCCEEDED);
-      logger.info("Flow '" + id + "' is set to " + flow.getStatus().toString()
-          + " in " + durationSec + " seconds");
+      case FAILED_FINISHING:
+        logger.info("Setting flow '" + id + "' status to FAILED in " + durationSec + " seconds");
+        flow.setStatus(Status.FAILED);
+        break;
+      case FAILED:
+      case KILLED:
+      case CANCELLED:
+      case FAILED_SUCCEEDED:
+        logger.info("Flow '" + id + "' is set to " + flow.getStatus().toString() + " in " + durationSec + " seconds");
+        break;
+      default:
+        flow.setStatus(Status.SUCCEEDED);
+        logger.info("Flow '" + id + "' is set to " + flow.getStatus().toString() + " in " + durationSec + " seconds");
     }
 
     // If the finalized flow is actually the top level flow, than we finish
@@ -681,13 +660,10 @@ public class FlowRunner extends EventHandler implements Runnable {
 
     // load the override props if any
     try {
-      props =
-          projectLoader.fetchProjectProperty(flow.getProjectId(),
-              flow.getVersion(), node.getId() + ".jor");
+      props = projectLoader.fetchProjectProperty(flow.getProjectId(), flow.getVersion(), node.getId() + ".jor");
     } catch (ProjectManagerException e) {
       e.printStackTrace();
-      logger.error("Error loading job override property for job "
-          + node.getId());
+      logger.error("Error loading job override property for job " + node.getId());
     }
 
     File path = new File(execDir, source);
@@ -697,8 +673,7 @@ public class FlowRunner extends EventHandler implements Runnable {
         props = new Props(null, path);
       } catch (IOException e) {
         e.printStackTrace();
-        logger.error("Error loading job file " + source + " for job "
-            + node.getId());
+        logger.error("Error loading job file " + source + " for job " + node.getId());
       }
     }
     // setting this fake source as this will be used to determine the location
@@ -735,8 +710,7 @@ public class FlowRunner extends EventHandler implements Runnable {
   public Status getImpliedStatus(ExecutableNode node) {
     // If it's running or finished with 'SUCCEEDED', than don't even
     // bother starting this job.
-    if (Status.isStatusRunning(node.getStatus())
-        || node.getStatus() == Status.SUCCEEDED) {
+    if (Status.isStatusRunning(node.getStatus()) || node.getStatus() == Status.SUCCEEDED) {
       return null;
     }
 
@@ -751,8 +725,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 
       if (!Status.isStatusFinished(depStatus)) {
         return null;
-      } else if (depStatus == Status.FAILED || depStatus == Status.CANCELLED
-          || depStatus == Status.KILLED) {
+      } else if (depStatus == Status.FAILED || depStatus == Status.CANCELLED || depStatus == Status.KILLED) {
         // We propagate failures as KILLED states.
         shouldKill = true;
       }
@@ -760,16 +733,14 @@ public class FlowRunner extends EventHandler implements Runnable {
 
     // If it's disabled but ready to run, we want to make sure it continues
     // being disabled.
-    if (node.getStatus() == Status.DISABLED
-        || node.getStatus() == Status.SKIPPED) {
+    if (node.getStatus() == Status.DISABLED || node.getStatus() == Status.SKIPPED) {
       return Status.SKIPPED;
     }
 
     // If the flow has failed, and we want to finish only the currently running
     // jobs, we just
     // kill everything else. We also kill, if the flow has been cancelled.
-    if (flowFailed
-        && failureAction == ExecutionOptions.FailureAction.FINISH_CURRENTLY_RUNNING) {
+    if (flowFailed && failureAction == ExecutionOptions.FailureAction.FINISH_CURRENTLY_RUNNING) {
       return Status.CANCELLED;
     } else if (shouldKill || isKilled()) {
       return Status.CANCELLED;
@@ -783,8 +754,7 @@ public class FlowRunner extends EventHandler implements Runnable {
     Props previousOutput = null;
     // Iterate the in nodes again and create the dependencies
     for (String dependency : node.getInNodes()) {
-      Props output =
-          node.getParentFlow().getExecutableNode(dependency).getOutputProps();
+      Props output = node.getParentFlow().getExecutableNode(dependency).getOutputProps();
       if (output != null) {
         output = Props.clone(output);
         output.setParent(previousOutput);
@@ -799,9 +769,7 @@ public class FlowRunner extends EventHandler implements Runnable {
     // Load job file.
     File path = new File(execDir, node.getJobSource());
 
-    JobRunner jobRunner =
-        new JobRunner(node, path.getParentFile(), executorLoader,
-            jobtypeManager);
+    JobRunner jobRunner = new JobRunner(node, path.getParentFile(), executorLoader, jobtypeManager);
     if (watcher != null) {
       jobRunner.setPipeline(watcher, pipelineLevel);
     }
@@ -824,10 +792,17 @@ public class FlowRunner extends EventHandler implements Runnable {
    */
   private void configureJobLevelMetrics(JobRunner jobRunner) {
     logger.info("Configuring Azkaban metrics tracking for jobrunner object");
-    if(MetricReportManager.isInstantiated()) {
+    if (MetricReportManager.isInstantiated()) {
       MetricReportManager metricManager = MetricReportManager.getInstance();
-      NumRunningJobMetric metric = (NumRunningJobMetric) metricManager.getMetricFromName(NumRunningJobMetric.NUM_RUNNING_JOB_METRIC_NAME);
-      jobRunner.addListener(metric);
+
+      //Adding NumRunningJobMetric listener
+      jobRunner.addListener((NumRunningJobMetric) metricManager
+          .getMetricFromName(NumRunningJobMetric.NUM_RUNNING_JOB_METRIC_NAME));
+
+      //Adding NumFailedJobMetric listener
+      jobRunner.addListener((NumFailedJobMetric) metricManager
+          .getMetricFromName(NumFailedJobMetric.NUM_FAILED_JOB_METRIC_NAME));
+
     }
   }
 
@@ -890,8 +865,7 @@ public class FlowRunner extends EventHandler implements Runnable {
       if (watcher != null) {
         logger.info("Watcher is attached. Stopping watcher.");
         watcher.stopWatcher();
-        logger
-            .info("Watcher cancelled status is " + watcher.isWatchCancelled());
+        logger.info("Watcher cancelled status is " + watcher.isWatchCancelled());
       }
 
       logger.info("Killing " + activeJobRunners.size() + " jobs.");
@@ -909,8 +883,7 @@ public class FlowRunner extends EventHandler implements Runnable {
     }
   }
 
-  private void resetFailedState(ExecutableFlowBase flow,
-      List<ExecutableNode> nodesToRetry) {
+  private void resetFailedState(ExecutableFlowBase flow, List<ExecutableNode> nodesToRetry) {
     // bottom up
     LinkedList<ExecutableNode> queue = new LinkedList<ExecutableNode>();
     for (String id : flow.getEndNodes()) {
@@ -939,24 +912,24 @@ public class FlowRunner extends EventHandler implements Runnable {
       } else if (node instanceof ExecutableFlowBase) {
         ExecutableFlowBase base = (ExecutableFlowBase) node;
         switch (base.getStatus()) {
-        case CANCELLED:
-          node.setStatus(Status.READY);
-          node.setEndTime(-1);
-          node.setStartTime(-1);
-          node.setUpdateTime(currentTime);
-          // Break out of the switch. We'll reset the flow just like a normal
-          // node
-          break;
-        case KILLED:
-        case FAILED:
-        case FAILED_FINISHING:
-          resetFailedState(base, nodesToRetry);
-          continue;
-        default:
-          // Continue the while loop. If the job is in a finished state that's
-          // not
-          // a failure, we don't want to reset the job.
-          continue;
+          case CANCELLED:
+            node.setStatus(Status.READY);
+            node.setEndTime(-1);
+            node.setStartTime(-1);
+            node.setUpdateTime(currentTime);
+            // Break out of the switch. We'll reset the flow just like a normal
+            // node
+            break;
+          case KILLED:
+          case FAILED:
+          case FAILED_FINISHING:
+            resetFailedState(base, nodesToRetry);
+            continue;
+          default:
+            // Continue the while loop. If the job is in a finished state that's
+            // not
+            // a failure, we don't want to reset the job.
+            continue;
         }
       } else if (node.getStatus() == Status.CANCELLED) {
         // Not a flow, but killed
@@ -964,16 +937,13 @@ public class FlowRunner extends EventHandler implements Runnable {
         node.setStartTime(-1);
         node.setEndTime(-1);
         node.setUpdateTime(currentTime);
-      } else if (node.getStatus() == Status.FAILED
-          || node.getStatus() == Status.KILLED) {
+      } else if (node.getStatus() == Status.FAILED || node.getStatus() == Status.KILLED) {
         node.resetForRetry();
         nodesToRetry.add(node);
       }
 
-      if (!(node instanceof ExecutableFlowBase)
-          && node.getStatus() != oldStatus) {
-        logger.info("Resetting job '" + node.getNestedId() + "' from "
-            + oldStatus + " to " + node.getStatus());
+      if (!(node instanceof ExecutableFlowBase) && node.getStatus() != oldStatus) {
+        logger.info("Resetting job '" + node.getNestedId() + "' from " + oldStatus + " to " + node.getStatus());
       }
 
       for (String inId : node.getInNodes()) {
@@ -995,16 +965,14 @@ public class FlowRunner extends EventHandler implements Runnable {
       // start node has not.
       for (String id : flow.getStartNodes()) {
         ExecutableNode node = flow.getExecutableNode(id);
-        if (node.getStatus() == Status.READY
-            || node.getStatus() == Status.DISABLED) {
+        if (node.getStatus() == Status.READY || node.getStatus() == Status.DISABLED) {
           nodesToRetry.add(node);
         }
       }
     }
     flow.setUpdateTime(System.currentTimeMillis());
     flow.setEndTime(-1);
-    logger.info("Resetting flow '" + flow.getNestedId() + "' from "
-        + oldFlowState + " to " + flow.getStatus());
+    logger.info("Resetting flow '" + flow.getNestedId() + "' from " + oldFlowState + " to " + flow.getStatus());
   }
 
   private void interrupt() {
@@ -1025,14 +993,13 @@ public class FlowRunner extends EventHandler implements Runnable {
         ExecutableNode node = runner.getNode();
         long seconds = (node.getEndTime() - node.getStartTime()) / 1000;
         synchronized (mainSyncObj) {
-          logger.info("Job " + node.getNestedId() + " finished with status "
-              + node.getStatus() + " in " + seconds + " seconds");
+          logger.info("Job " + node.getNestedId() + " finished with status " + node.getStatus() + " in " + seconds
+              + " seconds");
 
           // Cancellation is handled in the main thread, but if the flow is
           // paused, the main thread is paused too.
           // This unpauses the flow for cancellation.
-          if (flowPaused && node.getStatus() == Status.FAILED
-              && failureAction == FailureAction.CANCEL_ALL) {
+          if (flowPaused && node.getStatus() == Status.FAILED && failureAction == FailureAction.CANCEL_ALL) {
             flowPaused = false;
           }
 
@@ -1075,8 +1042,7 @@ public class FlowRunner extends EventHandler implements Runnable {
     ExecutableNode node = flow.getExecutableNodePath(jobId);
     File path = new File(execDir, node.getJobSource());
 
-    String attachmentFileName =
-        JobRunner.createAttachmentFileName(node, attempt);
+    String attachmentFileName = JobRunner.createAttachmentFileName(node, attempt);
     File attachmentFile = new File(path.getParentFile(), attachmentFileName);
     if (!attachmentFile.exists()) {
       return null;
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 54d82e8..9aa5420 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -42,12 +42,15 @@ import azkaban.event.EventListener;
 import azkaban.execapp.event.FlowWatcher;
 import azkaban.execapp.event.LocalFlowWatcher;
 import azkaban.execapp.event.RemoteFlowWatcher;
+import azkaban.execapp.metric.NumFailedFlowMetric;
+import azkaban.execapp.metric.NumFailedJobMetric;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.jobtype.JobTypeManagerException;
+import azkaban.metric.MetricReportManager;
 import azkaban.project.ProjectLoader;
 import azkaban.utils.FileIOUtils;
 import azkaban.utils.FileIOUtils.JobMetaData;
@@ -60,26 +63,26 @@ import azkaban.utils.TrackingThreadPool;
 
 /**
  * Execution manager for the server side execution.
- * 
+ *
  * When a flow is submitted to FlowRunnerManager, it is the
  * {@link Status.PREPARING} status. When a flow is about to be executed by
  * FlowRunner, its status is updated to {@link Status.RUNNING}
- * 
+ *
  * Two main data structures are used in this class to maintain flows.
- * 
+ *
  * runningFlows: this is used as a bookkeeping for submitted flows in
  * FlowRunnerManager. It has nothing to do with the executor service that is
  * used to execute the flows. This bookkeeping is used at the time of canceling
  * or killing a flow. The flows in this data structure is removed in the
  * handleEvent method.
- * 
+ *
  * submittedFlows: this is used to keep track the execution of the flows, so it
  * has the mapping between a Future<?> and an execution id. This would allow us
  * to find out the execution ids of the flows that are in the Status.PREPARING
  * status. The entries in this map is removed once the flow execution is
  * completed.
- * 
- * 
+ *
+ *
  */
 public class FlowRunnerManager implements EventListener,
     ThreadPoolExecutingListener {
@@ -483,6 +486,8 @@ public class FlowRunnerManager implements EventListener,
         .setValidateProxyUser(validateProxyUser)
         .setNumJobThreads(numJobThreads).addListener(this);
 
+    configureFlowLevelMetrics(runner);
+
     // Check again.
     if (runningFlows.containsKey(execId)) {
       throw new ExecutorManagerException("Execution " + execId
@@ -508,6 +513,21 @@ public class FlowRunnerManager implements EventListener,
     }
   }
 
+  /**
+   * Configure Azkaban metrics tracking for a new flowRunner instance
+   * @param flowRunner
+   */
+  private void configureFlowLevelMetrics(FlowRunner flowRunner) {
+    logger.info("Configuring Azkaban metrics tracking for flow runner object");
+
+    if (MetricReportManager.isInstantiated()) {
+      MetricReportManager metricManager = MetricReportManager.getInstance();
+      //Adding NumFailedFlow Metric listener
+      flowRunner.addListener((NumFailedFlowMetric) metricManager
+          .getMetricFromName(NumFailedFlowMetric.NUM_FAILED_FLOW_METRIC_NAME));
+    }
+  }
+
   private void setupFlow(ExecutableFlow flow) throws ExecutorManagerException {
     int execId = flow.getExecutionId();
     File execPath = new File(executionDirectory, String.valueOf(execId));
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java
new file mode 100644
index 0000000..99f6b03
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp.metric;
+
+import azkaban.event.Event;
+import azkaban.event.Event.Type;
+import azkaban.event.EventListener;
+import azkaban.execapp.FlowRunner;
+import azkaban.executor.Status;
+import azkaban.metric.MetricReportManager;
+import azkaban.metric.TimeBasedReportingMetric;
+
+
+public class NumFailedFlowMetric extends TimeBasedReportingMetric<Integer> implements EventListener {
+  public static final String NUM_FAILED_FLOW_METRIC_NAME = "NumFailedFlowMetric";
+  private static final String NUM_FAILED_FLOW_METRIC_TYPE = "uint16";
+
+  public NumFailedFlowMetric(MetricReportManager manager, long interval) {
+    super(NUM_FAILED_FLOW_METRIC_NAME, NUM_FAILED_FLOW_METRIC_TYPE, 0, manager, interval);
+    logger.debug("Instantiated NumFailedJobMetric");
+  }
+
+  /**
+  * Listen for events to maintain correct value of number of failed flows
+  * {@inheritDoc}
+  * @see azkaban.event.EventListener#handleEvent(azkaban.event.Event)
+  */
+  @Override
+  public synchronized void handleEvent(Event event) {
+    if (event.getType() == Type.FLOW_FINISHED) {
+      FlowRunner runner = (FlowRunner) event.getRunner();
+      if (runner != null && runner.getExecutableFlow().getStatus().equals(Status.FAILED)) {
+        value = value + 1;
+      }
+    }
+  }
+
+  @Override
+  protected void preTrackingEventMethod() {
+    // Nothing to finalize before tracking event
+  }
+
+  @Override
+  protected synchronized void postTrackingEventMethod() {
+    value = 0;
+  }
+
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
new file mode 100644
index 0000000..6e16899
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp.metric;
+
+import azkaban.event.Event;
+import azkaban.event.Event.Type;
+import azkaban.event.EventListener;
+import azkaban.execapp.JobRunner;
+import azkaban.executor.Status;
+import azkaban.metric.MetricReportManager;
+import azkaban.metric.TimeBasedReportingMetric;
+
+/**
+ * Metric to keep track of number of failed jobs in between the tracking events
+ */
+public class NumFailedJobMetric extends TimeBasedReportingMetric<Integer> implements EventListener {
+  public static final String NUM_FAILED_JOB_METRIC_NAME = "NumFailedJobMetric";
+  private static final String NUM_FAILED_JOB_METRIC_TYPE = "uint16";
+
+  public NumFailedJobMetric(MetricReportManager manager, long interval) {
+    super(NUM_FAILED_JOB_METRIC_NAME, NUM_FAILED_JOB_METRIC_TYPE, 0, manager, interval);
+    logger.debug("Instantiated NumFailedJobMetric");
+  }
+
+  /**
+   * Listen for events to maintain correct value of number of failed jobs
+   * {@inheritDoc}
+   * @see azkaban.event.EventListener#handleEvent(azkaban.event.Event)
+   */
+  @Override
+  public synchronized void handleEvent(Event event) {
+    JobRunner runner = (JobRunner) event.getRunner();
+    if (event.getType() == Type.JOB_FINISHED && runner.getStatus().equals(Status.FAILED)) {
+      value = value + 1;
+    }
+  }
+
+  @Override
+  protected void preTrackingEventMethod() {
+    // Nothing to finalize before tracking event
+  }
+
+  @Override
+  protected synchronized void postTrackingEventMethod() {
+    value = 0;
+  }
+
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumQueuedFlowMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumQueuedFlowMetric.java
new file mode 100644
index 0000000..5036078
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumQueuedFlowMetric.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp.metric;
+
+import azkaban.execapp.FlowRunnerManager;
+import azkaban.metric.MetricReportManager;
+import azkaban.metric.TimeBasedReportingMetric;
+
+public class NumQueuedFlowMetric extends TimeBasedReportingMetric<Integer> {
+  public static final String NUM_QUEUED_FLOW_METRIC_NAME = "NumRunningFlowMetric";
+  private static final String NUM_QUEUED_FLOW_METRIC_TYPE = "uint16";
+
+  private FlowRunnerManager flowManager;
+
+  /**
+   * @param flowRunnerManager Flow runner manager
+   * @param manager metric report manager
+   * @param interval reporting interval
+   */
+  public NumQueuedFlowMetric(FlowRunnerManager flowRunnerManager, MetricReportManager manager, long interval) {
+    super(NUM_QUEUED_FLOW_METRIC_NAME, NUM_QUEUED_FLOW_METRIC_TYPE, 0, manager, interval);
+    logger.debug("Instantiated NumQueuedFlowMetric");
+    flowManager = flowRunnerManager;
+  }
+
+  /**
+   * Update value using flow manager
+   * {@inheritDoc}
+   * @see azkaban.metric.TimeBasedReportingMetric#preTrackingEventMethod()
+   */
+  @Override
+  protected synchronized void preTrackingEventMethod() {
+    value = flowManager.getNumQueuedFlows();
+  }
+
+  @Override
+  protected void postTrackingEventMethod() {
+    // nothing to post process
+  }
+
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java
index 2ac8654..b611151 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java
@@ -43,11 +43,16 @@ public class NumRunningFlowMetric extends TimeBasedReportingMetric<Integer> {
   /**
    * Update value using flow manager
    * {@inheritDoc}
-   * @see azkaban.metric.TimeBasedReportingMetric#finalizeValue()
+   * @see azkaban.metric.TimeBasedReportingMetric#preTrackingEventMethod()
    */
   @Override
-  protected synchronized void finalizeValue() {
+  protected synchronized void preTrackingEventMethod() {
     value = flowManager.getNumRunningFlows();
   }
 
+  @Override
+  protected void postTrackingEventMethod() {
+    // nothing to post process
+  }
+
 }
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
index 58146bc..1ad39d1 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
@@ -53,8 +53,13 @@ public class NumRunningJobMetric extends TimeBasedReportingMetric<Integer> imple
   }
 
   @Override
-  protected synchronized void finalizeValue() {
+  protected synchronized void preTrackingEventMethod() {
     // nothing to finalize value is already updated
   }
 
+  @Override
+  protected void postTrackingEventMethod() {
+    // nothing to post process
+  }
+
 }
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
index e204088..a06b859 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
@@ -248,7 +248,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
   }
 
   private Date parseDate(String date) throws ParseException {
-    DateFormat format = new SimpleDateFormat("MM/dd/yyyy HH:mm a");
+    DateFormat format = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss zzz");
     return format.parse(date);
   }
 }
diff --git a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
index ffecbcc..636d989 100644
--- a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
+++ b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
@@ -36,8 +36,8 @@
             var requestURL = '/stats';
             var requestData = {
               'action': 'metricHistory',
-              'from': $('#datetimebegin').val(),
-              'to'  : $('#datetimeend').val(),
+              'from': new Date($('#datetimebegin').val()).toUTCString(),
+              'to'  : new Date($('#datetimeend').val()).toUTCString(),
               'metricName': $('#metricName').val(),
               'useStats': $("#useStats").is(':checked')
             };