killbill-memoizeit

jaxrs: add endpoint for meter Add server test to record and

12/1/2012 4:29:15 AM

Details

diff --git a/api/src/main/java/com/ning/billing/meter/api/MeterUserApi.java b/api/src/main/java/com/ning/billing/meter/api/MeterUserApi.java
index 5869d67..3ee17a1 100644
--- a/api/src/main/java/com/ning/billing/meter/api/MeterUserApi.java
+++ b/api/src/main/java/com/ning/billing/meter/api/MeterUserApi.java
@@ -16,15 +16,25 @@
 
 package com.ning.billing.meter.api;
 
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
 
 import org.joda.time.DateTime;
 
 import com.ning.billing.util.callcontext.CallContext;
+import com.ning.billing.util.callcontext.TenantContext;
 
 public interface MeterUserApi {
 
+    public void getAggregateUsage(OutputStream outputStream, UUID bundleId, Collection<String> categories,
+                                  DateTime fromTimestamp, DateTime toTimestamp, TenantContext context) throws IOException;
+
+    public void getUsage(OutputStream outputStream, UUID bundleId, Map<String, Collection<String>> metricsPerCategory,
+                         DateTime fromTimestamp, DateTime toTimestamp, TenantContext context) throws IOException;
+
     /**
      * Shortcut API to record a usage value of "1" for a given category and metric.
      *
diff --git a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java
index 7d0ec23..d2973cb 100644
--- a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java
+++ b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java
@@ -79,6 +79,13 @@ public interface JaxrsResource {
 
     public static final String QUERY_NOTIFICATION_CALLBACK = "cb";
 
+    public static final String QUERY_METER_WITH_AGGREGATE = "aggregate";
+    public static final String QUERY_METER_TIMESTAMP = "timestamp";
+    public static final String QUERY_METER_FROM = "from";
+    public static final String QUERY_METER_TO = "to";
+    public static final String QUERY_METER_CATEGORY = "category";
+    public static final String QUERY_METER_CATEGORY_AND_METRIC = "category_and_metric";
+
     public static final String ACCOUNTS = "accounts";
     public static final String ACCOUNTS_PATH = PREFIX + "/" + ACCOUNTS;
 
@@ -135,4 +142,6 @@ public interface JaxrsResource {
 
     public static final String CBA_REBALANCING = "cbaRebalancing";
 
+    public static final String METER = "meter";
+    public static final String METER_PATH = PREFIX + "/" + METER;
 }
diff --git a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/MeterResource.java b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/MeterResource.java
new file mode 100644
index 0000000..9b81ac9
--- /dev/null
+++ b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/MeterResource.java
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.jaxrs.resources;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.HeaderParam;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
+
+import org.joda.time.DateTime;
+
+import com.ning.billing.jaxrs.util.Context;
+import com.ning.billing.jaxrs.util.JaxrsUriBuilder;
+import com.ning.billing.meter.api.MeterUserApi;
+import com.ning.billing.util.api.AuditUserApi;
+import com.ning.billing.util.api.CustomFieldUserApi;
+import com.ning.billing.util.api.TagUserApi;
+import com.ning.billing.util.callcontext.CallContext;
+import com.ning.billing.util.callcontext.TenantContext;
+import com.ning.billing.util.clock.Clock;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
+
+@Singleton
+@Path(JaxrsResource.METER_PATH)
+public class MeterResource extends JaxRsResourceBase {
+
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    private final MeterUserApi meterApi;
+    private final Clock clock;
+
+    @Inject
+    public MeterResource(final MeterUserApi meterApi,
+                         final Clock clock,
+                         final JaxrsUriBuilder uriBuilder,
+                         final TagUserApi tagUserApi,
+                         final CustomFieldUserApi customFieldUserApi,
+                         final AuditUserApi auditUserApi,
+                         final Context context) {
+        super(uriBuilder, tagUserApi, customFieldUserApi, auditUserApi, context);
+        this.meterApi = meterApi;
+        this.clock = clock;
+    }
+
+    @GET
+    @Path("/{bundleId:" + UUID_PATTERN + "}")
+    @Produces(APPLICATION_JSON)
+    public StreamingOutput getUsage(@PathParam("bundleId") final String bundleIdString,
+                                    @QueryParam(QUERY_METER_CATEGORY) final List<String> categories,
+                                    // Format: category,metric
+                                    @QueryParam(QUERY_METER_CATEGORY_AND_METRIC) final List<String> categoriesAndMetrics,
+                                    @QueryParam(QUERY_METER_FROM) final String fromTimestampString,
+                                    @QueryParam(QUERY_METER_TO) final String toTimestampString,
+                                    @QueryParam(QUERY_METER_WITH_AGGREGATE) @DefaultValue("false") final Boolean withAggregate,
+                                    @javax.ws.rs.core.Context final HttpServletRequest request) {
+        final UUID bundleId = UUID.fromString(bundleIdString);
+        final DateTime fromTimestamp = DATE_TIME_FORMATTER.parseDateTime(fromTimestampString);
+        final DateTime toTimestamp = DATE_TIME_FORMATTER.parseDateTime(toTimestampString);
+        final TenantContext tenantContext = context.createContext(request);
+
+        return new StreamingOutput() {
+            @Override
+            public void write(final OutputStream output) throws IOException, WebApplicationException {
+                if (withAggregate) {
+                    meterApi.getAggregateUsage(output, bundleId, categories, fromTimestamp, toTimestamp, tenantContext);
+                } else {
+                    final Map<String, Collection<String>> metricsPerCategory = retrieveMetricsPerCategory(categoriesAndMetrics);
+                    meterApi.getUsage(output, bundleId, metricsPerCategory, fromTimestamp, toTimestamp, tenantContext);
+                }
+            }
+        };
+    }
+
+    private Map<String, Collection<String>> retrieveMetricsPerCategory(final List<String> categoriesAndMetrics) {
+        final Map<String, Collection<String>> metricsPerCategory = new HashMap<String, Collection<String>>();
+        for (final String categoryAndSampleKind : categoriesAndMetrics) {
+            final String[] categoryAndMetric = getCategoryAndMetricFromQueryParameter(categoryAndSampleKind);
+            if (metricsPerCategory.get(categoryAndMetric[0]) == null) {
+                metricsPerCategory.put(categoryAndMetric[0], new ArrayList<String>());
+            }
+
+            metricsPerCategory.get(categoryAndMetric[0]).add(categoryAndMetric[1]);
+        }
+
+        return metricsPerCategory;
+    }
+
+    private String[] getCategoryAndMetricFromQueryParameter(final String categoryAndMetric) {
+        final String[] parts = categoryAndMetric.split(",");
+        if (parts.length != 2) {
+            throw new WebApplicationException(Response.Status.BAD_REQUEST);
+        }
+        return parts;
+    }
+
+    @POST
+    @Path("/{bundleId:" + UUID_PATTERN + "}/{categoryName:" + STRING_PATTERN + "}/{metricName:" + STRING_PATTERN + "}")
+    @Consumes(APPLICATION_JSON)
+    @Produces(APPLICATION_JSON)
+    public Response recordUsage(@PathParam("bundleId") final String bundleIdString,
+                                @PathParam("categoryName") final String categoryName,
+                                @PathParam("metricName") final String metricName,
+                                @QueryParam(QUERY_METER_WITH_AGGREGATE) @DefaultValue("false") final Boolean withAggregate,
+                                @QueryParam(QUERY_METER_TIMESTAMP) final String timestampString,
+                                @HeaderParam(HDR_CREATED_BY) final String createdBy,
+                                @HeaderParam(HDR_REASON) final String reason,
+                                @HeaderParam(HDR_COMMENT) final String comment,
+                                @javax.ws.rs.core.Context final HttpServletRequest request) {
+        final UUID bundleId = UUID.fromString(bundleIdString);
+        final CallContext callContext = context.createContext(createdBy, reason, comment, request);
+
+        final DateTime timestamp;
+        if (timestampString == null) {
+            timestamp = clock.getUTCNow();
+        } else {
+            timestamp = DATE_TIME_FORMATTER.parseDateTime(timestampString);
+        }
+
+        if (withAggregate) {
+            meterApi.incrementUsageAndAggregate(bundleId, categoryName, metricName, timestamp, callContext);
+        } else {
+            meterApi.incrementUsage(bundleId, categoryName, metricName, timestamp, callContext);
+        }
+
+        return Response.ok().build();
+    }
+}
diff --git a/meter/src/main/java/com/ning/billing/meter/api/user/DefaultMeterUserApi.java b/meter/src/main/java/com/ning/billing/meter/api/user/DefaultMeterUserApi.java
index 28024af..840d295 100644
--- a/meter/src/main/java/com/ning/billing/meter/api/user/DefaultMeterUserApi.java
+++ b/meter/src/main/java/com/ning/billing/meter/api/user/DefaultMeterUserApi.java
@@ -16,6 +16,9 @@
 
 package com.ning.billing.meter.api.user;
 
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
 
@@ -26,27 +29,54 @@ import org.joda.time.DateTime;
 import com.ning.billing.ObjectType;
 import com.ning.billing.meter.api.MeterUserApi;
 import com.ning.billing.meter.timeline.TimelineEventHandler;
+import com.ning.billing.meter.timeline.persistent.TimelineDao;
 import com.ning.billing.util.callcontext.CallContext;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
+import com.ning.billing.util.callcontext.InternalTenantContext;
+import com.ning.billing.util.callcontext.TenantContext;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
 
 public class DefaultMeterUserApi implements MeterUserApi {
 
     private static final String AGGREGATE_METRIC_NAME = "__AGGREGATE__";
 
     private final TimelineEventHandler timelineEventHandler;
+    private final TimelineDao timelineDao;
     private final InternalCallContextFactory internalCallContextFactory;
 
     @Inject
     public DefaultMeterUserApi(final TimelineEventHandler timelineEventHandler,
+                               final TimelineDao timelineDao,
                                final InternalCallContextFactory internalCallContextFactory) {
         this.timelineEventHandler = timelineEventHandler;
+        this.timelineDao = timelineDao;
         this.internalCallContextFactory = internalCallContextFactory;
     }
 
     @Override
+    public void getAggregateUsage(final OutputStream outputStream, final UUID bundleId, final Collection<String> categories,
+                                  final DateTime fromTimestamp, final DateTime toTimestamp, final TenantContext context) throws IOException {
+        final ImmutableMap.Builder<String, Collection<String>> metricsPerCategory = new Builder<String, Collection<String>>();
+        for (final String category : categories) {
+            metricsPerCategory.put(category, ImmutableList.<String>of(AGGREGATE_METRIC_NAME));
+        }
+        getUsage(outputStream, bundleId, metricsPerCategory.build(), fromTimestamp, toTimestamp, context);
+    }
+
+    @Override
+    public void getUsage(final OutputStream outputStream, final UUID bundleId, final Map<String, Collection<String>> metricsPerCategory,
+                         final DateTime fromTimestamp, final DateTime toTimestamp, final TenantContext context) throws IOException {
+        final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(context);
+
+        final JsonSamplesOutputer outputerJson = new JsonSamplesOutputer(timelineEventHandler, timelineDao, internalTenantContext);
+        outputerJson.output(outputStream, ImmutableList.<UUID>of(bundleId), metricsPerCategory, fromTimestamp, toTimestamp);
+    }
+
+    @Override
     public void incrementUsage(final UUID bundleId, final String categoryName, final String metricName,
                                final DateTime timestamp, final CallContext context) {
         incrementUsage(bundleId,
@@ -59,8 +89,7 @@ public class DefaultMeterUserApi implements MeterUserApi {
     public void incrementUsageAndAggregate(final UUID bundleId, final String categoryName, final String metricName,
                                            final DateTime timestamp, final CallContext context) {
         incrementUsage(bundleId,
-                       ImmutableMap.<String, Map<String, Object>>of(categoryName, ImmutableMap.<String, Object>of(metricName, (short) 1,
-                                                                                                                  AGGREGATE_METRIC_NAME, (short) 1)),
+                       ImmutableMap.<String, Map<String, Object>>of(categoryName, ImmutableMap.<String, Object>of(metricName, (short) 1, AGGREGATE_METRIC_NAME, (short) 1)),
                        timestamp,
                        context);
     }
diff --git a/meter/src/main/java/com/ning/billing/meter/api/user/JsonSamplesOutputer.java b/meter/src/main/java/com/ning/billing/meter/api/user/JsonSamplesOutputer.java
new file mode 100644
index 0000000..b589af5
--- /dev/null
+++ b/meter/src/main/java/com/ning/billing/meter/api/user/JsonSamplesOutputer.java
@@ -0,0 +1,238 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.api.user;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.Nullable;
+
+import org.joda.time.DateTime;
+import org.skife.config.TimeSpan;
+
+import com.ning.billing.meter.timeline.TimelineEventHandler;
+import com.ning.billing.meter.timeline.categories.CategoryRecordIdAndMetric;
+import com.ning.billing.meter.timeline.chunks.TimelineChunk;
+import com.ning.billing.meter.timeline.chunks.TimelineChunksViews;
+import com.ning.billing.meter.timeline.codec.DefaultSampleCoder;
+import com.ning.billing.meter.timeline.codec.SampleCoder;
+import com.ning.billing.meter.timeline.codec.TimelineChunkDecoded;
+import com.ning.billing.meter.timeline.consumer.CSVConsumer;
+import com.ning.billing.meter.timeline.consumer.CSVSampleConsumer;
+import com.ning.billing.meter.timeline.consumer.TimelineChunkConsumer;
+import com.ning.billing.meter.timeline.filter.DecimatingSampleFilter;
+import com.ning.billing.meter.timeline.filter.DecimationMode;
+import com.ning.billing.meter.timeline.metrics.SamplesForMetricAndSource;
+import com.ning.billing.meter.timeline.persistent.TimelineDao;
+import com.ning.billing.util.callcontext.InternalTenantContext;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.base.Strings;
+
+public class JsonSamplesOutputer {
+
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    private final TimelineEventHandler timelineEventHandler;
+    private final TimelineDao timelineDao;
+    private final SampleCoder sampleCoder;
+    private final InternalTenantContext context;
+
+    public JsonSamplesOutputer(final TimelineEventHandler timelineEventHandler, final TimelineDao timelineDao, final InternalTenantContext context) {
+        this.timelineEventHandler = timelineEventHandler;
+        this.timelineDao = timelineDao;
+        this.sampleCoder = new DefaultSampleCoder();
+        this.context = context;
+    }
+
+    public void output(final OutputStream output, final List<UUID> bundleIds, final Map<String, Collection<String>> metricsPerCategory,
+                       final DateTime startTime, final DateTime endTime) throws IOException {
+        // Default - output all data points as CSV
+        output(output, bundleIds, metricsPerCategory, DecimationMode.PEAK_PICK, null, false, false, startTime, endTime);
+    }
+
+    public void output(final OutputStream output, final List<UUID> bundleIds, final Map<String, Collection<String>> metricsPerCategory,
+                       final DecimationMode decimationMode, @Nullable final Integer outputCount, final boolean decodeSamples, final boolean compact,
+                       final DateTime startTime, final DateTime endTime) throws IOException {
+        // Retrieve the source and metric ids
+        final List<Integer> sourceIds = translateBundleIdsToSourceIds(bundleIds);
+        final List<Integer> metricIds = translateCategoriesAndMetricNamesToMetricIds(metricsPerCategory);
+
+        // Create the decimating filters, if needed
+        final Map<Integer, Map<Integer, DecimatingSampleFilter>> filters = createDecimatingSampleFilters(sourceIds, metricIds, decimationMode, startTime, endTime, outputCount);
+
+        // Setup Jackson
+        final ObjectWriter writer;
+        if (compact) {
+            writer = objectMapper.writerWithView(TimelineChunksViews.Compact.class);
+        } else {
+            writer = objectMapper.writerWithView(TimelineChunksViews.Loose.class);
+        }
+        final JsonGenerator generator = objectMapper.getJsonFactory().createJsonGenerator(output);
+
+        generator.writeStartArray();
+
+        // First, return all data stored in the database
+        writeJsonForStoredChunks(generator, writer, filters, sourceIds, metricIds, startTime, endTime, decodeSamples);
+
+        // Now return all data in memory
+        writeJsonForInMemoryChunks(generator, writer, filters, sourceIds, metricIds, startTime, endTime, decodeSamples);
+
+        generator.writeEndArray();
+
+        generator.flush();
+        generator.close();
+    }
+
+    private List<Integer> translateBundleIdsToSourceIds(final List<UUID> bundleIds) {
+        final List<Integer> hostIds = new ArrayList<Integer>(bundleIds.size());
+        for (final UUID bundleId : bundleIds) {
+            hostIds.add(timelineDao.getSourceId(bundleId.toString(), context));
+        }
+
+        return hostIds;
+    }
+
+    private List<Integer> translateCategoriesAndMetricNamesToMetricIds(final Map<String, Collection<String>> metricsPerCategory) {
+        final List<Integer> metricIds = new ArrayList<Integer>(metricsPerCategory.keySet().size());
+        for (final String category : metricsPerCategory.keySet()) {
+            final Integer categoryId = timelineDao.getEventCategoryId(category, context);
+            if (categoryId == null) {
+                // Ignore
+                continue;
+            }
+
+            for (final String metricName : metricsPerCategory.get(category)) {
+                final Integer sampleKindId = timelineDao.getMetricId(categoryId, metricName, context);
+                if (sampleKindId == null) {
+                    // Ignore
+                    continue;
+                }
+
+                metricIds.add(sampleKindId);
+            }
+        }
+
+        return metricIds;
+    }
+
+    private Map<Integer, Map<Integer, DecimatingSampleFilter>> createDecimatingSampleFilters(final List<Integer> hostIds, final List<Integer> sampleKindIds, final DecimationMode decimationMode,
+                                                                                             final DateTime startTime, final DateTime endTime, final Integer outputCount) {
+        final Map<Integer, Map<Integer, DecimatingSampleFilter>> filters = new HashMap<Integer, Map<Integer, DecimatingSampleFilter>>();
+        for (final Integer hostId : hostIds) {
+            filters.put(hostId, new HashMap<Integer, DecimatingSampleFilter>());
+            for (final Integer sampleKindId : sampleKindIds) {
+                filters.get(hostId).put(sampleKindId, createDecimatingSampleFilter(outputCount, decimationMode, startTime, endTime));
+            }
+        }
+        return filters;
+    }
+
+    private DecimatingSampleFilter createDecimatingSampleFilter(final Integer outputCount, final DecimationMode decimationMode, final DateTime startTime, final DateTime endTime) {
+        final DecimatingSampleFilter rangeSampleProcessor;
+        if (outputCount == null) {
+            rangeSampleProcessor = null;
+        } else {
+            // TODO Fix the polling interval
+            rangeSampleProcessor = new DecimatingSampleFilter(startTime, endTime, outputCount, new TimeSpan("1s"), decimationMode, new CSVSampleConsumer());
+        }
+
+        return rangeSampleProcessor;
+    }
+
+    private void writeJsonForStoredChunks(final JsonGenerator generator, final ObjectWriter writer, final Map<Integer, Map<Integer, DecimatingSampleFilter>> filters, final List<Integer> hostIdsList,
+                                          final List<Integer> sampleKindIdsList, final DateTime startTime, final DateTime endTime, final boolean decodeSamples) throws IOException {
+        final AtomicReference<Integer> lastHostId = new AtomicReference<Integer>(null);
+        final AtomicReference<Integer> lastSampleKindId = new AtomicReference<Integer>(null);
+        final List<TimelineChunk> chunksForHostAndSampleKind = new ArrayList<TimelineChunk>();
+
+        timelineDao.getSamplesBySourceIdsAndMetricIds(hostIdsList, sampleKindIdsList, startTime, endTime, new TimelineChunkConsumer() {
+            @Override
+            public void processTimelineChunk(final TimelineChunk chunks) {
+                final Integer previousHostId = lastHostId.get();
+                final Integer previousSampleKindId = lastSampleKindId.get();
+                final Integer currentHostId = chunks.getSourceId();
+                final Integer currentSampleKindId = chunks.getMetricId();
+
+                chunksForHostAndSampleKind.add(chunks);
+                if (previousHostId != null && (!previousHostId.equals(currentHostId) || !previousSampleKindId.equals(currentSampleKindId))) {
+                    try {
+                        writeJsonForChunks(generator, writer, filters, chunksForHostAndSampleKind, decodeSamples);
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                    chunksForHostAndSampleKind.clear();
+                }
+
+                lastHostId.set(currentHostId);
+                lastSampleKindId.set(currentSampleKindId);
+            }
+        }, context);
+
+        if (chunksForHostAndSampleKind.size() > 0) {
+            writeJsonForChunks(generator, writer, filters, chunksForHostAndSampleKind, decodeSamples);
+            chunksForHostAndSampleKind.clear();
+        }
+    }
+
+    private void writeJsonForInMemoryChunks(final JsonGenerator generator, final ObjectWriter writer, final Map<Integer, Map<Integer, DecimatingSampleFilter>> filters, final List<Integer> hostIdsList,
+                                            final List<Integer> sampleKindIdsList, @Nullable final DateTime startTime, @Nullable final DateTime endTime, final boolean decodeSamples) throws IOException {
+
+        for (final Integer hostId : hostIdsList) {
+            final Collection<? extends TimelineChunk> inMemorySamples;
+            try {
+                inMemorySamples = timelineEventHandler.getInMemoryTimelineChunks(hostId, sampleKindIdsList, startTime, endTime, context);
+            } catch (ExecutionException e) {
+                throw new IOException(e);
+            }
+            writeJsonForChunks(generator, writer, filters, inMemorySamples, decodeSamples);
+        }
+    }
+
+    private void writeJsonForChunks(final JsonGenerator generator, final ObjectWriter writer, final Map<Integer, Map<Integer, DecimatingSampleFilter>> filters,
+                                    final Iterable<? extends TimelineChunk> chunksForHostAndSampleKind, final boolean decodeSamples) throws IOException {
+        for (final TimelineChunk chunk : chunksForHostAndSampleKind) {
+            if (decodeSamples) {
+                writer.writeValue(generator, new TimelineChunkDecoded(chunk, sampleCoder));
+            } else {
+                final String hostName = timelineDao.getSource(chunk.getSourceId(), context);
+                final CategoryRecordIdAndMetric categoryIdAndSampleKind = timelineDao.getCategoryIdAndMetric(chunk.getMetricId(), context);
+                final String eventCategory = timelineDao.getEventCategory(categoryIdAndSampleKind.getEventCategoryId(), context);
+                final String sampleKind = categoryIdAndSampleKind.getMetric();
+                // TODO pass compact form
+                final DecimatingSampleFilter filter = filters.get(chunk.getSourceId()).get(chunk.getMetricId());
+                // TODO CSV only for now
+                final String samples = filter == null ? CSVConsumer.getSamplesAsCSV(sampleCoder, chunk) : CSVConsumer.getSamplesAsCSV(sampleCoder, chunk, filter);
+
+                // Don't write out empty samples
+                if (!Strings.isNullOrEmpty(samples)) {
+                    generator.writeObject(new SamplesForMetricAndSource(hostName, eventCategory, sampleKind, samples));
+                }
+            }
+        }
+    }
+}
diff --git a/meter/src/main/java/com/ning/billing/meter/glue/CachingDefaultTimelineDaoProvider.java b/meter/src/main/java/com/ning/billing/meter/glue/CachingDefaultTimelineDaoProvider.java
index da8feab..a9fd403 100644
--- a/meter/src/main/java/com/ning/billing/meter/glue/CachingDefaultTimelineDaoProvider.java
+++ b/meter/src/main/java/com/ning/billing/meter/glue/CachingDefaultTimelineDaoProvider.java
@@ -18,7 +18,7 @@ package com.ning.billing.meter.glue;
 
 import javax.inject.Provider;
 
-import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.IDBI;
 
 import com.ning.billing.meter.timeline.persistent.CachingTimelineDao;
 import com.ning.billing.meter.timeline.persistent.DefaultTimelineDao;
@@ -29,11 +29,11 @@ import com.google.inject.Inject;
 
 public class CachingDefaultTimelineDaoProvider implements Provider<TimelineDao> {
 
-    private final DBI dbi;
+    private final IDBI dbi;
     private final InternalCallContextFactory internalCallContextFactory;
 
     @Inject
-    public CachingDefaultTimelineDaoProvider(final DBI dbi, final InternalCallContextFactory internalCallContextFactory) {
+    public CachingDefaultTimelineDaoProvider(final IDBI dbi, final InternalCallContextFactory internalCallContextFactory) {
         this.dbi = dbi;
         this.internalCallContextFactory = internalCallContextFactory;
     }
diff --git a/meter/src/main/java/com/ning/billing/meter/glue/MeterModule.java b/meter/src/main/java/com/ning/billing/meter/glue/MeterModule.java
index 5eef3b9..20c725f 100644
--- a/meter/src/main/java/com/ning/billing/meter/glue/MeterModule.java
+++ b/meter/src/main/java/com/ning/billing/meter/glue/MeterModule.java
@@ -22,6 +22,8 @@ import org.skife.config.ConfigSource;
 import org.skife.config.ConfigurationObjectFactory;
 import org.skife.config.SimplePropertyConfigSource;
 
+import com.ning.billing.meter.api.MeterUserApi;
+import com.ning.billing.meter.api.user.DefaultMeterUserApi;
 import com.ning.billing.meter.timeline.codec.DefaultSampleCoder;
 import com.ning.billing.meter.timeline.codec.SampleCoder;
 import com.ning.billing.meter.timeline.persistent.FileBackedBuffer;
@@ -71,9 +73,12 @@ public class MeterModule extends AbstractModule {
         }
     }
 
+    protected void installMeterUserApi() {
+        bind(MeterUserApi.class).to(DefaultMeterUserApi.class).asEagerSingleton();
+    }
+
     @Override
     protected void configure() {
-
         final MeterConfig config = installConfig();
 
         configureFileBackedBuffer(config);
@@ -84,5 +89,7 @@ public class MeterModule extends AbstractModule {
         //configureTimelineAggregator();
         //configureBackgroundDBChunkWriter();
         //configureReplayer();
+
+        installMeterUserApi();
     }
 }
diff --git a/meter/src/main/resources/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.sql.stg b/meter/src/main/resources/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.sql.stg
index 3ccc68b..fcc61ab 100644
--- a/meter/src/main/resources/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.sql.stg
+++ b/meter/src/main/resources/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.sql.stg
@@ -53,7 +53,7 @@ insert into sources (
 
 getCategoryRecordId() ::= <<
 select
-  category_id
+  record_id
 from categories
 where category = :category
 <AND_CHECK_TENANT()>
diff --git a/server/src/main/java/com/ning/billing/server/modules/KillbillServerModule.java b/server/src/main/java/com/ning/billing/server/modules/KillbillServerModule.java
index 188644c..a1b3303 100644
--- a/server/src/main/java/com/ning/billing/server/modules/KillbillServerModule.java
+++ b/server/src/main/java/com/ning/billing/server/modules/KillbillServerModule.java
@@ -29,6 +29,7 @@ import com.ning.billing.jaxrs.resources.AccountResource;
 import com.ning.billing.jaxrs.resources.BundleResource;
 import com.ning.billing.jaxrs.resources.CatalogResource;
 import com.ning.billing.jaxrs.resources.InvoiceResource;
+import com.ning.billing.jaxrs.resources.MeterResource;
 import com.ning.billing.jaxrs.resources.PaymentMethodResource;
 import com.ning.billing.jaxrs.resources.PaymentResource;
 import com.ning.billing.jaxrs.resources.RefundResource;
@@ -37,6 +38,7 @@ import com.ning.billing.jaxrs.resources.TagResource;
 import com.ning.billing.jaxrs.resources.TenantResource;
 import com.ning.billing.jaxrs.util.KillbillEventHandler;
 import com.ning.billing.junction.glue.DefaultJunctionModule;
+import com.ning.billing.meter.glue.MeterModule;
 import com.ning.billing.overdue.glue.DefaultOverdueModule;
 import com.ning.billing.payment.glue.PaymentModule;
 import com.ning.billing.server.DefaultServerService;
@@ -53,7 +55,6 @@ import com.ning.billing.util.glue.CustomFieldModule;
 import com.ning.billing.util.glue.ExportModule;
 import com.ning.billing.util.glue.GlobalLockerModule;
 import com.ning.billing.util.glue.NotificationQueueModule;
-import com.ning.billing.util.glue.TagStoreModule;
 
 import com.google.inject.AbstractModule;
 
@@ -93,6 +94,7 @@ public class KillbillServerModule extends AbstractModule {
         bind(PaymentResource.class).asEagerSingleton();
         bind(RefundResource.class).asEagerSingleton();
         bind(TenantResource.class).asEagerSingleton();
+        bind(MeterResource.class).asEagerSingleton();
         bind(KillbillEventHandler.class).asEagerSingleton();
     }
 
@@ -120,6 +122,7 @@ public class KillbillServerModule extends AbstractModule {
         install(new DefaultOverdueModule());
         install(new TenantModule());
         install(new ExportModule());
+        install(new MeterModule());
         installClock();
     }
 }
diff --git a/server/src/test/java/com/ning/billing/jaxrs/KillbillClient.java b/server/src/test/java/com/ning/billing/jaxrs/KillbillClient.java
index 9e3fb83..1780d91 100644
--- a/server/src/test/java/com/ning/billing/jaxrs/KillbillClient.java
+++ b/server/src/test/java/com/ning/billing/jaxrs/KillbillClient.java
@@ -16,13 +16,6 @@
 
 package com.ning.billing.jaxrs;
 
-import static com.ning.billing.jaxrs.resources.JaxrsResource.ACCOUNTS;
-import static com.ning.billing.jaxrs.resources.JaxrsResource.BUNDLES;
-import static com.ning.billing.jaxrs.resources.JaxrsResource.QUERY_PAYMENT_METHOD_PLUGIN_INFO;
-import static com.ning.billing.jaxrs.resources.JaxrsResource.SUBSCRIPTIONS;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.ArrayList;
@@ -83,6 +76,13 @@ import com.google.common.base.Function;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableMap;
 
+import static com.ning.billing.jaxrs.resources.JaxrsResource.ACCOUNTS;
+import static com.ning.billing.jaxrs.resources.JaxrsResource.BUNDLES;
+import static com.ning.billing.jaxrs.resources.JaxrsResource.QUERY_PAYMENT_METHOD_PLUGIN_INFO;
+import static com.ning.billing.jaxrs.resources.JaxrsResource.SUBSCRIPTIONS;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
 public abstract class KillbillClient extends ServerTestSuiteWithEmbeddedDB {
 
     protected static final String PLUGIN_NAME = "noop";
@@ -149,11 +149,10 @@ public abstract class KillbillClient extends ServerTestSuiteWithEmbeddedDB {
         return response.getHeader("Location");
     }
 
-
-    protected String registerCallbackNotificationForTenant(final String callback)  throws Exception {
+    protected String registerCallbackNotificationForTenant(final String callback) throws Exception {
         final Map<String, String> queryParams = new HashMap<String, String>();
         queryParams.put(JaxrsResource.QUERY_NOTIFICATION_CALLBACK, callback);
-        final String uri = JaxrsResource.TENANTS_PATH + "/" + JaxrsResource.REGISTER_NOTIFICATION_CALLBACK ;
+        final String uri = JaxrsResource.TENANTS_PATH + "/" + JaxrsResource.REGISTER_NOTIFICATION_CALLBACK;
         final Response response = doPost(uri, null, queryParams, DEFAULT_HTTP_TIMEOUT_SEC);
         Assert.assertEquals(response.getStatusCode(), Status.CREATED.getStatusCode());
         return response.getHeader("Location");
@@ -807,6 +806,28 @@ public abstract class KillbillClient extends ServerTestSuiteWithEmbeddedDB {
     }
 
     //
+    // METERING
+    //
+
+    protected void recordMeteringUsage(final UUID bundleId, final String category, final String metric, final DateTime timestamp) throws IOException {
+        final String meterURI = JaxrsResource.METER_PATH + "/" + bundleId.toString() + "/" + category + "/" + metric;
+        final Response meterResponse = doPost(meterURI, null, ImmutableMap.<String, String>of(JaxrsResource.QUERY_METER_WITH_AGGREGATE, "true",
+                                                                                              JaxrsResource.QUERY_METER_TIMESTAMP, timestamp.toString()), DEFAULT_HTTP_TIMEOUT_SEC);
+        assertEquals(meterResponse.getStatusCode(), Status.OK.getStatusCode());
+    }
+
+    protected List<Map<String, Object>> getMeteringAggregateUsage(final UUID bundleId, final String category, final DateTime from, final DateTime to) throws IOException {
+        final String meterURI = JaxrsResource.METER_PATH + "/" + bundleId.toString();
+        final Response meterResponse = doGet(meterURI, ImmutableMap.<String, String>of(JaxrsResource.QUERY_METER_CATEGORY, category,
+                                                                                       JaxrsResource.QUERY_METER_FROM, from.toString(),
+                                                                                       JaxrsResource.QUERY_METER_TO, to.toString(),
+                                                                                       JaxrsResource.QUERY_METER_WITH_AGGREGATE, "true"), DEFAULT_HTTP_TIMEOUT_SEC);
+        assertEquals(meterResponse.getStatusCode(), Status.OK.getStatusCode());
+
+        return mapper.readValue(meterResponse.getResponseBody(), new TypeReference<List<Map<String, Object>>>() {});
+    }
+
+    //
     // HTTP CLIENT HELPERS
     //
     protected Response doPost(final String uri, @Nullable final String body, final Map<String, String> queryParams, final int timeoutSec) {
diff --git a/server/src/test/java/com/ning/billing/jaxrs/TestJaxrsBase.java b/server/src/test/java/com/ning/billing/jaxrs/TestJaxrsBase.java
index 8d7fcab..a69d0df 100644
--- a/server/src/test/java/com/ning/billing/jaxrs/TestJaxrsBase.java
+++ b/server/src/test/java/com/ning/billing/jaxrs/TestJaxrsBase.java
@@ -47,6 +47,7 @@ import com.ning.billing.invoice.api.InvoiceNotifier;
 import com.ning.billing.invoice.glue.DefaultInvoiceModule;
 import com.ning.billing.invoice.notification.NullInvoiceNotifier;
 import com.ning.billing.junction.glue.DefaultJunctionModule;
+import com.ning.billing.meter.glue.MeterModule;
 import com.ning.billing.overdue.glue.DefaultOverdueModule;
 import com.ning.billing.payment.glue.PaymentModule;
 import com.ning.billing.payment.provider.MockPaymentProviderPluginModule;
@@ -184,6 +185,7 @@ public class TestJaxrsBase extends KillbillClient {
             install(new DefaultOverdueModule());
             install(new TenantModule());
             install(new ExportModule());
+            install(new MeterModule());
             installClock();
         }
 
diff --git a/server/src/test/java/com/ning/billing/jaxrs/TestMeter.java b/server/src/test/java/com/ning/billing/jaxrs/TestMeter.java
new file mode 100644
index 0000000..e7fd51f
--- /dev/null
+++ b/server/src/test/java/com/ning/billing/jaxrs/TestMeter.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.jaxrs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Ordering;
+
+public class TestMeter extends TestJaxrsBase {
+
+    private final Random rand = new Random();
+    private final UUID bundleId = UUID.randomUUID();
+    private final String category = "PageView";
+    private final String visitor1 = "pierre";
+    private final String visitor2 = "stephane";
+    private final int nbVisits = 20;
+
+    private final Ordering<DateTime> dateTimeOrdering = new Ordering<DateTime>() {
+
+        @Override
+        public int compare(final DateTime left, final DateTime right) {
+            return left.compareTo(right);
+        }
+    };
+
+    @Test(groups = "slow")
+    public void testRecordPageViews() throws Exception {
+        // Record a bunch of random visits by two visitors
+        final DateTime start = clock.getUTCNow();
+        final List<DateTime> visits = generatePageViews(nbVisits, start);
+        final DateTime end = dateTimeOrdering.max(visits);
+
+        // Verify the visits recorded
+        final List<Map<String, Object>> meteringAggregateUsage = getMeteringAggregateUsage(bundleId, category, start, end);
+        final List<DateTime> visitsFound = new ArrayList<DateTime>();
+        for (final Map<String, Object> oneUsage : meteringAggregateUsage) {
+            Assert.assertEquals(oneUsage.get("sourceName"), bundleId.toString());
+            Assert.assertEquals(oneUsage.get("eventCategory"), category);
+            Assert.assertEquals(oneUsage.get("metric"), "__AGGREGATE__");
+
+            // Retrieve the timestamps
+            final ImmutableList<String> samples = ImmutableList.<String>copyOf(Splitter.on(",").split((String) oneUsage.get("samples")));
+            for (int i = 0; i < samples.size(); i++) {
+                visitsFound.add(new DateTime(Long.valueOf(samples.get(i)) * 1000, DateTimeZone.UTC));
+                i++;
+            }
+        }
+
+        Assert.assertEquals(dateTimeOrdering.immutableSortedCopy(visitsFound), dateTimeOrdering.immutableSortedCopy(visits));
+    }
+
+    private List<DateTime> generatePageViews(final int nbVisits, final DateTime start) throws IOException {
+        DateTime lastVisit = start;
+        final List<DateTime> visits = new ArrayList<DateTime>();
+        for (int i = 0; i < nbVisits; i++) {
+            DateTime visitDate = lastVisit.plusSeconds(i);
+            recordMeteringUsage(bundleId, category, visitor1, visitDate);
+            visits.add(visitDate);
+
+            visitDate = visitDate.plusSeconds(1);
+            recordMeteringUsage(bundleId, category, visitor2, visitDate);
+            visits.add(visitDate);
+
+            lastVisit = visitDate;
+        }
+
+        return visits;
+    }
+}