Details
diff --git a/api/src/main/java/com/ning/billing/meter/api/MeterUserApi.java b/api/src/main/java/com/ning/billing/meter/api/MeterUserApi.java
index 5869d67..3ee17a1 100644
--- a/api/src/main/java/com/ning/billing/meter/api/MeterUserApi.java
+++ b/api/src/main/java/com/ning/billing/meter/api/MeterUserApi.java
@@ -16,15 +16,25 @@
package com.ning.billing.meter.api;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import org.joda.time.DateTime;
import com.ning.billing.util.callcontext.CallContext;
+import com.ning.billing.util.callcontext.TenantContext;
public interface MeterUserApi {
+ public void getAggregateUsage(OutputStream outputStream, UUID bundleId, Collection<String> categories,
+ DateTime fromTimestamp, DateTime toTimestamp, TenantContext context) throws IOException;
+
+ public void getUsage(OutputStream outputStream, UUID bundleId, Map<String, Collection<String>> metricsPerCategory,
+ DateTime fromTimestamp, DateTime toTimestamp, TenantContext context) throws IOException;
+
/**
* Shortcut API to record a usage value of "1" for a given category and metric.
*
diff --git a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java
index 7d0ec23..d2973cb 100644
--- a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java
+++ b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java
@@ -79,6 +79,13 @@ public interface JaxrsResource {
public static final String QUERY_NOTIFICATION_CALLBACK = "cb";
+ public static final String QUERY_METER_WITH_AGGREGATE = "aggregate";
+ public static final String QUERY_METER_TIMESTAMP = "timestamp";
+ public static final String QUERY_METER_FROM = "from";
+ public static final String QUERY_METER_TO = "to";
+ public static final String QUERY_METER_CATEGORY = "category";
+ public static final String QUERY_METER_CATEGORY_AND_METRIC = "category_and_metric";
+
public static final String ACCOUNTS = "accounts";
public static final String ACCOUNTS_PATH = PREFIX + "/" + ACCOUNTS;
@@ -135,4 +142,6 @@ public interface JaxrsResource {
public static final String CBA_REBALANCING = "cbaRebalancing";
+ public static final String METER = "meter";
+ public static final String METER_PATH = PREFIX + "/" + METER;
}
diff --git a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/MeterResource.java b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/MeterResource.java
new file mode 100644
index 0000000..9b81ac9
--- /dev/null
+++ b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/MeterResource.java
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.jaxrs.resources;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.HeaderParam;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
+
+import org.joda.time.DateTime;
+
+import com.ning.billing.jaxrs.util.Context;
+import com.ning.billing.jaxrs.util.JaxrsUriBuilder;
+import com.ning.billing.meter.api.MeterUserApi;
+import com.ning.billing.util.api.AuditUserApi;
+import com.ning.billing.util.api.CustomFieldUserApi;
+import com.ning.billing.util.api.TagUserApi;
+import com.ning.billing.util.callcontext.CallContext;
+import com.ning.billing.util.callcontext.TenantContext;
+import com.ning.billing.util.clock.Clock;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
+
+@Singleton
+@Path(JaxrsResource.METER_PATH)
+public class MeterResource extends JaxRsResourceBase {
+
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+
+ private final MeterUserApi meterApi;
+ private final Clock clock;
+
+ @Inject
+ public MeterResource(final MeterUserApi meterApi,
+ final Clock clock,
+ final JaxrsUriBuilder uriBuilder,
+ final TagUserApi tagUserApi,
+ final CustomFieldUserApi customFieldUserApi,
+ final AuditUserApi auditUserApi,
+ final Context context) {
+ super(uriBuilder, tagUserApi, customFieldUserApi, auditUserApi, context);
+ this.meterApi = meterApi;
+ this.clock = clock;
+ }
+
+ @GET
+ @Path("/{bundleId:" + UUID_PATTERN + "}")
+ @Produces(APPLICATION_JSON)
+ public StreamingOutput getUsage(@PathParam("bundleId") final String bundleIdString,
+ @QueryParam(QUERY_METER_CATEGORY) final List<String> categories,
+ // Format: category,metric
+ @QueryParam(QUERY_METER_CATEGORY_AND_METRIC) final List<String> categoriesAndMetrics,
+ @QueryParam(QUERY_METER_FROM) final String fromTimestampString,
+ @QueryParam(QUERY_METER_TO) final String toTimestampString,
+ @QueryParam(QUERY_METER_WITH_AGGREGATE) @DefaultValue("false") final Boolean withAggregate,
+ @javax.ws.rs.core.Context final HttpServletRequest request) {
+ final UUID bundleId = UUID.fromString(bundleIdString);
+ final DateTime fromTimestamp = DATE_TIME_FORMATTER.parseDateTime(fromTimestampString);
+ final DateTime toTimestamp = DATE_TIME_FORMATTER.parseDateTime(toTimestampString);
+ final TenantContext tenantContext = context.createContext(request);
+
+ return new StreamingOutput() {
+ @Override
+ public void write(final OutputStream output) throws IOException, WebApplicationException {
+ if (withAggregate) {
+ meterApi.getAggregateUsage(output, bundleId, categories, fromTimestamp, toTimestamp, tenantContext);
+ } else {
+ final Map<String, Collection<String>> metricsPerCategory = retrieveMetricsPerCategory(categoriesAndMetrics);
+ meterApi.getUsage(output, bundleId, metricsPerCategory, fromTimestamp, toTimestamp, tenantContext);
+ }
+ }
+ };
+ }
+
+ private Map<String, Collection<String>> retrieveMetricsPerCategory(final List<String> categoriesAndMetrics) {
+ final Map<String, Collection<String>> metricsPerCategory = new HashMap<String, Collection<String>>();
+ for (final String categoryAndSampleKind : categoriesAndMetrics) {
+ final String[] categoryAndMetric = getCategoryAndMetricFromQueryParameter(categoryAndSampleKind);
+ if (metricsPerCategory.get(categoryAndMetric[0]) == null) {
+ metricsPerCategory.put(categoryAndMetric[0], new ArrayList<String>());
+ }
+
+ metricsPerCategory.get(categoryAndMetric[0]).add(categoryAndMetric[1]);
+ }
+
+ return metricsPerCategory;
+ }
+
+ private String[] getCategoryAndMetricFromQueryParameter(final String categoryAndMetric) {
+ final String[] parts = categoryAndMetric.split(",");
+ if (parts.length != 2) {
+ throw new WebApplicationException(Response.Status.BAD_REQUEST);
+ }
+ return parts;
+ }
+
+ @POST
+ @Path("/{bundleId:" + UUID_PATTERN + "}/{categoryName:" + STRING_PATTERN + "}/{metricName:" + STRING_PATTERN + "}")
+ @Consumes(APPLICATION_JSON)
+ @Produces(APPLICATION_JSON)
+ public Response recordUsage(@PathParam("bundleId") final String bundleIdString,
+ @PathParam("categoryName") final String categoryName,
+ @PathParam("metricName") final String metricName,
+ @QueryParam(QUERY_METER_WITH_AGGREGATE) @DefaultValue("false") final Boolean withAggregate,
+ @QueryParam(QUERY_METER_TIMESTAMP) final String timestampString,
+ @HeaderParam(HDR_CREATED_BY) final String createdBy,
+ @HeaderParam(HDR_REASON) final String reason,
+ @HeaderParam(HDR_COMMENT) final String comment,
+ @javax.ws.rs.core.Context final HttpServletRequest request) {
+ final UUID bundleId = UUID.fromString(bundleIdString);
+ final CallContext callContext = context.createContext(createdBy, reason, comment, request);
+
+ final DateTime timestamp;
+ if (timestampString == null) {
+ timestamp = clock.getUTCNow();
+ } else {
+ timestamp = DATE_TIME_FORMATTER.parseDateTime(timestampString);
+ }
+
+ if (withAggregate) {
+ meterApi.incrementUsageAndAggregate(bundleId, categoryName, metricName, timestamp, callContext);
+ } else {
+ meterApi.incrementUsage(bundleId, categoryName, metricName, timestamp, callContext);
+ }
+
+ return Response.ok().build();
+ }
+}
diff --git a/meter/src/main/java/com/ning/billing/meter/api/user/DefaultMeterUserApi.java b/meter/src/main/java/com/ning/billing/meter/api/user/DefaultMeterUserApi.java
index 28024af..840d295 100644
--- a/meter/src/main/java/com/ning/billing/meter/api/user/DefaultMeterUserApi.java
+++ b/meter/src/main/java/com/ning/billing/meter/api/user/DefaultMeterUserApi.java
@@ -16,6 +16,9 @@
package com.ning.billing.meter.api.user;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collection;
import java.util.Map;
import java.util.UUID;
@@ -26,27 +29,54 @@ import org.joda.time.DateTime;
import com.ning.billing.ObjectType;
import com.ning.billing.meter.api.MeterUserApi;
import com.ning.billing.meter.timeline.TimelineEventHandler;
+import com.ning.billing.meter.timeline.persistent.TimelineDao;
import com.ning.billing.util.callcontext.CallContext;
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
+import com.ning.billing.util.callcontext.InternalTenantContext;
+import com.ning.billing.util.callcontext.TenantContext;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
public class DefaultMeterUserApi implements MeterUserApi {
private static final String AGGREGATE_METRIC_NAME = "__AGGREGATE__";
private final TimelineEventHandler timelineEventHandler;
+ private final TimelineDao timelineDao;
private final InternalCallContextFactory internalCallContextFactory;
@Inject
public DefaultMeterUserApi(final TimelineEventHandler timelineEventHandler,
+ final TimelineDao timelineDao,
final InternalCallContextFactory internalCallContextFactory) {
this.timelineEventHandler = timelineEventHandler;
+ this.timelineDao = timelineDao;
this.internalCallContextFactory = internalCallContextFactory;
}
@Override
+ public void getAggregateUsage(final OutputStream outputStream, final UUID bundleId, final Collection<String> categories,
+ final DateTime fromTimestamp, final DateTime toTimestamp, final TenantContext context) throws IOException {
+ final ImmutableMap.Builder<String, Collection<String>> metricsPerCategory = new Builder<String, Collection<String>>();
+ for (final String category : categories) {
+ metricsPerCategory.put(category, ImmutableList.<String>of(AGGREGATE_METRIC_NAME));
+ }
+ getUsage(outputStream, bundleId, metricsPerCategory.build(), fromTimestamp, toTimestamp, context);
+ }
+
+ @Override
+ public void getUsage(final OutputStream outputStream, final UUID bundleId, final Map<String, Collection<String>> metricsPerCategory,
+ final DateTime fromTimestamp, final DateTime toTimestamp, final TenantContext context) throws IOException {
+ final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(context);
+
+ final JsonSamplesOutputer outputerJson = new JsonSamplesOutputer(timelineEventHandler, timelineDao, internalTenantContext);
+ outputerJson.output(outputStream, ImmutableList.<UUID>of(bundleId), metricsPerCategory, fromTimestamp, toTimestamp);
+ }
+
+ @Override
public void incrementUsage(final UUID bundleId, final String categoryName, final String metricName,
final DateTime timestamp, final CallContext context) {
incrementUsage(bundleId,
@@ -59,8 +89,7 @@ public class DefaultMeterUserApi implements MeterUserApi {
public void incrementUsageAndAggregate(final UUID bundleId, final String categoryName, final String metricName,
final DateTime timestamp, final CallContext context) {
incrementUsage(bundleId,
- ImmutableMap.<String, Map<String, Object>>of(categoryName, ImmutableMap.<String, Object>of(metricName, (short) 1,
- AGGREGATE_METRIC_NAME, (short) 1)),
+ ImmutableMap.<String, Map<String, Object>>of(categoryName, ImmutableMap.<String, Object>of(metricName, (short) 1, AGGREGATE_METRIC_NAME, (short) 1)),
timestamp,
context);
}
diff --git a/meter/src/main/java/com/ning/billing/meter/api/user/JsonSamplesOutputer.java b/meter/src/main/java/com/ning/billing/meter/api/user/JsonSamplesOutputer.java
new file mode 100644
index 0000000..b589af5
--- /dev/null
+++ b/meter/src/main/java/com/ning/billing/meter/api/user/JsonSamplesOutputer.java
@@ -0,0 +1,238 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.api.user;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.Nullable;
+
+import org.joda.time.DateTime;
+import org.skife.config.TimeSpan;
+
+import com.ning.billing.meter.timeline.TimelineEventHandler;
+import com.ning.billing.meter.timeline.categories.CategoryRecordIdAndMetric;
+import com.ning.billing.meter.timeline.chunks.TimelineChunk;
+import com.ning.billing.meter.timeline.chunks.TimelineChunksViews;
+import com.ning.billing.meter.timeline.codec.DefaultSampleCoder;
+import com.ning.billing.meter.timeline.codec.SampleCoder;
+import com.ning.billing.meter.timeline.codec.TimelineChunkDecoded;
+import com.ning.billing.meter.timeline.consumer.CSVConsumer;
+import com.ning.billing.meter.timeline.consumer.CSVSampleConsumer;
+import com.ning.billing.meter.timeline.consumer.TimelineChunkConsumer;
+import com.ning.billing.meter.timeline.filter.DecimatingSampleFilter;
+import com.ning.billing.meter.timeline.filter.DecimationMode;
+import com.ning.billing.meter.timeline.metrics.SamplesForMetricAndSource;
+import com.ning.billing.meter.timeline.persistent.TimelineDao;
+import com.ning.billing.util.callcontext.InternalTenantContext;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.base.Strings;
+
+public class JsonSamplesOutputer {
+
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+
+ private final TimelineEventHandler timelineEventHandler;
+ private final TimelineDao timelineDao;
+ private final SampleCoder sampleCoder;
+ private final InternalTenantContext context;
+
+ public JsonSamplesOutputer(final TimelineEventHandler timelineEventHandler, final TimelineDao timelineDao, final InternalTenantContext context) {
+ this.timelineEventHandler = timelineEventHandler;
+ this.timelineDao = timelineDao;
+ this.sampleCoder = new DefaultSampleCoder();
+ this.context = context;
+ }
+
+ public void output(final OutputStream output, final List<UUID> bundleIds, final Map<String, Collection<String>> metricsPerCategory,
+ final DateTime startTime, final DateTime endTime) throws IOException {
+ // Default - output all data points as CSV
+ output(output, bundleIds, metricsPerCategory, DecimationMode.PEAK_PICK, null, false, false, startTime, endTime);
+ }
+
+ public void output(final OutputStream output, final List<UUID> bundleIds, final Map<String, Collection<String>> metricsPerCategory,
+ final DecimationMode decimationMode, @Nullable final Integer outputCount, final boolean decodeSamples, final boolean compact,
+ final DateTime startTime, final DateTime endTime) throws IOException {
+ // Retrieve the source and metric ids
+ final List<Integer> sourceIds = translateBundleIdsToSourceIds(bundleIds);
+ final List<Integer> metricIds = translateCategoriesAndMetricNamesToMetricIds(metricsPerCategory);
+
+ // Create the decimating filters, if needed
+ final Map<Integer, Map<Integer, DecimatingSampleFilter>> filters = createDecimatingSampleFilters(sourceIds, metricIds, decimationMode, startTime, endTime, outputCount);
+
+ // Setup Jackson
+ final ObjectWriter writer;
+ if (compact) {
+ writer = objectMapper.writerWithView(TimelineChunksViews.Compact.class);
+ } else {
+ writer = objectMapper.writerWithView(TimelineChunksViews.Loose.class);
+ }
+ final JsonGenerator generator = objectMapper.getJsonFactory().createJsonGenerator(output);
+
+ generator.writeStartArray();
+
+ // First, return all data stored in the database
+ writeJsonForStoredChunks(generator, writer, filters, sourceIds, metricIds, startTime, endTime, decodeSamples);
+
+ // Now return all data in memory
+ writeJsonForInMemoryChunks(generator, writer, filters, sourceIds, metricIds, startTime, endTime, decodeSamples);
+
+ generator.writeEndArray();
+
+ generator.flush();
+ generator.close();
+ }
+
+ private List<Integer> translateBundleIdsToSourceIds(final List<UUID> bundleIds) {
+ final List<Integer> hostIds = new ArrayList<Integer>(bundleIds.size());
+ for (final UUID bundleId : bundleIds) {
+ hostIds.add(timelineDao.getSourceId(bundleId.toString(), context));
+ }
+
+ return hostIds;
+ }
+
+ private List<Integer> translateCategoriesAndMetricNamesToMetricIds(final Map<String, Collection<String>> metricsPerCategory) {
+ final List<Integer> metricIds = new ArrayList<Integer>(metricsPerCategory.keySet().size());
+ for (final String category : metricsPerCategory.keySet()) {
+ final Integer categoryId = timelineDao.getEventCategoryId(category, context);
+ if (categoryId == null) {
+ // Ignore
+ continue;
+ }
+
+ for (final String metricName : metricsPerCategory.get(category)) {
+ final Integer sampleKindId = timelineDao.getMetricId(categoryId, metricName, context);
+ if (sampleKindId == null) {
+ // Ignore
+ continue;
+ }
+
+ metricIds.add(sampleKindId);
+ }
+ }
+
+ return metricIds;
+ }
+
+ private Map<Integer, Map<Integer, DecimatingSampleFilter>> createDecimatingSampleFilters(final List<Integer> hostIds, final List<Integer> sampleKindIds, final DecimationMode decimationMode,
+ final DateTime startTime, final DateTime endTime, final Integer outputCount) {
+ final Map<Integer, Map<Integer, DecimatingSampleFilter>> filters = new HashMap<Integer, Map<Integer, DecimatingSampleFilter>>();
+ for (final Integer hostId : hostIds) {
+ filters.put(hostId, new HashMap<Integer, DecimatingSampleFilter>());
+ for (final Integer sampleKindId : sampleKindIds) {
+ filters.get(hostId).put(sampleKindId, createDecimatingSampleFilter(outputCount, decimationMode, startTime, endTime));
+ }
+ }
+ return filters;
+ }
+
+ private DecimatingSampleFilter createDecimatingSampleFilter(final Integer outputCount, final DecimationMode decimationMode, final DateTime startTime, final DateTime endTime) {
+ final DecimatingSampleFilter rangeSampleProcessor;
+ if (outputCount == null) {
+ rangeSampleProcessor = null;
+ } else {
+ // TODO Fix the polling interval
+ rangeSampleProcessor = new DecimatingSampleFilter(startTime, endTime, outputCount, new TimeSpan("1s"), decimationMode, new CSVSampleConsumer());
+ }
+
+ return rangeSampleProcessor;
+ }
+
+ private void writeJsonForStoredChunks(final JsonGenerator generator, final ObjectWriter writer, final Map<Integer, Map<Integer, DecimatingSampleFilter>> filters, final List<Integer> hostIdsList,
+ final List<Integer> sampleKindIdsList, final DateTime startTime, final DateTime endTime, final boolean decodeSamples) throws IOException {
+ final AtomicReference<Integer> lastHostId = new AtomicReference<Integer>(null);
+ final AtomicReference<Integer> lastSampleKindId = new AtomicReference<Integer>(null);
+ final List<TimelineChunk> chunksForHostAndSampleKind = new ArrayList<TimelineChunk>();
+
+ timelineDao.getSamplesBySourceIdsAndMetricIds(hostIdsList, sampleKindIdsList, startTime, endTime, new TimelineChunkConsumer() {
+ @Override
+ public void processTimelineChunk(final TimelineChunk chunks) {
+ final Integer previousHostId = lastHostId.get();
+ final Integer previousSampleKindId = lastSampleKindId.get();
+ final Integer currentHostId = chunks.getSourceId();
+ final Integer currentSampleKindId = chunks.getMetricId();
+
+ chunksForHostAndSampleKind.add(chunks);
+ if (previousHostId != null && (!previousHostId.equals(currentHostId) || !previousSampleKindId.equals(currentSampleKindId))) {
+ try {
+ writeJsonForChunks(generator, writer, filters, chunksForHostAndSampleKind, decodeSamples);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ chunksForHostAndSampleKind.clear();
+ }
+
+ lastHostId.set(currentHostId);
+ lastSampleKindId.set(currentSampleKindId);
+ }
+ }, context);
+
+ if (chunksForHostAndSampleKind.size() > 0) {
+ writeJsonForChunks(generator, writer, filters, chunksForHostAndSampleKind, decodeSamples);
+ chunksForHostAndSampleKind.clear();
+ }
+ }
+
+ private void writeJsonForInMemoryChunks(final JsonGenerator generator, final ObjectWriter writer, final Map<Integer, Map<Integer, DecimatingSampleFilter>> filters, final List<Integer> hostIdsList,
+ final List<Integer> sampleKindIdsList, @Nullable final DateTime startTime, @Nullable final DateTime endTime, final boolean decodeSamples) throws IOException {
+
+ for (final Integer hostId : hostIdsList) {
+ final Collection<? extends TimelineChunk> inMemorySamples;
+ try {
+ inMemorySamples = timelineEventHandler.getInMemoryTimelineChunks(hostId, sampleKindIdsList, startTime, endTime, context);
+ } catch (ExecutionException e) {
+ throw new IOException(e);
+ }
+ writeJsonForChunks(generator, writer, filters, inMemorySamples, decodeSamples);
+ }
+ }
+
+ private void writeJsonForChunks(final JsonGenerator generator, final ObjectWriter writer, final Map<Integer, Map<Integer, DecimatingSampleFilter>> filters,
+ final Iterable<? extends TimelineChunk> chunksForHostAndSampleKind, final boolean decodeSamples) throws IOException {
+ for (final TimelineChunk chunk : chunksForHostAndSampleKind) {
+ if (decodeSamples) {
+ writer.writeValue(generator, new TimelineChunkDecoded(chunk, sampleCoder));
+ } else {
+ final String hostName = timelineDao.getSource(chunk.getSourceId(), context);
+ final CategoryRecordIdAndMetric categoryIdAndSampleKind = timelineDao.getCategoryIdAndMetric(chunk.getMetricId(), context);
+ final String eventCategory = timelineDao.getEventCategory(categoryIdAndSampleKind.getEventCategoryId(), context);
+ final String sampleKind = categoryIdAndSampleKind.getMetric();
+ // TODO pass compact form
+ final DecimatingSampleFilter filter = filters.get(chunk.getSourceId()).get(chunk.getMetricId());
+ // TODO CSV only for now
+ final String samples = filter == null ? CSVConsumer.getSamplesAsCSV(sampleCoder, chunk) : CSVConsumer.getSamplesAsCSV(sampleCoder, chunk, filter);
+
+ // Don't write out empty samples
+ if (!Strings.isNullOrEmpty(samples)) {
+ generator.writeObject(new SamplesForMetricAndSource(hostName, eventCategory, sampleKind, samples));
+ }
+ }
+ }
+ }
+}
diff --git a/meter/src/main/java/com/ning/billing/meter/glue/CachingDefaultTimelineDaoProvider.java b/meter/src/main/java/com/ning/billing/meter/glue/CachingDefaultTimelineDaoProvider.java
index da8feab..a9fd403 100644
--- a/meter/src/main/java/com/ning/billing/meter/glue/CachingDefaultTimelineDaoProvider.java
+++ b/meter/src/main/java/com/ning/billing/meter/glue/CachingDefaultTimelineDaoProvider.java
@@ -18,7 +18,7 @@ package com.ning.billing.meter.glue;
import javax.inject.Provider;
-import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.IDBI;
import com.ning.billing.meter.timeline.persistent.CachingTimelineDao;
import com.ning.billing.meter.timeline.persistent.DefaultTimelineDao;
@@ -29,11 +29,11 @@ import com.google.inject.Inject;
public class CachingDefaultTimelineDaoProvider implements Provider<TimelineDao> {
- private final DBI dbi;
+ private final IDBI dbi;
private final InternalCallContextFactory internalCallContextFactory;
@Inject
- public CachingDefaultTimelineDaoProvider(final DBI dbi, final InternalCallContextFactory internalCallContextFactory) {
+ public CachingDefaultTimelineDaoProvider(final IDBI dbi, final InternalCallContextFactory internalCallContextFactory) {
this.dbi = dbi;
this.internalCallContextFactory = internalCallContextFactory;
}
diff --git a/meter/src/main/java/com/ning/billing/meter/glue/MeterModule.java b/meter/src/main/java/com/ning/billing/meter/glue/MeterModule.java
index 5eef3b9..20c725f 100644
--- a/meter/src/main/java/com/ning/billing/meter/glue/MeterModule.java
+++ b/meter/src/main/java/com/ning/billing/meter/glue/MeterModule.java
@@ -22,6 +22,8 @@ import org.skife.config.ConfigSource;
import org.skife.config.ConfigurationObjectFactory;
import org.skife.config.SimplePropertyConfigSource;
+import com.ning.billing.meter.api.MeterUserApi;
+import com.ning.billing.meter.api.user.DefaultMeterUserApi;
import com.ning.billing.meter.timeline.codec.DefaultSampleCoder;
import com.ning.billing.meter.timeline.codec.SampleCoder;
import com.ning.billing.meter.timeline.persistent.FileBackedBuffer;
@@ -71,9 +73,12 @@ public class MeterModule extends AbstractModule {
}
}
+ protected void installMeterUserApi() {
+ bind(MeterUserApi.class).to(DefaultMeterUserApi.class).asEagerSingleton();
+ }
+
@Override
protected void configure() {
-
final MeterConfig config = installConfig();
configureFileBackedBuffer(config);
@@ -84,5 +89,7 @@ public class MeterModule extends AbstractModule {
//configureTimelineAggregator();
//configureBackgroundDBChunkWriter();
//configureReplayer();
+
+ installMeterUserApi();
}
}
diff --git a/meter/src/main/resources/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.sql.stg b/meter/src/main/resources/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.sql.stg
index 3ccc68b..fcc61ab 100644
--- a/meter/src/main/resources/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.sql.stg
+++ b/meter/src/main/resources/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.sql.stg
@@ -53,7 +53,7 @@ insert into sources (
getCategoryRecordId() ::= <<
select
- category_id
+ record_id
from categories
where category = :category
<AND_CHECK_TENANT()>
diff --git a/server/src/main/java/com/ning/billing/server/modules/KillbillServerModule.java b/server/src/main/java/com/ning/billing/server/modules/KillbillServerModule.java
index 188644c..a1b3303 100644
--- a/server/src/main/java/com/ning/billing/server/modules/KillbillServerModule.java
+++ b/server/src/main/java/com/ning/billing/server/modules/KillbillServerModule.java
@@ -29,6 +29,7 @@ import com.ning.billing.jaxrs.resources.AccountResource;
import com.ning.billing.jaxrs.resources.BundleResource;
import com.ning.billing.jaxrs.resources.CatalogResource;
import com.ning.billing.jaxrs.resources.InvoiceResource;
+import com.ning.billing.jaxrs.resources.MeterResource;
import com.ning.billing.jaxrs.resources.PaymentMethodResource;
import com.ning.billing.jaxrs.resources.PaymentResource;
import com.ning.billing.jaxrs.resources.RefundResource;
@@ -37,6 +38,7 @@ import com.ning.billing.jaxrs.resources.TagResource;
import com.ning.billing.jaxrs.resources.TenantResource;
import com.ning.billing.jaxrs.util.KillbillEventHandler;
import com.ning.billing.junction.glue.DefaultJunctionModule;
+import com.ning.billing.meter.glue.MeterModule;
import com.ning.billing.overdue.glue.DefaultOverdueModule;
import com.ning.billing.payment.glue.PaymentModule;
import com.ning.billing.server.DefaultServerService;
@@ -53,7 +55,6 @@ import com.ning.billing.util.glue.CustomFieldModule;
import com.ning.billing.util.glue.ExportModule;
import com.ning.billing.util.glue.GlobalLockerModule;
import com.ning.billing.util.glue.NotificationQueueModule;
-import com.ning.billing.util.glue.TagStoreModule;
import com.google.inject.AbstractModule;
@@ -93,6 +94,7 @@ public class KillbillServerModule extends AbstractModule {
bind(PaymentResource.class).asEagerSingleton();
bind(RefundResource.class).asEagerSingleton();
bind(TenantResource.class).asEagerSingleton();
+ bind(MeterResource.class).asEagerSingleton();
bind(KillbillEventHandler.class).asEagerSingleton();
}
@@ -120,6 +122,7 @@ public class KillbillServerModule extends AbstractModule {
install(new DefaultOverdueModule());
install(new TenantModule());
install(new ExportModule());
+ install(new MeterModule());
installClock();
}
}
diff --git a/server/src/test/java/com/ning/billing/jaxrs/KillbillClient.java b/server/src/test/java/com/ning/billing/jaxrs/KillbillClient.java
index 9e3fb83..1780d91 100644
--- a/server/src/test/java/com/ning/billing/jaxrs/KillbillClient.java
+++ b/server/src/test/java/com/ning/billing/jaxrs/KillbillClient.java
@@ -16,13 +16,6 @@
package com.ning.billing.jaxrs;
-import static com.ning.billing.jaxrs.resources.JaxrsResource.ACCOUNTS;
-import static com.ning.billing.jaxrs.resources.JaxrsResource.BUNDLES;
-import static com.ning.billing.jaxrs.resources.JaxrsResource.QUERY_PAYMENT_METHOD_PLUGIN_INFO;
-import static com.ning.billing.jaxrs.resources.JaxrsResource.SUBSCRIPTIONS;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
@@ -83,6 +76,13 @@ import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableMap;
+import static com.ning.billing.jaxrs.resources.JaxrsResource.ACCOUNTS;
+import static com.ning.billing.jaxrs.resources.JaxrsResource.BUNDLES;
+import static com.ning.billing.jaxrs.resources.JaxrsResource.QUERY_PAYMENT_METHOD_PLUGIN_INFO;
+import static com.ning.billing.jaxrs.resources.JaxrsResource.SUBSCRIPTIONS;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
public abstract class KillbillClient extends ServerTestSuiteWithEmbeddedDB {
protected static final String PLUGIN_NAME = "noop";
@@ -149,11 +149,10 @@ public abstract class KillbillClient extends ServerTestSuiteWithEmbeddedDB {
return response.getHeader("Location");
}
-
- protected String registerCallbackNotificationForTenant(final String callback) throws Exception {
+ protected String registerCallbackNotificationForTenant(final String callback) throws Exception {
final Map<String, String> queryParams = new HashMap<String, String>();
queryParams.put(JaxrsResource.QUERY_NOTIFICATION_CALLBACK, callback);
- final String uri = JaxrsResource.TENANTS_PATH + "/" + JaxrsResource.REGISTER_NOTIFICATION_CALLBACK ;
+ final String uri = JaxrsResource.TENANTS_PATH + "/" + JaxrsResource.REGISTER_NOTIFICATION_CALLBACK;
final Response response = doPost(uri, null, queryParams, DEFAULT_HTTP_TIMEOUT_SEC);
Assert.assertEquals(response.getStatusCode(), Status.CREATED.getStatusCode());
return response.getHeader("Location");
@@ -807,6 +806,28 @@ public abstract class KillbillClient extends ServerTestSuiteWithEmbeddedDB {
}
//
+ // METERING
+ //
+
+ protected void recordMeteringUsage(final UUID bundleId, final String category, final String metric, final DateTime timestamp) throws IOException {
+ final String meterURI = JaxrsResource.METER_PATH + "/" + bundleId.toString() + "/" + category + "/" + metric;
+ final Response meterResponse = doPost(meterURI, null, ImmutableMap.<String, String>of(JaxrsResource.QUERY_METER_WITH_AGGREGATE, "true",
+ JaxrsResource.QUERY_METER_TIMESTAMP, timestamp.toString()), DEFAULT_HTTP_TIMEOUT_SEC);
+ assertEquals(meterResponse.getStatusCode(), Status.OK.getStatusCode());
+ }
+
+ protected List<Map<String, Object>> getMeteringAggregateUsage(final UUID bundleId, final String category, final DateTime from, final DateTime to) throws IOException {
+ final String meterURI = JaxrsResource.METER_PATH + "/" + bundleId.toString();
+ final Response meterResponse = doGet(meterURI, ImmutableMap.<String, String>of(JaxrsResource.QUERY_METER_CATEGORY, category,
+ JaxrsResource.QUERY_METER_FROM, from.toString(),
+ JaxrsResource.QUERY_METER_TO, to.toString(),
+ JaxrsResource.QUERY_METER_WITH_AGGREGATE, "true"), DEFAULT_HTTP_TIMEOUT_SEC);
+ assertEquals(meterResponse.getStatusCode(), Status.OK.getStatusCode());
+
+ return mapper.readValue(meterResponse.getResponseBody(), new TypeReference<List<Map<String, Object>>>() {});
+ }
+
+ //
// HTTP CLIENT HELPERS
//
protected Response doPost(final String uri, @Nullable final String body, final Map<String, String> queryParams, final int timeoutSec) {
diff --git a/server/src/test/java/com/ning/billing/jaxrs/TestJaxrsBase.java b/server/src/test/java/com/ning/billing/jaxrs/TestJaxrsBase.java
index 8d7fcab..a69d0df 100644
--- a/server/src/test/java/com/ning/billing/jaxrs/TestJaxrsBase.java
+++ b/server/src/test/java/com/ning/billing/jaxrs/TestJaxrsBase.java
@@ -47,6 +47,7 @@ import com.ning.billing.invoice.api.InvoiceNotifier;
import com.ning.billing.invoice.glue.DefaultInvoiceModule;
import com.ning.billing.invoice.notification.NullInvoiceNotifier;
import com.ning.billing.junction.glue.DefaultJunctionModule;
+import com.ning.billing.meter.glue.MeterModule;
import com.ning.billing.overdue.glue.DefaultOverdueModule;
import com.ning.billing.payment.glue.PaymentModule;
import com.ning.billing.payment.provider.MockPaymentProviderPluginModule;
@@ -184,6 +185,7 @@ public class TestJaxrsBase extends KillbillClient {
install(new DefaultOverdueModule());
install(new TenantModule());
install(new ExportModule());
+ install(new MeterModule());
installClock();
}
diff --git a/server/src/test/java/com/ning/billing/jaxrs/TestMeter.java b/server/src/test/java/com/ning/billing/jaxrs/TestMeter.java
new file mode 100644
index 0000000..e7fd51f
--- /dev/null
+++ b/server/src/test/java/com/ning/billing/jaxrs/TestMeter.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.jaxrs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Ordering;
+
+public class TestMeter extends TestJaxrsBase {
+
+ private final Random rand = new Random();
+ private final UUID bundleId = UUID.randomUUID();
+ private final String category = "PageView";
+ private final String visitor1 = "pierre";
+ private final String visitor2 = "stephane";
+ private final int nbVisits = 20;
+
+ private final Ordering<DateTime> dateTimeOrdering = new Ordering<DateTime>() {
+
+ @Override
+ public int compare(final DateTime left, final DateTime right) {
+ return left.compareTo(right);
+ }
+ };
+
+ @Test(groups = "slow")
+ public void testRecordPageViews() throws Exception {
+ // Record a bunch of random visits by two visitors
+ final DateTime start = clock.getUTCNow();
+ final List<DateTime> visits = generatePageViews(nbVisits, start);
+ final DateTime end = dateTimeOrdering.max(visits);
+
+ // Verify the visits recorded
+ final List<Map<String, Object>> meteringAggregateUsage = getMeteringAggregateUsage(bundleId, category, start, end);
+ final List<DateTime> visitsFound = new ArrayList<DateTime>();
+ for (final Map<String, Object> oneUsage : meteringAggregateUsage) {
+ Assert.assertEquals(oneUsage.get("sourceName"), bundleId.toString());
+ Assert.assertEquals(oneUsage.get("eventCategory"), category);
+ Assert.assertEquals(oneUsage.get("metric"), "__AGGREGATE__");
+
+ // Retrieve the timestamps
+ final ImmutableList<String> samples = ImmutableList.<String>copyOf(Splitter.on(",").split((String) oneUsage.get("samples")));
+ for (int i = 0; i < samples.size(); i++) {
+ visitsFound.add(new DateTime(Long.valueOf(samples.get(i)) * 1000, DateTimeZone.UTC));
+ i++;
+ }
+ }
+
+ Assert.assertEquals(dateTimeOrdering.immutableSortedCopy(visitsFound), dateTimeOrdering.immutableSortedCopy(visits));
+ }
+
+ private List<DateTime> generatePageViews(final int nbVisits, final DateTime start) throws IOException {
+ DateTime lastVisit = start;
+ final List<DateTime> visits = new ArrayList<DateTime>();
+ for (int i = 0; i < nbVisits; i++) {
+ DateTime visitDate = lastVisit.plusSeconds(i);
+ recordMeteringUsage(bundleId, category, visitor1, visitDate);
+ visits.add(visitDate);
+
+ visitDate = visitDate.plusSeconds(1);
+ recordMeteringUsage(bundleId, category, visitor2, visitDate);
+ visits.add(visitDate);
+
+ lastVisit = visitDate;
+ }
+
+ return visits;
+ }
+}