// TimeseriesServiceTest.java
/**
 * Copyright © 2016 The Thingsboard Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.thingsboard.server.dao.timeseries;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.utils.UUIDs;
import lombok.extern.slf4j.Slf4j;
import org.junit.Assert;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.dao.service.AbstractServiceTest;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.server.common.data.kv.*;

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

/**
 * Tests that save timeseries values for a freshly generated device id through
 * {@code tsService} and then read them back, either as "latest" values or through a
 * time-bounded query.
 *
 * <p>NOTE(review): {@code tsService} is not declared in this class; presumably it is
 * supplied by {@link AbstractServiceTest} - confirm there.
 *
 * @author Andrew Shvayka
 */
@Slf4j
public class TimeseriesServiceTest extends AbstractServiceTest {

    // Keys under which the four fixture values below are stored.
    private static final String STRING_KEY = "stringKey";
    private static final String LONG_KEY = "longKey";
    private static final String DOUBLE_KEY = "doubleKey";
    private static final String BOOLEAN_KEY = "booleanKey";

    // Number of one-minute-spaced records written by testFindDeviceTsDataByQuery.
    public static final int PARTITION_MINUTES = 1100;

    // Newest timestamp written by the "latest" tests; TS - 1 and TS - 2 are written as well.
    private static final long TS = 42L;

    // One fixture entry per supported value type.
    KvEntry stringKvEntry = new StringDataEntry(STRING_KEY, "value");
    KvEntry longKvEntry = new LongDataEntry(LONG_KEY, Long.MAX_VALUE);
    KvEntry doubleKvEntry = new DoubleDataEntry(DOUBLE_KEY, Double.MAX_VALUE);
    KvEntry booleanKvEntry = new BooleanDataEntry(BOOLEAN_KEY, Boolean.TRUE);

    /**
     * Writes all four fixture entries at three consecutive timestamps and expects
     * {@code findAllLatest} to yield exactly one entry per key, each stamped with {@code TS}.
     */
    @Test
    public void testFindAllLatest() throws Exception {
        DeviceId deviceId = new DeviceId(UUIDs.timeBased());

        // Oldest first: TS - 2, TS - 1, TS.
        for (long ts = TS - 2; ts <= TS; ts++) {
            saveEntries(deviceId, ts);
        }

        ResultSet latestRows = tsService.findAllLatest(DataConstants.DEVICE, deviceId).get();
        List<TsKvEntry> actual = tsService.convertResultSetToTsKvEntryList(latestRows);

        assertNotNull(actual);
        assertEquals(4, actual.size());
        for (TsKvEntry entry : actual) {
            assertEquals(TS, entry.getTs());
        }

        // Both lists are ordered by key so the comparison does not depend on row order.
        sortByKey(actual);

        List<TsKvEntry> expected = Arrays.asList(
                toTsEntry(TS, stringKvEntry),
                toTsEntry(TS, longKvEntry),
                toTsEntry(TS, doubleKvEntry),
                toTsEntry(TS, booleanKvEntry));
        sortByKey(expected);

        assertEquals(expected, actual);
    }

    /**
     * Writes all four fixture entries at three consecutive timestamps and expects
     * {@code findLatest} for the string key alone to yield a single result whose first row
     * converts to the string entry stamped with {@code TS}.
     */
    @Test
    public void testFindLatest() throws Exception {
        DeviceId deviceId = new DeviceId(UUIDs.timeBased());

        for (long ts = TS - 2; ts <= TS; ts++) {
            saveEntries(deviceId, ts);
        }

        List<ResultSet> results =
                tsService.findLatest(DataConstants.DEVICE, deviceId, Collections.singleton(STRING_KEY)).get();
        assertEquals(1, results.size());
        assertEquals(toTsEntry(TS, stringKvEntry), tsService.convertResultToTsKvEntry(results.get(0).one()));
    }

    /**
     * Saves {@code PARTITION_MINUTES} string entries spaced one minute apart, the first one
     * {@code PARTITION_MINUTES} minutes before the current UTC time, then queries from the
     * timestamp of the entry at index 599 up to the current time and expects the entries
     * from index 600 onward, in save order.
     */
    @Test
    public void testFindDeviceTsDataByQuery() throws Exception {
        DeviceId deviceId = new DeviceId(UUIDs.timeBased());
        LocalDateTime start = LocalDateTime.now(ZoneOffset.UTC).minusMinutes(PARTITION_MINUTES);
        log.debug("Start event time is {}", start);
        List<TsKvEntry> saved = new ArrayList<>(PARTITION_MINUTES);

        for (int minute = 0; minute < PARTITION_MINUTES; minute++) {
            long ts = start.plusMinutes(minute).toInstant(ZoneOffset.UTC).toEpochMilli();
            BasicTsKvEntry entry = new BasicTsKvEntry(ts, stringKvEntry);
            // Wait for each write to finish before issuing the next one.
            tsService.save(DataConstants.DEVICE, deviceId, entry).get();
            saved.add(entry);
        }
        log.debug("Saved all records {}", start);

        // The entry at this index provides the query's start timestamp but is itself not
        // expected in the result.
        // NOTE(review): presumably the query's lower bound is exclusive - confirm against the service.
        int lowerBoundIndex = 599;
        long queryStartTs = saved.get(lowerBoundIndex).getTs();
        long queryEndTs = LocalDateTime.now(ZoneOffset.UTC).toInstant(ZoneOffset.UTC).toEpochMilli();
        List<TsKvEntry> list = tsService.find(DataConstants.DEVICE, deviceId,
                new BaseTsKvQuery(STRING_KEY, queryStartTs, queryEndTs));
        log.debug("Fetched records {}", start);

        List<TsKvEntry> expected = saved.subList(lowerBoundIndex + 1, PARTITION_MINUTES);
        assertEquals(expected.size(), list.size());
        assertEquals(expected, list);
    }

    /**
     * Saves the string, long, double and boolean fixture entries, in that order, for the
     * given device at timestamp {@code ts}, waiting for each save to complete.
     */
    private void saveEntries(DeviceId deviceId, long ts) throws ExecutionException, InterruptedException {
        List<KvEntry> fixtures = Arrays.asList(stringKvEntry, longKvEntry, doubleKvEntry, booleanKvEntry);
        for (KvEntry fixture : fixtures) {
            tsService.save(DataConstants.DEVICE, deviceId, toTsEntry(ts, fixture)).get();
        }
    }

    /** Wraps {@code entry} in a {@link BasicTsKvEntry} carrying the timestamp {@code ts}. */
    private static TsKvEntry toTsEntry(long ts, KvEntry entry) {
        return new BasicTsKvEntry(ts, entry);
    }

    /** Sorts {@code entries} in place by the natural order of their keys. */
    private static void sortByKey(List<TsKvEntry> entries) {
        entries.sort((left, right) -> left.getKey().compareTo(right.getKey()));
    }

}