thingsboard-memoizeit

Changes

Details

diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 6bb3917..7eef95a 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -251,6 +251,9 @@ spring:
     username: "${SPRING_DATASOURCE_USERNAME:sa}"
     password: "${SPRING_DATASOURCE_PASSWORD:}"
 
+rule:
+  queue:
+    msg_partitioning: "${QUEUE_MSG_PARTITIONING:HOURS}"
 
 # PostgreSQL DAO Configuration
 #spring:
diff --git a/dao/src/main/resources/cassandra/schema.cql b/dao/src/main/resources/cassandra/schema.cql
index 876c9f7..05e6cfe 100644
--- a/dao/src/main/resources/cassandra/schema.cql
+++ b/dao/src/main/resources/cassandra/schema.cql
@@ -555,48 +555,45 @@ CREATE TABLE IF NOT EXISTS thingsboard.msg_queue (
     partition       bigint,
     ts              bigint,
     msg             blob,
-	PRIMARY KEY ((node_id, cluster_hash, partition), ts)
-	WITH CLUSTERING ORDER BY (ts DESC)
-	AND compaction = {
-        'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
-        'min_threshold': '5',
-        'base_time_seconds': '43200',
-        'max_window_size_seconds': '43200'
-        'tombstone_threshold': '0.9',
-        'unchecked_tombstone_compaction': 'true',
-    };
-);
+	PRIMARY KEY ((node_id, clustered_hash, partition), ts))
+WITH CLUSTERING ORDER BY (ts DESC)
+AND compaction = {
+    'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
+    'min_threshold': '5',
+    'base_time_seconds': '43200',
+    'max_window_size_seconds': '43200',
+    'tombstone_threshold': '0.9',
+    'unchecked_tombstone_compaction': 'true'
+};
+
 
 CREATE TABLE IF NOT EXISTS thingsboard.msg_ack_queue (
     node_id         timeuuid,
     clustered_hash    bigint,
     partition       bigint,
-    ts              bigint,
     msg_id              timeuuid,
-	PRIMARY KEY ((node_id, cluster_hash, partition), ts)
-	WITH CLUSTERING ORDER BY (ts DESC)
-	AND compaction = {
-        'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
-        'min_threshold': '5',
-        'base_time_seconds': '43200',
-        'max_window_size_seconds': '43200'
-        'tombstone_threshold': '0.9',
-        'unchecked_tombstone_compaction': 'true',
-    };
-);
+	PRIMARY KEY ((node_id, clustered_hash, partition), msg_id))
+WITH CLUSTERING ORDER BY (msg_id DESC)
+AND compaction = {
+    'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
+    'min_threshold': '5',
+    'base_time_seconds': '43200',
+    'max_window_size_seconds': '43200',
+    'tombstone_threshold': '0.9',
+    'unchecked_tombstone_compaction': 'true'
+};
 
 CREATE TABLE IF NOT EXISTS thingsboard.processed_msg_partitions (
     node_id         timeuuid,
     clustered_hash    bigint,
     partition       bigint,
-	PRIMARY KEY ((node_id, cluster_hash), partition)
-	WITH CLUSTERING ORDER BY (partition DESC)
-	AND compaction = {
-        'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
-        'min_threshold': '5',
-        'base_time_seconds': '43200',
-        'max_window_size_seconds': '43200'
-        'tombstone_threshold': '0.9',
-        'unchecked_tombstone_compaction': 'true',
-    };
-);
\ No newline at end of file
+	PRIMARY KEY ((node_id, clustered_hash), partition))
+WITH CLUSTERING ORDER BY (partition DESC)
+AND compaction = {
+    'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
+    'min_threshold': '5',
+    'base_time_seconds': '43200',
+    'max_window_size_seconds': '43200',
+    'tombstone_threshold': '0.9',
+    'unchecked_tombstone_compaction': 'true'
+};
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbMsgMetaData.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbMsgMetaData.java
index 92d3b8e..8dd4840 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbMsgMetaData.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbMsgMetaData.java
@@ -19,6 +19,7 @@ import lombok.Data;
 
 import java.io.Serializable;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Created by ashvayka on 13.01.18.
