thingsboard-aplcache

Changes

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
index 15e9e9d..6df37d6 100644
--- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
@@ -51,7 +51,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
-@Slf4j
 public class AppActor extends RuleChainManagerActor {
 
     private static final TenantId SYSTEM_TENANT = new TenantId(ModelConstants.NULL_UUID);
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
index f450027..f53a410 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
@@ -15,7 +15,6 @@
  */
 package org.thingsboard.server.actors.device;
 
-import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
 import org.thingsboard.rule.engine.api.msg.DeviceNameOrTypeUpdateMsg;
 import org.thingsboard.server.actors.ActorSystemContext;
@@ -29,7 +28,6 @@ import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
 import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg;
 import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
 
-@Slf4j
 public class DeviceActor extends ContextAwareActor {
 
     private final DeviceActorMessageProcessor processor;
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index 47417d1..41d1ea0 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -348,9 +348,12 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
         int requestId = msg.getMsg().getRequestId();
         ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(requestId);
         if (data != null) {
+            log.debug("[{}] Pushing reply to [{}][{}]!", deviceId, data.getNodeId(), data.getSessionId());
             sendToTransport(TransportProtos.ToServerRpcResponseMsg.newBuilder()
                             .setRequestId(requestId).setPayload(msg.getMsg().getData()).build()
                     , data.getSessionId(), data.getNodeId());
+        } else {
+            log.debug("[{}][{}] Pending RPC request to server not found!", deviceId, requestId);
         }
     }
 
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
index 1eba066..760d4a6 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
@@ -40,7 +40,7 @@ public class BasicRpcSessionListener implements GrpcSessionListener {
 
     @Override
     public void onConnected(GrpcSession session) {
-        log.info("{} session started -> {}", getType(session), session.getRemoteServer());
+        log.info("[{}][{}] session started", session.getRemoteServer(), getType(session));
         if (!session.isClient()) {
             manager.tell(new RpcSessionConnectedMsg(session.getRemoteServer(), session.getSessionId()), self);
         }
@@ -48,21 +48,19 @@ public class BasicRpcSessionListener implements GrpcSessionListener {
 
     @Override
     public void onDisconnected(GrpcSession session) {
-        log.info("{} session closed -> {}", getType(session), session.getRemoteServer());
+        log.info("[{}][{}] session closed", session.getRemoteServer(), getType(session));
         manager.tell(new RpcSessionDisconnectedMsg(session.isClient(), session.getRemoteServer()), self);
     }
 
     @Override
     public void onReceiveClusterGrpcMsg(GrpcSession session, ClusterAPIProtos.ClusterMessage clusterMessage) {
-        log.trace("{} Service [{}] received session actor msg {}", getType(session),
-                session.getRemoteServer(),
-                clusterMessage);
+        log.trace("Received session actor msg from [{}][{}]: {}", session.getRemoteServer(), getType(session), clusterMessage);
         service.onReceivedMsg(session.getRemoteServer(), clusterMessage);
     }
 
     @Override
     public void onError(GrpcSession session, Throwable t) {
-        log.warn("{} session got error -> {}", getType(session), session.getRemoteServer(), t);
+        log.warn("[{}][{}] session got error -> {}", session.getRemoteServer(), getType(session), t);
         manager.tell(new RpcSessionClosedMsg(session.isClient(), session.getRemoteServer()), self);
         session.close();
     }
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
index 922d2ba..9fa087e 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
@@ -36,7 +36,6 @@ import java.util.*;
 /**
  * @author Andrew Shvayka
  */
-@Slf4j
 public class RpcManagerActor extends ContextAwareActor {
 
     private final Map<ServerAddress, SessionActorInfo> sessionActors;
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index e0f7e4a..e4e82c5 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -69,6 +69,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
     private RuleNodeId firstId;
     private RuleNodeCtx firstNode;
     private boolean started;
+    private String ruleChainName;
 
     RuleChainActorMessageProcessor(TenantId tenantId, RuleChainId ruleChainId, ActorSystemContext systemContext
             , ActorRef parent, ActorRef self) {
@@ -78,15 +79,24 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
         this.nodeActors = new HashMap<>();
         this.nodeRoutes = new HashMap<>();
         this.service = systemContext.getRuleChainService();
+        this.ruleChainName = ruleChainId.toString();
     }
 
     @Override
-    public void start(ActorContext context) throws Exception {
+    public String getComponentName() {
+        return null;
+    }
+
+    @Override
+    public void start(ActorContext context) {
         if (!started) {
             RuleChain ruleChain = service.findRuleChainById(entityId);
+            ruleChainName = ruleChain.getName();
             List<RuleNode> ruleNodeList = service.getRuleChainNodes(entityId);
+            log.trace("[{}][{}] Starting rule chain with {} nodes", tenantId, entityId, ruleNodeList.size());
             // Creating and starting the actors;
             for (RuleNode ruleNode : ruleNodeList) {
+                log.trace("[{}][{}] Creating rule node [{}]: {}", tenantId, entityId, ruleNode.getName(), ruleNode);
                 ActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode);
                 nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
             }
@@ -98,16 +108,19 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
     }
 
     @Override
-    public void onUpdate(ActorContext context) throws Exception {
+    public void onUpdate(ActorContext context) {
         RuleChain ruleChain = service.findRuleChainById(entityId);
+        ruleChainName = ruleChain.getName();
         List<RuleNode> ruleNodeList = service.getRuleChainNodes(entityId);
-
+        log.trace("[{}][{}] Updating rule chain with {} nodes", tenantId, entityId, ruleNodeList.size());
         for (RuleNode ruleNode : ruleNodeList) {
             RuleNodeCtx existing = nodeActors.get(ruleNode.getId());
             if (existing == null) {
+                log.trace("[{}][{}] Creating rule node [{}]: {}", tenantId, entityId, ruleNode.getName(), ruleNode);
                 ActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode);
                 nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
             } else {
+                log.trace("[{}][{}] Updating rule node [{}]: {}", tenantId, entityId, ruleNode.getName(), ruleNode);
                 existing.setSelf(ruleNode);
                 existing.getSelfActor().tell(new ComponentLifecycleMsg(tenantId, existing.getSelf().getId(), ComponentLifecycleEvent.UPDATED), self);
             }
@@ -116,6 +129,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
         Set<RuleNodeId> existingNodes = ruleNodeList.stream().map(RuleNode::getId).collect(Collectors.toSet());
         List<RuleNodeId> removedRules = nodeActors.keySet().stream().filter(node -> !existingNodes.contains(node)).collect(Collectors.toList());
         removedRules.forEach(ruleNodeId -> {
+            log.trace("[{}][{}] Removing rule node [{}]", tenantId, entityId, ruleNodeId);
             RuleNodeCtx removed = nodeActors.remove(ruleNodeId);
             removed.getSelfActor().tell(new ComponentLifecycleMsg(tenantId, removed.getSelf().getId(), ComponentLifecycleEvent.DELETED), self);
         });
@@ -124,7 +138,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
     }
 
     @Override
-    public void stop(ActorContext context) throws Exception {
+    public void stop(ActorContext context) {
+        log.trace("[{}][{}] Stopping rule chain with {} nodes", tenantId, entityId, nodeActors.size());
         nodeActors.values().stream().map(RuleNodeCtx::getSelfActor).forEach(context::stop);
         nodeActors.clear();
         nodeRoutes.clear();
@@ -133,7 +148,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
     }
 
     @Override
-    public void onClusterEventMsg(ClusterEventMsg msg) throws Exception {
+    public void onClusterEventMsg(ClusterEventMsg msg) {
 
     }
 
@@ -150,10 +165,12 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
         // Populating the routes map;
         for (RuleNode ruleNode : ruleNodeList) {
             List<EntityRelation> relations = service.getRuleNodeRelations(ruleNode.getId());
+            log.trace("[{}][{}][{}] Processing rule node relations [{}]", tenantId, entityId, ruleNode.getId(), relations.size());
             if (relations.size() == 0) {
                 nodeRoutes.put(ruleNode.getId(), Collections.emptyList());
             } else {
                 for (EntityRelation relation : relations) {
+                    log.trace("[{}][{}][{}] Processing rule node relation [{}]", tenantId, entityId, ruleNode.getId(), relation.getTo());
                     if (relation.getTo().getEntityType() == EntityType.RULE_NODE) {
                         RuleNodeCtx ruleNodeCtx = nodeActors.get(new RuleNodeId(relation.getTo().getId()));
                         if (ruleNodeCtx == null) {
@@ -232,17 +249,20 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
         int relationsCount = relations.size();
         EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
         if (relationsCount == 0) {
+            log.trace("[{}][{}][{}] No outbound relations to process", tenantId, entityId, msg.getId());
             if (ackId != null) {
 //                TODO: Ack this message in Kafka
 //                queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition());
             }
         } else if (relationsCount == 1) {
             for (RuleNodeRelation relation : relations) {
+                log.trace("[{}][{}][{}] Pushing message to single target: [{}]", tenantId, entityId, msg.getId(), relation.getOut());
                 pushToTarget(msg, relation.getOut(), relation.getType());
             }
         } else {
             for (RuleNodeRelation relation : relations) {
                 EntityId target = relation.getOut();
+                log.trace("[{}][{}][{}] Pushing message to multiple targets: [{}]", tenantId, entityId, msg.getId(), relation.getOut());
                 switch (target.getEntityType()) {
                     case RULE_NODE:
                         enqueueAndForwardMsgCopyToNode(msg, target, relation.getType());
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java
index 807c729..f5521a0 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java
@@ -15,7 +15,6 @@
  */
 package org.thingsboard.server.actors.ruleChain;
 
-import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.server.actors.ActorSystemContext;
 import org.thingsboard.server.actors.service.ComponentActor;
 import org.thingsboard.server.actors.service.ContextBasedCreator;
@@ -25,7 +24,6 @@ import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
 
-@Slf4j
 public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessageProcessor> {
 
     private final RuleChainId ruleChainId;
@@ -62,7 +60,9 @@ public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessa
     }
 
     private void onRuleNodeToSelfMsg(RuleNodeToSelfMsg msg) {
-        log.debug("[{}] Going to process rule msg: {}", id, msg.getMsg());
+        if (log.isDebugEnabled()) {
+            log.debug("[{}][{}][{}] Going to process rule msg: {}", ruleChainId, id, processor.getComponentName(), msg.getMsg());
+        }
         try {
             processor.onRuleToSelfMsg(msg);
             increaseMessagesProcessedCount();
@@ -72,7 +72,9 @@ public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessa
     }
 
     private void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) {
-        log.debug("[{}] Going to process rule msg: {}", id, msg.getMsg());
+        if (log.isDebugEnabled()) {
+            log.debug("[{}][{}][{}] Going to process rule msg: {}", ruleChainId, id, processor.getComponentName(), msg.getMsg());
+        }
         try {
             processor.onRuleChainToRuleNodeMsg(msg);
             increaseMessagesProcessedCount();
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
index 5d226ef..a4bd1d0 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
@@ -75,7 +75,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
     }
 
     @Override
-    public void stop(ActorContext context) throws Exception {
+    public void stop(ActorContext context) {
         if (tbNode != null) {
             tbNode.destroy();
         }
@@ -83,7 +83,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
     }
 
     @Override
-    public void onClusterEventMsg(ClusterEventMsg msg) throws Exception {
+    public void onClusterEventMsg(ClusterEventMsg msg) {
 
     }
 
@@ -111,6 +111,11 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
         }
     }
 
+    @Override
+    public String getComponentName() {
+        return ruleNode.getName();
+    }
+
     private TbNode initComponent(RuleNode ruleNode) throws Exception {
         Class<?> componentClazz = Class.forName(ruleNode.getType());
         TbNode tbNode = (TbNode) (componentClazz.newInstance());
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java b/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java
index ff54fc8..1f084e1 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java
@@ -31,7 +31,6 @@ import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
 /**
  * @author Andrew Shvayka
  */
-@Slf4j
 public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgProcessor<T>> extends ContextAwareActor {
 
     private long lastPersistedErrorTs = 0L;
@@ -54,6 +53,7 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
     @Override
     public void preStart() {
         try {
+            log.debug("[{}][{}][{}] Starting processor.", tenantId, id, id.getEntityType());
             processor.start(context());
             logLifecycleEvent(ComponentLifecycleEvent.STARTED);
             if (systemContext.isStatisticsEnabled()) {
@@ -78,6 +78,7 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
     @Override
     public void postStop() {
         try {
+            log.debug("[{}][{}] Stopping processor.", tenantId, id, id.getEntityType());
             processor.stop(context());
             logLifecycleEvent(ComponentLifecycleEvent.STOPPED);
         } catch (Exception e) {
@@ -88,6 +89,7 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
     }
 
     protected void onComponentLifecycleMsg(ComponentLifecycleMsg msg) {
+        log.debug("[{}][{}][{}] onComponentLifecycleMsg: [{}]", tenantId, id, id.getEntityType(), msg.getEvent());
         try {
             switch (msg.getEvent()) {
                 case CREATED:
@@ -148,9 +150,9 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
     private void logAndPersist(String method, Exception e, boolean critical) {
         errorsOccurred++;
         if (critical) {
-            log.warn("[{}][{}] Failed to process {} msg: {}", id, tenantId, method, e);
+            log.warn("[{}][{}][{}] Failed to process {} msg: {}", id, tenantId, processor.getComponentName(), method, e);
         } else {
-            log.debug("[{}][{}] Failed to process {} msg: {}", id, tenantId, method, e);
+            log.debug("[{}][{}][{}] Failed to process {} msg: {}", id, tenantId, processor.getComponentName(), method, e);
         }
         long ts = System.currentTimeMillis();
         if (ts - lastPersistedErrorTs > getErrorPersistFrequency()) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java b/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
index c9f1dcd..1c7e2d2 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
@@ -17,15 +17,16 @@ package org.thingsboard.server.actors.service;
 
 import akka.actor.Terminated;
 import akka.actor.UntypedActor;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.thingsboard.server.actors.ActorSystemContext;
 import org.thingsboard.server.common.msg.TbActorMsg;
 
-@Slf4j
+
 public abstract class ContextAwareActor extends UntypedActor {
 
+    protected final Logger log = LoggerFactory.getLogger(getClass());
+
     public static final int ENTITY_PACK_LIMIT = 1024;
 
     protected final ActorSystemContext systemContext;
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
index 9e5d063..46a76e6 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
@@ -44,6 +44,8 @@ public abstract class ComponentMsgProcessor<T extends EntityId> extends Abstract
         this.entityId = id;
     }
 
+    public abstract String getComponentName();
+
     public abstract void start(ActorContext context) throws Exception;
 
     public abstract void stop(ActorContext context) throws Exception;
diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
index b76e298..6c65dbe 100644
--- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
@@ -48,7 +48,6 @@ import scala.concurrent.duration.Duration;
 import java.util.HashMap;
 import java.util.Map;
 
-@Slf4j
 public class TenantActor extends RuleChainManagerActor {
 
     private final TenantId tenantId;
diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java
index 05ed285..edd7b35 100644
--- a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java
+++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java
@@ -71,7 +71,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
     private int maxErrors;
 
     private TbKafkaRequestTemplate<JsInvokeProtos.RemoteJsRequest, JsInvokeProtos.RemoteJsResponse> kafkaTemplate;
-    protected Map<UUID, String> scriptIdToBodysMap = new ConcurrentHashMap<>();
+    private Map<UUID, String> scriptIdToBodysMap = new ConcurrentHashMap<>();
 
     @PostConstruct
     public void init() {
@@ -100,7 +100,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
         responseBuilder.settings(kafkaSettings);
         responseBuilder.topic(responseTopicPrefix + "." + nodeIdProvider.getNodeId());
         responseBuilder.clientId("js-" + nodeIdProvider.getNodeId());
-        responseBuilder.groupId("rule-engine-node");
+        responseBuilder.groupId("rule-engine-node-" + nodeIdProvider.getNodeId());
         responseBuilder.autoCommit(true);
         responseBuilder.autoCommitIntervalMs(autoCommitInterval);
         responseBuilder.decoder(new RemoteJsResponseDecoder());
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
index 6c903cc..a8ab2cd 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
@@ -149,6 +149,7 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
                     records.forEach(record -> {
                         try {
                             ToRuleEngineMsg toRuleEngineMsg = ruleEngineConsumer.decode(record);
+                            log.trace("Forwarding message to rule engine {}", toRuleEngineMsg);
                             if (toRuleEngineMsg.hasToDeviceActorMsg()) {
                                 forwardToDeviceActor(toRuleEngineMsg.getToDeviceActorMsg());
                             }
@@ -175,18 +176,21 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
 
     @Override
     public void process(String nodeId, DeviceActorToTransportMsg msg, Runnable onSuccess, Consumer<Throwable> onFailure) {
-        notificationsProducer.send(notificationsTopic + "." + nodeId,
-                new UUID(msg.getSessionIdMSB(), msg.getSessionIdLSB()).toString(),
-                ToTransportMsg.newBuilder().setToDeviceSessionMsg(msg).build()
-                , new QueueCallbackAdaptor(onSuccess, onFailure));
+        String topic = notificationsTopic + "." + nodeId;
+        UUID sessionId = new UUID(msg.getSessionIdMSB(), msg.getSessionIdLSB());
+        ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setToDeviceSessionMsg(msg).build();
+        log.trace("[{}][{}] Pushing session data to topic: {}", topic, sessionId, transportMsg);
+        notificationsProducer.send(topic, sessionId.toString(), transportMsg, new QueueCallbackAdaptor(onSuccess, onFailure));
     }
 
     private void forwardToDeviceActor(TransportToDeviceActorMsg toDeviceActorMsg) {
         TransportToDeviceActorMsgWrapper wrapper = new TransportToDeviceActorMsgWrapper(toDeviceActorMsg);
         Optional<ServerAddress> address = routingService.resolveById(wrapper.getDeviceId());
         if (address.isPresent()) {
+            log.trace("[{}] Pushing message to remote server: {}", address.get(), toDeviceActorMsg);
             rpcService.tell(encodingService.convertToProtoDataMessage(address.get(), wrapper));
         } else {
+            log.trace("Pushing message to local server: {}", toDeviceActorMsg);
             actorContext.getAppActor().tell(wrapper, ActorRef.noSender());
         }
     }
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
index bd42f31..ee652f4 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
@@ -77,9 +77,10 @@ public class TBKafkaProducerTemplate<T> {
             result.all().get();
         } catch (Exception e) {
             if ((e instanceof TopicExistsException) || (e.getCause() != null && e.getCause() instanceof TopicExistsException)) {
-                log.trace("[{}] Topic already exists: ", defaultTopic);
+                log.trace("[{}] Topic already exists.", defaultTopic);
             } else {
-                log.trace("[{}] Failed to create topic: {}", defaultTopic, e.getMessage(), e);
+                log.info("[{}] Failed to create topic: {}", defaultTopic, e.getMessage(), e);
+                throw new RuntimeException(e);
             }
         }
         //Maybe this should not be cached, but we don't plan to change size of partitions
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
index b0aca0f..b18ccc2 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
@@ -23,6 +23,9 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.admin.CreateTopicsResult;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
 
@@ -83,7 +86,13 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe
             CreateTopicsResult result = admin.createTopic(new NewTopic(responseTemplate.getTopic(), 1, (short) 1));
             result.all().get();
         } catch (Exception e) {
-            log.trace("Failed to create topic: {}", e.getMessage(), e);
+            if ((e instanceof TopicExistsException) || (e.getCause() != null && e.getCause() instanceof TopicExistsException)) {
+                log.trace("[{}] Topic already exists. ", responseTemplate.getTopic());
+            } else {
+                log.info("[{}] Failed to create topic: {}", responseTemplate.getTopic(), e.getMessage(), e);
+                throw new RuntimeException(e);
+            }
+
         }
         this.requestTemplate.init();
         tickTs = System.currentTimeMillis();
@@ -96,6 +105,7 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe
                     log.trace("Polling responses completed, consumer records count [{}]", responses.count());
                 }
                 responses.forEach(response -> {
+                    log.trace("Received response to Kafka Template request: {}", response);
                     Header requestIdHeader = response.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER);
                     Response decodedResponse = null;
                     UUID requestId = null;
@@ -167,7 +177,13 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe
         pendingRequests.putIfAbsent(requestId, responseMetaData);
         request = requestTemplate.enrich(request, responseTemplate.getTopic(), requestId);
         log.trace("[{}] Sending request, key [{}], expTime [{}]", requestId, key, responseMetaData.expTime);
-        requestTemplate.send(key, request, headers, null);
+        requestTemplate.send(key, request, headers, (metadata, exception) -> {
+            if (exception != null) {
+                log.trace("[{}] Failed to post the request", requestId, exception);
+            } else {
+                log.trace("[{}] Posted the request", requestId, metadata);
+            }
+        });
         return future;
     }
 
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvCompositeKey.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvCompositeKey.java
index 3001e12..1c9f17a 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvCompositeKey.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvCompositeKey.java
@@ -20,14 +20,29 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.thingsboard.server.common.data.EntityType;
 
+import javax.persistence.Column;
+import javax.persistence.Embeddable;
+import javax.persistence.EnumType;
+import javax.persistence.Enumerated;
 import java.io.Serializable;
 
+import static org.thingsboard.server.dao.model.ModelConstants.ATTRIBUTE_KEY_COLUMN;
+import static org.thingsboard.server.dao.model.ModelConstants.ATTRIBUTE_TYPE_COLUMN;
+import static org.thingsboard.server.dao.model.ModelConstants.ENTITY_ID_COLUMN;
+import static org.thingsboard.server.dao.model.ModelConstants.ENTITY_TYPE_COLUMN;
+
 @Data
 @AllArgsConstructor
 @NoArgsConstructor
+@Embeddable
 public class AttributeKvCompositeKey implements Serializable {
+    @Enumerated(EnumType.STRING)
+    @Column(name = ENTITY_TYPE_COLUMN)
     private EntityType entityType;
+    @Column(name = ENTITY_ID_COLUMN)
     private String entityId;
+    @Column(name = ATTRIBUTE_TYPE_COLUMN)
     private String attributeType;
+    @Column(name = ATTRIBUTE_KEY_COLUMN)
     private String attributeKey;
 }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvEntity.java
index 587a314..515c86c 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvEntity.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvEntity.java
@@ -27,6 +27,7 @@ import org.thingsboard.server.common.data.kv.StringDataEntry;
 import org.thingsboard.server.dao.model.ToData;
 
 import javax.persistence.Column;
+import javax.persistence.EmbeddedId;
 import javax.persistence.Entity;
 import javax.persistence.EnumType;
 import javax.persistence.Enumerated;
@@ -48,25 +49,10 @@ import static org.thingsboard.server.dao.model.ModelConstants.STRING_VALUE_COLUM
 @Data
 @Entity
 @Table(name = "attribute_kv")
-@IdClass(AttributeKvCompositeKey.class)
 public class AttributeKvEntity implements ToData<AttributeKvEntry>, Serializable {
 
-    @Id
-    @Enumerated(EnumType.STRING)
-    @Column(name = ENTITY_TYPE_COLUMN)
-    private EntityType entityType;
-
-    @Id
-    @Column(name = ENTITY_ID_COLUMN)
-    private String entityId;
-
-    @Id
-    @Column(name = ATTRIBUTE_TYPE_COLUMN)
-    private String attributeType;
-
-    @Id
-    @Column(name = ATTRIBUTE_KEY_COLUMN)
-    private String attributeKey;
+    @EmbeddedId
+    private AttributeKvCompositeKey id;
 
     @Column(name = BOOLEAN_VALUE_COLUMN)
     private Boolean booleanValue;
@@ -87,13 +73,13 @@ public class AttributeKvEntity implements ToData<AttributeKvEntry>, Serializable
     public AttributeKvEntry toData() {
         KvEntry kvEntry = null;
         if (strValue != null) {
-            kvEntry = new StringDataEntry(attributeKey, strValue);
+            kvEntry = new StringDataEntry(id.getAttributeKey(), strValue);
         } else if (booleanValue != null) {
-            kvEntry = new BooleanDataEntry(attributeKey, booleanValue);
+            kvEntry = new BooleanDataEntry(id.getAttributeKey(), booleanValue);
         } else if (doubleValue != null) {
-            kvEntry = new DoubleDataEntry(attributeKey, doubleValue);
+            kvEntry = new DoubleDataEntry(id.getAttributeKey(), doubleValue);
         } else if (longValue != null) {
-            kvEntry = new LongDataEntry(attributeKey, longValue);
+            kvEntry = new LongDataEntry(id.getAttributeKey(), longValue);
         }
         return new BaseAttributeKvEntry(kvEntry, lastUpdateTs);
     }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvRepository.java
index c76cefe..d716886 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvRepository.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvRepository.java
@@ -15,7 +15,9 @@
  */
 package org.thingsboard.server.dao.sql.attributes;
 
+import org.springframework.data.jpa.repository.Query;
 import org.springframework.data.repository.CrudRepository;
+import org.springframework.data.repository.query.Param;
 import org.thingsboard.server.common.data.EntityType;
 import org.thingsboard.server.dao.model.sql.AttributeKvCompositeKey;
 import org.thingsboard.server.dao.model.sql.AttributeKvEntity;
@@ -26,8 +28,11 @@ import java.util.List;
 @SqlDao
 public interface AttributeKvRepository extends CrudRepository<AttributeKvEntity, AttributeKvCompositeKey> {
 
-    List<AttributeKvEntity> findAllByEntityTypeAndEntityIdAndAttributeType(EntityType entityType,
-                                                                           String entityId,
-                                                                           String attributeType);
+    @Query("SELECT a FROM AttributeKvEntity a WHERE a.id.entityType = :entityType " +
+            "AND a.id.entityId = :entityId " +
+            "AND a.id.attributeType = :attributeType")
+    List<AttributeKvEntity> findAllByEntityTypeAndEntityIdAndAttributeType(@Param("entityType") EntityType entityType,
+                                                                           @Param("entityId") String entityId,
+                                                                           @Param("attributeType") String attributeType);
 }
 
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java
index 0dabf4c..4ac0c0c 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java
@@ -79,10 +79,7 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl
     @Override
     public ListenableFuture<Void> save(EntityId entityId, String attributeType, AttributeKvEntry attribute) {
         AttributeKvEntity entity = new AttributeKvEntity();
-        entity.setEntityType(entityId.getEntityType());
-        entity.setEntityId(fromTimeUUID(entityId.getId()));
-        entity.setAttributeType(attributeType);
-        entity.setAttributeKey(attribute.getKey());
+        entity.setId(new AttributeKvCompositeKey(entityId.getEntityType(), fromTimeUUID(entityId.getId()), attributeType, attribute.getKey()));
         entity.setLastUpdateTs(attribute.getLastUpdateTs());
         entity.setStrValue(attribute.getStrValue().orElse(null));
         entity.setDoubleValue(attribute.getDoubleValue().orElse(null));
@@ -100,10 +97,7 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl
                 .stream()
                 .map(key -> {
                     AttributeKvEntity entityToDelete = new AttributeKvEntity();
-                    entityToDelete.setEntityType(entityId.getEntityType());
-                    entityToDelete.setEntityId(fromTimeUUID(entityId.getId()));
-                    entityToDelete.setAttributeType(attributeType);
-                    entityToDelete.setAttributeKey(key);
+                    entityToDelete.setId(new AttributeKvCompositeKey(entityId.getEntityType(), fromTimeUUID(entityId.getId()), attributeType, key));
                     return entityToDelete;
                 }).collect(Collectors.toList());
 
diff --git a/msa/black-box-tests/README.md b/msa/black-box-tests/README.md
index c26d9c5..044fe00 100644
--- a/msa/black-box-tests/README.md
+++ b/msa/black-box-tests/README.md
@@ -18,4 +18,6 @@ As result, in REPOSITORY column, next images should be present:
 
 - Run the black box tests in the [msa/black-box-tests](../black-box-tests) directory:
 
-        mvn clean install -DblackBoxTests.skip=false
\ No newline at end of file
+        mvn clean install -DblackBoxTests.skip=false
+
+
diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java
index 5ebab78..4c3ac83 100644
--- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java
+++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java
@@ -33,6 +33,9 @@ import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
 import org.apache.http.ssl.SSLContextBuilder;
 import org.apache.http.ssl.SSLContexts;
 import org.junit.*;
+import org.junit.rules.TestRule;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
 import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
 import org.thingsboard.client.tools.RestClient;
 import org.thingsboard.server.common.data.Device;
@@ -60,6 +63,33 @@ public abstract class AbstractContainerTest {
         restClient.getRestTemplate().setRequestFactory(getRequestFactoryForSelfSignedCert());
     }
 
+    @Rule
+    public TestRule watcher = new TestWatcher() {
+        protected void starting(Description description) {
+            log.info("=================================================");
+            log.info("STARTING TEST: {}" , description.getMethodName());
+            log.info("=================================================");
+        }
+
+        /**
+         * Invoked when a test succeeds
+         */
+        protected void succeeded(Description description) {
+            log.info("=================================================");
+            log.info("SUCCEEDED TEST: {}" , description.getMethodName());
+            log.info("=================================================");
+        }
+
+        /**
+         * Invoked when a test fails
+         */
+        protected void failed(Throwable e, Description description) {
+            log.info("=================================================");
+            log.info("FAILED TEST: {}" , description.getMethodName(), e);
+            log.info("=================================================");
+        }
+    };
+
     protected Device createDevice(String name) {
         return restClient.createDevice(name + RandomStringUtils.randomAlphanumeric(7), "DEFAULT");
     }
@@ -82,6 +112,7 @@ public abstract class AbstractContainerTest {
         JsonObject wsRequest = new JsonObject();
         wsRequest.add(property.toString(), cmd);
         wsClient.send(wsRequest.toString());
+        wsClient.waitForFirstReply();
         return wsClient;
     }
 
diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java
index d889d2c..9918963 100644
--- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java
+++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java
@@ -28,6 +28,9 @@ import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.junit.*;
+import org.junit.rules.TestRule;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
 import org.springframework.core.ParameterizedTypeReference;
 import org.springframework.http.HttpMethod;
 import org.springframework.http.ResponseEntity;
@@ -65,6 +68,7 @@ public class MqttClientTest extends AbstractContainerTest {
         MqttClient mqttClient = getMqttClient(deviceCredentials, null);
         mqttClient.publish("v1/devices/me/telemetry", Unpooled.wrappedBuffer(createPayload().toString().getBytes()));
         WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage();
+        log.info("Received telemetry: {}", actualLatestTelemetry);
         wsClient.closeBlocking();
 
         Assert.assertEquals(4, actualLatestTelemetry.getData().size());
@@ -91,6 +95,7 @@ public class MqttClientTest extends AbstractContainerTest {
         MqttClient mqttClient = getMqttClient(deviceCredentials, null);
         mqttClient.publish("v1/devices/me/telemetry", Unpooled.wrappedBuffer(createPayload(ts).toString().getBytes()));
         WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage();
+        log.info("Received telemetry: {}", actualLatestTelemetry);
         wsClient.closeBlocking();
 
         Assert.assertEquals(4, actualLatestTelemetry.getData().size());
@@ -120,6 +125,7 @@ public class MqttClientTest extends AbstractContainerTest {
         clientAttributes.addProperty("attr4", 73);
         mqttClient.publish("v1/devices/me/attributes", Unpooled.wrappedBuffer(clientAttributes.toString().getBytes()));
         WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage();
+        log.info("Received telemetry: {}", actualLatestTelemetry);
         wsClient.closeBlocking();
 
         Assert.assertEquals(4, actualLatestTelemetry.getData().size());
@@ -168,6 +174,7 @@ public class MqttClientTest extends AbstractContainerTest {
         mqttClient.publish("v1/devices/me/attributes/request/" + new Random().nextInt(100), Unpooled.wrappedBuffer(request.toString().getBytes()));
         MqttEvent event = listener.getEvents().poll(10, TimeUnit.SECONDS);
         AttributesResponse attributes = mapper.readValue(Objects.requireNonNull(event).getMessage(), AttributesResponse.class);
+        log.info("Received telemetry: {}", attributes);
 
         Assert.assertEquals(1, attributes.getClient().size());
         Assert.assertEquals(clientAttributeValue, attributes.getClient().get("clientAttr"));
@@ -281,6 +288,7 @@ public class MqttClientTest extends AbstractContainerTest {
         // Create a new root rule chain
         RuleChainId ruleChainId = createRootRuleChainForRpcResponse();
 
+        TimeUnit.SECONDS.sleep(3);
         // Send the request to the server
         JsonObject clientRequest = new JsonObject();
         clientRequest.addProperty("method", "getResponse");
@@ -360,12 +368,12 @@ public class MqttClientTest extends AbstractContainerTest {
         return defaultRuleChain.get().getId();
     }
 
-    private MqttClient getMqttClient(DeviceCredentials deviceCredentials, MqttMessageListener listener) throws InterruptedException {
+    private MqttClient getMqttClient(DeviceCredentials deviceCredentials, MqttMessageListener listener) throws InterruptedException, ExecutionException {
         MqttClientConfig clientConfig = new MqttClientConfig();
         clientConfig.setClientId("MQTT client from test");
         clientConfig.setUsername(deviceCredentials.getCredentialsId());
         MqttClient mqttClient = MqttClient.create(clientConfig, listener);
-        mqttClient.connect("localhost", 1883).sync();
+        mqttClient.connect("localhost", 1883).get();
         return mqttClient;
     }
 
diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java
index a9835ed..fa3a63a 100644
--- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java
+++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java
@@ -31,9 +31,11 @@ public class WsClient extends WebSocketClient {
     private static final ObjectMapper mapper = new ObjectMapper();
     private WsTelemetryResponse message;
 
-    private CountDownLatch latch = new CountDownLatch(1);;
+    private volatile boolean firstReplyReceived;
+    private CountDownLatch firstReply = new CountDownLatch(1);
+    private CountDownLatch latch = new CountDownLatch(1);
 
-    public WsClient(URI serverUri) {
+    WsClient(URI serverUri) {
         super(serverUri);
     }
 
@@ -43,14 +45,19 @@ public class WsClient extends WebSocketClient {
 
     @Override
     public void onMessage(String message) {
-        try {
-            WsTelemetryResponse response = mapper.readValue(message, WsTelemetryResponse.class);
-            if (!response.getData().isEmpty()) {
-                this.message = response;
-                latch.countDown();
+        if (!firstReplyReceived) {
+            firstReplyReceived = true;
+            firstReply.countDown();
+        } else {
+            try {
+                WsTelemetryResponse response = mapper.readValue(message, WsTelemetryResponse.class);
+                if (!response.getData().isEmpty()) {
+                    this.message = response;
+                    latch.countDown();
+                }
+            } catch (IOException e) {
+                log.error("ws message can't be read");
             }
-        } catch (IOException e) {
-            log.error("ws message can't be read");
         }
     }
 
@@ -73,4 +80,13 @@ public class WsClient extends WebSocketClient {
         }
         return null;
     }
+
+    void waitForFirstReply() {
+        try {
+            firstReply.await(10, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            log.error("Timeout, ws message wasn't received");
+            throw new RuntimeException(e);
+        }
+    }
 }