thingsboard-memoizeit
Changes
dao/src/main/resources/cassandra/schema.cql 52(+52 -0)
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbMsgMetaData.java 15(+15 -0)
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeConfiguration.java 15(+15 -0)
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeException.java 15(+15 -0)
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeState.java 15(+15 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNode.java 15(+15 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNodeConfiguration.java 15(+15 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java 15(+15 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNodeConfiguration.java 15(+15 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/AckBuilder.java 29(+29 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/CassandraMsqQueue.java 82(+82 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/MsgAck.java 31(+31 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/AckRepository.java 28(+28 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/MsgRepository.java 29(+29 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/ProcessedPartitionRepository.java 12(+12 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/UnprocessedMsgFilter.java 28(+28 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/jpa/SqlMsgQueue.java 20(+20 -0)
Details
dao/src/main/resources/cassandra/schema.cql 52(+52 -0)
diff --git a/dao/src/main/resources/cassandra/schema.cql b/dao/src/main/resources/cassandra/schema.cql
index dda8067..876c9f7 100644
--- a/dao/src/main/resources/cassandra/schema.cql
+++ b/dao/src/main/resources/cassandra/schema.cql
@@ -548,3 +548,55 @@ CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.event_by_id AS
AND event_type IS NOT NULL AND event_uid IS NOT NULL
PRIMARY KEY ((tenant_id, entity_type, entity_id), id, event_type, event_uid)
WITH CLUSTERING ORDER BY (id ASC, event_type ASC, event_uid ASC);
+
+CREATE TABLE IF NOT EXISTS thingsboard.msg_queue (
+ node_id timeuuid,
+ clustered_hash bigint,
+ partition bigint,
+ ts bigint,
+ msg blob,
+ PRIMARY KEY ((node_id, cluster_hash, partition), ts)
+ WITH CLUSTERING ORDER BY (ts DESC)
+ AND compaction = {
+ 'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
+ 'min_threshold': '5',
+ 'base_time_seconds': '43200',
+ 'max_window_size_seconds': '43200'
+ 'tombstone_threshold': '0.9',
+ 'unchecked_tombstone_compaction': 'true',
+ };
+);
+
+CREATE TABLE IF NOT EXISTS thingsboard.msg_ack_queue (
+ node_id timeuuid,
+ clustered_hash bigint,
+ partition bigint,
+ ts bigint,
+ msg_id timeuuid,
+ PRIMARY KEY ((node_id, cluster_hash, partition), ts)
+ WITH CLUSTERING ORDER BY (ts DESC)
+ AND compaction = {
+ 'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
+ 'min_threshold': '5',
+ 'base_time_seconds': '43200',
+ 'max_window_size_seconds': '43200'
+ 'tombstone_threshold': '0.9',
+ 'unchecked_tombstone_compaction': 'true',
+ };
+);
+
+CREATE TABLE IF NOT EXISTS thingsboard.processed_msg_partitions (
+ node_id timeuuid,
+ clustered_hash bigint,
+ partition bigint,
+ PRIMARY KEY ((node_id, cluster_hash), partition)
+ WITH CLUSTERING ORDER BY (partition DESC)
+ AND compaction = {
+ 'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy',
+ 'min_threshold': '5',
+ 'base_time_seconds': '43200',
+ 'max_window_size_seconds': '43200'
+ 'tombstone_threshold': '0.9',
+ 'unchecked_tombstone_compaction': 'true',
+ };
+);
\ No newline at end of file
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/MsqQueue.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/MsqQueue.java
new file mode 100644
index 0000000..33bd32d
--- /dev/null
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/MsqQueue.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.api;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.UUID;
+
+public interface MsqQueue {
+
+ ListenableFuture<Void> put(TbMsg msg, UUID nodeId, long clusteredHash);
+
+ ListenableFuture<Void> ack(TbMsg msg, UUID nodeId, long clusteredHash);
+
+ Iterable<TbMsg> findUnprocessed(UUID nodeId, long clusteredHash);
+}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
index b833a97..eef5ff3 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.rule.engine.api;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbMsg.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbMsg.java
index 9917c6a..f79a5ea 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbMsg.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbMsg.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.rule.engine.api;
import lombok.Data;
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbMsgMetaData.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbMsgMetaData.java
index 56abf2a..92d3b8e 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbMsgMetaData.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbMsgMetaData.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.rule.engine.api;
import lombok.Data;
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNode.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNode.java
index 70993df..d5f6deb 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNode.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNode.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.rule.engine.api;
import java.util.concurrent.ExecutionException;
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeConfiguration.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeConfiguration.java
index 4ab84aa..540c9a6 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeConfiguration.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeConfiguration.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.rule.engine.api;
import com.fasterxml.jackson.databind.JsonNode;
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeException.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeException.java
index 5a3744b..86d3d03 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeException.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeException.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.rule.engine.api;
import com.fasterxml.jackson.core.JsonProcessingException;
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeState.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeState.java
index 15f4c30..7311b76 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeState.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeState.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.rule.engine.api;
/**
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNode.java
index 034ee48..4f1bef4 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNode.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.rule.engine.filter;
import lombok.extern.slf4j.Slf4j;
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNodeConfiguration.java
index e766207..54d0052 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNodeConfiguration.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNodeConfiguration.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.rule.engine.filter;
import lombok.Data;
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java
index b623af7..5afbb80 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.rule.engine.metadata;
import lombok.extern.slf4j.Slf4j;
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNodeConfiguration.java
index 076500c..6996af6 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNodeConfiguration.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNodeConfiguration.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.rule.engine.metadata;
import lombok.Data;
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/AckBuilder.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/AckBuilder.java
new file mode 100644
index 0000000..4f1e187
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/AckBuilder.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.queue.cassandra;
+
+import org.springframework.stereotype.Component;
+import org.thingsboard.rule.engine.api.TbMsg;
+
+import java.util.UUID;
+
+@Component
+public class AckBuilder {
+
+ public MsgAck build(TbMsg msg, UUID nodeId, long clusteredHash) {
+ return null;
+ }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/CassandraMsqQueue.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/CassandraMsqQueue.java
new file mode 100644
index 0000000..38ae627
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/CassandraMsqQueue.java
@@ -0,0 +1,82 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.queue.cassandra;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import org.thingsboard.rule.engine.api.MsqQueue;
+import org.thingsboard.rule.engine.api.TbMsg;
+import org.thingsboard.rule.engine.queue.cassandra.repository.AckRepository;
+import org.thingsboard.rule.engine.queue.cassandra.repository.MsgRepository;
+import org.thingsboard.rule.engine.queue.cassandra.repository.ProcessedPartitionRepository;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+
+@Component
+public class CassandraMsqQueue implements MsqQueue {
+
+ @Autowired
+ private MsgRepository msgRepository;
+
+ @Autowired
+ private AckRepository ackRepository;
+
+ @Autowired
+ private AckBuilder ackBuilder;
+
+ @Autowired
+ private UnprocessedMsgFilter unprocessedMsgFilter;
+
+ @Autowired
+ private ProcessedPartitionRepository processedPartitionRepository;
+
+ @Override
+ public ListenableFuture<Void> put(TbMsg msg, UUID nodeId, long clusteredHash) {
+ return msgRepository.save(msg, nodeId, clusteredHash, getPartition(msg));
+ }
+
+ @Override
+ public ListenableFuture<Void> ack(TbMsg msg, UUID nodeId, long clusteredHash) {
+ MsgAck ack = ackBuilder.build(msg, nodeId, clusteredHash);
+ return ackRepository.ack(ack);
+ }
+
+ @Override
+ public Iterable<TbMsg> findUnprocessed(UUID nodeId, long clusteredHash) {
+ List<TbMsg> unprocessedMsgs = Lists.newArrayList();
+ for (Long partition : findUnprocessedPartitions(nodeId, clusteredHash)) {
+ Iterable<TbMsg> msgs = msgRepository.findMsgs(nodeId, clusteredHash, partition);
+ Iterable<MsgAck> acks = ackRepository.findAcks(nodeId, clusteredHash, partition);
+ unprocessedMsgs.addAll(unprocessedMsgFilter.filter(msgs, acks));
+ }
+ return unprocessedMsgs;
+ }
+
+ private List<Long> findUnprocessedPartitions(UUID nodeId, long clusteredHash) {
+ Optional<Long> lastPartition = processedPartitionRepository.findLastProcessedPartition(nodeId, clusteredHash);
+ return Collections.emptyList();
+ }
+
+ private long getPartition(TbMsg msg) {
+ return Long.MIN_VALUE;
+ }
+
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/MsgAck.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/MsgAck.java
new file mode 100644
index 0000000..ca8d820
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/MsgAck.java
@@ -0,0 +1,31 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.queue.cassandra;
+
+import lombok.Data;
+
+import java.util.UUID;
+
+@Data
+public class MsgAck {
+
+ private final UUID msgId;
+ private final UUID nodeId;
+ private final long clusteredHash;
+ private final long partition;
+ private final long ts;
+
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/AckRepository.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/AckRepository.java
new file mode 100644
index 0000000..3d9b55f
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/AckRepository.java
@@ -0,0 +1,28 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.queue.cassandra.repository;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.thingsboard.rule.engine.queue.cassandra.MsgAck;
+
+import java.util.UUID;
+
+public interface AckRepository {
+
+ ListenableFuture<Void> ack(MsgAck msgAck);
+
+ Iterable<MsgAck> findAcks(UUID nodeId, long clusteredHash, long partition);
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/MsgRepository.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/MsgRepository.java
new file mode 100644
index 0000000..57e501e
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/MsgRepository.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.queue.cassandra.repository;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.thingsboard.rule.engine.api.TbMsg;
+
+import java.util.UUID;
+
+public interface MsgRepository {
+
+ ListenableFuture<Void> save(TbMsg msg, UUID nodeId, long clusteredHash, long partition);
+
+ Iterable<TbMsg> findMsgs(UUID nodeId, long clusteredHash, long partition);
+
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/ProcessedPartitionRepository.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/ProcessedPartitionRepository.java
new file mode 100644
index 0000000..bc29050
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/ProcessedPartitionRepository.java
@@ -0,0 +1,12 @@
+package org.thingsboard.rule.engine.queue.cassandra.repository;
+
+import java.util.Optional;
+import java.util.UUID;
+
+public interface ProcessedPartitionRepository {
+
+ void partitionProcessed(UUID nodeId, long clusteredHash, long partition);
+
+ Optional<Long> findLastProcessedPartition(UUID nodeId, long clusteredHash);
+
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/UnprocessedMsgFilter.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/UnprocessedMsgFilter.java
new file mode 100644
index 0000000..99e9a92
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/UnprocessedMsgFilter.java
@@ -0,0 +1,28 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.queue.cassandra;
+
+import org.thingsboard.rule.engine.api.TbMsg;
+
+import java.util.Collection;
+import java.util.Collections;
+
+public class UnprocessedMsgFilter {
+
+ public Collection<TbMsg> filter(Iterable<TbMsg> msgs, Iterable<MsgAck> acks) {
+ return Collections.emptyList();
+ }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/jpa/SqlMsgQueue.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/jpa/SqlMsgQueue.java
new file mode 100644
index 0000000..d1bf73c
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/jpa/SqlMsgQueue.java
@@ -0,0 +1,20 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.queue.jpa;
+
+//@todo-vp: implement
+public class SqlMsgQueue {
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/TbNodeUtils.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/TbNodeUtils.java
index 962ea10..e8837be 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/TbNodeUtils.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/TbNodeUtils.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.rule.engine;
import com.fasterxml.jackson.core.JsonProcessingException;
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNode.java
index 837f333..c9d34fa 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNode.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.rule.engine.transform;
import lombok.extern.slf4j.Slf4j;