thingsboard-aplcache

Details

diff --git a/application/src/main/data/json/tenant/rule_chains/root_rule_chain.json b/application/src/main/data/json/tenant/rule_chains/root_rule_chain.json
index 225ccdc..1949877 100644
--- a/application/src/main/data/json/tenant/rule_chains/root_rule_chain.json
+++ b/application/src/main/data/json/tenant/rule_chains/root_rule_chain.json
@@ -69,6 +69,18 @@
         "configuration": {
           "jsScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);"
         }
+      },
+      {
+        "additionalInfo": {
+          "layoutX": 825,
+          "layoutY": 468
+        },
+        "type": "org.thingsboard.rule.engine.rpc.TbSendRPCRequestNode",
+        "name": "Test",
+        "debugMode": true,
+        "configuration": {
+          "timeoutInSeconds": 60
+        }
       }
     ],
     "connections": [
@@ -91,6 +103,11 @@
         "fromIndex": 2,
         "toIndex": 3,
         "type": "RPC Request"
+      },
+      {
+        "fromIndex": 2,
+        "toIndex": 5,
+        "type": "RPC Request to Device"
       }
     ],
     "ruleChainConnections": null
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index bcadd0c..079f66d 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -225,9 +225,12 @@ class DefaultTbContext implements TbContext {
 
             @Override
             public void sendRpcRequest(RuleEngineDeviceRpcRequest src, Consumer<RuleEngineDeviceRpcResponse> consumer) {
-                ToDeviceRpcRequest request = new ToDeviceRpcRequest(UUIDs.timeBased(), nodeCtx.getTenantId(), src.getDeviceId(),
-                        src.isOneway(), System.currentTimeMillis() + src.getTimeout(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()));
+                ToDeviceRpcRequest request = new ToDeviceRpcRequest(src.getRequestUUID(), nodeCtx.getTenantId(), src.getDeviceId(),
+                        src.isOneway(), src.getExpirationTime(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()));
                 mainCtx.getDeviceRpcService().processRpcRequestToDevice(request, response -> {
+                    if (src.isRestApiCall()) {
+                        mainCtx.getDeviceRpcService().processRestAPIRpcResponseFromRuleEngine(response);
+                    }
                     consumer.accept(RuleEngineDeviceRpcResponse.builder()
                             .deviceId(src.getDeviceId())
                             .requestId(src.getRequestId())
diff --git a/application/src/main/java/org/thingsboard/server/controller/RpcController.java b/application/src/main/java/org/thingsboard/server/controller/RpcController.java
index e39a3f6..a0d388c 100644
--- a/application/src/main/java/org/thingsboard/server/controller/RpcController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/RpcController.java
@@ -128,7 +128,7 @@ public class RpcController extends BaseController {
                             timeout,
                             body
                     );
-                    deviceRpcService.processRpcRequestToDevice(rpcRequest, fromDeviceRpcResponse -> reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse));
+                    deviceRpcService.processRestAPIRpcRequestToRuleEngine(rpcRequest, fromDeviceRpcResponse -> reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse));
                 }
 
                 @Override
diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java
index 678495b..8e86661 100644
--- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java
@@ -15,6 +15,10 @@
  */
 package org.thingsboard.server.service.rpc;
 
+import com.datastax.driver.core.utils.UUIDs;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.protobuf.InvalidProtocolBufferException;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -22,22 +26,23 @@ import org.springframework.stereotype.Service;
 import org.thingsboard.rule.engine.api.RpcError;
 import org.thingsboard.rule.engine.api.msg.ToDeviceActorNotificationMsg;
 import org.thingsboard.server.actors.service.ActorService;
+import org.thingsboard.server.common.data.DataConstants;
 import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.TbMsgDataType;
+import org.thingsboard.server.common.msg.TbMsgMetaData;
 import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.msg.core.ToServerRpcResponseMsg;
 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
-import org.thingsboard.server.dao.audit.AuditLogService;
+import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
 import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
 import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
 import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
-import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
-import org.thingsboard.server.service.telemetry.sub.Subscription;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
-import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -53,6 +58,8 @@ import java.util.function.Consumer;
 @Slf4j
 public class DefaultDeviceRpcService implements DeviceRpcService {
 
+    private static final ObjectMapper json = new ObjectMapper();
+
     @Autowired
     private ClusterRoutingService routingService;
 
@@ -64,7 +71,8 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
 
     private ScheduledExecutorService rpcCallBackExecutor;
 
-    private final ConcurrentMap<UUID, Consumer<FromDeviceRpcResponse>> localRpcRequests = new ConcurrentHashMap<>();
+    private final ConcurrentMap<UUID, Consumer<FromDeviceRpcResponse>> localToRuleEngineRpcRequests = new ConcurrentHashMap<>();
+    private final ConcurrentMap<UUID, Consumer<FromDeviceRpcResponse>> localToDeviceRpcRequests = new ConcurrentHashMap<>();
 
     @PostConstruct
     public void initExecutor() {
@@ -79,28 +87,40 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
     }
 
     @Override
+    public void processRestAPIRpcRequestToRuleEngine(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer) {
+        log.trace("[{}] Processing local rpc call to rule engine [{}]", request.getTenantId(), request.getDeviceId());
+        UUID requestId = request.getId();
+        localToRuleEngineRpcRequests.put(requestId, responseConsumer);
+        sendRpcRequestToRuleEngine(request);
+        scheduleTimeout(request, requestId, localToRuleEngineRpcRequests);
+    }
+
+    @Override
+    public void processRestAPIRpcResponseFromRuleEngine(FromDeviceRpcResponse response) {
+        UUID requestId = response.getId();
+        Consumer<FromDeviceRpcResponse> consumer = localToRuleEngineRpcRequests.remove(requestId);
+        if (consumer != null) {
+            consumer.accept(response);
+        } else {
+            log.trace("[{}] Unknown or stale rpc response received [{}]", requestId, response);
+        }
+    }
+
+    @Override
     public void processRpcRequestToDevice(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer) {
-        log.trace("[{}] Processing local rpc call for device [{}]", request.getTenantId(), request.getDeviceId());
-        sendRpcRequest(request);
+        log.trace("[{}] Processing local rpc call to device [{}]", request.getTenantId(), request.getDeviceId());
         UUID requestId = request.getId();
-        localRpcRequests.put(requestId, responseConsumer);
-        long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis());
-        log.error("[{}] processing the request: [{}]", this.hashCode(), requestId);
-        rpcCallBackExecutor.schedule(() -> {
-            log.error("[{}] timeout the request: [{}]", this.hashCode(), requestId);
-            Consumer<FromDeviceRpcResponse> consumer = localRpcRequests.remove(requestId);
-            if (consumer != null) {
-                consumer.accept(new FromDeviceRpcResponse(requestId, null, null, RpcError.TIMEOUT));
-            }
-        }, timeout, TimeUnit.MILLISECONDS);
+        localToDeviceRpcRequests.put(requestId, responseConsumer);
+        sendRpcRequestToDevice(request);
+        scheduleTimeout(request, requestId, localToDeviceRpcRequests);
     }
 
     @Override
     public void processRpcResponseFromDevice(FromDeviceRpcResponse response) {
-        log.error("[{}] response to request: [{}]", this.hashCode(), response.getId());
+        log.trace("[{}] response to request: [{}]", this.hashCode(), response.getId());
         if (routingService.getCurrentServer().equals(response.getServerAddress())) {
             UUID requestId = response.getId();
-            Consumer<FromDeviceRpcResponse> consumer = localRpcRequests.remove(requestId);
+            Consumer<FromDeviceRpcResponse> consumer = localToDeviceRpcRequests.remove(requestId);
             if (consumer != null) {
                 consumer.accept(response);
             } else {
@@ -140,7 +160,27 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
         forward(deviceId, rpcMsg);
     }
 
-    private void sendRpcRequest(ToDeviceRpcRequest msg) {
+    private void sendRpcRequestToRuleEngine(ToDeviceRpcRequest msg) {
+        ObjectNode entityNode = json.createObjectNode();
+        TbMsgMetaData metaData = new TbMsgMetaData();
+        metaData.putValue("requestUUID", msg.getId().toString());
+        metaData.putValue("expirationTime", Long.toString(msg.getExpirationTime()));
+        metaData.putValue("oneway", Boolean.toString(msg.isOneway()));
+
+        entityNode.put("method", msg.getBody().getMethod());
+        entityNode.put("params", msg.getBody().getParams());
+
+        try {
+            TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), DataConstants.RPC_CALL_FROM_SERVER_TO_DEVICE, msg.getDeviceId(), metaData, TbMsgDataType.JSON
+                    , json.writeValueAsString(entityNode)
+                    , null, null, 0L);
+            actorService.onMsg(new ServiceToRuleEngineMsg(msg.getTenantId(), tbMsg));
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void sendRpcRequestToDevice(ToDeviceRpcRequest msg) {
         ToDeviceRpcRequestActorMsg rpcMsg = new ToDeviceRpcRequestActorMsg(routingService.getCurrentServer(), msg);
         log.trace("[{}] Forwarding msg {} to device actor!", msg.getDeviceId(), msg);
         forward(msg.getDeviceId(), rpcMsg);
@@ -149,4 +189,18 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
     private <T extends ToDeviceActorNotificationMsg> void forward(DeviceId deviceId, T msg) {
         actorService.onMsg(new SendToClusterMsg(deviceId, msg));
     }
+
+    private void scheduleTimeout(ToDeviceRpcRequest request, UUID requestId, ConcurrentMap<UUID, Consumer<FromDeviceRpcResponse>> requestsMap) {
+        long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis());
+        log.trace("[{}] processing the request: [{}]", this.hashCode(), requestId);
+        rpcCallBackExecutor.schedule(() -> {
+            log.trace("[{}] timeout the request: [{}]", this.hashCode(), requestId);
+            Consumer<FromDeviceRpcResponse> consumer = requestsMap.remove(requestId);
+            if (consumer != null) {
+                consumer.accept(new FromDeviceRpcResponse(requestId, null, null, RpcError.TIMEOUT));
+            }
+        }, timeout, TimeUnit.MILLISECONDS);
+    }
+
+
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DeviceRpcService.java
index 8b4d47b..4cee96e 100644
--- a/application/src/main/java/org/thingsboard/server/service/rpc/DeviceRpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/rpc/DeviceRpcService.java
@@ -27,6 +27,10 @@ import java.util.function.Consumer;
  */
 public interface DeviceRpcService {
 
+    void processRestAPIRpcRequestToRuleEngine(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer);
+
+    void processRestAPIRpcResponseFromRuleEngine(FromDeviceRpcResponse response);
+
     void processRpcRequestToDevice(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer);
 
     void processRpcResponseFromDevice(FromDeviceRpcResponse response);
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
index 6b1c4a4..34c14de 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
@@ -58,4 +58,6 @@ public class DataConstants {
     public static final String ATTRIBUTES_UPDATED = "ATTRIBUTES_UPDATED";
     public static final String ATTRIBUTES_DELETED = "ATTRIBUTES_DELETED";
 
+    public static final String RPC_CALL_FROM_SERVER_TO_DEVICE = "RPC_CALL_FROM_SERVER_TO_DEVICE";
+
 }
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java
index 5ea481a..9a93d1e 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java
@@ -19,6 +19,8 @@ import lombok.Builder;
 import lombok.Data;
 import org.thingsboard.server.common.data.id.DeviceId;
 
+import java.util.UUID;
+
 /**
  * Created by ashvayka on 02.04.18.
  */
@@ -28,9 +30,11 @@ public final class RuleEngineDeviceRpcRequest {
 
     private final DeviceId deviceId;
     private final int requestId;
+    private final UUID requestUUID;
     private final boolean oneway;
     private final String method;
     private final String body;
-    private final long timeout;
+    private final long expirationTime;
+    private final boolean restApiCall;
 
 }
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java
index 5426278..f78b186 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java
@@ -28,7 +28,7 @@ import org.thingsboard.server.common.msg.session.SessionMsgType;
         type = ComponentType.FILTER,
         name = "message type switch",
         configClazz = EmptyNodeConfiguration.class,
-        relationTypes = {"Post attributes", "Post telemetry", "RPC Request", "Activity Event", "Inactivity Event",
+        relationTypes = {"Post attributes", "Post telemetry", "RPC Request from Device", "RPC Request to Device", "Activity Event", "Inactivity Event",
                 "Connect Event", "Disconnect Event", "Entity Created", "Entity Updated", "Entity Deleted", "Entity Assigned",
                 "Entity Unassigned", "Attributes Updated", "Attributes Deleted", "Other"},
         nodeDescription = "Route incoming messages by Message Type",
@@ -52,7 +52,7 @@ public class TbMsgTypeSwitchNode implements TbNode {
         } else if (msg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name())) {
             relationType = "Post telemetry";
         } else if (msg.getType().equals(SessionMsgType.TO_SERVER_RPC_REQUEST.name())) {
-            relationType = "RPC Request";
+            relationType = "RPC Request from Device";
         } else if (msg.getType().equals(DataConstants.ACTIVITY_EVENT)) {
             relationType = "Activity Event";
         } else if (msg.getType().equals(DataConstants.INACTIVITY_EVENT)) {
@@ -75,6 +75,8 @@ public class TbMsgTypeSwitchNode implements TbNode {
             relationType = "Attributes Updated";
         } else if (msg.getType().equals(DataConstants.ATTRIBUTES_DELETED)) {
             relationType = "Attributes Deleted";
+        } else if (msg.getType().equals(DataConstants.RPC_CALL_FROM_SERVER_TO_DEVICE)) {
+            relationType = "RPC Request to Device";
         } else {
             relationType = "Other";
         }
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNode.java
index 288c2ea..87de490 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNode.java
@@ -33,7 +33,7 @@ import org.thingsboard.server.common.msg.TbMsg;
         type = ComponentType.ACTION,
         name = "rpc call reply",
         configClazz = TbSendRpcReplyNodeConfiguration.class,
-        nodeDescription = "Sends one-way RPC call to device",
+        nodeDescription = "Sends reply to RPC call from device",
         nodeDetails = "Expects messages with any message type. Will forward message body to the device.",
         uiResources = {"static/rulenode/rulenode-core-config.js"},
         configDirective = "tbActionNodeRpcReplyConfig",
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java
index c1165ca..7cc08bf 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java
@@ -15,10 +15,12 @@
  */
 package org.thingsboard.rule.engine.rpc;
 
+import com.datastax.driver.core.utils.UUIDs;
 import com.google.gson.Gson;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.util.StringUtils;
 import org.thingsboard.rule.engine.api.util.TbNodeUtils;
 import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcRequest;
 import org.thingsboard.rule.engine.api.RuleNode;
@@ -27,12 +29,14 @@ import org.thingsboard.rule.engine.api.TbNode;
 import org.thingsboard.rule.engine.api.TbNodeConfiguration;
 import org.thingsboard.rule.engine.api.TbNodeException;
 import org.thingsboard.rule.engine.api.TbRelationTypes;
+import org.thingsboard.server.common.data.DataConstants;
 import org.thingsboard.server.common.data.EntityType;
 import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.plugin.ComponentType;
 import org.thingsboard.server.common.msg.TbMsg;
 
 import java.util.Random;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 @Slf4j
@@ -40,8 +44,9 @@ import java.util.concurrent.TimeUnit;
         type = ComponentType.ACTION,
         name = "rpc call request",
         configClazz = TbSendRpcRequestNodeConfiguration.class,
-        nodeDescription = "Sends two-way RPC call to device",
-        nodeDetails = "Expects messages with \"method\" and \"params\". Will forward response from device to next nodes.",
+        nodeDescription = "Sends RPC call to device",
+        nodeDetails = "Expects messages with \"method\" and \"params\". Will forward response from device to next nodes." +
+                "If the RPC call request is originated by REST API call from user, will forward the response to user immediately.",
         uiResources = {"static/rulenode/rulenode-core-config.js"},
         configDirective = "tbActionNodeRpcRequestConfig",
         icon = "call_made"
@@ -61,7 +66,7 @@ public class TbSendRPCRequestNode implements TbNode {
     @Override
     public void onMsg(TbContext ctx, TbMsg msg) {
         JsonObject json = jsonParser.parse(msg.getData()).getAsJsonObject();
-
+        String tmp;
         if (msg.getOriginator().getEntityType() != EntityType.DEVICE) {
             ctx.tellFailure(msg, new RuntimeException("Message originator is not a device entity!"));
         } else if (!json.has("method")) {
@@ -70,17 +75,31 @@ public class TbSendRPCRequestNode implements TbNode {
             ctx.tellFailure(msg, new RuntimeException("Params are not present in the message!"));
         } else {
             int requestId = json.has("requestId") ? json.get("requestId").getAsInt() : random.nextInt();
+            boolean restApiCall = msg.getType().equals(DataConstants.RPC_CALL_FROM_SERVER_TO_DEVICE);
+
+            tmp = msg.getMetaData().getValue("oneway");
+            boolean oneway = !StringUtils.isEmpty(tmp) && Boolean.parseBoolean(tmp);
+
+            tmp = msg.getMetaData().getValue("requestUUID");
+            UUID requestUUID = !StringUtils.isEmpty(tmp) ? UUID.fromString(tmp) : UUIDs.timeBased();
+
+            tmp = msg.getMetaData().getValue("expirationTime");
+            long expirationTime = !StringUtils.isEmpty(tmp) ? Long.parseLong(tmp) : (System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(config.getTimeoutInSeconds()));
+
             RuleEngineDeviceRpcRequest request = RuleEngineDeviceRpcRequest.builder()
+                    .oneway(oneway)
                     .method(json.get("method").getAsString())
                     .body(gson.toJson(json.get("params")))
                     .deviceId(new DeviceId(msg.getOriginator().getId()))
                     .requestId(requestId)
-                    .timeout(TimeUnit.SECONDS.toMillis(config.getTimeoutInSeconds()))
+                    .requestUUID(requestUUID)
+                    .expirationTime(expirationTime)
+                    .restApiCall(restApiCall)
                     .build();
 
             ctx.getRpcService().sendRpcRequest(request, ruleEngineDeviceRpcResponse -> {
                 if (!ruleEngineDeviceRpcResponse.getError().isPresent()) {
-                    TbMsg next = ctx.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().get());
+                    TbMsg next = ctx.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().orElse("{}"));
                     ctx.tellNext(next, TbRelationTypes.SUCCESS);
                 } else {
                     TbMsg next = ctx.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name()));