diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/consumer/AccumulatorSampleConsumer.java b/meter/src/main/java/com/ning/billing/meter/timeline/consumer/AccumulatorSampleConsumer.java
new file mode 100644
index 0000000..d71f14b
--- /dev/null
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/consumer/AccumulatorSampleConsumer.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.timeline.consumer;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+
+import com.ning.billing.meter.timeline.codec.TimeRangeSampleProcessor;
+import com.ning.billing.meter.timeline.samples.SampleOpcode;
+import com.ning.billing.meter.timeline.samples.ScalarSample;
+
+public class AccumulatorSampleConsumer extends TimeRangeSampleProcessor {
+
+ public enum TimeAggregationMode {
+ SECONDS,
+ MINUTES,
+ HOURS,
+ DAYS,
+ MONTHS,
+ YEARS
+ }
+
+ private final StringBuilder builder = new StringBuilder();
+ // Linked HashMap to keep ordering of opcodes as they came
+ private final Map<SampleOpcode, Double> accumulators = new LinkedHashMap<SampleOpcode, Double>();
+
+ private final TimeAggregationMode timeAggregationMode;
+ private final SampleConsumer sampleConsumer;
+
+ private DateTime lastRoundedTime = null;
+ private int aggregatedSampleNumber = 0;
+
+ public AccumulatorSampleConsumer(final TimeAggregationMode timeAggregationMode) {
+ super(null, null);
+ this.timeAggregationMode = timeAggregationMode;
+ // TODO should be configurable
+ this.sampleConsumer = new CSVSampleConsumer();
+ }
+
+ @Override
+ public void processOneSample(final DateTime time, final SampleOpcode opcode, final Object value) {
+ // Round the sample timestamp according to the aggregation mode
+ final long millis = time.toDateTime(DateTimeZone.UTC).getMillis();
+ final DateTime roundedTime;
+ switch (timeAggregationMode) {
+ case SECONDS:
+ roundedTime = new DateTime((millis / 1000) * 1000L, DateTimeZone.UTC);
+ break;
+ case MINUTES:
+ roundedTime = new DateTime((millis / (60 * 1000)) * 60 * 1000L, DateTimeZone.UTC);
+ break;
+ case HOURS:
+ roundedTime = new DateTime((millis / (60 * 60 * 1000)) * 60 * 60 * 1000L, DateTimeZone.UTC);
+ break;
+ case DAYS:
+ roundedTime = new DateTime((millis / (24 * 60 * 60 * 1000)) * 24 * 60 * 60 * 1000L, DateTimeZone.UTC);
+ break;
+ case MONTHS:
+ roundedTime = new DateTime(time.getYear(), time.getMonthOfYear(), 1, 0, 0, 0, 0, DateTimeZone.UTC);
+ break;
+ case YEARS:
+ roundedTime = new DateTime(time.getYear(), 1, 1, 0, 0, 0, 0, DateTimeZone.UTC);
+ break;
+ default:
+ roundedTime = time;
+ break;
+ }
+
+ // Get the sample value to aggregate
+ // TODO Should we ignore conversion errors (e.g. Strings)?
+ final double doubleValue = ScalarSample.getDoubleValue(opcode, value);
+
+ // Output if it's not the first value and the current rounded time differ from the previous one
+ if (lastRoundedTime != null && !lastRoundedTime.equals(roundedTime)) {
+ outputAndResetAccumulators();
+ }
+
+ // Perform (or restart) the aggregation
+ if (accumulators.get(opcode) == null) {
+ accumulators.put(opcode, Double.valueOf("0"));
+ }
+ accumulators.put(opcode, accumulators.get(opcode) + doubleValue);
+
+ lastRoundedTime = roundedTime;
+ }
+
+ private void outputAndResetAccumulators() {
+ if (aggregatedSampleNumber != 0) {
+ // TODO Assume CSV
+ builder.append(",");
+ }
+ // Output one opcode at a time
+ for (final SampleOpcode opcode : accumulators.keySet()) {
+ aggregatedSampleNumber++;
+ sampleConsumer.consumeSample(aggregatedSampleNumber, opcode, accumulators.get(opcode), lastRoundedTime);
+ }
+ // This will flush (clear) the sample consumer
+ builder.append(sampleConsumer.toString());
+
+ accumulators.clear();
+ }
+
+ @Override
+ public synchronized String toString() {
+ // Often empty
+ final String value = builder.toString();
+ // Allow for re-use
+ builder.setLength(0);
+ return value;
+ }
+
+ public String flush() {
+ outputAndResetAccumulators();
+ return toString();
+ }
+}
diff --git a/meter/src/test/java/com/ning/billing/meter/timeline/consumer/TestAccumulatorSampleConsumer.java b/meter/src/test/java/com/ning/billing/meter/timeline/consumer/TestAccumulatorSampleConsumer.java
new file mode 100644
index 0000000..933eaa5
--- /dev/null
+++ b/meter/src/test/java/com/ning/billing/meter/timeline/consumer/TestAccumulatorSampleConsumer.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.timeline.consumer;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.ning.billing.meter.MeterTestSuite;
+import com.ning.billing.meter.timeline.consumer.AccumulatorSampleConsumer.TimeAggregationMode;
+import com.ning.billing.meter.timeline.samples.SampleOpcode;
+import com.ning.billing.util.clock.ClockMock;
+
+public class TestAccumulatorSampleConsumer extends MeterTestSuite {
+
+ private final ClockMock clock = new ClockMock();
+
+ @Test(groups = "fast")
+ public void testDailyAggregation() throws Exception {
+ clock.setTime(new DateTime(2012, 12, 1, 12, 40, DateTimeZone.UTC));
+ final DateTime start = clock.getUTCNow();
+
+ final AccumulatorSampleConsumer sampleConsumer = new AccumulatorSampleConsumer(TimeAggregationMode.DAYS);
+
+ // 5 for day 1
+ sampleConsumer.processOneSample(start, SampleOpcode.DOUBLE, (double) 1);
+ sampleConsumer.processOneSample(start.plusHours(4), SampleOpcode.DOUBLE, (double) 4);
+ // 1 for day 2
+ sampleConsumer.processOneSample(start.plusDays(1), SampleOpcode.DOUBLE, (double) 1);
+ // 10 and 20 for day 3 (with different opcode)
+ sampleConsumer.processOneSample(start.plusDays(2), SampleOpcode.DOUBLE, (double) 10);
+ sampleConsumer.processOneSample(start.plusDays(2), SampleOpcode.INT, 20);
+
+ Assert.assertEquals(sampleConsumer.flush(), "1354320000,5.0,1354406400,1.0,1354492800,10.0,1354492800,20.0");
+ }
+}