TestTimelineAggregator.java

166 lines | 8.021 kB Blame History Raw Download
/*
 * Copyright 2010-2012 Ning, Inc.
 *
 * Ning licenses this file to you under the Apache License, version 2.0
 * (the "License"); you may not use this file except in compliance with the
 * License.  You may obtain a copy of the License at:
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */

package com.ning.billing.meter.timeline.aggregator;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.skife.config.ConfigurationObjectFactory;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import com.ning.billing.meter.MeterTestSuiteWithEmbeddedDB;
import com.ning.billing.meter.timeline.TimelineSourceEventAccumulator;
import com.ning.billing.meter.timeline.chunks.TimelineChunk;
import com.ning.billing.meter.timeline.codec.DefaultSampleCoder;
import com.ning.billing.meter.timeline.codec.SampleCoder;
import com.ning.billing.meter.timeline.consumer.TimelineChunkConsumer;
import com.ning.billing.meter.timeline.persistent.DefaultTimelineDao;
import com.ning.billing.meter.timeline.persistent.TimelineDao;
import com.ning.billing.meter.timeline.samples.SampleOpcode;
import com.ning.billing.meter.timeline.samples.ScalarSample;
import com.ning.billing.meter.timeline.sources.SourceSamplesForTimestamp;
import com.ning.billing.meter.timeline.times.DefaultTimelineCoder;
import com.ning.billing.meter.timeline.times.TimelineCoder;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
import com.ning.billing.util.clock.ClockMock;
import com.ning.billing.util.config.MeterConfig;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

public class TestTimelineAggregator extends MeterTestSuiteWithEmbeddedDB {

    private static final UUID HOST_UUID = UUID.randomUUID();
    private static final String HOST_NAME = HOST_UUID.toString();
    private static final String EVENT_TYPE = "myType";
    private static final int EVENT_TYPE_ID = 123;
    private static final String MIN_HEAPUSED_KIND = "min_heapUsed";
    private static final String MAX_HEAPUSED_KIND = "max_heapUsed";
    private static final DateTime START_TIME = new DateTime(DateTimeZone.UTC);
    private static final TimelineCoder timelineCoder = new DefaultTimelineCoder();
    private static final SampleCoder sampleCoder = new DefaultSampleCoder();

    private final InternalCallContextFactory internalCallContextFactory = new InternalCallContextFactory(getDBI(), new ClockMock());

    private TimelineDao timelineDao;
    private TimelineAggregator aggregator;

    private Integer hostId = null;
    private Integer minHeapUsedKindId = null;
    private Integer maxHeapUsedKindId = null;

    @BeforeMethod(groups = "mysql")
    public void setUp() throws Exception {
        timelineDao = new DefaultTimelineDao(getDBI());
        final Properties properties = System.getProperties();
        properties.put("killbill.usage.timelines.chunksToAggregate", "2,2");
        final MeterConfig config = new ConfigurationObjectFactory(properties).build(MeterConfig.class);
        aggregator = new TimelineAggregator(getDBI(), timelineDao, timelineCoder, sampleCoder, config, internalCallContextFactory);
    }

