thingsboard-memoizeit

RPC refactoring

4/16/2018 2:09:47 PM

Changes

extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/rpc/handlers/RpcRestMsgHandler.java 161(+0 -161)

extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/rpc/handlers/RpcRuleMsgHandler.java 102(+0 -102)

extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/rpc/RpcManager.java 69(+0 -69)

extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/rpc/RpcPlugin.java 86(+0 -86)

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index 3644a49..07b6b9b 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -26,11 +26,13 @@ import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.SessionId;
 import org.thingsboard.server.common.data.kv.AttributeKey;
 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.msg.core.*;
 import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
 import org.thingsboard.server.common.msg.kv.BasicAttributeKVMsg;
+import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
 import org.thingsboard.server.common.msg.session.FromDeviceMsg;
 import org.thingsboard.server.common.msg.session.MsgType;
 import org.thingsboard.server.common.msg.session.SessionType;
diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
index 10f16ce..7862d23 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
@@ -36,9 +36,11 @@ import org.thingsboard.server.common.data.page.TextPageLink;
 import org.thingsboard.server.common.data.plugin.PluginMetaData;
 import org.thingsboard.server.common.data.relation.EntityRelation;
 import org.thingsboard.server.common.data.relation.RelationTypeGroup;
+import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
 import org.thingsboard.server.common.data.rule.RuleChain;
 import org.thingsboard.server.common.data.rule.RuleMetaData;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
 import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotificationMsg;
 import org.thingsboard.server.extensions.api.plugins.PluginApiCallSecurityContext;
 import org.thingsboard.server.extensions.api.plugins.PluginCallback;
@@ -348,7 +350,7 @@ public final class PluginProcessingContext implements PluginContext {
                     throw new IllegalStateException("Not Implemented!");
             }
         } else {
-            callback.onSuccess(this, ValidationResult.ok());
+            callback.onSuccess(this, ValidationResult.ok(null));
         }
     }
 
@@ -366,7 +368,7 @@ public final class PluginProcessingContext implements PluginContext {
                     } else if (ctx.isCustomerUser() && !device.getCustomerId().equals(ctx.getCustomerId())) {
                         return ValidationResult.accessDenied("Device doesn't belong to the current Customer!");
                     } else {
-                        return ValidationResult.ok();
+                        return ValidationResult.ok(null);
                     }
                 }
             }));
@@ -387,7 +389,7 @@ public final class PluginProcessingContext implements PluginContext {
                     } else if (ctx.isCustomerUser() && !asset.getCustomerId().equals(ctx.getCustomerId())) {
                         return ValidationResult.accessDenied("Asset doesn't belong to the current Customer!");
                     } else {
-                        return ValidationResult.ok();
+                        return ValidationResult.ok(null);
                     }
                 }
             }));
@@ -408,7 +410,7 @@ public final class PluginProcessingContext implements PluginContext {
                     } else if (ctx.isSystemAdmin() && !rule.getTenantId().isNullUid()) {
                         return ValidationResult.accessDenied("Rule is not in system scope!");
                     } else {
-                        return ValidationResult.ok();
+                        return ValidationResult.ok(null);
                     }
                 }
             }));
@@ -429,7 +431,7 @@ public final class PluginProcessingContext implements PluginContext {
                     } else if (ctx.isSystemAdmin() && !ruleChain.getTenantId().isNullUid()) {
                         return ValidationResult.accessDenied("Rule chain is not in system scope!");
                     } else {
-                        return ValidationResult.ok();
+                        return ValidationResult.ok(null);
                     }
                 }
             }));
@@ -451,7 +453,7 @@ public final class PluginProcessingContext implements PluginContext {
                     } else if (ctx.isSystemAdmin() && !plugin.getTenantId().isNullUid()) {
                         return ValidationResult.accessDenied("Plugin is not in system scope!");
                     } else {
-                        return ValidationResult.ok();
+                        return ValidationResult.ok(null);
                     }
                 }
             }));
@@ -472,7 +474,7 @@ public final class PluginProcessingContext implements PluginContext {
                     } else if (ctx.isCustomerUser() && !customer.getId().equals(ctx.getCustomerId())) {
                         return ValidationResult.accessDenied("Customer doesn't relate to the currently authorized customer user!");
                     } else {
-                        return ValidationResult.ok();
+                        return ValidationResult.ok(null);
                     }
                 }
             }));
