thingsboard-aplcache

TMP commit

5/11/2018 11:12:41 AM

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index a1e6f06..066b1df 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -207,6 +207,10 @@ public class ActorSystemContext {
     @Getter
     private DeviceStateService deviceStateService;
 
+    @Value("${cluster.partition_id}")
+    @Getter
+    private long queuePartitionId;
+
     @Value("${actors.session.sync.timeout}")
     @Getter
     private long syncSessionTimeout;
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index 039fdf6..07e3836 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -114,12 +114,12 @@ class DefaultTbContext implements TbContext {
 
     @Override
     public TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data) {
-        return new TbMsg(UUIDs.timeBased(), type, originator, metaData.copy(), data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), 0L);
+        return new TbMsg(UUIDs.timeBased(), type, originator, metaData.copy(), data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), mainCtx.getQueuePartitionId());
     }
 
     @Override
     public TbMsg transformMsg(TbMsg origMsg, String type, EntityId originator, TbMsgMetaData metaData, String data) {
-        return new TbMsg(origMsg.getId(), type, originator, metaData.copy(), data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), 0L);
+        return new TbMsg(origMsg.getId(), type, originator, metaData.copy(), data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), mainCtx.getQueuePartitionId());
     }
 
     @Override
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index 9c60327..e9691b7 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -95,12 +95,12 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
 
     private void reprocess(List<RuleNode> ruleNodeList) {
         for (RuleNode ruleNode : ruleNodeList) {
-            for (TbMsg tbMsg : queue.findUnprocessed(ruleNode.getId().getId(), 0L)) {
+            for (TbMsg tbMsg : queue.findUnprocessed(ruleNode.getId().getId(), systemContext.getQueuePartitionId())) {
                 pushMsgToNode(nodeActors.get(ruleNode.getId()), tbMsg);
             }
         }
         if (firstNode != null) {
-            for (TbMsg tbMsg : queue.findUnprocessed(entityId.getId(), 0L)) {
+            for (TbMsg tbMsg : queue.findUnprocessed(entityId.getId(), systemContext.getQueuePartitionId())) {
                 pushMsgToNode(firstNode, tbMsg);
             }
         }
@@ -269,6 +269,6 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
 
     private TbMsg enrichWithRuleChainId(TbMsg tbMsg) {
         // We don't put firstNodeId because it may change over time;
-        return new TbMsg(tbMsg.getId(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.getMetaData().copy(), tbMsg.getData(), entityId, null, 0L);
+        return new TbMsg(tbMsg.getId(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.getMetaData().copy(), tbMsg.getData(), entityId, null, systemContext.getQueuePartitionId());
     }
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
index 4a7e072..136d4f6 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
@@ -88,7 +88,7 @@ public abstract class ComponentMsgProcessor<T extends EntityId> extends Abstract
 
     protected void putToQueue(final TbMsg tbMsg, final Consumer<TbMsg> onSuccess) {
         EntityId entityId = tbMsg.getRuleNodeId() != null ? tbMsg.getRuleNodeId() : tbMsg.getRuleChainId();
-        Futures.addCallback(queue.put(tbMsg, entityId.getId(), 0), new FutureCallback<Void>() {
+        Futures.addCallback(queue.put(tbMsg, entityId.getId(), tbMsg.getClusterPartition()), new FutureCallback<Void>() {
             @Override
             public void onSuccess(@Nullable Void result) {
                 onSuccess.accept(tbMsg);
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
index 075857c..4e664c6 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
@@ -17,6 +17,7 @@ package org.thingsboard.server.service.cluster.rpc;
 
 import io.grpc.stub.StreamObserver;
 import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
+import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
 import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
@@ -42,4 +43,6 @@ public interface ClusterRpcService {
 
     void tell(ClusterAPIProtos.ClusterMessage message);
 
+    void tell(ServerAddress serverAddress, TbActorMsg actorMsg);
+
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
index 00a337a..f0fc966 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
@@ -43,6 +43,7 @@ import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription;
 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionUpdate;
 import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
+import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
 import org.thingsboard.server.service.state.DefaultDeviceStateService;
 import org.thingsboard.server.service.state.DeviceStateService;
 
@@ -83,6 +84,9 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
     private ClusterRoutingService routingService;
 
     @Autowired
+    private ClusterRpcService rpcService;
+
+    @Autowired
     @Lazy
     private DeviceStateService stateService;
 
@@ -106,7 +110,6 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
     }
 
     private final Map<EntityId, Set<Subscription>> subscriptionsByEntityId = new HashMap<>();
-
     private final Map<String, Map<Integer, Subscription>> subscriptionsByWsSessionId = new HashMap<>();
 
     @Override
@@ -117,6 +120,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
             ServerAddress address = server.get();
             log.trace("[{}] Forwarding subscription [{}] for device [{}] to [{}]", sessionId, sub.getSubscriptionId(), entityId, address);
             subscription = new Subscription(sub, true, address);
+//            rpcService.tell();
 //            rpcHandler.onNewSubscription(ctx, address, sessionId, subscription);
         } else {
             log.trace("[{}] Registering local subscription [{}] for device [{}]", sessionId, sub.getSubscriptionId(), entityId);
diff --git a/application/src/main/proto/cluster.proto b/application/src/main/proto/cluster.proto
index 2ecffe7..94b43de 100644
--- a/application/src/main/proto/cluster.proto
+++ b/application/src/main/proto/cluster.proto
@@ -19,70 +19,6 @@ package cluster;
 option java_package = "org.thingsboard.server.gen.cluster";
 option java_outer_classname = "ClusterAPIProtos";
 
-//message Uid {
-//  sint64 pluginUuidMsb = 1;
-//  sint64 pluginUuidLsb = 2;
-//}
-//
-//message PluginAddress {
-//  Uid pluginId = 1;
-//  Uid tenantId = 2;
-//}
-//
-//message ToPluginRpcMessage {
-//  PluginAddress address = 1;
-//  int32 clazz = 2;
-//  bytes data = 3;
-//}
-//
-//message ToDeviceActorRpcMessage {
-//  bytes data = 1;
-//}
-//
-//message ToDeviceSessionActorRpcMessage {
-//  bytes data = 1;
-//}
-//
-//message ToDeviceActorNotificationRpcMessage {
-//  bytes data = 1;
-//}
-//
-//message ToAllNodesRpcMessage {
-//  bytes data = 1;
-//}
-//
-//message ConnectRpcMessage {
-//  ServerAddress serverAddress = 1;
-//}
-//
-//message ToDeviceRpcRequestRpcMessage {
-//  Uid deviceTenantId = 2;
-//  Uid deviceId = 3;
-//
-//  Uid msgId = 4;
-//  bool oneway = 5;
-//  int64 expTime = 6;
-//  string method = 7;
-//  string params = 8;
-//}
-//
-//message ToPluginRpcResponseRpcMessage {
-//  Uid msgId = 2;
-//  string response = 3;
-//  string error = 4;
-//}
-//
-//message ToRpcServerMessage {
-//  ConnectRpcMessage connectMsg = 1;
-//  ToPluginRpcMessage toPluginRpcMsg = 2;
-//  ToDeviceActorRpcMessage toDeviceActorRpcMsg = 3;
-//  ToDeviceSessionActorRpcMessage toDeviceSessionActorRpcMsg = 4;
-//  ToDeviceActorNotificationRpcMessage toDeviceActorNotificationRpcMsg = 5;
-//  ToAllNodesRpcMessage toAllNodesRpcMsg = 6;
-//  ToDeviceRpcRequestRpcMessage toDeviceRpcRequestRpcMsg = 7;
-//  ToPluginRpcResponseRpcMessage toPluginRpcResponseRpcMsg = 8;
-//}
-
 service ClusterRpcService {
   rpc handleMsgs(stream ClusterMessage) returns (stream ClusterMessage) {}
 }
@@ -103,7 +39,6 @@ message MessageMataInfo {
   repeated string tags = 2;
 }
 
-
 enum MessageType {
 
   //Cluster control messages
@@ -113,6 +48,7 @@ enum MessageType {
   RPC_BROADCAST_MSG = 3;
   CONNECT_RPC_MESSAGE =4;
 
-  //CLUSTER_DATA_MESSAGE
-  CLUSTER_NETWORK_SERVER_DATA_MESSAGE = 5;
+  CLUSTER_ACTOR_MESSAGE = 5;
+  CLUSTER_TELEMETRY_MESSAGE = 6;
+  CLUSTER_DEVICE_RPC_MESSAGE = 7;
 }
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index af5b8d4..ad29dd2 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -58,6 +58,8 @@ cluster:
   hash_function_name: "${CLUSTER_HASH_FUNCTION_NAME:murmur3_128}"
   # Amount of virtual nodes in consistent hash ring.
   vitrual_nodes_size: "${CLUSTER_VIRTUAL_NODES_SIZE:16}"
+  # Queue partition id for current node
+  partition_id: "${QUEUE_PARTITION_ID:0}"
 
 # Plugins configuration parameters
 plugins:
@@ -308,6 +310,7 @@ rule:
     max_size: 10000
 
 
+
 # PostgreSQL DAO Configuration
 #spring:
 #  data:
diff --git a/application/src/test/java/org/thingsboard/server/mqtt/DbConfigurationTestRule.java b/application/src/test/java/org/thingsboard/server/mqtt/DbConfigurationTestRule.java
new file mode 100644
index 0000000..3765246
--- /dev/null
+++ b/application/src/test/java/org/thingsboard/server/mqtt/DbConfigurationTestRule.java
@@ -0,0 +1,16 @@
+package org.thingsboard.server.mqtt;
+
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+/**
+ * Created by ashvayka on 11.05.18.
+ */
+public class DbConfigurationTestRule implements TestRule {
+
+    @Override
+    public Statement apply(Statement base, Description description) {
+        return null;
+    }
+}
diff --git a/base-docker-compose.yml b/base-docker-compose.yml
index 4c19a54..e1f6a5e 100644
--- a/base-docker-compose.yml
+++ b/base-docker-compose.yml
@@ -7,18 +7,6 @@ services:
     ports:
       - "2181:2181"
 
-  postgres-tb:
-    image: postgres
-    command: postgres  -c 'max_connections=500'
-    environment:
-      - POSTGRES_USER=postgres
-      - POSTGRES_PASSWORD=postgres
-      - POSTGRES_DB=thingsboard
-    networks:
-      - core
-    ports:
-      - "5432:5432"
-
   cassandra-tb:
     image: cassandra
     networks:
@@ -36,7 +24,6 @@ services:
     ports:
       - "6379:6379"
 
-
 networks:
   core:
 
diff --git a/dao/src/test/java/org/thingsboard/server/dao/CustomCassandraCQLUnit.java b/dao/src/test/java/org/thingsboard/server/dao/CustomCassandraCQLUnit.java
index d214a70..fbed694 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/CustomCassandraCQLUnit.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/CustomCassandraCQLUnit.java
@@ -26,7 +26,7 @@ import com.datastax.driver.core.Session;
 import java.util.List;
 
 public class CustomCassandraCQLUnit extends BaseCassandraUnit {
-    private List<CQLDataSet> dataSets;
+    protected List<CQLDataSet> dataSets;
 
     public Session session;
     public Cluster cluster;