    @Test(groups = "mysql")
    public void testAggregation() throws Exception {
        // Create the host
        hostId = timelineDao.getOrAddSource(HOST_NAME, internalCallContext);
        Assert.assertNotNull(hostId);
        Assert.assertEquals(timelineDao.getSources(internalCallContext).values().size(), 1);

        // Create the sample kinds
        minHeapUsedKindId = timelineDao.getOrAddMetric(EVENT_TYPE_ID, MIN_HEAPUSED_KIND, internalCallContext);
        Assert.assertNotNull(minHeapUsedKindId);
        maxHeapUsedKindId = timelineDao.getOrAddMetric(EVENT_TYPE_ID, MAX_HEAPUSED_KIND, internalCallContext);
        Assert.assertNotNull(maxHeapUsedKindId);
        Assert.assertEquals(timelineDao.getMetrics(internalCallContext).values().size(), 2);

        // Create two sets of times: T - 125 ... T - 65 ; T - 60 ... T (note the gap!)
        createAOneHourTimelineChunk(125);
        createAOneHourTimelineChunk(60);

        // Check the getSamplesByHostIdsAndSampleKindIds DAO method works as expected
        // You might want to draw timelines on a paper and remember boundaries are inclusive to understand these numbers
        checkSamplesForATimeline(185, 126, 0);
        checkSamplesForATimeline(185, 125, 2);
        checkSamplesForATimeline(64, 61, 0);
        checkSamplesForATimeline(125, 65, 2);
        checkSamplesForATimeline(60, 0, 2);
        checkSamplesForATimeline(125, 0, 4);
        checkSamplesForATimeline(124, 0, 4);
        checkSamplesForATimeline(124, 66, 2);

        aggregator.getAndProcessTimelineAggregationCandidates();

        Assert.assertEquals(timelineDao.getSources(internalCallContext).values().size(), 1);
        Assert.assertEquals(timelineDao.getMetrics(internalCallContext).values().size(), 2);

        // Similar than above, but we have only 2 now
        checkSamplesForATimeline(185, 126, 0);
        checkSamplesForATimeline(185, 125, 2);
        // Note, the gap is filled now
        checkSamplesForATimeline(64, 61, 2);
        checkSamplesForATimeline(125, 65, 2);
        checkSamplesForATimeline(60, 0, 2);
        checkSamplesForATimeline(125, 0, 2);
        checkSamplesForATimeline(124, 0, 2);
        checkSamplesForATimeline(124, 66, 2);
    }

    private void checkSamplesForATimeline(final Integer startTimeMinutesAgo, final Integer endTimeMinutesAgo, final long expectedChunks) throws InterruptedException {
        final AtomicLong timelineChunkSeen = new AtomicLong(0);

        timelineDao.getSamplesBySourceIdsAndMetricIds(ImmutableList.<Integer>of(hostId), ImmutableList.<Integer>of(minHeapUsedKindId, maxHeapUsedKindId),
                                                      START_TIME.minusMinutes(startTimeMinutesAgo), START_TIME.minusMinutes(endTimeMinutesAgo), new TimelineChunkConsumer() {

            @Override
            public void processTimelineChunk(final TimelineChunk chunk) {
                Assert.assertEquals((Integer) chunk.getSourceId(), hostId);
                Assert.assertTrue(chunk.getMetricId() == minHeapUsedKindId || chunk.getMetricId() == maxHeapUsedKindId);
                timelineChunkSeen.incrementAndGet();
            }
        }, internalCallContext);

        Assert.assertEquals(timelineChunkSeen.get(), expectedChunks);
    }

    private void createAOneHourTimelineChunk(final int startTimeMinutesAgo) throws IOException {
        final DateTime firstSampleTime = START_TIME.minusMinutes(startTimeMinutesAgo);
        final TimelineSourceEventAccumulator accumulator = new TimelineSourceEventAccumulator(timelineDao, timelineCoder, sampleCoder, hostId, EVENT_TYPE_ID, firstSampleTime, internalCallContextFactory);
        // 120 samples per hour
        for (int i = 0; i < 120; i++) {
            final DateTime eventDateTime = firstSampleTime.plusSeconds(i * 30);
            final Map<Integer, ScalarSample> event = createEvent(eventDateTime.getMillis());
            final SourceSamplesForTimestamp samples = new SourceSamplesForTimestamp(hostId, EVENT_TYPE, eventDateTime, event);
            accumulator.addSourceSamples(samples);
        }

        accumulator.extractAndQueueTimelineChunks();
    }

    private Map<Integer, ScalarSample> createEvent(final long ts) {
        return ImmutableMap.<Integer, ScalarSample>of(
                minHeapUsedKindId, new ScalarSample(SampleOpcode.LONG, Long.MIN_VALUE + ts),
                maxHeapUsedKindId, new ScalarSample(SampleOpcode.LONG, Long.MAX_VALUE - ts)
                                                     );
    }
}