killbill-memoizeit

meter: revisit GET APIs Create various JsonSamplesOutputer

12/5/2012 4:32:26 AM

Details

diff --git a/api/src/main/java/com/ning/billing/meter/api/MeterUserApi.java b/api/src/main/java/com/ning/billing/meter/api/MeterUserApi.java
index 597cf74..de7e9ae 100644
--- a/api/src/main/java/com/ning/billing/meter/api/MeterUserApi.java
+++ b/api/src/main/java/com/ning/billing/meter/api/MeterUserApi.java
@@ -21,6 +21,8 @@ import java.io.OutputStream;
 import java.util.Collection;
 import java.util.Map;
 
+import javax.annotation.Nullable;
+
 import org.joda.time.DateTime;
 
 import com.ning.billing.util.callcontext.CallContext;
@@ -28,11 +30,86 @@ import com.ning.billing.util.callcontext.TenantContext;
 
 public interface MeterUserApi {
 
-    public void getAggregateUsage(OutputStream outputStream, String source, Collection<String> categories,
-                                  DateTime fromTimestamp, DateTime toTimestamp, TenantContext context) throws IOException;
+    /**
+     * Get usage data for all metrics (given a set of categories) as Json using an accumulating filter.
+     * <p/>
+     * Emulate: select sum(value) from chunks group by source, category, date_trunc('timeAggregationMode', timestamp);
+     *
+     * @param outputStream        stream to write the data to
+     * @param timeAggregationMode granularity of the accumulator
+     * @param source              source
+     * @param categories          categories
+     * @param fromTimestamp       earliest timestamp to consider (inclusive)
+     * @param toTimestamp         latest timestamp to consider (inclusive)
+     * @param context             call context
+     * @throws IOException
+     */
+    void getUsage(OutputStream outputStream, TimeAggregationMode timeAggregationMode,
+                  String source, Collection<String> categories,
+                  DateTime fromTimestamp, DateTime toTimestamp, TenantContext context) throws IOException;
 
-    public void getUsage(OutputStream outputStream, String source, Map<String, Collection<String>> metricsPerCategory,
-                         DateTime fromTimestamp, DateTime toTimestamp, TenantContext context) throws IOException;
+    /**
+     * Get usage data as Json using an accumulating filter.
+     * <p/>
+     * Emulate: select sum(value) from chunks group by source, category, metric, date_trunc('timeAggregationMode', timestamp);
+     *
+     * @param outputStream        stream to write the data to
+     * @param timeAggregationMode granularity of the accumulator
+     * @param source              source
+     * @param metricsPerCategory  mapping of metrics per category
+     * @param fromTimestamp       earliest timestamp to consider (inclusive)
+     * @param toTimestamp         latest timestamp to consider (inclusive)
+     * @param context             call context
+     * @throws IOException generic I/O exception
+     */
+    void getUsage(OutputStream outputStream, TimeAggregationMode timeAggregationMode,
+                  String source, Map<String, Collection<String>> metricsPerCategory,
+                  DateTime fromTimestamp, DateTime toTimestamp, TenantContext context) throws IOException;
+
+    /**
+     * Get usage data as Json using a decimating filter to reduce the number of points
+     *
+     * @param outputStream       stream to write the data to
+     * @param decimationMode     decimation mode for the decimating filter
+     * @param outputCount        number of data point the decimating filter should output, can be null
+     * @param source             source
+     * @param metricsPerCategory mapping of metrics per category
+     * @param fromTimestamp      earliest timestamp to consider (inclusive)
+     * @param toTimestamp        latest timestamp to consider (inclusive)
+     * @param context            call context
+     * @throws IOException generic I/O exception
+     */
+    void getUsage(OutputStream outputStream, DecimationMode decimationMode, @Nullable Integer outputCount,
+                  String source, Map<String, Collection<String>> metricsPerCategory,
+                  DateTime fromTimestamp, DateTime toTimestamp, TenantContext context) throws IOException;
+
+    /**
+     * Get usage data for all metrics (given a set of categories) as Json.
+     *
+     * @param outputStream  stream to write the data to
+     * @param source        source
+     * @param categories    categories
+     * @param fromTimestamp earliest timestamp to consider (inclusive)
+     * @param toTimestamp   latest timestamp to consider (inclusive)
+     * @param context       call context
+     * @throws IOException generic I/O exception
+     */
+    void getUsage(OutputStream outputStream, String source, Collection<String> categories,
+                  DateTime fromTimestamp, DateTime toTimestamp, TenantContext context) throws IOException;
+
+    /**
+     * Get usage data as Json.
+     *
+     * @param outputStream       stream to write the data to
+     * @param source             source
+     * @param metricsPerCategory mapping of metrics per category
+     * @param fromTimestamp      earliest timestamp to consider (inclusive)
+     * @param toTimestamp        latest timestamp to consider (inclusive)
+     * @param context            call context
+     * @throws IOException generic I/O exception
+     */
+    void getUsage(OutputStream outputStream, String source, Map<String, Collection<String>> metricsPerCategory,
+                  DateTime fromTimestamp, DateTime toTimestamp, TenantContext context) throws IOException;
 
     /**
      * Shortcut API to record a usage value of "1" for a given category and metric.
diff --git a/api/src/main/java/com/ning/billing/meter/api/TimeAggregationMode.java b/api/src/main/java/com/ning/billing/meter/api/TimeAggregationMode.java
new file mode 100644
index 0000000..a35bfb8
--- /dev/null
+++ b/api/src/main/java/com/ning/billing/meter/api/TimeAggregationMode.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.api;
+
+public enum TimeAggregationMode {
+    SECONDS,
+    MINUTES,
+    HOURS,
+    DAYS,
+    MONTHS,
+    YEARS
+}
diff --git a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java
index 82d6d7d..1be8fd9 100644
--- a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java
+++ b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java
@@ -79,7 +79,8 @@ public interface JaxrsResource {
 
     public static final String QUERY_NOTIFICATION_CALLBACK = "cb";
 
-    public static final String QUERY_METER_WITH_AGGREGATE = "aggregate";
+    public static final String QUERY_METER_WITH_CATEGORY_AGGREGATE = "withCategoryAggregate";
+    public static final String QUERY_METER_TIME_AGGREGATION_MODE = "timeAggregationMode";
     public static final String QUERY_METER_TIMESTAMP = "timestamp";
     public static final String QUERY_METER_FROM = "from";
     public static final String QUERY_METER_TO = "to";
diff --git a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/MeterResource.java b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/MeterResource.java
index 634d63f..f44ada5 100644
--- a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/MeterResource.java
+++ b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/MeterResource.java
@@ -43,6 +43,7 @@ import org.joda.time.DateTime;
 import com.ning.billing.jaxrs.util.Context;
 import com.ning.billing.jaxrs.util.JaxrsUriBuilder;
 import com.ning.billing.meter.api.MeterUserApi;
+import com.ning.billing.meter.api.TimeAggregationMode;
 import com.ning.billing.util.api.AuditUserApi;
 import com.ning.billing.util.api.CustomFieldUserApi;
 import com.ning.billing.util.api.TagUserApi;
@@ -50,6 +51,7 @@ import com.ning.billing.util.callcontext.CallContext;
 import com.ning.billing.util.callcontext.TenantContext;
 import com.ning.billing.util.clock.Clock;
 
+import com.google.common.base.Strings;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
@@ -79,12 +81,13 @@ public class MeterResource extends JaxRsResourceBase {
     @Path("/{source:" + STRING_PATTERN + "}")
     @Produces(APPLICATION_JSON)
     public StreamingOutput getUsage(@PathParam("source") final String source,
+                                    // Aggregates per category
                                     @QueryParam(QUERY_METER_CATEGORY) final List<String> categories,
                                     // Format: category,metric
                                     @QueryParam(QUERY_METER_CATEGORY_AND_METRIC) final List<String> categoriesAndMetrics,
                                     @QueryParam(QUERY_METER_FROM) final String fromTimestampString,
                                     @QueryParam(QUERY_METER_TO) final String toTimestampString,
-                                    @QueryParam(QUERY_METER_WITH_AGGREGATE) @DefaultValue("false") final Boolean withAggregate,
+                                    @QueryParam(QUERY_METER_TIME_AGGREGATION_MODE) @DefaultValue("") final String timeAggregationModeString,
                                     @javax.ws.rs.core.Context final HttpServletRequest request) {
         final DateTime fromTimestamp = DATE_TIME_FORMATTER.parseDateTime(fromTimestampString);
         final DateTime toTimestamp = DATE_TIME_FORMATTER.parseDateTime(toTimestampString);
@@ -93,11 +96,22 @@ public class MeterResource extends JaxRsResourceBase {
         return new StreamingOutput() {
             @Override
             public void write(final OutputStream output) throws IOException, WebApplicationException {
-                if (withAggregate) {
-                    meterApi.getAggregateUsage(output, source, categories, fromTimestamp, toTimestamp, tenantContext);
+                // Look at aggregates per category?
+                if (categories != null) {
+                    if (Strings.isNullOrEmpty(timeAggregationModeString)) {
+                        meterApi.getUsage(output, source, categories, fromTimestamp, toTimestamp, tenantContext);
+                    } else {
+                        final TimeAggregationMode timeAggregationMode = TimeAggregationMode.valueOf(timeAggregationModeString);
+                        meterApi.getUsage(output, timeAggregationMode, source, categories, fromTimestamp, toTimestamp, tenantContext);
+                    }
                 } else {
                     final Map<String, Collection<String>> metricsPerCategory = retrieveMetricsPerCategory(categoriesAndMetrics);
-                    meterApi.getUsage(output, source, metricsPerCategory, fromTimestamp, toTimestamp, tenantContext);
+                    if (Strings.isNullOrEmpty(timeAggregationModeString)) {
+                        meterApi.getUsage(output, source, metricsPerCategory, fromTimestamp, toTimestamp, tenantContext);
+                    } else {
+                        final TimeAggregationMode timeAggregationMode = TimeAggregationMode.valueOf(timeAggregationModeString);
+                        meterApi.getUsage(output, timeAggregationMode, source, metricsPerCategory, fromTimestamp, toTimestamp, tenantContext);
+                    }
                 }
             }
         };
@@ -132,7 +146,7 @@ public class MeterResource extends JaxRsResourceBase {
     public Response recordUsage(@PathParam("source") final String source,
                                 @PathParam("categoryName") final String categoryName,
                                 @PathParam("metricName") final String metricName,
-                                @QueryParam(QUERY_METER_WITH_AGGREGATE) @DefaultValue("false") final Boolean withAggregate,
+                                @QueryParam(QUERY_METER_WITH_CATEGORY_AGGREGATE) @DefaultValue("false") final Boolean withAggregate,
                                 @QueryParam(QUERY_METER_TIMESTAMP) final String timestampString,
                                 @HeaderParam(HDR_CREATED_BY) final String createdBy,
                                 @HeaderParam(HDR_REASON) final String reason,
diff --git a/meter/src/main/java/com/ning/billing/meter/api/user/AccumulatingJsonSamplesOutputer.java b/meter/src/main/java/com/ning/billing/meter/api/user/AccumulatingJsonSamplesOutputer.java
new file mode 100644
index 0000000..2f933d1
--- /dev/null
+++ b/meter/src/main/java/com/ning/billing/meter/api/user/AccumulatingJsonSamplesOutputer.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.api.user;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import com.ning.billing.meter.api.TimeAggregationMode;
+import com.ning.billing.meter.timeline.TimelineEventHandler;
+import com.ning.billing.meter.timeline.categories.CategoryRecordIdAndMetric;
+import com.ning.billing.meter.timeline.chunks.TimelineChunk;
+import com.ning.billing.meter.timeline.consumer.AccumulatorSampleConsumer;
+import com.ning.billing.meter.timeline.consumer.CSVConsumer;
+import com.ning.billing.meter.timeline.consumer.CSVSampleProcessor;
+import com.ning.billing.meter.timeline.metrics.SamplesForMetricAndSource;
+import com.ning.billing.meter.timeline.persistent.TimelineDao;
+import com.ning.billing.util.callcontext.InternalTenantContext;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.google.common.base.Strings;
+
+public class AccumulatingJsonSamplesOutputer extends JsonSamplesOutputer {
+
+    private final AccumulatorSampleConsumer accumulatorSampleConsumer;
+
+    private String lastSource;
+    private String lastEventCategory;
+    private String lastMetric;
+
+    public AccumulatingJsonSamplesOutputer(final TimeAggregationMode timeAggregationMode, final TimelineEventHandler timelineEventHandler,
+                                           final TimelineDao timelineDao, final InternalTenantContext context) {
+        super(timelineEventHandler, timelineDao, context);
+        this.accumulatorSampleConsumer = new AccumulatorSampleConsumer(timeAggregationMode, new CSVSampleProcessor());
+    }
+
+    @Override
+    protected void writeJsonForChunks(final JsonGenerator generator, final Collection<? extends TimelineChunk> chunksForSourceAndMetric) throws IOException {
+        for (final TimelineChunk chunk : chunksForSourceAndMetric) {
+            final String source = timelineDao.getSource(chunk.getSourceId(), context);
+            final CategoryRecordIdAndMetric categoryIdAndMetric = timelineDao.getCategoryIdAndMetric(chunk.getMetricId(), context);
+            final String eventCategory = timelineDao.getEventCategory(categoryIdAndMetric.getEventCategoryId(), context);
+            final String metric = categoryIdAndMetric.getMetric();
+
+            final String samples = CSVConsumer.getSamplesAsCSV(sampleCoder, chunk, accumulatorSampleConsumer);
+
+            // Don't write out empty samples
+            if (!Strings.isNullOrEmpty(samples)) {
+                generator.writeObject(new SamplesForMetricAndSource(source, eventCategory, metric, samples));
+            }
+
+            lastSource = source;
+            lastEventCategory = eventCategory;
+            lastMetric = metric;
+        }
+    }
+
+    @Override
+    protected void writeRemainingData(final JsonGenerator generator) throws IOException {
+        final String samples = accumulatorSampleConsumer.flush();
+        // Don't write out empty samples
+        if (!Strings.isNullOrEmpty(samples)) {
+            generator.writeObject(new SamplesForMetricAndSource(lastSource, lastEventCategory, lastMetric, samples));
+        }
+    }
+}
diff --git a/meter/src/main/java/com/ning/billing/meter/api/user/DebugJsonSamplesOutputer.java b/meter/src/main/java/com/ning/billing/meter/api/user/DebugJsonSamplesOutputer.java
new file mode 100644
index 0000000..4ffd3f6
--- /dev/null
+++ b/meter/src/main/java/com/ning/billing/meter/api/user/DebugJsonSamplesOutputer.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.api.user;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import com.ning.billing.meter.timeline.TimelineEventHandler;
+import com.ning.billing.meter.timeline.categories.CategoryRecordIdAndMetric;
+import com.ning.billing.meter.timeline.chunks.TimelineChunk;
+import com.ning.billing.meter.timeline.chunks.TimelineChunksViews;
+import com.ning.billing.meter.timeline.consumer.CSVConsumer;
+import com.ning.billing.meter.timeline.consumer.TimelineChunkDecoded;
+import com.ning.billing.meter.timeline.metrics.SamplesForMetricAndSource;
+import com.ning.billing.meter.timeline.persistent.TimelineDao;
+import com.ning.billing.util.callcontext.InternalTenantContext;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.base.Strings;
+
+public class DebugJsonSamplesOutputer extends JsonSamplesOutputer {
+
+    private final boolean withBinaryData;
+    private final ObjectWriter writer;
+
+    public DebugJsonSamplesOutputer(final boolean withBinaryData, final TimelineEventHandler timelineEventHandler,
+                                    final TimelineDao timelineDao, final InternalTenantContext context) {
+        super(timelineEventHandler, timelineDao, context);
+        this.withBinaryData = withBinaryData;
+        if (withBinaryData) {
+            this.writer = objectMapper.writerWithView(TimelineChunksViews.Compact.class);
+        } else {
+            this.writer = objectMapper.writerWithView(TimelineChunksViews.Loose.class);
+        }
+    }
+
+    @Override
+    protected void writeJsonForChunks(final JsonGenerator generator, final Collection<? extends TimelineChunk> chunksForSourceAndMetric) throws IOException {
+        for (final TimelineChunk chunk : chunksForSourceAndMetric) {
+            if (withBinaryData) {
+                writer.writeValue(generator, new TimelineChunkDecoded(chunk, sampleCoder));
+            } else {
+                final String source = timelineDao.getSource(chunk.getSourceId(), context);
+                final CategoryRecordIdAndMetric categoryIdAndMetric = timelineDao.getCategoryIdAndMetric(chunk.getMetricId(), context);
+                final String category = timelineDao.getEventCategory(categoryIdAndMetric.getEventCategoryId(), context);
+                final String metric = categoryIdAndMetric.getMetric();
+                final String samples = CSVConsumer.getSamplesAsCSV(sampleCoder, chunk);
+
+                // Don't write out empty samples
+                if (!Strings.isNullOrEmpty(samples)) {
+                    generator.writeObject(new SamplesForMetricAndSource(source, category, metric, samples));
+                }
+            }
+        }
+    }
+}
diff --git a/meter/src/main/java/com/ning/billing/meter/api/user/DecimatingJsonSamplesOutputer.java b/meter/src/main/java/com/ning/billing/meter/api/user/DecimatingJsonSamplesOutputer.java
new file mode 100644
index 0000000..7fed8a7
--- /dev/null
+++ b/meter/src/main/java/com/ning/billing/meter/api/user/DecimatingJsonSamplesOutputer.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.api.user;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import org.joda.time.DateTime;
+import org.skife.config.TimeSpan;
+
+import com.ning.billing.meter.api.DecimationMode;
+import com.ning.billing.meter.timeline.TimelineEventHandler;
+import com.ning.billing.meter.timeline.categories.CategoryRecordIdAndMetric;
+import com.ning.billing.meter.timeline.chunks.TimelineChunk;
+import com.ning.billing.meter.timeline.consumer.CSVConsumer;
+import com.ning.billing.meter.timeline.consumer.CSVSampleProcessor;
+import com.ning.billing.meter.timeline.consumer.TimeRangeSampleProcessor;
+import com.ning.billing.meter.timeline.consumer.filter.DecimatingSampleFilter;
+import com.ning.billing.meter.timeline.metrics.SamplesForMetricAndSource;
+import com.ning.billing.meter.timeline.persistent.TimelineDao;
+import com.ning.billing.util.callcontext.InternalTenantContext;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.google.common.base.Strings;
+
+public class DecimatingJsonSamplesOutputer extends JsonSamplesOutputer {
+
+    private final Integer outputCount;
+    private final DecimationMode decimationMode;
+
+    private Map<Integer, Map<Integer, DecimatingSampleFilter>> filters;
+
+    public DecimatingJsonSamplesOutputer(final DecimationMode decimationMode, @Nullable final Integer outputCount,
+                                         final TimelineEventHandler timelineEventHandler, final TimelineDao timelineDao, final InternalTenantContext context) {
+        super(timelineEventHandler, timelineDao, context);
+        this.outputCount = outputCount;
+        this.decimationMode = decimationMode;
+    }
+
+    @Override
+    protected void output(final OutputStream output, final List<Integer> sourceIds, final List<Integer> metricIds, final DateTime startTime, final DateTime endTime) throws IOException {
+        // Create the decimating filters
+        filters = createDecimatingSampleFilters(sourceIds, metricIds, decimationMode, startTime, endTime, outputCount);
+
+        super.output(output, sourceIds, metricIds, startTime, endTime);
+    }
+
+    @Override
+    protected void writeJsonForChunks(final JsonGenerator generator, final Collection<? extends TimelineChunk> chunksForSourceAndMetric) throws IOException {
+        for (final TimelineChunk chunk : chunksForSourceAndMetric) {
+            final String source = timelineDao.getSource(chunk.getSourceId(), context);
+            final CategoryRecordIdAndMetric categoryIdAndMetric = timelineDao.getCategoryIdAndMetric(chunk.getMetricId(), context);
+            final String eventCategory = timelineDao.getEventCategory(categoryIdAndMetric.getEventCategoryId(), context);
+            final String metric = categoryIdAndMetric.getMetric();
+            final TimeRangeSampleProcessor filter = filters.get(chunk.getSourceId()).get(chunk.getMetricId());
+
+            final String samples = filter == null ? CSVConsumer.getSamplesAsCSV(sampleCoder, chunk) : CSVConsumer.getSamplesAsCSV(sampleCoder, chunk, filter);
+
+            // Don't write out empty samples
+            if (!Strings.isNullOrEmpty(samples)) {
+                generator.writeObject(new SamplesForMetricAndSource(source, eventCategory, metric, samples));
+            }
+        }
+    }
+
+    private Map<Integer, Map<Integer, DecimatingSampleFilter>> createDecimatingSampleFilters(final List<Integer> sourceIds, final List<Integer> metricIds, final DecimationMode decimationMode,
+                                                                                             final DateTime startTime, final DateTime endTime, final Integer outputCount) {
+        final Map<Integer, Map<Integer, DecimatingSampleFilter>> filters = new HashMap<Integer, Map<Integer, DecimatingSampleFilter>>();
+        for (final Integer sourceId : sourceIds) {
+            filters.put(sourceId, new HashMap<Integer, DecimatingSampleFilter>());
+            for (final Integer metric : metricIds) {
+                filters.get(sourceId).put(metric, createDecimatingSampleFilter(outputCount, decimationMode, startTime, endTime));
+            }
+        }
+        return filters;
+    }
+
+    private DecimatingSampleFilter createDecimatingSampleFilter(final Integer outputCount, final DecimationMode decimationMode, final DateTime startTime, final DateTime endTime) {
+        final DecimatingSampleFilter rangeSampleProcessor;
+        if (outputCount == null) {
+            rangeSampleProcessor = null;
+        } else {
+            // TODO Fix the polling interval
+            rangeSampleProcessor = new DecimatingSampleFilter(startTime, endTime, outputCount, new TimeSpan("1s"), decimationMode, new CSVSampleProcessor());
+        }
+
+        return rangeSampleProcessor;
+    }
+}
diff --git a/meter/src/main/java/com/ning/billing/meter/api/user/DefaultJsonSamplesOutputer.java b/meter/src/main/java/com/ning/billing/meter/api/user/DefaultJsonSamplesOutputer.java
new file mode 100644
index 0000000..d11e3f3
--- /dev/null
+++ b/meter/src/main/java/com/ning/billing/meter/api/user/DefaultJsonSamplesOutputer.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.api.user;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import com.ning.billing.meter.timeline.TimelineEventHandler;
+import com.ning.billing.meter.timeline.categories.CategoryRecordIdAndMetric;
+import com.ning.billing.meter.timeline.chunks.TimelineChunk;
+import com.ning.billing.meter.timeline.consumer.CSVConsumer;
+import com.ning.billing.meter.timeline.metrics.SamplesForMetricAndSource;
+import com.ning.billing.meter.timeline.persistent.TimelineDao;
+import com.ning.billing.util.callcontext.InternalTenantContext;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.google.common.base.Strings;
+
+public class DefaultJsonSamplesOutputer extends JsonSamplesOutputer {
+
+    public DefaultJsonSamplesOutputer(final TimelineEventHandler timelineEventHandler, final TimelineDao timelineDao, final InternalTenantContext context) {
+        super(timelineEventHandler, timelineDao, context);
+    }
+
+    @Override
+    protected void writeJsonForChunks(final JsonGenerator generator, final Collection<? extends TimelineChunk> chunksForSourceAndMetric) throws IOException {
+        for (final TimelineChunk chunk : chunksForSourceAndMetric) {
+            final String source = timelineDao.getSource(chunk.getSourceId(), context);
+            final CategoryRecordIdAndMetric categoryIdAndMetric = timelineDao.getCategoryIdAndMetric(chunk.getMetricId(), context);
+            final String eventCategory = timelineDao.getEventCategory(categoryIdAndMetric.getEventCategoryId(), context);
+            final String metric = categoryIdAndMetric.getMetric();
+
+            final String samples = CSVConsumer.getSamplesAsCSV(sampleCoder, chunk);
+
+            // Don't write out empty samples
+            if (!Strings.isNullOrEmpty(samples)) {
+                generator.writeObject(new SamplesForMetricAndSource(source, eventCategory, metric, samples));
+            }
+        }
+    }
+}
diff --git a/meter/src/main/java/com/ning/billing/meter/api/user/DefaultMeterUserApi.java b/meter/src/main/java/com/ning/billing/meter/api/user/DefaultMeterUserApi.java
index 92023cc..880baf3 100644
--- a/meter/src/main/java/com/ning/billing/meter/api/user/DefaultMeterUserApi.java
+++ b/meter/src/main/java/com/ning/billing/meter/api/user/DefaultMeterUserApi.java
@@ -21,11 +21,14 @@ import java.io.OutputStream;
 import java.util.Collection;
 import java.util.Map;
 
+import javax.annotation.Nullable;
 import javax.inject.Inject;
 
 import org.joda.time.DateTime;
 
+import com.ning.billing.meter.api.DecimationMode;
 import com.ning.billing.meter.api.MeterUserApi;
+import com.ning.billing.meter.api.TimeAggregationMode;
 import com.ning.billing.meter.timeline.TimelineEventHandler;
 import com.ning.billing.meter.timeline.persistent.TimelineDao;
 import com.ning.billing.util.callcontext.CallContext;
@@ -56,12 +59,43 @@ public class DefaultMeterUserApi implements MeterUserApi {
     }
 
     @Override
-    public void getAggregateUsage(final OutputStream outputStream, final String source, final Collection<String> categories,
-                                  final DateTime fromTimestamp, final DateTime toTimestamp, final TenantContext context) throws IOException {
+    public void getUsage(final OutputStream outputStream, final TimeAggregationMode timeAggregationMode,
+                         final String source, final Collection<String> categories,
+                         final DateTime fromTimestamp, final DateTime toTimestamp, final TenantContext context) throws IOException {
+        final ImmutableMap.Builder<String, Collection<String>> metricsPerCategory = new Builder<String, Collection<String>>();
+        for (final String category : categories) {
+            metricsPerCategory.put(category, ImmutableList.<String>of(AGGREGATE_METRIC_NAME));
+        }
+
+        getUsage(outputStream, timeAggregationMode, source, metricsPerCategory.build(), fromTimestamp, toTimestamp, context);
+    }
+
+    @Override
+    public void getUsage(final OutputStream outputStream, final TimeAggregationMode timeAggregationMode,
+                         final String source, final Map<String, Collection<String>> metricsPerCategory,
+                         final DateTime fromTimestamp, final DateTime toTimestamp, final TenantContext context) throws IOException {
+        final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(context);
+        final JsonSamplesOutputer outputerJson = new AccumulatingJsonSamplesOutputer(timeAggregationMode, timelineEventHandler, timelineDao, internalTenantContext);
+        outputerJson.output(outputStream, ImmutableList.<String>of(source), metricsPerCategory, fromTimestamp, toTimestamp);
+    }
+
+    @Override
+    public void getUsage(final OutputStream outputStream, final DecimationMode decimationMode, @Nullable final Integer outputCount,
+                         final String source, final Map<String, Collection<String>> metricsPerCategory,
+                         final DateTime fromTimestamp, final DateTime toTimestamp, final TenantContext context) throws IOException {
+        final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(context);
+        final JsonSamplesOutputer outputerJson = new DecimatingJsonSamplesOutputer(decimationMode, outputCount, timelineEventHandler, timelineDao, internalTenantContext);
+        outputerJson.output(outputStream, ImmutableList.<String>of(source), metricsPerCategory, fromTimestamp, toTimestamp);
+    }
+
+    @Override
+    public void getUsage(final OutputStream outputStream, final String source, final Collection<String> categories,
+                         final DateTime fromTimestamp, final DateTime toTimestamp, final TenantContext context) throws IOException {
         final ImmutableMap.Builder<String, Collection<String>> metricsPerCategory = new Builder<String, Collection<String>>();
         for (final String category : categories) {
             metricsPerCategory.put(category, ImmutableList.<String>of(AGGREGATE_METRIC_NAME));
         }
+
         getUsage(outputStream, source, metricsPerCategory.build(), fromTimestamp, toTimestamp, context);
     }
 
@@ -69,8 +103,7 @@ public class DefaultMeterUserApi implements MeterUserApi {
     public void getUsage(final OutputStream outputStream, final String source, final Map<String, Collection<String>> metricsPerCategory,
                          final DateTime fromTimestamp, final DateTime toTimestamp, final TenantContext context) throws IOException {
         final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(context);
-
-        final JsonSamplesOutputer outputerJson = new JsonSamplesOutputer(timelineEventHandler, timelineDao, internalTenantContext);
+        final JsonSamplesOutputer outputerJson = new DefaultJsonSamplesOutputer(timelineEventHandler, timelineDao, internalTenantContext);
         outputerJson.output(outputStream, ImmutableList.<String>of(source), metricsPerCategory, fromTimestamp, toTimestamp);
     }
 
diff --git a/meter/src/main/java/com/ning/billing/meter/api/user/JsonSamplesOutputer.java b/meter/src/main/java/com/ning/billing/meter/api/user/JsonSamplesOutputer.java
index 5309ed7..9478edf 100644
--- a/meter/src/main/java/com/ning/billing/meter/api/user/JsonSamplesOutputer.java
+++ b/meter/src/main/java/com/ning/billing/meter/api/user/JsonSamplesOutputer.java
@@ -20,7 +20,6 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
@@ -29,37 +28,26 @@ import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 
 import org.joda.time.DateTime;
-import org.skife.config.TimeSpan;
 
 import com.ning.billing.meter.timeline.TimelineEventHandler;
-import com.ning.billing.meter.timeline.categories.CategoryRecordIdAndMetric;
 import com.ning.billing.meter.timeline.chunks.TimelineChunk;
-import com.ning.billing.meter.timeline.chunks.TimelineChunksViews;
 import com.ning.billing.meter.timeline.codec.DefaultSampleCoder;
 import com.ning.billing.meter.timeline.codec.SampleCoder;
-import com.ning.billing.meter.timeline.consumer.CSVConsumer;
-import com.ning.billing.meter.timeline.consumer.CSVSampleProcessor;
 import com.ning.billing.meter.timeline.consumer.TimelineChunkConsumer;
-import com.ning.billing.meter.timeline.consumer.TimelineChunkDecoded;
-import com.ning.billing.meter.timeline.consumer.filter.DecimatingSampleFilter;
-import com.ning.billing.meter.timeline.consumer.filter.DecimationMode;
-import com.ning.billing.meter.timeline.metrics.SamplesForMetricAndSource;
 import com.ning.billing.meter.timeline.persistent.TimelineDao;
 import com.ning.billing.util.callcontext.InternalTenantContext;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectWriter;
-import com.google.common.base.Strings;
 
-public class JsonSamplesOutputer {
+public abstract class JsonSamplesOutputer {
 
-    private static final ObjectMapper objectMapper = new ObjectMapper();
+    protected static final ObjectMapper objectMapper = new ObjectMapper();
 
-    private final TimelineEventHandler timelineEventHandler;
-    private final TimelineDao timelineDao;
-    private final SampleCoder sampleCoder;
-    private final InternalTenantContext context;
+    protected final TimelineEventHandler timelineEventHandler;
+    protected final TimelineDao timelineDao;
+    protected final SampleCoder sampleCoder;
+    protected final InternalTenantContext context;
 
     public JsonSamplesOutputer(final TimelineEventHandler timelineEventHandler, final TimelineDao timelineDao, final InternalTenantContext context) {
         this.timelineEventHandler = timelineEventHandler;
@@ -68,38 +56,31 @@ public class JsonSamplesOutputer {
         this.context = context;
     }
 
-    public void output(final OutputStream output, final List<String> sources, final Map<String, Collection<String>> metricsPerCategory,
-                       final DateTime startTime, final DateTime endTime) throws IOException {
-        // Default - output all data points as CSV
-        output(output, sources, metricsPerCategory, DecimationMode.PEAK_PICK, null, false, false, startTime, endTime);
-    }
+    protected abstract void writeJsonForChunks(final JsonGenerator generator, final Collection<? extends TimelineChunk> chunksForSourceAndMetric) throws IOException;
 
     public void output(final OutputStream output, final List<String> sources, final Map<String, Collection<String>> metricsPerCategory,
-                       final DecimationMode decimationMode, @Nullable final Integer outputCount, final boolean decodeSamples, final boolean compact,
                        final DateTime startTime, final DateTime endTime) throws IOException {
         // Retrieve the source and metric ids
         final List<Integer> sourceIds = translateSourcesToSourceIds(sources);
         final List<Integer> metricIds = translateCategoriesAndMetricNamesToMetricIds(metricsPerCategory);
+        output(output, sourceIds, metricIds, startTime, endTime);
+    }
 
-        // Create the decimating filters, if needed
-        final Map<Integer, Map<Integer, DecimatingSampleFilter>> filters = createDecimatingSampleFilters(sourceIds, metricIds, decimationMode, startTime, endTime, outputCount);
-
+    protected void output(final OutputStream output, final List<Integer> sourceIds, final List<Integer> metricIds,
+                          final DateTime startTime, final DateTime endTime) throws IOException {
         // Setup Jackson
-        final ObjectWriter writer;
-        if (compact) {
-            writer = objectMapper.writerWithView(TimelineChunksViews.Compact.class);
-        } else {
-            writer = objectMapper.writerWithView(TimelineChunksViews.Loose.class);
-        }
         final JsonGenerator generator = objectMapper.getJsonFactory().createJsonGenerator(output);
 
         generator.writeStartArray();
 
         // First, return all data stored in the database
-        writeJsonForStoredChunks(generator, writer, filters, sourceIds, metricIds, startTime, endTime, decodeSamples);
+        writeJsonForStoredChunks(generator, sourceIds, metricIds, startTime, endTime);
 
         // Now return all data in memory
-        writeJsonForInMemoryChunks(generator, writer, filters, sourceIds, metricIds, startTime, endTime, decodeSamples);
+        writeJsonForInMemoryChunks(generator, sourceIds, metricIds, startTime, endTime);
+
+        // Allow implementers to flush their buffers
+        writeRemainingData(generator);
 
         generator.writeEndArray();
 
@@ -107,6 +88,10 @@ public class JsonSamplesOutputer {
         generator.close();
     }
 
+    protected void writeRemainingData(final JsonGenerator generator) throws IOException {
+        // No-op
+    }
+
     private List<Integer> translateSourcesToSourceIds(final List<String> sources) {
         final List<Integer> hostIds = new ArrayList<Integer>(sources.size());
         for (final String source : sources) {
@@ -139,32 +124,8 @@ public class JsonSamplesOutputer {
         return metricIds;
     }
 
-    private Map<Integer, Map<Integer, DecimatingSampleFilter>> createDecimatingSampleFilters(final List<Integer> hostIds, final List<Integer> sampleKindIds, final DecimationMode decimationMode,
-                                                                                             final DateTime startTime, final DateTime endTime, final Integer outputCount) {
-        final Map<Integer, Map<Integer, DecimatingSampleFilter>> filters = new HashMap<Integer, Map<Integer, DecimatingSampleFilter>>();
-        for (final Integer hostId : hostIds) {
-            filters.put(hostId, new HashMap<Integer, DecimatingSampleFilter>());
-            for (final Integer sampleKindId : sampleKindIds) {
-                filters.get(hostId).put(sampleKindId, createDecimatingSampleFilter(outputCount, decimationMode, startTime, endTime));
-            }
-        }
-        return filters;
-    }
-
-    private DecimatingSampleFilter createDecimatingSampleFilter(final Integer outputCount, final DecimationMode decimationMode, final DateTime startTime, final DateTime endTime) {
-        final DecimatingSampleFilter rangeSampleProcessor;
-        if (outputCount == null) {
-            rangeSampleProcessor = null;
-        } else {
-            // TODO Fix the polling interval
-            rangeSampleProcessor = new DecimatingSampleFilter(startTime, endTime, outputCount, new TimeSpan("1s"), decimationMode, new CSVSampleProcessor());
-        }
-
-        return rangeSampleProcessor;
-    }
-
-    private void writeJsonForStoredChunks(final JsonGenerator generator, final ObjectWriter writer, final Map<Integer, Map<Integer, DecimatingSampleFilter>> filters, final List<Integer> hostIdsList,
-                                          final List<Integer> sampleKindIdsList, final DateTime startTime, final DateTime endTime, final boolean decodeSamples) throws IOException {
+    private void writeJsonForStoredChunks(final JsonGenerator generator, final List<Integer> hostIdsList, final List<Integer> sampleKindIdsList,
+                                          final DateTime startTime, final DateTime endTime) throws IOException {
         final AtomicReference<Integer> lastHostId = new AtomicReference<Integer>(null);
         final AtomicReference<Integer> lastSampleKindId = new AtomicReference<Integer>(null);
         final List<TimelineChunk> chunksForHostAndSampleKind = new ArrayList<TimelineChunk>();
@@ -180,7 +141,7 @@ public class JsonSamplesOutputer {
                 chunksForHostAndSampleKind.add(chunks);
                 if (previousHostId != null && (!previousHostId.equals(currentHostId) || !previousSampleKindId.equals(currentSampleKindId))) {
                     try {
-                        writeJsonForChunks(generator, writer, filters, chunksForHostAndSampleKind, decodeSamples);
+                        writeJsonForChunks(generator, chunksForHostAndSampleKind);
                     } catch (IOException e) {
                         throw new RuntimeException(e);
                     }
@@ -193,13 +154,13 @@ public class JsonSamplesOutputer {
         }, context);
 
         if (chunksForHostAndSampleKind.size() > 0) {
-            writeJsonForChunks(generator, writer, filters, chunksForHostAndSampleKind, decodeSamples);
+            writeJsonForChunks(generator, chunksForHostAndSampleKind);
             chunksForHostAndSampleKind.clear();
         }
     }
 
-    private void writeJsonForInMemoryChunks(final JsonGenerator generator, final ObjectWriter writer, final Map<Integer, Map<Integer, DecimatingSampleFilter>> filters, final List<Integer> hostIdsList,
-                                            final List<Integer> sampleKindIdsList, @Nullable final DateTime startTime, @Nullable final DateTime endTime, final boolean decodeSamples) throws IOException {
+    private void writeJsonForInMemoryChunks(final JsonGenerator generator, final List<Integer> hostIdsList, final List<Integer> sampleKindIdsList,
+                                            @Nullable final DateTime startTime, @Nullable final DateTime endTime) throws IOException {
 
         for (final Integer hostId : hostIdsList) {
             final Collection<? extends TimelineChunk> inMemorySamples;
@@ -208,30 +169,7 @@ public class JsonSamplesOutputer {
             } catch (ExecutionException e) {
                 throw new IOException(e);
             }
-            writeJsonForChunks(generator, writer, filters, inMemorySamples, decodeSamples);
-        }
-    }
-
-    private void writeJsonForChunks(final JsonGenerator generator, final ObjectWriter writer, final Map<Integer, Map<Integer, DecimatingSampleFilter>> filters,
-                                    final Iterable<? extends TimelineChunk> chunksForHostAndSampleKind, final boolean decodeSamples) throws IOException {
-        for (final TimelineChunk chunk : chunksForHostAndSampleKind) {
-            if (decodeSamples) {
-                writer.writeValue(generator, new TimelineChunkDecoded(chunk, sampleCoder));
-            } else {
-                final String hostName = timelineDao.getSource(chunk.getSourceId(), context);
-                final CategoryRecordIdAndMetric categoryIdAndSampleKind = timelineDao.getCategoryIdAndMetric(chunk.getMetricId(), context);
-                final String eventCategory = timelineDao.getEventCategory(categoryIdAndSampleKind.getEventCategoryId(), context);
-                final String sampleKind = categoryIdAndSampleKind.getMetric();
-                // TODO pass compact form
-                final DecimatingSampleFilter filter = filters.get(chunk.getSourceId()).get(chunk.getMetricId());
-                // TODO CSV only for now
-                final String samples = filter == null ? CSVConsumer.getSamplesAsCSV(sampleCoder, chunk) : CSVConsumer.getSamplesAsCSV(sampleCoder, chunk, filter);
-
-                // Don't write out empty samples
-                if (!Strings.isNullOrEmpty(samples)) {
-                    generator.writeObject(new SamplesForMetricAndSource(hostName, eventCategory, sampleKind, samples));
-                }
-            }
+            writeJsonForChunks(generator, inMemorySamples);
         }
     }
 }
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/consumer/AccumulatorSampleConsumer.java b/meter/src/main/java/com/ning/billing/meter/timeline/consumer/AccumulatorSampleConsumer.java
index 31f8260..0c05cba 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/consumer/AccumulatorSampleConsumer.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/consumer/AccumulatorSampleConsumer.java
@@ -22,20 +22,12 @@ import java.util.Map;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 
+import com.ning.billing.meter.api.TimeAggregationMode;
 import com.ning.billing.meter.timeline.samples.SampleOpcode;
 import com.ning.billing.meter.timeline.samples.ScalarSample;
 
 public class AccumulatorSampleConsumer extends TimeRangeSampleProcessor {
 
-    public enum TimeAggregationMode {
-        SECONDS,
-        MINUTES,
-        HOURS,
-        DAYS,
-        MONTHS,
-        YEARS
-    }
-
     private final StringBuilder builder = new StringBuilder();
     // Linked HashMap to keep ordering of opcodes as they came
     private final Map<SampleOpcode, Double> accumulators = new LinkedHashMap<SampleOpcode, Double>();
@@ -46,11 +38,10 @@ public class AccumulatorSampleConsumer extends TimeRangeSampleProcessor {
     private DateTime lastRoundedTime = null;
     private int aggregatedSampleNumber = 0;
 
-    public AccumulatorSampleConsumer(final TimeAggregationMode timeAggregationMode) {
+    public AccumulatorSampleConsumer(final TimeAggregationMode timeAggregationMode, final TimeRangeSampleProcessor sampleProcessor) {
         super(null, null);
         this.timeAggregationMode = timeAggregationMode;
-        // TODO should be configurable
-        this.sampleProcessor = new CSVSampleProcessor();
+        this.sampleProcessor = sampleProcessor;
     }
 
     @Override
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/consumer/filter/DecimatingSampleFilter.java b/meter/src/main/java/com/ning/billing/meter/timeline/consumer/filter/DecimatingSampleFilter.java
index 401e1b4..51a0e6f 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/consumer/filter/DecimatingSampleFilter.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/consumer/filter/DecimatingSampleFilter.java
@@ -19,6 +19,7 @@ package com.ning.billing.meter.timeline.consumer.filter;
 import org.joda.time.DateTime;
 import org.skife.config.TimeSpan;
 
+import com.ning.billing.meter.api.DecimationMode;
 import com.ning.billing.meter.timeline.consumer.TimeRangeSampleProcessor;
 import com.ning.billing.meter.timeline.samples.SampleOpcode;
 import com.ning.billing.meter.timeline.samples.ScalarSample;
diff --git a/meter/src/test/java/com/ning/billing/meter/timeline/consumer/filter/TestDecimatingFilter.java b/meter/src/test/java/com/ning/billing/meter/timeline/consumer/filter/TestDecimatingFilter.java
index f4186a7..a5d6954 100644
--- a/meter/src/test/java/com/ning/billing/meter/timeline/consumer/filter/TestDecimatingFilter.java
+++ b/meter/src/test/java/com/ning/billing/meter/timeline/consumer/filter/TestDecimatingFilter.java
@@ -25,6 +25,7 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import com.ning.billing.meter.MeterTestSuite;
+import com.ning.billing.meter.api.DecimationMode;
 import com.ning.billing.meter.timeline.consumer.TimeRangeSampleProcessor;
 import com.ning.billing.meter.timeline.samples.SampleOpcode;
 
diff --git a/meter/src/test/java/com/ning/billing/meter/timeline/consumer/TestAccumulatorSampleConsumer.java b/meter/src/test/java/com/ning/billing/meter/timeline/consumer/TestAccumulatorSampleConsumer.java
index 933eaa5..2aa9513 100644
--- a/meter/src/test/java/com/ning/billing/meter/timeline/consumer/TestAccumulatorSampleConsumer.java
+++ b/meter/src/test/java/com/ning/billing/meter/timeline/consumer/TestAccumulatorSampleConsumer.java
@@ -22,7 +22,7 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import com.ning.billing.meter.MeterTestSuite;
-import com.ning.billing.meter.timeline.consumer.AccumulatorSampleConsumer.TimeAggregationMode;
+import com.ning.billing.meter.api.TimeAggregationMode;
 import com.ning.billing.meter.timeline.samples.SampleOpcode;
 import com.ning.billing.util.clock.ClockMock;
 
@@ -35,7 +35,7 @@ public class TestAccumulatorSampleConsumer extends MeterTestSuite {
         clock.setTime(new DateTime(2012, 12, 1, 12, 40, DateTimeZone.UTC));
         final DateTime start = clock.getUTCNow();
 
-        final AccumulatorSampleConsumer sampleConsumer = new AccumulatorSampleConsumer(TimeAggregationMode.DAYS);
+        final AccumulatorSampleConsumer sampleConsumer = new AccumulatorSampleConsumer(TimeAggregationMode.DAYS, new CSVSampleProcessor());
 
         // 5 for day 1
         sampleConsumer.processOneSample(start, SampleOpcode.DOUBLE, (double) 1);
diff --git a/server/src/test/java/com/ning/billing/jaxrs/KillbillClient.java b/server/src/test/java/com/ning/billing/jaxrs/KillbillClient.java
index eb09494..238af51 100644
--- a/server/src/test/java/com/ning/billing/jaxrs/KillbillClient.java
+++ b/server/src/test/java/com/ning/billing/jaxrs/KillbillClient.java
@@ -811,7 +811,7 @@ public abstract class KillbillClient extends ServerTestSuiteWithEmbeddedDB {
 
     protected void recordMeteringUsage(final String source, final String category, final String metric, final DateTime timestamp) throws IOException {
         final String meterURI = JaxrsResource.METER_PATH + "/" + source + "/" + category + "/" + metric;
-        final Response meterResponse = doPost(meterURI, null, ImmutableMap.<String, String>of(JaxrsResource.QUERY_METER_WITH_AGGREGATE, "true",
+        final Response meterResponse = doPost(meterURI, null, ImmutableMap.<String, String>of(JaxrsResource.QUERY_METER_WITH_CATEGORY_AGGREGATE, "true",
                                                                                               JaxrsResource.QUERY_METER_TIMESTAMP, timestamp.toString()), DEFAULT_HTTP_TIMEOUT_SEC);
         assertEquals(meterResponse.getStatusCode(), Status.OK.getStatusCode());
     }
@@ -820,8 +820,7 @@ public abstract class KillbillClient extends ServerTestSuiteWithEmbeddedDB {
         final String meterURI = JaxrsResource.METER_PATH + "/" + source;
         final Response meterResponse = doGet(meterURI, ImmutableMap.<String, String>of(JaxrsResource.QUERY_METER_CATEGORY, category,
                                                                                        JaxrsResource.QUERY_METER_FROM, from.toString(),
-                                                                                       JaxrsResource.QUERY_METER_TO, to.toString(),
-                                                                                       JaxrsResource.QUERY_METER_WITH_AGGREGATE, "true"), DEFAULT_HTTP_TIMEOUT_SEC);
+                                                                                       JaxrsResource.QUERY_METER_TO, to.toString()), DEFAULT_HTTP_TIMEOUT_SEC);
         assertEquals(meterResponse.getStatusCode(), Status.OK.getStatusCode());
 
         return mapper.readValue(meterResponse.getResponseBody(), new TypeReference<List<Map<String, Object>>>() {});