thingsboard-developers

Timeseries and Attributes DAO

12/1/2016 7:42:53 AM

Details

diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java
new file mode 100644
index 0000000..8089db9
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java
@@ -0,0 +1,37 @@
+/**
+ * Copyright © 2016 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.attributes;
+
+import com.datastax.driver.core.ResultSetFuture;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * @author Andrew Shvayka
+ */
+public interface AttributesDao {
+
+    AttributeKvEntry find(EntityId entityId, String attributeType, String attributeKey);
+
+    List<AttributeKvEntry> findAll(EntityId entityId, String attributeType);
+
+    ResultSetFuture save(EntityId entityId, String attributeType, AttributeKvEntry attribute);
+
+    void removeAll(EntityId entityId, String scope, List<String> keys);
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java
new file mode 100644
index 0000000..5e72c90
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java
@@ -0,0 +1,40 @@
+/**
+ * Copyright © 2016 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.attributes;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.UUIDBased;
+import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+
+import java.util.List;
+
+/**
+ * @author Andrew Shvayka
+ */
+public interface AttributesService {
+
+    AttributeKvEntry find(EntityId entityId, String scope, String attributeKey);
+
+    List<AttributeKvEntry> findAll(EntityId entityId, String scope);
+
+    ListenableFuture<List<ResultSet>> save(EntityId entityId, String scope, List<AttributeKvEntry> attributes);
+
+    void removeAll(EntityId entityId, String scope, List<String> attributeKeys);
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java
new file mode 100644
index 0000000..4c542e3
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java
@@ -0,0 +1,154 @@
+/**
+ * Copyright © 2016 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.attributes;
+
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.querybuilder.Select;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.kv.DataType;
+import org.thingsboard.server.dao.AbstractDao;
+import org.thingsboard.server.dao.model.ModelConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.thingsboard.server.common.data.kv.*;
+import org.thingsboard.server.dao.timeseries.BaseTimeseriesDao;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.thingsboard.server.dao.model.ModelConstants.*;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.*;
+
+/**
+ * @author Andrew Shvayka
+ */
+@Component
+@Slf4j
+public class BaseAttributesDao extends AbstractDao implements AttributesDao {
+    
+    private PreparedStatement saveStmt;
+
+    @Override
+    public AttributeKvEntry find(EntityId entityId, String attributeType, String attributeKey) {
+        Select.Where select = select().from(ATTRIBUTES_KV_CF)
+                .where(eq(ENTITY_TYPE_COLUMN, entityId.getEntityType()))
+                .and(eq(ENTITY_ID_COLUMN, entityId.getId()))
+                .and(eq(ATTRIBUTE_TYPE_COLUMN, attributeType))
+                .and(eq(ATTRIBUTE_KEY_COLUMN, attributeKey));
+        log.trace("Generated query [{}] for entityId {} and key {}", select, entityId, attributeKey);
+        return convertResultToAttributesKvEntry(attributeKey, executeRead(select).one());
+    }
+
+    @Override
+    public List<AttributeKvEntry> findAll(EntityId entityId, String attributeType) {
+        Select.Where select = select().from(ATTRIBUTES_KV_CF)
+                .where(eq(ENTITY_TYPE_COLUMN, entityId.getEntityType()))
+                .and(eq(ENTITY_ID_COLUMN, entityId.getId()))
+                .and(eq(ATTRIBUTE_TYPE_COLUMN, attributeType));
+        log.trace("Generated query [{}] for entityId {} and attributeType {}", select, entityId, attributeType);
+        return convertResultToAttributesKvEntryList(executeRead(select));
+    }
+
+    @Override
+    public ResultSetFuture save(EntityId entityId, String attributeType, AttributeKvEntry attribute) {
+        BoundStatement stmt = getSaveStmt().bind();
+        stmt.setString(0, entityId.getEntityType().name());
+        stmt.setUUID(1, entityId.getId());
+        stmt.setString(2, attributeType);
+        stmt.setString(3, attribute.getKey());
+        stmt.setLong(4, attribute.getLastUpdateTs());
+        stmt.setString(5, attribute.getStrValue().orElse(null));
+        if (attribute.getBooleanValue().isPresent()) {
+            stmt.setBool(6, attribute.getBooleanValue().get());
+        } else {
+            stmt.setToNull(6);
+        }
+        if (attribute.getLongValue().isPresent()) {
+            stmt.setLong(7, attribute.getLongValue().get());
+        } else {
+            stmt.setToNull(7);
+        }
+        if (attribute.getDoubleValue().isPresent()) {
+            stmt.setDouble(8, attribute.getDoubleValue().get());
+        } else {
+            stmt.setToNull(8);
+        }
+        return executeAsyncWrite(stmt);
+    }
+
+    @Override
+    public void removeAll(EntityId entityId, String attributeType, List<String> keys) {
+        for (String key : keys) {
+            delete(entityId, attributeType, key);
+        }
+    }
+
+    private void delete(EntityId entityId, String attributeType, String key) {
+        Statement delete = QueryBuilder.delete().all().from(ModelConstants.ATTRIBUTES_KV_CF)
+                .where(eq(ENTITY_TYPE_COLUMN, entityId.getEntityType()))
+                .and(eq(ENTITY_ID_COLUMN, entityId.getId()))
+                .and(eq(ATTRIBUTE_TYPE_COLUMN, attributeType))
+                .and(eq(ATTRIBUTE_KEY_COLUMN, key));
+        log.debug("Remove request: {}", delete.toString());
+        getSession().execute(delete);
+    }
+
+    private PreparedStatement getSaveStmt() {
+        if (saveStmt == null) {
+            saveStmt = getSession().prepare("INSERT INTO " + ModelConstants.ATTRIBUTES_KV_CF +
+                    "(" + ENTITY_TYPE_COLUMN +
+                    "," + ENTITY_ID_COLUMN +
+                    "," + ATTRIBUTE_TYPE_COLUMN +
+                    "," + ATTRIBUTE_KEY_COLUMN +
+                    "," + LAST_UPDATE_TS_COLUMN +
+                    "," + ModelConstants.STRING_VALUE_COLUMN +
+                    "," + ModelConstants.BOOLEAN_VALUE_COLUMN +
+                    "," + ModelConstants.LONG_VALUE_COLUMN +
+                    "," + ModelConstants.DOUBLE_VALUE_COLUMN +
+                    ")" +
+                    " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)");
+        }
+        return saveStmt;
+    }
+
+    private AttributeKvEntry convertResultToAttributesKvEntry(String key, Row row) {
+        AttributeKvEntry attributeEntry = null;
+        if (row != null) {
+            long lastUpdateTs = row.get(LAST_UPDATE_TS_COLUMN, Long.class);
+            attributeEntry = new BaseAttributeKvEntry(BaseTimeseriesDao.toKvEntry(row, key), lastUpdateTs);
+        }
+        return attributeEntry;
+    }
+
+    private List<AttributeKvEntry> convertResultToAttributesKvEntryList(ResultSet resultSet) {
+        List<Row> rows = resultSet.all();
+        List<AttributeKvEntry> entries = new ArrayList<>(rows.size());
+        if (!rows.isEmpty()) {
+            rows.stream().forEach(row -> {
+                String key = row.getString(ModelConstants.ATTRIBUTE_KEY_COLUMN);
+                AttributeKvEntry kvEntry = convertResultToAttributesKvEntry(key, row);
+                if (kvEntry != null) {
+                    entries.add(kvEntry);
+                }
+            });
+        }
+        return entries;
+    }
+
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java
new file mode 100644
index 0000000..3e9fc3d
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java
@@ -0,0 +1,87 @@
+/**
+ * Copyright © 2016 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.attributes;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.dao.exception.IncorrectParameterException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.thingsboard.server.dao.service.Validator;
+
+import java.util.List;
+
+/**
+ * @author Andrew Shvayka
+ */
+@Service
+public class BaseAttributesService implements AttributesService {
+
+    @Autowired
+    private AttributesDao attributesDao;
+
+    @Override
+    public AttributeKvEntry find(EntityId entityId, String scope, String attributeKey) {
+        validate(entityId, scope);
+        Validator.validateString(attributeKey, "Incorrect attribute key " + attributeKey);
+        return attributesDao.find(entityId, scope, attributeKey);
+    }
+
+    @Override
+    public List<AttributeKvEntry> findAll(EntityId entityId, String scope) {
+        validate(entityId, scope);
+        return attributesDao.findAll(entityId, scope);
+    }
+
+    @Override
+    public ListenableFuture<List<ResultSet>> save(EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
+        validate(entityId, scope);
+        attributes.forEach(attribute -> validate(attribute));
+        List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(attributes.size());
+        for(AttributeKvEntry attribute : attributes) {
+            futures.add(attributesDao.save(entityId, scope, attribute));
+        }
+        return Futures.allAsList(futures);
+    }
+
+    @Override
+    public void removeAll(EntityId entityId, String scope, List<String> keys) {
+        validate(entityId, scope);
+        attributesDao.removeAll(entityId, scope, keys);
+    }
+
+    private static void validate(EntityId id, String scope) {
+        Validator.validateId(id.getId(), "Incorrect id " + id);
+        Validator.validateString(scope, "Incorrect scope " + scope);
+    }
+
+    private static void validate(AttributeKvEntry kvEntry) {
+        if (kvEntry == null) {
+            throw new IncorrectParameterException("Key value entry can't be null");
+        } else if (kvEntry.getDataType() == null) {
+            throw new IncorrectParameterException("Incorrect kvEntry. Data type can't be null");
+        } else {
+            Validator.validateString(kvEntry.getKey(), "Incorrect kvEntry. Key can't be empty");
+            Validator.validatePositiveNumber(kvEntry.getLastUpdateTs(), "Incorrect last update ts. Ts should be positive");
+        }
+    }
+
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
new file mode 100644
index 0000000..79134f7
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
@@ -0,0 +1,313 @@
+/**
+ * Copyright © 2016 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.timeseries;
+
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.querybuilder.Select;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+import org.thingsboard.server.common.data.kv.*;
+import org.thingsboard.server.common.data.kv.DataType;
+import org.thingsboard.server.dao.AbstractDao;
+import org.thingsboard.server.dao.model.ModelConstants;
+
+import java.util.*;
+
+import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+
+/**
+ * @author Andrew Shvayka
+ */
+@Component
+@Slf4j
+public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
+
+    @Value("${cassandra.query.max_limit_per_request}")
+    protected Integer maxLimitPerRequest;
+
+    private PreparedStatement partitionInsertStmt;
+    private PreparedStatement[] latestInsertStmts;
+    private PreparedStatement[] saveStmts;
+    private PreparedStatement findLatestStmt;
+    private PreparedStatement findAllLatestStmt;
+
+    @Override
+    public List<TsKvEntry> find(String entityType, UUID entityId, TsKvQuery query, Optional<Long> minPartition, Optional<Long> maxPartition) {
+        List<Row> rows = Collections.emptyList();
+        Long[] parts = fetchPartitions(entityType, entityId, query.getKey(), minPartition, maxPartition);
+        int partsLength = parts.length;
+        if (parts != null && partsLength > 0) {
+            int limit = maxLimitPerRequest;
+            Optional<Integer> lim = query.getLimit();
+            if (lim.isPresent() && lim.get() < maxLimitPerRequest) {
+                limit = lim.get();
+            }
+
+            rows = new ArrayList<>(limit);
+            int lastIdx = partsLength - 1;
+            for (int i = 0; i < partsLength; i++) {
+                int currentLimit;
+                if (rows.size() >= limit) {
+                    break;
+                } else {
+                    currentLimit = limit - rows.size();
+                }
+                Long partition = parts[i];
+                Select.Where where = select().from(ModelConstants.TS_KV_CF).where(eq(ModelConstants.ENTITY_TYPE_COLUMN, entityType))
+                        .and(eq(ModelConstants.ENTITY_ID_COLUMN, entityId))
+                        .and(eq(ModelConstants.KEY_COLUMN, query.getKey()))
+                        .and(eq(ModelConstants.PARTITION_COLUMN, partition));
+                if (i == 0 && query.getStartTs().isPresent()) {
+                    where.and(QueryBuilder.gt(ModelConstants.TS_COLUMN, query.getStartTs().get()));
+                } else if (i == lastIdx && query.getEndTs().isPresent()) {
+                    where.and(QueryBuilder.lte(ModelConstants.TS_COLUMN, query.getEndTs().get()));
+                }
+                where.limit(currentLimit);
+                rows.addAll(executeRead(where).all());
+            }
+        }
+        return convertResultToTsKvEntryList(rows);
+    }
+
+    @Override
+    public ResultSetFuture findLatest(String entityType, UUID entityId, String key) {
+        BoundStatement stmt = getFindLatestStmt().bind();
+        stmt.setString(0, entityType);
+        stmt.setUUID(1, entityId);
+        stmt.setString(2, key);
+        log.debug("Generated query [{}] for entityType {} and entityId {}", stmt, entityType, entityId);
+        return executeAsyncRead(stmt);
+    }
+
+    @Override
+    public ResultSetFuture findAllLatest(String entityType, UUID entityId) {
+        BoundStatement stmt = getFindAllLatestStmt().bind();
+        stmt.setString(0, entityType);
+        stmt.setUUID(1, entityId);
+        log.debug("Generated query [{}] for entityType {} and entityId {}", stmt, entityType, entityId);
+        return executeAsyncRead(stmt);
+    }
+
+    @Override
+    public ResultSetFuture save(String entityType, UUID entityId, long partition, TsKvEntry tsKvEntry) {
+        DataType type = tsKvEntry.getDataType();
+        BoundStatement stmt = getSaveStmt(type).bind()
+                .setString(0, entityType)
+                .setUUID(1, entityId)
+                .setString(2, tsKvEntry.getKey())
+                .setLong(3, partition)
+                .setLong(4, tsKvEntry.getTs());
+        addValue(tsKvEntry, stmt, 5);
+        return executeAsyncWrite(stmt);
+    }
+
+    @Override
+    public ResultSetFuture saveLatest(String entityType, UUID entityId, TsKvEntry tsKvEntry) {
+        DataType type = tsKvEntry.getDataType();
+        BoundStatement stmt = getLatestStmt(type).bind()
+                .setString(0, entityType)
+                .setUUID(1, entityId)
+                .setString(2, tsKvEntry.getKey())
+                .setLong(3, tsKvEntry.getTs());
+        addValue(tsKvEntry, stmt, 4);
+        return executeAsyncWrite(stmt);
+    }
+
+    @Override
+    public ResultSetFuture savePartition(String entityType, UUID entityId, long partition, String key) {
+        log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityType, entityId, key);
+        return executeAsyncWrite(getPartitionInsertStmt().bind()
+                .setString(0, entityType)
+                .setUUID(1, entityId)
+                .setLong(2, partition)
+                .setString(3, key));
+    }
+
+    @Override
+    public List<TsKvEntry> convertResultToTsKvEntryList(List<Row> rows) {
+        List<TsKvEntry> entries = new ArrayList<>(rows.size());
+        if (!rows.isEmpty()) {
+            rows.stream().forEach(row -> {
+                TsKvEntry kvEntry = convertResultToTsKvEntry(row);
+                if (kvEntry != null) {
+                    entries.add(kvEntry);
+                }
+            });
+        }
+        return entries;
+    }
+
+    @Override
+    public TsKvEntry convertResultToTsKvEntry(Row row) {
+        String key = row.getString(ModelConstants.KEY_COLUMN);
+        long ts = row.getLong(ModelConstants.TS_COLUMN);
+        return new BasicTsKvEntry(ts, toKvEntry(row, key));
+    }
+
+    public static KvEntry toKvEntry(Row row, String key) {
+        KvEntry kvEntry = null;
+        String strV = row.get(ModelConstants.STRING_VALUE_COLUMN, String.class);
+        if (strV != null) {
+            kvEntry = new StringDataEntry(key, strV);
+        } else {
+            Long longV = row.get(ModelConstants.LONG_VALUE_COLUMN, Long.class);
+            if (longV != null) {
+                kvEntry = new LongDataEntry(key, longV);
+            } else {
+                Double doubleV = row.get(ModelConstants.DOUBLE_VALUE_COLUMN, Double.class);
+                if (doubleV != null) {
+                    kvEntry = new DoubleDataEntry(key, doubleV);
+                } else {
+                    Boolean boolV = row.get(ModelConstants.BOOLEAN_VALUE_COLUMN, Boolean.class);
+                    if (boolV != null) {
+                        kvEntry = new BooleanDataEntry(key, boolV);
+                    } else {
+                        log.warn("All values in key-value row are nullable ");
+                    }
+                }
+            }
+        }
+        return kvEntry;
+    }
+
+    /**
+     * Select existing partitions from the table
+     * <code>{@link ModelConstants#TS_KV_PARTITIONS_CF}</code> for the given entity
+     */
+    private Long[] fetchPartitions(String entityType, UUID entityId, String key, Optional<Long> minPartition, Optional<Long> maxPartition) {
+        Select.Where select = QueryBuilder.select(ModelConstants.PARTITION_COLUMN).from(ModelConstants.TS_KV_PARTITIONS_CF).where(eq(ModelConstants.ENTITY_TYPE_COLUMN, entityType))
+                .and(eq(ModelConstants.ENTITY_ID_COLUMN, entityId)).and(eq(ModelConstants.KEY_COLUMN, key));
+        minPartition.ifPresent(startTs -> select.and(QueryBuilder.gte(ModelConstants.PARTITION_COLUMN, minPartition.get())));
+        maxPartition.ifPresent(endTs -> select.and(QueryBuilder.lte(ModelConstants.PARTITION_COLUMN, maxPartition.get())));
+        ResultSet resultSet = executeRead(select);
+        return resultSet.all().stream().map(row -> row.getLong(ModelConstants.PARTITION_COLUMN)).toArray(Long[]::new);
+    }
+
+    private PreparedStatement getSaveStmt(DataType dataType) {
+        if (saveStmts == null) {
+            saveStmts = new PreparedStatement[DataType.values().length];
+            for (DataType type : DataType.values()) {
+                saveStmts[type.ordinal()] = getSession().prepare("INSERT INTO " + ModelConstants.TS_KV_CF +
+                        "(" + ModelConstants.ENTITY_TYPE_COLUMN +
+                        "," + ModelConstants.ENTITY_ID_COLUMN +
+                        "," + ModelConstants.KEY_COLUMN +
+                        "," + ModelConstants.PARTITION_COLUMN +
+                        "," + ModelConstants.TS_COLUMN +
+                        "," + getColumnName(type) + ")" +
+                        " VALUES(?, ?, ?, ?, ?, ?)");
+            }
+        }
+        return saveStmts[dataType.ordinal()];
+    }
+
+    private PreparedStatement getLatestStmt(DataType dataType) {
+        if (latestInsertStmts == null) {
+            latestInsertStmts = new PreparedStatement[DataType.values().length];
+            for (DataType type : DataType.values()) {
+                latestInsertStmts[type.ordinal()] = getSession().prepare("INSERT INTO " + ModelConstants.TS_KV_LATEST_CF +
+                        "(" + ModelConstants.ENTITY_TYPE_COLUMN +
+                        "," + ModelConstants.ENTITY_ID_COLUMN +
+                        "," + ModelConstants.KEY_COLUMN +
+                        "," + ModelConstants.TS_COLUMN +
+                        "," + getColumnName(type) + ")" +
+                        " VALUES(?, ?, ?, ?, ?)");
+            }
+        }
+        return latestInsertStmts[dataType.ordinal()];
+    }
+
+
+    private PreparedStatement getPartitionInsertStmt() {
+        if (partitionInsertStmt == null) {
+            partitionInsertStmt = getSession().prepare("INSERT INTO " + ModelConstants.TS_KV_PARTITIONS_CF +
+                    "(" + ModelConstants.ENTITY_TYPE_COLUMN +
+                    "," + ModelConstants.ENTITY_ID_COLUMN +
+                    "," + ModelConstants.PARTITION_COLUMN +
+                    "," + ModelConstants.KEY_COLUMN + ")" +
+                    " VALUES(?, ?, ?, ?)");
+        }
+        return partitionInsertStmt;
+    }
+
+    private PreparedStatement getFindLatestStmt() {
+        if (findLatestStmt == null) {
+            findLatestStmt = getSession().prepare("SELECT " +
+                    ModelConstants.KEY_COLUMN + "," +
+                    ModelConstants.TS_COLUMN + "," +
+                    ModelConstants.STRING_VALUE_COLUMN + "," +
+                    ModelConstants.BOOLEAN_VALUE_COLUMN + "," +
+                    ModelConstants.LONG_VALUE_COLUMN + "," +
+                    ModelConstants.DOUBLE_VALUE_COLUMN + " " +
+                    "FROM " + ModelConstants.TS_KV_LATEST_CF + " " +
+                    "WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + " = ? " +
+                    "AND " + ModelConstants.ENTITY_ID_COLUMN + " = ? " +
+                    "AND " + ModelConstants.KEY_COLUMN + " = ? ");
+        }
+        return findLatestStmt;
+    }
+
+    private PreparedStatement getFindAllLatestStmt() {
+        if (findAllLatestStmt == null) {
+            findAllLatestStmt = getSession().prepare("SELECT " +
+                    ModelConstants.KEY_COLUMN + "," +
+                    ModelConstants.TS_COLUMN + "," +
+                    ModelConstants.STRING_VALUE_COLUMN + "," +
+                    ModelConstants.BOOLEAN_VALUE_COLUMN + "," +
+                    ModelConstants.LONG_VALUE_COLUMN + "," +
+                    ModelConstants.DOUBLE_VALUE_COLUMN + " " +
+                    "FROM " + ModelConstants.TS_KV_LATEST_CF + " " +
+                    "WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + " = ? " +
+                    "AND " + ModelConstants.ENTITY_ID_COLUMN + " = ? ");
+        }
+        return findAllLatestStmt;
+    }
+
+    public static String getColumnName(DataType type) {
+        switch (type) {
+            case BOOLEAN:
+                return ModelConstants.BOOLEAN_VALUE_COLUMN;
+            case STRING:
+                return ModelConstants.STRING_VALUE_COLUMN;
+            case LONG:
+                return ModelConstants.LONG_VALUE_COLUMN;
+            case DOUBLE:
+                return ModelConstants.DOUBLE_VALUE_COLUMN;
+            default:
+                throw new RuntimeException("Not implemented!");
+        }
+    }
+
+    public static void addValue(KvEntry kvEntry, BoundStatement stmt, int column) {
+        switch (kvEntry.getDataType()) {
+            case BOOLEAN:
+                stmt.setBool(column, kvEntry.getBooleanValue().get().booleanValue());
+                break;
+            case STRING:
+                stmt.setString(column, kvEntry.getStrValue().get());
+                break;
+            case LONG:
+                stmt.setLong(column, kvEntry.getLongValue().get().longValue());
+                break;
+            case DOUBLE:
+                stmt.setDouble(column, kvEntry.getDoubleValue().get().doubleValue());
+                break;
+        }
+    }
+
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
new file mode 100644
index 0000000..49ef16c
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
@@ -0,0 +1,167 @@
+/**
+ * Copyright © 2016 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.timeseries;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Row;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.server.common.data.id.UUIDBased;
+import org.thingsboard.server.common.data.kv.TsKvEntry;
+import org.thingsboard.server.common.data.kv.TsKvQuery;
+import org.thingsboard.server.dao.exception.IncorrectParameterException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+import org.thingsboard.server.dao.service.Validator;
+
+import javax.annotation.PostConstruct;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.*;
+
+import static org.apache.commons.lang3.StringUtils.isBlank;
+
+/**
+ * @author Andrew Shvayka
+ */
+@Service
+@Slf4j
+public class BaseTimeseriesService implements TimeseriesService {
+
+    public static final int INSERTS_PER_ENTRY = 3;
+
+    @Value("${cassandra.query.ts_key_value_partitioning}")
+    private String partitioning;
+
+    @Autowired
+    private TimeseriesDao timeseriesDao;
+
+    private TsPartitionDate tsFormat;
+
+    @PostConstruct
+    public void init() {
+        Optional<TsPartitionDate> partition = TsPartitionDate.parse(partitioning);
+        if (partition.isPresent()) {
+            tsFormat = partition.get();
+        } else {
+            log.warn("Incorrect configuration of partitioning {}", partitioning);
+            throw new RuntimeException("Failed to parse partitioning property: " + partitioning + "!");
+        }
+    }
+
+    @Override
+    public List<TsKvEntry> find(String entityType, UUIDBased entityId, TsKvQuery query) {
+        validate(entityType, entityId);
+        validate(query);
+        return timeseriesDao.find(entityType, entityId.getId(), query, toPartitionTs(query.getStartTs()), toPartitionTs(query.getEndTs()));
+    }
+
+    private Optional<Long> toPartitionTs(Optional<Long> ts) {
+        if (ts.isPresent()) {
+            return Optional.of(toPartitionTs(ts.get()));
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    @Override
+    public ListenableFuture<List<ResultSet>> findLatest(String entityType, UUIDBased entityId, Collection<String> keys) {
+        validate(entityType, entityId);
+        List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(keys.size());
+        keys.forEach(key -> Validator.validateString(key, "Incorrect key " + key));
+        keys.forEach(key -> futures.add(timeseriesDao.findLatest(entityType, entityId.getId(), key)));
+        return Futures.allAsList(futures);
+    }
+
+    @Override
+    public ResultSetFuture findAllLatest(String entityType, UUIDBased entityId) {
+        validate(entityType, entityId);
+        return timeseriesDao.findAllLatest(entityType, entityId.getId());
+    }
+
+    @Override
+    public ListenableFuture<List<ResultSet>> save(String entityType, UUIDBased entityId, TsKvEntry tsKvEntry) {
+        validate(entityType, entityId);
+        if (tsKvEntry == null) {
+            throw new IncorrectParameterException("Key value entry can't be null");
+        }
+        UUID uid = entityId.getId();
+        long partitionTs = toPartitionTs(tsKvEntry.getTs());
+
+        List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(INSERTS_PER_ENTRY);
+        saveAndRegisterFutures(futures, entityType, tsKvEntry, uid, partitionTs);
+        return Futures.allAsList(futures);
+    }
+
+    @Override
+    public ListenableFuture<List<ResultSet>> save(String entityType, UUIDBased entityId, List<TsKvEntry> tsKvEntries) {
+        validate(entityType, entityId);
+        List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size() * INSERTS_PER_ENTRY);
+        for (TsKvEntry tsKvEntry : tsKvEntries) {
+            if (tsKvEntry == null) {
+                throw new IncorrectParameterException("Key value entry can't be null");
+            }
+            UUID uid = entityId.getId();
+            long partitionTs = toPartitionTs(tsKvEntry.getTs());
+            saveAndRegisterFutures(futures, entityType, tsKvEntry, uid, partitionTs);
+        }
+        return Futures.allAsList(futures);
+    }
+
+    @Override
+    public TsKvEntry convertResultToTsKvEntry(Row row) {
+        return timeseriesDao.convertResultToTsKvEntry(row);
+    }
+
+    @Override
+    public List<TsKvEntry> convertResultSetToTsKvEntryList(ResultSet rs) {
+        return timeseriesDao.convertResultToTsKvEntryList(rs.all());
+    }
+
+    private void saveAndRegisterFutures(List<ResultSetFuture> futures, String entityType, TsKvEntry tsKvEntry, UUID uid, long partitionTs) {
+        futures.add(timeseriesDao.savePartition(entityType, uid, partitionTs, tsKvEntry.getKey()));
+        futures.add(timeseriesDao.saveLatest(entityType, uid, tsKvEntry));
+        futures.add(timeseriesDao.save(entityType, uid, partitionTs, tsKvEntry));
+    }
+
+    private long toPartitionTs(long ts) {
+        LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC);
+
+        LocalDateTime parititonTime = tsFormat.truncatedTo(time);
+
+        return parititonTime.toInstant(ZoneOffset.UTC).toEpochMilli();
+    }
+
+    private static void validate(String entityType, UUIDBased entityId) {
+        Validator.validateString(entityType, "Incorrect entityType " + entityType);
+        Validator.validateId(entityId, "Incorrect entityId " + entityId);
+    }
+
+    private static void validate(TsKvQuery query) {
+        if (query == null) {
+            throw new IncorrectParameterException("TsKvQuery can't be null");
+        } else if (isBlank(query.getKey())) {
+            throw new IncorrectParameterException("Incorrect TsKvQuery. Key can't be empty");
+        }
+    }
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
new file mode 100644
index 0000000..eae9ab4
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
@@ -0,0 +1,49 @@
+/**
+ * Copyright © 2016 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.timeseries;
+
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Row;
+import org.thingsboard.server.common.data.kv.TsKvEntry;
+import org.thingsboard.server.common.data.kv.TsKvQuery;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * @author Andrew Shvayka
+ */
+public interface TimeseriesDao {
+
+    List<TsKvEntry> find(String entityType, UUID entityId, TsKvQuery query, Optional<Long> minPartition, Optional<Long> maxPartition);
+
+    ResultSetFuture findLatest(String entityType, UUID entityId, String key);
+
+    ResultSetFuture findAllLatest(String entityType, UUID entityId);
+
+    ResultSetFuture save(String entityType, UUID entityId, long partition, TsKvEntry tsKvEntry);
+
+    ResultSetFuture savePartition(String entityType, UUID entityId, long partition, String key);
+
+    ResultSetFuture saveLatest(String entityType, UUID entityId, TsKvEntry tsKvEntry);
+
+    TsKvEntry convertResultToTsKvEntry(Row row);
+
+    List<TsKvEntry> convertResultToTsKvEntryList(List<Row> rows);
+
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
new file mode 100644
index 0000000..b165f79
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
@@ -0,0 +1,50 @@
+/**
+ * Copyright © 2016 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.timeseries;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Row;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.thingsboard.server.common.data.id.UUIDBased;
+import org.thingsboard.server.common.data.kv.TsKvEntry;
+import org.thingsboard.server.common.data.kv.TsKvQuery;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * @author Andrew Shvayka
+ */
+public interface TimeseriesService {
+
+    //TODO: Replace this with async operation
+    List<TsKvEntry> find(String entityType, UUIDBased entityId, TsKvQuery query);
+
+    ListenableFuture<List<ResultSet>> findLatest(String entityType, UUIDBased entityId, Collection<String> keys);
+
+    ResultSetFuture findAllLatest(String entityType, UUIDBased entityId);
+
+    ListenableFuture<List<ResultSet>> save(String entityType, UUIDBased entityId, TsKvEntry tsKvEntry);
+
+    ListenableFuture<List<ResultSet>> save(String entityType, UUIDBased entityId, List<TsKvEntry> tsKvEntry);
+
+    TsKvEntry convertResultToTsKvEntry(Row row);
+
+    List<TsKvEntry> convertResultSetToTsKvEntryList(ResultSet rs);
+
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsPartitionDate.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsPartitionDate.java
new file mode 100644
index 0000000..1a1637d
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsPartitionDate.java
@@ -0,0 +1,66 @@
+/**
+ * Copyright © 2016 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.timeseries;
+
+import java.time.LocalDateTime;
+import java.time.temporal.ChronoUnit;
+import java.time.temporal.TemporalUnit;
+import java.util.Optional;
+
+public enum TsPartitionDate {
+
+    MINUTES("yyyy-MM-dd-HH-mm", ChronoUnit.MINUTES), HOURS("yyyy-MM-dd-HH", ChronoUnit.HOURS), DAYS("yyyy-MM-dd", ChronoUnit.DAYS), MONTHS("yyyy-MM", ChronoUnit.MONTHS), YEARS("yyyy", ChronoUnit.YEARS);
+
+    private final String pattern;
+    private final TemporalUnit truncateUnit;
+
+    TsPartitionDate(String pattern, TemporalUnit truncateUnit) {
+        this.pattern = pattern;
+        this.truncateUnit = truncateUnit;
+    }
+
+    public String getPattern() {
+        return pattern;
+    }
+
+    public TemporalUnit getTruncateUnit() {
+        return truncateUnit;
+    }
+
+    public LocalDateTime truncatedTo(LocalDateTime time) {
+        switch (this){
+            case MONTHS:
+                return time.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1);
+            case YEARS:
+                return time.truncatedTo(ChronoUnit.DAYS).withDayOfYear(1);
+            default:
+                return time.truncatedTo(truncateUnit);
+        }
+    }
+
+    public static Optional<TsPartitionDate> parse(String name) {
+        TsPartitionDate partition = null;
+        if (name != null) {
+            for (TsPartitionDate partitionDate : TsPartitionDate.values()) {
+                if (partitionDate.name().equalsIgnoreCase(name)) {
+                    partition = partitionDate;
+                    break;
+                }
+            }
+        }
+        return Optional.of(partition);
+    }
+}