killbill-memoizeit

Changes

meter/src/main/java/com/ning/billing/meter/timeline/consumer/CSVOutputProcessor.java 45(+0 -45)

Details

diff --git a/api/src/main/java/com/ning/billing/meter/api/MeterUserApi.java b/api/src/main/java/com/ning/billing/meter/api/MeterUserApi.java
index b3aee1f..de7e9ae 100644
--- a/api/src/main/java/com/ning/billing/meter/api/MeterUserApi.java
+++ b/api/src/main/java/com/ning/billing/meter/api/MeterUserApi.java
@@ -20,7 +20,8 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Collection;
 import java.util.Map;
-import java.util.UUID;
+
+import javax.annotation.Nullable;
 
 import org.joda.time.DateTime;
 
@@ -29,21 +30,96 @@ import com.ning.billing.util.callcontext.TenantContext;
 
 public interface MeterUserApi {
 
-    public void getAggregateUsage(OutputStream outputStream, UUID bundleId, Collection<String> categories,
-                                  DateTime fromTimestamp, DateTime toTimestamp, TenantContext context) throws IOException;
+    /**
+     * Get usage data for all metrics (given a set of categories) as Json using an accumulating filter.
+     * <p/>
+     * Emulate: select sum(value) from chunks group by source, category, date_trunc('timeAggregationMode', timestamp);
+     *
+     * @param outputStream        stream to write the data to
+     * @param timeAggregationMode granularity of the accumulator
+     * @param source              source
+     * @param categories          categories
+     * @param fromTimestamp       earliest timestamp to consider (inclusive)
+     * @param toTimestamp         latest timestamp to consider (inclusive)
+     * @param context             call context
+     * @throws IOException
+     */
+    void getUsage(OutputStream outputStream, TimeAggregationMode timeAggregationMode,
+                  String source, Collection<String> categories,
+                  DateTime fromTimestamp, DateTime toTimestamp, TenantContext context) throws IOException;
 
-    public void getUsage(OutputStream outputStream, UUID bundleId, Map<String, Collection<String>> metricsPerCategory,
-                         DateTime fromTimestamp, DateTime toTimestamp, TenantContext context) throws IOException;
+    /**
+     * Get usage data as Json using an accumulating filter.
+     * <p/>
+     * Emulate: select sum(value) from chunks group by source, category, metric, date_trunc('timeAggregationMode', timestamp);
+     *
+     * @param outputStream        stream to write the data to
+     * @param timeAggregationMode granularity of the accumulator
+     * @param source              source
+     * @param metricsPerCategory  mapping of metrics per category
+     * @param fromTimestamp       earliest timestamp to consider (inclusive)
+     * @param toTimestamp         latest timestamp to consider (inclusive)
+     * @param context             call context
+     * @throws IOException generic I/O exception
+     */
+    void getUsage(OutputStream outputStream, TimeAggregationMode timeAggregationMode,
+                  String source, Map<String, Collection<String>> metricsPerCategory,
+                  DateTime fromTimestamp, DateTime toTimestamp, TenantContext context) throws IOException;
+
+    /**
+     * Get usage data as Json using a decimating filter to reduce the number of points
+     *
+     * @param outputStream       stream to write the data to
+     * @param decimationMode     decimation mode for the decimating filter
+     * @param outputCount        number of data point the decimating filter should output, can be null
+     * @param source             source
+     * @param metricsPerCategory mapping of metrics per category
+     * @param fromTimestamp      earliest timestamp to consider (inclusive)
+     * @param toTimestamp        latest timestamp to consider (inclusive)
+     * @param context            call context
+     * @throws IOException generic I/O exception
+     */
+    void getUsage(OutputStream outputStream, DecimationMode decimationMode, @Nullable Integer outputCount,
+                  String source, Map<String, Collection<String>> metricsPerCategory,
+                  DateTime fromTimestamp, DateTime toTimestamp, TenantContext context) throws IOException;
+
+    /**
+     * Get usage data for all metrics (given a set of categories) as Json.
+     *
+     * @param outputStream  stream to write the data to
+     * @param source        source
+     * @param categories    categories
+     * @param fromTimestamp earliest timestamp to consider (inclusive)
+     * @param toTimestamp   latest timestamp to consider (inclusive)
+     * @param context       call context
+     * @throws IOException generic I/O exception
+     */
+    void getUsage(OutputStream outputStream, String source, Collection<String> categories,
+                  DateTime fromTimestamp, DateTime toTimestamp, TenantContext context) throws IOException;
+
+    /**
+     * Get usage data as Json.
+     *
+     * @param outputStream       stream to write the data to
+     * @param source             source
+     * @param metricsPerCategory mapping of metrics per category
+     * @param fromTimestamp      earliest timestamp to consider (inclusive)
+     * @param toTimestamp        latest timestamp to consider (inclusive)
+     * @param context            call context
+     * @throws IOException generic I/O exception
+     */
+    void getUsage(OutputStream outputStream, String source, Map<String, Collection<String>> metricsPerCategory,
+                  DateTime fromTimestamp, DateTime toTimestamp, TenantContext context) throws IOException;
 
     /**
      * Shortcut API to record a usage value of "1" for a given category and metric.
      *
-     * @param bundleId     bundle id source
+     * @param source       source name for this usage
      * @param categoryName category name for this usage
      * @param metricName   metric name associated with this category
      * @param context      call context
      */
-    public void incrementUsage(UUID bundleId, String categoryName, String metricName, DateTime timestamp, CallContext context);
+    public void incrementUsage(String source, String categoryName, String metricName, DateTime timestamp, CallContext context);
 
     /**
      * Shortcut API to record a usage value of "1" for a given category and metric.
@@ -52,20 +128,20 @@ public interface MeterUserApi {
      * This is useful if one wants to store fine grained usage information (e.g. number of minutes used per cell phone number),
      * while keeping a fast access path to the aggregate (e.g. number of minutes used across all cell phone numbers).
      *
-     * @param bundleId     bundle id source
+     * @param source       source name for this usage
      * @param categoryName category name for this usage
      * @param metricName   metric name associated with this category
      * @param context      call context
      */
-    public void incrementUsageAndAggregate(UUID bundleId, String categoryName, String metricName, DateTime timestamp, CallContext context);
+    public void incrementUsageAndAggregate(String source, String categoryName, String metricName, DateTime timestamp, CallContext context);
 
     /**
      * Fine grained usage API. This is used to record e.g. "X has used 2 credits at 2012/02/04 4:12pm".
      *
-     * @param bundleId                       bundle id source
+     * @param source                         source name for this usage
      * @param samplesForCategoriesAndMetrics samples per metric and category
      * @param timestamp                      timestamp of this usage
      * @param context                        tenant context
      */
-    public void recordUsage(UUID bundleId, Map<String, Map<String, Object>> samplesForCategoriesAndMetrics, DateTime timestamp, CallContext context);
+    public void recordUsage(String source, Map<String, Map<String, Object>> samplesForCategoriesAndMetrics, DateTime timestamp, CallContext context);
 }
