thingsboard-aplcache
Changes
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java 6(+3 -3)
application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java 3(+3 -0)
application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java 6(+5 -1)
application/src/main/proto/cluster.proto 70(+3 -67)
base-docker-compose.yml 13(+0 -13)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index a1e6f06..066b1df 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -207,6 +207,10 @@ public class ActorSystemContext {
@Getter
private DeviceStateService deviceStateService;
+ @Value("${cluster.partition_id}")
+ @Getter
+ private long queuePartitionId;
+
@Value("${actors.session.sync.timeout}")
@Getter
private long syncSessionTimeout;
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index 039fdf6..07e3836 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -114,12 +114,12 @@ class DefaultTbContext implements TbContext {
@Override
public TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data) {
- return new TbMsg(UUIDs.timeBased(), type, originator, metaData.copy(), data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), 0L);
+ return new TbMsg(UUIDs.timeBased(), type, originator, metaData.copy(), data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), mainCtx.getQueuePartitionId());
}
@Override
public TbMsg transformMsg(TbMsg origMsg, String type, EntityId originator, TbMsgMetaData metaData, String data) {
- return new TbMsg(origMsg.getId(), type, originator, metaData.copy(), data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), 0L);
+ return new TbMsg(origMsg.getId(), type, originator, metaData.copy(), data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), mainCtx.getQueuePartitionId());
}
@Override
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index 9c60327..e9691b7 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -95,12 +95,12 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
private void reprocess(List<RuleNode> ruleNodeList) {
for (RuleNode ruleNode : ruleNodeList) {
- for (TbMsg tbMsg : queue.findUnprocessed(ruleNode.getId().getId(), 0L)) {
+ for (TbMsg tbMsg : queue.findUnprocessed(ruleNode.getId().getId(), systemContext.getQueuePartitionId())) {
pushMsgToNode(nodeActors.get(ruleNode.getId()), tbMsg);
}
}
if (firstNode != null) {
- for (TbMsg tbMsg : queue.findUnprocessed(entityId.getId(), 0L)) {
+ for (TbMsg tbMsg : queue.findUnprocessed(entityId.getId(), systemContext.getQueuePartitionId())) {
pushMsgToNode(firstNode, tbMsg);
}
}
@@ -269,6 +269,6 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
private TbMsg enrichWithRuleChainId(TbMsg tbMsg) {
// We don't put firstNodeId because it may change over time;
- return new TbMsg(tbMsg.getId(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.getMetaData().copy(), tbMsg.getData(), entityId, null, 0L);
+ return new TbMsg(tbMsg.getId(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.getMetaData().copy(), tbMsg.getData(), entityId, null, systemContext.getQueuePartitionId());
}
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
index 4a7e072..136d4f6 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
@@ -88,7 +88,7 @@ public abstract class ComponentMsgProcessor<T extends EntityId> extends Abstract
protected void putToQueue(final TbMsg tbMsg, final Consumer<TbMsg> onSuccess) {
EntityId entityId = tbMsg.getRuleNodeId() != null ? tbMsg.getRuleNodeId() : tbMsg.getRuleChainId();
- Futures.addCallback(queue.put(tbMsg, entityId.getId(), 0), new FutureCallback<Void>() {
+ Futures.addCallback(queue.put(tbMsg, entityId.getId(), tbMsg.getClusterPartition()), new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void result) {
onSuccess.accept(tbMsg);
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
index 075857c..4e664c6 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
@@ -17,6 +17,7 @@ package org.thingsboard.server.service.cluster.rpc;
import io.grpc.stub.StreamObserver;
import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
+import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
@@ -42,4 +43,6 @@ public interface ClusterRpcService {
void tell(ClusterAPIProtos.ClusterMessage message);
+ void tell(ServerAddress serverAddress, TbActorMsg actorMsg);
+
}
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
index 00a337a..f0fc966 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
@@ -43,6 +43,7 @@ import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription;
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionUpdate;
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
+import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
import org.thingsboard.server.service.state.DefaultDeviceStateService;
import org.thingsboard.server.service.state.DeviceStateService;
@@ -83,6 +84,9 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
private ClusterRoutingService routingService;
@Autowired
+ private ClusterRpcService rpcService;
+
+ @Autowired
@Lazy
private DeviceStateService stateService;
@@ -106,7 +110,6 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
}
private final Map<EntityId, Set<Subscription>> subscriptionsByEntityId = new HashMap<>();
-
private final Map<String, Map<Integer, Subscription>> subscriptionsByWsSessionId = new HashMap<>();
@Override
@@ -117,6 +120,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
ServerAddress address = server.get();
log.trace("[{}] Forwarding subscription [{}] for device [{}] to [{}]", sessionId, sub.getSubscriptionId(), entityId, address);
subscription = new Subscription(sub, true, address);
+// rpcService.tell();
// rpcHandler.onNewSubscription(ctx, address, sessionId, subscription);
} else {
log.trace("[{}] Registering local subscription [{}] for device [{}]", sessionId, sub.getSubscriptionId(), entityId);
application/src/main/proto/cluster.proto 70(+3 -67)
diff --git a/application/src/main/proto/cluster.proto b/application/src/main/proto/cluster.proto
index 2ecffe7..94b43de 100644
--- a/application/src/main/proto/cluster.proto
+++ b/application/src/main/proto/cluster.proto
@@ -19,70 +19,6 @@ package cluster;
option java_package = "org.thingsboard.server.gen.cluster";
option java_outer_classname = "ClusterAPIProtos";
-//message Uid {
-// sint64 pluginUuidMsb = 1;
-// sint64 pluginUuidLsb = 2;
-//}
-//
-//message PluginAddress {
-// Uid pluginId = 1;
-// Uid tenantId = 2;
-//}
-//
-//message ToPluginRpcMessage {
-// PluginAddress address = 1;
-// int32 clazz = 2;
-// bytes data = 3;
-//}
-//
-//message ToDeviceActorRpcMessage {
-// bytes data = 1;
-//}
-//
-//message ToDeviceSessionActorRpcMessage {
-// bytes data = 1;
-//}
-//
-//message ToDeviceActorNotificationRpcMessage {
-// bytes data = 1;
-//}
-//
-//message ToAllNodesRpcMessage {
-// bytes data = 1;
-//}
-//
-//message ConnectRpcMessage {
-// ServerAddress serverAddress = 1;
-//}
-//
-//message ToDeviceRpcRequestRpcMessage {
-// Uid deviceTenantId = 2;
-// Uid deviceId = 3;
-//
-// Uid msgId = 4;
-// bool oneway = 5;
-// int64 expTime = 6;
-// string method = 7;
-// string params = 8;
-//}
-//
-//message ToPluginRpcResponseRpcMessage {
-// Uid msgId = 2;
-// string response = 3;
-// string error = 4;
-//}
-//
-//message ToRpcServerMessage {
-// ConnectRpcMessage connectMsg = 1;
-// ToPluginRpcMessage toPluginRpcMsg = 2;
-// ToDeviceActorRpcMessage toDeviceActorRpcMsg = 3;
-// ToDeviceSessionActorRpcMessage toDeviceSessionActorRpcMsg = 4;
-// ToDeviceActorNotificationRpcMessage toDeviceActorNotificationRpcMsg = 5;
-// ToAllNodesRpcMessage toAllNodesRpcMsg = 6;
-// ToDeviceRpcRequestRpcMessage toDeviceRpcRequestRpcMsg = 7;
-// ToPluginRpcResponseRpcMessage toPluginRpcResponseRpcMsg = 8;
-//}
-
service ClusterRpcService {
rpc handleMsgs(stream ClusterMessage) returns (stream ClusterMessage) {}
}
@@ -103,7 +39,6 @@ message MessageMataInfo {
repeated string tags = 2;
}
-
enum MessageType {
//Cluster control messages
@@ -113,6 +48,7 @@ enum MessageType {
RPC_BROADCAST_MSG = 3;
CONNECT_RPC_MESSAGE =4;
- //CLUSTER_DATA_MESSAGE
- CLUSTER_NETWORK_SERVER_DATA_MESSAGE = 5;
+ CLUSTER_ACTOR_MESSAGE = 5;
+ CLUSTER_TELEMETRY_MESSAGE = 6;
+ CLUSTER_DEVICE_RPC_MESSAGE = 7;
}
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index af5b8d4..ad29dd2 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -58,6 +58,8 @@ cluster:
hash_function_name: "${CLUSTER_HASH_FUNCTION_NAME:murmur3_128}"
# Amount of virtual nodes in consistent hash ring.
vitrual_nodes_size: "${CLUSTER_VIRTUAL_NODES_SIZE:16}"
+ # Queue partition id for current node
+ partition_id: "${QUEUE_PARTITION_ID:0}"
# Plugins configuration parameters
plugins:
@@ -308,6 +310,7 @@ rule:
max_size: 10000
+
# PostgreSQL DAO Configuration
#spring:
# data:
diff --git a/application/src/test/java/org/thingsboard/server/mqtt/DbConfigurationTestRule.java b/application/src/test/java/org/thingsboard/server/mqtt/DbConfigurationTestRule.java
new file mode 100644
index 0000000..3765246
--- /dev/null
+++ b/application/src/test/java/org/thingsboard/server/mqtt/DbConfigurationTestRule.java
@@ -0,0 +1,16 @@
+package org.thingsboard.server.mqtt;
+
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+/**
+ * Created by ashvayka on 11.05.18.
+ */
+public class DbConfigurationTestRule implements TestRule {
+
+ @Override
+ public Statement apply(Statement base, Description description) {
+ return null;
+ }
+}
base-docker-compose.yml 13(+0 -13)
diff --git a/base-docker-compose.yml b/base-docker-compose.yml
index 4c19a54..e1f6a5e 100644
--- a/base-docker-compose.yml
+++ b/base-docker-compose.yml
@@ -7,18 +7,6 @@ services:
ports:
- "2181:2181"
- postgres-tb:
- image: postgres
- command: postgres -c 'max_connections=500'
- environment:
- - POSTGRES_USER=postgres
- - POSTGRES_PASSWORD=postgres
- - POSTGRES_DB=thingsboard
- networks:
- - core
- ports:
- - "5432:5432"
-
cassandra-tb:
image: cassandra
networks:
@@ -36,7 +24,6 @@ services:
ports:
- "6379:6379"
-
networks:
core:
diff --git a/dao/src/test/java/org/thingsboard/server/dao/CustomCassandraCQLUnit.java b/dao/src/test/java/org/thingsboard/server/dao/CustomCassandraCQLUnit.java
index d214a70..fbed694 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/CustomCassandraCQLUnit.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/CustomCassandraCQLUnit.java
@@ -26,7 +26,7 @@ import com.datastax.driver.core.Session;
import java.util.List;
public class CustomCassandraCQLUnit extends BaseCassandraUnit {
- private List<CQLDataSet> dataSets;
+ protected List<CQLDataSet> dataSets;
public Session session;
public Cluster cluster;