killbill-memoizeit
Changes
meter/src/main/java/com/ning/billing/meter/timeline/consumer/AccumulatorSampleConsumer.java 15(+3 -12)
meter/src/main/java/com/ning/billing/meter/timeline/consumer/filter/DecimatingSampleFilter.java 1(+1 -0)
meter/src/test/java/com/ning/billing/meter/timeline/consumer/filter/TestDecimatingFilter.java 1(+1 -0)
Details
diff --git a/api/src/main/java/com/ning/billing/meter/api/MeterUserApi.java b/api/src/main/java/com/ning/billing/meter/api/MeterUserApi.java
index 597cf74..de7e9ae 100644
--- a/api/src/main/java/com/ning/billing/meter/api/MeterUserApi.java
+++ b/api/src/main/java/com/ning/billing/meter/api/MeterUserApi.java
@@ -21,6 +21,8 @@ import java.io.OutputStream;
import java.util.Collection;
import java.util.Map;
+import javax.annotation.Nullable;
+
import org.joda.time.DateTime;
import com.ning.billing.util.callcontext.CallContext;
@@ -28,11 +30,86 @@ import com.ning.billing.util.callcontext.TenantContext;
public interface MeterUserApi {
- public void getAggregateUsage(OutputStream outputStream, String source, Collection<String> categories,
- DateTime fromTimestamp, DateTime toTimestamp, TenantContext context) throws IOException;
+ /**
+ * Get usage data for all metrics (given a set of categories) as Json using an accumulating filter.
+ * <p/>
+ * Emulate: select sum(value) from chunks group by source, category, date_trunc('timeAggregationMode', timestamp);
+ *
+ * @param outputStream stream to write the data to
+ * @param timeAggregationMode granularity of the accumulator
+ * @param source source
+ * @param categories categories
+ * @param fromTimestamp earliest timestamp to consider (inclusive)
+ * @param toTimestamp latest timestamp to consider (inclusive)
+ * @param context call context
+ * @throws IOException
+ */
+ void getUsage(OutputStream outputStream, TimeAggregationMode timeAggregationMode,
+ String source, Collection<String> categories,
+ DateTime fromTimestamp, DateTime toTimestamp, TenantContext context) throws IOException;
- public void getUsage(OutputStream outputStream, String source, Map<String, Collection<String>> metricsPerCategory,
- DateTime fromTimestamp, DateTime toTimestamp, TenantContext context) throws IOException;
+ /**
+ * Get usage data as Json using an accumulating filter.
+ * <p/>
+ * Emulate: select sum(value) from chunks group by source, category, metric, date_trunc('timeAggregationMode', timestamp);
+ *
+ * @param outputStream stream to write the data to
+ * @param timeAggregationMode granularity of the accumulator
+ * @param source source
+ * @param metricsPerCategory mapping of metrics per category
+ * @param fromTimestamp earliest timestamp to consider (inclusive)
+ * @param toTimestamp latest timestamp to consider (inclusive)
+ * @param context call context
+ * @throws IOException generic I/O exception
+ */
+ void getUsage(OutputStream outputStream, TimeAggregationMode timeAggregationMode,
+ String source, Map<String, Collection<String>> metricsPerCategory,
+ DateTime fromTimestamp, DateTime toTimestamp, TenantContext context) throws IOException;
+
+ /**
+ * Get usage data as Json using a decimating filter to reduce the number of points
+ *
+ * @param outputStream stream to write the data to
+ * @param decimationMode decimation mode for the decimating filter
+ * @param outputCount number of data point the decimating filter should output, can be null
+ * @param source source
+ * @param metricsPerCategory mapping of metrics per category
+ * @param fromTimestamp earliest timestamp to consider (inclusive)
+ * @param toTimestamp latest timestamp to consider (inclusive)
+ * @param context call context
+ * @throws IOException generic I/O exception
+ */
+ void getUsage(OutputStream outputStream, DecimationMode decimationMode, @Nullable Integer outputCount,
+ String source, Map<String, Collection<String>> metricsPerCategory,
+ DateTime fromTimestamp, DateTime toTimestamp, TenantContext context) throws IOException;
+
+ /**
+ * Get usage data for all metrics (given a set of categories) as Json.
+ *
+ * @param outputStream stream to write the data to
+ * @param source source
+ * @param categories categories
+ * @param fromTimestamp earliest timestamp to consider (inclusive)
+ * @param toTimestamp latest timestamp to consider (inclusive)
+ * @param context call context
+ * @throws IOException generic I/O exception
+ */
+ void getUsage(OutputStream outputStream, String source, Collection<String> categories,
+ DateTime fromTimestamp, DateTime toTimestamp, TenantContext context) throws IOException;
+
+ /**
+ * Get usage data as Json.
+ *
+ * @param outputStream stream to write the data to
+ * @param source source
+ * @param metricsPerCategory mapping of metrics per category
+ * @param fromTimestamp earliest timestamp to consider (inclusive)
+ * @param toTimestamp latest timestamp to consider (inclusive)
+ * @param context call context
+ * @throws IOException generic I/O exception
+ */
+ void getUsage(OutputStream outputStream, String source, Map<String, Collection<String>> metricsPerCategory,
+ DateTime fromTimestamp, DateTime toTimestamp, TenantContext context) throws IOException;
/**
* Shortcut API to record a usage value of "1" for a given category and metric.
diff --git a/api/src/main/java/com/ning/billing/meter/api/TimeAggregationMode.java b/api/src/main/java/com/ning/billing/meter/api/TimeAggregationMode.java
new file mode 100644
index 0000000..a35bfb8
--- /dev/null
+++ b/api/src/main/java/com/ning/billing/meter/api/TimeAggregationMode.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.api;
+
+public enum TimeAggregationMode {
+ SECONDS,
+ MINUTES,
+ HOURS,
+ DAYS,
+ MONTHS,
+ YEARS
+}
diff --git a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java
index 82d6d7d..1be8fd9 100644
--- a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java
+++ b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java
@@ -79,7 +79,8 @@ public interface JaxrsResource {
public static final String QUERY_NOTIFICATION_CALLBACK = "cb";
- public static final String QUERY_METER_WITH_AGGREGATE = "aggregate";
+ public static final String QUERY_METER_WITH_CATEGORY_AGGREGATE = "withCategoryAggregate";
+ public static final String QUERY_METER_TIME_AGGREGATION_MODE = "timeAggregationMode";
public static final String QUERY_METER_TIMESTAMP = "timestamp";
public static final String QUERY_METER_FROM = "from";
public static final String QUERY_METER_TO = "to";
diff --git a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/MeterResource.java b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/MeterResource.java
index 634d63f..f44ada5 100644
--- a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/MeterResource.java
+++ b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/MeterResource.java
@@ -43,6 +43,7 @@ import org.joda.time.DateTime;
import com.ning.billing.jaxrs.util.Context;
import com.ning.billing.jaxrs.util.JaxrsUriBuilder;
import com.ning.billing.meter.api.MeterUserApi;
+import com.ning.billing.meter.api.TimeAggregationMode;
import com.ning.billing.util.api.AuditUserApi;
import com.ning.billing.util.api.CustomFieldUserApi;
import com.ning.billing.util.api.TagUserApi;
@@ -50,6 +51,7 @@ import com.ning.billing.util.callcontext.CallContext;
import com.ning.billing.util.callcontext.TenantContext;
import com.ning.billing.util.clock.Clock;
+import com.google.common.base.Strings;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -79,12 +81,13 @@ public class MeterResource extends JaxRsResourceBase {
@Path("/{source:" + STRING_PATTERN + "}")
@Produces(APPLICATION_JSON)
public StreamingOutput getUsage(@PathParam("source") final String source,
+ // Aggregates per category
@QueryParam(QUERY_METER_CATEGORY) final List<String> categories,
// Format: category,metric
@QueryParam(QUERY_METER_CATEGORY_AND_METRIC) final List<String> categoriesAndMetrics,
@QueryParam(QUERY_METER_FROM) final String fromTimestampString,
@QueryParam(QUERY_METER_TO) final String toTimestampString,
- @QueryParam(QUERY_METER_WITH_AGGREGATE) @DefaultValue("false") final Boolean withAggregate,
+ @QueryParam(QUERY_METER_TIME_AGGREGATION_MODE) @DefaultValue("") final String timeAggregationModeString,
@javax.ws.rs.core.Context final HttpServletRequest request) {
final DateTime fromTimestamp = DATE_TIME_FORMATTER.parseDateTime(fromTimestampString);
final DateTime toTimestamp = DATE_TIME_FORMATTER.parseDateTime(toTimestampString);
@@ -93,11 +96,22 @@ public class MeterResource extends JaxRsResourceBase {
return new StreamingOutput() {
@Override
public void write(final OutputStream output) throws IOException, WebApplicationException {
- if (withAggregate) {
- meterApi.getAggregateUsage(output, source, categories, fromTimestamp, toTimestamp, tenantContext);
+ // Look at aggregates per category?
+ if (categories != null) {
+ if (Strings.isNullOrEmpty(timeAggregationModeString)) {
+ meterApi.getUsage(output, source, categories, fromTimestamp, toTimestamp, tenantContext);
+ } else {
+ final TimeAggregationMode timeAggregationMode = TimeAggregationMode.valueOf(timeAggregationModeString);
+ meterApi.getUsage(output, timeAggregationMode, source, categories, fromTimestamp, toTimestamp, tenantContext);
+ }
} else {
final Map<String, Collection<String>> metricsPerCategory = retrieveMetricsPerCategory(categoriesAndMetrics);
- meterApi.getUsage(output, source, metricsPerCategory, fromTimestamp, toTimestamp, tenantContext);
+ if (Strings.isNullOrEmpty(timeAggregationModeString)) {
+ meterApi.getUsage(output, source, metricsPerCategory, fromTimestamp, toTimestamp, tenantContext);
+ } else {
+ final TimeAggregationMode timeAggregationMode = TimeAggregationMode.valueOf(timeAggregationModeString);
+ meterApi.getUsage(output, timeAggregationMode, source, metricsPerCategory, fromTimestamp, toTimestamp, tenantContext);
+ }
}
}
};
@@ -132,7 +146,7 @@ public class MeterResource extends JaxRsResourceBase {
public Response recordUsage(@PathParam("source") final String source,
@PathParam("categoryName") final String categoryName,
@PathParam("metricName") final String metricName,
- @QueryParam(QUERY_METER_WITH_AGGREGATE) @DefaultValue("false") final Boolean withAggregate,
+ @QueryParam(QUERY_METER_WITH_CATEGORY_AGGREGATE) @DefaultValue("false") final Boolean withAggregate,
@QueryParam(QUERY_METER_TIMESTAMP) final String timestampString,
@HeaderParam(HDR_CREATED_BY) final String createdBy,
@HeaderParam(HDR_REASON) final String reason,
diff --git a/meter/src/main/java/com/ning/billing/meter/api/user/AccumulatingJsonSamplesOutputer.java b/meter/src/main/java/com/ning/billing/meter/api/user/AccumulatingJsonSamplesOutputer.java
new file mode 100644
index 0000000..2f933d1
--- /dev/null
+++ b/meter/src/main/java/com/ning/billing/meter/api/user/AccumulatingJsonSamplesOutputer.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.api.user;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import com.ning.billing.meter.api.TimeAggregationMode;
+import com.ning.billing.meter.timeline.TimelineEventHandler;
+import com.ning.billing.meter.timeline.categories.CategoryRecordIdAndMetric;
+import com.ning.billing.meter.timeline.chunks.TimelineChunk;
+import com.ning.billing.meter.timeline.consumer.AccumulatorSampleConsumer;
+import com.ning.billing.meter.timeline.consumer.CSVConsumer;
+import com.ning.billing.meter.timeline.consumer.CSVSampleProcessor;
+import com.ning.billing.meter.timeline.metrics.SamplesForMetricAndSource;
+import com.ning.billing.meter.timeline.persistent.TimelineDao;
+import com.ning.billing.util.callcontext.InternalTenantContext;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.google.common.base.Strings;
+
+public class AccumulatingJsonSamplesOutputer extends JsonSamplesOutputer {
+
+ private final AccumulatorSampleConsumer accumulatorSampleConsumer;
+
+ private String lastSource;
+ private String lastEventCategory;
+ private String lastMetric;
+
+ public AccumulatingJsonSamplesOutputer(final TimeAggregationMode timeAggregationMode, final TimelineEventHandler timelineEventHandler,
+ final TimelineDao timelineDao, final InternalTenantContext context) {
+ super(timelineEventHandler, timelineDao, context);
+ this.accumulatorSampleConsumer = new AccumulatorSampleConsumer(timeAggregationMode, new CSVSampleProcessor());
+ }
+
+ @Override
+ protected void writeJsonForChunks(final JsonGenerator generator, final Collection<? extends TimelineChunk> chunksForSourceAndMetric) throws IOException {
+ for (final TimelineChunk chunk : chunksForSourceAndMetric) {
+ final String source = timelineDao.getSource(chunk.getSourceId(), context);
+ final CategoryRecordIdAndMetric categoryIdAndMetric = timelineDao.getCategoryIdAndMetric(chunk.getMetricId(), context);
+ final String eventCategory = timelineDao.getEventCategory(categoryIdAndMetric.getEventCategoryId(), context);
+ final String metric = categoryIdAndMetric.getMetric();
+
+ final String samples = CSVConsumer.getSamplesAsCSV(sampleCoder, chunk, accumulatorSampleConsumer);
+
+ // Don't write out empty samples
+ if (!Strings.isNullOrEmpty(samples)) {
+ generator.writeObject(new SamplesForMetricAndSource(source, eventCategory, metric, samples));
+ }
+
+ lastSource = source;
+ lastEventCategory = eventCategory;
+ lastMetric = metric;
+ }
+ }
+
+ @Override
+ protected void writeRemainingData(final JsonGenerator generator) throws IOException {
+ final String samples = accumulatorSampleConsumer.flush();
+ // Don't write out empty samples
+ if (!Strings.isNullOrEmpty(samples)) {
+ generator.writeObject(new SamplesForMetricAndSource(lastSource, lastEventCategory, lastMetric, samples));
+ }
+ }
+}
diff --git a/meter/src/main/java/com/ning/billing/meter/api/user/DebugJsonSamplesOutputer.java b/meter/src/main/java/com/ning/billing/meter/api/user/DebugJsonSamplesOutputer.java
new file mode 100644
index 0000000..4ffd3f6
--- /dev/null
+++ b/meter/src/main/java/com/ning/billing/meter/api/user/DebugJsonSamplesOutputer.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.api.user;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import com.ning.billing.meter.timeline.TimelineEventHandler;
+import com.ning.billing.meter.timeline.categories.CategoryRecordIdAndMetric;
+import com.ning.billing.meter.timeline.chunks.TimelineChunk;
+import com.ning.billing.meter.timeline.chunks.TimelineChunksViews;
+import com.ning.billing.meter.timeline.consumer.CSVConsumer;
+import com.ning.billing.meter.timeline.consumer.TimelineChunkDecoded;
+import com.ning.billing.meter.timeline.metrics.SamplesForMetricAndSource;
+import com.ning.billing.meter.timeline.persistent.TimelineDao;
+import com.ning.billing.util.callcontext.InternalTenantContext;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.base.Strings;
+
+public class DebugJsonSamplesOutputer extends JsonSamplesOutputer {
+
+ private final boolean withBinaryData;
+ private final ObjectWriter writer;
+
+ public DebugJsonSamplesOutputer(final boolean withBinaryData, final TimelineEventHandler timelineEventHandler,
+ final TimelineDao timelineDao, final InternalTenantContext context) {
+ super(timelineEventHandler, timelineDao, context);
+ this.withBinaryData = withBinaryData;
+ if (withBinaryData) {
+ this.writer = objectMapper.writerWithView(TimelineChunksViews.Compact.class);
+ } else {
+ this.writer = objectMapper.writerWithView(TimelineChunksViews.Loose.class);
+ }
+ }
+
+ @Override
+ protected void writeJsonForChunks(final JsonGenerator generator, final Collection<? extends TimelineChunk> chunksForSourceAndMetric) throws IOException {
+ for (final TimelineChunk chunk : chunksForSourceAndMetric) {
+ if (withBinaryData) {
+ writer.writeValue(generator, new TimelineChunkDecoded(chunk, sampleCoder));
+ } else {
+ final String source = timelineDao.getSource(chunk.getSourceId(), context);
+ final CategoryRecordIdAndMetric categoryIdAndMetric = timelineDao.getCategoryIdAndMetric(chunk.getMetricId(), context);
+ final String category = timelineDao.getEventCategory(categoryIdAndMetric.getEventCategoryId(), context);
+ final String metric = categoryIdAndMetric.getMetric();
+ final String samples = CSVConsumer.getSamplesAsCSV(sampleCoder, chunk);
+
+ // Don't write out empty samples
+ if (!Strings.isNullOrEmpty(samples)) {
+ generator.writeObject(new SamplesForMetricAndSource(source, category, metric, samples));
+ }
+ }
+ }
+ }
+}
diff --git a/meter/src/main/java/com/ning/billing/meter/api/user/DecimatingJsonSamplesOutputer.java b/meter/src/main/java/com/ning/billing/meter/api/user/DecimatingJsonSamplesOutputer.java
new file mode 100644
index 0000000..7fed8a7
--- /dev/null
+++ b/meter/src/main/java/com/ning/billing/meter/api/user/DecimatingJsonSamplesOutputer.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.api.user;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import org.joda.time.DateTime;
+import org.skife.config.TimeSpan;
+
+import com.ning.billing.meter.api.DecimationMode;
+import com.ning.billing.meter.timeline.TimelineEventHandler;
+import com.ning.billing.meter.timeline.categories.CategoryRecordIdAndMetric;
+import com.ning.billing.meter.timeline.chunks.TimelineChunk;
+import com.ning.billing.meter.timeline.consumer.CSVConsumer;
+import com.ning.billing.meter.timeline.consumer.CSVSampleProcessor;
+import com.ning.billing.meter.timeline.consumer.TimeRangeSampleProcessor;
+import com.ning.billing.meter.timeline.consumer.filter.DecimatingSampleFilter;
+import com.ning.billing.meter.timeline.metrics.SamplesForMetricAndSource;
+import com.ning.billing.meter.timeline.persistent.TimelineDao;
+import com.ning.billing.util.callcontext.InternalTenantContext;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.google.common.base.Strings;
+
+public class DecimatingJsonSamplesOutputer extends JsonSamplesOutputer {
+
+ private final Integer outputCount;
+ private final DecimationMode decimationMode;
+
+ private Map<Integer, Map<Integer, DecimatingSampleFilter>> filters;
+
+ public DecimatingJsonSamplesOutputer(final DecimationMode decimationMode, @Nullable final Integer outputCount,
+ final TimelineEventHandler timelineEventHandler, final TimelineDao timelineDao, final InternalTenantContext context) {
+ super(timelineEventHandler, timelineDao, context);
+ this.outputCount = outputCount;
+ this.decimationMode = decimationMode;
+ }
+
+ @Override
+ protected void output(final OutputStream output, final List<Integer> sourceIds, final List<Integer> metricIds, final DateTime startTime, final DateTime endTime) throws IOException {
+ // Create the decimating filters
+ filters = createDecimatingSampleFilters(sourceIds, metricIds, decimationMode, startTime, endTime, outputCount);
+
+ super.output(output, sourceIds, metricIds, startTime, endTime);
+ }
+
+ @Override
+ protected void writeJsonForChunks(final JsonGenerator generator, final Collection<? extends TimelineChunk> chunksForSourceAndMetric) throws IOException {
+ for (final TimelineChunk chunk : chunksForSourceAndMetric) {
+ final String source = timelineDao.getSource(chunk.getSourceId(), context);
+ final CategoryRecordIdAndMetric categoryIdAndMetric = timelineDao.getCategoryIdAndMetric(chunk.getMetricId(), context);
+ final String eventCategory = timelineDao.getEventCategory(categoryIdAndMetric.getEventCategoryId(), context);
+ final String metric = categoryIdAndMetric.getMetric();
+ final TimeRangeSampleProcessor filter = filters.get(chunk.getSourceId()).get(chunk.getMetricId());
+
+ final String samples = filter == null ? CSVConsumer.getSamplesAsCSV(sampleCoder, chunk) : CSVConsumer.getSamplesAsCSV(sampleCoder, chunk, filter);
+
+ // Don't write out empty samples
+ if (!Strings.isNullOrEmpty(samples)) {
+ generator.writeObject(new SamplesForMetricAndSource(source, eventCategory, metric, samples));
+ }
+ }
+ }
+
+ private Map<Integer, Map<Integer, DecimatingSampleFilter>> createDecimatingSampleFilters(final List<Integer> sourceIds, final List<Integer> metricIds, final DecimationMode decimationMode,
+ final DateTime startTime, final DateTime endTime, final Integer outputCount) {
+ final Map<Integer, Map<Integer, DecimatingSampleFilter>> filters = new HashMap<Integer, Map<Integer, DecimatingSampleFilter>>();
+ for (final Integer sourceId : sourceIds) {
+ filters.put(sourceId, new HashMap<Integer, DecimatingSampleFilter>());
+ for (final Integer metric : metricIds) {
+ filters.get(sourceId).put(metric, createDecimatingSampleFilter(outputCount, decimationMode, startTime, endTime));
+ }
+ }
+ return filters;
+ }
+
+ private DecimatingSampleFilter createDecimatingSampleFilter(final Integer outputCount, final DecimationMode decimationMode, final DateTime startTime, final DateTime endTime) {
+ final DecimatingSampleFilter rangeSampleProcessor;
+ if (outputCount == null) {
+ rangeSampleProcessor = null;
+ } else {
+ // TODO Fix the polling interval
+ rangeSampleProcessor = new DecimatingSampleFilter(startTime, endTime, outputCount, new TimeSpan("1s"), decimationMode, new CSVSampleProcessor());
+ }
+
+ return rangeSampleProcessor;
+ }
+}
diff --git a/meter/src/main/java/com/ning/billing/meter/api/user/DefaultJsonSamplesOutputer.java b/meter/src/main/java/com/ning/billing/meter/api/user/DefaultJsonSamplesOutputer.java
new file mode 100644
index 0000000..d11e3f3
--- /dev/null
+++ b/meter/src/main/java/com/ning/billing/meter/api/user/DefaultJsonSamplesOutputer.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.api.user;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import com.ning.billing.meter.timeline.TimelineEventHandler;
+import com.ning.billing.meter.timeline.categories.CategoryRecordIdAndMetric;
+import com.ning.billing.meter.timeline.chunks.TimelineChunk;
+import com.ning.billing.meter.timeline.consumer.CSVConsumer;
+import com.ning.billing.meter.timeline.metrics.SamplesForMetricAndSource;
+import com.ning.billing.meter.timeline.persistent.TimelineDao;
+import com.ning.billing.util.callcontext.InternalTenantContext;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.google.common.base.Strings;
+
+public class DefaultJsonSamplesOutputer extends JsonSamplesOutputer {
+
+ public DefaultJsonSamplesOutputer(final TimelineEventHandler timelineEventHandler, final TimelineDao timelineDao, final InternalTenantContext context) {
+ super(timelineEventHandler, timelineDao, context);
+ }
+
+ @Override
+ protected void writeJsonForChunks(final JsonGenerator generator, final Collection<? extends TimelineChunk> chunksForSourceAndMetric) throws IOException {
+ for (final TimelineChunk chunk : chunksForSourceAndMetric) {
+ final String source = timelineDao.getSource(chunk.getSourceId(), context);
+ final CategoryRecordIdAndMetric categoryIdAndMetric = timelineDao.getCategoryIdAndMetric(chunk.getMetricId(), context);
+ final String eventCategory = timelineDao.getEventCategory(categoryIdAndMetric.getEventCategoryId(), context);
+ final String metric = categoryIdAndMetric.getMetric();
+
+ final String samples = CSVConsumer.getSamplesAsCSV(sampleCoder, chunk);
+
+ // Don't write out empty samples
+ if (!Strings.isNullOrEmpty(samples)) {
+ generator.writeObject(new SamplesForMetricAndSource(source, eventCategory, metric, samples));
+ }
+ }
+ }
+}
diff --git a/meter/src/main/java/com/ning/billing/meter/api/user/DefaultMeterUserApi.java b/meter/src/main/java/com/ning/billing/meter/api/user/DefaultMeterUserApi.java
index 92023cc..880baf3 100644
--- a/meter/src/main/java/com/ning/billing/meter/api/user/DefaultMeterUserApi.java
+++ b/meter/src/main/java/com/ning/billing/meter/api/user/DefaultMeterUserApi.java
@@ -21,11 +21,14 @@ import java.io.OutputStream;
import java.util.Collection;
import java.util.Map;
+import javax.annotation.Nullable;
import javax.inject.Inject;
import org.joda.time.DateTime;
+import com.ning.billing.meter.api.DecimationMode;
import com.ning.billing.meter.api.MeterUserApi;
+import com.ning.billing.meter.api.TimeAggregationMode;
import com.ning.billing.meter.timeline.TimelineEventHandler;
import com.ning.billing.meter.timeline.persistent.TimelineDao;
import com.ning.billing.util.callcontext.CallContext;
@@ -56,12 +59,43 @@ public class DefaultMeterUserApi implements MeterUserApi {
}
@Override
- public void getAggregateUsage(final OutputStream outputStream, final String source, final Collection<String> categories,
- final DateTime fromTimestamp, final DateTime toTimestamp, final TenantContext context) throws IOException {
+ public void getUsage(final OutputStream outputStream, final TimeAggregationMode timeAggregationMode,
+ final String source, final Collection<String> categories,
+ final DateTime fromTimestamp, final DateTime toTimestamp, final TenantContext context) throws IOException {
+ final ImmutableMap.Builder<String, Collection<String>> metricsPerCategory = new Builder<String, Collection<String>>();
+ for (final String category : categories) {
+ metricsPerCategory.put(category, ImmutableList.<String>of(AGGREGATE_METRIC_NAME));
+ }
+
+ getUsage(outputStream, timeAggregationMode, source, metricsPerCategory.build(), fromTimestamp, toTimestamp, context);
+ }
+
+ @Override
+ public void getUsage(final OutputStream outputStream, final TimeAggregationMode timeAggregationMode,
+ final String source, final Map<String, Collection<String>> metricsPerCategory,
+ final DateTime fromTimestamp, final DateTime toTimestamp, final TenantContext context) throws IOException {
+ final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(context);
+ final JsonSamplesOutputer outputerJson = new AccumulatingJsonSamplesOutputer(timeAggregationMode, timelineEventHandler, timelineDao, internalTenantContext);
+ outputerJson.output(outputStream, ImmutableList.<String>of(source), metricsPerCategory, fromTimestamp, toTimestamp);
+ }
+
+ @Override
+ public void getUsage(final OutputStream outputStream, final DecimationMode decimationMode, @Nullable final Integer outputCount,
+ final String source, final Map<String, Collection<String>> metricsPerCategory,
+ final DateTime fromTimestamp, final DateTime toTimestamp, final TenantContext context) throws IOException {
+ final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(context);
+ final JsonSamplesOutputer outputerJson = new DecimatingJsonSamplesOutputer(decimationMode, outputCount, timelineEventHandler, timelineDao, internalTenantContext);
+ outputerJson.output(outputStream, ImmutableList.<String>of(source), metricsPerCategory, fromTimestamp, toTimestamp);
+ }
+
+ @Override
+ public void getUsage(final OutputStream outputStream, final String source, final Collection<String> categories,
+ final DateTime fromTimestamp, final DateTime toTimestamp, final TenantContext context) throws IOException {
final ImmutableMap.Builder<String, Collection<String>> metricsPerCategory = new Builder<String, Collection<String>>();
for (final String category : categories) {
metricsPerCategory.put(category, ImmutableList.<String>of(AGGREGATE_METRIC_NAME));
}
+
getUsage(outputStream, source, metricsPerCategory.build(), fromTimestamp, toTimestamp, context);
}
@@ -69,8 +103,7 @@ public class DefaultMeterUserApi implements MeterUserApi {
public void getUsage(final OutputStream outputStream, final String source, final Map<String, Collection<String>> metricsPerCategory,
final DateTime fromTimestamp, final DateTime toTimestamp, final TenantContext context) throws IOException {
final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(context);
-
- final JsonSamplesOutputer outputerJson = new JsonSamplesOutputer(timelineEventHandler, timelineDao, internalTenantContext);
+ final JsonSamplesOutputer outputerJson = new DefaultJsonSamplesOutputer(timelineEventHandler, timelineDao, internalTenantContext);
outputerJson.output(outputStream, ImmutableList.<String>of(source), metricsPerCategory, fromTimestamp, toTimestamp);
}
diff --git a/meter/src/main/java/com/ning/billing/meter/api/user/JsonSamplesOutputer.java b/meter/src/main/java/com/ning/billing/meter/api/user/JsonSamplesOutputer.java
index 5309ed7..9478edf 100644
--- a/meter/src/main/java/com/ning/billing/meter/api/user/JsonSamplesOutputer.java
+++ b/meter/src/main/java/com/ning/billing/meter/api/user/JsonSamplesOutputer.java
@@ -20,7 +20,6 @@ import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@@ -29,37 +28,26 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.joda.time.DateTime;
-import org.skife.config.TimeSpan;
import com.ning.billing.meter.timeline.TimelineEventHandler;
-import com.ning.billing.meter.timeline.categories.CategoryRecordIdAndMetric;
import com.ning.billing.meter.timeline.chunks.TimelineChunk;
-import com.ning.billing.meter.timeline.chunks.TimelineChunksViews;
import com.ning.billing.meter.timeline.codec.DefaultSampleCoder;
import com.ning.billing.meter.timeline.codec.SampleCoder;
-import com.ning.billing.meter.timeline.consumer.CSVConsumer;
-import com.ning.billing.meter.timeline.consumer.CSVSampleProcessor;
import com.ning.billing.meter.timeline.consumer.TimelineChunkConsumer;
-import com.ning.billing.meter.timeline.consumer.TimelineChunkDecoded;
-import com.ning.billing.meter.timeline.consumer.filter.DecimatingSampleFilter;
-import com.ning.billing.meter.timeline.consumer.filter.DecimationMode;
-import com.ning.billing.meter.timeline.metrics.SamplesForMetricAndSource;
import com.ning.billing.meter.timeline.persistent.TimelineDao;
import com.ning.billing.util.callcontext.InternalTenantContext;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectWriter;
-import com.google.common.base.Strings;
-public class JsonSamplesOutputer {
+public abstract class JsonSamplesOutputer {
- private static final ObjectMapper objectMapper = new ObjectMapper();
+ protected static final ObjectMapper objectMapper = new ObjectMapper();
- private final TimelineEventHandler timelineEventHandler;
- private final TimelineDao timelineDao;
- private final SampleCoder sampleCoder;
- private final InternalTenantContext context;
+ protected final TimelineEventHandler timelineEventHandler;
+ protected final TimelineDao timelineDao;
+ protected final SampleCoder sampleCoder;
+ protected final InternalTenantContext context;
public JsonSamplesOutputer(final TimelineEventHandler timelineEventHandler, final TimelineDao timelineDao, final InternalTenantContext context) {
this.timelineEventHandler = timelineEventHandler;
@@ -68,38 +56,31 @@ public class JsonSamplesOutputer {
this.context = context;
}
- public void output(final OutputStream output, final List<String> sources, final Map<String, Collection<String>> metricsPerCategory,
- final DateTime startTime, final DateTime endTime) throws IOException {
- // Default - output all data points as CSV
- output(output, sources, metricsPerCategory, DecimationMode.PEAK_PICK, null, false, false, startTime, endTime);
- }
+ protected abstract void writeJsonForChunks(final JsonGenerator generator, final Collection<? extends TimelineChunk> chunksForSourceAndMetric) throws IOException;
public void output(final OutputStream output, final List<String> sources, final Map<String, Collection<String>> metricsPerCategory,
- final DecimationMode decimationMode, @Nullable final Integer outputCount, final boolean decodeSamples, final boolean compact,
final DateTime startTime, final DateTime endTime) throws IOException {
// Retrieve the source and metric ids
final List<Integer> sourceIds = translateSourcesToSourceIds(sources);
final List<Integer> metricIds = translateCategoriesAndMetricNamesToMetricIds(metricsPerCategory);
+ output(output, sourceIds, metricIds, startTime, endTime);
+ }
- // Create the decimating filters, if needed
- final Map<Integer, Map<Integer, DecimatingSampleFilter>> filters = createDecimatingSampleFilters(sourceIds, metricIds, decimationMode, startTime, endTime, outputCount);
-
+ protected void output(final OutputStream output, final List<Integer> sourceIds, final List<Integer> metricIds,
+ final DateTime startTime, final DateTime endTime) throws IOException {
// Setup Jackson
- final ObjectWriter writer;
- if (compact) {
- writer = objectMapper.writerWithView(TimelineChunksViews.Compact.class);
- } else {
- writer = objectMapper.writerWithView(TimelineChunksViews.Loose.class);
- }
final JsonGenerator generator = objectMapper.getJsonFactory().createJsonGenerator(output);
generator.writeStartArray();
// First, return all data stored in the database
- writeJsonForStoredChunks(generator, writer, filters, sourceIds, metricIds, startTime, endTime, decodeSamples);
+ writeJsonForStoredChunks(generator, sourceIds, metricIds, startTime, endTime);
// Now return all data in memory
- writeJsonForInMemoryChunks(generator, writer, filters, sourceIds, metricIds, startTime, endTime, decodeSamples);
+ writeJsonForInMemoryChunks(generator, sourceIds, metricIds, startTime, endTime);
+
+ // Allow implementers to flush their buffers
+ writeRemainingData(generator);
generator.writeEndArray();
@@ -107,6 +88,10 @@ public class JsonSamplesOutputer {
generator.close();
}
+ protected void writeRemainingData(final JsonGenerator generator) throws IOException {
+ // No-op
+ }
+
private List<Integer> translateSourcesToSourceIds(final List<String> sources) {
final List<Integer> hostIds = new ArrayList<Integer>(sources.size());
for (final String source : sources) {
@@ -139,32 +124,8 @@ public class JsonSamplesOutputer {
return metricIds;
}
- private Map<Integer, Map<Integer, DecimatingSampleFilter>> createDecimatingSampleFilters(final List<Integer> hostIds, final List<Integer> sampleKindIds, final DecimationMode decimationMode,
- final DateTime startTime, final DateTime endTime, final Integer outputCount) {
- final Map<Integer, Map<Integer, DecimatingSampleFilter>> filters = new HashMap<Integer, Map<Integer, DecimatingSampleFilter>>();
- for (final Integer hostId : hostIds) {
- filters.put(hostId, new HashMap<Integer, DecimatingSampleFilter>());
- for (final Integer sampleKindId : sampleKindIds) {
- filters.get(hostId).put(sampleKindId, createDecimatingSampleFilter(outputCount, decimationMode, startTime, endTime));
- }
- }
- return filters;
- }
-
- private DecimatingSampleFilter createDecimatingSampleFilter(final Integer outputCount, final DecimationMode decimationMode, final DateTime startTime, final DateTime endTime) {
- final DecimatingSampleFilter rangeSampleProcessor;
- if (outputCount == null) {
- rangeSampleProcessor = null;
- } else {
- // TODO Fix the polling interval
- rangeSampleProcessor = new DecimatingSampleFilter(startTime, endTime, outputCount, new TimeSpan("1s"), decimationMode, new CSVSampleProcessor());
- }
-
- return rangeSampleProcessor;
- }
-
- private void writeJsonForStoredChunks(final JsonGenerator generator, final ObjectWriter writer, final Map<Integer, Map<Integer, DecimatingSampleFilter>> filters, final List<Integer> hostIdsList,
- final List<Integer> sampleKindIdsList, final DateTime startTime, final DateTime endTime, final boolean decodeSamples) throws IOException {
+ private void writeJsonForStoredChunks(final JsonGenerator generator, final List<Integer> hostIdsList, final List<Integer> sampleKindIdsList,
+ final DateTime startTime, final DateTime endTime) throws IOException {
final AtomicReference<Integer> lastHostId = new AtomicReference<Integer>(null);
final AtomicReference<Integer> lastSampleKindId = new AtomicReference<Integer>(null);
final List<TimelineChunk> chunksForHostAndSampleKind = new ArrayList<TimelineChunk>();
@@ -180,7 +141,7 @@ public class JsonSamplesOutputer {
chunksForHostAndSampleKind.add(chunks);
if (previousHostId != null && (!previousHostId.equals(currentHostId) || !previousSampleKindId.equals(currentSampleKindId))) {
try {
- writeJsonForChunks(generator, writer, filters, chunksForHostAndSampleKind, decodeSamples);
+ writeJsonForChunks(generator, chunksForHostAndSampleKind);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -193,13 +154,13 @@ public class JsonSamplesOutputer {
}, context);
if (chunksForHostAndSampleKind.size() > 0) {
- writeJsonForChunks(generator, writer, filters, chunksForHostAndSampleKind, decodeSamples);
+ writeJsonForChunks(generator, chunksForHostAndSampleKind);
chunksForHostAndSampleKind.clear();
}
}
- private void writeJsonForInMemoryChunks(final JsonGenerator generator, final ObjectWriter writer, final Map<Integer, Map<Integer, DecimatingSampleFilter>> filters, final List<Integer> hostIdsList,
- final List<Integer> sampleKindIdsList, @Nullable final DateTime startTime, @Nullable final DateTime endTime, final boolean decodeSamples) throws IOException {
+ private void writeJsonForInMemoryChunks(final JsonGenerator generator, final List<Integer> hostIdsList, final List<Integer> sampleKindIdsList,
+ @Nullable final DateTime startTime, @Nullable final DateTime endTime) throws IOException {
for (final Integer hostId : hostIdsList) {
final Collection<? extends TimelineChunk> inMemorySamples;
@@ -208,30 +169,7 @@ public class JsonSamplesOutputer {
} catch (ExecutionException e) {
throw new IOException(e);
}
- writeJsonForChunks(generator, writer, filters, inMemorySamples, decodeSamples);
- }
- }
-
- private void writeJsonForChunks(final JsonGenerator generator, final ObjectWriter writer, final Map<Integer, Map<Integer, DecimatingSampleFilter>> filters,
- final Iterable<? extends TimelineChunk> chunksForHostAndSampleKind, final boolean decodeSamples) throws IOException {
- for (final TimelineChunk chunk : chunksForHostAndSampleKind) {
- if (decodeSamples) {
- writer.writeValue(generator, new TimelineChunkDecoded(chunk, sampleCoder));
- } else {
- final String hostName = timelineDao.getSource(chunk.getSourceId(), context);
- final CategoryRecordIdAndMetric categoryIdAndSampleKind = timelineDao.getCategoryIdAndMetric(chunk.getMetricId(), context);
- final String eventCategory = timelineDao.getEventCategory(categoryIdAndSampleKind.getEventCategoryId(), context);
- final String sampleKind = categoryIdAndSampleKind.getMetric();
- // TODO pass compact form
- final DecimatingSampleFilter filter = filters.get(chunk.getSourceId()).get(chunk.getMetricId());
- // TODO CSV only for now
- final String samples = filter == null ? CSVConsumer.getSamplesAsCSV(sampleCoder, chunk) : CSVConsumer.getSamplesAsCSV(sampleCoder, chunk, filter);
-
- // Don't write out empty samples
- if (!Strings.isNullOrEmpty(samples)) {
- generator.writeObject(new SamplesForMetricAndSource(hostName, eventCategory, sampleKind, samples));
- }
- }
+ writeJsonForChunks(generator, inMemorySamples);
}
}
}
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/consumer/AccumulatorSampleConsumer.java b/meter/src/main/java/com/ning/billing/meter/timeline/consumer/AccumulatorSampleConsumer.java
index 31f8260..0c05cba 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/consumer/AccumulatorSampleConsumer.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/consumer/AccumulatorSampleConsumer.java
@@ -22,20 +22,12 @@ import java.util.Map;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
+import com.ning.billing.meter.api.TimeAggregationMode;
import com.ning.billing.meter.timeline.samples.SampleOpcode;
import com.ning.billing.meter.timeline.samples.ScalarSample;
public class AccumulatorSampleConsumer extends TimeRangeSampleProcessor {
- public enum TimeAggregationMode {
- SECONDS,
- MINUTES,
- HOURS,
- DAYS,
- MONTHS,
- YEARS
- }
-
private final StringBuilder builder = new StringBuilder();
// Linked HashMap to keep ordering of opcodes as they came
private final Map<SampleOpcode, Double> accumulators = new LinkedHashMap<SampleOpcode, Double>();
@@ -46,11 +38,10 @@ public class AccumulatorSampleConsumer extends TimeRangeSampleProcessor {
private DateTime lastRoundedTime = null;
private int aggregatedSampleNumber = 0;
- public AccumulatorSampleConsumer(final TimeAggregationMode timeAggregationMode) {
+ public AccumulatorSampleConsumer(final TimeAggregationMode timeAggregationMode, final TimeRangeSampleProcessor sampleProcessor) {
super(null, null);
this.timeAggregationMode = timeAggregationMode;
- // TODO should be configurable
- this.sampleProcessor = new CSVSampleProcessor();
+ this.sampleProcessor = sampleProcessor;
}
@Override
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/consumer/filter/DecimatingSampleFilter.java b/meter/src/main/java/com/ning/billing/meter/timeline/consumer/filter/DecimatingSampleFilter.java
index 401e1b4..51a0e6f 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/consumer/filter/DecimatingSampleFilter.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/consumer/filter/DecimatingSampleFilter.java
@@ -19,6 +19,7 @@ package com.ning.billing.meter.timeline.consumer.filter;
import org.joda.time.DateTime;
import org.skife.config.TimeSpan;
+import com.ning.billing.meter.api.DecimationMode;
import com.ning.billing.meter.timeline.consumer.TimeRangeSampleProcessor;
import com.ning.billing.meter.timeline.samples.SampleOpcode;
import com.ning.billing.meter.timeline.samples.ScalarSample;
diff --git a/meter/src/test/java/com/ning/billing/meter/timeline/consumer/filter/TestDecimatingFilter.java b/meter/src/test/java/com/ning/billing/meter/timeline/consumer/filter/TestDecimatingFilter.java
index f4186a7..a5d6954 100644
--- a/meter/src/test/java/com/ning/billing/meter/timeline/consumer/filter/TestDecimatingFilter.java
+++ b/meter/src/test/java/com/ning/billing/meter/timeline/consumer/filter/TestDecimatingFilter.java
@@ -25,6 +25,7 @@ import org.testng.Assert;
import org.testng.annotations.Test;
import com.ning.billing.meter.MeterTestSuite;
+import com.ning.billing.meter.api.DecimationMode;
import com.ning.billing.meter.timeline.consumer.TimeRangeSampleProcessor;
import com.ning.billing.meter.timeline.samples.SampleOpcode;
diff --git a/meter/src/test/java/com/ning/billing/meter/timeline/consumer/TestAccumulatorSampleConsumer.java b/meter/src/test/java/com/ning/billing/meter/timeline/consumer/TestAccumulatorSampleConsumer.java
index 933eaa5..2aa9513 100644
--- a/meter/src/test/java/com/ning/billing/meter/timeline/consumer/TestAccumulatorSampleConsumer.java
+++ b/meter/src/test/java/com/ning/billing/meter/timeline/consumer/TestAccumulatorSampleConsumer.java
@@ -22,7 +22,7 @@ import org.testng.Assert;
import org.testng.annotations.Test;
import com.ning.billing.meter.MeterTestSuite;
-import com.ning.billing.meter.timeline.consumer.AccumulatorSampleConsumer.TimeAggregationMode;
+import com.ning.billing.meter.api.TimeAggregationMode;
import com.ning.billing.meter.timeline.samples.SampleOpcode;
import com.ning.billing.util.clock.ClockMock;
@@ -35,7 +35,7 @@ public class TestAccumulatorSampleConsumer extends MeterTestSuite {
clock.setTime(new DateTime(2012, 12, 1, 12, 40, DateTimeZone.UTC));
final DateTime start = clock.getUTCNow();
- final AccumulatorSampleConsumer sampleConsumer = new AccumulatorSampleConsumer(TimeAggregationMode.DAYS);
+ final AccumulatorSampleConsumer sampleConsumer = new AccumulatorSampleConsumer(TimeAggregationMode.DAYS, new CSVSampleProcessor());
// 5 for day 1
sampleConsumer.processOneSample(start, SampleOpcode.DOUBLE, (double) 1);
diff --git a/server/src/test/java/com/ning/billing/jaxrs/KillbillClient.java b/server/src/test/java/com/ning/billing/jaxrs/KillbillClient.java
index eb09494..238af51 100644
--- a/server/src/test/java/com/ning/billing/jaxrs/KillbillClient.java
+++ b/server/src/test/java/com/ning/billing/jaxrs/KillbillClient.java
@@ -811,7 +811,7 @@ public abstract class KillbillClient extends ServerTestSuiteWithEmbeddedDB {
protected void recordMeteringUsage(final String source, final String category, final String metric, final DateTime timestamp) throws IOException {
final String meterURI = JaxrsResource.METER_PATH + "/" + source + "/" + category + "/" + metric;
- final Response meterResponse = doPost(meterURI, null, ImmutableMap.<String, String>of(JaxrsResource.QUERY_METER_WITH_AGGREGATE, "true",
+ final Response meterResponse = doPost(meterURI, null, ImmutableMap.<String, String>of(JaxrsResource.QUERY_METER_WITH_CATEGORY_AGGREGATE, "true",
JaxrsResource.QUERY_METER_TIMESTAMP, timestamp.toString()), DEFAULT_HTTP_TIMEOUT_SEC);
assertEquals(meterResponse.getStatusCode(), Status.OK.getStatusCode());
}
@@ -820,8 +820,7 @@ public abstract class KillbillClient extends ServerTestSuiteWithEmbeddedDB {
final String meterURI = JaxrsResource.METER_PATH + "/" + source;
final Response meterResponse = doGet(meterURI, ImmutableMap.<String, String>of(JaxrsResource.QUERY_METER_CATEGORY, category,
JaxrsResource.QUERY_METER_FROM, from.toString(),
- JaxrsResource.QUERY_METER_TO, to.toString(),
- JaxrsResource.QUERY_METER_WITH_AGGREGATE, "true"), DEFAULT_HTTP_TIMEOUT_SEC);
+ JaxrsResource.QUERY_METER_TO, to.toString()), DEFAULT_HTTP_TIMEOUT_SEC);
assertEquals(meterResponse.getStatusCode(), Status.OK.getStatusCode());
return mapper.readValue(meterResponse.getResponseBody(), new TypeReference<List<Map<String, Object>>>() {});