killbill-memoizeit
Changes
meter/src/main/java/com/ning/billing/meter/timeline/consumer/AccumulatorSampleConsumer.java 22(+6 -16)
meter/src/main/java/com/ning/billing/meter/timeline/consumer/filter/DecimatingSampleFilter.java 33(+17 -16)
meter/src/test/java/com/ning/billing/meter/timeline/codec/TestTimelineChunkAccumulator.java 2(+2 -0)
meter/src/test/java/com/ning/billing/meter/timeline/consumer/filter/TestDecimatingFilter.java 15(+8 -7)
Details
diff --git a/api/src/main/java/com/ning/billing/meter/api/MeterUserApi.java b/api/src/main/java/com/ning/billing/meter/api/MeterUserApi.java
index b3aee1f..de7e9ae 100644
--- a/api/src/main/java/com/ning/billing/meter/api/MeterUserApi.java
+++ b/api/src/main/java/com/ning/billing/meter/api/MeterUserApi.java
@@ -20,7 +20,8 @@ import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Map;
-import java.util.UUID;
+
+import javax.annotation.Nullable;
import org.joda.time.DateTime;
@@ -29,21 +30,96 @@ import com.ning.billing.util.callcontext.TenantContext;
public interface MeterUserApi {
- public void getAggregateUsage(OutputStream outputStream, UUID bundleId, Collection<String> categories,
- DateTime fromTimestamp, DateTime toTimestamp, TenantContext context) throws IOException;
+ /**
+ * Get usage data for all metrics (given a set of categories) as Json using an accumulating filter.
+ * <p/>
+ * Emulate: select sum(value) from chunks group by source, category, date_trunc('timeAggregationMode', timestamp);
+ *
+ * @param outputStream stream to write the data to
+ * @param timeAggregationMode granularity of the accumulator
+ * @param source source
+ * @param categories categories
+ * @param fromTimestamp earliest timestamp to consider (inclusive)
+ * @param toTimestamp latest timestamp to consider (inclusive)
+ * @param context call context
+ * @throws IOException
+ */
+ void getUsage(OutputStream outputStream, TimeAggregationMode timeAggregationMode,
+ String source, Collection<String> categories,
+ DateTime fromTimestamp, DateTime toTimestamp, TenantContext context) throws IOException;
- public void getUsage(OutputStream outputStream, UUID bundleId, Map<String, Collection<String>> metricsPerCategory,
- DateTime fromTimestamp, DateTime toTimestamp, TenantContext context) throws IOException;
+ /**
+ * Get usage data as Json using an accumulating filter.
+ * <p/>
+ * Emulate: select sum(value) from chunks group by source, category, metric, date_trunc('timeAggregationMode', timestamp);
+ *
+ * @param outputStream stream to write the data to
+ * @param timeAggregationMode granularity of the accumulator
+ * @param source source
+ * @param metricsPerCategory mapping of metrics per category
+ * @param fromTimestamp earliest timestamp to consider (inclusive)
+ * @param toTimestamp latest timestamp to consider (inclusive)
+ * @param context call context
+ * @throws IOException generic I/O exception
+ */
+ void getUsage(OutputStream outputStream, TimeAggregationMode timeAggregationMode,
+ String source, Map<String, Collection<String>> metricsPerCategory,
+ DateTime fromTimestamp, DateTime toTimestamp, TenantContext context) throws IOException;
+
+ /**
+ * Get usage data as Json using a decimating filter to reduce the number of points
+ *
+ * @param outputStream stream to write the data to
+ * @param decimationMode decimation mode for the decimating filter
+ * @param outputCount number of data point the decimating filter should output, can be null
+ * @param source source
+ * @param metricsPerCategory mapping of metrics per category
+ * @param fromTimestamp earliest timestamp to consider (inclusive)
+ * @param toTimestamp latest timestamp to consider (inclusive)
+ * @param context call context
+ * @throws IOException generic I/O exception
+ */
+ void getUsage(OutputStream outputStream, DecimationMode decimationMode, @Nullable Integer outputCount,
+ String source, Map<String, Collection<String>> metricsPerCategory,
+ DateTime fromTimestamp, DateTime toTimestamp, TenantContext context) throws IOException;
+
+ /**
+ * Get usage data for all metrics (given a set of categories) as Json.
+ *
+ * @param outputStream stream to write the data to
+ * @param source source
+ * @param categories categories
+ * @param fromTimestamp earliest timestamp to consider (inclusive)
+ * @param toTimestamp latest timestamp to consider (inclusive)
+ * @param context call context
+ * @throws IOException generic I/O exception
+ */
+ void getUsage(OutputStream outputStream, String source, Collection<String> categories,
+ DateTime fromTimestamp, DateTime toTimestamp, TenantContext context) throws IOException;
+
+ /**
+ * Get usage data as Json.
+ *
+ * @param outputStream stream to write the data to
+ * @param source source
+ * @param metricsPerCategory mapping of metrics per category
+ * @param fromTimestamp earliest timestamp to consider (inclusive)
+ * @param toTimestamp latest timestamp to consider (inclusive)
+ * @param context call context
+ * @throws IOException generic I/O exception
+ */
+ void getUsage(OutputStream outputStream, String source, Map<String, Collection<String>> metricsPerCategory,
+ DateTime fromTimestamp, DateTime toTimestamp, TenantContext context) throws IOException;
/**
* Shortcut API to record a usage value of "1" for a given category and metric.
*
- * @param bundleId bundle id source
+ * @param source source name for this usage
* @param categoryName category name for this usage
* @param metricName metric name associated with this category
* @param context call context
*/
- public void incrementUsage(UUID bundleId, String categoryName, String metricName, DateTime timestamp, CallContext context);
+ public void incrementUsage(String source, String categoryName, String metricName, DateTime timestamp, CallContext context);
/**
* Shortcut API to record a usage value of "1" for a given category and metric.
@@ -52,20 +128,20 @@ public interface MeterUserApi {
* This is useful if one wants to store fine grained usage information (e.g. number of minutes used per cell phone number),
* while keeping a fast access path to the aggregate (e.g. number of minutes used across all cell phone numbers).
*
- * @param bundleId bundle id source
+ * @param source source name for this usage
* @param categoryName category name for this usage
* @param metricName metric name associated with this category
* @param context call context
*/
- public void incrementUsageAndAggregate(UUID bundleId, String categoryName, String metricName, DateTime timestamp, CallContext context);
+ public void incrementUsageAndAggregate(String source, String categoryName, String metricName, DateTime timestamp, CallContext context);
/**
* Fine grained usage API. This is used to record e.g. "X has used 2 credits at 2012/02/04 4:12pm".
*
- * @param bundleId bundle id source
+ * @param source source name for this usage
* @param samplesForCategoriesAndMetrics samples per metric and category
* @param timestamp timestamp of this usage
* @param context tenant context
*/
- public void recordUsage(UUID bundleId, Map<String, Map<String, Object>> samplesForCategoriesAndMetrics, DateTime timestamp, CallContext context);
+ public void recordUsage(String source, Map<String, Map<String, Object>> samplesForCategoriesAndMetrics, DateTime timestamp, CallContext context);
}
diff --git a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java
index d2973cb..1be8fd9 100644
--- a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java
+++ b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java
@@ -37,7 +37,7 @@ public interface JaxrsResource {
/*
* Patterns
*/
- public static String STRING_PATTERN = "\\w+";
+ public static String STRING_PATTERN = "[\\w-]+";
public static String UUID_PATTERN = "\\w+-\\w+-\\w+-\\w+-\\w+";
/*
@@ -79,7 +79,8 @@ public interface JaxrsResource {
public static final String QUERY_NOTIFICATION_CALLBACK = "cb";
- public static final String QUERY_METER_WITH_AGGREGATE = "aggregate";
+ public static final String QUERY_METER_WITH_CATEGORY_AGGREGATE = "withCategoryAggregate";
+ public static final String QUERY_METER_TIME_AGGREGATION_MODE = "timeAggregationMode";
public static final String QUERY_METER_TIMESTAMP = "timestamp";
public static final String QUERY_METER_FROM = "from";
public static final String QUERY_METER_TO = "to";
diff --git a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/MeterResource.java b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/MeterResource.java
index 9b81ac9..f44ada5 100644
--- a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/MeterResource.java
+++ b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/MeterResource.java
@@ -23,7 +23,6 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
@@ -44,6 +43,7 @@ import org.joda.time.DateTime;
import com.ning.billing.jaxrs.util.Context;
import com.ning.billing.jaxrs.util.JaxrsUriBuilder;
import com.ning.billing.meter.api.MeterUserApi;
+import com.ning.billing.meter.api.TimeAggregationMode;
import com.ning.billing.util.api.AuditUserApi;
import com.ning.billing.util.api.CustomFieldUserApi;
import com.ning.billing.util.api.TagUserApi;
@@ -51,7 +51,7 @@ import com.ning.billing.util.callcontext.CallContext;
import com.ning.billing.util.callcontext.TenantContext;
import com.ning.billing.util.clock.Clock;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -61,8 +61,6 @@ import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
@Path(JaxrsResource.METER_PATH)
public class MeterResource extends JaxRsResourceBase {
- private static final ObjectMapper objectMapper = new ObjectMapper();
-
private final MeterUserApi meterApi;
private final Clock clock;
@@ -80,17 +78,17 @@ public class MeterResource extends JaxRsResourceBase {
}
@GET
- @Path("/{bundleId:" + UUID_PATTERN + "}")
+ @Path("/{source:" + STRING_PATTERN + "}")
@Produces(APPLICATION_JSON)
- public StreamingOutput getUsage(@PathParam("bundleId") final String bundleIdString,
+ public StreamingOutput getUsage(@PathParam("source") final String source,
+ // Aggregates per category
@QueryParam(QUERY_METER_CATEGORY) final List<String> categories,
// Format: category,metric
@QueryParam(QUERY_METER_CATEGORY_AND_METRIC) final List<String> categoriesAndMetrics,
@QueryParam(QUERY_METER_FROM) final String fromTimestampString,
@QueryParam(QUERY_METER_TO) final String toTimestampString,
- @QueryParam(QUERY_METER_WITH_AGGREGATE) @DefaultValue("false") final Boolean withAggregate,
+ @QueryParam(QUERY_METER_TIME_AGGREGATION_MODE) @DefaultValue("") final String timeAggregationModeString,
@javax.ws.rs.core.Context final HttpServletRequest request) {
- final UUID bundleId = UUID.fromString(bundleIdString);
final DateTime fromTimestamp = DATE_TIME_FORMATTER.parseDateTime(fromTimestampString);
final DateTime toTimestamp = DATE_TIME_FORMATTER.parseDateTime(toTimestampString);
final TenantContext tenantContext = context.createContext(request);
@@ -98,11 +96,22 @@ public class MeterResource extends JaxRsResourceBase {
return new StreamingOutput() {
@Override
public void write(final OutputStream output) throws IOException, WebApplicationException {
- if (withAggregate) {
- meterApi.getAggregateUsage(output, bundleId, categories, fromTimestamp, toTimestamp, tenantContext);
+ // Look at aggregates per category?
+ if (categories != null) {
+ if (Strings.isNullOrEmpty(timeAggregationModeString)) {
+ meterApi.getUsage(output, source, categories, fromTimestamp, toTimestamp, tenantContext);
+ } else {
+ final TimeAggregationMode timeAggregationMode = TimeAggregationMode.valueOf(timeAggregationModeString);
+ meterApi.getUsage(output, timeAggregationMode, source, categories, fromTimestamp, toTimestamp, tenantContext);
+ }
} else {
final Map<String, Collection<String>> metricsPerCategory = retrieveMetricsPerCategory(categoriesAndMetrics);
- meterApi.getUsage(output, bundleId, metricsPerCategory, fromTimestamp, toTimestamp, tenantContext);
+ if (Strings.isNullOrEmpty(timeAggregationModeString)) {
+ meterApi.getUsage(output, source, metricsPerCategory, fromTimestamp, toTimestamp, tenantContext);
+ } else {
+ final TimeAggregationMode timeAggregationMode = TimeAggregationMode.valueOf(timeAggregationModeString);
+ meterApi.getUsage(output, timeAggregationMode, source, metricsPerCategory, fromTimestamp, toTimestamp, tenantContext);
+ }
}
}
};
@@ -131,19 +140,18 @@ public class MeterResource extends JaxRsResourceBase {
}
@POST
- @Path("/{bundleId:" + UUID_PATTERN + "}/{categoryName:" + STRING_PATTERN + "}/{metricName:" + STRING_PATTERN + "}")
+ @Path("/{source:" + STRING_PATTERN + "}/{categoryName:" + STRING_PATTERN + "}/{metricName:" + STRING_PATTERN + "}")
@Consumes(APPLICATION_JSON)
@Produces(APPLICATION_JSON)
- public Response recordUsage(@PathParam("bundleId") final String bundleIdString,
+ public Response recordUsage(@PathParam("source") final String source,
@PathParam("categoryName") final String categoryName,
@PathParam("metricName") final String metricName,
- @QueryParam(QUERY_METER_WITH_AGGREGATE) @DefaultValue("false") final Boolean withAggregate,
+ @QueryParam(QUERY_METER_WITH_CATEGORY_AGGREGATE) @DefaultValue("false") final Boolean withAggregate,
@QueryParam(QUERY_METER_TIMESTAMP) final String timestampString,
@HeaderParam(HDR_CREATED_BY) final String createdBy,
@HeaderParam(HDR_REASON) final String reason,
@HeaderParam(HDR_COMMENT) final String comment,
@javax.ws.rs.core.Context final HttpServletRequest request) {
- final UUID bundleId = UUID.fromString(bundleIdString);
final CallContext callContext = context.createContext(createdBy, reason, comment, request);
final DateTime timestamp;
@@ -154,9 +162,9 @@ public class MeterResource extends JaxRsResourceBase {
}
if (withAggregate) {
- meterApi.incrementUsageAndAggregate(bundleId, categoryName, metricName, timestamp, callContext);
+ meterApi.incrementUsageAndAggregate(source, categoryName, metricName, timestamp, callContext);
} else {
- meterApi.incrementUsage(bundleId, categoryName, metricName, timestamp, callContext);
+ meterApi.incrementUsage(source, categoryName, metricName, timestamp, callContext);
}
return Response.ok().build();
diff --git a/meter/src/main/java/com/ning/billing/meter/api/user/AccumulatingJsonSamplesOutputer.java b/meter/src/main/java/com/ning/billing/meter/api/user/AccumulatingJsonSamplesOutputer.java
new file mode 100644
index 0000000..2f933d1
--- /dev/null
+++ b/meter/src/main/java/com/ning/billing/meter/api/user/AccumulatingJsonSamplesOutputer.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.api.user;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import com.ning.billing.meter.api.TimeAggregationMode;
+import com.ning.billing.meter.timeline.TimelineEventHandler;
+import com.ning.billing.meter.timeline.categories.CategoryRecordIdAndMetric;
+import com.ning.billing.meter.timeline.chunks.TimelineChunk;
+import com.ning.billing.meter.timeline.consumer.AccumulatorSampleConsumer;
+import com.ning.billing.meter.timeline.consumer.CSVConsumer;
+import com.ning.billing.meter.timeline.consumer.CSVSampleProcessor;
+import com.ning.billing.meter.timeline.metrics.SamplesForMetricAndSource;
+import com.ning.billing.meter.timeline.persistent.TimelineDao;
+import com.ning.billing.util.callcontext.InternalTenantContext;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.google.common.base.Strings;
+
+public class AccumulatingJsonSamplesOutputer extends JsonSamplesOutputer {
+
+ private final AccumulatorSampleConsumer accumulatorSampleConsumer;
+
+ private String lastSource;
+ private String lastEventCategory;
+ private String lastMetric;
+
+ public AccumulatingJsonSamplesOutputer(final TimeAggregationMode timeAggregationMode, final TimelineEventHandler timelineEventHandler,
+ final TimelineDao timelineDao, final InternalTenantContext context) {
+ super(timelineEventHandler, timelineDao, context);
+ this.accumulatorSampleConsumer = new AccumulatorSampleConsumer(timeAggregationMode, new CSVSampleProcessor());
+ }
+
+ @Override
+ protected void writeJsonForChunks(final JsonGenerator generator, final Collection<? extends TimelineChunk> chunksForSourceAndMetric) throws IOException {
+ for (final TimelineChunk chunk : chunksForSourceAndMetric) {
+ final String source = timelineDao.getSource(chunk.getSourceId(), context);
+ final CategoryRecordIdAndMetric categoryIdAndMetric = timelineDao.getCategoryIdAndMetric(chunk.getMetricId(), context);
+ final String eventCategory = timelineDao.getEventCategory(categoryIdAndMetric.getEventCategoryId(), context);
+ final String metric = categoryIdAndMetric.getMetric();
+
+ final String samples = CSVConsumer.getSamplesAsCSV(sampleCoder, chunk, accumulatorSampleConsumer);
+
+ // Don't write out empty samples
+ if (!Strings.isNullOrEmpty(samples)) {
+ generator.writeObject(new SamplesForMetricAndSource(source, eventCategory, metric, samples));
+ }
+
+ lastSource = source;
+ lastEventCategory = eventCategory;
+ lastMetric = metric;
+ }
+ }
+
+ @Override
+ protected void writeRemainingData(final JsonGenerator generator) throws IOException {
+ final String samples = accumulatorSampleConsumer.flush();
+ // Don't write out empty samples
+ if (!Strings.isNullOrEmpty(samples)) {
+ generator.writeObject(new SamplesForMetricAndSource(lastSource, lastEventCategory, lastMetric, samples));
+ }
+ }
+}
diff --git a/meter/src/main/java/com/ning/billing/meter/api/user/DebugJsonSamplesOutputer.java b/meter/src/main/java/com/ning/billing/meter/api/user/DebugJsonSamplesOutputer.java
new file mode 100644
index 0000000..4ffd3f6
--- /dev/null
+++ b/meter/src/main/java/com/ning/billing/meter/api/user/DebugJsonSamplesOutputer.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.api.user;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import com.ning.billing.meter.timeline.TimelineEventHandler;
+import com.ning.billing.meter.timeline.categories.CategoryRecordIdAndMetric;
+import com.ning.billing.meter.timeline.chunks.TimelineChunk;
+import com.ning.billing.meter.timeline.chunks.TimelineChunksViews;
+import com.ning.billing.meter.timeline.consumer.CSVConsumer;
+import com.ning.billing.meter.timeline.consumer.TimelineChunkDecoded;
+import com.ning.billing.meter.timeline.metrics.SamplesForMetricAndSource;
+import com.ning.billing.meter.timeline.persistent.TimelineDao;
+import com.ning.billing.util.callcontext.InternalTenantContext;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.base.Strings;
+
+public class DebugJsonSamplesOutputer extends JsonSamplesOutputer {
+
+ private final boolean withBinaryData;
+ private final ObjectWriter writer;
+
+ public DebugJsonSamplesOutputer(final boolean withBinaryData, final TimelineEventHandler timelineEventHandler,
+ final TimelineDao timelineDao, final InternalTenantContext context) {
+ super(timelineEventHandler, timelineDao, context);
+ this.withBinaryData = withBinaryData;
+ if (withBinaryData) {
+ this.writer = objectMapper.writerWithView(TimelineChunksViews.Compact.class);
+ } else {
+ this.writer = objectMapper.writerWithView(TimelineChunksViews.Loose.class);
+ }
+ }
+
+ @Override
+ protected void writeJsonForChunks(final JsonGenerator generator, final Collection<? extends TimelineChunk> chunksForSourceAndMetric) throws IOException {
+ for (final TimelineChunk chunk : chunksForSourceAndMetric) {
+ if (withBinaryData) {
+ writer.writeValue(generator, new TimelineChunkDecoded(chunk, sampleCoder));
+ } else {
+ final String source = timelineDao.getSource(chunk.getSourceId(), context);
+ final CategoryRecordIdAndMetric categoryIdAndMetric = timelineDao.getCategoryIdAndMetric(chunk.getMetricId(), context);
+ final String category = timelineDao.getEventCategory(categoryIdAndMetric.getEventCategoryId(), context);
+ final String metric = categoryIdAndMetric.getMetric();
+ final String samples = CSVConsumer.getSamplesAsCSV(sampleCoder, chunk);
+
+ // Don't write out empty samples
+ if (!Strings.isNullOrEmpty(samples)) {
+ generator.writeObject(new SamplesForMetricAndSource(source, category, metric, samples));
+ }
+ }
+ }
+ }
+}
diff --git a/meter/src/main/java/com/ning/billing/meter/api/user/DecimatingJsonSamplesOutputer.java b/meter/src/main/java/com/ning/billing/meter/api/user/DecimatingJsonSamplesOutputer.java
new file mode 100644
index 0000000..7fed8a7
--- /dev/null
+++ b/meter/src/main/java/com/ning/billing/meter/api/user/DecimatingJsonSamplesOutputer.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.api.user;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import org.joda.time.DateTime;
+import org.skife.config.TimeSpan;
+
+import com.ning.billing.meter.api.DecimationMode;
+import com.ning.billing.meter.timeline.TimelineEventHandler;
+import com.ning.billing.meter.timeline.categories.CategoryRecordIdAndMetric;
+import com.ning.billing.meter.timeline.chunks.TimelineChunk;
+import com.ning.billing.meter.timeline.consumer.CSVConsumer;
+import com.ning.billing.meter.timeline.consumer.CSVSampleProcessor;
+import com.ning.billing.meter.timeline.consumer.TimeRangeSampleProcessor;
+import com.ning.billing.meter.timeline.consumer.filter.DecimatingSampleFilter;
+import com.ning.billing.meter.timeline.metrics.SamplesForMetricAndSource;
+import com.ning.billing.meter.timeline.persistent.TimelineDao;
+import com.ning.billing.util.callcontext.InternalTenantContext;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.google.common.base.Strings;
+
+public class DecimatingJsonSamplesOutputer extends JsonSamplesOutputer {
+
+ private final Integer outputCount;
+ private final DecimationMode decimationMode;
+
+ private Map<Integer, Map<Integer, DecimatingSampleFilter>> filters;
+
+ public DecimatingJsonSamplesOutputer(final DecimationMode decimationMode, @Nullable final Integer outputCount,
+ final TimelineEventHandler timelineEventHandler, final TimelineDao timelineDao, final InternalTenantContext context) {
+ super(timelineEventHandler, timelineDao, context);
+ this.outputCount = outputCount;
+ this.decimationMode = decimationMode;
+ }
+
+ @Override
+ protected void output(final OutputStream output, final List<Integer> sourceIds, final List<Integer> metricIds, final DateTime startTime, final DateTime endTime) throws IOException {
+ // Create the decimating filters
+ filters = createDecimatingSampleFilters(sourceIds, metricIds, decimationMode, startTime, endTime, outputCount);
+
+ super.output(output, sourceIds, metricIds, startTime, endTime);
+ }
+
+ @Override
+ protected void writeJsonForChunks(final JsonGenerator generator, final Collection<? extends TimelineChunk> chunksForSourceAndMetric) throws IOException {
+ for (final TimelineChunk chunk : chunksForSourceAndMetric) {
+ final String source = timelineDao.getSource(chunk.getSourceId(), context);
+ final CategoryRecordIdAndMetric categoryIdAndMetric = timelineDao.getCategoryIdAndMetric(chunk.getMetricId(), context);
+ final String eventCategory = timelineDao.getEventCategory(categoryIdAndMetric.getEventCategoryId(), context);
+ final String metric = categoryIdAndMetric.getMetric();
+ final TimeRangeSampleProcessor filter = filters.get(chunk.getSourceId()).get(chunk.getMetricId());
+
+ final String samples = filter == null ? CSVConsumer.getSamplesAsCSV(sampleCoder, chunk) : CSVConsumer.getSamplesAsCSV(sampleCoder, chunk, filter);
+
+ // Don't write out empty samples
+ if (!Strings.isNullOrEmpty(samples)) {
+ generator.writeObject(new SamplesForMetricAndSource(source, eventCategory, metric, samples));
+ }
+ }
+ }
+
+ private Map<Integer, Map<Integer, DecimatingSampleFilter>> createDecimatingSampleFilters(final List<Integer> sourceIds, final List<Integer> metricIds, final DecimationMode decimationMode,
+ final DateTime startTime, final DateTime endTime, final Integer outputCount) {
+ final Map<Integer, Map<Integer, DecimatingSampleFilter>> filters = new HashMap<Integer, Map<Integer, DecimatingSampleFilter>>();
+ for (final Integer sourceId : sourceIds) {
+ filters.put(sourceId, new HashMap<Integer, DecimatingSampleFilter>());
+ for (final Integer metric : metricIds) {
+ filters.get(sourceId).put(metric, createDecimatingSampleFilter(outputCount, decimationMode, startTime, endTime));
+ }
+ }
+ return filters;
+ }
+
+ private DecimatingSampleFilter createDecimatingSampleFilter(final Integer outputCount, final DecimationMode decimationMode, final DateTime startTime, final DateTime endTime) {
+ final DecimatingSampleFilter rangeSampleProcessor;
+ if (outputCount == null) {
+ rangeSampleProcessor = null;
+ } else {
+ // TODO Fix the polling interval
+ rangeSampleProcessor = new DecimatingSampleFilter(startTime, endTime, outputCount, new TimeSpan("1s"), decimationMode, new CSVSampleProcessor());
+ }
+
+ return rangeSampleProcessor;
+ }
+}
diff --git a/meter/src/main/java/com/ning/billing/meter/api/user/DefaultJsonSamplesOutputer.java b/meter/src/main/java/com/ning/billing/meter/api/user/DefaultJsonSamplesOutputer.java
new file mode 100644
index 0000000..d11e3f3
--- /dev/null
+++ b/meter/src/main/java/com/ning/billing/meter/api/user/DefaultJsonSamplesOutputer.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.api.user;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import com.ning.billing.meter.timeline.TimelineEventHandler;
+import com.ning.billing.meter.timeline.categories.CategoryRecordIdAndMetric;
+import com.ning.billing.meter.timeline.chunks.TimelineChunk;
+import com.ning.billing.meter.timeline.consumer.CSVConsumer;
+import com.ning.billing.meter.timeline.metrics.SamplesForMetricAndSource;
+import com.ning.billing.meter.timeline.persistent.TimelineDao;
+import com.ning.billing.util.callcontext.InternalTenantContext;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.google.common.base.Strings;
+
+public class DefaultJsonSamplesOutputer extends JsonSamplesOutputer {
+
+ public DefaultJsonSamplesOutputer(final TimelineEventHandler timelineEventHandler, final TimelineDao timelineDao, final InternalTenantContext context) {
+ super(timelineEventHandler, timelineDao, context);
+ }
+
+ @Override
+ protected void writeJsonForChunks(final JsonGenerator generator, final Collection<? extends TimelineChunk> chunksForSourceAndMetric) throws IOException {
+ for (final TimelineChunk chunk : chunksForSourceAndMetric) {
+ final String source = timelineDao.getSource(chunk.getSourceId(), context);
+ final CategoryRecordIdAndMetric categoryIdAndMetric = timelineDao.getCategoryIdAndMetric(chunk.getMetricId(), context);
+ final String eventCategory = timelineDao.getEventCategory(categoryIdAndMetric.getEventCategoryId(), context);
+ final String metric = categoryIdAndMetric.getMetric();
+
+ final String samples = CSVConsumer.getSamplesAsCSV(sampleCoder, chunk);
+
+ // Don't write out empty samples
+ if (!Strings.isNullOrEmpty(samples)) {
+ generator.writeObject(new SamplesForMetricAndSource(source, eventCategory, metric, samples));
+ }
+ }
+ }
+}
diff --git a/meter/src/main/java/com/ning/billing/meter/api/user/DefaultMeterUserApi.java b/meter/src/main/java/com/ning/billing/meter/api/user/DefaultMeterUserApi.java
index 8ea781b..880baf3 100644
--- a/meter/src/main/java/com/ning/billing/meter/api/user/DefaultMeterUserApi.java
+++ b/meter/src/main/java/com/ning/billing/meter/api/user/DefaultMeterUserApi.java
@@ -20,14 +20,15 @@ import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Map;
-import java.util.UUID;
+import javax.annotation.Nullable;
import javax.inject.Inject;
import org.joda.time.DateTime;
-import com.ning.billing.ObjectType;
+import com.ning.billing.meter.api.DecimationMode;
import com.ning.billing.meter.api.MeterUserApi;
+import com.ning.billing.meter.api.TimeAggregationMode;
import com.ning.billing.meter.timeline.TimelineEventHandler;
import com.ning.billing.meter.timeline.persistent.TimelineDao;
import com.ning.billing.util.callcontext.CallContext;
@@ -58,50 +59,78 @@ public class DefaultMeterUserApi implements MeterUserApi {
}
@Override
- public void getAggregateUsage(final OutputStream outputStream, final UUID bundleId, final Collection<String> categories,
- final DateTime fromTimestamp, final DateTime toTimestamp, final TenantContext context) throws IOException {
+ public void getUsage(final OutputStream outputStream, final TimeAggregationMode timeAggregationMode,
+ final String source, final Collection<String> categories,
+ final DateTime fromTimestamp, final DateTime toTimestamp, final TenantContext context) throws IOException {
final ImmutableMap.Builder<String, Collection<String>> metricsPerCategory = new Builder<String, Collection<String>>();
for (final String category : categories) {
metricsPerCategory.put(category, ImmutableList.<String>of(AGGREGATE_METRIC_NAME));
}
- getUsage(outputStream, bundleId, metricsPerCategory.build(), fromTimestamp, toTimestamp, context);
+
+ getUsage(outputStream, timeAggregationMode, source, metricsPerCategory.build(), fromTimestamp, toTimestamp, context);
}
@Override
- public void getUsage(final OutputStream outputStream, final UUID bundleId, final Map<String, Collection<String>> metricsPerCategory,
+ public void getUsage(final OutputStream outputStream, final TimeAggregationMode timeAggregationMode,
+ final String source, final Map<String, Collection<String>> metricsPerCategory,
final DateTime fromTimestamp, final DateTime toTimestamp, final TenantContext context) throws IOException {
final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(context);
+ final JsonSamplesOutputer outputerJson = new AccumulatingJsonSamplesOutputer(timeAggregationMode, timelineEventHandler, timelineDao, internalTenantContext);
+ outputerJson.output(outputStream, ImmutableList.<String>of(source), metricsPerCategory, fromTimestamp, toTimestamp);
+ }
- final JsonSamplesOutputer outputerJson = new JsonSamplesOutputer(timelineEventHandler, timelineDao, internalTenantContext);
- outputerJson.output(outputStream, ImmutableList.<UUID>of(bundleId), metricsPerCategory, fromTimestamp, toTimestamp);
+ @Override
+ public void getUsage(final OutputStream outputStream, final DecimationMode decimationMode, @Nullable final Integer outputCount,
+ final String source, final Map<String, Collection<String>> metricsPerCategory,
+ final DateTime fromTimestamp, final DateTime toTimestamp, final TenantContext context) throws IOException {
+ final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(context);
+ final JsonSamplesOutputer outputerJson = new DecimatingJsonSamplesOutputer(decimationMode, outputCount, timelineEventHandler, timelineDao, internalTenantContext);
+ outputerJson.output(outputStream, ImmutableList.<String>of(source), metricsPerCategory, fromTimestamp, toTimestamp);
}
@Override
- public void incrementUsage(final UUID bundleId, final String categoryName, final String metricName,
+ public void getUsage(final OutputStream outputStream, final String source, final Collection<String> categories,
+ final DateTime fromTimestamp, final DateTime toTimestamp, final TenantContext context) throws IOException {
+ final ImmutableMap.Builder<String, Collection<String>> metricsPerCategory = new Builder<String, Collection<String>>();
+ for (final String category : categories) {
+ metricsPerCategory.put(category, ImmutableList.<String>of(AGGREGATE_METRIC_NAME));
+ }
+
+ getUsage(outputStream, source, metricsPerCategory.build(), fromTimestamp, toTimestamp, context);
+ }
+
+ @Override
+ public void getUsage(final OutputStream outputStream, final String source, final Map<String, Collection<String>> metricsPerCategory,
+ final DateTime fromTimestamp, final DateTime toTimestamp, final TenantContext context) throws IOException {
+ final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(context);
+ final JsonSamplesOutputer outputerJson = new DefaultJsonSamplesOutputer(timelineEventHandler, timelineDao, internalTenantContext);
+ outputerJson.output(outputStream, ImmutableList.<String>of(source), metricsPerCategory, fromTimestamp, toTimestamp);
+ }
+
+ @Override
+ public void incrementUsage(final String source, final String categoryName, final String metricName,
final DateTime timestamp, final CallContext context) {
- recordUsage(bundleId,
+ recordUsage(source,
ImmutableMap.<String, Map<String, Object>>of(categoryName, ImmutableMap.<String, Object>of(metricName, (short) 1)),
timestamp,
context);
}
@Override
- public void incrementUsageAndAggregate(final UUID bundleId, final String categoryName, final String metricName,
+ public void incrementUsageAndAggregate(final String source, final String categoryName, final String metricName,
final DateTime timestamp, final CallContext context) {
- recordUsage(bundleId,
+ recordUsage(source,
ImmutableMap.<String, Map<String, Object>>of(categoryName, ImmutableMap.<String, Object>of(metricName, (short) 1, AGGREGATE_METRIC_NAME, (short) 1)),
timestamp,
context);
}
@Override
- public void recordUsage(final UUID bundleId, final Map<String, Map<String, Object>> samplesForCategoriesAndMetrics,
+ public void recordUsage(final String source, final Map<String, Map<String, Object>> samplesForCategoriesAndMetrics,
final DateTime timestamp, final CallContext context) {
- final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(bundleId, ObjectType.BUNDLE, context);
-
+ final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(context);
for (final String category : samplesForCategoriesAndMetrics.keySet()) {
- final String sourceName = bundleId.toString();
- timelineEventHandler.record(sourceName, category, timestamp, samplesForCategoriesAndMetrics.get(category),
+ timelineEventHandler.record(source, category, timestamp, samplesForCategoriesAndMetrics.get(category),
internalCallContext);
}
}
diff --git a/meter/src/main/java/com/ning/billing/meter/api/user/JsonSamplesOutputer.java b/meter/src/main/java/com/ning/billing/meter/api/user/JsonSamplesOutputer.java
index b589af5..9478edf 100644
--- a/meter/src/main/java/com/ning/billing/meter/api/user/JsonSamplesOutputer.java
+++ b/meter/src/main/java/com/ning/billing/meter/api/user/JsonSamplesOutputer.java
@@ -20,47 +20,34 @@ import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.joda.time.DateTime;
-import org.skife.config.TimeSpan;
import com.ning.billing.meter.timeline.TimelineEventHandler;
-import com.ning.billing.meter.timeline.categories.CategoryRecordIdAndMetric;
import com.ning.billing.meter.timeline.chunks.TimelineChunk;
-import com.ning.billing.meter.timeline.chunks.TimelineChunksViews;
import com.ning.billing.meter.timeline.codec.DefaultSampleCoder;
import com.ning.billing.meter.timeline.codec.SampleCoder;
-import com.ning.billing.meter.timeline.codec.TimelineChunkDecoded;
-import com.ning.billing.meter.timeline.consumer.CSVConsumer;
-import com.ning.billing.meter.timeline.consumer.CSVSampleConsumer;
import com.ning.billing.meter.timeline.consumer.TimelineChunkConsumer;
-import com.ning.billing.meter.timeline.filter.DecimatingSampleFilter;
-import com.ning.billing.meter.timeline.filter.DecimationMode;
-import com.ning.billing.meter.timeline.metrics.SamplesForMetricAndSource;
import com.ning.billing.meter.timeline.persistent.TimelineDao;
import com.ning.billing.util.callcontext.InternalTenantContext;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectWriter;
-import com.google.common.base.Strings;
-public class JsonSamplesOutputer {
+public abstract class JsonSamplesOutputer {
- private static final ObjectMapper objectMapper = new ObjectMapper();
+ protected static final ObjectMapper objectMapper = new ObjectMapper();
- private final TimelineEventHandler timelineEventHandler;
- private final TimelineDao timelineDao;
- private final SampleCoder sampleCoder;
- private final InternalTenantContext context;
+ protected final TimelineEventHandler timelineEventHandler;
+ protected final TimelineDao timelineDao;
+ protected final SampleCoder sampleCoder;
+ protected final InternalTenantContext context;
public JsonSamplesOutputer(final TimelineEventHandler timelineEventHandler, final TimelineDao timelineDao, final InternalTenantContext context) {
this.timelineEventHandler = timelineEventHandler;
@@ -69,38 +56,31 @@ public class JsonSamplesOutputer {
this.context = context;
}
- public void output(final OutputStream output, final List<UUID> bundleIds, final Map<String, Collection<String>> metricsPerCategory,
- final DateTime startTime, final DateTime endTime) throws IOException {
- // Default - output all data points as CSV
- output(output, bundleIds, metricsPerCategory, DecimationMode.PEAK_PICK, null, false, false, startTime, endTime);
- }
+ protected abstract void writeJsonForChunks(final JsonGenerator generator, final Collection<? extends TimelineChunk> chunksForSourceAndMetric) throws IOException;
- public void output(final OutputStream output, final List<UUID> bundleIds, final Map<String, Collection<String>> metricsPerCategory,
- final DecimationMode decimationMode, @Nullable final Integer outputCount, final boolean decodeSamples, final boolean compact,
+ public void output(final OutputStream output, final List<String> sources, final Map<String, Collection<String>> metricsPerCategory,
final DateTime startTime, final DateTime endTime) throws IOException {
// Retrieve the source and metric ids
- final List<Integer> sourceIds = translateBundleIdsToSourceIds(bundleIds);
+ final List<Integer> sourceIds = translateSourcesToSourceIds(sources);
final List<Integer> metricIds = translateCategoriesAndMetricNamesToMetricIds(metricsPerCategory);
+ output(output, sourceIds, metricIds, startTime, endTime);
+ }
- // Create the decimating filters, if needed
- final Map<Integer, Map<Integer, DecimatingSampleFilter>> filters = createDecimatingSampleFilters(sourceIds, metricIds, decimationMode, startTime, endTime, outputCount);
-
+ protected void output(final OutputStream output, final List<Integer> sourceIds, final List<Integer> metricIds,
+ final DateTime startTime, final DateTime endTime) throws IOException {
// Setup Jackson
- final ObjectWriter writer;
- if (compact) {
- writer = objectMapper.writerWithView(TimelineChunksViews.Compact.class);
- } else {
- writer = objectMapper.writerWithView(TimelineChunksViews.Loose.class);
- }
final JsonGenerator generator = objectMapper.getJsonFactory().createJsonGenerator(output);
generator.writeStartArray();
// First, return all data stored in the database
- writeJsonForStoredChunks(generator, writer, filters, sourceIds, metricIds, startTime, endTime, decodeSamples);
+ writeJsonForStoredChunks(generator, sourceIds, metricIds, startTime, endTime);
// Now return all data in memory
- writeJsonForInMemoryChunks(generator, writer, filters, sourceIds, metricIds, startTime, endTime, decodeSamples);
+ writeJsonForInMemoryChunks(generator, sourceIds, metricIds, startTime, endTime);
+
+ // Allow implementers to flush their buffers
+ writeRemainingData(generator);
generator.writeEndArray();
@@ -108,10 +88,14 @@ public class JsonSamplesOutputer {
generator.close();
}
- private List<Integer> translateBundleIdsToSourceIds(final List<UUID> bundleIds) {
- final List<Integer> hostIds = new ArrayList<Integer>(bundleIds.size());
- for (final UUID bundleId : bundleIds) {
- hostIds.add(timelineDao.getSourceId(bundleId.toString(), context));
+ protected void writeRemainingData(final JsonGenerator generator) throws IOException {
+ // No-op
+ }
+
+ private List<Integer> translateSourcesToSourceIds(final List<String> sources) {
+ final List<Integer> hostIds = new ArrayList<Integer>(sources.size());
+ for (final String source : sources) {
+ hostIds.add(timelineDao.getSourceId(source, context));
}
return hostIds;
@@ -140,32 +124,8 @@ public class JsonSamplesOutputer {
return metricIds;
}
- private Map<Integer, Map<Integer, DecimatingSampleFilter>> createDecimatingSampleFilters(final List<Integer> hostIds, final List<Integer> sampleKindIds, final DecimationMode decimationMode,
- final DateTime startTime, final DateTime endTime, final Integer outputCount) {
- final Map<Integer, Map<Integer, DecimatingSampleFilter>> filters = new HashMap<Integer, Map<Integer, DecimatingSampleFilter>>();
- for (final Integer hostId : hostIds) {
- filters.put(hostId, new HashMap<Integer, DecimatingSampleFilter>());
- for (final Integer sampleKindId : sampleKindIds) {
- filters.get(hostId).put(sampleKindId, createDecimatingSampleFilter(outputCount, decimationMode, startTime, endTime));
- }
- }
- return filters;
- }
-
- private DecimatingSampleFilter createDecimatingSampleFilter(final Integer outputCount, final DecimationMode decimationMode, final DateTime startTime, final DateTime endTime) {
- final DecimatingSampleFilter rangeSampleProcessor;
- if (outputCount == null) {
- rangeSampleProcessor = null;
- } else {
- // TODO Fix the polling interval
- rangeSampleProcessor = new DecimatingSampleFilter(startTime, endTime, outputCount, new TimeSpan("1s"), decimationMode, new CSVSampleConsumer());
- }
-
- return rangeSampleProcessor;
- }
-
- private void writeJsonForStoredChunks(final JsonGenerator generator, final ObjectWriter writer, final Map<Integer, Map<Integer, DecimatingSampleFilter>> filters, final List<Integer> hostIdsList,
- final List<Integer> sampleKindIdsList, final DateTime startTime, final DateTime endTime, final boolean decodeSamples) throws IOException {
+ private void writeJsonForStoredChunks(final JsonGenerator generator, final List<Integer> hostIdsList, final List<Integer> sampleKindIdsList,
+ final DateTime startTime, final DateTime endTime) throws IOException {
final AtomicReference<Integer> lastHostId = new AtomicReference<Integer>(null);
final AtomicReference<Integer> lastSampleKindId = new AtomicReference<Integer>(null);
final List<TimelineChunk> chunksForHostAndSampleKind = new ArrayList<TimelineChunk>();
@@ -181,7 +141,7 @@ public class JsonSamplesOutputer {
chunksForHostAndSampleKind.add(chunks);
if (previousHostId != null && (!previousHostId.equals(currentHostId) || !previousSampleKindId.equals(currentSampleKindId))) {
try {
- writeJsonForChunks(generator, writer, filters, chunksForHostAndSampleKind, decodeSamples);
+ writeJsonForChunks(generator, chunksForHostAndSampleKind);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -194,13 +154,13 @@ public class JsonSamplesOutputer {
}, context);
if (chunksForHostAndSampleKind.size() > 0) {
- writeJsonForChunks(generator, writer, filters, chunksForHostAndSampleKind, decodeSamples);
+ writeJsonForChunks(generator, chunksForHostAndSampleKind);
chunksForHostAndSampleKind.clear();
}
}
- private void writeJsonForInMemoryChunks(final JsonGenerator generator, final ObjectWriter writer, final Map<Integer, Map<Integer, DecimatingSampleFilter>> filters, final List<Integer> hostIdsList,
- final List<Integer> sampleKindIdsList, @Nullable final DateTime startTime, @Nullable final DateTime endTime, final boolean decodeSamples) throws IOException {
+ private void writeJsonForInMemoryChunks(final JsonGenerator generator, final List<Integer> hostIdsList, final List<Integer> sampleKindIdsList,
+ @Nullable final DateTime startTime, @Nullable final DateTime endTime) throws IOException {
for (final Integer hostId : hostIdsList) {
final Collection<? extends TimelineChunk> inMemorySamples;
@@ -209,30 +169,7 @@ public class JsonSamplesOutputer {
} catch (ExecutionException e) {
throw new IOException(e);
}
- writeJsonForChunks(generator, writer, filters, inMemorySamples, decodeSamples);
- }
- }
-
- private void writeJsonForChunks(final JsonGenerator generator, final ObjectWriter writer, final Map<Integer, Map<Integer, DecimatingSampleFilter>> filters,
- final Iterable<? extends TimelineChunk> chunksForHostAndSampleKind, final boolean decodeSamples) throws IOException {
- for (final TimelineChunk chunk : chunksForHostAndSampleKind) {
- if (decodeSamples) {
- writer.writeValue(generator, new TimelineChunkDecoded(chunk, sampleCoder));
- } else {
- final String hostName = timelineDao.getSource(chunk.getSourceId(), context);
- final CategoryRecordIdAndMetric categoryIdAndSampleKind = timelineDao.getCategoryIdAndMetric(chunk.getMetricId(), context);
- final String eventCategory = timelineDao.getEventCategory(categoryIdAndSampleKind.getEventCategoryId(), context);
- final String sampleKind = categoryIdAndSampleKind.getMetric();
- // TODO pass compact form
- final DecimatingSampleFilter filter = filters.get(chunk.getSourceId()).get(chunk.getMetricId());
- // TODO CSV only for now
- final String samples = filter == null ? CSVConsumer.getSamplesAsCSV(sampleCoder, chunk) : CSVConsumer.getSamplesAsCSV(sampleCoder, chunk, filter);
-
- // Don't write out empty samples
- if (!Strings.isNullOrEmpty(samples)) {
- generator.writeObject(new SamplesForMetricAndSource(hostName, eventCategory, sampleKind, samples));
- }
- }
+ writeJsonForChunks(generator, inMemorySamples);
}
}
}
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/BackgroundDBChunkWriter.java b/meter/src/main/java/com/ning/billing/meter/timeline/BackgroundDBChunkWriter.java
index 982641a..dd286c1 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/BackgroundDBChunkWriter.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/BackgroundDBChunkWriter.java
@@ -31,13 +31,13 @@ import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.ning.billing.util.config.MeterConfig;
import com.ning.billing.meter.timeline.chunks.TimelineChunk;
import com.ning.billing.meter.timeline.persistent.TimelineDao;
import com.ning.billing.util.callcontext.CallOrigin;
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
import com.ning.billing.util.callcontext.UserType;
+import com.ning.billing.util.config.MeterConfig;
import com.google.inject.Inject;
import com.google.inject.Singleton;
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/codec/DefaultSampleCoder.java b/meter/src/main/java/com/ning/billing/meter/timeline/codec/DefaultSampleCoder.java
index 743f677..3e79502 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/codec/DefaultSampleCoder.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/codec/DefaultSampleCoder.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.ning.billing.meter.timeline.chunks.TimelineChunk;
+import com.ning.billing.meter.timeline.consumer.SampleProcessor;
import com.ning.billing.meter.timeline.samples.HalfFloat;
import com.ning.billing.meter.timeline.samples.RepeatSample;
import com.ning.billing.meter.timeline.samples.SampleBase;
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/codec/SampleCoder.java b/meter/src/main/java/com/ning/billing/meter/timeline/codec/SampleCoder.java
index d690bcd..3a9fdf7 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/codec/SampleCoder.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/codec/SampleCoder.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.List;
import com.ning.billing.meter.timeline.chunks.TimelineChunk;
+import com.ning.billing.meter.timeline.consumer.SampleProcessor;
import com.ning.billing.meter.timeline.samples.SampleBase;
import com.ning.billing.meter.timeline.samples.SampleOpcode;
import com.ning.billing.meter.timeline.samples.ScalarSample;
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/consumer/AccumulatorSampleConsumer.java b/meter/src/main/java/com/ning/billing/meter/timeline/consumer/AccumulatorSampleConsumer.java
index d71f14b..0c05cba 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/consumer/AccumulatorSampleConsumer.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/consumer/AccumulatorSampleConsumer.java
@@ -22,36 +22,26 @@ import java.util.Map;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
-import com.ning.billing.meter.timeline.codec.TimeRangeSampleProcessor;
+import com.ning.billing.meter.api.TimeAggregationMode;
import com.ning.billing.meter.timeline.samples.SampleOpcode;
import com.ning.billing.meter.timeline.samples.ScalarSample;
public class AccumulatorSampleConsumer extends TimeRangeSampleProcessor {
- public enum TimeAggregationMode {
- SECONDS,
- MINUTES,
- HOURS,
- DAYS,
- MONTHS,
- YEARS
- }
-
private final StringBuilder builder = new StringBuilder();
// Linked HashMap to keep ordering of opcodes as they came
private final Map<SampleOpcode, Double> accumulators = new LinkedHashMap<SampleOpcode, Double>();
private final TimeAggregationMode timeAggregationMode;
- private final SampleConsumer sampleConsumer;
+ private final TimeRangeSampleProcessor sampleProcessor;
private DateTime lastRoundedTime = null;
private int aggregatedSampleNumber = 0;
- public AccumulatorSampleConsumer(final TimeAggregationMode timeAggregationMode) {
+ public AccumulatorSampleConsumer(final TimeAggregationMode timeAggregationMode, final TimeRangeSampleProcessor sampleProcessor) {
super(null, null);
this.timeAggregationMode = timeAggregationMode;
- // TODO should be configurable
- this.sampleConsumer = new CSVSampleConsumer();
+ this.sampleProcessor = sampleProcessor;
}
@Override
@@ -109,10 +99,10 @@ public class AccumulatorSampleConsumer extends TimeRangeSampleProcessor {
// Output one opcode at a time
for (final SampleOpcode opcode : accumulators.keySet()) {
aggregatedSampleNumber++;
- sampleConsumer.consumeSample(aggregatedSampleNumber, opcode, accumulators.get(opcode), lastRoundedTime);
+ sampleProcessor.processOneSample(lastRoundedTime, opcode, accumulators.get(opcode));
}
// This will flush (clear) the sample consumer
- builder.append(sampleConsumer.toString());
+ builder.append(sampleProcessor.toString());
accumulators.clear();
}
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/consumer/CSVConsumer.java b/meter/src/main/java/com/ning/billing/meter/timeline/consumer/CSVConsumer.java
index 3c4531e..c62f717 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/consumer/CSVConsumer.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/consumer/CSVConsumer.java
@@ -24,25 +24,22 @@ import org.joda.time.DateTime;
import com.ning.billing.meter.timeline.chunks.TimelineChunk;
import com.ning.billing.meter.timeline.codec.SampleCoder;
-import com.ning.billing.meter.timeline.filter.DecimatingSampleFilter;
public class CSVConsumer {
- private CSVConsumer() {
- }
-
- public static String getSamplesAsCSV(final SampleCoder sampleCoder, final TimelineChunk chunk, final DecimatingSampleFilter rangeSampleProcessor) throws IOException {
- sampleCoder.scan(chunk, rangeSampleProcessor);
- return rangeSampleProcessor.getSampleConsumer().toString();
- }
+ private CSVConsumer() {}
public static String getSamplesAsCSV(final SampleCoder sampleCoder, final TimelineChunk chunk) throws IOException {
return getSamplesAsCSV(sampleCoder, chunk, null, null);
}
public static String getSamplesAsCSV(final SampleCoder sampleCoder, final TimelineChunk chunk, @Nullable final DateTime startTime, @Nullable final DateTime endTime) throws IOException {
- final CSVOutputProcessor processor = new CSVOutputProcessor(startTime, endTime);
- sampleCoder.scan(chunk, processor);
- return processor.toString();
+ final CSVSampleProcessor processor = new CSVSampleProcessor(startTime, endTime);
+ return getSamplesAsCSV(sampleCoder, chunk, processor);
+ }
+
+ public static String getSamplesAsCSV(final SampleCoder sampleCoder, final TimelineChunk chunk, final TimeRangeSampleProcessor rangeSampleProcessor) throws IOException {
+ sampleCoder.scan(chunk, rangeSampleProcessor);
+ return rangeSampleProcessor.toString();
}
}
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/DefaultTimelineDao.java b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/DefaultTimelineDao.java
index 44658fa..e1b3413 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/DefaultTimelineDao.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/DefaultTimelineDao.java
@@ -38,7 +38,6 @@ import com.ning.billing.meter.timeline.chunks.TimelineChunk;
import com.ning.billing.meter.timeline.chunks.TimelineChunkMapper;
import com.ning.billing.meter.timeline.consumer.TimelineChunkConsumer;
import com.ning.billing.meter.timeline.shutdown.StartTimes;
-import com.ning.billing.meter.timeline.sources.SourceRecordIdAndMetricRecordId;
import com.ning.billing.meter.timeline.util.DateTimeUtils;
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.callcontext.InternalTenantContext;
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.java b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.java
index 805f068..42cbb13 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.java
@@ -38,7 +38,6 @@ import com.ning.billing.meter.timeline.shutdown.StartTimes;
import com.ning.billing.meter.timeline.shutdown.StartTimesBinder;
import com.ning.billing.meter.timeline.shutdown.StartTimesMapper;
import com.ning.billing.meter.timeline.sources.SourceIdAndMetricIdMapper;
-import com.ning.billing.meter.timeline.sources.SourceRecordIdAndMetricRecordId;
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.callcontext.InternalTenantContext;
import com.ning.billing.util.callcontext.InternalTenantContextBinder;
diff --git a/meter/src/test/java/com/ning/billing/meter/timeline/codec/TestSampleCoder.java b/meter/src/test/java/com/ning/billing/meter/timeline/codec/TestSampleCoder.java
index 115bdf0..b86c237 100644
--- a/meter/src/test/java/com/ning/billing/meter/timeline/codec/TestSampleCoder.java
+++ b/meter/src/test/java/com/ning/billing/meter/timeline/codec/TestSampleCoder.java
@@ -31,6 +31,7 @@ import org.testng.Assert;
import org.testng.annotations.Test;
import com.ning.billing.meter.MeterTestSuite;
+import com.ning.billing.meter.timeline.consumer.TimeRangeSampleProcessor;
import com.ning.billing.meter.timeline.samples.RepeatSample;
import com.ning.billing.meter.timeline.samples.SampleOpcode;
import com.ning.billing.meter.timeline.samples.ScalarSample;
diff --git a/meter/src/test/java/com/ning/billing/meter/timeline/codec/TestTimelineChunkAccumulator.java b/meter/src/test/java/com/ning/billing/meter/timeline/codec/TestTimelineChunkAccumulator.java
index 3e5b31d..246ce96 100644
--- a/meter/src/test/java/com/ning/billing/meter/timeline/codec/TestTimelineChunkAccumulator.java
+++ b/meter/src/test/java/com/ning/billing/meter/timeline/codec/TestTimelineChunkAccumulator.java
@@ -25,6 +25,8 @@ import org.testng.annotations.Test;
import com.ning.billing.meter.MeterTestSuite;
import com.ning.billing.meter.timeline.chunks.TimelineChunk;
+import com.ning.billing.meter.timeline.consumer.SampleProcessor;
+import com.ning.billing.meter.timeline.consumer.TimelineChunkDecoded;
import com.ning.billing.meter.timeline.samples.SampleOpcode;
import com.ning.billing.meter.timeline.samples.ScalarSample;
import com.ning.billing.meter.timeline.times.DefaultTimelineCoder;
diff --git a/meter/src/test/java/com/ning/billing/meter/timeline/consumer/TestAccumulatorSampleConsumer.java b/meter/src/test/java/com/ning/billing/meter/timeline/consumer/TestAccumulatorSampleConsumer.java
index 933eaa5..2aa9513 100644
--- a/meter/src/test/java/com/ning/billing/meter/timeline/consumer/TestAccumulatorSampleConsumer.java
+++ b/meter/src/test/java/com/ning/billing/meter/timeline/consumer/TestAccumulatorSampleConsumer.java
@@ -22,7 +22,7 @@ import org.testng.Assert;
import org.testng.annotations.Test;
import com.ning.billing.meter.MeterTestSuite;
-import com.ning.billing.meter.timeline.consumer.AccumulatorSampleConsumer.TimeAggregationMode;
+import com.ning.billing.meter.api.TimeAggregationMode;
import com.ning.billing.meter.timeline.samples.SampleOpcode;
import com.ning.billing.util.clock.ClockMock;
@@ -35,7 +35,7 @@ public class TestAccumulatorSampleConsumer extends MeterTestSuite {
clock.setTime(new DateTime(2012, 12, 1, 12, 40, DateTimeZone.UTC));
final DateTime start = clock.getUTCNow();
- final AccumulatorSampleConsumer sampleConsumer = new AccumulatorSampleConsumer(TimeAggregationMode.DAYS);
+ final AccumulatorSampleConsumer sampleConsumer = new AccumulatorSampleConsumer(TimeAggregationMode.DAYS, new CSVSampleProcessor());
// 5 for day 1
sampleConsumer.processOneSample(start, SampleOpcode.DOUBLE, (double) 1);
diff --git a/server/src/test/java/com/ning/billing/jaxrs/KillbillClient.java b/server/src/test/java/com/ning/billing/jaxrs/KillbillClient.java
index 1780d91..238af51 100644
--- a/server/src/test/java/com/ning/billing/jaxrs/KillbillClient.java
+++ b/server/src/test/java/com/ning/billing/jaxrs/KillbillClient.java
@@ -809,19 +809,18 @@ public abstract class KillbillClient extends ServerTestSuiteWithEmbeddedDB {
// METERING
//
- protected void recordMeteringUsage(final UUID bundleId, final String category, final String metric, final DateTime timestamp) throws IOException {
- final String meterURI = JaxrsResource.METER_PATH + "/" + bundleId.toString() + "/" + category + "/" + metric;
- final Response meterResponse = doPost(meterURI, null, ImmutableMap.<String, String>of(JaxrsResource.QUERY_METER_WITH_AGGREGATE, "true",
+ protected void recordMeteringUsage(final String source, final String category, final String metric, final DateTime timestamp) throws IOException {
+ final String meterURI = JaxrsResource.METER_PATH + "/" + source + "/" + category + "/" + metric;
+ final Response meterResponse = doPost(meterURI, null, ImmutableMap.<String, String>of(JaxrsResource.QUERY_METER_WITH_CATEGORY_AGGREGATE, "true",
JaxrsResource.QUERY_METER_TIMESTAMP, timestamp.toString()), DEFAULT_HTTP_TIMEOUT_SEC);
assertEquals(meterResponse.getStatusCode(), Status.OK.getStatusCode());
}
- protected List<Map<String, Object>> getMeteringAggregateUsage(final UUID bundleId, final String category, final DateTime from, final DateTime to) throws IOException {
- final String meterURI = JaxrsResource.METER_PATH + "/" + bundleId.toString();
+ protected List<Map<String, Object>> getMeteringAggregateUsage(final String source, final String category, final DateTime from, final DateTime to) throws IOException {
+ final String meterURI = JaxrsResource.METER_PATH + "/" + source;
final Response meterResponse = doGet(meterURI, ImmutableMap.<String, String>of(JaxrsResource.QUERY_METER_CATEGORY, category,
JaxrsResource.QUERY_METER_FROM, from.toString(),
- JaxrsResource.QUERY_METER_TO, to.toString(),
- JaxrsResource.QUERY_METER_WITH_AGGREGATE, "true"), DEFAULT_HTTP_TIMEOUT_SEC);
+ JaxrsResource.QUERY_METER_TO, to.toString()), DEFAULT_HTTP_TIMEOUT_SEC);
assertEquals(meterResponse.getStatusCode(), Status.OK.getStatusCode());
return mapper.readValue(meterResponse.getResponseBody(), new TypeReference<List<Map<String, Object>>>() {});
diff --git a/server/src/test/java/com/ning/billing/jaxrs/TestMeter.java b/server/src/test/java/com/ning/billing/jaxrs/TestMeter.java
index 05e2759..0f88e9b 100644
--- a/server/src/test/java/com/ning/billing/jaxrs/TestMeter.java
+++ b/server/src/test/java/com/ning/billing/jaxrs/TestMeter.java
@@ -20,7 +20,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.UUID;
import org.joda.time.DateTime;
@@ -34,7 +33,7 @@ import com.google.common.collect.Ordering;
public class TestMeter extends TestJaxrsBase {
- private final UUID bundleId = UUID.randomUUID();
+ private final String source = UUID.randomUUID().toString();
private final String category = "PageView";
private final String visitor1 = "pierre";
private final String visitor2 = "stephane";
@@ -56,10 +55,10 @@ public class TestMeter extends TestJaxrsBase {
final DateTime end = dateTimeOrdering.max(visits);
// Verify the visits recorded
- final List<Map<String, Object>> meteringAggregateUsage = getMeteringAggregateUsage(bundleId, category, start, end);
+ final List<Map<String, Object>> meteringAggregateUsage = getMeteringAggregateUsage(source, category, start, end);
final List<DateTime> visitsFound = new ArrayList<DateTime>();
for (final Map<String, Object> oneUsage : meteringAggregateUsage) {
- Assert.assertEquals(oneUsage.get("sourceName"), bundleId.toString());
+ Assert.assertEquals(oneUsage.get("sourceName"), source);
Assert.assertEquals(oneUsage.get("eventCategory"), category);
Assert.assertEquals(oneUsage.get("metric"), "__AGGREGATE__");
@@ -79,11 +78,11 @@ public class TestMeter extends TestJaxrsBase {
final List<DateTime> visits = new ArrayList<DateTime>();
for (int i = 0; i < nbVisits; i++) {
DateTime visitDate = lastVisit.plusSeconds(i);
- recordMeteringUsage(bundleId, category, visitor1, visitDate);
+ recordMeteringUsage(source, category, visitor1, visitDate);
visits.add(visitDate);
visitDate = visitDate.plusSeconds(1);
- recordMeteringUsage(bundleId, category, visitor2, visitDate);
+ recordMeteringUsage(source, category, visitor2, visitDate);
visits.add(visitDate);
lastVisit = visitDate;