@@ -483,7 +485,7 @@ public final class PluginProcessingContext implements PluginContext {
         if (ctx.isCustomerUser()) {
             callback.onSuccess(this, ValidationResult.accessDenied(CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
         } else if (ctx.isSystemAdmin()) {
-            callback.onSuccess(this, ValidationResult.ok());
+            callback.onSuccess(this, ValidationResult.ok(null));
         } else {
             ListenableFuture<Tenant> tenantFuture = pluginCtx.tenantService.findTenantByIdAsync(new TenantId(entityId.getId()));
             Futures.addCallback(tenantFuture, getCallback(callback, tenant -> {
@@ -492,7 +494,7 @@ public final class PluginProcessingContext implements PluginContext {
                 } else if (!tenant.getId().equals(ctx.getTenantId())) {
                     return ValidationResult.accessDenied("Tenant doesn't relate to the currently authorized user!");
                 } else {
-                    return ValidationResult.ok();
+                    return ValidationResult.ok(null);
                 }
             }));
         }
diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/SharedPluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/SharedPluginProcessingContext.java
index 1828970..ad3fef5 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/SharedPluginProcessingContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/SharedPluginProcessingContext.java
@@ -18,9 +18,7 @@ package org.thingsboard.server.actors.plugin;
 import akka.actor.ActorRef;
 import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.server.actors.ActorSystemContext;
-import org.thingsboard.server.common.data.Device;
 import org.thingsboard.server.common.data.id.DeviceId;
-import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.controller.plugin.PluginWebSocketMsgEndpoint;
@@ -38,7 +36,7 @@ import org.thingsboard.server.dao.tenant.TenantService;
 import org.thingsboard.server.dao.timeseries.TimeseriesService;
 import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotificationMsg;
 import org.thingsboard.server.extensions.api.plugins.msg.TimeoutMsg;
-import org.thingsboard.server.extensions.api.plugins.msg.ToDeviceRpcRequest;
+import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
 import org.thingsboard.server.extensions.api.plugins.msg.ToDeviceRpcRequestPluginMsg;
 import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
 import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/ValidationResult.java b/application/src/main/java/org/thingsboard/server/actors/plugin/ValidationResult.java
index f135ffb..0319a3e 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/ValidationResult.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/ValidationResult.java
@@ -20,29 +20,30 @@ import lombok.Data;
 
 @Data
 @AllArgsConstructor
-public class ValidationResult {
+public class ValidationResult<V> {
 
     private final ValidationResultCode resultCode;
     private final String message;
+    private final V v;
 
-    public static ValidationResult ok() {
-        return new ValidationResult(ValidationResultCode.OK, "Ok");
+    public static <V> ValidationResult<V> ok(V v) {
+        return new ValidationResult<>(ValidationResultCode.OK, "Ok", v);
     }
 
-    public static ValidationResult accessDenied(String message) {
-        return new ValidationResult(ValidationResultCode.ACCESS_DENIED, message);
+    public static <V> ValidationResult<V> accessDenied(String message) {
+        return new ValidationResult<>(ValidationResultCode.ACCESS_DENIED, message, null);
     }
 
-    public static ValidationResult entityNotFound(String message) {
-        return new ValidationResult(ValidationResultCode.ENTITY_NOT_FOUND, message);
+    public static <V> ValidationResult<V> entityNotFound(String message) {
+        return new ValidationResult<>(ValidationResultCode.ENTITY_NOT_FOUND, message, null);
     }
 
-    public static ValidationResult unauthorized(String message) {
-        return new ValidationResult(ValidationResultCode.UNAUTHORIZED, message);
+    public static <V> ValidationResult<V> unauthorized(String message) {
+        return new ValidationResult<>(ValidationResultCode.UNAUTHORIZED, message, null);
     }
 
-    public static ValidationResult internalError(String message) {
-        return new ValidationResult(ValidationResultCode.INTERNAL_ERROR, message);
+    public static <V> ValidationResult<V> internalError(String message) {
+        return new ValidationResult<>(ValidationResultCode.INTERNAL_ERROR, message, null);
     }
 
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
index 4fa2227..22fce84 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
@@ -24,10 +24,12 @@ import org.thingsboard.server.actors.service.ActorService;
 import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.PluginId;
 import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
 import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
 import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
+import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
 import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
 import org.thingsboard.server.extensions.api.plugins.msg.*;
 import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
@@ -35,6 +37,7 @@ import org.thingsboard.server.extensions.api.plugins.rpc.RpcMsg;
 import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
 import org.thingsboard.server.service.cluster.rpc.GrpcSession;
 import org.thingsboard.server.service.cluster.rpc.GrpcSessionListener;
+import org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg;
 
 import java.io.Serializable;
 import java.util.UUID;
@@ -139,28 +142,20 @@ public class BasicRpcSessionListener implements GrpcSessionListener {
         return new UUID(uid.getPluginUuidMsb(), uid.getPluginUuidLsb());
     }
 
-    private static ToDeviceRpcRequestPluginMsg deserialize(ServerAddress serverAddress, ClusterAPIProtos.ToDeviceRpcRequestRpcMessage msg) {
-        ClusterAPIProtos.PluginAddress address = msg.getAddress();
-        TenantId pluginTenantId = new TenantId(toUUID(address.getTenantId()));
-        PluginId pluginId = new PluginId(toUUID(address.getPluginId()));
-
+    private static ToDeviceRpcRequestMsg deserialize(ServerAddress serverAddress, ClusterAPIProtos.ToDeviceRpcRequestRpcMessage msg) {
         TenantId deviceTenantId = new TenantId(toUUID(msg.getDeviceTenantId()));
         DeviceId deviceId = new DeviceId(toUUID(msg.getDeviceId()));
 
         ToDeviceRpcRequestBody requestBody = new ToDeviceRpcRequestBody(msg.getMethod(), msg.getParams());
-        ToDeviceRpcRequest request = new ToDeviceRpcRequest(toUUID(msg.getMsgId()), null, deviceTenantId, deviceId, msg.getOneway(), msg.getExpTime(), requestBody);
+        ToDeviceRpcRequest request = new ToDeviceRpcRequest(toUUID(msg.getMsgId()), deviceTenantId, deviceId, msg.getOneway(), msg.getExpTime(), requestBody);
 
-        return new ToDeviceRpcRequestPluginMsg(serverAddress, pluginId, pluginTenantId, request);
+        return new ToDeviceRpcRequestMsg(serverAddress, request);
     }
 
     private static ToPluginRpcResponseDeviceMsg deserialize(ServerAddress serverAddress, ClusterAPIProtos.ToPluginRpcResponseRpcMessage msg) {
-        ClusterAPIProtos.PluginAddress address = msg.getAddress();
-        TenantId pluginTenantId = new TenantId(toUUID(address.getTenantId()));
-        PluginId pluginId = new PluginId(toUUID(address.getPluginId()));
-
         RpcError error = !StringUtils.isEmpty(msg.getError()) ? RpcError.valueOf(msg.getError()) : null;
         FromDeviceRpcResponse response = new FromDeviceRpcResponse(toUUID(msg.getMsgId()), msg.getResponse(), error);
-        return new ToPluginRpcResponseDeviceMsg(pluginId, pluginTenantId, response);
+        return new ToPluginRpcResponseDeviceMsg(null, null, response);
     }
 
     @SuppressWarnings("unchecked")
diff --git a/application/src/main/java/org/thingsboard/server/controller/BaseController.java b/application/src/main/java/org/thingsboard/server/controller/BaseController.java
index c7574e8..f08feec 100644
--- a/application/src/main/java/org/thingsboard/server/controller/BaseController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/BaseController.java
@@ -602,7 +602,7 @@ public abstract class BaseController {
         auditLogService.logEntityAction(user.getTenantId(), customerId, user.getId(), user.getName(), entityId, entity, actionType, e, additionalInfo);
     }
 
-    protected static Exception toException(Throwable error) {
+    public static Exception toException(Throwable error) {
         return Exception.class.isInstance(error) ? (Exception) error : new Exception(error);
     }
 
diff --git a/application/src/main/java/org/thingsboard/server/controller/RpcController.java b/application/src/main/java/org/thingsboard/server/controller/RpcController.java
new file mode 100644
index 0000000..9810c03
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/controller/RpcController.java
@@ -0,0 +1,148 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.controller;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.FutureCallback;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.security.access.prepost.PreAuthorize;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.ResponseBody;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.context.request.async.DeferredResult;
+import org.thingsboard.server.actors.plugin.ValidationResult;
+import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
+import org.thingsboard.server.common.data.exception.ThingsboardException;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
+import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
+import org.thingsboard.server.extensions.api.exception.ToErrorResponseEntity;
+import org.thingsboard.server.extensions.api.plugins.PluginConstants;
+import org.thingsboard.server.common.data.rpc.RpcRequest;
+import org.thingsboard.server.service.rpc.LocalRequestMetaData;
+import org.thingsboard.server.service.rpc.RpcService;
+import org.thingsboard.server.service.security.AccessValidator;
+import org.thingsboard.server.service.security.model.SecurityUser;
+
+import javax.annotation.Nullable;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.io.IOException;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Created by ashvayka on 22.03.18.
+ */
+@RestController
+@RequestMapping(PluginConstants.RPC_URL_PREFIX)
+@Slf4j
+public class RpcController extends BaseController {
+
+    public static final int DEFAULT_TIMEOUT = 10000;
+    protected final ObjectMapper jsonMapper = new ObjectMapper();
+
+    @Autowired
+    private RpcService rpcService;
+
+    @Autowired
+    private AccessValidator accessValidator;
+
+    private ExecutorService executor;
+
+    @PostConstruct
+    public void initExecutor() {
+        executor = Executors.newSingleThreadExecutor();
+    }
+
+    @PreDestroy
+    public void shutdownExecutor() {
+        if (executor != null) {
+            executor.shutdownNow();
+        }
+    }
+
+    @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
+    @RequestMapping(value = "/oneway/{deviceId}", method = RequestMethod.POST)
+    @ResponseBody
+    public DeferredResult<ResponseEntity> handleOneWayDeviceRPCRequest(@PathVariable("deviceId") String deviceIdStr, @RequestBody String requestBody) throws ThingsboardException {
+        return handleDeviceRPCRequest(true, new DeviceId(UUID.fromString(deviceIdStr)), requestBody);
+    }
+
+    @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
+    @RequestMapping(value = "/twoway/{deviceId}", method = RequestMethod.POST)
+    @ResponseBody
+    public DeferredResult<ResponseEntity> handleTwoWayDeviceRPCRequest(@PathVariable("deviceId") String deviceIdStr, @RequestBody String requestBody) throws ThingsboardException {
+        return handleDeviceRPCRequest(false, new DeviceId(UUID.fromString(deviceIdStr)), requestBody);
+    }
+
+
+    private DeferredResult<ResponseEntity> handleDeviceRPCRequest(boolean oneWay, DeviceId deviceId, String requestBody) throws ThingsboardException {
+        try {
+            JsonNode rpcRequestBody = jsonMapper.readTree(requestBody);
+            RpcRequest cmd = new RpcRequest(rpcRequestBody.get("method").asText(),
+                    jsonMapper.writeValueAsString(rpcRequestBody.get("params")));
+
+            if (rpcRequestBody.has("timeout")) {
+                cmd.setTimeout(rpcRequestBody.get("timeout").asLong());
+            }
+            SecurityUser currentUser = getCurrentUser();
+            TenantId tenantId = currentUser.getTenantId();
+            final DeferredResult<ResponseEntity> response = new DeferredResult<>();
+            long timeout = System.currentTimeMillis() + (cmd.getTimeout() != null ? cmd.getTimeout() : DEFAULT_TIMEOUT);
+            ToDeviceRpcRequestBody body = new ToDeviceRpcRequestBody(cmd.getMethodName(), cmd.getRequestData());
+            accessValidator.validate(currentUser, deviceId, new FutureCallback<ValidationResult>() {
+                @Override
+                public void onSuccess(@Nullable ValidationResult result) {
+                    ToDeviceRpcRequest rpcRequest = new ToDeviceRpcRequest(UUID.randomUUID(),
+                            tenantId,
+                            deviceId,
+                            oneWay,
+                            timeout,
+                            body
+                    );
+                    rpcService.process(rpcRequest, new LocalRequestMetaData(rpcRequest, currentUser, response));
+                }
+
+                @Override
+                public void onFailure(Throwable e) {
+                    ResponseEntity entity;
+                    if (e instanceof ToErrorResponseEntity) {
+                        entity = ((ToErrorResponseEntity) e).toErrorResponseEntity();
+                    } else {
+                        entity = new ResponseEntity(HttpStatus.UNAUTHORIZED);
+                    }
+                    rpcService.logRpcCall(currentUser, deviceId, body, oneWay, Optional.empty(), e);
+                    response.setResult(entity);
+                }
+            });
+            return response;
+        } catch (IOException ioe) {
+            throw new ThingsboardException("Invalid request body", ioe, ThingsboardErrorCode.BAD_REQUEST_PARAMS);
+        }
+    }
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java
index 5a6b307..f73c5eb 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java
@@ -19,7 +19,6 @@ import com.google.protobuf.ByteString;
 import io.grpc.Server;
 import io.grpc.ServerBuilder;
 import io.grpc.stub.StreamObserver;
-import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
@@ -27,7 +26,6 @@ import org.springframework.util.SerializationUtils;
 import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
 import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg;
 import org.thingsboard.server.actors.rpc.RpcSessionTellMsg;
-import org.thingsboard.server.actors.service.ActorService;
 import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
@@ -35,21 +33,18 @@ import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
 import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
 import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
 import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
-import org.thingsboard.server.extensions.api.plugins.msg.ToDeviceRpcRequest;
+import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
 import org.thingsboard.server.extensions.api.plugins.msg.ToDeviceRpcRequestPluginMsg;
 import org.thingsboard.server.extensions.api.plugins.msg.ToPluginRpcResponseDeviceMsg;
 import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
 import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
 import org.thingsboard.server.gen.cluster.ClusterRpcServiceGrpc;
-import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
 import org.thingsboard.server.service.cluster.discovery.ServerInstance;
 import org.thingsboard.server.service.cluster.discovery.ServerInstanceService;
-import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
+import org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg;
 
-import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import java.io.IOException;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -138,7 +133,7 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
     }
 
     @Override
-    public void tell(ServerAddress serverAddress, ToDeviceRpcRequestPluginMsg toForward) {
+    public void tell(ServerAddress serverAddress, ToDeviceRpcRequestMsg toForward) {
         ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
                 .setToDeviceRpcRequestRpcMsg(toProtoMsg(toForward)).build();
         tell(serverAddress, msg);
@@ -202,15 +197,10 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
         ).build();
     }
 
-    private static ClusterAPIProtos.ToDeviceRpcRequestRpcMessage toProtoMsg(ToDeviceRpcRequestPluginMsg msg) {
+    private static ClusterAPIProtos.ToDeviceRpcRequestRpcMessage toProtoMsg(ToDeviceRpcRequestMsg msg) {
         ClusterAPIProtos.ToDeviceRpcRequestRpcMessage.Builder builder = ClusterAPIProtos.ToDeviceRpcRequestRpcMessage.newBuilder();
         ToDeviceRpcRequest request = msg.getMsg();
 
-        builder.setAddress(ClusterAPIProtos.PluginAddress.newBuilder()
-                .setTenantId(toUid(msg.getPluginTenantId().getId()))
-                .setPluginId(toUid(msg.getPluginId().getId()))
-                .build());
-
         builder.setDeviceTenantId(toUid(msg.getTenantId()));
         builder.setDeviceId(toUid(msg.getDeviceId()));
 
@@ -227,11 +217,6 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
         ClusterAPIProtos.ToPluginRpcResponseRpcMessage.Builder builder = ClusterAPIProtos.ToPluginRpcResponseRpcMessage.newBuilder();
         FromDeviceRpcResponse request = msg.getResponse();
 
-        builder.setAddress(ClusterAPIProtos.PluginAddress.newBuilder()
-                .setTenantId(toUid(msg.getPluginTenantId().getId()))
-                .setPluginId(toUid(msg.getPluginId().getId()))
-                .build());
-
         builder.setMsgId(toUid(request.getId()));
         request.getResponse().ifPresent(builder::setResponse);
         request.getError().ifPresent(e -> builder.setError(e.name()));
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
index 8c50bb7..b2b9e88 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
@@ -20,11 +20,13 @@ import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
 import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
 import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
+import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
 import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
 import org.thingsboard.server.extensions.api.plugins.msg.ToDeviceRpcRequestPluginMsg;
 import org.thingsboard.server.extensions.api.plugins.msg.ToPluginRpcResponseDeviceMsg;
 import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
 import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
+import org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg;
 
 import java.util.UUID;
 
@@ -41,7 +43,7 @@ public interface ClusterRpcService {
 
     void tell(ServerAddress serverAddress, ToDeviceActorNotificationMsg toForward);
 
-    void tell(ServerAddress serverAddress, ToDeviceRpcRequestPluginMsg toForward);
+    void tell(ServerAddress serverAddress, ToDeviceRpcRequestMsg toForward);
 
     void tell(ServerAddress serverAddress, ToPluginRpcResponseDeviceMsg toForward);
 
@@ -50,4 +52,5 @@ public interface ClusterRpcService {
     void broadcast(ToAllNodesMsg msg);
 
     void onSessionCreated(UUID msgUid, StreamObserver<ClusterAPIProtos.ToRpcServerMessage> inputStream);
+
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultRpcService.java
new file mode 100644
index 0000000..f81de99
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultRpcService.java
@@ -0,0 +1,201 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.rpc;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.stereotype.Service;
+import org.springframework.util.StringUtils;
+import org.springframework.web.context.request.async.DeferredResult;
+import org.thingsboard.server.actors.service.ActorService;
+import org.thingsboard.server.common.data.audit.ActionType;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.UUIDBased;
+import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
+import org.thingsboard.server.controller.BaseController;
+import org.thingsboard.server.dao.audit.AuditLogService;
+import org.thingsboard.server.extensions.api.plugins.PluginContext;
+import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
+import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
+import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
+import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
+import org.thingsboard.server.service.security.model.SecurityUser;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.io.IOException;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+
+/**
+ * Created by ashvayka on 27.03.18.
+ */
+@Service
+@Slf4j
+public class DefaultRpcService implements RpcService {
+
+    private static final ObjectMapper jsonMapper = new ObjectMapper();
+
+    @Autowired
+    private ClusterRoutingService routingService;
+
+    @Autowired
+    private ClusterRpcService rpcService;
+
+    @Autowired
+    private ActorService actorService;
+
+    @Autowired
+    private AuditLogService auditLogService;
+
+    private ScheduledExecutorService rpcCallBackExecutor;
+
+    private final ConcurrentMap<UUID, LocalRequestMetaData> localRpcRequests = new ConcurrentHashMap<>();
+
+
+    @PostConstruct
+    public void initExecutor() {
+        rpcCallBackExecutor = Executors.newSingleThreadScheduledExecutor();
+    }
+
+    @PreDestroy
+    public void shutdownExecutor() {
+        if (rpcCallBackExecutor != null) {
+            rpcCallBackExecutor.shutdownNow();
+        }
+    }
+
+    @Override
+    public void process(ToDeviceRpcRequest request, LocalRequestMetaData metaData) {
+        log.trace("[{}] Processing local rpc call for device [{}]", request.getTenantId(), request.getDeviceId());
+        sendRpcRequest(request);
+        UUID requestId = request.getId();
+        localRpcRequests.put(requestId, metaData);
+        long timeout = Math.max(0, System.currentTimeMillis() - request.getExpirationTime());
+        rpcCallBackExecutor.schedule(() -> {
+            LocalRequestMetaData localMetaData = localRpcRequests.remove(requestId);
+            if (localMetaData != null) {
+                reply(localMetaData, new FromDeviceRpcResponse(requestId, null, RpcError.TIMEOUT));
+            }
+        }, timeout, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void process(FromDeviceRpcResponse response) {
+        UUID requestId = response.getId();
+        LocalRequestMetaData md = localRpcRequests.remove(requestId);
+        if (md != null) {
+            log.trace("[{}] Processing local rpc response from device [{}]", requestId, md.getRequest().getDeviceId());
+            reply(md, response);
+        } else {
+            log.trace("[{}] Unknown or stale rpc response received [{}]", requestId, response);
+        }
+    }
+
+    public void reply(LocalRequestMetaData rpcRequest, FromDeviceRpcResponse response) {
+        Optional<RpcError> rpcError = response.getError();
+        DeferredResult<ResponseEntity> responseWriter = rpcRequest.getResponseWriter();
+        if (rpcError.isPresent()) {
+            logRpcCall(rpcRequest, rpcError, null);
+            RpcError error = rpcError.get();
+            switch (error) {
+                case TIMEOUT:
+                    responseWriter.setResult(new ResponseEntity<>(HttpStatus.REQUEST_TIMEOUT));
+                    break;
+                case NO_ACTIVE_CONNECTION:
+                    responseWriter.setResult(new ResponseEntity<>(HttpStatus.CONFLICT));
+                    break;
+                default:
+                    responseWriter.setResult(new ResponseEntity<>(HttpStatus.REQUEST_TIMEOUT));
+                    break;
+            }
+        } else {
+            Optional<String> responseData = response.getResponse();
+            if (responseData.isPresent() && !StringUtils.isEmpty(responseData.get())) {
+                String data = responseData.get();
+                try {
+                    logRpcCall(rpcRequest, rpcError, null);
+                    responseWriter.setResult(new ResponseEntity<>(jsonMapper.readTree(data), HttpStatus.OK));
+                } catch (IOException e) {
+                    log.debug("Failed to decode device response: {}", data, e);
+                    logRpcCall(rpcRequest, rpcError, e);
+                    responseWriter.setResult(new ResponseEntity<>(HttpStatus.NOT_ACCEPTABLE));
+                }
+            } else {
+                logRpcCall(rpcRequest, rpcError, null);
+                responseWriter.setResult(new ResponseEntity<>(HttpStatus.OK));
+            }
+        }
+    }
+
+    private void sendRpcRequest(ToDeviceRpcRequest msg) {
+        log.trace("[{}] Forwarding msg {} to device actor!", msg.getDeviceId(), msg);
+        ToDeviceRpcRequestMsg rpcMsg = new ToDeviceRpcRequestMsg(msg);
+        forward(msg.getDeviceId(), rpcMsg, rpcService::tell);
+    }
+
+    private void forward(DeviceId deviceId, ToDeviceRpcRequestMsg msg, BiConsumer<ServerAddress, ToDeviceRpcRequestMsg> rpcFunction) {
+        Optional<ServerAddress> instance = routingService.resolveById(deviceId);
+        if (instance.isPresent()) {
+            log.trace("[{}] Forwarding msg {} to remote device actor!", msg.getTenantId(), msg);
+            rpcFunction.accept(instance.get(), msg);
+        } else {
+            log.trace("[{}] Forwarding msg {} to local device actor!", msg.getTenantId(), msg);
+            actorService.onMsg(msg);
+        }
+    }
+
+    private void logRpcCall(LocalRequestMetaData rpcRequest, Optional<RpcError> rpcError, Throwable e) {
+        logRpcCall(rpcRequest.getUser(), rpcRequest.getRequest().getDeviceId(), rpcRequest.getRequest().getBody(), rpcRequest.getRequest().isOneway(), rpcError, null);
+    }
+
+    @Override
+    public void logRpcCall(SecurityUser user, EntityId entityId, ToDeviceRpcRequestBody body, boolean oneWay, Optional<RpcError> rpcError, Throwable e) {
+        String rpcErrorStr = "";
+        if (rpcError.isPresent()) {
+            rpcErrorStr = "RPC Error: " + rpcError.get().name();
+        }
+        String method = body.getMethod();
+        String params = body.getParams();
+
+        auditLogService.logEntityAction(
+                user.getTenantId(),
+                user.getCustomerId(),
+                user.getId(),
+                user.getName(),
+                (UUIDBased & EntityId) entityId,
+                null,
+                ActionType.RPC_CALL,
+                BaseController.toException(e),
+                rpcErrorStr,
+                oneWay,
+                method,
+                params);
+    }
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/ToDeviceRpcRequestMsg.java b/application/src/main/java/org/thingsboard/server/service/rpc/ToDeviceRpcRequestMsg.java
new file mode 100644
index 0000000..4ff2a7c
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/rpc/ToDeviceRpcRequestMsg.java
@@ -0,0 +1,57 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.rpc;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
+import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
+
+import java.util.Optional;
+
+/**
+ * Created by ashvayka on 16.04.18.
+ */
+@ToString
+@RequiredArgsConstructor
+public class ToDeviceRpcRequestMsg implements ToDeviceActorNotificationMsg {
+
+    private final ServerAddress serverAddress;
+    @Getter
+    private final ToDeviceRpcRequest msg;
+
+    public ToDeviceRpcRequestMsg(ToDeviceRpcRequest msg) {
+        this(null, msg);
+    }
+
+    public Optional<ServerAddress> getServerAddress() {
+        return Optional.ofNullable(serverAddress);
+    }
+
+    @Override
+    public DeviceId getDeviceId() {
+        return msg.getDeviceId();
+    }
+
+    @Override
+    public TenantId getTenantId() {
+        return msg.getTenantId();
+    }
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java b/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java
index c1f3688..8487279 100644
--- a/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java
+++ b/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java
@@ -25,6 +25,7 @@ import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Component;
 import org.springframework.web.context.request.async.DeferredResult;
 import org.thingsboard.server.actors.plugin.ValidationResult;
+import org.thingsboard.server.common.data.BaseData;
 import org.thingsboard.server.common.data.Customer;
 import org.thingsboard.server.common.data.Device;
 import org.thingsboard.server.common.data.Tenant;
@@ -35,8 +36,10 @@ import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.id.EntityIdFactory;
 import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.RuleNodeId;
 import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.data.rule.RuleChain;
+import org.thingsboard.server.common.data.rule.RuleNode;
 import org.thingsboard.server.controller.HttpValidationCallback;
 import org.thingsboard.server.dao.alarm.AlarmService;
 import org.thingsboard.server.dao.asset.AssetService;
@@ -140,7 +143,7 @@ public class AccessValidator {
         return response;
     }
 
-    public <T> void validate(SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
+    public void validate(SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
         switch (entityId.getEntityType()) {
             case DEVICE:
                 validateDevice(currentUser, entityId, callback);
@@ -177,14 +180,14 @@ public class AccessValidator {
                     } else if (currentUser.isCustomerUser() && !device.getCustomerId().equals(currentUser.getCustomerId())) {
                         return ValidationResult.accessDenied("Device doesn't belong to the current Customer!");
                     } else {
-                        return ValidationResult.ok();
+                        return ValidationResult.ok(device);
                     }
                 }
             }), executor);
         }
     }
 
-    private <T> void validateAsset(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
+    private void validateAsset(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
         if (currentUser.isSystemAdmin()) {
             callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
         } else {
@@ -198,15 +201,14 @@ public class AccessValidator {
                     } else if (currentUser.isCustomerUser() && !asset.getCustomerId().equals(currentUser.getCustomerId())) {
                         return ValidationResult.accessDenied("Asset doesn't belong to the current Customer!");
                     } else {
-                        return ValidationResult.ok();
+                        return ValidationResult.ok(asset);
                     }
                 }
             }), executor);
         }
     }
 
-
-    private <T> void validateRuleChain(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
+    private void validateRuleChain(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
         if (currentUser.isCustomerUser()) {
             callback.onSuccess(ValidationResult.accessDenied(CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
         } else {
@@ -220,14 +222,40 @@ public class AccessValidator {
                     } else if (currentUser.isSystemAdmin() && !ruleChain.getTenantId().isNullUid()) {
                         return ValidationResult.accessDenied("Rule chain is not in system scope!");
                     } else {
-                        return ValidationResult.ok();
+                        return ValidationResult.ok(ruleChain);
+                    }
+                }
+            }), executor);
+        }
+    }
+
+    private void validateRule(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
+        if (currentUser.isCustomerUser()) {
+            callback.onSuccess(ValidationResult.accessDenied(CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
+        } else {
+            ListenableFuture<RuleNode> ruleNodeFuture = ruleChainService.findRuleNodeByIdAsync(new RuleNodeId(entityId.getId()));
+            Futures.addCallback(ruleNodeFuture, getCallback(callback, ruleNodeTmp -> {
+                RuleNode ruleNode = ruleNodeTmp;
+                if (ruleNode == null) {
+                    return ValidationResult.entityNotFound("Rule node with requested id wasn't found!");
+                } else if (ruleNode.getRuleChainId() == null) {
+                    return ValidationResult.entityNotFound("Rule chain with requested node id wasn't found!");
+                } else {
+                    //TODO: make async
+                    RuleChain ruleChain = ruleChainService.findRuleChainById(ruleNode.getRuleChainId());
+                    if (currentUser.isTenantAdmin() && !ruleChain.getTenantId().equals(currentUser.getTenantId())) {
+                        return ValidationResult.accessDenied("Rule chain doesn't belong to the current Tenant!");
+                    } else if (currentUser.isSystemAdmin() && !ruleChain.getTenantId().isNullUid()) {
+                        return ValidationResult.accessDenied("Rule chain is not in system scope!");
+                    } else {
+                        return ValidationResult.ok(ruleNode);
                     }
                 }
             }), executor);
         }
     }
 
-    private <T> void validateCustomer(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
+    private void validateCustomer(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
         if (currentUser.isSystemAdmin()) {
             callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
         } else {
@@ -241,18 +269,18 @@ public class AccessValidator {
                     } else if (currentUser.isCustomerUser() && !customer.getId().equals(currentUser.getCustomerId())) {
                         return ValidationResult.accessDenied("Customer doesn't relate to the currently authorized customer user!");
                     } else {
-                        return ValidationResult.ok();
+                        return ValidationResult.ok(customer);
                     }
                 }
             }), executor);
         }
     }
 
-    private <T> void validateTenant(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
+    private void validateTenant(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
         if (currentUser.isCustomerUser()) {
             callback.onSuccess(ValidationResult.accessDenied(CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
         } else if (currentUser.isSystemAdmin()) {
-            callback.onSuccess(ValidationResult.ok());
+            callback.onSuccess(ValidationResult.ok(null));
         } else {
             ListenableFuture<Tenant> tenantFuture = tenantService.findTenantByIdAsync(new TenantId(entityId.getId()));
             Futures.addCallback(tenantFuture, getCallback(callback, tenant -> {
@@ -261,13 +289,13 @@ public class AccessValidator {
                 } else if (!tenant.getId().equals(currentUser.getTenantId())) {
                     return ValidationResult.accessDenied("Tenant doesn't relate to the currently authorized user!");
                 } else {
-                    return ValidationResult.ok();
+                    return ValidationResult.ok(tenant);
                 }
             }), executor);
         }
     }
 
-    private <T> FutureCallback<T> getCallback(FutureCallback<ValidationResult> callback, Function<T, ValidationResult> transformer) {
+    private <T, V> FutureCallback<T> getCallback(FutureCallback<ValidationResult> callback, Function<T, ValidationResult<V>> transformer) {
         return new FutureCallback<T>() {
             @Override
             public void onSuccess(@Nullable T result) {
diff --git a/application/src/main/proto/cluster.proto b/application/src/main/proto/cluster.proto
index e106d1b..3bb0db7 100644
--- a/application/src/main/proto/cluster.proto
+++ b/application/src/main/proto/cluster.proto
@@ -61,7 +61,6 @@ message ConnectRpcMessage {
 }
 
 message ToDeviceRpcRequestRpcMessage {
-  PluginAddress address = 1;
   Uid deviceTenantId = 2;
   Uid deviceId = 3;
 
@@ -73,8 +72,6 @@ message ToDeviceRpcRequestRpcMessage {
 }
 
 message ToPluginRpcResponseRpcMessage {
-  PluginAddress address = 1;
-
   Uid msgId = 2;
   string response = 3;
   string error = 4;
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java
index fbc1103..808166b 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java
@@ -22,6 +22,7 @@ import lombok.EqualsAndHashCode;
 import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.server.common.data.HasName;
 import org.thingsboard.server.common.data.SearchTextBasedWithAdditionalInfo;
+import org.thingsboard.server.common.data.id.RuleChainId;
 import org.thingsboard.server.common.data.id.RuleNodeId;
 import org.thingsboard.server.common.data.id.TenantId;
 
@@ -32,6 +33,7 @@ public class RuleNode extends SearchTextBasedWithAdditionalInfo<RuleNodeId> impl
 
     private static final long serialVersionUID = -5656679015121235465L;
 
+    private RuleChainId ruleChainId;
     private String type;
     private String name;
     private boolean debugMode;
@@ -49,8 +51,10 @@ public class RuleNode extends SearchTextBasedWithAdditionalInfo<RuleNodeId> impl
 
     public RuleNode(RuleNode ruleNode) {
         super(ruleNode);
+        this.ruleChainId = ruleNode.getRuleChainId();
         this.type = ruleNode.getType();
         this.name = ruleNode.getName();
+        this.debugMode = ruleNode.isDebugMode();
         this.setConfiguration(ruleNode.getConfiguration());
     }
 
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java
index 8c34cd3..aff6381 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java
@@ -350,6 +350,7 @@ public class ModelConstants {
      * Cassandra rule node constants.
      */
     public static final String RULE_NODE_COLUMN_FAMILY_NAME = "rule_node";
+    public static final String RULE_NODE_CHAIN_ID_PROPERTY = "rule_chain_id";
     public static final String RULE_NODE_TYPE_PROPERTY = "type";
     public static final String RULE_NODE_NAME_PROPERTY = "name";
     public static final String RULE_NODE_CONFIGURATION_PROPERTY = "configuration";
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/nosql/RuleNodeEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/nosql/RuleNodeEntity.java
index 8d3f3c3..ec13022 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/nosql/RuleNodeEntity.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/nosql/RuleNodeEntity.java
@@ -24,6 +24,7 @@ import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
+import org.thingsboard.server.common.data.id.RuleChainId;
 import org.thingsboard.server.common.data.id.RuleNodeId;
 import org.thingsboard.server.common.data.rule.RuleNode;
 import org.thingsboard.server.dao.model.SearchTextEntity;
@@ -41,6 +42,8 @@ public class RuleNodeEntity implements SearchTextEntity<RuleNode> {
     @PartitionKey
     @Column(name = ID_PROPERTY)
     private UUID id;
+    @Column(name = RULE_NODE_CHAIN_ID_PROPERTY)
+    private UUID ruleChainId;
     @Column(name = RULE_NODE_TYPE_PROPERTY)
     private String type;
     @Column(name = RULE_NODE_NAME_PROPERTY)
@@ -56,7 +59,6 @@ public class RuleNodeEntity implements SearchTextEntity<RuleNode> {
     @Column(name = DEBUG_MODE)
     private boolean debugMode;
 
-
     public RuleNodeEntity() {
     }
 
@@ -64,6 +66,9 @@ public class RuleNodeEntity implements SearchTextEntity<RuleNode> {
         if (ruleNode.getId() != null) {
             this.id = ruleNode.getUuidId();
         }
+        if (ruleNode.getRuleChainId() != null) {
+            this.ruleChainId = ruleNode.getRuleChainId().getId();
+        }
         this.type = ruleNode.getType();
         this.name = ruleNode.getName();
         this.debugMode = ruleNode.isDebugMode();
@@ -92,6 +97,14 @@ public class RuleNodeEntity implements SearchTextEntity<RuleNode> {
         this.id = id;
     }
 
+    public UUID getRuleChainId() {
+        return ruleChainId;
+    }
+
+    public void setRuleChainId(UUID ruleChainId) {
+        this.ruleChainId = ruleChainId;
+    }
+
     public String getType() {
         return type;
     }
@@ -132,6 +145,9 @@ public class RuleNodeEntity implements SearchTextEntity<RuleNode> {
     public RuleNode toData() {
         RuleNode ruleNode = new RuleNode(new RuleNodeId(id));
         ruleNode.setCreatedTime(UUIDs.unixTimestamp(id));
+        if (this.ruleChainId != null) {
+            ruleNode.setRuleChainId(new RuleChainId(this.ruleChainId));
+        }
         ruleNode.setType(this.type);
         ruleNode.setName(this.name);
         ruleNode.setDebugMode(this.debugMode);
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleNodeEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleNodeEntity.java
index 6a888c2..1d05433 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleNodeEntity.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleNodeEntity.java
@@ -21,8 +21,10 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import org.hibernate.annotations.Type;
 import org.hibernate.annotations.TypeDef;
+import org.thingsboard.server.common.data.id.RuleChainId;
 import org.thingsboard.server.common.data.id.RuleNodeId;
 import org.thingsboard.server.common.data.rule.RuleNode;
+import org.thingsboard.server.dao.DaoUtil;
 import org.thingsboard.server.dao.model.BaseSqlEntity;
 import org.thingsboard.server.dao.model.ModelConstants;
 import org.thingsboard.server.dao.model.SearchTextEntity;
@@ -39,6 +41,9 @@ import javax.persistence.Table;
 @Table(name = ModelConstants.RULE_NODE_COLUMN_FAMILY_NAME)
 public class RuleNodeEntity extends BaseSqlEntity<RuleNode> implements SearchTextEntity<RuleNode> {
 
+    @Column(name = ModelConstants.RULE_NODE_CHAIN_ID_PROPERTY)
+    private String ruleChainId;
+
     @Column(name = ModelConstants.RULE_NODE_TYPE_PROPERTY)
     private String type;
 
@@ -66,6 +71,9 @@ public class RuleNodeEntity extends BaseSqlEntity<RuleNode> implements SearchTex
         if (ruleNode.getId() != null) {
             this.setId(ruleNode.getUuidId());
         }
+        if (ruleNode.getRuleChainId() != null) {
+            this.ruleChainId = toString(DaoUtil.getId(ruleNode.getRuleChainId()));
+        }
         this.type = ruleNode.getType();
         this.name = ruleNode.getName();
         this.debugMode = ruleNode.isDebugMode();
@@ -88,6 +96,9 @@ public class RuleNodeEntity extends BaseSqlEntity<RuleNode> implements SearchTex
     public RuleNode toData() {
         RuleNode ruleNode = new RuleNode(new RuleNodeId(getId()));
         ruleNode.setCreatedTime(UUIDs.unixTimestamp(getId()));
+        if (ruleChainId != null) {
+            ruleNode.setRuleChainId(new RuleChainId(toUUID(ruleChainId)));
+        }
         ruleNode.setType(type);
         ruleNode.setName(name);
         ruleNode.setDebugMode(debugMode);
diff --git a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java
index cdb9a80..9ce1fbe 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java
@@ -13,7 +13,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.thingsboard.server.dao.rule;
 
 import com.google.common.util.concurrent.ListenableFuture;
@@ -124,6 +123,7 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
             }
         }
         for (RuleNode node : toAddOrUpdate) {
+            node.setRuleChainId(ruleChain.getId());
             RuleNode savedNode = ruleNodeDao.save(node);
             try {
                 createRelation(new EntityRelation(ruleChainMetaData.getRuleChainId(), savedNode.getId(),
@@ -137,7 +137,7 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
             nodes.set(index, savedNode);
             ruleNodeIndexMap.put(savedNode.getId(), index);
         }
-        for (RuleNode node: toDelete) {
+        for (RuleNode node : toDelete) {
             deleteRuleNode(node.getId());
         }
         RuleNodeId firstRuleNodeId = null;
@@ -234,6 +234,12 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
     }
 
     @Override
+    public ListenableFuture<RuleNode> findRuleNodeByIdAsync(RuleNodeId ruleNodeId) {
+        Validator.validateId(ruleNodeId, "Incorrect rule node id for search request.");
+        return ruleNodeDao.findByIdAsync(ruleNodeId.getId());
+    }
+
+    @Override
     public RuleChain getRootTenantRuleChain(TenantId tenantId) {
         Validator.validateId(tenantId, "Incorrect tenant id for search request.");
         List<EntityRelation> relations = relationService.findByFrom(tenantId, RelationTypeGroup.RULE_CHAIN);
diff --git a/dao/src/main/java/org/thingsboard/server/dao/rule/RuleChainService.java b/dao/src/main/java/org/thingsboard/server/dao/rule/RuleChainService.java
index da7833d..d516e54 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/rule/RuleChainService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/rule/RuleChainService.java
@@ -46,6 +46,8 @@ public interface RuleChainService {
 
     ListenableFuture<RuleChain> findRuleChainByIdAsync(RuleChainId ruleChainId);
 
+    ListenableFuture<RuleNode> findRuleNodeByIdAsync(RuleNodeId ruleNodeId);
+
     RuleChain getRootTenantRuleChain(TenantId tenantId);
 
     List<RuleNode> getRuleChainNodes(RuleChainId ruleChainId);
diff --git a/dao/src/main/resources/cassandra/schema.cql b/dao/src/main/resources/cassandra/schema.cql
index d0e62b2..9150587 100644
--- a/dao/src/main/resources/cassandra/schema.cql
+++ b/dao/src/main/resources/cassandra/schema.cql
@@ -684,6 +684,7 @@ CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.rule_chain_by_tenant_and_sear
 
 CREATE TABLE IF NOT EXISTS  thingsboard.rule_node (
     id uuid,
+    rule_chain_id uuid,
     type text,
     name text,
     debug_mode boolean,
diff --git a/dao/src/main/resources/sql/schema.sql b/dao/src/main/resources/sql/schema.sql
index d7a0978..5876fbb 100644
--- a/dao/src/main/resources/sql/schema.sql
+++ b/dao/src/main/resources/sql/schema.sql
@@ -270,6 +270,7 @@ CREATE TABLE IF NOT EXISTS rule_chain (
 
 CREATE TABLE IF NOT EXISTS rule_node (
     id varchar(31) NOT NULL CONSTRAINT rule_node_pkey PRIMARY KEY,
+    rule_chain_id varchar(31),
     additional_info varchar,
     configuration varchar(10000000),
     type varchar(255),
diff --git a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/msg/ToDeviceRpcRequestPluginMsg.java b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/msg/ToDeviceRpcRequestPluginMsg.java
index faf1696..b71bcb9 100644
--- a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/msg/ToDeviceRpcRequestPluginMsg.java
+++ b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/msg/ToDeviceRpcRequestPluginMsg.java
@@ -22,6 +22,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.PluginId;
 import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
 import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
 
 import java.util.Optional;
diff --git a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java
index 37f4c4a..4cd6456 100644
--- a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java
+++ b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java
@@ -22,8 +22,9 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry;
 import org.thingsboard.server.common.data.kv.TsKvEntry;
 import org.thingsboard.server.common.data.kv.TsKvQuery;
 import org.thingsboard.server.common.data.relation.EntityRelation;
-import org.thingsboard.server.common.data.relation.RelationTypeGroup;
+import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
 import org.thingsboard.server.extensions.api.plugins.msg.*;
 import org.thingsboard.server.extensions.api.plugins.rpc.RpcMsg;
 import org.thingsboard.server.extensions.api.plugins.ws.PluginWebsocketSessionRef;
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/messaging/DeviceMessagingRuleMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/messaging/DeviceMessagingRuleMsgHandler.java
index bd18b8b..5ef0650 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/messaging/DeviceMessagingRuleMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/messaging/DeviceMessagingRuleMsgHandler.java
@@ -26,15 +26,24 @@ import org.thingsboard.server.common.data.id.CustomerId;
 import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.RuleId;
 import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
 import org.thingsboard.server.common.msg.core.ToServerRpcRequestMsg;
 import org.thingsboard.server.common.msg.core.ToServerRpcResponseMsg;
+import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
 import org.thingsboard.server.extensions.api.plugins.PluginCallback;
 import org.thingsboard.server.extensions.api.plugins.PluginContext;
 import org.thingsboard.server.extensions.api.plugins.handlers.RuleMsgHandler;
-import org.thingsboard.server.extensions.api.plugins.msg.*;
+import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
+import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
+import org.thingsboard.server.extensions.api.plugins.msg.RpcResponsePluginToRuleMsg;
+import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg;
 import org.thingsboard.server.extensions.api.rules.RuleException;
 
-import java.util.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
 
 /**
  * @author Andrew Shvayka
@@ -152,7 +161,7 @@ public class DeviceMessagingRuleMsgHandler implements RuleMsgHandler {
                         pendingMsgs.put(uid, requestMd);
                         log.trace("[{}] Forwarding {} to [{}]", uid, params, targetDeviceId);
                         ToDeviceRpcRequestBody requestBody = new ToDeviceRpcRequestBody(ON_MSG_METHOD_NAME, GSON.toJson(params.get("body")));
-                        ctx.sendRpcRequest(new ToDeviceRpcRequest(uid, null, targetDevice.getTenantId(), targetDeviceId, oneWay, System.currentTimeMillis() + timeout, requestBody));
+                        ctx.sendRpcRequest(new ToDeviceRpcRequest(uid, targetDevice.getTenantId(), targetDeviceId, oneWay, System.currentTimeMillis() + timeout, requestBody));
                     } else {
                         replyWithError(ctx, requestMd, RpcError.FORBIDDEN);
                     }