thingsboard-memoizeit
Changes
dao/src/main/resources/cassandra/schema.cql 65(+31 -34)
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbMsgMetaData.java 3(+2 -1)
rule-engine/rule-engine-components/pom.xml 102(+102 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/CassandraMsqQueue.java 53(+25 -28)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/MsgAck.java 6(+5 -1)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/QueuePartitioner.java 83(+83 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/AckRepository.java 3(+2 -1)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraAckRepository.java 64(+64 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraMsgRepository.java 109(+109 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraProcessedPartitionRepository.java 60(+60 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/SimpleAbstractCassandraDao.java 77(+77 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/MsgRepository.java 5(+3 -2)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/ProcessedPartitionRepository.java 19(+18 -1)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/UnprocessedMsgFilter.java 12(+9 -3)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/tool/QueueBenchmark.java 136(+136 -0)
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/CassandraMsqQueueTest.java 48(+48 -0)
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/QueuePartitionerTest.java 81(+81 -0)
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraAckRepositoryTest.java 81(+81 -0)
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java 82(+82 -0)
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraProcessedPartitionRepositoryTest.java 80(+80 -0)
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/SimpleAbstractCassandraDaoTest.java 21(+11 -10)
Details
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 6bb3917..7eef95a 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -251,6 +251,9 @@ spring:
username: "${SPRING_DATASOURCE_USERNAME:sa}"
password: "${SPRING_DATASOURCE_PASSWORD:}"
+rule:
+ queue:
+ msg_partitioning: "${QUEUE_MSG_PARTITIONING:HOURS}"
# PostgreSQL DAO Configuration
#spring:
dao/src/main/resources/cassandra/schema.cql 65(+31 -34)
diff --git a/dao/src/main/resources/cassandra/schema.cql b/dao/src/main/resources/cassandra/schema.cql
index 876c9f7..05e6cfe 100644
--- a/dao/src/main/resources/cassandra/schema.cql
+++ b/dao/src/main/resources/cassandra/schema.cql
@@ -555,48 +555,45 @@ CREATE TABLE IF NOT EXISTS thingsboard.msg_queue (
partition bigint,
ts bigint,
msg blob,
- PRIMARY KEY ((node_id, cluster_hash, partition), ts)
- WITH CLUSTERING ORDER BY (ts DESC)
- AND compaction = {
- 'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
- 'min_threshold': '5',
- 'base_time_seconds': '43200',
- 'max_window_size_seconds': '43200'
- 'tombstone_threshold': '0.9',
- 'unchecked_tombstone_compaction': 'true',
- };
-);
+ PRIMARY KEY ((node_id, clustered_hash, partition), ts))
+WITH CLUSTERING ORDER BY (ts DESC)
+AND compaction = {
+ 'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
+ 'min_threshold': '5',
+ 'base_time_seconds': '43200',
+ 'max_window_size_seconds': '43200',
+ 'tombstone_threshold': '0.9',
+ 'unchecked_tombstone_compaction': 'true'
+};
+
CREATE TABLE IF NOT EXISTS thingsboard.msg_ack_queue (
node_id timeuuid,
clustered_hash bigint,
partition bigint,
- ts bigint,
msg_id timeuuid,
- PRIMARY KEY ((node_id, cluster_hash, partition), ts)
- WITH CLUSTERING ORDER BY (ts DESC)
- AND compaction = {
- 'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
- 'min_threshold': '5',
- 'base_time_seconds': '43200',
- 'max_window_size_seconds': '43200'
- 'tombstone_threshold': '0.9',
- 'unchecked_tombstone_compaction': 'true',
- };
-);
+ PRIMARY KEY ((node_id, clustered_hash, partition), msg_id))
+WITH CLUSTERING ORDER BY (msg_id DESC)
+AND compaction = {
+ 'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
+ 'min_threshold': '5',
+ 'base_time_seconds': '43200',
+ 'max_window_size_seconds': '43200',
+ 'tombstone_threshold': '0.9',
+ 'unchecked_tombstone_compaction': 'true'
+};
CREATE TABLE IF NOT EXISTS thingsboard.processed_msg_partitions (
node_id timeuuid,
clustered_hash bigint,
partition bigint,
- PRIMARY KEY ((node_id, cluster_hash), partition)
- WITH CLUSTERING ORDER BY (partition DESC)
- AND compaction = {
- 'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
- 'min_threshold': '5',
- 'base_time_seconds': '43200',
- 'max_window_size_seconds': '43200'
- 'tombstone_threshold': '0.9',
- 'unchecked_tombstone_compaction': 'true',
- };
-);
\ No newline at end of file
+ PRIMARY KEY ((node_id, clustered_hash), partition))
+WITH CLUSTERING ORDER BY (partition DESC)
+AND compaction = {
+ 'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
+ 'min_threshold': '5',
+ 'base_time_seconds': '43200',
+ 'max_window_size_seconds': '43200',
+ 'tombstone_threshold': '0.9',
+ 'unchecked_tombstone_compaction': 'true'
+};
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbMsgMetaData.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbMsgMetaData.java
index 92d3b8e..8dd4840 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbMsgMetaData.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbMsgMetaData.java
@@ -19,6 +19,7 @@ import lombok.Data;
import java.io.Serializable;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
* Created by ashvayka on 13.01.18.
@@ -26,7 +27,7 @@ import java.util.Map;
@Data
public final class TbMsgMetaData implements Serializable {
- private Map<String, String> data;
+ private Map<String, String> data = new ConcurrentHashMap<>();
public String getValue(String key) {
return data.get(key);
rule-engine/rule-engine-components/pom.xml 102(+102 -0)
diff --git a/rule-engine/rule-engine-components/pom.xml b/rule-engine/rule-engine-components/pom.xml
index ee0d83f..5ee1d94 100644
--- a/rule-engine/rule-engine-components/pom.xml
+++ b/rule-engine/rule-engine-components/pom.xml
@@ -63,5 +63,107 @@
<artifactId>rule-engine-api</artifactId>
<version>1.4.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.datastax.cassandra</groupId>
+ <artifactId>cassandra-driver-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.datastax.cassandra</groupId>
+ <artifactId>cassandra-driver-mapping</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.datastax.cassandra</groupId>
+ <artifactId>cassandra-driver-extras</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.cassandraunit</groupId>
+ <artifactId>cassandra-unit</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <version>RELEASE</version>
+ </dependency>
+
+ <!--<dependency>-->
+ <!--<groupId>org.springframework.boot</groupId>-->
+ <!--<artifactId>spring-boot-starter-web</artifactId>-->
+ <!--</dependency>-->
+
</dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ </plugin>
+
+
+
+
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ <configuration>
+ <mainClass>org.thingsboard.rule.engine.tool.QueueBenchmark</mainClass>
+ <classifier>boot</classifier>
+ <layout>ZIP</layout>
+ <executable>true</executable>
+ <excludeDevtools>true</excludeDevtools>
+ <!--<embeddedLaunchScriptProperties>-->
+ <!--<confFolder>${pkg.installFolder}/conf</confFolder>-->
+ <!--<logFolder>${pkg.unixLogFolder}</logFolder>-->
+ <!--<logFilename>${pkg.name}.out</logFilename>-->
+ <!--<initInfoProvides>${pkg.name}</initInfoProvides>-->
+ <!--</embeddedLaunchScriptProperties>-->
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>repackage</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+ </build>
+
</project>
\ No newline at end of file
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/CassandraMsqQueue.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/CassandraMsqQueue.java
index 38ae627..4839761 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/CassandraMsqQueue.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/CassandraMsqQueue.java
@@ -15,68 +15,65 @@
*/
package org.thingsboard.rule.engine.queue.cassandra;
+import com.datastax.driver.core.utils.UUIDs;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
-import org.springframework.beans.factory.annotation.Autowired;
+import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.thingsboard.rule.engine.api.MsqQueue;
import org.thingsboard.rule.engine.api.TbMsg;
import org.thingsboard.rule.engine.queue.cassandra.repository.AckRepository;
import org.thingsboard.rule.engine.queue.cassandra.repository.MsgRepository;
-import org.thingsboard.rule.engine.queue.cassandra.repository.ProcessedPartitionRepository;
+import org.thingsboard.server.common.data.UUIDConverter;
-import java.util.Collections;
import java.util.List;
-import java.util.Optional;
import java.util.UUID;
@Component
+@Slf4j
public class CassandraMsqQueue implements MsqQueue {
- @Autowired
- private MsgRepository msgRepository;
+ private final MsgRepository msgRepository;
+ private final AckRepository ackRepository;
+ private final UnprocessedMsgFilter unprocessedMsgFilter;
+ private final QueuePartitioner queuePartitioner;
- @Autowired
- private AckRepository ackRepository;
-
- @Autowired
- private AckBuilder ackBuilder;
-
- @Autowired
- private UnprocessedMsgFilter unprocessedMsgFilter;
+ public CassandraMsqQueue(MsgRepository msgRepository, AckRepository ackRepository,
+ UnprocessedMsgFilter unprocessedMsgFilter, QueuePartitioner queuePartitioner) {
+ this.msgRepository = msgRepository;
+ this.ackRepository = ackRepository;
+ this.unprocessedMsgFilter = unprocessedMsgFilter;
+ this.queuePartitioner = queuePartitioner;
+ }
- @Autowired
- private ProcessedPartitionRepository processedPartitionRepository;
@Override
public ListenableFuture<Void> put(TbMsg msg, UUID nodeId, long clusteredHash) {
- return msgRepository.save(msg, nodeId, clusteredHash, getPartition(msg));
+ long msgTime = getMsgTime(msg);
+ long partition = queuePartitioner.getPartition(msgTime);
+ return msgRepository.save(msg, nodeId, clusteredHash, partition, msgTime);
}
@Override
public ListenableFuture<Void> ack(TbMsg msg, UUID nodeId, long clusteredHash) {
- MsgAck ack = ackBuilder.build(msg, nodeId, clusteredHash);
+ long partition = queuePartitioner.getPartition(getMsgTime(msg));
+ MsgAck ack = new MsgAck(msg.getId(), nodeId, clusteredHash, partition);
return ackRepository.ack(ack);
}
@Override
public Iterable<TbMsg> findUnprocessed(UUID nodeId, long clusteredHash) {
List<TbMsg> unprocessedMsgs = Lists.newArrayList();
- for (Long partition : findUnprocessedPartitions(nodeId, clusteredHash)) {
- Iterable<TbMsg> msgs = msgRepository.findMsgs(nodeId, clusteredHash, partition);
- Iterable<MsgAck> acks = ackRepository.findAcks(nodeId, clusteredHash, partition);
+ for (Long partition : queuePartitioner.findUnprocessedPartitions(nodeId, clusteredHash)) {
+ List<TbMsg> msgs = msgRepository.findMsgs(nodeId, clusteredHash, partition);
+ List<MsgAck> acks = ackRepository.findAcks(nodeId, clusteredHash, partition);
unprocessedMsgs.addAll(unprocessedMsgFilter.filter(msgs, acks));
}
return unprocessedMsgs;
}
- private List<Long> findUnprocessedPartitions(UUID nodeId, long clusteredHash) {
- Optional<Long> lastPartition = processedPartitionRepository.findLastProcessedPartition(nodeId, clusteredHash);
- return Collections.emptyList();
- }
-
- private long getPartition(TbMsg msg) {
- return Long.MIN_VALUE;
+ private long getMsgTime(TbMsg msg) {
+ return UUIDs.unixTimestamp(msg.getId());
}
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/MsgAck.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/MsgAck.java
index ca8d820..c6885ad 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/MsgAck.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/MsgAck.java
@@ -15,17 +15,21 @@
*/
package org.thingsboard.rule.engine.queue.cassandra;
+import com.datastax.driver.core.utils.UUIDs;
import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.thingsboard.rule.engine.api.TbMsg;
+import org.thingsboard.server.common.data.UUIDConverter;
import java.util.UUID;
@Data
+@EqualsAndHashCode
public class MsgAck {
private final UUID msgId;
private final UUID nodeId;
private final long clusteredHash;
private final long partition;
- private final long ts;
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/QueuePartitioner.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/QueuePartitioner.java
new file mode 100644
index 0000000..9a7886f
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/QueuePartitioner.java
@@ -0,0 +1,83 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.queue.cassandra;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+import org.thingsboard.rule.engine.queue.cassandra.repository.ProcessedPartitionRepository;
+import org.thingsboard.server.dao.timeseries.TsPartitionDate;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+
+@Component
+@Slf4j
+public class QueuePartitioner {
+
+ private ProcessedPartitionRepository processedPartitionRepository;
+
+ private final TsPartitionDate tsFormat;
+ private Clock clock = Clock.systemUTC();
+
+ public QueuePartitioner(@Value("${rule.queue.msg_partitioning}") String partitioning,
+ ProcessedPartitionRepository processedPartitionRepository) {
+ this.processedPartitionRepository = processedPartitionRepository;
+ Optional<TsPartitionDate> partition = TsPartitionDate.parse(partitioning);
+ if (partition.isPresent()) {
+ tsFormat = partition.get();
+ } else {
+ log.warn("Incorrect configuration of partitioning {}", "MINUTES");
+ throw new RuntimeException("Failed to parse partitioning property: " + "MINUTES" + "!");
+ }
+ }
+
+ public long getPartition(long ts) {
+ LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC);
+ return tsFormat.truncatedTo(time).toInstant(ZoneOffset.UTC).toEpochMilli();
+ }
+
+ public List<Long> findUnprocessedPartitions(UUID nodeId, long clusteredHash) {
+ Optional<Long> lastPartitionOption = processedPartitionRepository.findLastProcessedPartition(nodeId, clusteredHash);
+ long lastPartition = lastPartitionOption.orElse(System.currentTimeMillis() - 7 * 24 * 60 * 60 * 100);
+ List<Long> unprocessedPartitions = Lists.newArrayList();
+
+ LocalDateTime current = LocalDateTime.ofInstant(Instant.ofEpochMilli(lastPartition), ZoneOffset.UTC);
+ LocalDateTime end = LocalDateTime.ofInstant(Instant.now(clock), ZoneOffset.UTC)
+ .plus(1L, tsFormat.getTruncateUnit());
+
+ while (current.isBefore(end)) {
+ current = current.plus(1L, tsFormat.getTruncateUnit());
+ unprocessedPartitions.add(tsFormat.truncatedTo(current).toInstant(ZoneOffset.UTC).toEpochMilli());
+ }
+
+ return unprocessedPartitions;
+ }
+
+ public void setClock(Clock clock) {
+ this.clock = clock;
+ }
+
+ public void checkProcessedPartitions() {
+ //todo-vp: we need to implement this
+ }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/AckRepository.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/AckRepository.java
index 3d9b55f..40c0416 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/AckRepository.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/AckRepository.java
@@ -18,11 +18,12 @@ package org.thingsboard.rule.engine.queue.cassandra.repository;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.rule.engine.queue.cassandra.MsgAck;
+import java.util.List;
import java.util.UUID;
public interface AckRepository {
ListenableFuture<Void> ack(MsgAck msgAck);
- Iterable<MsgAck> findAcks(UUID nodeId, long clusteredHash, long partition);
+ List<MsgAck> findAcks(UUID nodeId, long clusteredHash, long partition);
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraAckRepository.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraAckRepository.java
new file mode 100644
index 0000000..57dc79c
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraAckRepository.java
@@ -0,0 +1,64 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.queue.cassandra.repository.impl;
+
+import com.datastax.driver.core.*;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.springframework.stereotype.Component;
+import org.thingsboard.rule.engine.queue.cassandra.MsgAck;
+import org.thingsboard.rule.engine.queue.cassandra.repository.AckRepository;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+@Component
+public class CassandraAckRepository extends SimpleAbstractCassandraDao implements AckRepository {
+
+ private final int ackQueueTtl;
+
+ public CassandraAckRepository(Session session, int ackQueueTtl) {
+ super(session);
+ this.ackQueueTtl = ackQueueTtl;
+ }
+
+ @Override
+ public ListenableFuture<Void> ack(MsgAck msgAck) {
+ String insert = "INSERT INTO msg_ack_queue (node_id, clustered_hash, partition, msg_id) VALUES (?, ?, ?, ?) USING TTL ?";
+ PreparedStatement statement = prepare(insert);
+ BoundStatement boundStatement = statement.bind(msgAck.getNodeId(), msgAck.getClusteredHash(),
+ msgAck.getPartition(), msgAck.getMsgId(), ackQueueTtl);
+ ResultSetFuture resultSetFuture = executeAsyncWrite(boundStatement);
+ return Futures.transform(resultSetFuture, (Function<ResultSet, Void>) input -> null);
+ }
+
+ @Override
+ public List<MsgAck> findAcks(UUID nodeId, long clusteredHash, long partition) {
+ String select = "SELECT msg_id FROM msg_ack_queue WHERE " +
+ "node_id = ? AND clustered_hash = ? AND partition = ?";
+ PreparedStatement statement = prepare(select);
+ BoundStatement boundStatement = statement.bind(nodeId, clusteredHash, partition);
+ ResultSet rows = executeRead(boundStatement);
+ List<MsgAck> msgs = new ArrayList<>();
+ for (Row row : rows) {
+ msgs.add(new MsgAck(row.getUUID("msg_id"), nodeId, clusteredHash, partition));
+ }
+ return msgs;
+ }
+
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraMsgRepository.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraMsgRepository.java
new file mode 100644
index 0000000..bd3ef21
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraMsgRepository.java
@@ -0,0 +1,109 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.queue.cassandra.repository.impl;
+
+import com.datastax.driver.core.*;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.springframework.stereotype.Component;
+import org.thingsboard.rule.engine.api.TbMsg;
+import org.thingsboard.rule.engine.api.TbMsgMetaData;
+import org.thingsboard.rule.engine.queue.cassandra.repository.MsgRepository;
+import org.thingsboard.rule.engine.queue.cassandra.repository.gen.MsgQueueProtos;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.EntityIdFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+@Component
+public class CassandraMsgRepository extends SimpleAbstractCassandraDao implements MsgRepository {
+
+ private final int msqQueueTtl;
+
+
+ public CassandraMsgRepository(Session session, int msqQueueTtl) {
+ super(session);
+ this.msqQueueTtl = msqQueueTtl;
+ }
+
+ @Override
+ public ListenableFuture<Void> save(TbMsg msg, UUID nodeId, long clusteredHash, long partition, long msgTs) {
+ String insert = "INSERT INTO msg_queue (node_id, clustered_hash, partition, ts, msg) VALUES (?, ?, ?, ?, ?) USING TTL ?";
+ PreparedStatement statement = prepare(insert);
+ BoundStatement boundStatement = statement.bind(nodeId, clusteredHash, partition, msgTs, toBytes(msg), msqQueueTtl);
+ ResultSetFuture resultSetFuture = executeAsyncWrite(boundStatement);
+ return Futures.transform(resultSetFuture, (Function<ResultSet, Void>) input -> null);
+ }
+
+ @Override
+ public List<TbMsg> findMsgs(UUID nodeId, long clusteredHash, long partition) {
+ String select = "SELECT node_id, clustered_hash, partition, ts, msg FROM msg_queue WHERE " +
+ "node_id = ? AND clustered_hash = ? AND partition = ?";
+ PreparedStatement statement = prepare(select);
+ BoundStatement boundStatement = statement.bind(nodeId, clusteredHash, partition);
+ ResultSet rows = executeRead(boundStatement);
+ List<TbMsg> msgs = new ArrayList<>();
+ for (Row row : rows) {
+ msgs.add(fromBytes(row.getBytes("msg")));
+ }
+ return msgs;
+ }
+
+ private ByteBuffer toBytes(TbMsg msg) {
+ MsgQueueProtos.TbMsgProto.Builder builder = MsgQueueProtos.TbMsgProto.newBuilder();
+ builder.setId(msg.getId().toString());
+ builder.setType(msg.getType());
+ if (msg.getOriginator() != null) {
+ builder.setEntityType(msg.getOriginator().getEntityType().name());
+ builder.setEntityId(msg.getOriginator().getId().toString());
+ }
+
+ if (msg.getMetaData() != null) {
+ MsgQueueProtos.TbMsgProto.TbMsgMetaDataProto.Builder metadataBuilder = MsgQueueProtos.TbMsgProto.TbMsgMetaDataProto.newBuilder();
+ metadataBuilder.putAllData(msg.getMetaData().getData());
+ builder.addMetaData(metadataBuilder.build());
+ }
+
+ builder.setData(ByteString.copyFrom(msg.getData()));
+ byte[] bytes = builder.build().toByteArray();
+ return ByteBuffer.wrap(bytes);
+ }
+
+ private TbMsg fromBytes(ByteBuffer buffer) {
+ try {
+ MsgQueueProtos.TbMsgProto proto = MsgQueueProtos.TbMsgProto.parseFrom(buffer.array());
+ TbMsgMetaData metaData = new TbMsgMetaData();
+ if (proto.getMetaDataCount() > 0) {
+ metaData.setData(proto.getMetaData(0).getDataMap());
+ }
+
+ EntityId entityId = null;
+ if (proto.getEntityId() != null) {
+ entityId = EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId());
+ }
+
+ return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, proto.getData().toByteArray());
+ } catch (InvalidProtocolBufferException e) {
+ throw new IllegalStateException("Could not parse protobuf for TbMsg", e);
+ }
+ }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraProcessedPartitionRepository.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraProcessedPartitionRepository.java
new file mode 100644
index 0000000..7fc15d8
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraProcessedPartitionRepository.java
@@ -0,0 +1,60 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.queue.cassandra.repository.impl;
+
+import com.datastax.driver.core.*;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.springframework.stereotype.Component;
+import org.thingsboard.rule.engine.queue.cassandra.repository.ProcessedPartitionRepository;
+
+import java.util.Optional;
+import java.util.UUID;
+
+@Component
+public class CassandraProcessedPartitionRepository extends SimpleAbstractCassandraDao implements ProcessedPartitionRepository {
+
+ private final int repositoryTtl;
+
+ public CassandraProcessedPartitionRepository(Session session, int repositoryTtl) {
+ super(session);
+ this.repositoryTtl = repositoryTtl;
+ }
+
+ @Override
+ public ListenableFuture<Void> partitionProcessed(UUID nodeId, long clusteredHash, long partition) {
+ String insert = "INSERT INTO processed_msg_partitions (node_id, clustered_hash, partition) VALUES (?, ?, ?) USING TTL ?";
+ PreparedStatement prepared = prepare(insert);
+ BoundStatement boundStatement = prepared.bind(nodeId, clusteredHash, partition, repositoryTtl);
+ ResultSetFuture resultSetFuture = executeAsyncWrite(boundStatement);
+ return Futures.transform(resultSetFuture, (Function<ResultSet, Void>) input -> null);
+ }
+
+ @Override
+ public Optional<Long> findLastProcessedPartition(UUID nodeId, long clusteredHash) {
+ String select = "SELECT partition FROM processed_msg_partitions WHERE " +
+ "node_id = ? AND clustered_hash = ?";
+ PreparedStatement prepared = prepare(select);
+ BoundStatement boundStatement = prepared.bind(nodeId, clusteredHash);
+ Row row = executeRead(boundStatement).one();
+ if (row == null) {
+ return Optional.empty();
+ }
+
+ return Optional.of(row.getLong("partition"));
+ }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/SimpleAbstractCassandraDao.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/SimpleAbstractCassandraDao.java
new file mode 100644
index 0000000..8f01c18
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/SimpleAbstractCassandraDao.java
@@ -0,0 +1,77 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.queue.cassandra.repository.impl;
+
+import com.datastax.driver.core.*;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+@Component
+@Slf4j
+public abstract class SimpleAbstractCassandraDao {
+
+ private ConsistencyLevel defaultReadLevel = ConsistencyLevel.QUORUM;
+ private ConsistencyLevel defaultWriteLevel = ConsistencyLevel.QUORUM;
+ private Session session;
+ private Map<String, PreparedStatement> preparedStatementMap = new ConcurrentHashMap<>();
+
+ public SimpleAbstractCassandraDao(Session session) {
+ this.session = session;
+ }
+
+ protected Session getSession() {
+ return session;
+ }
+
+ protected ResultSet executeRead(Statement statement) {
+ return execute(statement, defaultReadLevel);
+ }
+
+ protected ResultSet executeWrite(Statement statement) {
+ return execute(statement, defaultWriteLevel);
+ }
+
+ protected ResultSetFuture executeAsyncRead(Statement statement) {
+ return executeAsync(statement, defaultReadLevel);
+ }
+
+ protected ResultSetFuture executeAsyncWrite(Statement statement) {
+ return executeAsync(statement, defaultWriteLevel);
+ }
+
+ protected PreparedStatement prepare(String query) {
+ return preparedStatementMap.computeIfAbsent(query, i -> getSession().prepare(i));
+ }
+
+ private ResultSet execute(Statement statement, ConsistencyLevel level) {
+ log.debug("Execute cassandra statement {}", statement);
+ if (statement.getConsistencyLevel() == null) {
+ statement.setConsistencyLevel(level);
+ }
+ return getSession().execute(statement);
+ }
+
+ private ResultSetFuture executeAsync(Statement statement, ConsistencyLevel level) {
+ log.debug("Execute cassandra async statement {}", statement);
+ if (statement.getConsistencyLevel() == null) {
+ statement.setConsistencyLevel(level);
+ }
+ return getSession().executeAsync(statement);
+ }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/MsgRepository.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/MsgRepository.java
index 57e501e..5d34d84 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/MsgRepository.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/MsgRepository.java
@@ -18,12 +18,13 @@ package org.thingsboard.rule.engine.queue.cassandra.repository;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.rule.engine.api.TbMsg;
+import java.util.List;
import java.util.UUID;
public interface MsgRepository {
- ListenableFuture<Void> save(TbMsg msg, UUID nodeId, long clusteredHash, long partition);
+ ListenableFuture<Void> save(TbMsg msg, UUID nodeId, long clusteredHash, long partition, long msgTs);
- Iterable<TbMsg> findMsgs(UUID nodeId, long clusteredHash, long partition);
+ List<TbMsg> findMsgs(UUID nodeId, long clusteredHash, long partition);
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/ProcessedPartitionRepository.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/ProcessedPartitionRepository.java
index bc29050..807c001 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/ProcessedPartitionRepository.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/ProcessedPartitionRepository.java
@@ -1,11 +1,28 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.rule.engine.queue.cassandra.repository;
+import com.google.common.util.concurrent.ListenableFuture;
+
import java.util.Optional;
import java.util.UUID;
public interface ProcessedPartitionRepository {
- void partitionProcessed(UUID nodeId, long clusteredHash, long partition);
+ ListenableFuture<Void> partitionProcessed(UUID nodeId, long clusteredHash, long partition);
Optional<Long> findLastProcessedPartition(UUID nodeId, long clusteredHash);
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/UnprocessedMsgFilter.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/UnprocessedMsgFilter.java
index 99e9a92..e114a85 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/UnprocessedMsgFilter.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/UnprocessedMsgFilter.java
@@ -15,14 +15,20 @@
*/
package org.thingsboard.rule.engine.queue.cassandra;
+import org.springframework.stereotype.Component;
import org.thingsboard.rule.engine.api.TbMsg;
import java.util.Collection;
-import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+@Component
public class UnprocessedMsgFilter {
- public Collection<TbMsg> filter(Iterable<TbMsg> msgs, Iterable<MsgAck> acks) {
- return Collections.emptyList();
+ public Collection<TbMsg> filter(List<TbMsg> msgs, List<MsgAck> acks) {
+ Set<UUID> processedIds = acks.stream().map(MsgAck::getMsgId).collect(Collectors.toSet());
+ return msgs.stream().filter(i -> !processedIds.contains(i.getId())).collect(Collectors.toList());
}
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/tool/QueueBenchmark.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/tool/QueueBenchmark.java
new file mode 100644
index 0000000..58510d7
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/tool/QueueBenchmark.java
@@ -0,0 +1,136 @@
+package org.thingsboard.rule.engine.tool;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.PoolingOptions;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.utils.UUIDs;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.ComponentScan;
+import org.thingsboard.rule.engine.api.MsqQueue;
+import org.thingsboard.rule.engine.api.TbMsg;
+import org.thingsboard.rule.engine.api.TbMsgMetaData;
+
+import javax.annotation.Nullable;
+import java.net.InetSocketAddress;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+@SpringBootApplication
+@EnableAutoConfiguration
+@ComponentScan({"org.thingsboard.rule.engine"})
+//@PropertySource("classpath:processing-pipeline.properties")
+@Slf4j
+public class QueueBenchmark implements CommandLineRunner {
+
+ public static void main(String[] args) {
+ try {
+ SpringApplication.run(QueueBenchmark.class, args);
+ } catch (Throwable th) {
+ th.printStackTrace();
+ System.exit(0);
+ }
+ }
+
+ @Autowired
+ private MsqQueue msqQueue;
+
+ @Override
+ public void run(String... strings) throws Exception {
+ System.out.println("It works + " + msqQueue);
+
+
+ long start = System.currentTimeMillis();
+ int msgCount = 10000000;
+ AtomicLong count = new AtomicLong(0);
+ ExecutorService service = Executors.newFixedThreadPool(100);
+
+ CountDownLatch latch = new CountDownLatch(msgCount);
+ for (int i = 0; i < msgCount; i++) {
+ service.submit(() -> {
+ boolean isFinished = false;
+ while (!isFinished) {
+ try {
+ TbMsg msg = randomMsg();
+ UUID nodeId = UUIDs.timeBased();
+ ListenableFuture<Void> put = msqQueue.put(msg, nodeId, 100L);
+// ListenableFuture<Void> put = msqQueue.ack(msg, nodeId, 100L);
+ Futures.addCallback(put, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(@Nullable Void result) {
+ latch.countDown();
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+// t.printStackTrace();
+ System.out.println("onFailure, because:" + t.getMessage());
+ latch.countDown();
+ }
+ });
+ isFinished = true;
+ } catch (Throwable th) {
+// th.printStackTrace();
+ System.out.println("Repeat query, because:" + th.getMessage());
+// latch.countDown();
+ }
+ }
+ });
+ }
+
+ long prev = 0L;
+ while (latch.getCount() != 0) {
+ TimeUnit.SECONDS.sleep(1);
+ long curr = latch.getCount();
+ long rps = prev - curr;
+ prev = curr;
+ System.out.println("rps = " + rps);
+ }
+
+ long end = System.currentTimeMillis();
+ System.out.println("final rps = " + (msgCount / (end - start) * 1000));
+
+ System.out.println("Finished");
+
+ }
+
+ private TbMsg randomMsg() {
+ TbMsgMetaData metaData = new TbMsgMetaData();
+ metaData.putValue("key", "value");
+ String dataStr = "someContent";
+ return new TbMsg(UUIDs.timeBased(), "type", null, metaData, dataStr.getBytes());
+ }
+
+ @Bean
+ public Session session() {
+ Cluster thingsboard = Cluster.builder()
+ .addContactPointsWithPorts(new InetSocketAddress("127.0.0.1", 9042))
+ .withClusterName("thingsboard")
+// .withSocketOptions(socketOpts.getOpts())
+ .withPoolingOptions(new PoolingOptions()
+ .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
+ .setMaxRequestsPerConnection(HostDistance.REMOTE, 32768)).build();
+
+ Session session = thingsboard.connect("thingsboard");
+ return session;
+ }
+
+ @Bean
+ public int defaultTtl() {
+ return 6000;
+ }
+
+}
diff --git a/rule-engine/rule-engine-components/src/main/proto/msgqueue.proto b/rule-engine/rule-engine-components/src/main/proto/msgqueue.proto
new file mode 100644
index 0000000..1105aeb
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/proto/msgqueue.proto
@@ -0,0 +1,36 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto3";
+package msgqueue;
+
+option java_package = "org.thingsboard.rule.engine.queue.cassandra.repository.gen";
+option java_outer_classname = "MsgQueueProtos";
+
+
+message TbMsgProto {
+ string id = 1;
+ string type = 2;
+ string entityType = 3;
+ string entityId = 4;
+
+ message TbMsgMetaDataProto {
+ map<string, string> data = 1;
+ }
+
+ repeated TbMsgMetaDataProto metaData = 5;
+
+ bytes data = 6;
+}
\ No newline at end of file
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/CassandraMsqQueueTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/CassandraMsqQueueTest.java
new file mode 100644
index 0000000..8ce3355
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/CassandraMsqQueueTest.java
@@ -0,0 +1,48 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.queue.cassandra;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.thingsboard.rule.engine.queue.cassandra.repository.AckRepository;
+import org.thingsboard.rule.engine.queue.cassandra.repository.MsgRepository;
+
+public class CassandraMsqQueueTest {
+
+ private CassandraMsqQueue msqQueue;
+
+ @Mock
+ private MsgRepository msgRepository;
+ @Mock
+ private AckRepository ackRepository;
+ @Mock
+ private UnprocessedMsgFilter unprocessedMsgFilter;
+ @Mock
+ private QueuePartitioner queuePartitioner;
+
+ @Before
+ public void init() {
+ msqQueue = new CassandraMsqQueue(msgRepository, ackRepository, unprocessedMsgFilter, queuePartitioner);
+ }
+
+ @Test
+ public void msgCanBeSaved() {
+// todo-vp: implement
+ }
+
+
+}
\ No newline at end of file
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/QueuePartitionerTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/QueuePartitionerTest.java
new file mode 100644
index 0000000..a71a737
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/QueuePartitionerTest.java
@@ -0,0 +1,81 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.queue.cassandra;
+
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.thingsboard.rule.engine.queue.cassandra.repository.ProcessedPartitionRepository;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class QueuePartitionerTest {
+
+ private QueuePartitioner queuePartitioner;
+
+ @Mock
+ private ProcessedPartitionRepository partitionRepo;
+
+ private Instant startInstant;
+ private Instant endInstant;
+
+ @Before
+ public void init() {
+ queuePartitioner = new QueuePartitioner("MINUTES", partitionRepo);
+ startInstant = Instant.now();
+ endInstant = startInstant.plus(2, ChronoUnit.MINUTES);
+ queuePartitioner.setClock(Clock.fixed(endInstant, ZoneOffset.UTC));
+ }
+
+ @Test
+ public void partitionCalculated() {
+ long time = 1519390191425L;
+ long partition = queuePartitioner.getPartition(time);
+ assertEquals(1519390140000L, partition);
+ }
+
+ @Test
+ public void unprocessedPartitionsReturned() {
+ UUID nodeId = UUID.randomUUID();
+ long clusteredHash = 101L;
+ when(partitionRepo.findLastProcessedPartition(nodeId, clusteredHash)).thenReturn(Optional.of(startInstant.toEpochMilli()));
+ List<Long> actual = queuePartitioner.findUnprocessedPartitions(nodeId, clusteredHash);
+ assertEquals(3, actual.size());
+ }
+
+ @Test
+ public void defaultShiftUsedIfNoPartitionWasProcessed() {
+ UUID nodeId = UUID.randomUUID();
+ long clusteredHash = 101L;
+ when(partitionRepo.findLastProcessedPartition(nodeId, clusteredHash)).thenReturn(Optional.empty());
+ List<Long> actual = queuePartitioner.findUnprocessedPartitions(nodeId, clusteredHash);
+ assertEquals(1011, actual.size());
+ }
+
+}
\ No newline at end of file
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraAckRepositoryTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraAckRepositoryTest.java
new file mode 100644
index 0000000..38b7b9d
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraAckRepositoryTest.java
@@ -0,0 +1,81 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.queue.cassandra.repository.impl;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.junit.Before;
+import org.junit.Test;
+import org.thingsboard.rule.engine.queue.cassandra.MsgAck;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CassandraAckRepositoryTest extends SimpleAbstractCassandraDaoTest {
+
+ private CassandraAckRepository ackRepository;
+
+ @Before
+ public void init() {
+ ackRepository = new CassandraAckRepository(cassandraUnit.session, 1);
+ }
+
+ @Test
+ public void acksInPartitionCouldBeFound() {
+ UUID nodeId = UUID.fromString("055eee50-1883-11e8-b380-65b5d5335ba9");
+
+ List<MsgAck> extectedAcks = Lists.newArrayList(
+ new MsgAck(UUID.fromString("bebaeb60-1888-11e8-bf21-65b5d5335ba9"), nodeId, 101L, 300L),
+ new MsgAck(UUID.fromString("12baeb60-1888-11e8-bf21-65b5d5335ba9"), nodeId, 101L, 300L)
+ );
+
+ List<MsgAck> actualAcks = ackRepository.findAcks(nodeId, 101L, 300L);
+ assertEquals(extectedAcks, actualAcks);
+ }
+
+ @Test
+ public void ackCanBeSavedAndRead() throws ExecutionException, InterruptedException {
+ UUID msgId = UUIDs.timeBased();
+ UUID nodeId = UUIDs.timeBased();
+ MsgAck ack = new MsgAck(msgId, nodeId, 10L, 20L);
+ ListenableFuture<Void> future = ackRepository.ack(ack);
+ future.get();
+ List<MsgAck> actualAcks = ackRepository.findAcks(nodeId, 10L, 20L);
+ assertEquals(1, actualAcks.size());
+ assertEquals(ack, actualAcks.get(0));
+ }
+
+ @Test
+ public void expiredAcksAreNotReturned() throws ExecutionException, InterruptedException {
+ UUID msgId = UUIDs.timeBased();
+ UUID nodeId = UUIDs.timeBased();
+ MsgAck ack = new MsgAck(msgId, nodeId, 30L, 40L);
+ ListenableFuture<Void> future = ackRepository.ack(ack);
+ future.get();
+ List<MsgAck> actualAcks = ackRepository.findAcks(nodeId, 30L, 40L);
+ assertEquals(1, actualAcks.size());
+ TimeUnit.SECONDS.sleep(2);
+ assertTrue(ackRepository.findAcks(nodeId, 30L, 40L).isEmpty());
+ }
+
+
+}
\ No newline at end of file
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java
new file mode 100644
index 0000000..a0827fc
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java
@@ -0,0 +1,82 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.queue.cassandra.repository.impl;
+
+//import static org.junit.jupiter.api.Assertions.*;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.junit.Before;
+import org.junit.Test;
+import org.thingsboard.rule.engine.api.TbMsg;
+import org.thingsboard.rule.engine.api.TbMsgMetaData;
+import org.thingsboard.server.common.data.id.DeviceId;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CassandraMsgRepositoryTest extends SimpleAbstractCassandraDaoTest {
+
+ private CassandraMsgRepository msgRepository;
+
+ @Before
+ public void init() {
+ msgRepository = new CassandraMsgRepository(cassandraUnit.session, 1);
+ }
+
+ @Test
+ public void msgCanBeSavedAndRead() throws ExecutionException, InterruptedException {
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, new byte[4]);
+ UUID nodeId = UUIDs.timeBased();
+ ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 1L, 1L, 1L);
+ future.get();
+ List<TbMsg> msgs = msgRepository.findMsgs(nodeId, 1L, 1L);
+ assertEquals(1, msgs.size());
+ }
+
+ @Test
+ public void expiredMsgsAreNotReturned() throws ExecutionException, InterruptedException {
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, new byte[4]);
+ UUID nodeId = UUIDs.timeBased();
+ ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 2L, 2L, 2L);
+ future.get();
+ List<TbMsg> msgs = msgRepository.findMsgs(nodeId, 2L, 2L);
+ assertEquals(1, msgs.size());
+ TimeUnit.SECONDS.sleep(2);
+ assertTrue(msgRepository.findMsgs(nodeId, 2L, 2L).isEmpty());
+ }
+
+ @Test
+ public void protoBufConverterWorkAsExpected() throws ExecutionException, InterruptedException {
+ TbMsgMetaData metaData = new TbMsgMetaData();
+ metaData.putValue("key", "value");
+ String dataStr = "someContent";
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), metaData, dataStr.getBytes());
+ UUID nodeId = UUIDs.timeBased();
+ ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 1L, 1L, 1L);
+ future.get();
+ List<TbMsg> msgs = msgRepository.findMsgs(nodeId, 1L, 1L);
+ assertEquals(1, msgs.size());
+ assertEquals(msg, msgs.get(0));
+ }
+
+
+}
\ No newline at end of file
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraProcessedPartitionRepositoryTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraProcessedPartitionRepositoryTest.java
new file mode 100644
index 0000000..a731452
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraProcessedPartitionRepositoryTest.java
@@ -0,0 +1,80 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.queue.cassandra.repository.impl;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+
+public class CassandraProcessedPartitionRepositoryTest extends SimpleAbstractCassandraDaoTest {
+
+ private CassandraProcessedPartitionRepository partitionRepository;
+
+ @Before
+ public void init() {
+ partitionRepository = new CassandraProcessedPartitionRepository(cassandraUnit.session, 1);
+ }
+
+ @Test
+ public void lastProcessedPartitionCouldBeFound() {
+ UUID nodeId = UUID.fromString("055eee50-1883-11e8-b380-65b5d5335ba9");
+ Optional<Long> lastProcessedPartition = partitionRepository.findLastProcessedPartition(nodeId, 101L);
+ assertTrue(lastProcessedPartition.isPresent());
+ assertEquals((Long) 777L, lastProcessedPartition.get());
+ }
+
+ @Test
+ public void highestProcessedPartitionReturned() throws ExecutionException, InterruptedException {
+ UUID nodeId = UUIDs.timeBased();
+ ListenableFuture<Void> future1 = partitionRepository.partitionProcessed(nodeId, 303L, 100L);
+ ListenableFuture<Void> future2 = partitionRepository.partitionProcessed(nodeId, 303L, 200L);
+ ListenableFuture<Void> future3 = partitionRepository.partitionProcessed(nodeId, 303L, 10L);
+ ListenableFuture<List<Void>> allFutures = Futures.allAsList(future1, future2, future3);
+ allFutures.get();
+ Optional<Long> actual = partitionRepository.findLastProcessedPartition(nodeId, 303L);
+ assertTrue(actual.isPresent());
+ assertEquals((Long) 200L, actual.get());
+ }
+
+ @Test
+ public void expiredPartitionsAreNotReturned() throws ExecutionException, InterruptedException {
+ UUID nodeId = UUIDs.timeBased();
+ ListenableFuture<Void> future = partitionRepository.partitionProcessed(nodeId, 404L, 10L);
+ future.get();
+ Optional<Long> actual = partitionRepository.findLastProcessedPartition(nodeId, 404L);
+ assertEquals((Long) 10L, actual.get());
+ TimeUnit.SECONDS.sleep(2);
+ assertFalse(partitionRepository.findLastProcessedPartition(nodeId, 404L).isPresent());
+ }
+
+ @Test
+ public void ifNoPartitionsWereProcessedEmptyResultReturned() {
+ UUID nodeId = UUIDs.timeBased();
+ Optional<Long> actual = partitionRepository.findLastProcessedPartition(nodeId, 505L);
+ assertFalse(actual.isPresent());
+ }
+
+}
\ No newline at end of file
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/UnprocessedMsgFilterTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/UnprocessedMsgFilterTest.java
new file mode 100644
index 0000000..dec8e35
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/UnprocessedMsgFilterTest.java
@@ -0,0 +1,45 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.queue.cassandra;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+import org.thingsboard.rule.engine.api.TbMsg;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class UnprocessedMsgFilterTest {
+
+ private UnprocessedMsgFilter msgFilter = new UnprocessedMsgFilter();
+
+ @Test
+ public void acknowledgedMsgsAreFilteredOut() {
+ UUID id1 = UUID.randomUUID();
+ UUID id2 = UUID.randomUUID();
+ TbMsg msg1 = new TbMsg(id1, "T", null, null, null);
+ TbMsg msg2 = new TbMsg(id2, "T", null, null, null);
+ List<TbMsg> msgs = Lists.newArrayList(msg1, msg2);
+ List<MsgAck> acks = Lists.newArrayList(new MsgAck(id2, UUID.randomUUID(), 1L, 1L));
+ Collection<TbMsg> actual = msgFilter.filter(msgs, acks);
+ assertEquals(1, actual.size());
+ assertEquals(msg1, actual.iterator().next());
+ }
+
+}
\ No newline at end of file
diff --git a/rule-engine/rule-engine-components/src/test/resources/cassandra/system-test.cql b/rule-engine/rule-engine-components/src/test/resources/cassandra/system-test.cql
new file mode 100644
index 0000000..7ba9f26
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/test/resources/cassandra/system-test.cql
@@ -0,0 +1,75 @@
+CREATE TABLE IF NOT EXISTS thingsboard.msg_queue (
+ node_id timeuuid,
+ clustered_hash bigint,
+ partition bigint,
+ ts bigint,
+ msg blob,
+ PRIMARY KEY ((node_id, clustered_hash, partition), ts))
+WITH CLUSTERING ORDER BY (ts DESC)
+AND compaction = {
+ 'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
+ 'min_threshold': '5',
+ 'base_time_seconds': '43200',
+ 'max_window_size_seconds': '43200',
+ 'tombstone_threshold': '0.9',
+ 'unchecked_tombstone_compaction': 'true'
+};
+
+
+CREATE TABLE IF NOT EXISTS thingsboard.msg_ack_queue (
+ node_id timeuuid,
+ clustered_hash bigint,
+ partition bigint,
+ msg_id timeuuid,
+ PRIMARY KEY ((node_id, clustered_hash, partition), msg_id))
+WITH CLUSTERING ORDER BY (msg_id DESC)
+AND compaction = {
+ 'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
+ 'min_threshold': '5',
+ 'base_time_seconds': '43200',
+ 'max_window_size_seconds': '43200',
+ 'tombstone_threshold': '0.9',
+ 'unchecked_tombstone_compaction': 'true'
+};
+
+CREATE TABLE IF NOT EXISTS thingsboard.processed_msg_partitions (
+ node_id timeuuid,
+ clustered_hash bigint,
+ partition bigint,
+ PRIMARY KEY ((node_id, clustered_hash), partition))
+WITH CLUSTERING ORDER BY (partition DESC)
+AND compaction = {
+ 'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
+ 'min_threshold': '5',
+ 'base_time_seconds': '43200',
+ 'max_window_size_seconds': '43200',
+ 'tombstone_threshold': '0.9',
+ 'unchecked_tombstone_compaction': 'true'
+};
+
+
+
+-- msg_queue dataset
+
+INSERT INTO thingsboard.msg_queue (node_id, clustered_hash, partition, ts, msg)
+ VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 200, 201, null);
+INSERT INTO thingsboard.msg_queue (node_id, clustered_hash, partition, ts, msg)
+ VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 200, 202, null);
+INSERT INTO thingsboard.msg_queue (node_id, clustered_hash, partition, ts, msg)
+ VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 300, 301, null);
+
+-- ack_queue dataset
+INSERT INTO msg_ack_queue (node_id, clustered_hash, partition, msg_id)
+ VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 300, bebaeb60-1888-11e8-bf21-65b5d5335ba9);
+INSERT INTO msg_ack_queue (node_id, clustered_hash, partition, msg_id)
+ VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 300, 12baeb60-1888-11e8-bf21-65b5d5335ba9);
+ INSERT INTO msg_ack_queue (node_id, clustered_hash, partition, msg_id)
+ VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 200, 32baeb60-1888-11e8-bf21-65b5d5335ba9);
+
+-- processed partition dataset
+INSERT INTO processed_msg_partitions (node_id, clustered_hash, partition)
+ VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 100);
+INSERT INTO processed_msg_partitions (node_id, clustered_hash, partition)
+ VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 777);
+INSERT INTO processed_msg_partitions (node_id, clustered_hash, partition)
+ VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 202, 200);
\ No newline at end of file