@@ -26,7 +27,7 @@ import java.util.Map;
 @Data
 public final class TbMsgMetaData implements Serializable {
 
-    private Map<String, String> data;
+    private Map<String, String> data = new ConcurrentHashMap<>();
 
     public String getValue(String key) {
         return data.get(key);
diff --git a/rule-engine/rule-engine-components/pom.xml b/rule-engine/rule-engine-components/pom.xml
index ee0d83f..5ee1d94 100644
--- a/rule-engine/rule-engine-components/pom.xml
+++ b/rule-engine/rule-engine-components/pom.xml
@@ -63,5 +63,107 @@
             <artifactId>rule-engine-api</artifactId>
             <version>1.4.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.datastax.cassandra</groupId>
+            <artifactId>cassandra-driver-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.datastax.cassandra</groupId>
+            <artifactId>cassandra-driver-mapping</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.datastax.cassandra</groupId>
+            <artifactId>cassandra-driver-extras</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.cassandraunit</groupId>
+            <artifactId>cassandra-unit</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <version>RELEASE</version>
+        </dependency>
+
+        <!--<dependency>-->
+            <!--<groupId>org.springframework.boot</groupId>-->
+            <!--<artifactId>spring-boot-starter-web</artifactId>-->
+        <!--</dependency>-->
+
     </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.xolstice.maven.plugins</groupId>
+                <artifactId>protobuf-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+            </plugin>
+
+
+
+
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+                <configuration>
+                    <mainClass>org.thingsboard.rule.engine.tool.QueueBenchmark</mainClass>
+                    <classifier>boot</classifier>
+                    <layout>ZIP</layout>
+                    <executable>true</executable>
+                    <excludeDevtools>true</excludeDevtools>
+                    <!--<embeddedLaunchScriptProperties>-->
+                        <!--<confFolder>${pkg.installFolder}/conf</confFolder>-->
+                        <!--<logFolder>${pkg.unixLogFolder}</logFolder>-->
+                        <!--<logFilename>${pkg.name}.out</logFilename>-->
+                        <!--<initInfoProvides>${pkg.name}</initInfoProvides>-->
+                    <!--</embeddedLaunchScriptProperties>-->
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>repackage</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+        </plugins>
+    </build>
+
 </project>
\ No newline at end of file
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/CassandraMsqQueue.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/CassandraMsqQueue.java
index 38ae627..4839761 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/CassandraMsqQueue.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/CassandraMsqQueue.java
@@ -15,68 +15,65 @@
  */
 package org.thingsboard.rule.engine.queue.cassandra;
 
+import com.datastax.driver.core.utils.UUIDs;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
-import org.springframework.beans.factory.annotation.Autowired;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 import org.thingsboard.rule.engine.api.MsqQueue;
 import org.thingsboard.rule.engine.api.TbMsg;
 import org.thingsboard.rule.engine.queue.cassandra.repository.AckRepository;
 import org.thingsboard.rule.engine.queue.cassandra.repository.MsgRepository;
-import org.thingsboard.rule.engine.queue.cassandra.repository.ProcessedPartitionRepository;
+import org.thingsboard.server.common.data.UUIDConverter;
 
-import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 import java.util.UUID;
 
 @Component
+@Slf4j
 public class CassandraMsqQueue implements MsqQueue {
 
-    @Autowired
-    private MsgRepository msgRepository;
+    private final MsgRepository msgRepository;
+    private final AckRepository ackRepository;
+    private final UnprocessedMsgFilter unprocessedMsgFilter;
+    private final QueuePartitioner queuePartitioner;
 
-    @Autowired
-    private AckRepository ackRepository;
-
-    @Autowired
-    private AckBuilder ackBuilder;
-
-    @Autowired
-    private UnprocessedMsgFilter unprocessedMsgFilter;
+    public CassandraMsqQueue(MsgRepository msgRepository, AckRepository ackRepository,
+                             UnprocessedMsgFilter unprocessedMsgFilter, QueuePartitioner queuePartitioner) {
+        this.msgRepository = msgRepository;
+        this.ackRepository = ackRepository;
+        this.unprocessedMsgFilter = unprocessedMsgFilter;
+        this.queuePartitioner = queuePartitioner;
+    }
 
-    @Autowired
-    private ProcessedPartitionRepository processedPartitionRepository;
 
     @Override
     public ListenableFuture<Void> put(TbMsg msg, UUID nodeId, long clusteredHash) {
-        return msgRepository.save(msg, nodeId, clusteredHash, getPartition(msg));
+        long msgTime = getMsgTime(msg);
+        long partition = queuePartitioner.getPartition(msgTime);
+        return msgRepository.save(msg, nodeId, clusteredHash, partition, msgTime);
     }
 
     @Override
     public ListenableFuture<Void> ack(TbMsg msg, UUID nodeId, long clusteredHash) {
-        MsgAck ack = ackBuilder.build(msg, nodeId, clusteredHash);
+        long partition = queuePartitioner.getPartition(getMsgTime(msg));
+        MsgAck ack = new MsgAck(msg.getId(), nodeId, clusteredHash, partition);
         return ackRepository.ack(ack);
     }
 
     @Override
     public Iterable<TbMsg> findUnprocessed(UUID nodeId, long clusteredHash) {
         List<TbMsg> unprocessedMsgs = Lists.newArrayList();
-        for (Long partition : findUnprocessedPartitions(nodeId, clusteredHash)) {
-            Iterable<TbMsg> msgs = msgRepository.findMsgs(nodeId, clusteredHash, partition);
-            Iterable<MsgAck> acks = ackRepository.findAcks(nodeId, clusteredHash, partition);
+        for (Long partition : queuePartitioner.findUnprocessedPartitions(nodeId, clusteredHash)) {
+            List<TbMsg> msgs = msgRepository.findMsgs(nodeId, clusteredHash, partition);
+            List<MsgAck> acks = ackRepository.findAcks(nodeId, clusteredHash, partition);
             unprocessedMsgs.addAll(unprocessedMsgFilter.filter(msgs, acks));
         }
         return unprocessedMsgs;
     }
 
-    private List<Long> findUnprocessedPartitions(UUID nodeId, long clusteredHash) {
-        Optional<Long> lastPartition = processedPartitionRepository.findLastProcessedPartition(nodeId, clusteredHash);
-        return Collections.emptyList();
-    }
-
-    private long getPartition(TbMsg msg) {
-        return Long.MIN_VALUE;
+    private long getMsgTime(TbMsg msg) {
+        return UUIDs.unixTimestamp(msg.getId());
     }
 
 }
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/MsgAck.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/MsgAck.java
index ca8d820..c6885ad 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/MsgAck.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/MsgAck.java
@@ -15,17 +15,21 @@
  */
 package org.thingsboard.rule.engine.queue.cassandra;
 
+import com.datastax.driver.core.utils.UUIDs;
 import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.thingsboard.rule.engine.api.TbMsg;
+import org.thingsboard.server.common.data.UUIDConverter;
 
 import java.util.UUID;
 
 @Data
+@EqualsAndHashCode
 public class MsgAck {
 
     private final UUID msgId;
     private final UUID nodeId;
     private final long clusteredHash;
     private final long partition;
-    private final long ts;
 
 }
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/QueuePartitioner.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/QueuePartitioner.java
new file mode 100644
index 0000000..9a7886f
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/QueuePartitioner.java
@@ -0,0 +1,83 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.queue.cassandra;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+import org.thingsboard.rule.engine.queue.cassandra.repository.ProcessedPartitionRepository;
+import org.thingsboard.server.dao.timeseries.TsPartitionDate;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+
+@Component
+@Slf4j
+public class QueuePartitioner {
+
+    private ProcessedPartitionRepository processedPartitionRepository;
+
+    private final TsPartitionDate tsFormat;
+    private Clock clock = Clock.systemUTC();
+
+    public QueuePartitioner(@Value("${rule.queue.msg_partitioning}") String partitioning,
+                            ProcessedPartitionRepository processedPartitionRepository) {
+        this.processedPartitionRepository = processedPartitionRepository;
+        Optional<TsPartitionDate> partition = TsPartitionDate.parse(partitioning);
+        if (partition.isPresent()) {
+            tsFormat = partition.get();
+        } else {
+            log.warn("Incorrect configuration of partitioning {}", "MINUTES");
+            throw new RuntimeException("Failed to parse partitioning property: " + "MINUTES" + "!");
+        }
+    }
+
+    public long getPartition(long ts) {
+        LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC);
+        return tsFormat.truncatedTo(time).toInstant(ZoneOffset.UTC).toEpochMilli();
+    }
+
+    public List<Long> findUnprocessedPartitions(UUID nodeId, long clusteredHash) {
+        Optional<Long> lastPartitionOption = processedPartitionRepository.findLastProcessedPartition(nodeId, clusteredHash);
+        long lastPartition = lastPartitionOption.orElse(System.currentTimeMillis() - 7 * 24 * 60 * 60 * 100);
+        List<Long> unprocessedPartitions = Lists.newArrayList();
+
+        LocalDateTime current = LocalDateTime.ofInstant(Instant.ofEpochMilli(lastPartition), ZoneOffset.UTC);
+        LocalDateTime end = LocalDateTime.ofInstant(Instant.now(clock), ZoneOffset.UTC)
+                .plus(1L, tsFormat.getTruncateUnit());
+
+        while (current.isBefore(end)) {
+            current = current.plus(1L, tsFormat.getTruncateUnit());
+            unprocessedPartitions.add(tsFormat.truncatedTo(current).toInstant(ZoneOffset.UTC).toEpochMilli());
+        }
+
+        return unprocessedPartitions;
+    }
+
+    public void setClock(Clock clock) {
+        this.clock = clock;
+    }
+
+    public void checkProcessedPartitions() {
+        //todo-vp: we need to implement this
+    }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/AckRepository.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/AckRepository.java
index 3d9b55f..40c0416 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/AckRepository.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/AckRepository.java
@@ -18,11 +18,12 @@ package org.thingsboard.rule.engine.queue.cassandra.repository;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.thingsboard.rule.engine.queue.cassandra.MsgAck;
 
+import java.util.List;
 import java.util.UUID;
 
 public interface AckRepository {
 
     ListenableFuture<Void> ack(MsgAck msgAck);
 
-    Iterable<MsgAck> findAcks(UUID nodeId, long clusteredHash, long partition);
+    List<MsgAck> findAcks(UUID nodeId, long clusteredHash, long partition);
 }
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraAckRepository.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraAckRepository.java
new file mode 100644
index 0000000..57dc79c
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraAckRepository.java
@@ -0,0 +1,64 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.queue.cassandra.repository.impl;
+
+import com.datastax.driver.core.*;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.springframework.stereotype.Component;
+import org.thingsboard.rule.engine.queue.cassandra.MsgAck;
+import org.thingsboard.rule.engine.queue.cassandra.repository.AckRepository;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+@Component
+public class CassandraAckRepository extends SimpleAbstractCassandraDao implements AckRepository {
+
+    private final int ackQueueTtl;
+
+    public CassandraAckRepository(Session session, int ackQueueTtl) {
+        super(session);
+        this.ackQueueTtl = ackQueueTtl;
+    }
+
+    @Override
+    public ListenableFuture<Void> ack(MsgAck msgAck) {
+        String insert = "INSERT INTO msg_ack_queue (node_id, clustered_hash, partition, msg_id) VALUES (?, ?, ?, ?) USING TTL ?";
+        PreparedStatement statement = prepare(insert);
+        BoundStatement boundStatement = statement.bind(msgAck.getNodeId(), msgAck.getClusteredHash(),
+                msgAck.getPartition(), msgAck.getMsgId(), ackQueueTtl);
+        ResultSetFuture resultSetFuture = executeAsyncWrite(boundStatement);
+        return Futures.transform(resultSetFuture, (Function<ResultSet, Void>) input -> null);
+    }
+
+    @Override
+    public List<MsgAck> findAcks(UUID nodeId, long clusteredHash, long partition) {
+        String select = "SELECT msg_id FROM msg_ack_queue WHERE " +
+                "node_id = ? AND clustered_hash = ? AND partition = ?";
+        PreparedStatement statement = prepare(select);
+        BoundStatement boundStatement = statement.bind(nodeId, clusteredHash, partition);
+        ResultSet rows = executeRead(boundStatement);
+        List<MsgAck> msgs = new ArrayList<>();
+        for (Row row : rows) {
+            msgs.add(new MsgAck(row.getUUID("msg_id"), nodeId, clusteredHash, partition));
+        }
+        return msgs;
+    }
+
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraMsgRepository.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraMsgRepository.java
new file mode 100644
index 0000000..bd3ef21
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraMsgRepository.java
@@ -0,0 +1,109 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.queue.cassandra.repository.impl;
+
+import com.datastax.driver.core.*;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.springframework.stereotype.Component;
+import org.thingsboard.rule.engine.api.TbMsg;
+import org.thingsboard.rule.engine.api.TbMsgMetaData;
+import org.thingsboard.rule.engine.queue.cassandra.repository.MsgRepository;
+import org.thingsboard.rule.engine.queue.cassandra.repository.gen.MsgQueueProtos;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.EntityIdFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+@Component
+public class CassandraMsgRepository extends SimpleAbstractCassandraDao implements MsgRepository {
+
+    private final int msqQueueTtl;
+
+
+    public CassandraMsgRepository(Session session, int msqQueueTtl) {
+        super(session);
+        this.msqQueueTtl = msqQueueTtl;
+    }
+
+    @Override
+    public ListenableFuture<Void> save(TbMsg msg, UUID nodeId, long clusteredHash, long partition, long msgTs) {
+        String insert = "INSERT INTO msg_queue (node_id, clustered_hash, partition, ts, msg) VALUES (?, ?, ?, ?, ?) USING TTL ?";
+        PreparedStatement statement = prepare(insert);
+        BoundStatement boundStatement = statement.bind(nodeId, clusteredHash, partition, msgTs, toBytes(msg), msqQueueTtl);
+        ResultSetFuture resultSetFuture = executeAsyncWrite(boundStatement);
+        return Futures.transform(resultSetFuture, (Function<ResultSet, Void>) input -> null);
+    }
+
+    @Override
+    public List<TbMsg> findMsgs(UUID nodeId, long clusteredHash, long partition) {
+        String select = "SELECT node_id, clustered_hash, partition, ts, msg FROM msg_queue WHERE " +
+                "node_id = ? AND clustered_hash = ? AND partition = ?";
+        PreparedStatement statement = prepare(select);
+        BoundStatement boundStatement = statement.bind(nodeId, clusteredHash, partition);
+        ResultSet rows = executeRead(boundStatement);
+        List<TbMsg> msgs = new ArrayList<>();
+        for (Row row : rows) {
+            msgs.add(fromBytes(row.getBytes("msg")));
+        }
+        return msgs;
+    }
+
+    private ByteBuffer toBytes(TbMsg msg) {
+        MsgQueueProtos.TbMsgProto.Builder builder = MsgQueueProtos.TbMsgProto.newBuilder();
+        builder.setId(msg.getId().toString());
+        builder.setType(msg.getType());
+        if (msg.getOriginator() != null) {
+            builder.setEntityType(msg.getOriginator().getEntityType().name());
+            builder.setEntityId(msg.getOriginator().getId().toString());
+        }
+
+        if (msg.getMetaData() != null) {
+            MsgQueueProtos.TbMsgProto.TbMsgMetaDataProto.Builder metadataBuilder = MsgQueueProtos.TbMsgProto.TbMsgMetaDataProto.newBuilder();
+            metadataBuilder.putAllData(msg.getMetaData().getData());
+            builder.addMetaData(metadataBuilder.build());
+        }
+
+        builder.setData(ByteString.copyFrom(msg.getData()));
+        byte[] bytes = builder.build().toByteArray();
+        return ByteBuffer.wrap(bytes);
+    }
+
+    private TbMsg fromBytes(ByteBuffer buffer) {
+        try {
+            MsgQueueProtos.TbMsgProto proto = MsgQueueProtos.TbMsgProto.parseFrom(buffer.array());
+            TbMsgMetaData metaData = new TbMsgMetaData();
+            if (proto.getMetaDataCount() > 0) {
+                metaData.setData(proto.getMetaData(0).getDataMap());
+            }
+
+            EntityId entityId = null;
+            if (proto.getEntityId() != null) {
+                entityId = EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId());
+            }
+
+            return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, proto.getData().toByteArray());
+        } catch (InvalidProtocolBufferException e) {
+            throw new IllegalStateException("Could not parse protobuf for TbMsg", e);
+        }
+    }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraProcessedPartitionRepository.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraProcessedPartitionRepository.java
new file mode 100644
index 0000000..7fc15d8
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraProcessedPartitionRepository.java
@@ -0,0 +1,60 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.queue.cassandra.repository.impl;
+
+import com.datastax.driver.core.*;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.springframework.stereotype.Component;
+import org.thingsboard.rule.engine.queue.cassandra.repository.ProcessedPartitionRepository;
+
+import java.util.Optional;
+import java.util.UUID;
+
+@Component
+public class CassandraProcessedPartitionRepository extends SimpleAbstractCassandraDao implements ProcessedPartitionRepository {
+
+    private final int repositoryTtl;
+
+    public CassandraProcessedPartitionRepository(Session session, int repositoryTtl) {
+        super(session);
+        this.repositoryTtl = repositoryTtl;
+    }
+
+    @Override
+    public ListenableFuture<Void> partitionProcessed(UUID nodeId, long clusteredHash, long partition) {
+        String insert = "INSERT INTO processed_msg_partitions (node_id, clustered_hash, partition) VALUES (?, ?, ?) USING TTL ?";
+        PreparedStatement prepared = prepare(insert);
+        BoundStatement boundStatement = prepared.bind(nodeId, clusteredHash, partition, repositoryTtl);
+        ResultSetFuture resultSetFuture = executeAsyncWrite(boundStatement);
+        return Futures.transform(resultSetFuture, (Function<ResultSet, Void>) input -> null);
+    }
+
+    @Override
+    public Optional<Long> findLastProcessedPartition(UUID nodeId, long clusteredHash) {
+        String select = "SELECT partition FROM processed_msg_partitions WHERE " +
+                "node_id = ? AND clustered_hash = ?";
+        PreparedStatement prepared = prepare(select);
+        BoundStatement boundStatement = prepared.bind(nodeId, clusteredHash);
+        Row row = executeRead(boundStatement).one();
+        if (row == null) {
+            return Optional.empty();
+        }
+
+        return Optional.of(row.getLong("partition"));
+    }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/SimpleAbstractCassandraDao.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/SimpleAbstractCassandraDao.java
new file mode 100644
index 0000000..8f01c18
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/SimpleAbstractCassandraDao.java
@@ -0,0 +1,77 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.queue.cassandra.repository.impl;
+
+import com.datastax.driver.core.*;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+@Component
+@Slf4j
+public abstract class SimpleAbstractCassandraDao {
+
+    private ConsistencyLevel defaultReadLevel = ConsistencyLevel.QUORUM;
+    private ConsistencyLevel defaultWriteLevel = ConsistencyLevel.QUORUM;
+    private Session session;
+    private Map<String, PreparedStatement> preparedStatementMap = new ConcurrentHashMap<>();
+
+    public SimpleAbstractCassandraDao(Session session) {
+        this.session = session;
+    }
+
+    protected Session getSession() {
+        return session;
+    }
+
+    protected ResultSet executeRead(Statement statement) {
+        return execute(statement, defaultReadLevel);
+    }
+
+    protected ResultSet executeWrite(Statement statement) {
+        return execute(statement, defaultWriteLevel);
+    }
+
+    protected ResultSetFuture executeAsyncRead(Statement statement) {
+        return executeAsync(statement, defaultReadLevel);
+    }
+
+    protected ResultSetFuture executeAsyncWrite(Statement statement) {
+        return executeAsync(statement, defaultWriteLevel);
+    }
+
+    protected PreparedStatement prepare(String query) {
+        return preparedStatementMap.computeIfAbsent(query, i -> getSession().prepare(i));
+    }
+
+    private ResultSet execute(Statement statement, ConsistencyLevel level) {
+        log.debug("Execute cassandra statement {}", statement);
+        if (statement.getConsistencyLevel() == null) {
+            statement.setConsistencyLevel(level);
+        }
+        return getSession().execute(statement);
+    }
+
+    private ResultSetFuture executeAsync(Statement statement, ConsistencyLevel level) {
+        log.debug("Execute cassandra async statement {}", statement);
+        if (statement.getConsistencyLevel() == null) {
+            statement.setConsistencyLevel(level);
+        }
+        return getSession().executeAsync(statement);
+    }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/MsgRepository.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/MsgRepository.java
index 57e501e..5d34d84 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/MsgRepository.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/MsgRepository.java
@@ -18,12 +18,13 @@ package org.thingsboard.rule.engine.queue.cassandra.repository;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.thingsboard.rule.engine.api.TbMsg;
 
+import java.util.List;
 import java.util.UUID;
 
 public interface MsgRepository {
 
-    ListenableFuture<Void> save(TbMsg msg, UUID nodeId, long clusteredHash, long partition);
+    ListenableFuture<Void> save(TbMsg msg, UUID nodeId, long clusteredHash, long partition, long msgTs);
 
-    Iterable<TbMsg> findMsgs(UUID nodeId, long clusteredHash, long partition);
+    List<TbMsg> findMsgs(UUID nodeId, long clusteredHash, long partition);
 
 }
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/ProcessedPartitionRepository.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/ProcessedPartitionRepository.java
index bc29050..807c001 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/ProcessedPartitionRepository.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/ProcessedPartitionRepository.java
@@ -1,11 +1,28 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.thingsboard.rule.engine.queue.cassandra.repository;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 import java.util.Optional;
 import java.util.UUID;
 
 public interface ProcessedPartitionRepository {
 
-    void partitionProcessed(UUID nodeId, long clusteredHash, long partition);
+    ListenableFuture<Void> partitionProcessed(UUID nodeId, long clusteredHash, long partition);
 
     Optional<Long> findLastProcessedPartition(UUID nodeId, long clusteredHash);
 
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/UnprocessedMsgFilter.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/UnprocessedMsgFilter.java
index 99e9a92..e114a85 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/UnprocessedMsgFilter.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/UnprocessedMsgFilter.java
@@ -15,14 +15,20 @@
  */
 package org.thingsboard.rule.engine.queue.cassandra;
 
+import org.springframework.stereotype.Component;
 import org.thingsboard.rule.engine.api.TbMsg;
 
 import java.util.Collection;
-import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
 
+@Component
 public class UnprocessedMsgFilter {
 
-    public Collection<TbMsg> filter(Iterable<TbMsg> msgs, Iterable<MsgAck> acks) {
-        return Collections.emptyList();
+    public Collection<TbMsg> filter(List<TbMsg> msgs, List<MsgAck> acks) {
+        Set<UUID> processedIds = acks.stream().map(MsgAck::getMsgId).collect(Collectors.toSet());
+        return msgs.stream().filter(i -> !processedIds.contains(i.getId())).collect(Collectors.toList());
     }
 }
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/tool/QueueBenchmark.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/tool/QueueBenchmark.java
new file mode 100644
index 0000000..58510d7
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/tool/QueueBenchmark.java
@@ -0,0 +1,136 @@
+package org.thingsboard.rule.engine.tool;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.PoolingOptions;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.utils.UUIDs;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.ComponentScan;
+import org.thingsboard.rule.engine.api.MsqQueue;
+import org.thingsboard.rule.engine.api.TbMsg;
+import org.thingsboard.rule.engine.api.TbMsgMetaData;
+
+import javax.annotation.Nullable;
+import java.net.InetSocketAddress;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+@SpringBootApplication
+@EnableAutoConfiguration
+@ComponentScan({"org.thingsboard.rule.engine"})
+//@PropertySource("classpath:processing-pipeline.properties")
+@Slf4j
+public class QueueBenchmark implements CommandLineRunner {
+
+    public static void main(String[] args) {
+        try {
+            SpringApplication.run(QueueBenchmark.class, args);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            System.exit(0);
+        }
+    }
+
+    @Autowired
+    private MsqQueue msqQueue;
+
+    @Override
+    public void run(String... strings) throws Exception {
+        System.out.println("It works + " + msqQueue);
+
+
+        long start = System.currentTimeMillis();
+        int msgCount = 10000000;
+        AtomicLong count = new AtomicLong(0);
+        ExecutorService service = Executors.newFixedThreadPool(100);
+
+        CountDownLatch latch = new CountDownLatch(msgCount);
+        for (int i = 0; i < msgCount; i++) {
+            service.submit(() -> {
+                boolean isFinished = false;
+                while (!isFinished) {
+                    try {
+                        TbMsg msg = randomMsg();
+                        UUID nodeId = UUIDs.timeBased();
+                        ListenableFuture<Void> put = msqQueue.put(msg, nodeId, 100L);
+//                    ListenableFuture<Void> put = msqQueue.ack(msg, nodeId, 100L);
+                        Futures.addCallback(put, new FutureCallback<Void>() {
+                            @Override
+                            public void onSuccess(@Nullable Void result) {
+                                latch.countDown();
+                            }
+
+                            @Override
+                            public void onFailure(Throwable t) {
+//                                t.printStackTrace();
+                                System.out.println("onFailure, because:" + t.getMessage());
+                                latch.countDown();
+                            }
+                        });
+                        isFinished = true;
+                    } catch (Throwable th) {
+//                        th.printStackTrace();
+                        System.out.println("Repeat query, because:" + th.getMessage());
+//                        latch.countDown();
+                    }
+                }
+            });
+        }
+
+        long prev = 0L;
+        while (latch.getCount() != 0) {
+            TimeUnit.SECONDS.sleep(1);
+            long curr = latch.getCount();
+            long rps = prev - curr;
+            prev = curr;
+            System.out.println("rps = " + rps);
+        }
+
+        long end = System.currentTimeMillis();
+        System.out.println("final rps = " + (msgCount / (end - start) * 1000));
+
+        System.out.println("Finished");
+
+    }
+
+    private TbMsg randomMsg() {
+        TbMsgMetaData metaData = new TbMsgMetaData();
+        metaData.putValue("key", "value");
+        String dataStr = "someContent";
+        return new TbMsg(UUIDs.timeBased(), "type", null, metaData, dataStr.getBytes());
+    }
+
+    @Bean
+    public Session session() {
+        Cluster thingsboard = Cluster.builder()
+                .addContactPointsWithPorts(new InetSocketAddress("127.0.0.1", 9042))
+                .withClusterName("thingsboard")
+//                .withSocketOptions(socketOpts.getOpts())
+                .withPoolingOptions(new PoolingOptions()
+                        .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
+                        .setMaxRequestsPerConnection(HostDistance.REMOTE, 32768)).build();
+
+        Session session = thingsboard.connect("thingsboard");
+        return session;
+    }
+
+    @Bean
+    public int defaultTtl() {
+        return 6000;
+    }
+
+}
diff --git a/rule-engine/rule-engine-components/src/main/proto/msgqueue.proto b/rule-engine/rule-engine-components/src/main/proto/msgqueue.proto
new file mode 100644
index 0000000..1105aeb
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/proto/msgqueue.proto
@@ -0,0 +1,36 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto3";
+package msgqueue;
+
+option java_package = "org.thingsboard.rule.engine.queue.cassandra.repository.gen";
+option java_outer_classname = "MsgQueueProtos";
+
+
+message TbMsgProto {
+    string id = 1;
+    string type = 2;
+    string entityType = 3;
+    string entityId = 4;
+
+    message TbMsgMetaDataProto {
+        map<string, string> data = 1;
+    }
+
+    repeated TbMsgMetaDataProto metaData = 5;
+
+    bytes data = 6;
+}
\ No newline at end of file
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/CassandraMsqQueueTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/CassandraMsqQueueTest.java
new file mode 100644
index 0000000..8ce3355
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/CassandraMsqQueueTest.java
@@ -0,0 +1,48 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.queue.cassandra;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.thingsboard.rule.engine.queue.cassandra.repository.AckRepository;
+import org.thingsboard.rule.engine.queue.cassandra.repository.MsgRepository;
+
+public class CassandraMsqQueueTest {
+
+    private CassandraMsqQueue msqQueue;
+
+    @Mock
+    private MsgRepository msgRepository;
+    @Mock
+    private AckRepository ackRepository;
+    @Mock
+    private UnprocessedMsgFilter unprocessedMsgFilter;
+    @Mock
+    private QueuePartitioner queuePartitioner;
+
+    @Before
+    public void init() {
+        msqQueue = new CassandraMsqQueue(msgRepository, ackRepository, unprocessedMsgFilter, queuePartitioner);
+    }
+
+    @Test
+    public void msgCanBeSaved() {
+//        todo-vp: implement
+    }
+
+
+}
\ No newline at end of file
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/QueuePartitionerTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/QueuePartitionerTest.java
new file mode 100644
index 0000000..a71a737
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/QueuePartitionerTest.java
@@ -0,0 +1,81 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.queue.cassandra;
+
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.thingsboard.rule.engine.queue.cassandra.repository.ProcessedPartitionRepository;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class QueuePartitionerTest {
+
+    private QueuePartitioner queuePartitioner;
+
+    @Mock
+    private ProcessedPartitionRepository partitionRepo;
+
+    private Instant startInstant;
+    private Instant endInstant;
+
+    @Before
+    public void init() {
+        queuePartitioner = new QueuePartitioner("MINUTES", partitionRepo);
+        startInstant = Instant.now();
+        endInstant = startInstant.plus(2, ChronoUnit.MINUTES);
+        queuePartitioner.setClock(Clock.fixed(endInstant, ZoneOffset.UTC));
+    }
+
+    @Test
+    public void partitionCalculated() {
+        long time = 1519390191425L;
+        long partition = queuePartitioner.getPartition(time);
+        assertEquals(1519390140000L, partition);
+    }
+
+    @Test
+    public void unprocessedPartitionsReturned() {
+        UUID nodeId = UUID.randomUUID();
+        long clusteredHash = 101L;
+        when(partitionRepo.findLastProcessedPartition(nodeId, clusteredHash)).thenReturn(Optional.of(startInstant.toEpochMilli()));
+        List<Long> actual = queuePartitioner.findUnprocessedPartitions(nodeId, clusteredHash);
+        assertEquals(3, actual.size());
+    }
+
+    @Test
+    public void defaultShiftUsedIfNoPartitionWasProcessed() {
+        UUID nodeId = UUID.randomUUID();
+        long clusteredHash = 101L;
+        when(partitionRepo.findLastProcessedPartition(nodeId, clusteredHash)).thenReturn(Optional.empty());
+        List<Long> actual = queuePartitioner.findUnprocessedPartitions(nodeId, clusteredHash);
+        assertEquals(1011, actual.size());
+    }
+
+}
\ No newline at end of file
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraAckRepositoryTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraAckRepositoryTest.java
new file mode 100644
index 0000000..38b7b9d
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraAckRepositoryTest.java
@@ -0,0 +1,81 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.queue.cassandra.repository.impl;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.junit.Before;
+import org.junit.Test;
+import org.thingsboard.rule.engine.queue.cassandra.MsgAck;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CassandraAckRepositoryTest extends SimpleAbstractCassandraDaoTest {
+
+    private CassandraAckRepository ackRepository;
+
+    @Before
+    public void init() {
+        ackRepository = new CassandraAckRepository(cassandraUnit.session, 1);
+    }
+
+    @Test
+    public void acksInPartitionCouldBeFound() {
+        UUID nodeId = UUID.fromString("055eee50-1883-11e8-b380-65b5d5335ba9");
+
+        List<MsgAck> extectedAcks = Lists.newArrayList(
+                new MsgAck(UUID.fromString("bebaeb60-1888-11e8-bf21-65b5d5335ba9"), nodeId, 101L, 300L),
+                new MsgAck(UUID.fromString("12baeb60-1888-11e8-bf21-65b5d5335ba9"), nodeId, 101L, 300L)
+        );
+
+        List<MsgAck> actualAcks = ackRepository.findAcks(nodeId, 101L, 300L);
+        assertEquals(extectedAcks, actualAcks);
+    }
+
+    @Test
+    public void ackCanBeSavedAndRead() throws ExecutionException, InterruptedException {
+        UUID msgId = UUIDs.timeBased();
+        UUID nodeId = UUIDs.timeBased();
+        MsgAck ack = new MsgAck(msgId, nodeId, 10L, 20L);
+        ListenableFuture<Void> future = ackRepository.ack(ack);
+        future.get();
+        List<MsgAck> actualAcks = ackRepository.findAcks(nodeId, 10L, 20L);
+        assertEquals(1, actualAcks.size());
+        assertEquals(ack, actualAcks.get(0));
+    }
+
+    @Test
+    public void expiredAcksAreNotReturned() throws ExecutionException, InterruptedException {
+        UUID msgId = UUIDs.timeBased();
+        UUID nodeId = UUIDs.timeBased();
+        MsgAck ack = new MsgAck(msgId, nodeId, 30L, 40L);
+        ListenableFuture<Void> future = ackRepository.ack(ack);
+        future.get();
+        List<MsgAck> actualAcks = ackRepository.findAcks(nodeId, 30L, 40L);
+        assertEquals(1, actualAcks.size());
+        TimeUnit.SECONDS.sleep(2);
+        assertTrue(ackRepository.findAcks(nodeId, 30L, 40L).isEmpty());
+    }
+
+
+}
\ No newline at end of file
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java
new file mode 100644
index 0000000..a0827fc
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java
@@ -0,0 +1,82 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.queue.cassandra.repository.impl;
+
+//import static org.junit.jupiter.api.Assertions.*;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.junit.Before;
+import org.junit.Test;
+import org.thingsboard.rule.engine.api.TbMsg;
+import org.thingsboard.rule.engine.api.TbMsgMetaData;
+import org.thingsboard.server.common.data.id.DeviceId;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CassandraMsgRepositoryTest extends SimpleAbstractCassandraDaoTest {
+
+    private CassandraMsgRepository msgRepository;
+
+    @Before
+    public void init() {
+        msgRepository = new CassandraMsgRepository(cassandraUnit.session, 1);
+    }
+
+    @Test
+    public void msgCanBeSavedAndRead() throws ExecutionException, InterruptedException {
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, new byte[4]);
+        UUID nodeId = UUIDs.timeBased();
+        ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 1L, 1L, 1L);
+        future.get();
+        List<TbMsg> msgs = msgRepository.findMsgs(nodeId, 1L, 1L);
+        assertEquals(1, msgs.size());
+    }
+
+    @Test
+    public void expiredMsgsAreNotReturned() throws ExecutionException, InterruptedException {
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, new byte[4]);
+        UUID nodeId = UUIDs.timeBased();
+        ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 2L, 2L, 2L);
+        future.get();
+        List<TbMsg> msgs = msgRepository.findMsgs(nodeId, 2L, 2L);
+        assertEquals(1, msgs.size());
+        TimeUnit.SECONDS.sleep(2);
+        assertTrue(msgRepository.findMsgs(nodeId, 2L, 2L).isEmpty());
+    }
+
+    @Test
+    public void protoBufConverterWorkAsExpected() throws ExecutionException, InterruptedException {
+        TbMsgMetaData metaData = new TbMsgMetaData();
+        metaData.putValue("key", "value");
+        String dataStr = "someContent";
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), metaData, dataStr.getBytes());
+        UUID nodeId = UUIDs.timeBased();
+        ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 1L, 1L, 1L);
+        future.get();
+        List<TbMsg> msgs = msgRepository.findMsgs(nodeId, 1L, 1L);
+        assertEquals(1, msgs.size());
+        assertEquals(msg, msgs.get(0));
+    }
+
+
+}
\ No newline at end of file
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraProcessedPartitionRepositoryTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraProcessedPartitionRepositoryTest.java
new file mode 100644
index 0000000..a731452
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraProcessedPartitionRepositoryTest.java
@@ -0,0 +1,80 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.queue.cassandra.repository.impl;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+
+public class CassandraProcessedPartitionRepositoryTest extends SimpleAbstractCassandraDaoTest {
+
+    private CassandraProcessedPartitionRepository partitionRepository;
+
+    @Before
+    public void init() {
+        partitionRepository = new CassandraProcessedPartitionRepository(cassandraUnit.session, 1);
+    }
+
+    @Test
+    public void lastProcessedPartitionCouldBeFound() {
+        UUID nodeId = UUID.fromString("055eee50-1883-11e8-b380-65b5d5335ba9");
+        Optional<Long> lastProcessedPartition = partitionRepository.findLastProcessedPartition(nodeId, 101L);
+        assertTrue(lastProcessedPartition.isPresent());
+        assertEquals((Long) 777L, lastProcessedPartition.get());
+    }
+
+    @Test
+    public void highestProcessedPartitionReturned() throws ExecutionException, InterruptedException {
+        UUID nodeId = UUIDs.timeBased();
+        ListenableFuture<Void> future1 = partitionRepository.partitionProcessed(nodeId, 303L, 100L);
+        ListenableFuture<Void> future2 = partitionRepository.partitionProcessed(nodeId, 303L, 200L);
+        ListenableFuture<Void> future3 = partitionRepository.partitionProcessed(nodeId, 303L, 10L);
+        ListenableFuture<List<Void>> allFutures = Futures.allAsList(future1, future2, future3);
+        allFutures.get();
+        Optional<Long> actual = partitionRepository.findLastProcessedPartition(nodeId, 303L);
+        assertTrue(actual.isPresent());
+        assertEquals((Long) 200L, actual.get());
+    }
+
+    @Test
+    public void expiredPartitionsAreNotReturned() throws ExecutionException, InterruptedException {
+        UUID nodeId = UUIDs.timeBased();
+        ListenableFuture<Void> future = partitionRepository.partitionProcessed(nodeId, 404L, 10L);
+        future.get();
+        Optional<Long> actual = partitionRepository.findLastProcessedPartition(nodeId, 404L);
+        assertEquals((Long) 10L, actual.get());
+        TimeUnit.SECONDS.sleep(2);
+        assertFalse(partitionRepository.findLastProcessedPartition(nodeId, 404L).isPresent());
+    }
+
+    @Test
+    public void ifNoPartitionsWereProcessedEmptyResultReturned() {
+        UUID nodeId = UUIDs.timeBased();
+        Optional<Long> actual = partitionRepository.findLastProcessedPartition(nodeId, 505L);
+        assertFalse(actual.isPresent());
+    }
+
+}
\ No newline at end of file
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/UnprocessedMsgFilterTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/UnprocessedMsgFilterTest.java
new file mode 100644
index 0000000..dec8e35
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/UnprocessedMsgFilterTest.java
@@ -0,0 +1,45 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.queue.cassandra;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+import org.thingsboard.rule.engine.api.TbMsg;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class UnprocessedMsgFilterTest {
+
+    private UnprocessedMsgFilter msgFilter = new UnprocessedMsgFilter();
+
+    @Test
+    public void acknowledgedMsgsAreFilteredOut() {
+        UUID id1 = UUID.randomUUID();
+        UUID id2 = UUID.randomUUID();
+        TbMsg msg1 = new TbMsg(id1, "T", null, null, null);
+        TbMsg msg2 = new TbMsg(id2, "T", null, null, null);
+        List<TbMsg> msgs = Lists.newArrayList(msg1, msg2);
+        List<MsgAck> acks = Lists.newArrayList(new MsgAck(id2, UUID.randomUUID(), 1L, 1L));
+        Collection<TbMsg> actual = msgFilter.filter(msgs, acks);
+        assertEquals(1, actual.size());
+        assertEquals(msg1, actual.iterator().next());
+    }
+
+}
\ No newline at end of file
diff --git a/rule-engine/rule-engine-components/src/test/resources/cassandra/system-test.cql b/rule-engine/rule-engine-components/src/test/resources/cassandra/system-test.cql
new file mode 100644
index 0000000..7ba9f26
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/test/resources/cassandra/system-test.cql
@@ -0,0 +1,75 @@
+CREATE TABLE IF NOT EXISTS thingsboard.msg_queue (
+    node_id         timeuuid,
+    clustered_hash    bigint,
+    partition       bigint,
+    ts              bigint,
+    msg             blob,
+	PRIMARY KEY ((node_id, clustered_hash, partition), ts))
+WITH CLUSTERING ORDER BY (ts DESC)
+AND compaction = {
+    'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
+    'min_threshold': '5',
+    'base_time_seconds': '43200',
+    'max_window_size_seconds': '43200',
+    'tombstone_threshold': '0.9',
+    'unchecked_tombstone_compaction': 'true'
+};
+
+
+CREATE TABLE IF NOT EXISTS thingsboard.msg_ack_queue (
+    node_id         timeuuid,
+    clustered_hash    bigint,
+    partition       bigint,
+    msg_id              timeuuid,
+	PRIMARY KEY ((node_id, clustered_hash, partition), msg_id))
+WITH CLUSTERING ORDER BY (msg_id DESC)
+AND compaction = {
+    'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
+    'min_threshold': '5',
+    'base_time_seconds': '43200',
+    'max_window_size_seconds': '43200',
+    'tombstone_threshold': '0.9',
+    'unchecked_tombstone_compaction': 'true'
+};
+
+CREATE TABLE IF NOT EXISTS thingsboard.processed_msg_partitions (
+    node_id         timeuuid,
+    clustered_hash    bigint,
+    partition       bigint,
+	PRIMARY KEY ((node_id, clustered_hash), partition))
+WITH CLUSTERING ORDER BY (partition DESC)
+AND compaction = {
+    'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
+    'min_threshold': '5',
+    'base_time_seconds': '43200',
+    'max_window_size_seconds': '43200',
+    'tombstone_threshold': '0.9',
+    'unchecked_tombstone_compaction': 'true'
+};
+
+
+
+-- msg_queue dataset
+
+INSERT INTO thingsboard.msg_queue (node_id, clustered_hash, partition, ts, msg)
+    VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 200, 201, null);
+INSERT INTO thingsboard.msg_queue (node_id, clustered_hash, partition, ts, msg)
+    VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 200, 202, null);
+INSERT INTO thingsboard.msg_queue (node_id, clustered_hash, partition, ts, msg)
+    VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 300, 301, null);
+
+-- ack_queue dataset
+INSERT INTO msg_ack_queue (node_id, clustered_hash, partition, msg_id)
+    VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 300, bebaeb60-1888-11e8-bf21-65b5d5335ba9);
+INSERT INTO msg_ack_queue (node_id, clustered_hash, partition, msg_id)
+    VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 300, 12baeb60-1888-11e8-bf21-65b5d5335ba9);
+    INSERT INTO msg_ack_queue (node_id, clustered_hash, partition, msg_id)
+        VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 200, 32baeb60-1888-11e8-bf21-65b5d5335ba9);
+
+-- processed partition dataset
+INSERT INTO processed_msg_partitions (node_id, clustered_hash, partition)
+    VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 100);
+INSERT INTO processed_msg_partitions (node_id, clustered_hash, partition)
+    VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 777);
+INSERT INTO processed_msg_partitions (node_id, clustered_hash, partition)
+    VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 202, 200);
\ No newline at end of file