TestSamplesReplayer.java

121 lines | 5.798 kB Blame History Raw Download
/*
 * Copyright 2010-2012 Ning, Inc.
 *
 * Ning licenses this file to you under the Apache License, version 2.0
 * (the "License"); you may not use this file except in compliance with the
 * License.  You may obtain a copy of the License at:
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */

package com.ning.billing.meter.timeline.persistent;

import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.mockito.Mockito;
import org.skife.jdbi.v2.DBI;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import com.ning.billing.meter.MeterTestSuite;
import com.ning.billing.meter.timeline.MockTimelineDao;
import com.ning.billing.meter.timeline.TimelineSourceEventAccumulator;
import com.ning.billing.meter.timeline.chunks.TimelineChunk;
import com.ning.billing.meter.timeline.codec.DefaultSampleCoder;
import com.ning.billing.meter.timeline.codec.SampleCoder;
import com.ning.billing.meter.timeline.samples.SampleOpcode;
import com.ning.billing.meter.timeline.samples.ScalarSample;
import com.ning.billing.meter.timeline.sources.SourceSamplesForTimestamp;
import com.ning.billing.meter.timeline.times.DefaultTimelineCoder;
import com.ning.billing.meter.timeline.times.TimelineCoder;
import com.ning.billing.util.cache.CacheController;
import com.ning.billing.util.cache.CacheControllerDispatcher;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
import com.ning.billing.util.clock.ClockMock;
import com.ning.billing.util.dao.NonEntityDao;

import com.google.common.collect.ImmutableMap;

/**
 * Lightweight version of TestFileBackedBuffer.
 * <p>
 * Appends a few {@link SourceSamplesForTimestamp} to a {@link FileBackedBuffer}, reads them
 * back with a {@link Replayer} and feeds what was read to a {@link TimelineSourceEventAccumulator}
 * to check that the replayed samples can be encoded again into timeline chunks.
 */
public class TestSamplesReplayer extends MeterTestSuite {

    // Total space: 255 * 3 = 765 bytes
    private static final int NB_EVENTS = 3;
    // One will still be in memory after the flush
    private static final int EVENTS_ON_DISK = NB_EVENTS - 1;
    private static final int HOST_ID = 1;
    private static final int EVENT_CATEGORY_ID = 123;
    private static final TimelineCoder timelineCoder = new DefaultTimelineCoder();
    private static final SampleCoder sampleCoder = new DefaultSampleCoder();

    private final NonEntityDao nonEntityDao = Mockito.mock(NonEntityDao.class);
    private final InternalCallContextFactory internalCallContextFactory = new InternalCallContextFactory(new ClockMock(), nonEntityDao, new CacheControllerDispatcher());

    // Scratch directory for the buffer files. It is assigned in setUp() rather than once at
    // class-load time, so that every test method invocation gets its own directory.
    private File basePath;

    /**
     * Creates a new scratch directory under {@code java.io.tmpdir} before each test method.
     * <p>
     * The directory name is computed on every call: {@link File#mkdir()} returns {@code false}
     * when the directory already exists, so a path shared across invocations would make any
     * second invocation of this method fail.
     *
     * @throws Exception declared for the test framework; nothing here is expected to throw
     */
    @BeforeMethod(groups = "fast")
    public void setUp() throws Exception {
        basePath = new File(System.getProperty("java.io.tmpdir"),
                            "TestSamplesReplayer-" + System.currentTimeMillis() + "-" + System.nanoTime());
        Assert.assertTrue(basePath.mkdir(), "Unable to create directory " + basePath.getAbsolutePath());
    }

    /**
     * Writes {@link #NB_EVENTS} samples through a {@link FileBackedBuffer}, replays the files
     * found under the scratch directory and verifies that {@link #EVENTS_ON_DISK} of them are
     * read back and can be accumulated and turned into one timeline chunk per sample kind.
     *
     * @throws Exception if writing to or reading from the buffer files fails
     */
    @Test(groups = "fast")
    public void testIdentityFilter() throws Exception {
        // Need less than 765 + 1 (metadata) bytes
        final FileBackedBuffer fileBackedBuffer = new FileBackedBuffer(basePath.toString(), "test", 765, 1);

        // Create the host samples - this will take 255 bytes
        // (ImmutableMap.of is called twice to add the seven entries in two batches)
        final Map<Integer, ScalarSample> eventMap = new HashMap<Integer, ScalarSample>();
        eventMap.putAll(ImmutableMap.<Integer, ScalarSample>of(
                1, new ScalarSample(SampleOpcode.BYTE, (byte) 0),
                2, new ScalarSample(SampleOpcode.SHORT, (short) 1),
                3, new ScalarSample(SampleOpcode.INT, 1000),
                4, new ScalarSample(SampleOpcode.LONG, 12345678901L),
                5, new ScalarSample(SampleOpcode.DOUBLE, Double.MAX_VALUE)
                                                              ));
        eventMap.putAll(ImmutableMap.<Integer, ScalarSample>of(
                6, new ScalarSample(SampleOpcode.FLOAT, Float.NEGATIVE_INFINITY),
                7, new ScalarSample(SampleOpcode.STRING, "pwet")
                                                              ));
        final DateTime firstTime = new DateTime(DateTimeZone.UTC).minusSeconds(NB_EVENTS * 30);

        // Write the samples to disk, one every 30 seconds starting at firstTime
        for (int i = 0; i < NB_EVENTS; i++) {
            final SourceSamplesForTimestamp samples = new SourceSamplesForTimestamp(HOST_ID, "something", firstTime.plusSeconds(30 * i), eventMap);
            fileBackedBuffer.append(samples);
        }

        // Try the replayer
        final Replayer replayer = new Replayer(basePath.getAbsolutePath());
        final List<SourceSamplesForTimestamp> hostSamples = replayer.readAll();
        Assert.assertEquals(hostSamples.size(), EVENTS_ON_DISK);

        // Try to encode them again
        final MockTimelineDao dao = new MockTimelineDao();
        final TimelineSourceEventAccumulator accumulator = new TimelineSourceEventAccumulator(dao, timelineCoder, sampleCoder, HOST_ID,
                                                                                              EVENT_CATEGORY_ID, hostSamples.get(0).getTimestamp(), internalCallContextFactory);
        for (final SourceSamplesForTimestamp samplesFound : hostSamples) {
            accumulator.addSourceSamples(samplesFound);
        }
        Assert.assertTrue(accumulator.checkSampleCounts(EVENTS_ON_DISK));

        // This will check the SampleCoder can encode values correctly
        accumulator.extractAndQueueTimelineChunks();
        // One chunk is expected per sample kind put in eventMap
        Assert.assertEquals(dao.getTimelineChunks().keySet().size(), eventMap.size());
        for (final TimelineChunk chunk : dao.getTimelineChunks().values()) {
            Assert.assertEquals(chunk.getSourceId(), HOST_ID);
            Assert.assertEquals(chunk.getSampleCount(), EVENTS_ON_DISK);
        }
    }
}