InMemoryMetricEmitter.java

109 lines | 3.99 kB Blame History Raw Download
/*
 * Copyright 2012 LinkedIn Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.metric;

import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import azkaban.utils.Props;


public class InMemoryMetricEmitter implements IMetricEmitter {
  Map<String, LinkedList<InMemoryHistoryNode>> historyListMapping;
  private static final String INMEMORY_METRIC_REPORTER_WINDOW = "azkaban.metric.inmemory.interval";
  private static final String INMEMORY_METRIC_NUM_INSTANCES = "azkaban.metric.inmemory.maxinstances";

  long Interval;
  long numInstances;

  public InMemoryMetricEmitter(Props azkProps) {
    historyListMapping = new HashMap<String, LinkedList<InMemoryHistoryNode>>();
    Interval = azkProps.getLong(INMEMORY_METRIC_REPORTER_WINDOW, 60 * 60 * 24 * 7);
    numInstances = azkProps.getLong(INMEMORY_METRIC_NUM_INSTANCES, 10000);
  }

  @Override
  public void reportMetric(IMetric<?> metric) throws Exception {
    String metricName = metric.getName();
    if (!historyListMapping.containsKey(metricName)) {
      historyListMapping.put(metricName, new LinkedList<InMemoryHistoryNode>());
    }
    synchronized (historyListMapping.get(metricName)) {
      historyListMapping.get(metricName).add(new InMemoryHistoryNode(metric.getValue()));
      cleanUsingTime(metricName, historyListMapping.get(metricName).peekLast().getTimestamp());
    }
  }

  public List<InMemoryHistoryNode> getDrawMetric(String metricName, Date from, Date to) {
    LinkedList<InMemoryHistoryNode> selectedLists = new LinkedList<InMemoryHistoryNode>();
    if (historyListMapping.containsKey(metricName)) {

      // selecting nodes within time frame
      synchronized (historyListMapping.get(metricName)) {
        for (InMemoryHistoryNode node : historyListMapping.get(metricName)) {
          if (node.getTimestamp().after(from) && node.getTimestamp().before(to)) {
            selectedLists.add(node);
          }
          if (node.getTimestamp().after(to)) {
            break;
          }
        }
      }

      // selecting nodes if num of nodes > numInstances
      if (selectedLists.size() > numInstances) {
        double step = (double) selectedLists.size() / numInstances;
        long nextIndex = 0, currentIndex = 0, numSelectedInstances = 1;
        Iterator<InMemoryHistoryNode> ite = selectedLists.iterator();

        while(ite.hasNext()) {
          ite.next();
          if (currentIndex == nextIndex) {
            nextIndex = (long) Math.floor(numSelectedInstances * step + 0.5);
            numSelectedInstances++;
          } else {
            ite.remove();
          }
          currentIndex++;
        }
      }
    }
    cleanUsingTime(metricName, new Date());
    return selectedLists;
  }

  private void cleanUsingTime(String metricName, Date firstAllowedDate) {
    if (historyListMapping.containsKey(metricName) && historyListMapping.get(metricName) != null) {
      synchronized (historyListMapping.get(metricName)) {

        InMemoryHistoryNode firstNode = historyListMapping.get(metricName).peekFirst();
        // removing objects older than Interval time from firstAllowedDate
        while (firstNode != null
            && TimeUnit.MILLISECONDS.toSeconds(firstAllowedDate.getTime() - firstNode.getTimestamp().getTime()) > Interval) {
          historyListMapping.get(metricName).removeFirst();
          firstNode = historyListMapping.get(metricName).peekFirst();
        }
      }
    }
  }
}