thingsboard-aplcache
Changes
application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java 3(+3 -0)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java 30(+25 -5)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java 9(+7 -2)
application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java 12(+8 -4)
msa/black-box-tests/README.md 4(+3 -1)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
index 15e9e9d..6df37d6 100644
--- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
@@ -51,7 +51,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
-@Slf4j
public class AppActor extends RuleChainManagerActor {
private static final TenantId SYSTEM_TENANT = new TenantId(ModelConstants.NULL_UUID);
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
index f450027..f53a410 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
@@ -15,7 +15,6 @@
*/
package org.thingsboard.server.actors.device;
-import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
import org.thingsboard.rule.engine.api.msg.DeviceNameOrTypeUpdateMsg;
import org.thingsboard.server.actors.ActorSystemContext;
@@ -29,7 +28,6 @@ import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg;
import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
-@Slf4j
public class DeviceActor extends ContextAwareActor {
private final DeviceActorMessageProcessor processor;
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index 47417d1..41d1ea0 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -348,9 +348,12 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
int requestId = msg.getMsg().getRequestId();
ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(requestId);
if (data != null) {
+ log.debug("[{}] Pushing reply to [{}][{}]!", deviceId, data.getNodeId(), data.getSessionId());
sendToTransport(TransportProtos.ToServerRpcResponseMsg.newBuilder()
.setRequestId(requestId).setPayload(msg.getMsg().getData()).build()
, data.getSessionId(), data.getNodeId());
+ } else {
+ log.debug("[{}][{}] Pending RPC request to server not found!", deviceId, requestId);
}
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
index 1eba066..760d4a6 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
@@ -40,7 +40,7 @@ public class BasicRpcSessionListener implements GrpcSessionListener {
@Override
public void onConnected(GrpcSession session) {
- log.info("{} session started -> {}", getType(session), session.getRemoteServer());
+ log.info("[{}][{}] session started", session.getRemoteServer(), getType(session));
if (!session.isClient()) {
manager.tell(new RpcSessionConnectedMsg(session.getRemoteServer(), session.getSessionId()), self);
}
@@ -48,21 +48,19 @@ public class BasicRpcSessionListener implements GrpcSessionListener {
@Override
public void onDisconnected(GrpcSession session) {
- log.info("{} session closed -> {}", getType(session), session.getRemoteServer());
+ log.info("[{}][{}] session closed", session.getRemoteServer(), getType(session));
manager.tell(new RpcSessionDisconnectedMsg(session.isClient(), session.getRemoteServer()), self);
}
@Override
public void onReceiveClusterGrpcMsg(GrpcSession session, ClusterAPIProtos.ClusterMessage clusterMessage) {
- log.trace("{} Service [{}] received session actor msg {}", getType(session),
- session.getRemoteServer(),
- clusterMessage);
+ log.trace("Received session actor msg from [{}][{}]: {}", session.getRemoteServer(), getType(session), clusterMessage);
service.onReceivedMsg(session.getRemoteServer(), clusterMessage);
}
@Override
public void onError(GrpcSession session, Throwable t) {
- log.warn("{} session got error -> {}", getType(session), session.getRemoteServer(), t);
+ log.warn("[{}][{}] session got error -> {}", session.getRemoteServer(), getType(session), t);
manager.tell(new RpcSessionClosedMsg(session.isClient(), session.getRemoteServer()), self);
session.close();
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
index 922d2ba..9fa087e 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
@@ -36,7 +36,6 @@ import java.util.*;
/**
* @author Andrew Shvayka
*/
-@Slf4j
public class RpcManagerActor extends ContextAwareActor {
private final Map<ServerAddress, SessionActorInfo> sessionActors;
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index e0f7e4a..e4e82c5 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -69,6 +69,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
private RuleNodeId firstId;
private RuleNodeCtx firstNode;
private boolean started;
+ private String ruleChainName;
RuleChainActorMessageProcessor(TenantId tenantId, RuleChainId ruleChainId, ActorSystemContext systemContext
, ActorRef parent, ActorRef self) {
@@ -78,15 +79,24 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
this.nodeActors = new HashMap<>();
this.nodeRoutes = new HashMap<>();
this.service = systemContext.getRuleChainService();
+ this.ruleChainName = ruleChainId.toString();
}
@Override
- public void start(ActorContext context) throws Exception {
+ public String getComponentName() {
+ return null;
+ }
+
+ @Override
+ public void start(ActorContext context) {
if (!started) {
RuleChain ruleChain = service.findRuleChainById(entityId);
+ ruleChainName = ruleChain.getName();
List<RuleNode> ruleNodeList = service.getRuleChainNodes(entityId);
+ log.trace("[{}][{}] Starting rule chain with {} nodes", tenantId, entityId, ruleNodeList.size());
// Creating and starting the actors;
for (RuleNode ruleNode : ruleNodeList) {
+ log.trace("[{}][{}] Creating rule node [{}]: {}", tenantId, entityId, ruleNode.getName(), ruleNode);
ActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode);
nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
}
@@ -98,16 +108,19 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
}
@Override
- public void onUpdate(ActorContext context) throws Exception {
+ public void onUpdate(ActorContext context) {
RuleChain ruleChain = service.findRuleChainById(entityId);
+ ruleChainName = ruleChain.getName();
List<RuleNode> ruleNodeList = service.getRuleChainNodes(entityId);
-
+ log.trace("[{}][{}] Updating rule chain with {} nodes", tenantId, entityId, ruleNodeList.size());
for (RuleNode ruleNode : ruleNodeList) {
RuleNodeCtx existing = nodeActors.get(ruleNode.getId());
if (existing == null) {
+ log.trace("[{}][{}] Creating rule node [{}]: {}", tenantId, entityId, ruleNode.getName(), ruleNode);
ActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode);
nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
} else {
+ log.trace("[{}][{}] Updating rule node [{}]: {}", tenantId, entityId, ruleNode.getName(), ruleNode);
existing.setSelf(ruleNode);
existing.getSelfActor().tell(new ComponentLifecycleMsg(tenantId, existing.getSelf().getId(), ComponentLifecycleEvent.UPDATED), self);
}
@@ -116,6 +129,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
Set<RuleNodeId> existingNodes = ruleNodeList.stream().map(RuleNode::getId).collect(Collectors.toSet());
List<RuleNodeId> removedRules = nodeActors.keySet().stream().filter(node -> !existingNodes.contains(node)).collect(Collectors.toList());
removedRules.forEach(ruleNodeId -> {
+ log.trace("[{}][{}] Removing rule node [{}]", tenantId, entityId, ruleNodeId);
RuleNodeCtx removed = nodeActors.remove(ruleNodeId);
removed.getSelfActor().tell(new ComponentLifecycleMsg(tenantId, removed.getSelf().getId(), ComponentLifecycleEvent.DELETED), self);
});
@@ -124,7 +138,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
}
@Override
- public void stop(ActorContext context) throws Exception {
+ public void stop(ActorContext context) {
+ log.trace("[{}][{}] Stopping rule chain with {} nodes", tenantId, entityId, nodeActors.size());
nodeActors.values().stream().map(RuleNodeCtx::getSelfActor).forEach(context::stop);
nodeActors.clear();
nodeRoutes.clear();
@@ -133,7 +148,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
}
@Override
- public void onClusterEventMsg(ClusterEventMsg msg) throws Exception {
+ public void onClusterEventMsg(ClusterEventMsg msg) {
}
@@ -150,10 +165,12 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
// Populating the routes map;
for (RuleNode ruleNode : ruleNodeList) {
List<EntityRelation> relations = service.getRuleNodeRelations(ruleNode.getId());
+ log.trace("[{}][{}][{}] Processing rule node relations [{}]", tenantId, entityId, ruleNode.getId(), relations.size());
if (relations.size() == 0) {
nodeRoutes.put(ruleNode.getId(), Collections.emptyList());
} else {
for (EntityRelation relation : relations) {
+ log.trace("[{}][{}][{}] Processing rule node relation [{}]", tenantId, entityId, ruleNode.getId(), relation.getTo());
if (relation.getTo().getEntityType() == EntityType.RULE_NODE) {
RuleNodeCtx ruleNodeCtx = nodeActors.get(new RuleNodeId(relation.getTo().getId()));
if (ruleNodeCtx == null) {
@@ -232,17 +249,20 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
int relationsCount = relations.size();
EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
if (relationsCount == 0) {
+ log.trace("[{}][{}][{}] No outbound relations to process", tenantId, entityId, msg.getId());
if (ackId != null) {
// TODO: Ack this message in Kafka
// queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition());
}
} else if (relationsCount == 1) {
for (RuleNodeRelation relation : relations) {
+ log.trace("[{}][{}][{}] Pushing message to single target: [{}]", tenantId, entityId, msg.getId(), relation.getOut());
pushToTarget(msg, relation.getOut(), relation.getType());
}
} else {
for (RuleNodeRelation relation : relations) {
EntityId target = relation.getOut();
+ log.trace("[{}][{}][{}] Pushing message to multiple targets: [{}]", tenantId, entityId, msg.getId(), relation.getOut());
switch (target.getEntityType()) {
case RULE_NODE:
enqueueAndForwardMsgCopyToNode(msg, target, relation.getType());
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java
index 807c729..f5521a0 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java
@@ -15,7 +15,6 @@
*/
package org.thingsboard.server.actors.ruleChain;
-import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.service.ComponentActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
@@ -25,7 +24,6 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
-@Slf4j
public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessageProcessor> {
private final RuleChainId ruleChainId;
@@ -62,7 +60,9 @@ public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessa
}
private void onRuleNodeToSelfMsg(RuleNodeToSelfMsg msg) {
- log.debug("[{}] Going to process rule msg: {}", id, msg.getMsg());
+ if (log.isDebugEnabled()) {
+ log.debug("[{}][{}][{}] Going to process rule msg: {}", ruleChainId, id, processor.getComponentName(), msg.getMsg());
+ }
try {
processor.onRuleToSelfMsg(msg);
increaseMessagesProcessedCount();
@@ -72,7 +72,9 @@ public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessa
}
private void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) {
- log.debug("[{}] Going to process rule msg: {}", id, msg.getMsg());
+ if (log.isDebugEnabled()) {
+ log.debug("[{}][{}][{}] Going to process rule msg: {}", ruleChainId, id, processor.getComponentName(), msg.getMsg());
+ }
try {
processor.onRuleChainToRuleNodeMsg(msg);
increaseMessagesProcessedCount();
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
index 5d226ef..a4bd1d0 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
@@ -75,7 +75,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
}
@Override
- public void stop(ActorContext context) throws Exception {
+ public void stop(ActorContext context) {
if (tbNode != null) {
tbNode.destroy();
}
@@ -83,7 +83,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
}
@Override
- public void onClusterEventMsg(ClusterEventMsg msg) throws Exception {
+ public void onClusterEventMsg(ClusterEventMsg msg) {
}
@@ -111,6 +111,11 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
}
}
+ @Override
+ public String getComponentName() {
+ return ruleNode.getName();
+ }
+
private TbNode initComponent(RuleNode ruleNode) throws Exception {
Class<?> componentClazz = Class.forName(ruleNode.getType());
TbNode tbNode = (TbNode) (componentClazz.newInstance());
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java b/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java
index ff54fc8..1f084e1 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java
@@ -31,7 +31,6 @@ import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
/**
* @author Andrew Shvayka
*/
-@Slf4j
public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgProcessor<T>> extends ContextAwareActor {
private long lastPersistedErrorTs = 0L;
@@ -54,6 +53,7 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
@Override
public void preStart() {
try {
+ log.debug("[{}][{}][{}] Starting processor.", tenantId, id, id.getEntityType());
processor.start(context());
logLifecycleEvent(ComponentLifecycleEvent.STARTED);
if (systemContext.isStatisticsEnabled()) {
@@ -78,6 +78,7 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
@Override
public void postStop() {
try {
+ log.debug("[{}][{}] Stopping processor.", tenantId, id, id.getEntityType());
processor.stop(context());
logLifecycleEvent(ComponentLifecycleEvent.STOPPED);
} catch (Exception e) {
@@ -88,6 +89,7 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
}
protected void onComponentLifecycleMsg(ComponentLifecycleMsg msg) {
+ log.debug("[{}][{}][{}] onComponentLifecycleMsg: [{}]", tenantId, id, id.getEntityType(), msg.getEvent());
try {
switch (msg.getEvent()) {
case CREATED:
@@ -148,9 +150,9 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
private void logAndPersist(String method, Exception e, boolean critical) {
errorsOccurred++;
if (critical) {
- log.warn("[{}][{}] Failed to process {} msg: {}", id, tenantId, method, e);
+ log.warn("[{}][{}][{}] Failed to process {} msg: {}", id, tenantId, processor.getComponentName(), method, e);
} else {
- log.debug("[{}][{}] Failed to process {} msg: {}", id, tenantId, method, e);
+ log.debug("[{}][{}][{}] Failed to process {} msg: {}", id, tenantId, processor.getComponentName(), method, e);
}
long ts = System.currentTimeMillis();
if (ts - lastPersistedErrorTs > getErrorPersistFrequency()) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java b/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
index c9f1dcd..1c7e2d2 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
@@ -17,15 +17,16 @@ package org.thingsboard.server.actors.service;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.msg.TbActorMsg;
-@Slf4j
+
public abstract class ContextAwareActor extends UntypedActor {
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
public static final int ENTITY_PACK_LIMIT = 1024;
protected final ActorSystemContext systemContext;
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
index 9e5d063..46a76e6 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
@@ -44,6 +44,8 @@ public abstract class ComponentMsgProcessor<T extends EntityId> extends Abstract
this.entityId = id;
}
+ public abstract String getComponentName();
+
public abstract void start(ActorContext context) throws Exception;
public abstract void stop(ActorContext context) throws Exception;
diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
index b76e298..6c65dbe 100644
--- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
@@ -48,7 +48,6 @@ import scala.concurrent.duration.Duration;
import java.util.HashMap;
import java.util.Map;
-@Slf4j
public class TenantActor extends RuleChainManagerActor {
private final TenantId tenantId;
diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java
index 05ed285..edd7b35 100644
--- a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java
+++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java
@@ -71,7 +71,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
private int maxErrors;
private TbKafkaRequestTemplate<JsInvokeProtos.RemoteJsRequest, JsInvokeProtos.RemoteJsResponse> kafkaTemplate;
- protected Map<UUID, String> scriptIdToBodysMap = new ConcurrentHashMap<>();
+ private Map<UUID, String> scriptIdToBodysMap = new ConcurrentHashMap<>();
@PostConstruct
public void init() {
@@ -100,7 +100,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
responseBuilder.settings(kafkaSettings);
responseBuilder.topic(responseTopicPrefix + "." + nodeIdProvider.getNodeId());
responseBuilder.clientId("js-" + nodeIdProvider.getNodeId());
- responseBuilder.groupId("rule-engine-node");
+ responseBuilder.groupId("rule-engine-node-" + nodeIdProvider.getNodeId());
responseBuilder.autoCommit(true);
responseBuilder.autoCommitIntervalMs(autoCommitInterval);
responseBuilder.decoder(new RemoteJsResponseDecoder());
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
index 6c903cc..a8ab2cd 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
@@ -149,6 +149,7 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
records.forEach(record -> {
try {
ToRuleEngineMsg toRuleEngineMsg = ruleEngineConsumer.decode(record);
+ log.trace("Forwarding message to rule engine {}", toRuleEngineMsg);
if (toRuleEngineMsg.hasToDeviceActorMsg()) {
forwardToDeviceActor(toRuleEngineMsg.getToDeviceActorMsg());
}
@@ -175,18 +176,21 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
@Override
public void process(String nodeId, DeviceActorToTransportMsg msg, Runnable onSuccess, Consumer<Throwable> onFailure) {
- notificationsProducer.send(notificationsTopic + "." + nodeId,
- new UUID(msg.getSessionIdMSB(), msg.getSessionIdLSB()).toString(),
- ToTransportMsg.newBuilder().setToDeviceSessionMsg(msg).build()
- , new QueueCallbackAdaptor(onSuccess, onFailure));
+ String topic = notificationsTopic + "." + nodeId;
+ UUID sessionId = new UUID(msg.getSessionIdMSB(), msg.getSessionIdLSB());
+ ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setToDeviceSessionMsg(msg).build();
+ log.trace("[{}][{}] Pushing session data to topic: {}", topic, sessionId, transportMsg);
+ notificationsProducer.send(topic, sessionId.toString(), transportMsg, new QueueCallbackAdaptor(onSuccess, onFailure));
}
private void forwardToDeviceActor(TransportToDeviceActorMsg toDeviceActorMsg) {
TransportToDeviceActorMsgWrapper wrapper = new TransportToDeviceActorMsgWrapper(toDeviceActorMsg);
Optional<ServerAddress> address = routingService.resolveById(wrapper.getDeviceId());
if (address.isPresent()) {
+ log.trace("[{}] Pushing message to remote server: {}", address.get(), toDeviceActorMsg);
rpcService.tell(encodingService.convertToProtoDataMessage(address.get(), wrapper));
} else {
+ log.trace("Pushing message to local server: {}", toDeviceActorMsg);
actorContext.getAppActor().tell(wrapper, ActorRef.noSender());
}
}
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
index bd42f31..ee652f4 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
@@ -77,9 +77,10 @@ public class TBKafkaProducerTemplate<T> {
result.all().get();
} catch (Exception e) {
if ((e instanceof TopicExistsException) || (e.getCause() != null && e.getCause() instanceof TopicExistsException)) {
- log.trace("[{}] Topic already exists: ", defaultTopic);
+ log.trace("[{}] Topic already exists.", defaultTopic);
} else {
- log.trace("[{}] Failed to create topic: {}", defaultTopic, e.getMessage(), e);
+ log.info("[{}] Failed to create topic: {}", defaultTopic, e.getMessage(), e);
+ throw new RuntimeException(e);
}
}
//Maybe this should not be cached, but we don't plan to change size of partitions
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
index b0aca0f..b18ccc2 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
@@ -23,6 +23,9 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
@@ -83,7 +86,13 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe
CreateTopicsResult result = admin.createTopic(new NewTopic(responseTemplate.getTopic(), 1, (short) 1));
result.all().get();
} catch (Exception e) {
- log.trace("Failed to create topic: {}", e.getMessage(), e);
+ if ((e instanceof TopicExistsException) || (e.getCause() != null && e.getCause() instanceof TopicExistsException)) {
+ log.trace("[{}] Topic already exists. ", responseTemplate.getTopic());
+ } else {
+ log.info("[{}] Failed to create topic: {}", responseTemplate.getTopic(), e.getMessage(), e);
+ throw new RuntimeException(e);
+ }
+
}
this.requestTemplate.init();
tickTs = System.currentTimeMillis();
@@ -96,6 +105,7 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe
log.trace("Polling responses completed, consumer records count [{}]", responses.count());
}
responses.forEach(response -> {
+ log.trace("Received response to Kafka Template request: {}", response);
Header requestIdHeader = response.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER);
Response decodedResponse = null;
UUID requestId = null;
@@ -167,7 +177,13 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe
pendingRequests.putIfAbsent(requestId, responseMetaData);
request = requestTemplate.enrich(request, responseTemplate.getTopic(), requestId);
log.trace("[{}] Sending request, key [{}], expTime [{}]", requestId, key, responseMetaData.expTime);
- requestTemplate.send(key, request, headers, null);
+ requestTemplate.send(key, request, headers, (metadata, exception) -> {
+ if (exception != null) {
+ log.trace("[{}] Failed to post the request", requestId, exception);
+ } else {
+ log.trace("[{}] Posted the request", requestId, metadata);
+ }
+ });
return future;
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvCompositeKey.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvCompositeKey.java
index 3001e12..1c9f17a 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvCompositeKey.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvCompositeKey.java
@@ -20,14 +20,29 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import org.thingsboard.server.common.data.EntityType;
+import javax.persistence.Column;
+import javax.persistence.Embeddable;
+import javax.persistence.EnumType;
+import javax.persistence.Enumerated;
import java.io.Serializable;
+import static org.thingsboard.server.dao.model.ModelConstants.ATTRIBUTE_KEY_COLUMN;
+import static org.thingsboard.server.dao.model.ModelConstants.ATTRIBUTE_TYPE_COLUMN;
+import static org.thingsboard.server.dao.model.ModelConstants.ENTITY_ID_COLUMN;
+import static org.thingsboard.server.dao.model.ModelConstants.ENTITY_TYPE_COLUMN;
+
@Data
@AllArgsConstructor
@NoArgsConstructor
+@Embeddable
public class AttributeKvCompositeKey implements Serializable {
+ @Enumerated(EnumType.STRING)
+ @Column(name = ENTITY_TYPE_COLUMN)
private EntityType entityType;
+ @Column(name = ENTITY_ID_COLUMN)
private String entityId;
+ @Column(name = ATTRIBUTE_TYPE_COLUMN)
private String attributeType;
+ @Column(name = ATTRIBUTE_KEY_COLUMN)
private String attributeKey;
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvEntity.java
index 587a314..515c86c 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvEntity.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvEntity.java
@@ -27,6 +27,7 @@ import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.dao.model.ToData;
import javax.persistence.Column;
+import javax.persistence.EmbeddedId;
import javax.persistence.Entity;
import javax.persistence.EnumType;
import javax.persistence.Enumerated;
@@ -48,25 +49,10 @@ import static org.thingsboard.server.dao.model.ModelConstants.STRING_VALUE_COLUM
@Data
@Entity
@Table(name = "attribute_kv")
-@IdClass(AttributeKvCompositeKey.class)
public class AttributeKvEntity implements ToData<AttributeKvEntry>, Serializable {
- @Id
- @Enumerated(EnumType.STRING)
- @Column(name = ENTITY_TYPE_COLUMN)
- private EntityType entityType;
-
- @Id
- @Column(name = ENTITY_ID_COLUMN)
- private String entityId;
-
- @Id
- @Column(name = ATTRIBUTE_TYPE_COLUMN)
- private String attributeType;
-
- @Id
- @Column(name = ATTRIBUTE_KEY_COLUMN)
- private String attributeKey;
+ @EmbeddedId
+ private AttributeKvCompositeKey id;
@Column(name = BOOLEAN_VALUE_COLUMN)
private Boolean booleanValue;
@@ -87,13 +73,13 @@ public class AttributeKvEntity implements ToData<AttributeKvEntry>, Serializable
public AttributeKvEntry toData() {
KvEntry kvEntry = null;
if (strValue != null) {
- kvEntry = new StringDataEntry(attributeKey, strValue);
+ kvEntry = new StringDataEntry(id.getAttributeKey(), strValue);
} else if (booleanValue != null) {
- kvEntry = new BooleanDataEntry(attributeKey, booleanValue);
+ kvEntry = new BooleanDataEntry(id.getAttributeKey(), booleanValue);
} else if (doubleValue != null) {
- kvEntry = new DoubleDataEntry(attributeKey, doubleValue);
+ kvEntry = new DoubleDataEntry(id.getAttributeKey(), doubleValue);
} else if (longValue != null) {
- kvEntry = new LongDataEntry(attributeKey, longValue);
+ kvEntry = new LongDataEntry(id.getAttributeKey(), longValue);
}
return new BaseAttributeKvEntry(kvEntry, lastUpdateTs);
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvRepository.java
index c76cefe..d716886 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvRepository.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvRepository.java
@@ -15,7 +15,9 @@
*/
package org.thingsboard.server.dao.sql.attributes;
+import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.CrudRepository;
+import org.springframework.data.repository.query.Param;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.dao.model.sql.AttributeKvCompositeKey;
import org.thingsboard.server.dao.model.sql.AttributeKvEntity;
@@ -26,8 +28,11 @@ import java.util.List;
@SqlDao
public interface AttributeKvRepository extends CrudRepository<AttributeKvEntity, AttributeKvCompositeKey> {
- List<AttributeKvEntity> findAllByEntityTypeAndEntityIdAndAttributeType(EntityType entityType,
- String entityId,
- String attributeType);
+ @Query("SELECT a FROM AttributeKvEntity a WHERE a.id.entityType = :entityType " +
+ "AND a.id.entityId = :entityId " +
+ "AND a.id.attributeType = :attributeType")
+ List<AttributeKvEntity> findAllByEntityTypeAndEntityIdAndAttributeType(@Param("entityType") EntityType entityType,
+ @Param("entityId") String entityId,
+ @Param("attributeType") String attributeType);
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java
index 0dabf4c..4ac0c0c 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java
@@ -79,10 +79,7 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl
@Override
public ListenableFuture<Void> save(EntityId entityId, String attributeType, AttributeKvEntry attribute) {
AttributeKvEntity entity = new AttributeKvEntity();
- entity.setEntityType(entityId.getEntityType());
- entity.setEntityId(fromTimeUUID(entityId.getId()));
- entity.setAttributeType(attributeType);
- entity.setAttributeKey(attribute.getKey());
+ entity.setId(new AttributeKvCompositeKey(entityId.getEntityType(), fromTimeUUID(entityId.getId()), attributeType, attribute.getKey()));
entity.setLastUpdateTs(attribute.getLastUpdateTs());
entity.setStrValue(attribute.getStrValue().orElse(null));
entity.setDoubleValue(attribute.getDoubleValue().orElse(null));
@@ -100,10 +97,7 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl
.stream()
.map(key -> {
AttributeKvEntity entityToDelete = new AttributeKvEntity();
- entityToDelete.setEntityType(entityId.getEntityType());
- entityToDelete.setEntityId(fromTimeUUID(entityId.getId()));
- entityToDelete.setAttributeType(attributeType);
- entityToDelete.setAttributeKey(key);
+ entityToDelete.setId(new AttributeKvCompositeKey(entityId.getEntityType(), fromTimeUUID(entityId.getId()), attributeType, key));
return entityToDelete;
}).collect(Collectors.toList());
msa/black-box-tests/README.md 4(+3 -1)
diff --git a/msa/black-box-tests/README.md b/msa/black-box-tests/README.md
index c26d9c5..044fe00 100644
--- a/msa/black-box-tests/README.md
+++ b/msa/black-box-tests/README.md
@@ -18,4 +18,6 @@ As result, in REPOSITORY column, next images should be present:
- Run the black box tests in the [msa/black-box-tests](../black-box-tests) directory:
- mvn clean install -DblackBoxTests.skip=false
\ No newline at end of file
+ mvn clean install -DblackBoxTests.skip=false
+
+
diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java
index 5ebab78..4c3ac83 100644
--- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java
+++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java
@@ -33,6 +33,9 @@ import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.junit.*;
+import org.junit.rules.TestRule;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.thingsboard.client.tools.RestClient;
import org.thingsboard.server.common.data.Device;
@@ -60,6 +63,33 @@ public abstract class AbstractContainerTest {
restClient.getRestTemplate().setRequestFactory(getRequestFactoryForSelfSignedCert());
}
+ @Rule
+ public TestRule watcher = new TestWatcher() {
+ protected void starting(Description description) {
+ log.info("=================================================");
+ log.info("STARTING TEST: {}" , description.getMethodName());
+ log.info("=================================================");
+ }
+
+ /**
+ * Invoked when a test succeeds
+ */
+ protected void succeeded(Description description) {
+ log.info("=================================================");
+ log.info("SUCCEEDED TEST: {}" , description.getMethodName());
+ log.info("=================================================");
+ }
+
+ /**
+ * Invoked when a test fails
+ */
+ protected void failed(Throwable e, Description description) {
+ log.info("=================================================");
+ log.info("FAILED TEST: {}" , description.getMethodName(), e);
+ log.info("=================================================");
+ }
+ };
+
protected Device createDevice(String name) {
return restClient.createDevice(name + RandomStringUtils.randomAlphanumeric(7), "DEFAULT");
}
@@ -82,6 +112,7 @@ public abstract class AbstractContainerTest {
JsonObject wsRequest = new JsonObject();
wsRequest.add(property.toString(), cmd);
wsClient.send(wsRequest.toString());
+ wsClient.waitForFirstReply();
return wsClient;
}
diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java
index d889d2c..9918963 100644
--- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java
+++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java
@@ -28,6 +28,9 @@ import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.*;
+import org.junit.rules.TestRule;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
@@ -65,6 +68,7 @@ public class MqttClientTest extends AbstractContainerTest {
MqttClient mqttClient = getMqttClient(deviceCredentials, null);
mqttClient.publish("v1/devices/me/telemetry", Unpooled.wrappedBuffer(createPayload().toString().getBytes()));
WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage();
+ log.info("Received telemetry: {}", actualLatestTelemetry);
wsClient.closeBlocking();
Assert.assertEquals(4, actualLatestTelemetry.getData().size());
@@ -91,6 +95,7 @@ public class MqttClientTest extends AbstractContainerTest {
MqttClient mqttClient = getMqttClient(deviceCredentials, null);
mqttClient.publish("v1/devices/me/telemetry", Unpooled.wrappedBuffer(createPayload(ts).toString().getBytes()));
WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage();
+ log.info("Received telemetry: {}", actualLatestTelemetry);
wsClient.closeBlocking();
Assert.assertEquals(4, actualLatestTelemetry.getData().size());
@@ -120,6 +125,7 @@ public class MqttClientTest extends AbstractContainerTest {
clientAttributes.addProperty("attr4", 73);
mqttClient.publish("v1/devices/me/attributes", Unpooled.wrappedBuffer(clientAttributes.toString().getBytes()));
WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage();
+ log.info("Received telemetry: {}", actualLatestTelemetry);
wsClient.closeBlocking();
Assert.assertEquals(4, actualLatestTelemetry.getData().size());
@@ -168,6 +174,7 @@ public class MqttClientTest extends AbstractContainerTest {
mqttClient.publish("v1/devices/me/attributes/request/" + new Random().nextInt(100), Unpooled.wrappedBuffer(request.toString().getBytes()));
MqttEvent event = listener.getEvents().poll(10, TimeUnit.SECONDS);
AttributesResponse attributes = mapper.readValue(Objects.requireNonNull(event).getMessage(), AttributesResponse.class);
+ log.info("Received telemetry: {}", attributes);
Assert.assertEquals(1, attributes.getClient().size());
Assert.assertEquals(clientAttributeValue, attributes.getClient().get("clientAttr"));
@@ -281,6 +288,7 @@ public class MqttClientTest extends AbstractContainerTest {
// Create a new root rule chain
RuleChainId ruleChainId = createRootRuleChainForRpcResponse();
+ TimeUnit.SECONDS.sleep(3);
// Send the request to the server
JsonObject clientRequest = new JsonObject();
clientRequest.addProperty("method", "getResponse");
@@ -360,12 +368,12 @@ public class MqttClientTest extends AbstractContainerTest {
return defaultRuleChain.get().getId();
}
- private MqttClient getMqttClient(DeviceCredentials deviceCredentials, MqttMessageListener listener) throws InterruptedException {
+ private MqttClient getMqttClient(DeviceCredentials deviceCredentials, MqttMessageListener listener) throws InterruptedException, ExecutionException {
MqttClientConfig clientConfig = new MqttClientConfig();
clientConfig.setClientId("MQTT client from test");
clientConfig.setUsername(deviceCredentials.getCredentialsId());
MqttClient mqttClient = MqttClient.create(clientConfig, listener);
- mqttClient.connect("localhost", 1883).sync();
+ mqttClient.connect("localhost", 1883).get();
return mqttClient;
}
diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java
index a9835ed..fa3a63a 100644
--- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java
+++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java
@@ -31,9 +31,11 @@ public class WsClient extends WebSocketClient {
private static final ObjectMapper mapper = new ObjectMapper();
private WsTelemetryResponse message;
- private CountDownLatch latch = new CountDownLatch(1);;
+ private volatile boolean firstReplyReceived;
+ private CountDownLatch firstReply = new CountDownLatch(1);
+ private CountDownLatch latch = new CountDownLatch(1);
- public WsClient(URI serverUri) {
+ WsClient(URI serverUri) {
super(serverUri);
}
@@ -43,14 +45,19 @@ public class WsClient extends WebSocketClient {
@Override
public void onMessage(String message) {
- try {
- WsTelemetryResponse response = mapper.readValue(message, WsTelemetryResponse.class);
- if (!response.getData().isEmpty()) {
- this.message = response;
- latch.countDown();
+ if (!firstReplyReceived) {
+ firstReplyReceived = true;
+ firstReply.countDown();
+ } else {
+ try {
+ WsTelemetryResponse response = mapper.readValue(message, WsTelemetryResponse.class);
+ if (!response.getData().isEmpty()) {
+ this.message = response;
+ latch.countDown();
+ }
+ } catch (IOException e) {
+ log.error("ws message can't be read");
}
- } catch (IOException e) {
- log.error("ws message can't be read");
}
}
@@ -73,4 +80,13 @@ public class WsClient extends WebSocketClient {
}
return null;
}
+
+ void waitForFirstReply() {
+ try {
+ firstReply.await(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ log.error("Timeout, ws message wasn't received");
+ throw new RuntimeException(e);
+ }
+ }
}