thingsboard-aplcache

RPC

4/19/2018 11:07:11 AM

Changes

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index 6302963..c1fd878 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -199,6 +199,10 @@ public class ActorSystemContext {
     @Getter
     private long queuePersistenceTimeout;
 
+    @Value("${actors.client_side_rpc.timeout}")
+    @Getter
+    private long clientSideRpcTimeout;
+
     @Value("${actors.rule.chain.error_persist_frequency}")
     @Getter
     private long ruleChainErrorPersistFrequency;
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
index dfae339..93df180 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
@@ -25,12 +25,13 @@ import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
 import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
+import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg;
 import org.thingsboard.server.common.msg.timeout.DeviceActorQueueTimeoutMsg;
-import org.thingsboard.server.common.msg.timeout.DeviceActorRpcTimeoutMsg;
+import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg;
 import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotificationMsg;
 import org.thingsboard.server.extensions.api.device.DeviceNameOrTypeUpdateMsg;
-import org.thingsboard.server.common.msg.timeout.TimeoutMsg;
-import org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg;
+import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
+import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg;
 
 public class DeviceActor extends ContextAwareActor {
 
@@ -62,10 +63,16 @@ public class DeviceActor extends ContextAwareActor {
                 processor.processNameOrTypeUpdate((DeviceNameOrTypeUpdateMsg) msg);
                 break;
             case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
-                processor.processRpcRequest(context(), (ToDeviceRpcRequestMsg) msg);
+                processor.processRpcRequest(context(), (ToDeviceRpcRequestActorMsg) msg);
                 break;
-            case DEVICE_ACTOR_RPC_TIMEOUT_MSG:
-                processor.processRpcTimeout(context(), (DeviceActorRpcTimeoutMsg) msg);
+            case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
+                processor.processToServerRPCResponse(context(), (ToServerRpcResponseActorMsg) msg);
+                break;
+            case DEVICE_ACTOR_SERVER_SIDE_RPC_TIMEOUT_MSG:
+                processor.processServerSideRpcTimeout(context(), (DeviceActorServerSideRpcTimeoutMsg) msg);
+                break;
+            case DEVICE_ACTOR_CLIENT_SIDE_RPC_TIMEOUT_MSG:
+                processor.processClientSideRpcTimeout(context(), (DeviceActorClientSideRpcTimeoutMsg) msg);
                 break;
             case DEVICE_ACTOR_QUEUE_TIMEOUT_MSG:
                 processor.processQueueTimeout(context(), (DeviceActorQueueTimeoutMsg) msg);
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index 26c0a28..5c15241 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.gson.Gson;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
 import org.thingsboard.server.actors.ActorSystemContext;
 import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor;
 import org.thingsboard.server.common.data.DataConstants;
@@ -53,28 +54,28 @@ import org.thingsboard.server.common.msg.core.RuleEngineErrorMsg;
 import org.thingsboard.server.common.msg.core.SessionCloseMsg;
 import org.thingsboard.server.common.msg.core.SessionCloseNotification;
 import org.thingsboard.server.common.msg.core.SessionOpenMsg;
-import org.thingsboard.server.common.msg.core.StatusCodeResponse;
 import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
 import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg;
 import org.thingsboard.server.common.msg.core.ToDeviceRpcResponseMsg;
 import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
+import org.thingsboard.server.common.msg.core.ToServerRpcRequestMsg;
 import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
 import org.thingsboard.server.common.msg.kv.BasicAttributeKVMsg;
 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
 import org.thingsboard.server.common.msg.session.FromDeviceMsg;
 import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg;
 import org.thingsboard.server.common.msg.session.SessionMsgType;
-import org.thingsboard.server.common.msg.session.SessionMsgType;
 import org.thingsboard.server.common.msg.session.SessionType;
 import org.thingsboard.server.common.msg.session.ToDeviceMsg;
+import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg;
 import org.thingsboard.server.common.msg.timeout.DeviceActorQueueTimeoutMsg;
-import org.thingsboard.server.common.msg.timeout.DeviceActorRpcTimeoutMsg;
+import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg;
 import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotificationMsg;
 import org.thingsboard.server.extensions.api.device.DeviceNameOrTypeUpdateMsg;
-import org.thingsboard.server.extensions.api.plugins.PluginCallback;
-import org.thingsboard.server.extensions.api.plugins.PluginContext;
 import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
 import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
+import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
+import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg;
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
@@ -87,7 +88,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
@@ -103,10 +103,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
     private final Map<SessionId, SessionInfo> sessions;
     private final Map<SessionId, SessionInfo> attributeSubscriptions;
     private final Map<SessionId, SessionInfo> rpcSubscriptions;
-    private final Map<Integer, ToDeviceRpcRequestMetadata> rpcPendingMap;
+    private final Map<Integer, ToDeviceRpcRequestMetadata> toDeviceRpcPendingMap;
+    private final Map<Integer, ToServerRpcRequestMetadata> toServerRpcPendingMap;
     private final Map<UUID, PendingSessionMsgData> pendingMsgs;
 
     private final Gson gson = new Gson();
+    private final JsonParser jsonParser = new JsonParser();
 
     private int rpcSeq = 0;
     private String deviceName;
@@ -120,7 +122,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         this.sessions = new HashMap<>();
         this.attributeSubscriptions = new HashMap<>();
         this.rpcSubscriptions = new HashMap<>();
-        this.rpcPendingMap = new HashMap<>();
+        this.toDeviceRpcPendingMap = new HashMap<>();
+        this.toServerRpcPendingMap = new HashMap<>();
         this.pendingMsgs = new HashMap<>();
         initAttributes();
     }
@@ -134,7 +137,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         this.defaultMetaData.putValue("deviceType", deviceType);
     }
 
-    void processRpcRequest(ActorContext context, org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg msg) {
+    void processRpcRequest(ActorContext context, ToDeviceRpcRequestActorMsg msg) {
         ToDeviceRpcRequest request = msg.getMsg();
         ToDeviceRpcRequestBody body = request.getBody();
         ToDeviceRpcRequestMsg rpcRequest = new ToDeviceRpcRequestMsg(
@@ -174,14 +177,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
 
     }
 
-    private void registerPendingRpcRequest(ActorContext context, org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg msg, boolean sent, ToDeviceRpcRequestMsg rpcRequest, long timeout) {
-        rpcPendingMap.put(rpcRequest.getRequestId(), new ToDeviceRpcRequestMetadata(msg, sent));
-        DeviceActorRpcTimeoutMsg timeoutMsg = new DeviceActorRpcTimeoutMsg(rpcRequest.getRequestId(), timeout);
+    private void registerPendingRpcRequest(ActorContext context, ToDeviceRpcRequestActorMsg msg, boolean sent, ToDeviceRpcRequestMsg rpcRequest, long timeout) {
+        toDeviceRpcPendingMap.put(rpcRequest.getRequestId(), new ToDeviceRpcRequestMetadata(msg, sent));
+        DeviceActorServerSideRpcTimeoutMsg timeoutMsg = new DeviceActorServerSideRpcTimeoutMsg(rpcRequest.getRequestId(), timeout);
         scheduleMsgWithDelay(context, timeoutMsg, timeoutMsg.getTimeout());
     }
 
-    void processRpcTimeout(ActorContext context, DeviceActorRpcTimeoutMsg msg) {
-        ToDeviceRpcRequestMetadata requestMd = rpcPendingMap.remove(msg.getId());
+    void processServerSideRpcTimeout(ActorContext context, DeviceActorServerSideRpcTimeoutMsg msg) {
+        ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(msg.getId());
         if (requestMd != null) {
             logger.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId());
             systemContext.getDeviceRpcService().process(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
@@ -200,7 +203,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
 
     void processQueueAck(ActorContext context, RuleEngineQueuePutAckMsg msg) {
         PendingSessionMsgData data = pendingMsgs.remove(msg.getId());
-        if (data != null) {
+        if (data != null && data.isReplyOnQueueAck()) {
             logger.debug("[{}] Queue put [{}] ack detected!", deviceId, msg.getId());
             ToDeviceMsg toDeviceMsg = BasicStatusCodeResponse.onSuccess(data.getSessionMsgType(), data.getRequestId());
             sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress());
@@ -208,8 +211,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
     }
 
     private void sendPendingRequests(ActorContext context, SessionId sessionId, SessionType type, Optional<ServerAddress> server) {
-        if (!rpcPendingMap.isEmpty()) {
-            logger.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, rpcPendingMap.size(), sessionId);
+        if (!toDeviceRpcPendingMap.isEmpty()) {
+            logger.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, toDeviceRpcPendingMap.size(), sessionId);
             if (type == SessionType.SYNC) {
                 logger.debug("[{}] Cleanup sync rpc session [{}]", deviceId, sessionId);
                 rpcSubscriptions.remove(sessionId);
@@ -219,12 +222,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         }
         Set<Integer> sentOneWayIds = new HashSet<>();
         if (type == SessionType.ASYNC) {
-            rpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, server, sentOneWayIds));
+            toDeviceRpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, server, sentOneWayIds));
         } else {
-            rpcPendingMap.entrySet().stream().findFirst().ifPresent(processPendingRpc(context, sessionId, server, sentOneWayIds));
+            toDeviceRpcPendingMap.entrySet().stream().findFirst().ifPresent(processPendingRpc(context, sessionId, server, sentOneWayIds));
         }
 
-        sentOneWayIds.forEach(rpcPendingMap::remove);
+        sentOneWayIds.forEach(toDeviceRpcPendingMap::remove);
     }
 
     private Consumer<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> processPendingRpc(ActorContext context, SessionId sessionId, Optional<ServerAddress> server, Set<Integer> sentOneWayIds) {
@@ -263,8 +266,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
                     handlePostTelemetryRequest(context, msg);
                     break;
                 case TO_SERVER_RPC_REQUEST:
+                    handleClientSideRPCRequest(context, msg);
                     break;
-                //TODO: push to queue and start processing!
             }
         }
     }
@@ -345,11 +348,47 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         pushToRuleEngineWithTimeout(context, tbMsg, src, request);
     }
 
+    private void handleClientSideRPCRequest(ActorContext context, DeviceToDeviceActorMsg src) {
+        ToServerRpcRequestMsg request = (ToServerRpcRequestMsg) src.getPayload();
+
+        JsonObject json = new JsonObject();
+        json.addProperty("method", request.getMethod());
+        json.add("params", jsonParser.parse(request.getParams()));
+
+        TbMsgMetaData requestMetaData = defaultMetaData.copy();
+        requestMetaData.putValue("requestId", Integer.toString(request.getRequestId()));
+        TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.TO_SERVER_RPC_REQUEST.name(), deviceId, requestMetaData, TbMsgDataType.JSON, gson.toJson(json));
+        pushToRuleEngineWithTimeout(context, tbMsg, src, request);
+
+        scheduleMsgWithDelay(context, new DeviceActorClientSideRpcTimeoutMsg(request.getRequestId(), systemContext.getClientSideRpcTimeout()), systemContext.getClientSideRpcTimeout());
+        toServerRpcPendingMap.put(request.getRequestId(), new ToServerRpcRequestMetadata(src.getSessionId(), src.getSessionType(), src.getServerAddress()));
+    }
+
+    public void processClientSideRpcTimeout(ActorContext context, DeviceActorClientSideRpcTimeoutMsg msg) {
+        ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(msg.getId());
+        if (data != null) {
+            logger.debug("[{}] Client side RPC request [{}] timeout detected!", deviceId, msg.getId());
+            ToDeviceMsg toDeviceMsg = new RuleEngineErrorMsg(SessionMsgType.TO_SERVER_RPC_REQUEST, RuleEngineError.TIMEOUT);
+            sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServer());
+        }
+    }
+
+    void processToServerRPCResponse(ActorContext context, ToServerRpcResponseActorMsg msg) {
+        ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(msg.getMsg().getRequestId());
+        if (data != null) {
+            sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(msg.getMsg(), data.getSessionId()), data.getServer());
+        }
+    }
+
     private void pushToRuleEngineWithTimeout(ActorContext context, TbMsg tbMsg, DeviceToDeviceActorMsg src, FromDeviceRequestMsg fromDeviceRequestMsg) {
+        pushToRuleEngineWithTimeout(context, tbMsg, src, fromDeviceRequestMsg, true);
+    }
+
+    private void pushToRuleEngineWithTimeout(ActorContext context, TbMsg tbMsg, DeviceToDeviceActorMsg src, FromDeviceRequestMsg fromDeviceRequestMsg, boolean replyOnAck) {
         SessionMsgType sessionMsgType = fromDeviceRequestMsg.getMsgType();
         int requestId = fromDeviceRequestMsg.getRequestId();
         if (systemContext.isQueuePersistenceEnabled()) {
-            pendingMsgs.put(tbMsg.getId(), new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(), sessionMsgType, requestId));
+            pendingMsgs.put(tbMsg.getId(), new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(), sessionMsgType, requestId, replyOnAck));
             scheduleMsgWithDelay(context, new DeviceActorQueueTimeoutMsg(tbMsg.getId(), systemContext.getQueuePersistenceTimeout()), systemContext.getQueuePersistenceTimeout());
         } else {
             ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(BasicStatusCodeResponse.onSuccess(sessionMsgType, requestId), src.getSessionId());
@@ -394,7 +433,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         if (inMsg.getMsgType() == SessionMsgType.TO_DEVICE_RPC_RESPONSE) {
             logger.debug("[{}] Processing rpc command response [{}]", deviceId, sessionId);
             ToDeviceRpcResponseMsg responseMsg = (ToDeviceRpcResponseMsg) inMsg;
-            ToDeviceRpcRequestMetadata requestMd = rpcPendingMap.remove(responseMsg.getRequestId());
+            ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
             boolean success = requestMd != null;
             if (success) {
                 systemContext.getDeviceRpcService().process(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(), responseMsg.getData(), null));
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java b/application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java
index b3f381d..2ce05b2 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java
@@ -32,5 +32,6 @@ public final class PendingSessionMsgData {
     private final Optional<ServerAddress> serverAddress;
     private final SessionMsgType sessionMsgType;
     private final int requestId;
+    private final boolean replyOnQueueAck;
 
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/ToDeviceRpcRequestMetadata.java b/application/src/main/java/org/thingsboard/server/actors/device/ToDeviceRpcRequestMetadata.java
index 4ee9a20..8a4262c 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/ToDeviceRpcRequestMetadata.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/ToDeviceRpcRequestMetadata.java
@@ -16,13 +16,13 @@
 package org.thingsboard.server.actors.device;
 
 import lombok.Data;
-import org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg;
+import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
 
 /**
  * @author Andrew Shvayka
  */
 @Data
 public class ToDeviceRpcRequestMetadata {
-    private final ToDeviceRpcRequestMsg msg;
+    private final ToDeviceRpcRequestActorMsg msg;
     private final boolean sent;
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/ToServerRpcRequestMetadata.java b/application/src/main/java/org/thingsboard/server/actors/device/ToServerRpcRequestMetadata.java
new file mode 100644
index 0000000..f82a8c2
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/device/ToServerRpcRequestMetadata.java
@@ -0,0 +1,33 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.device;
+
+import lombok.Data;
+import org.thingsboard.server.common.data.id.SessionId;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.common.msg.session.SessionType;
+
+import java.util.Optional;
+
+/**
+ * @author Andrew Shvayka
+ */
+@Data
+public class ToServerRpcRequestMetadata {
+    private final SessionId sessionId;
+    private final SessionType type;
+    private final Optional<ServerAddress> server;
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
index 65cbc22..bc36dc8 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
@@ -37,7 +37,7 @@ import org.thingsboard.server.extensions.api.plugins.rpc.RpcMsg;
 import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
 import org.thingsboard.server.service.cluster.rpc.GrpcSession;
 import org.thingsboard.server.service.cluster.rpc.GrpcSessionListener;
-import org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg;
+import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
 
 import java.io.Serializable;
 import java.util.UUID;
@@ -142,14 +142,14 @@ public class BasicRpcSessionListener implements GrpcSessionListener {
         return new UUID(uid.getPluginUuidMsb(), uid.getPluginUuidLsb());
     }
 
-    private static ToDeviceRpcRequestMsg deserialize(ServerAddress serverAddress, ClusterAPIProtos.ToDeviceRpcRequestRpcMessage msg) {
+    private static ToDeviceRpcRequestActorMsg deserialize(ServerAddress serverAddress, ClusterAPIProtos.ToDeviceRpcRequestRpcMessage msg) {
         TenantId deviceTenantId = new TenantId(toUUID(msg.getDeviceTenantId()));
         DeviceId deviceId = new DeviceId(toUUID(msg.getDeviceId()));
 
         ToDeviceRpcRequestBody requestBody = new ToDeviceRpcRequestBody(msg.getMethod(), msg.getParams());
         ToDeviceRpcRequest request = new ToDeviceRpcRequest(toUUID(msg.getMsgId()), deviceTenantId, deviceId, msg.getOneway(), msg.getExpTime(), requestBody);
 
-        return new ToDeviceRpcRequestMsg(serverAddress, request);
+        return new ToDeviceRpcRequestActorMsg(serverAddress, request);
     }
 
     private static ToPluginRpcResponseDeviceMsg deserialize(ServerAddress serverAddress, ClusterAPIProtos.ToPluginRpcResponseRpcMessage msg) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index 8ffd378..e1331f9 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,15 +16,20 @@
 package org.thingsboard.server.actors.ruleChain;
 
 import akka.actor.ActorRef;
-import akka.actor.Cancellable;
+import com.datastax.driver.core.utils.UUIDs;
 import com.google.common.base.Function;
 import org.thingsboard.rule.engine.api.*;
 import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.id.RuleNodeId;
 import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
 import org.thingsboard.server.common.data.rule.RuleNode;
 import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.TbMsgMetaData;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
 import org.thingsboard.server.dao.alarm.AlarmService;
 import org.thingsboard.server.dao.asset.AssetService;
 import org.thingsboard.server.dao.attributes.AttributesService;
@@ -41,6 +46,7 @@ import scala.concurrent.duration.Duration;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 /**
  * Created by ashvayka on 19.03.18.
@@ -113,6 +119,11 @@ class DefaultTbContext implements TbContext {
     }
 
     @Override
+    public TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data) {
+        return new TbMsg(UUIDs.timeBased(), type, originator, metaData, data);
+    }
+
+    @Override
     public RuleNodeId getSelfId() {
         return nodeCtx.getSelf().getId();
     }
@@ -206,4 +217,29 @@ class DefaultTbContext implements TbContext {
     public MailService getMailService() {
         return mainCtx.getMailService();
     }
+
+    @Override
+    public RuleEngineRpcService getRpcService() {
+        return new RuleEngineRpcService() {
+            @Override
+            public void sendRpcReply(DeviceId deviceId, int requestId, String body) {
+                mainCtx.getDeviceRpcService().sendRpcReplyToDevice(nodeCtx.getTenantId(), deviceId, requestId, body);
+            }
+
+            @Override
+            public void sendRpcRequest(RuleEngineDeviceRpcRequest src, Consumer<RuleEngineDeviceRpcResponse> consumer) {
+                ToDeviceRpcRequest request = new ToDeviceRpcRequest(UUIDs.timeBased(), nodeCtx.getTenantId(), src.getDeviceId(),
+                        src.isOneway(), System.currentTimeMillis() + src.getTimeout(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()));
+                mainCtx.getDeviceRpcService().process(request, response -> {
+                    consumer.accept(RuleEngineDeviceRpcResponse.builder()
+                            .deviceId(src.getDeviceId())
+                            .requestId(src.getRequestId())
+                            .error(response.getError())
+                            .response(response.getResponse())
+                            .build());
+                });
+            }
+
+        };
+    }
 }
diff --git a/application/src/main/java/org/thingsboard/server/controller/RpcController.java b/application/src/main/java/org/thingsboard/server/controller/RpcController.java
index 2c7d2e6..28261be 100644
--- a/application/src/main/java/org/thingsboard/server/controller/RpcController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/RpcController.java
@@ -23,6 +23,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
 import org.springframework.security.access.prepost.PreAuthorize;
+import org.springframework.util.StringUtils;
 import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
@@ -30,18 +31,22 @@ import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.ResponseBody;
 import org.springframework.web.bind.annotation.RestController;
 import org.springframework.web.context.request.async.DeferredResult;
-import org.thingsboard.server.actors.plugin.ValidationResult;
+import org.thingsboard.server.common.data.audit.ActionType;
 import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
 import org.thingsboard.server.common.data.exception.ThingsboardException;
 import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.id.UUIDBased;
+import org.thingsboard.server.common.data.rpc.RpcRequest;
 import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
 import org.thingsboard.server.extensions.api.exception.ToErrorResponseEntity;
 import org.thingsboard.server.extensions.api.plugins.PluginConstants;
-import org.thingsboard.server.common.data.rpc.RpcRequest;
-import org.thingsboard.server.service.rpc.LocalRequestMetaData;
+import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
+import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
 import org.thingsboard.server.service.rpc.DeviceRpcService;
+import org.thingsboard.server.service.rpc.LocalRequestMetaData;
 import org.thingsboard.server.service.security.AccessValidator;
 import org.thingsboard.server.service.security.model.SecurityUser;
 
@@ -53,6 +58,7 @@ import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.function.Consumer;
 
 /**
  * Created by ashvayka on 22.03.18.
@@ -117,7 +123,6 @@ public class RpcController extends BaseController {
             accessValidator.validate(currentUser, deviceId, new HttpValidationCallback(response, new FutureCallback<DeferredResult<ResponseEntity>>() {
                 @Override
                 public void onSuccess(@Nullable DeferredResult<ResponseEntity> result) {
-
                     ToDeviceRpcRequest rpcRequest = new ToDeviceRpcRequest(UUID.randomUUID(),
                             tenantId,
                             deviceId,
@@ -125,7 +130,13 @@ public class RpcController extends BaseController {
                             timeout,
                             body
                     );
-                    deviceRpcService.process(rpcRequest, new LocalRequestMetaData(rpcRequest, currentUser, result));
+                    deviceRpcService.process(rpcRequest, new Consumer<FromDeviceRpcResponse>(){
+
+                        @Override
+                        public void accept(FromDeviceRpcResponse fromDeviceRpcResponse) {
+                            reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse);
+                        }
+                    });
                 }
 
                 @Override
@@ -136,7 +147,7 @@ public class RpcController extends BaseController {
                     } else {
                         entity = new ResponseEntity(HttpStatus.UNAUTHORIZED);
                     }
-                    deviceRpcService.logRpcCall(currentUser, deviceId, body, oneWay, Optional.empty(), e);
+                    logRpcCall(currentUser, deviceId, body, oneWay, Optional.empty(), e);
                     response.setResult(entity);
                 }
             }));
@@ -146,4 +157,69 @@ public class RpcController extends BaseController {
         }
     }
 
+    public void reply(LocalRequestMetaData rpcRequest, FromDeviceRpcResponse response) {
+        Optional<RpcError> rpcError = response.getError();
+        DeferredResult<ResponseEntity> responseWriter = rpcRequest.getResponseWriter();
+        if (rpcError.isPresent()) {
+            logRpcCall(rpcRequest, rpcError, null);
+            RpcError error = rpcError.get();
+            switch (error) {
+                case TIMEOUT:
+                    responseWriter.setResult(new ResponseEntity<>(HttpStatus.REQUEST_TIMEOUT));
+                    break;
+                case NO_ACTIVE_CONNECTION:
+                    responseWriter.setResult(new ResponseEntity<>(HttpStatus.CONFLICT));
+                    break;
+                default:
+                    responseWriter.setResult(new ResponseEntity<>(HttpStatus.REQUEST_TIMEOUT));
+                    break;
+            }
+        } else {
+            Optional<String> responseData = response.getResponse();
+            if (responseData.isPresent() && !StringUtils.isEmpty(responseData.get())) {
+                String data = responseData.get();
+                try {
+                    logRpcCall(rpcRequest, rpcError, null);
+                    responseWriter.setResult(new ResponseEntity<>(jsonMapper.readTree(data), HttpStatus.OK));
+                } catch (IOException e) {
+                    log.debug("Failed to decode device response: {}", data, e);
+                    logRpcCall(rpcRequest, rpcError, e);
+                    responseWriter.setResult(new ResponseEntity<>(HttpStatus.NOT_ACCEPTABLE));
+                }
+            } else {
+                logRpcCall(rpcRequest, rpcError, null);
+                responseWriter.setResult(new ResponseEntity<>(HttpStatus.OK));
+            }
+        }
+    }
+
+    private void logRpcCall(LocalRequestMetaData rpcRequest, Optional<RpcError> rpcError, Throwable e) {
+        logRpcCall(rpcRequest.getUser(), rpcRequest.getRequest().getDeviceId(), rpcRequest.getRequest().getBody(), rpcRequest.getRequest().isOneway(), rpcError, null);
+    }
+
+
+    private void logRpcCall(SecurityUser user, EntityId entityId, ToDeviceRpcRequestBody body, boolean oneWay, Optional<RpcError> rpcError, Throwable e) {
+        String rpcErrorStr = "";
+        if (rpcError.isPresent()) {
+            rpcErrorStr = "RPC Error: " + rpcError.get().name();
+        }
+        String method = body.getMethod();
+        String params = body.getParams();
+
+        auditLogService.logEntityAction(
+                user.getTenantId(),
+                user.getCustomerId(),
+                user.getId(),
+                user.getName(),
+                (UUIDBased & EntityId) entityId,
+                null,
+                ActionType.RPC_CALL,
+                BaseController.toException(e),
+                rpcErrorStr,
+                oneWay,
+                method,
+                params);
+    }
+
+
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java
index cdf8842..27334c6 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java
@@ -40,7 +40,7 @@ import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
 import org.thingsboard.server.gen.cluster.ClusterRpcServiceGrpc;
 import org.thingsboard.server.service.cluster.discovery.ServerInstance;
 import org.thingsboard.server.service.cluster.discovery.ServerInstanceService;
-import org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg;
+import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
 
 import javax.annotation.PreDestroy;
 import java.io.IOException;
@@ -132,7 +132,7 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
     }
 
     @Override
-    public void tell(ServerAddress serverAddress, ToDeviceRpcRequestMsg toForward) {
+    public void tell(ServerAddress serverAddress, ToDeviceRpcRequestActorMsg toForward) {
         ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
                 .setToDeviceRpcRequestRpcMsg(toProtoMsg(toForward)).build();
         tell(serverAddress, msg);
@@ -196,7 +196,7 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
         ).build();
     }
 
-    private static ClusterAPIProtos.ToDeviceRpcRequestRpcMessage toProtoMsg(ToDeviceRpcRequestMsg msg) {
+    private static ClusterAPIProtos.ToDeviceRpcRequestRpcMessage toProtoMsg(ToDeviceRpcRequestActorMsg msg) {
         ClusterAPIProtos.ToDeviceRpcRequestRpcMessage.Builder builder = ClusterAPIProtos.ToDeviceRpcRequestRpcMessage.newBuilder();
         ToDeviceRpcRequest request = msg.getMsg();
 
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
index e5a0434..6aefe46 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
@@ -24,7 +24,7 @@ import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg
 import org.thingsboard.server.extensions.api.plugins.msg.ToPluginRpcResponseDeviceMsg;
 import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
 import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
-import org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg;
+import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
 
 import java.util.UUID;
 
@@ -41,7 +41,7 @@ public interface ClusterRpcService {
 
     void tell(ServerAddress serverAddress, ToDeviceActorNotificationMsg toForward);
 
-    void tell(ServerAddress serverAddress, ToDeviceRpcRequestMsg toForward);
+    void tell(ServerAddress serverAddress, ToDeviceRpcRequestActorMsg toForward);
 
     void tell(ServerAddress serverAddress, ToPluginRpcResponseDeviceMsg toForward);
 
diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java
index 4a730db..fcb9c26 100644
--- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,7 +15,6 @@
  */
 package org.thingsboard.server.service.rpc;
 
-import akka.actor.ActorRef;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -28,12 +27,15 @@ import org.thingsboard.server.actors.service.ActorService;
 import org.thingsboard.server.common.data.audit.ActionType;
 import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.data.id.UUIDBased;
 import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.common.msg.core.ToServerRpcResponseMsg;
 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
 import org.thingsboard.server.controller.BaseController;
 import org.thingsboard.server.dao.audit.AuditLogService;
+import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
 import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
 import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
 import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
@@ -51,6 +53,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 
 /**
  * Created by ashvayka on 27.03.18.
@@ -59,8 +62,6 @@ import java.util.function.BiConsumer;
 @Slf4j
 public class DefaultDeviceRpcService implements DeviceRpcService {
 
-    private static final ObjectMapper jsonMapper = new ObjectMapper();
-
     @Autowired
     private ClusterRoutingService routingService;
 
@@ -75,7 +76,7 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
 
     private ScheduledExecutorService rpcCallBackExecutor;
 
-    private final ConcurrentMap<UUID, LocalRequestMetaData> localRpcRequests = new ConcurrentHashMap<>();
+    private final ConcurrentMap<UUID, Consumer<FromDeviceRpcResponse>> localRpcRequests = new ConcurrentHashMap<>();
 
 
     @PostConstruct
@@ -91,18 +92,18 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
     }
 
     @Override
-    public void process(ToDeviceRpcRequest request, LocalRequestMetaData metaData) {
+    public void process(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer) {
         log.trace("[{}] Processing local rpc call for device [{}]", request.getTenantId(), request.getDeviceId());
         sendRpcRequest(request);
         UUID requestId = request.getId();
-        localRpcRequests.put(requestId, metaData);
+        localRpcRequests.put(requestId, responseConsumer);
         long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis());
         log.error("[{}] processing the request: [{}]", this.hashCode(), requestId);
         rpcCallBackExecutor.schedule(() -> {
             log.error("[{}] timeout the request: [{}]", this.hashCode(), requestId);
-            LocalRequestMetaData localMetaData = localRpcRequests.remove(requestId);
-            if (localMetaData != null) {
-                reply(localMetaData, new FromDeviceRpcResponse(requestId, null, RpcError.TIMEOUT));
+            Consumer<FromDeviceRpcResponse> consumer = localRpcRequests.remove(requestId);
+            if (consumer != null) {
+                consumer.accept(new FromDeviceRpcResponse(requestId, null, RpcError.TIMEOUT));
             }
         }, timeout, TimeUnit.MILLISECONDS);
     }
@@ -123,58 +124,27 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
         log.error("[{}] response the request: [{}]", this.hashCode(), response.getId());
         //TODO: send to another server if needed.
         UUID requestId = response.getId();
-        LocalRequestMetaData md = localRpcRequests.remove(requestId);
-        if (md != null) {
-            log.trace("[{}] Processing local rpc response from device [{}]", requestId, md.getRequest().getDeviceId());
-            reply(md, response);
+        Consumer<FromDeviceRpcResponse> consumer = localRpcRequests.remove(requestId);
+        if (consumer != null) {
+            consumer.accept(response);
         } else {
             log.trace("[{}] Unknown or stale rpc response received [{}]", requestId, response);
         }
     }
 
-    public void reply(LocalRequestMetaData rpcRequest, FromDeviceRpcResponse response) {
-        Optional<RpcError> rpcError = response.getError();
-        DeferredResult<ResponseEntity> responseWriter = rpcRequest.getResponseWriter();
-        if (rpcError.isPresent()) {
-            logRpcCall(rpcRequest, rpcError, null);
-            RpcError error = rpcError.get();
-            switch (error) {
-                case TIMEOUT:
-                    responseWriter.setResult(new ResponseEntity<>(HttpStatus.REQUEST_TIMEOUT));
-                    break;
-                case NO_ACTIVE_CONNECTION:
-                    responseWriter.setResult(new ResponseEntity<>(HttpStatus.CONFLICT));
-                    break;
-                default:
-                    responseWriter.setResult(new ResponseEntity<>(HttpStatus.REQUEST_TIMEOUT));
-                    break;
-            }
-        } else {
-            Optional<String> responseData = response.getResponse();
-            if (responseData.isPresent() && !StringUtils.isEmpty(responseData.get())) {
-                String data = responseData.get();
-                try {
-                    logRpcCall(rpcRequest, rpcError, null);
-                    responseWriter.setResult(new ResponseEntity<>(jsonMapper.readTree(data), HttpStatus.OK));
-                } catch (IOException e) {
-                    log.debug("Failed to decode device response: {}", data, e);
-                    logRpcCall(rpcRequest, rpcError, e);
-                    responseWriter.setResult(new ResponseEntity<>(HttpStatus.NOT_ACCEPTABLE));
-                }
-            } else {
-                logRpcCall(rpcRequest, rpcError, null);
-                responseWriter.setResult(new ResponseEntity<>(HttpStatus.OK));
-            }
-        }
+    @Override
+    public void sendRpcReplyToDevice(TenantId tenantId, DeviceId deviceId, int requestId, String body) {
+        ToServerRpcResponseActorMsg rpcMsg = new ToServerRpcResponseActorMsg(tenantId, deviceId, new ToServerRpcResponseMsg(requestId, body));
+        forward(deviceId, rpcMsg, rpcService::tell);
     }
 
     private void sendRpcRequest(ToDeviceRpcRequest msg) {
         log.trace("[{}] Forwarding msg {} to device actor!", msg.getDeviceId(), msg);
-        ToDeviceRpcRequestMsg rpcMsg = new ToDeviceRpcRequestMsg(msg);
+        ToDeviceRpcRequestActorMsg rpcMsg = new ToDeviceRpcRequestActorMsg(msg);
         forward(msg.getDeviceId(), rpcMsg, rpcService::tell);
     }
 
-    private void forward(DeviceId deviceId, ToDeviceRpcRequestMsg msg, BiConsumer<ServerAddress, ToDeviceRpcRequestMsg> rpcFunction) {
+    private <T extends ToDeviceActorNotificationMsg> void forward(DeviceId deviceId, T msg, BiConsumer<ServerAddress, T> rpcFunction) {
         Optional<ServerAddress> instance = routingService.resolveById(deviceId);
         if (instance.isPresent()) {
             log.trace("[{}] Forwarding msg {} to remote device actor!", msg.getTenantId(), msg);
@@ -184,32 +154,4 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
             actorService.onMsg(msg);
         }
     }
-
-    private void logRpcCall(LocalRequestMetaData rpcRequest, Optional<RpcError> rpcError, Throwable e) {
-        logRpcCall(rpcRequest.getUser(), rpcRequest.getRequest().getDeviceId(), rpcRequest.getRequest().getBody(), rpcRequest.getRequest().isOneway(), rpcError, null);
-    }
-
-    @Override
-    public void logRpcCall(SecurityUser user, EntityId entityId, ToDeviceRpcRequestBody body, boolean oneWay, Optional<RpcError> rpcError, Throwable e) {
-        String rpcErrorStr = "";
-        if (rpcError.isPresent()) {
-            rpcErrorStr = "RPC Error: " + rpcError.get().name();
-        }
-        String method = body.getMethod();
-        String params = body.getParams();
-
-        auditLogService.logEntityAction(
-                user.getTenantId(),
-                user.getCustomerId(),
-                user.getId(),
-                user.getName(),
-                (UUIDBased & EntityId) entityId,
-                null,
-                ActionType.RPC_CALL,
-                BaseController.toException(e),
-                rpcErrorStr,
-                oneWay,
-                method,
-                params);
-    }
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DeviceRpcService.java
index f58db7f..4299120 100644
--- a/application/src/main/java/org/thingsboard/server/service/rpc/DeviceRpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/rpc/DeviceRpcService.java
@@ -15,26 +15,25 @@
  */
 package org.thingsboard.server.service.rpc;
 
-import org.thingsboard.server.common.data.id.EntityId;
-import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
 import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
-import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
-import org.thingsboard.server.service.security.model.SecurityUser;
 
-import java.util.Optional;
+import java.util.function.Consumer;
 
 /**
  * Created by ashvayka on 16.04.18.
  */
 public interface DeviceRpcService {
 
-    void process(ToDeviceRpcRequest request, LocalRequestMetaData metaData);
+    void process(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer);
 
     void process(ToDeviceRpcRequest request, ServerAddress originator);
 
     void process(FromDeviceRpcResponse response);
 
-    void logRpcCall(SecurityUser user, EntityId entityId, ToDeviceRpcRequestBody body, boolean oneWay, Optional<RpcError> rpcError, Throwable e);
+    void sendRpcReplyToDevice(TenantId tenantId, DeviceId deviceId, int requestId, String body);
+
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/ToServerRpcResponseActorMsg.java b/application/src/main/java/org/thingsboard/server/service/rpc/ToServerRpcResponseActorMsg.java
new file mode 100644
index 0000000..f3183ec
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/rpc/ToServerRpcResponseActorMsg.java
@@ -0,0 +1,60 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.rpc;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.common.msg.core.ToServerRpcResponseMsg;
+import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
+
+import java.util.Optional;
+
+/**
+ * Created by ashvayka on 16.04.18.
+ */
+@ToString
+@RequiredArgsConstructor
+public class ToServerRpcResponseActorMsg implements ToDeviceActorNotificationMsg {
+
+    private final ServerAddress serverAddress;
+
+    @Getter
+    private final TenantId tenantId;
+
+    @Getter
+    private final DeviceId deviceId;
+
+    @Getter
+    private final ToServerRpcResponseMsg msg;
+
+    public ToServerRpcResponseActorMsg(TenantId tenantId, DeviceId deviceId, ToServerRpcResponseMsg msg) {
+        this(null, tenantId, deviceId, msg);
+    }
+
+    public Optional<ServerAddress> getServerAddress() {
+        return Optional.ofNullable(serverAddress);
+    }
+
+    @Override
+    public MsgType getMsgType() {
+        return MsgType.SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG;
+    }
+}
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 043b8da..d9e9012 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -229,6 +229,8 @@ actors:
     enabled: "${ACTORS_QUEUE_ENABLED:true}"
     # Maximum allowed timeout for persistence into the queue
     timeout: "${ACTORS_QUEUE_PERSISTENCE_TIMEOUT:30000}"
+  client_side_rpc:
+    timeout:  "${CLIENT_SIDE_RPC_TIMEOUT:60000}"
 
 cache:
   # caffeine or redis
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/core/RuleEngineError.java b/common/message/src/main/java/org/thingsboard/server/common/msg/core/RuleEngineError.java
index f5e249c..dcfde0f 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/core/RuleEngineError.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/core/RuleEngineError.java
@@ -21,7 +21,7 @@ package org.thingsboard.server.common.msg.core;
 
 public enum RuleEngineError {
 
-    QUEUE_PUT_TIMEOUT(true), SERVER_ERROR(true);
+    QUEUE_PUT_TIMEOUT(true), SERVER_ERROR(true), TIMEOUT;
 
     private final boolean critical;
 
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/core/RuleEngineErrorMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/core/RuleEngineErrorMsg.java
index 61e5cf5..e0ff23b 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/core/RuleEngineErrorMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/core/RuleEngineErrorMsg.java
@@ -44,6 +44,8 @@ public class RuleEngineErrorMsg implements ToDeviceMsg {
                 return "Timeout during persistence of the message to the queue!";
             case SERVER_ERROR:
                 return "Error during processing of message by the server!";
+            case TIMEOUT:
+                return "Timeout during processing of message by the server!";
             default:
                 throw new RuntimeException("Error " + error + " is not supported!");
         }
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
index 6bde7fa..74c08da 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
@@ -75,7 +75,11 @@ public enum MsgType {
 
     DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG,
 
-    DEVICE_ACTOR_RPC_TIMEOUT_MSG,
+    SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG,
+
+    DEVICE_ACTOR_SERVER_SIDE_RPC_TIMEOUT_MSG,
+
+    DEVICE_ACTOR_CLIENT_SIDE_RPC_TIMEOUT_MSG,
 
     DEVICE_ACTOR_QUEUE_TIMEOUT_MSG,
 
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/timeout/DeviceActorServerSideRpcTimeoutMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/timeout/DeviceActorServerSideRpcTimeoutMsg.java
new file mode 100644
index 0000000..cd86892
--- /dev/null
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/timeout/DeviceActorServerSideRpcTimeoutMsg.java
@@ -0,0 +1,33 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.msg.timeout;
+
+import org.thingsboard.server.common.msg.MsgType;
+
+/**
+ * @author Andrew Shvayka
+ */
+public final class DeviceActorServerSideRpcTimeoutMsg extends TimeoutMsg<Integer> {
+
+    public DeviceActorServerSideRpcTimeoutMsg(Integer id, long timeout) {
+        super(id, timeout);
+    }
+
+    @Override
+    public MsgType getMsgType() {
+        return MsgType.DEVICE_ACTOR_SERVER_SIDE_RPC_TIMEOUT_MSG;
+    }
+}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java
new file mode 100644
index 0000000..b3d724f
--- /dev/null
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java
@@ -0,0 +1,36 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.api;
+
+import lombok.Builder;
+import lombok.Data;
+import org.thingsboard.server.common.data.id.DeviceId;
+
+/**
+ * Created by ashvayka on 02.04.18.
+ */
+@Data
+@Builder
+public final class RuleEngineDeviceRpcRequest {
+
+    private final DeviceId deviceId;
+    private final int requestId;
+    private final boolean oneway;
+    private final String method;
+    private final String body;
+    private final long timeout;
+
+}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcResponse.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcResponse.java
new file mode 100644
index 0000000..bd0f3bb
--- /dev/null
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcResponse.java
@@ -0,0 +1,37 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.api;
+
+import lombok.Builder;
+import lombok.Data;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
+
+import java.util.Optional;
+
+/**
+ * Created by ashvayka on 02.04.18.
+ */
+@Data
+@Builder
+public final class RuleEngineDeviceRpcResponse {
+
+    private final DeviceId deviceId;
+    private final int requestId;
+    private final Optional<String> response;
+    private final Optional<RpcError> error;
+
+}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java
new file mode 100644
index 0000000..df9d8d9
--- /dev/null
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java
@@ -0,0 +1,30 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.api;
+
+import org.thingsboard.server.common.data.id.DeviceId;
+import java.util.function.Consumer;
+
+/**
+ * Created by ashvayka on 02.04.18.
+ */
+public interface RuleEngineRpcService {
+
+    void sendRpcReply(DeviceId deviceId, int requestId, String body);
+
+    void sendRpcRequest(RuleEngineDeviceRpcRequest request, Consumer<RuleEngineDeviceRpcResponse> consumer);
+
+}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
index 6038e6d..bd0a062 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
@@ -15,10 +15,12 @@
  */
 package org.thingsboard.rule.engine.api;
 
+import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.id.RuleNodeId;
 import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.data.rule.RuleNode;
 import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.TbMsgMetaData;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.dao.alarm.AlarmService;
 import org.thingsboard.server.dao.asset.AssetService;
@@ -58,6 +60,8 @@ public interface TbContext {
 
     void updateSelf(RuleNode self);
 
+    TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data);
+
     RuleNodeId getSelfId();
 
     TenantId getTenantId();
@@ -78,6 +82,8 @@ public interface TbContext {
 
     RuleChainService getRuleChainService();
 
+    RuleEngineRpcService getRpcService();
+
     RuleEngineTelemetryService getTelemetryService();
 
     TimeseriesService getTimeseriesService();
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbRelationTypes.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbRelationTypes.java
new file mode 100644
index 0000000..e6455da
--- /dev/null
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbRelationTypes.java
@@ -0,0 +1,26 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.api;
+
+/**
+ * Created by ashvayka on 19.01.18.
+ */
+public final class TbRelationTypes {
+
+    public static String SUCCESS = "Success";
+    public static String FAILURE = "Failure";
+
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNode.java
new file mode 100644
index 0000000..9649768
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNode.java
@@ -0,0 +1,66 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.rpc;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.util.StringUtils;
+import org.thingsboard.rule.engine.TbNodeUtils;
+import org.thingsboard.rule.engine.api.RuleNode;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.rule.engine.api.TbNode;
+import org.thingsboard.rule.engine.api.TbNodeConfiguration;
+import org.thingsboard.rule.engine.api.TbNodeException;
+import org.thingsboard.server.common.data.EntityType;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.plugin.ComponentType;
+import org.thingsboard.server.common.msg.TbMsg;
+
+@Slf4j
+@RuleNode(
+        type = ComponentType.ACTION,
+        name = "rpc call reply",
+        configClazz = TbSendRpcReplyNodeConfiguration.class,
+        nodeDescription = "Sends reply to the RPC call from device",
+        nodeDetails = "Expects messages with any message type. Will forward message body to the device."
+)
+public class TbSendRPCReplyNode implements TbNode {
+
+    private TbSendRpcReplyNodeConfiguration config;
+
+    @Override
+    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
+        this.config = TbNodeUtils.convert(configuration, TbSendRpcReplyNodeConfiguration.class);
+    }
+
+    @Override
+    public void onMsg(TbContext ctx, TbMsg msg) {
+        String requestIdStr = msg.getMetaData().getValue(config.getRequestIdMetaDataAttribute());
+        if (msg.getOriginator().getEntityType() != EntityType.DEVICE) {
+            ctx.tellError(msg, new RuntimeException("Message originator is not a device entity!"));
+        } else if (StringUtils.isEmpty(requestIdStr)) {
+            ctx.tellError(msg, new RuntimeException("Request id is not present in the metadata!"));
+        } else if (StringUtils.isEmpty(msg.getData())) {
+            ctx.tellError(msg, new RuntimeException("Request body is empty!"));
+        } else {
+            ctx.getRpcService().sendRpcReply(new DeviceId(msg.getOriginator().getId()), Integer.parseInt(requestIdStr), msg.getData());
+        }
+    }
+
+    @Override
+    public void destroy() {
+    }
+
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRpcReplyNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRpcReplyNodeConfiguration.java
new file mode 100644
index 0000000..402a33b
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRpcReplyNodeConfiguration.java
@@ -0,0 +1,33 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.rpc;
+
+import lombok.Data;
+import org.thingsboard.rule.engine.api.NodeConfiguration;
+import org.thingsboard.server.common.data.DataConstants;
+
+@Data
+public class TbSendRpcReplyNodeConfiguration implements NodeConfiguration<TbSendRpcReplyNodeConfiguration> {
+
+    private String requestIdMetaDataAttribute;
+
+    @Override
+    public TbSendRpcReplyNodeConfiguration defaultConfiguration() {
+        TbSendRpcReplyNodeConfiguration configuration = new TbSendRpcReplyNodeConfiguration();
+        configuration.setRequestIdMetaDataAttribute("requestId");
+        return configuration;
+    }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java
new file mode 100644
index 0000000..937ced6
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java
@@ -0,0 +1,101 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.rpc;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.rule.engine.TbNodeUtils;
+import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcRequest;
+import org.thingsboard.rule.engine.api.RuleNode;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.rule.engine.api.TbNode;
+import org.thingsboard.rule.engine.api.TbNodeConfiguration;
+import org.thingsboard.rule.engine.api.TbNodeException;
+import org.thingsboard.rule.engine.api.TbRelationTypes;
+import org.thingsboard.server.common.data.EntityType;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.plugin.ComponentType;
+import org.thingsboard.server.common.msg.TbMsg;
+
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+@RuleNode(
+        type = ComponentType.ACTION,
+        name = "rpc call request",
+        configClazz = TbSendRpcReplyNodeConfiguration.class,
+        nodeDescription = "Sends one-way RPC call to device",
+        nodeDetails = "Expects messages with \"method\" and \"params\". Will forward response from device to next nodes."
+)
+public class TbSendRPCRequestNode implements TbNode {
+
+    private Random random = new Random();
+    private Gson gson = new Gson();
+    private JsonParser jsonParser = new JsonParser();
+    private TbSendRpcRequestNodeConfiguration config;
+
+    @Override
+    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
+        this.config = TbNodeUtils.convert(configuration, TbSendRpcRequestNodeConfiguration.class);
+    }
+
+    @Override
+    public void onMsg(TbContext ctx, TbMsg msg) {
+        JsonObject json = jsonParser.parse(msg.getData()).getAsJsonObject();
+
+        if (msg.getOriginator().getEntityType() != EntityType.DEVICE) {
+            ctx.tellError(msg, new RuntimeException("Message originator is not a device entity!"));
+        } else if (!json.has("method")) {
+            ctx.tellError(msg, new RuntimeException("Method is not present in the message!"));
+        } else if (!json.has("params")) {
+            ctx.tellError(msg, new RuntimeException("Params are not present in the message!"));
+        } else {
+            int requestId = json.has("requestId") ? json.get("requestId").getAsInt() : random.nextInt();
+            RuleEngineDeviceRpcRequest request = RuleEngineDeviceRpcRequest.builder()
+                    .method(gson.toJson(json.get("method")))
+                    .body(gson.toJson(json.get("params")))
+                    .deviceId(new DeviceId(msg.getOriginator().getId()))
+                    .requestId(requestId)
+                    .timeout(TimeUnit.SECONDS.toMillis(config.getTimeoutInSeconds()))
+                    .build();
+
+            ctx.getRpcService().sendRpcRequest(request, ruleEngineDeviceRpcResponse -> {
+                if (!ruleEngineDeviceRpcResponse.getError().isPresent()) {
+                    TbMsg next = ctx.newMsg(msg.getType(), msg.getOriginator(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().get());
+                    ctx.tellNext(next, TbRelationTypes.SUCCESS);
+                } else {
+                    TbMsg next = ctx.newMsg(msg.getType(), msg.getOriginator(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name()));
+                    ctx.tellNext(next, TbRelationTypes.FAILURE);
+                    ctx.tellError(msg, new RuntimeException(ruleEngineDeviceRpcResponse.getError().get().name()));
+                }
+            });
+        }
+    }
+
+    @Override
+    public void destroy() {
+    }
+
+    private String wrap(String name, String body) {
+        JsonObject json = new JsonObject();
+        json.addProperty(name, body);
+        return gson.toJson(json);
+    }
+
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRpcRequestNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRpcRequestNodeConfiguration.java
new file mode 100644
index 0000000..214ce65
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRpcRequestNodeConfiguration.java
@@ -0,0 +1,32 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.rpc;
+
+import lombok.Data;
+import org.thingsboard.rule.engine.api.NodeConfiguration;
+
+@Data
+public class TbSendRpcRequestNodeConfiguration implements NodeConfiguration<TbSendRpcRequestNodeConfiguration> {
+
+    private int timeoutInSeconds;
+
+    @Override
+    public TbSendRpcRequestNodeConfiguration defaultConfiguration() {
+        TbSendRpcRequestNodeConfiguration configuration = new TbSendRpcRequestNodeConfiguration();
+        configuration.setTimeoutInSeconds(60);
+        return configuration;
+    }
+}