diff --git a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java
index d2973cb..1be8fd9 100644
--- a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java
+++ b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java
@@ -37,7 +37,7 @@ public interface JaxrsResource {
     /*
      * Patterns
      */
-    public static String STRING_PATTERN = "\\w+";
+    public static String STRING_PATTERN = "[\\w-]+";
     public static String UUID_PATTERN = "\\w+-\\w+-\\w+-\\w+-\\w+";
 
     /*
@@ -79,7 +79,8 @@ public interface JaxrsResource {
 
     public static final String QUERY_NOTIFICATION_CALLBACK = "cb";
 
-    public static final String QUERY_METER_WITH_AGGREGATE = "aggregate";
+    public static final String QUERY_METER_WITH_CATEGORY_AGGREGATE = "withCategoryAggregate";
+    public static final String QUERY_METER_TIME_AGGREGATION_MODE = "timeAggregationMode";
     public static final String QUERY_METER_TIMESTAMP = "timestamp";
     public static final String QUERY_METER_FROM = "from";
     public static final String QUERY_METER_TO = "to";
diff --git a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/MeterResource.java b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/MeterResource.java
index 9b81ac9..f44ada5 100644
--- a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/MeterResource.java
+++ b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/MeterResource.java
@@ -23,7 +23,6 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.Consumes;
@@ -44,6 +43,7 @@ import org.joda.time.DateTime;
 import com.ning.billing.jaxrs.util.Context;
 import com.ning.billing.jaxrs.util.JaxrsUriBuilder;
 import com.ning.billing.meter.api.MeterUserApi;
+import com.ning.billing.meter.api.TimeAggregationMode;
 import com.ning.billing.util.api.AuditUserApi;
 import com.ning.billing.util.api.CustomFieldUserApi;
 import com.ning.billing.util.api.TagUserApi;
@@ -51,7 +51,7 @@ import com.ning.billing.util.callcontext.CallContext;
 import com.ning.billing.util.callcontext.TenantContext;
 import com.ning.billing.util.clock.Clock;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
@@ -61,8 +61,6 @@ import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
 @Path(JaxrsResource.METER_PATH)
 public class MeterResource extends JaxRsResourceBase {
 
-    private static final ObjectMapper objectMapper = new ObjectMapper();
-
     private final MeterUserApi meterApi;
     private final Clock clock;
 
@@ -80,17 +78,17 @@ public class MeterResource extends JaxRsResourceBase {
     }
 
     @GET
-    @Path("/{bundleId:" + UUID_PATTERN + "}")
+    @Path("/{source:" + STRING_PATTERN + "}")
     @Produces(APPLICATION_JSON)
-    public StreamingOutput getUsage(@PathParam("bundleId") final String bundleIdString,
+    public StreamingOutput getUsage(@PathParam("source") final String source,
+                                    // Aggregates per category
                                     @QueryParam(QUERY_METER_CATEGORY) final List<String> categories,
                                     // Format: category,metric
                                     @QueryParam(QUERY_METER_CATEGORY_AND_METRIC) final List<String> categoriesAndMetrics,
                                     @QueryParam(QUERY_METER_FROM) final String fromTimestampString,
                                     @QueryParam(QUERY_METER_TO) final String toTimestampString,
-                                    @QueryParam(QUERY_METER_WITH_AGGREGATE) @DefaultValue("false") final Boolean withAggregate,
+                                    @QueryParam(QUERY_METER_TIME_AGGREGATION_MODE) @DefaultValue("") final String timeAggregationModeString,
                                     @javax.ws.rs.core.Context final HttpServletRequest request) {
-        final UUID bundleId = UUID.fromString(bundleIdString);
         final DateTime fromTimestamp = DATE_TIME_FORMATTER.parseDateTime(fromTimestampString);
         final DateTime toTimestamp = DATE_TIME_FORMATTER.parseDateTime(toTimestampString);
         final TenantContext tenantContext = context.createContext(request);
@@ -98,11 +96,22 @@ public class MeterResource extends JaxRsResourceBase {
         return new StreamingOutput() {
             @Override
             public void write(final OutputStream output) throws IOException, WebApplicationException {
-                if (withAggregate) {
-                    meterApi.getAggregateUsage(output, bundleId, categories, fromTimestamp, toTimestamp, tenantContext);
+                // Look at aggregates per category?
+                if (categories != null) {
+                    if (Strings.isNullOrEmpty(timeAggregationModeString)) {
+                        meterApi.getUsage(output, source, categories, fromTimestamp, toTimestamp, tenantContext);
+                    } else {
+                        final TimeAggregationMode timeAggregationMode = TimeAggregationMode.valueOf(timeAggregationModeString);
+                        meterApi.getUsage(output, timeAggregationMode, source, categories, fromTimestamp, toTimestamp, tenantContext);
+                    }
                 } else {
                     final Map<String, Collection<String>> metricsPerCategory = retrieveMetricsPerCategory(categoriesAndMetrics);
-                    meterApi.getUsage(output, bundleId, metricsPerCategory, fromTimestamp, toTimestamp, tenantContext);
+                    if (Strings.isNullOrEmpty(timeAggregationModeString)) {
+                        meterApi.getUsage(output, source, metricsPerCategory, fromTimestamp, toTimestamp, tenantContext);
+                    } else {
+                        final TimeAggregationMode timeAggregationMode = TimeAggregationMode.valueOf(timeAggregationModeString);
+                        meterApi.getUsage(output, timeAggregationMode, source, metricsPerCategory, fromTimestamp, toTimestamp, tenantContext);
+                    }
                 }
             }
         };
@@ -131,19 +140,18 @@ public class MeterResource extends JaxRsResourceBase {
     }
 
     @POST
-    @Path("/{bundleId:" + UUID_PATTERN + "}/{categoryName:" + STRING_PATTERN + "}/{metricName:" + STRING_PATTERN + "}")
+    @Path("/{source:" + STRING_PATTERN + "}/{categoryName:" + STRING_PATTERN + "}/{metricName:" + STRING_PATTERN + "}")
     @Consumes(APPLICATION_JSON)
     @Produces(APPLICATION_JSON)
-    public Response recordUsage(@PathParam("bundleId") final String bundleIdString,
+    public Response recordUsage(@PathParam("source") final String source,
                                 @PathParam("categoryName") final String categoryName,
                                 @PathParam("metricName") final String metricName,
-                                @QueryParam(QUERY_METER_WITH_AGGREGATE) @DefaultValue("false") final Boolean withAggregate,
+                                @QueryParam(QUERY_METER_WITH_CATEGORY_AGGREGATE) @DefaultValue("false") final Boolean withAggregate,
                                 @QueryParam(QUERY_METER_TIMESTAMP) final String timestampString,
                                 @HeaderParam(HDR_CREATED_BY) final String createdBy,
                                 @HeaderParam(HDR_REASON) final String reason,
                                 @HeaderParam(HDR_COMMENT) final String comment,
                                 @javax.ws.rs.core.Context final HttpServletRequest request) {
-        final UUID bundleId = UUID.fromString(bundleIdString);
         final CallContext callContext = context.createContext(createdBy, reason, comment, request);
 
         final DateTime timestamp;
@@ -154,9 +162,9 @@ public class MeterResource extends JaxRsResourceBase {
         }
 
         if (withAggregate) {
-            meterApi.incrementUsageAndAggregate(bundleId, categoryName, metricName, timestamp, callContext);
+            meterApi.incrementUsageAndAggregate(source, categoryName, metricName, timestamp, callContext);
         } else {
-            meterApi.incrementUsage(bundleId, categoryName, metricName, timestamp, callContext);
+            meterApi.incrementUsage(source, categoryName, metricName, timestamp, callContext);
         }
 
         return Response.ok().build();
diff --git a/meter/src/main/java/com/ning/billing/meter/api/user/AccumulatingJsonSamplesOutputer.java b/meter/src/main/java/com/ning/billing/meter/api/user/AccumulatingJsonSamplesOutputer.java
new file mode 100644
index 0000000..2f933d1
--- /dev/null
+++ b/meter/src/main/java/com/ning/billing/meter/api/user/AccumulatingJsonSamplesOutputer.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.api.user;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import com.ning.billing.meter.api.TimeAggregationMode;
+import com.ning.billing.meter.timeline.TimelineEventHandler;
+import com.ning.billing.meter.timeline.categories.CategoryRecordIdAndMetric;
+import com.ning.billing.meter.timeline.chunks.TimelineChunk;
+import com.ning.billing.meter.timeline.consumer.AccumulatorSampleConsumer;
+import com.ning.billing.meter.timeline.consumer.CSVConsumer;
+import com.ning.billing.meter.timeline.consumer.CSVSampleProcessor;
+import com.ning.billing.meter.timeline.metrics.SamplesForMetricAndSource;
+import com.ning.billing.meter.timeline.persistent.TimelineDao;
+import com.ning.billing.util.callcontext.InternalTenantContext;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.google.common.base.Strings;
+
+public class AccumulatingJsonSamplesOutputer extends JsonSamplesOutputer {
+
+    private final AccumulatorSampleConsumer accumulatorSampleConsumer;
+
+    private String lastSource;
+    private String lastEventCategory;
+    private String lastMetric;
+
+    public AccumulatingJsonSamplesOutputer(final TimeAggregationMode timeAggregationMode, final TimelineEventHandler timelineEventHandler,
+                                           final TimelineDao timelineDao, final InternalTenantContext context) {
+        super(timelineEventHandler, timelineDao, context);
+        this.accumulatorSampleConsumer = new AccumulatorSampleConsumer(timeAggregationMode, new CSVSampleProcessor());
+    }
+
+    @Override
+    protected void writeJsonForChunks(final JsonGenerator generator, final Collection<? extends TimelineChunk> chunksForSourceAndMetric) throws IOException {
+        for (final TimelineChunk chunk : chunksForSourceAndMetric) {
+            final String source = timelineDao.getSource(chunk.getSourceId(), context);
+            final CategoryRecordIdAndMetric categoryIdAndMetric = timelineDao.getCategoryIdAndMetric(chunk.getMetricId(), context);
+            final String eventCategory = timelineDao.getEventCategory(categoryIdAndMetric.getEventCategoryId(), context);
+            final String metric = categoryIdAndMetric.getMetric();
+
+            final String samples = CSVConsumer.getSamplesAsCSV(sampleCoder, chunk, accumulatorSampleConsumer);
+
+            // Don't write out empty samples
+            if (!Strings.isNullOrEmpty(samples)) {
+                generator.writeObject(new SamplesForMetricAndSource(source, eventCategory, metric, samples));
+            }
+
+            lastSource = source;
+            lastEventCategory = eventCategory;
+            lastMetric = metric;
+        }
+    }
+
+    @Override
+    protected void writeRemainingData(final JsonGenerator generator) throws IOException {
+        final String samples = accumulatorSampleConsumer.flush();
+        // Don't write out empty samples
+        if (!Strings.isNullOrEmpty(samples)) {
+            generator.writeObject(new SamplesForMetricAndSource(lastSource, lastEventCategory, lastMetric, samples));
+        }
+    }
+}
diff --git a/meter/src/main/java/com/ning/billing/meter/api/user/DebugJsonSamplesOutputer.java b/meter/src/main/java/com/ning/billing/meter/api/user/DebugJsonSamplesOutputer.java
new file mode 100644
index 0000000..4ffd3f6
--- /dev/null
+++ b/meter/src/main/java/com/ning/billing/meter/api/user/DebugJsonSamplesOutputer.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.api.user;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import com.ning.billing.meter.timeline.TimelineEventHandler;
+import com.ning.billing.meter.timeline.categories.CategoryRecordIdAndMetric;
+import com.ning.billing.meter.timeline.chunks.TimelineChunk;
+import com.ning.billing.meter.timeline.chunks.TimelineChunksViews;
+import com.ning.billing.meter.timeline.consumer.CSVConsumer;
+import com.ning.billing.meter.timeline.consumer.TimelineChunkDecoded;
+import com.ning.billing.meter.timeline.metrics.SamplesForMetricAndSource;
+import com.ning.billing.meter.timeline.persistent.TimelineDao;
+import com.ning.billing.util.callcontext.InternalTenantContext;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.base.Strings;
+
+public class DebugJsonSamplesOutputer extends JsonSamplesOutputer {
+
+    private final boolean withBinaryData;
+    private final ObjectWriter writer;
+
+    public DebugJsonSamplesOutputer(final boolean withBinaryData, final TimelineEventHandler timelineEventHandler,
+                                    final TimelineDao timelineDao, final InternalTenantContext context) {
+        super(timelineEventHandler, timelineDao, context);
+        this.withBinaryData = withBinaryData;
+        if (withBinaryData) {
+            this.writer = objectMapper.writerWithView(TimelineChunksViews.Compact.class);
+        } else {
+            this.writer = objectMapper.writerWithView(TimelineChunksViews.Loose.class);
+        }
+    }
+
+    @Override
+    protected void writeJsonForChunks(final JsonGenerator generator, final Collection<? extends TimelineChunk> chunksForSourceAndMetric) throws IOException {
+        for (final TimelineChunk chunk : chunksForSourceAndMetric) {
+            if (withBinaryData) {
+                writer.writeValue(generator, new TimelineChunkDecoded(chunk, sampleCoder));
+            } else {
+                final String source = timelineDao.getSource(chunk.getSourceId(), context);
+                final CategoryRecordIdAndMetric categoryIdAndMetric = timelineDao.getCategoryIdAndMetric(chunk.getMetricId(), context);
+                final String category = timelineDao.getEventCategory(categoryIdAndMetric.getEventCategoryId(), context);
+                final String metric = categoryIdAndMetric.getMetric();
+                final String samples = CSVConsumer.getSamplesAsCSV(sampleCoder, chunk);
+
+                // Don't write out empty samples
+                if (!Strings.isNullOrEmpty(samples)) {
+                    generator.writeObject(new SamplesForMetricAndSource(source, category, metric, samples));
+                }
+            }
+        }
+    }
+}
diff --git a/meter/src/main/java/com/ning/billing/meter/api/user/DecimatingJsonSamplesOutputer.java b/meter/src/main/java/com/ning/billing/meter/api/user/DecimatingJsonSamplesOutputer.java
new file mode 100644
index 0000000..7fed8a7
--- /dev/null
+++ b/meter/src/main/java/com/ning/billing/meter/api/user/DecimatingJsonSamplesOutputer.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.api.user;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import org.joda.time.DateTime;
+import org.skife.config.TimeSpan;
+
+import com.ning.billing.meter.api.DecimationMode;
+import com.ning.billing.meter.timeline.TimelineEventHandler;
+import com.ning.billing.meter.timeline.categories.CategoryRecordIdAndMetric;
+import com.ning.billing.meter.timeline.chunks.TimelineChunk;
+import com.ning.billing.meter.timeline.consumer.CSVConsumer;
+import com.ning.billing.meter.timeline.consumer.CSVSampleProcessor;
+import com.ning.billing.meter.timeline.consumer.TimeRangeSampleProcessor;
+import com.ning.billing.meter.timeline.consumer.filter.DecimatingSampleFilter;
+import com.ning.billing.meter.timeline.metrics.SamplesForMetricAndSource;
+import com.ning.billing.meter.timeline.persistent.TimelineDao;
+import com.ning.billing.util.callcontext.InternalTenantContext;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.google.common.base.Strings;
+
+public class DecimatingJsonSamplesOutputer extends JsonSamplesOutputer {
+
+    private final Integer outputCount;
+    private final DecimationMode decimationMode;
+
+    private Map<Integer, Map<Integer, DecimatingSampleFilter>> filters;
+
+    public DecimatingJsonSamplesOutputer(final DecimationMode decimationMode, @Nullable final Integer outputCount,
+                                         final TimelineEventHandler timelineEventHandler, final TimelineDao timelineDao, final InternalTenantContext context) {
+        super(timelineEventHandler, timelineDao, context);
+        this.outputCount = outputCount;
+        this.decimationMode = decimationMode;
+    }
+
+    @Override
+    protected void output(final OutputStream output, final List<Integer> sourceIds, final List<Integer> metricIds, final DateTime startTime, final DateTime endTime) throws IOException {
+        // Create the decimating filters
+        filters = createDecimatingSampleFilters(sourceIds, metricIds, decimationMode, startTime, endTime, outputCount);
+
+        super.output(output, sourceIds, metricIds, startTime, endTime);
+    }
+
+    @Override
+    protected void writeJsonForChunks(final JsonGenerator generator, final Collection<? extends TimelineChunk> chunksForSourceAndMetric) throws IOException {
+        for (final TimelineChunk chunk : chunksForSourceAndMetric) {
+            final String source = timelineDao.getSource(chunk.getSourceId(), context);
+            final CategoryRecordIdAndMetric categoryIdAndMetric = timelineDao.getCategoryIdAndMetric(chunk.getMetricId(), context);
+            final String eventCategory = timelineDao.getEventCategory(categoryIdAndMetric.getEventCategoryId(), context);
+            final String metric = categoryIdAndMetric.getMetric();
+            final TimeRangeSampleProcessor filter = filters.get(chunk.getSourceId()).get(chunk.getMetricId());
+
+            final String samples = filter == null ? CSVConsumer.getSamplesAsCSV(sampleCoder, chunk) : CSVConsumer.getSamplesAsCSV(sampleCoder, chunk, filter);
+
+            // Don't write out empty samples
+            if (!Strings.isNullOrEmpty(samples)) {
+                generator.writeObject(new SamplesForMetricAndSource(source, eventCategory, metric, samples));
+            }
+        }
+    }
+
+    private Map<Integer, Map<Integer, DecimatingSampleFilter>> createDecimatingSampleFilters(final List<Integer> sourceIds, final List<Integer> metricIds, final DecimationMode decimationMode,
+                                                                                             final DateTime startTime, final DateTime endTime, final Integer outputCount) {
+        final Map<Integer, Map<Integer, DecimatingSampleFilter>> filters = new HashMap<Integer, Map<Integer, DecimatingSampleFilter>>();
+        for (final Integer sourceId : sourceIds) {
+            filters.put(sourceId, new HashMap<Integer, DecimatingSampleFilter>());
+            for (final Integer metric : metricIds) {
+                filters.get(sourceId).put(metric, createDecimatingSampleFilter(outputCount, decimationMode, startTime, endTime));
+            }
+        }
+        return filters;
+    }
+
+    private DecimatingSampleFilter createDecimatingSampleFilter(final Integer outputCount, final DecimationMode decimationMode, final DateTime startTime, final DateTime endTime) {
+        final DecimatingSampleFilter rangeSampleProcessor;
+        if (outputCount == null) {
+            rangeSampleProcessor = null;
+        } else {
+            // TODO Fix the polling interval
+            rangeSampleProcessor = new DecimatingSampleFilter(startTime, endTime, outputCount, new TimeSpan("1s"), decimationMode, new CSVSampleProcessor());
+        }
+
+        return rangeSampleProcessor;
+    }
+}
diff --git a/meter/src/main/java/com/ning/billing/meter/api/user/DefaultJsonSamplesOutputer.java b/meter/src/main/java/com/ning/billing/meter/api/user/DefaultJsonSamplesOutputer.java
new file mode 100644
index 0000000..d11e3f3
--- /dev/null
+++ b/meter/src/main/java/com/ning/billing/meter/api/user/DefaultJsonSamplesOutputer.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.api.user;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import com.ning.billing.meter.timeline.TimelineEventHandler;
+import com.ning.billing.meter.timeline.categories.CategoryRecordIdAndMetric;
+import com.ning.billing.meter.timeline.chunks.TimelineChunk;
+import com.ning.billing.meter.timeline.consumer.CSVConsumer;
+import com.ning.billing.meter.timeline.metrics.SamplesForMetricAndSource;
+import com.ning.billing.meter.timeline.persistent.TimelineDao;
+import com.ning.billing.util.callcontext.InternalTenantContext;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.google.common.base.Strings;
+
+public class DefaultJsonSamplesOutputer extends JsonSamplesOutputer {
+
+    public DefaultJsonSamplesOutputer(final TimelineEventHandler timelineEventHandler, final TimelineDao timelineDao, final InternalTenantContext context) {
+        super(timelineEventHandler, timelineDao, context);
+    }
+
+    @Override
+    protected void writeJsonForChunks(final JsonGenerator generator, final Collection<? extends TimelineChunk> chunksForSourceAndMetric) throws IOException {
+        for (final TimelineChunk chunk : chunksForSourceAndMetric) {
+            final String source = timelineDao.getSource(chunk.getSourceId(), context);
+            final CategoryRecordIdAndMetric categoryIdAndMetric = timelineDao.getCategoryIdAndMetric(chunk.getMetricId(), context);
+            final String eventCategory = timelineDao.getEventCategory(categoryIdAndMetric.getEventCategoryId(), context);
+            final String metric = categoryIdAndMetric.getMetric();
+
+            final String samples = CSVConsumer.getSamplesAsCSV(sampleCoder, chunk);
+
+            // Don't write out empty samples
+            if (!Strings.isNullOrEmpty(samples)) {
+                generator.writeObject(new SamplesForMetricAndSource(source, eventCategory, metric, samples));
+            }
+        }
+    }
+}
diff --git a/meter/src/main/java/com/ning/billing/meter/api/user/DefaultMeterUserApi.java b/meter/src/main/java/com/ning/billing/meter/api/user/DefaultMeterUserApi.java
index 8ea781b..880baf3 100644
--- a/meter/src/main/java/com/ning/billing/meter/api/user/DefaultMeterUserApi.java
+++ b/meter/src/main/java/com/ning/billing/meter/api/user/DefaultMeterUserApi.java
@@ -20,14 +20,15 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Collection;
 import java.util.Map;
-import java.util.UUID;
 
+import javax.annotation.Nullable;
 import javax.inject.Inject;
 
 import org.joda.time.DateTime;
 
-import com.ning.billing.ObjectType;
+import com.ning.billing.meter.api.DecimationMode;
 import com.ning.billing.meter.api.MeterUserApi;
+import com.ning.billing.meter.api.TimeAggregationMode;
 import com.ning.billing.meter.timeline.TimelineEventHandler;
 import com.ning.billing.meter.timeline.persistent.TimelineDao;
 import com.ning.billing.util.callcontext.CallContext;
@@ -58,50 +59,78 @@ public class DefaultMeterUserApi implements MeterUserApi {
     }
 
     @Override
-    public void getAggregateUsage(final OutputStream outputStream, final UUID bundleId, final Collection<String> categories,
-                                  final DateTime fromTimestamp, final DateTime toTimestamp, final TenantContext context) throws IOException {
+    public void getUsage(final OutputStream outputStream, final TimeAggregationMode timeAggregationMode,
+                         final String source, final Collection<String> categories,
+                         final DateTime fromTimestamp, final DateTime toTimestamp, final TenantContext context) throws IOException {
         final ImmutableMap.Builder<String, Collection<String>> metricsPerCategory = new Builder<String, Collection<String>>();
         for (final String category : categories) {
             metricsPerCategory.put(category, ImmutableList.<String>of(AGGREGATE_METRIC_NAME));
         }
-        getUsage(outputStream, bundleId, metricsPerCategory.build(), fromTimestamp, toTimestamp, context);
+
+        getUsage(outputStream, timeAggregationMode, source, metricsPerCategory.build(), fromTimestamp, toTimestamp, context);
     }
 
     @Override
-    public void getUsage(final OutputStream outputStream, final UUID bundleId, final Map<String, Collection<String>> metricsPerCategory,
+    public void getUsage(final OutputStream outputStream, final TimeAggregationMode timeAggregationMode,
+                         final String source, final Map<String, Collection<String>> metricsPerCategory,
                          final DateTime fromTimestamp, final DateTime toTimestamp, final TenantContext context) throws IOException {
         final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(context);
+        final JsonSamplesOutputer outputerJson = new AccumulatingJsonSamplesOutputer(timeAggregationMode, timelineEventHandler, timelineDao, internalTenantContext);
+        outputerJson.output(outputStream, ImmutableList.<String>of(source), metricsPerCategory, fromTimestamp, toTimestamp);
+    }
 
-        final JsonSamplesOutputer outputerJson = new JsonSamplesOutputer(timelineEventHandler, timelineDao, internalTenantContext);
-        outputerJson.output(outputStream, ImmutableList.<UUID>of(bundleId), metricsPerCategory, fromTimestamp, toTimestamp);
+    @Override
+    public void getUsage(final OutputStream outputStream, final DecimationMode decimationMode, @Nullable final Integer outputCount,
+                         final String source, final Map<String, Collection<String>> metricsPerCategory,
+                         final DateTime fromTimestamp, final DateTime toTimestamp, final TenantContext context) throws IOException {
+        final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(context);
+        final JsonSamplesOutputer outputerJson = new DecimatingJsonSamplesOutputer(decimationMode, outputCount, timelineEventHandler, timelineDao, internalTenantContext);
+        outputerJson.output(outputStream, ImmutableList.<String>of(source), metricsPerCategory, fromTimestamp, toTimestamp);
     }
 
     @Override
-    public void incrementUsage(final UUID bundleId, final String categoryName, final String metricName,
+    public void getUsage(final OutputStream outputStream, final String source, final Collection<String> categories,
+                         final DateTime fromTimestamp, final DateTime toTimestamp, final TenantContext context) throws IOException {
+        final ImmutableMap.Builder<String, Collection<String>> metricsPerCategory = new Builder<String, Collection<String>>();
+        for (final String category : categories) {
+            metricsPerCategory.put(category, ImmutableList.<String>of(AGGREGATE_METRIC_NAME));
+        }
+
+        getUsage(outputStream, source, metricsPerCategory.build(), fromTimestamp, toTimestamp, context);
+    }
+
+    @Override
+    public void getUsage(final OutputStream outputStream, final String source, final Map<String, Collection<String>> metricsPerCategory,
+                         final DateTime fromTimestamp, final DateTime toTimestamp, final TenantContext context) throws IOException {
+        final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(context);
+        final JsonSamplesOutputer outputerJson = new DefaultJsonSamplesOutputer(timelineEventHandler, timelineDao, internalTenantContext);
+        outputerJson.output(outputStream, ImmutableList.<String>of(source), metricsPerCategory, fromTimestamp, toTimestamp);
+    }
+
+    @Override
+    public void incrementUsage(final String source, final String categoryName, final String metricName,
                                final DateTime timestamp, final CallContext context) {
-        recordUsage(bundleId,
+        recordUsage(source,
                     ImmutableMap.<String, Map<String, Object>>of(categoryName, ImmutableMap.<String, Object>of(metricName, (short) 1)),
                     timestamp,
                     context);
     }
 
     @Override
-    public void incrementUsageAndAggregate(final UUID bundleId, final String categoryName, final String metricName,
+    public void incrementUsageAndAggregate(final String source, final String categoryName, final String metricName,
                                            final DateTime timestamp, final CallContext context) {
-        recordUsage(bundleId,
+        recordUsage(source,
                     ImmutableMap.<String, Map<String, Object>>of(categoryName, ImmutableMap.<String, Object>of(metricName, (short) 1, AGGREGATE_METRIC_NAME, (short) 1)),
                     timestamp,
                     context);
     }
 
     @Override
-    public void recordUsage(final UUID bundleId, final Map<String, Map<String, Object>> samplesForCategoriesAndMetrics,
+    public void recordUsage(final String source, final Map<String, Map<String, Object>> samplesForCategoriesAndMetrics,
                             final DateTime timestamp, final CallContext context) {
-        final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(bundleId, ObjectType.BUNDLE, context);
-
+        final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(context);
         for (final String category : samplesForCategoriesAndMetrics.keySet()) {
-            final String sourceName = bundleId.toString();
-            timelineEventHandler.record(sourceName, category, timestamp, samplesForCategoriesAndMetrics.get(category),
+            timelineEventHandler.record(source, category, timestamp, samplesForCategoriesAndMetrics.get(category),
                                         internalCallContext);
         }
     }
diff --git a/meter/src/main/java/com/ning/billing/meter/api/user/JsonSamplesOutputer.java b/meter/src/main/java/com/ning/billing/meter/api/user/JsonSamplesOutputer.java
index b589af5..9478edf 100644
--- a/meter/src/main/java/com/ning/billing/meter/api/user/JsonSamplesOutputer.java
+++ b/meter/src/main/java/com/ning/billing/meter/api/user/JsonSamplesOutputer.java
@@ -20,47 +20,34 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.annotation.Nullable;
 
 import org.joda.time.DateTime;
-import org.skife.config.TimeSpan;
 
 import com.ning.billing.meter.timeline.TimelineEventHandler;
-import com.ning.billing.meter.timeline.categories.CategoryRecordIdAndMetric;
 import com.ning.billing.meter.timeline.chunks.TimelineChunk;
-import com.ning.billing.meter.timeline.chunks.TimelineChunksViews;
 import com.ning.billing.meter.timeline.codec.DefaultSampleCoder;
 import com.ning.billing.meter.timeline.codec.SampleCoder;
-import com.ning.billing.meter.timeline.codec.TimelineChunkDecoded;
-import com.ning.billing.meter.timeline.consumer.CSVConsumer;
-import com.ning.billing.meter.timeline.consumer.CSVSampleConsumer;
 import com.ning.billing.meter.timeline.consumer.TimelineChunkConsumer;
-import com.ning.billing.meter.timeline.filter.DecimatingSampleFilter;
-import com.ning.billing.meter.timeline.filter.DecimationMode;
-import com.ning.billing.meter.timeline.metrics.SamplesForMetricAndSource;
 import com.ning.billing.meter.timeline.persistent.TimelineDao;
 import com.ning.billing.util.callcontext.InternalTenantContext;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectWriter;
-import com.google.common.base.Strings;
 
-public class JsonSamplesOutputer {
+public abstract class JsonSamplesOutputer {
 
-    private static final ObjectMapper objectMapper = new ObjectMapper();
+    protected static final ObjectMapper objectMapper = new ObjectMapper();
 
-    private final TimelineEventHandler timelineEventHandler;
-    private final TimelineDao timelineDao;
-    private final SampleCoder sampleCoder;
-    private final InternalTenantContext context;
+    protected final TimelineEventHandler timelineEventHandler;
+    protected final TimelineDao timelineDao;
+    protected final SampleCoder sampleCoder;
+    protected final InternalTenantContext context;
 
     public JsonSamplesOutputer(final TimelineEventHandler timelineEventHandler, final TimelineDao timelineDao, final InternalTenantContext context) {
         this.timelineEventHandler = timelineEventHandler;
@@ -69,38 +56,31 @@ public class JsonSamplesOutputer {
         this.context = context;
     }
 
-    public void output(final OutputStream output, final List<UUID> bundleIds, final Map<String, Collection<String>> metricsPerCategory,
-                       final DateTime startTime, final DateTime endTime) throws IOException {
-        // Default - output all data points as CSV
-        output(output, bundleIds, metricsPerCategory, DecimationMode.PEAK_PICK, null, false, false, startTime, endTime);
-    }
+    protected abstract void writeJsonForChunks(final JsonGenerator generator, final Collection<? extends TimelineChunk> chunksForSourceAndMetric) throws IOException;
 
-    public void output(final OutputStream output, final List<UUID> bundleIds, final Map<String, Collection<String>> metricsPerCategory,
-                       final DecimationMode decimationMode, @Nullable final Integer outputCount, final boolean decodeSamples, final boolean compact,
+    public void output(final OutputStream output, final List<String> sources, final Map<String, Collection<String>> metricsPerCategory,
                        final DateTime startTime, final DateTime endTime) throws IOException {
         // Retrieve the source and metric ids
-        final List<Integer> sourceIds = translateBundleIdsToSourceIds(bundleIds);
+        final List<Integer> sourceIds = translateSourcesToSourceIds(sources);
         final List<Integer> metricIds = translateCategoriesAndMetricNamesToMetricIds(metricsPerCategory);
+        output(output, sourceIds, metricIds, startTime, endTime);
+    }
 
-        // Create the decimating filters, if needed
-        final Map<Integer, Map<Integer, DecimatingSampleFilter>> filters = createDecimatingSampleFilters(sourceIds, metricIds, decimationMode, startTime, endTime, outputCount);
-
+    protected void output(final OutputStream output, final List<Integer> sourceIds, final List<Integer> metricIds,
+                          final DateTime startTime, final DateTime endTime) throws IOException {
         // Setup Jackson
-        final ObjectWriter writer;
-        if (compact) {
-            writer = objectMapper.writerWithView(TimelineChunksViews.Compact.class);
-        } else {
-            writer = objectMapper.writerWithView(TimelineChunksViews.Loose.class);
-        }
         final JsonGenerator generator = objectMapper.getJsonFactory().createJsonGenerator(output);
 
         generator.writeStartArray();
 
         // First, return all data stored in the database
-        writeJsonForStoredChunks(generator, writer, filters, sourceIds, metricIds, startTime, endTime, decodeSamples);
+        writeJsonForStoredChunks(generator, sourceIds, metricIds, startTime, endTime);
 
         // Now return all data in memory
-        writeJsonForInMemoryChunks(generator, writer, filters, sourceIds, metricIds, startTime, endTime, decodeSamples);
+        writeJsonForInMemoryChunks(generator, sourceIds, metricIds, startTime, endTime);
+
+        // Allow implementers to flush their buffers
+        writeRemainingData(generator);
 
         generator.writeEndArray();
 
@@ -108,10 +88,14 @@ public class JsonSamplesOutputer {
         generator.close();
     }
 
-    private List<Integer> translateBundleIdsToSourceIds(final List<UUID> bundleIds) {
-        final List<Integer> hostIds = new ArrayList<Integer>(bundleIds.size());
-        for (final UUID bundleId : bundleIds) {
-            hostIds.add(timelineDao.getSourceId(bundleId.toString(), context));
+    protected void writeRemainingData(final JsonGenerator generator) throws IOException {
+        // No-op
+    }
+
+    private List<Integer> translateSourcesToSourceIds(final List<String> sources) {
+        final List<Integer> hostIds = new ArrayList<Integer>(sources.size());
+        for (final String source : sources) {
+            hostIds.add(timelineDao.getSourceId(source, context));
         }
 
         return hostIds;
@@ -140,32 +124,8 @@ public class JsonSamplesOutputer {
         return metricIds;
     }
 
-    private Map<Integer, Map<Integer, DecimatingSampleFilter>> createDecimatingSampleFilters(final List<Integer> hostIds, final List<Integer> sampleKindIds, final DecimationMode decimationMode,
-                                                                                             final DateTime startTime, final DateTime endTime, final Integer outputCount) {
-        final Map<Integer, Map<Integer, DecimatingSampleFilter>> filters = new HashMap<Integer, Map<Integer, DecimatingSampleFilter>>();
-        for (final Integer hostId : hostIds) {
-            filters.put(hostId, new HashMap<Integer, DecimatingSampleFilter>());
-            for (final Integer sampleKindId : sampleKindIds) {
-                filters.get(hostId).put(sampleKindId, createDecimatingSampleFilter(outputCount, decimationMode, startTime, endTime));
-            }
-        }
-        return filters;
-    }
-
-    private DecimatingSampleFilter createDecimatingSampleFilter(final Integer outputCount, final DecimationMode decimationMode, final DateTime startTime, final DateTime endTime) {
-        final DecimatingSampleFilter rangeSampleProcessor;
-        if (outputCount == null) {
-            rangeSampleProcessor = null;
-        } else {
-            // TODO Fix the polling interval
-            rangeSampleProcessor = new DecimatingSampleFilter(startTime, endTime, outputCount, new TimeSpan("1s"), decimationMode, new CSVSampleConsumer());
-        }
-
-        return rangeSampleProcessor;
-    }
-
-    private void writeJsonForStoredChunks(final JsonGenerator generator, final ObjectWriter writer, final Map<Integer, Map<Integer, DecimatingSampleFilter>> filters, final List<Integer> hostIdsList,
-                                          final List<Integer> sampleKindIdsList, final DateTime startTime, final DateTime endTime, final boolean decodeSamples) throws IOException {
+    private void writeJsonForStoredChunks(final JsonGenerator generator, final List<Integer> hostIdsList, final List<Integer> sampleKindIdsList,
+                                          final DateTime startTime, final DateTime endTime) throws IOException {
         final AtomicReference<Integer> lastHostId = new AtomicReference<Integer>(null);
         final AtomicReference<Integer> lastSampleKindId = new AtomicReference<Integer>(null);
         final List<TimelineChunk> chunksForHostAndSampleKind = new ArrayList<TimelineChunk>();
@@ -181,7 +141,7 @@ public class JsonSamplesOutputer {
                 chunksForHostAndSampleKind.add(chunks);
                 if (previousHostId != null && (!previousHostId.equals(currentHostId) || !previousSampleKindId.equals(currentSampleKindId))) {
                     try {
-                        writeJsonForChunks(generator, writer, filters, chunksForHostAndSampleKind, decodeSamples);
+                        writeJsonForChunks(generator, chunksForHostAndSampleKind);
                     } catch (IOException e) {
                         throw new RuntimeException(e);
                     }
@@ -194,13 +154,13 @@ public class JsonSamplesOutputer {
         }, context);
 
         if (chunksForHostAndSampleKind.size() > 0) {
-            writeJsonForChunks(generator, writer, filters, chunksForHostAndSampleKind, decodeSamples);
+            writeJsonForChunks(generator, chunksForHostAndSampleKind);
             chunksForHostAndSampleKind.clear();
         }
     }
 
-    private void writeJsonForInMemoryChunks(final JsonGenerator generator, final ObjectWriter writer, final Map<Integer, Map<Integer, DecimatingSampleFilter>> filters, final List<Integer> hostIdsList,
-                                            final List<Integer> sampleKindIdsList, @Nullable final DateTime startTime, @Nullable final DateTime endTime, final boolean decodeSamples) throws IOException {
+    private void writeJsonForInMemoryChunks(final JsonGenerator generator, final List<Integer> hostIdsList, final List<Integer> sampleKindIdsList,
+                                            @Nullable final DateTime startTime, @Nullable final DateTime endTime) throws IOException {
 
         for (final Integer hostId : hostIdsList) {
             final Collection<? extends TimelineChunk> inMemorySamples;
@@ -209,30 +169,7 @@ public class JsonSamplesOutputer {
             } catch (ExecutionException e) {
                 throw new IOException(e);
             }
-            writeJsonForChunks(generator, writer, filters, inMemorySamples, decodeSamples);
-        }
-    }
-
-    private void writeJsonForChunks(final JsonGenerator generator, final ObjectWriter writer, final Map<Integer, Map<Integer, DecimatingSampleFilter>> filters,
-                                    final Iterable<? extends TimelineChunk> chunksForHostAndSampleKind, final boolean decodeSamples) throws IOException {
-        for (final TimelineChunk chunk : chunksForHostAndSampleKind) {
-            if (decodeSamples) {
-                writer.writeValue(generator, new TimelineChunkDecoded(chunk, sampleCoder));
-            } else {
-                final String hostName = timelineDao.getSource(chunk.getSourceId(), context);
-                final CategoryRecordIdAndMetric categoryIdAndSampleKind = timelineDao.getCategoryIdAndMetric(chunk.getMetricId(), context);
-                final String eventCategory = timelineDao.getEventCategory(categoryIdAndSampleKind.getEventCategoryId(), context);
-                final String sampleKind = categoryIdAndSampleKind.getMetric();
-                // TODO pass compact form
-                final DecimatingSampleFilter filter = filters.get(chunk.getSourceId()).get(chunk.getMetricId());
-                // TODO CSV only for now
-                final String samples = filter == null ? CSVConsumer.getSamplesAsCSV(sampleCoder, chunk) : CSVConsumer.getSamplesAsCSV(sampleCoder, chunk, filter);
-
-                // Don't write out empty samples
-                if (!Strings.isNullOrEmpty(samples)) {
-                    generator.writeObject(new SamplesForMetricAndSource(hostName, eventCategory, sampleKind, samples));
-                }
-            }
+            writeJsonForChunks(generator, inMemorySamples);
         }
     }
 }
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/BackgroundDBChunkWriter.java b/meter/src/main/java/com/ning/billing/meter/timeline/BackgroundDBChunkWriter.java
index 982641a..dd286c1 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/BackgroundDBChunkWriter.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/BackgroundDBChunkWriter.java
@@ -31,13 +31,13 @@ import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.ning.billing.util.config.MeterConfig;
 import com.ning.billing.meter.timeline.chunks.TimelineChunk;
 import com.ning.billing.meter.timeline.persistent.TimelineDao;
 import com.ning.billing.util.callcontext.CallOrigin;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.callcontext.UserType;
+import com.ning.billing.util.config.MeterConfig;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/codec/DefaultSampleCoder.java b/meter/src/main/java/com/ning/billing/meter/timeline/codec/DefaultSampleCoder.java
index 743f677..3e79502 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/codec/DefaultSampleCoder.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/codec/DefaultSampleCoder.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.ning.billing.meter.timeline.chunks.TimelineChunk;
+import com.ning.billing.meter.timeline.consumer.SampleProcessor;
 import com.ning.billing.meter.timeline.samples.HalfFloat;
 import com.ning.billing.meter.timeline.samples.RepeatSample;
 import com.ning.billing.meter.timeline.samples.SampleBase;
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/codec/SampleCoder.java b/meter/src/main/java/com/ning/billing/meter/timeline/codec/SampleCoder.java
index d690bcd..3a9fdf7 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/codec/SampleCoder.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/codec/SampleCoder.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.List;
 
 import com.ning.billing.meter.timeline.chunks.TimelineChunk;
+import com.ning.billing.meter.timeline.consumer.SampleProcessor;
 import com.ning.billing.meter.timeline.samples.SampleBase;
 import com.ning.billing.meter.timeline.samples.SampleOpcode;
 import com.ning.billing.meter.timeline.samples.ScalarSample;
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/consumer/AccumulatorSampleConsumer.java b/meter/src/main/java/com/ning/billing/meter/timeline/consumer/AccumulatorSampleConsumer.java
index d71f14b..0c05cba 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/consumer/AccumulatorSampleConsumer.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/consumer/AccumulatorSampleConsumer.java
@@ -22,36 +22,26 @@ import java.util.Map;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 
-import com.ning.billing.meter.timeline.codec.TimeRangeSampleProcessor;
+import com.ning.billing.meter.api.TimeAggregationMode;
 import com.ning.billing.meter.timeline.samples.SampleOpcode;
 import com.ning.billing.meter.timeline.samples.ScalarSample;
 
 public class AccumulatorSampleConsumer extends TimeRangeSampleProcessor {
 
-    public enum TimeAggregationMode {
-        SECONDS,
-        MINUTES,
-        HOURS,
-        DAYS,
-        MONTHS,
-        YEARS
-    }
-
     private final StringBuilder builder = new StringBuilder();
     // Linked HashMap to keep ordering of opcodes as they came
     private final Map<SampleOpcode, Double> accumulators = new LinkedHashMap<SampleOpcode, Double>();
 
     private final TimeAggregationMode timeAggregationMode;
-    private final SampleConsumer sampleConsumer;
+    private final TimeRangeSampleProcessor sampleProcessor;
 
     private DateTime lastRoundedTime = null;
     private int aggregatedSampleNumber = 0;
 
-    public AccumulatorSampleConsumer(final TimeAggregationMode timeAggregationMode) {
+    public AccumulatorSampleConsumer(final TimeAggregationMode timeAggregationMode, final TimeRangeSampleProcessor sampleProcessor) {
         super(null, null);
         this.timeAggregationMode = timeAggregationMode;
-        // TODO should be configurable
-        this.sampleConsumer = new CSVSampleConsumer();
+        this.sampleProcessor = sampleProcessor;
     }
 
     @Override
@@ -109,10 +99,10 @@ public class AccumulatorSampleConsumer extends TimeRangeSampleProcessor {
         // Output one opcode at a time
         for (final SampleOpcode opcode : accumulators.keySet()) {
             aggregatedSampleNumber++;
-            sampleConsumer.consumeSample(aggregatedSampleNumber, opcode, accumulators.get(opcode), lastRoundedTime);
+            sampleProcessor.processOneSample(lastRoundedTime, opcode, accumulators.get(opcode));
         }
         // This will flush (clear) the sample consumer
-        builder.append(sampleConsumer.toString());
+        builder.append(sampleProcessor.toString());
 
         accumulators.clear();
     }
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/consumer/CSVConsumer.java b/meter/src/main/java/com/ning/billing/meter/timeline/consumer/CSVConsumer.java
index 3c4531e..c62f717 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/consumer/CSVConsumer.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/consumer/CSVConsumer.java
@@ -24,25 +24,22 @@ import org.joda.time.DateTime;
 
 import com.ning.billing.meter.timeline.chunks.TimelineChunk;
 import com.ning.billing.meter.timeline.codec.SampleCoder;
-import com.ning.billing.meter.timeline.filter.DecimatingSampleFilter;
 
 public class CSVConsumer {
 
-    private CSVConsumer() {
-    }
-
-    public static String getSamplesAsCSV(final SampleCoder sampleCoder, final TimelineChunk chunk, final DecimatingSampleFilter rangeSampleProcessor) throws IOException {
-        sampleCoder.scan(chunk, rangeSampleProcessor);
-        return rangeSampleProcessor.getSampleConsumer().toString();
-    }
+    private CSVConsumer() {}
 
     public static String getSamplesAsCSV(final SampleCoder sampleCoder, final TimelineChunk chunk) throws IOException {
         return getSamplesAsCSV(sampleCoder, chunk, null, null);
     }
 
     public static String getSamplesAsCSV(final SampleCoder sampleCoder, final TimelineChunk chunk, @Nullable final DateTime startTime, @Nullable final DateTime endTime) throws IOException {
-        final CSVOutputProcessor processor = new CSVOutputProcessor(startTime, endTime);
-        sampleCoder.scan(chunk, processor);
-        return processor.toString();
+        final CSVSampleProcessor processor = new CSVSampleProcessor(startTime, endTime);
+        return getSamplesAsCSV(sampleCoder, chunk, processor);
+    }
+
+    public static String getSamplesAsCSV(final SampleCoder sampleCoder, final TimelineChunk chunk, final TimeRangeSampleProcessor rangeSampleProcessor) throws IOException {
+        sampleCoder.scan(chunk, rangeSampleProcessor);
+        return rangeSampleProcessor.toString();
     }
 }
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/DefaultTimelineDao.java b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/DefaultTimelineDao.java
index 44658fa..e1b3413 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/DefaultTimelineDao.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/DefaultTimelineDao.java
@@ -38,7 +38,6 @@ import com.ning.billing.meter.timeline.chunks.TimelineChunk;
 import com.ning.billing.meter.timeline.chunks.TimelineChunkMapper;
 import com.ning.billing.meter.timeline.consumer.TimelineChunkConsumer;
 import com.ning.billing.meter.timeline.shutdown.StartTimes;
-import com.ning.billing.meter.timeline.sources.SourceRecordIdAndMetricRecordId;
 import com.ning.billing.meter.timeline.util.DateTimeUtils;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalTenantContext;
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.java b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.java
index 805f068..42cbb13 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.java
@@ -38,7 +38,6 @@ import com.ning.billing.meter.timeline.shutdown.StartTimes;
 import com.ning.billing.meter.timeline.shutdown.StartTimesBinder;
 import com.ning.billing.meter.timeline.shutdown.StartTimesMapper;
 import com.ning.billing.meter.timeline.sources.SourceIdAndMetricIdMapper;
-import com.ning.billing.meter.timeline.sources.SourceRecordIdAndMetricRecordId;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalTenantContext;
 import com.ning.billing.util.callcontext.InternalTenantContextBinder;
diff --git a/meter/src/test/java/com/ning/billing/meter/timeline/codec/TestSampleCoder.java b/meter/src/test/java/com/ning/billing/meter/timeline/codec/TestSampleCoder.java
index 115bdf0..b86c237 100644
--- a/meter/src/test/java/com/ning/billing/meter/timeline/codec/TestSampleCoder.java
+++ b/meter/src/test/java/com/ning/billing/meter/timeline/codec/TestSampleCoder.java
@@ -31,6 +31,7 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import com.ning.billing.meter.MeterTestSuite;
+import com.ning.billing.meter.timeline.consumer.TimeRangeSampleProcessor;
 import com.ning.billing.meter.timeline.samples.RepeatSample;
 import com.ning.billing.meter.timeline.samples.SampleOpcode;
 import com.ning.billing.meter.timeline.samples.ScalarSample;
diff --git a/meter/src/test/java/com/ning/billing/meter/timeline/codec/TestTimelineChunkAccumulator.java b/meter/src/test/java/com/ning/billing/meter/timeline/codec/TestTimelineChunkAccumulator.java
index 3e5b31d..246ce96 100644
--- a/meter/src/test/java/com/ning/billing/meter/timeline/codec/TestTimelineChunkAccumulator.java
+++ b/meter/src/test/java/com/ning/billing/meter/timeline/codec/TestTimelineChunkAccumulator.java
@@ -25,6 +25,8 @@ import org.testng.annotations.Test;
 
 import com.ning.billing.meter.MeterTestSuite;
 import com.ning.billing.meter.timeline.chunks.TimelineChunk;
+import com.ning.billing.meter.timeline.consumer.SampleProcessor;
+import com.ning.billing.meter.timeline.consumer.TimelineChunkDecoded;
 import com.ning.billing.meter.timeline.samples.SampleOpcode;
 import com.ning.billing.meter.timeline.samples.ScalarSample;
 import com.ning.billing.meter.timeline.times.DefaultTimelineCoder;
diff --git a/meter/src/test/java/com/ning/billing/meter/timeline/consumer/TestAccumulatorSampleConsumer.java b/meter/src/test/java/com/ning/billing/meter/timeline/consumer/TestAccumulatorSampleConsumer.java
index 933eaa5..2aa9513 100644
--- a/meter/src/test/java/com/ning/billing/meter/timeline/consumer/TestAccumulatorSampleConsumer.java
+++ b/meter/src/test/java/com/ning/billing/meter/timeline/consumer/TestAccumulatorSampleConsumer.java
@@ -22,7 +22,7 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import com.ning.billing.meter.MeterTestSuite;
-import com.ning.billing.meter.timeline.consumer.AccumulatorSampleConsumer.TimeAggregationMode;
+import com.ning.billing.meter.api.TimeAggregationMode;
 import com.ning.billing.meter.timeline.samples.SampleOpcode;
 import com.ning.billing.util.clock.ClockMock;
 
@@ -35,7 +35,7 @@ public class TestAccumulatorSampleConsumer extends MeterTestSuite {
         clock.setTime(new DateTime(2012, 12, 1, 12, 40, DateTimeZone.UTC));
         final DateTime start = clock.getUTCNow();
 
-        final AccumulatorSampleConsumer sampleConsumer = new AccumulatorSampleConsumer(TimeAggregationMode.DAYS);
+        final AccumulatorSampleConsumer sampleConsumer = new AccumulatorSampleConsumer(TimeAggregationMode.DAYS, new CSVSampleProcessor());
 
         // 5 for day 1
         sampleConsumer.processOneSample(start, SampleOpcode.DOUBLE, (double) 1);
diff --git a/server/src/test/java/com/ning/billing/jaxrs/KillbillClient.java b/server/src/test/java/com/ning/billing/jaxrs/KillbillClient.java
index 1780d91..238af51 100644
--- a/server/src/test/java/com/ning/billing/jaxrs/KillbillClient.java
+++ b/server/src/test/java/com/ning/billing/jaxrs/KillbillClient.java
@@ -809,19 +809,18 @@ public abstract class KillbillClient extends ServerTestSuiteWithEmbeddedDB {
     // METERING
     //
 
-    protected void recordMeteringUsage(final UUID bundleId, final String category, final String metric, final DateTime timestamp) throws IOException {
-        final String meterURI = JaxrsResource.METER_PATH + "/" + bundleId.toString() + "/" + category + "/" + metric;
-        final Response meterResponse = doPost(meterURI, null, ImmutableMap.<String, String>of(JaxrsResource.QUERY_METER_WITH_AGGREGATE, "true",
+    protected void recordMeteringUsage(final String source, final String category, final String metric, final DateTime timestamp) throws IOException {
+        final String meterURI = JaxrsResource.METER_PATH + "/" + source + "/" + category + "/" + metric;
+        final Response meterResponse = doPost(meterURI, null, ImmutableMap.<String, String>of(JaxrsResource.QUERY_METER_WITH_CATEGORY_AGGREGATE, "true",
                                                                                               JaxrsResource.QUERY_METER_TIMESTAMP, timestamp.toString()), DEFAULT_HTTP_TIMEOUT_SEC);
         assertEquals(meterResponse.getStatusCode(), Status.OK.getStatusCode());
     }
 
-    protected List<Map<String, Object>> getMeteringAggregateUsage(final UUID bundleId, final String category, final DateTime from, final DateTime to) throws IOException {
-        final String meterURI = JaxrsResource.METER_PATH + "/" + bundleId.toString();
+    protected List<Map<String, Object>> getMeteringAggregateUsage(final String source, final String category, final DateTime from, final DateTime to) throws IOException {
+        final String meterURI = JaxrsResource.METER_PATH + "/" + source;
         final Response meterResponse = doGet(meterURI, ImmutableMap.<String, String>of(JaxrsResource.QUERY_METER_CATEGORY, category,
                                                                                        JaxrsResource.QUERY_METER_FROM, from.toString(),
-                                                                                       JaxrsResource.QUERY_METER_TO, to.toString(),
-                                                                                       JaxrsResource.QUERY_METER_WITH_AGGREGATE, "true"), DEFAULT_HTTP_TIMEOUT_SEC);
+                                                                                       JaxrsResource.QUERY_METER_TO, to.toString()), DEFAULT_HTTP_TIMEOUT_SEC);
         assertEquals(meterResponse.getStatusCode(), Status.OK.getStatusCode());
 
         return mapper.readValue(meterResponse.getResponseBody(), new TypeReference<List<Map<String, Object>>>() {});
diff --git a/server/src/test/java/com/ning/billing/jaxrs/TestMeter.java b/server/src/test/java/com/ning/billing/jaxrs/TestMeter.java
index 05e2759..0f88e9b 100644
--- a/server/src/test/java/com/ning/billing/jaxrs/TestMeter.java
+++ b/server/src/test/java/com/ning/billing/jaxrs/TestMeter.java
@@ -20,7 +20,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.UUID;
 
 import org.joda.time.DateTime;
@@ -34,7 +33,7 @@ import com.google.common.collect.Ordering;
 
 public class TestMeter extends TestJaxrsBase {
 
-    private final UUID bundleId = UUID.randomUUID();
+    private final String source = UUID.randomUUID().toString();
     private final String category = "PageView";
     private final String visitor1 = "pierre";
     private final String visitor2 = "stephane";
@@ -56,10 +55,10 @@ public class TestMeter extends TestJaxrsBase {
         final DateTime end = dateTimeOrdering.max(visits);
 
         // Verify the visits recorded
-        final List<Map<String, Object>> meteringAggregateUsage = getMeteringAggregateUsage(bundleId, category, start, end);
+        final List<Map<String, Object>> meteringAggregateUsage = getMeteringAggregateUsage(source, category, start, end);
         final List<DateTime> visitsFound = new ArrayList<DateTime>();
         for (final Map<String, Object> oneUsage : meteringAggregateUsage) {
-            Assert.assertEquals(oneUsage.get("sourceName"), bundleId.toString());
+            Assert.assertEquals(oneUsage.get("sourceName"), source);
             Assert.assertEquals(oneUsage.get("eventCategory"), category);
             Assert.assertEquals(oneUsage.get("metric"), "__AGGREGATE__");
 
@@ -79,11 +78,11 @@ public class TestMeter extends TestJaxrsBase {
         final List<DateTime> visits = new ArrayList<DateTime>();
         for (int i = 0; i < nbVisits; i++) {
             DateTime visitDate = lastVisit.plusSeconds(i);
-            recordMeteringUsage(bundleId, category, visitor1, visitDate);
+            recordMeteringUsage(source, category, visitor1, visitDate);
             visits.add(visitDate);
 
             visitDate = visitDate.plusSeconds(1);
-            recordMeteringUsage(bundleId, category, visitor2, visitDate);
+            recordMeteringUsage(source, category, visitor2, visitDate);
             visits.add(visitDate);
 
             lastVisit = visitDate;