thingsboard-memoizeit
Changes
application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java 2(+2 -0)
application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java 20(+11 -9)
application/src/main/java/org/thingsboard/server/actors/plugin/SharedPluginProcessingContext.java 4(+1 -3)
application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java 23(+4 -19)
application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java 5(+4 -1)
common/data/src/main/java/org/thingsboard/server/common/data/rpc/ToDeviceRpcRequestBody.java 2(+1 -1)
extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/msg/ToDeviceRpcRequestPluginMsg.java 1(+1 -0)
extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java 3(+2 -1)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/messaging/DeviceMessagingRuleMsgHandler.java 15(+12 -3)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/rpc/handlers/RpcRestMsgHandler.java 161(+0 -161)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/rpc/handlers/RpcRuleMsgHandler.java 102(+0 -102)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index 3644a49..07b6b9b 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -26,11 +26,13 @@ import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.SessionId;
import org.thingsboard.server.common.data.kv.AttributeKey;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.core.*;
import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
import org.thingsboard.server.common.msg.kv.BasicAttributeKVMsg;
+import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.common.msg.session.FromDeviceMsg;
import org.thingsboard.server.common.msg.session.MsgType;
import org.thingsboard.server.common.msg.session.SessionType;
diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
index 10f16ce..7862d23 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
@@ -36,9 +36,11 @@ import org.thingsboard.server.common.data.page.TextPageLink;
import org.thingsboard.server.common.data.plugin.PluginMetaData;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
+import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleMetaData;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotificationMsg;
import org.thingsboard.server.extensions.api.plugins.PluginApiCallSecurityContext;
import org.thingsboard.server.extensions.api.plugins.PluginCallback;
@@ -348,7 +350,7 @@ public final class PluginProcessingContext implements PluginContext {
throw new IllegalStateException("Not Implemented!");
}
} else {
- callback.onSuccess(this, ValidationResult.ok());
+ callback.onSuccess(this, ValidationResult.ok(null));
}
}
@@ -366,7 +368,7 @@ public final class PluginProcessingContext implements PluginContext {
} else if (ctx.isCustomerUser() && !device.getCustomerId().equals(ctx.getCustomerId())) {
return ValidationResult.accessDenied("Device doesn't belong to the current Customer!");
} else {
- return ValidationResult.ok();
+ return ValidationResult.ok(null);
}
}
}));
@@ -387,7 +389,7 @@ public final class PluginProcessingContext implements PluginContext {
} else if (ctx.isCustomerUser() && !asset.getCustomerId().equals(ctx.getCustomerId())) {
return ValidationResult.accessDenied("Asset doesn't belong to the current Customer!");
} else {
- return ValidationResult.ok();
+ return ValidationResult.ok(null);
}
}
}));
@@ -408,7 +410,7 @@ public final class PluginProcessingContext implements PluginContext {
} else if (ctx.isSystemAdmin() && !rule.getTenantId().isNullUid()) {
return ValidationResult.accessDenied("Rule is not in system scope!");
} else {
- return ValidationResult.ok();
+ return ValidationResult.ok(null);
}
}
}));
@@ -429,7 +431,7 @@ public final class PluginProcessingContext implements PluginContext {
} else if (ctx.isSystemAdmin() && !ruleChain.getTenantId().isNullUid()) {
return ValidationResult.accessDenied("Rule chain is not in system scope!");
} else {
- return ValidationResult.ok();
+ return ValidationResult.ok(null);
}
}
}));
@@ -451,7 +453,7 @@ public final class PluginProcessingContext implements PluginContext {
} else if (ctx.isSystemAdmin() && !plugin.getTenantId().isNullUid()) {
return ValidationResult.accessDenied("Plugin is not in system scope!");
} else {
- return ValidationResult.ok();
+ return ValidationResult.ok(null);
}
}
}));
@@ -472,7 +474,7 @@ public final class PluginProcessingContext implements PluginContext {
} else if (ctx.isCustomerUser() && !customer.getId().equals(ctx.getCustomerId())) {
return ValidationResult.accessDenied("Customer doesn't relate to the currently authorized customer user!");
} else {
- return ValidationResult.ok();
+ return ValidationResult.ok(null);
}
}
}));
@@ -483,7 +485,7 @@ public final class PluginProcessingContext implements PluginContext {
if (ctx.isCustomerUser()) {
callback.onSuccess(this, ValidationResult.accessDenied(CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
} else if (ctx.isSystemAdmin()) {
- callback.onSuccess(this, ValidationResult.ok());
+ callback.onSuccess(this, ValidationResult.ok(null));
} else {
ListenableFuture<Tenant> tenantFuture = pluginCtx.tenantService.findTenantByIdAsync(new TenantId(entityId.getId()));
Futures.addCallback(tenantFuture, getCallback(callback, tenant -> {
@@ -492,7 +494,7 @@ public final class PluginProcessingContext implements PluginContext {
} else if (!tenant.getId().equals(ctx.getTenantId())) {
return ValidationResult.accessDenied("Tenant doesn't relate to the currently authorized user!");
} else {
- return ValidationResult.ok();
+ return ValidationResult.ok(null);
}
}));
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/SharedPluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/SharedPluginProcessingContext.java
index 1828970..ad3fef5 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/SharedPluginProcessingContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/SharedPluginProcessingContext.java
@@ -18,9 +18,7 @@ package org.thingsboard.server.actors.plugin;
import akka.actor.ActorRef;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext;
-import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.id.DeviceId;
-import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.controller.plugin.PluginWebSocketMsgEndpoint;
@@ -38,7 +36,7 @@ import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotificationMsg;
import org.thingsboard.server.extensions.api.plugins.msg.TimeoutMsg;
-import org.thingsboard.server.extensions.api.plugins.msg.ToDeviceRpcRequest;
+import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.extensions.api.plugins.msg.ToDeviceRpcRequestPluginMsg;
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/ValidationResult.java b/application/src/main/java/org/thingsboard/server/actors/plugin/ValidationResult.java
index f135ffb..0319a3e 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/ValidationResult.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/ValidationResult.java
@@ -20,29 +20,30 @@ import lombok.Data;
@Data
@AllArgsConstructor
-public class ValidationResult {
+public class ValidationResult<V> {
private final ValidationResultCode resultCode;
private final String message;
+ private final V v;
- public static ValidationResult ok() {
- return new ValidationResult(ValidationResultCode.OK, "Ok");
+ public static <V> ValidationResult<V> ok(V v) {
+ return new ValidationResult<>(ValidationResultCode.OK, "Ok", v);
}
- public static ValidationResult accessDenied(String message) {
- return new ValidationResult(ValidationResultCode.ACCESS_DENIED, message);
+ public static <V> ValidationResult<V> accessDenied(String message) {
+ return new ValidationResult<>(ValidationResultCode.ACCESS_DENIED, message, null);
}
- public static ValidationResult entityNotFound(String message) {
- return new ValidationResult(ValidationResultCode.ENTITY_NOT_FOUND, message);
+ public static <V> ValidationResult<V> entityNotFound(String message) {
+ return new ValidationResult<>(ValidationResultCode.ENTITY_NOT_FOUND, message, null);
}
- public static ValidationResult unauthorized(String message) {
- return new ValidationResult(ValidationResultCode.UNAUTHORIZED, message);
+ public static <V> ValidationResult<V> unauthorized(String message) {
+ return new ValidationResult<>(ValidationResultCode.UNAUTHORIZED, message, null);
}
- public static ValidationResult internalError(String message) {
- return new ValidationResult(ValidationResultCode.INTERNAL_ERROR, message);
+ public static <V> ValidationResult<V> internalError(String message) {
+ return new ValidationResult<>(ValidationResultCode.INTERNAL_ERROR, message, null);
}
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
index 4fa2227..22fce84 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
@@ -24,10 +24,12 @@ import org.thingsboard.server.actors.service.ActorService;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.PluginId;
import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
+import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
import org.thingsboard.server.extensions.api.plugins.msg.*;
import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
@@ -35,6 +37,7 @@ import org.thingsboard.server.extensions.api.plugins.rpc.RpcMsg;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.service.cluster.rpc.GrpcSession;
import org.thingsboard.server.service.cluster.rpc.GrpcSessionListener;
+import org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg;
import java.io.Serializable;
import java.util.UUID;
@@ -139,28 +142,20 @@ public class BasicRpcSessionListener implements GrpcSessionListener {
return new UUID(uid.getPluginUuidMsb(), uid.getPluginUuidLsb());
}
- private static ToDeviceRpcRequestPluginMsg deserialize(ServerAddress serverAddress, ClusterAPIProtos.ToDeviceRpcRequestRpcMessage msg) {
- ClusterAPIProtos.PluginAddress address = msg.getAddress();
- TenantId pluginTenantId = new TenantId(toUUID(address.getTenantId()));
- PluginId pluginId = new PluginId(toUUID(address.getPluginId()));
-
+ private static ToDeviceRpcRequestMsg deserialize(ServerAddress serverAddress, ClusterAPIProtos.ToDeviceRpcRequestRpcMessage msg) {
TenantId deviceTenantId = new TenantId(toUUID(msg.getDeviceTenantId()));
DeviceId deviceId = new DeviceId(toUUID(msg.getDeviceId()));
ToDeviceRpcRequestBody requestBody = new ToDeviceRpcRequestBody(msg.getMethod(), msg.getParams());
- ToDeviceRpcRequest request = new ToDeviceRpcRequest(toUUID(msg.getMsgId()), null, deviceTenantId, deviceId, msg.getOneway(), msg.getExpTime(), requestBody);
+ ToDeviceRpcRequest request = new ToDeviceRpcRequest(toUUID(msg.getMsgId()), deviceTenantId, deviceId, msg.getOneway(), msg.getExpTime(), requestBody);
- return new ToDeviceRpcRequestPluginMsg(serverAddress, pluginId, pluginTenantId, request);
+ return new ToDeviceRpcRequestMsg(serverAddress, request);
}
private static ToPluginRpcResponseDeviceMsg deserialize(ServerAddress serverAddress, ClusterAPIProtos.ToPluginRpcResponseRpcMessage msg) {
- ClusterAPIProtos.PluginAddress address = msg.getAddress();
- TenantId pluginTenantId = new TenantId(toUUID(address.getTenantId()));
- PluginId pluginId = new PluginId(toUUID(address.getPluginId()));
-
RpcError error = !StringUtils.isEmpty(msg.getError()) ? RpcError.valueOf(msg.getError()) : null;
FromDeviceRpcResponse response = new FromDeviceRpcResponse(toUUID(msg.getMsgId()), msg.getResponse(), error);
- return new ToPluginRpcResponseDeviceMsg(pluginId, pluginTenantId, response);
+ return new ToPluginRpcResponseDeviceMsg(null, null, response);
}
@SuppressWarnings("unchecked")
diff --git a/application/src/main/java/org/thingsboard/server/controller/BaseController.java b/application/src/main/java/org/thingsboard/server/controller/BaseController.java
index c7574e8..f08feec 100644
--- a/application/src/main/java/org/thingsboard/server/controller/BaseController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/BaseController.java
@@ -602,7 +602,7 @@ public abstract class BaseController {
auditLogService.logEntityAction(user.getTenantId(), customerId, user.getId(), user.getName(), entityId, entity, actionType, e, additionalInfo);
}
- protected static Exception toException(Throwable error) {
+ public static Exception toException(Throwable error) {
return Exception.class.isInstance(error) ? (Exception) error : new Exception(error);
}
diff --git a/application/src/main/java/org/thingsboard/server/controller/RpcController.java b/application/src/main/java/org/thingsboard/server/controller/RpcController.java
new file mode 100644
index 0000000..9810c03
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/controller/RpcController.java
@@ -0,0 +1,148 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.controller;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.FutureCallback;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.security.access.prepost.PreAuthorize;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.ResponseBody;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.context.request.async.DeferredResult;
+import org.thingsboard.server.actors.plugin.ValidationResult;
+import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
+import org.thingsboard.server.common.data.exception.ThingsboardException;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
+import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
+import org.thingsboard.server.extensions.api.exception.ToErrorResponseEntity;
+import org.thingsboard.server.extensions.api.plugins.PluginConstants;
+import org.thingsboard.server.common.data.rpc.RpcRequest;
+import org.thingsboard.server.service.rpc.LocalRequestMetaData;
+import org.thingsboard.server.service.rpc.RpcService;
+import org.thingsboard.server.service.security.AccessValidator;
+import org.thingsboard.server.service.security.model.SecurityUser;
+
+import javax.annotation.Nullable;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.io.IOException;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Created by ashvayka on 22.03.18.
+ */
+@RestController
+@RequestMapping(PluginConstants.RPC_URL_PREFIX)
+@Slf4j
+public class RpcController extends BaseController {
+
+ public static final int DEFAULT_TIMEOUT = 10000;
+ protected final ObjectMapper jsonMapper = new ObjectMapper();
+
+ @Autowired
+ private RpcService rpcService;
+
+ @Autowired
+ private AccessValidator accessValidator;
+
+ private ExecutorService executor;
+
+ @PostConstruct
+ public void initExecutor() {
+ executor = Executors.newSingleThreadExecutor();
+ }
+
+ @PreDestroy
+ public void shutdownExecutor() {
+ if (executor != null) {
+ executor.shutdownNow();
+ }
+ }
+
+ @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
+ @RequestMapping(value = "/oneway/{deviceId}", method = RequestMethod.POST)
+ @ResponseBody
+ public DeferredResult<ResponseEntity> handleOneWayDeviceRPCRequest(@PathVariable("deviceId") String deviceIdStr, @RequestBody String requestBody) throws ThingsboardException {
+ return handleDeviceRPCRequest(true, new DeviceId(UUID.fromString(deviceIdStr)), requestBody);
+ }
+
+ @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
+ @RequestMapping(value = "/twoway/{deviceId}", method = RequestMethod.POST)
+ @ResponseBody
+ public DeferredResult<ResponseEntity> handleTwoWayDeviceRPCRequest(@PathVariable("deviceId") String deviceIdStr, @RequestBody String requestBody) throws ThingsboardException {
+ return handleDeviceRPCRequest(false, new DeviceId(UUID.fromString(deviceIdStr)), requestBody);
+ }
+
+
+ private DeferredResult<ResponseEntity> handleDeviceRPCRequest(boolean oneWay, DeviceId deviceId, String requestBody) throws ThingsboardException {
+ try {
+ JsonNode rpcRequestBody = jsonMapper.readTree(requestBody);
+ RpcRequest cmd = new RpcRequest(rpcRequestBody.get("method").asText(),
+ jsonMapper.writeValueAsString(rpcRequestBody.get("params")));
+
+ if (rpcRequestBody.has("timeout")) {
+ cmd.setTimeout(rpcRequestBody.get("timeout").asLong());
+ }
+ SecurityUser currentUser = getCurrentUser();
+ TenantId tenantId = currentUser.getTenantId();
+ final DeferredResult<ResponseEntity> response = new DeferredResult<>();
+ long timeout = System.currentTimeMillis() + (cmd.getTimeout() != null ? cmd.getTimeout() : DEFAULT_TIMEOUT);
+ ToDeviceRpcRequestBody body = new ToDeviceRpcRequestBody(cmd.getMethodName(), cmd.getRequestData());
+ accessValidator.validate(currentUser, deviceId, new FutureCallback<ValidationResult>() {
+ @Override
+ public void onSuccess(@Nullable ValidationResult result) {
+ ToDeviceRpcRequest rpcRequest = new ToDeviceRpcRequest(UUID.randomUUID(),
+ tenantId,
+ deviceId,
+ oneWay,
+ timeout,
+ body
+ );
+ rpcService.process(rpcRequest, new LocalRequestMetaData(rpcRequest, currentUser, response));
+ }
+
+ @Override
+ public void onFailure(Throwable e) {
+ ResponseEntity entity;
+ if (e instanceof ToErrorResponseEntity) {
+ entity = ((ToErrorResponseEntity) e).toErrorResponseEntity();
+ } else {
+ entity = new ResponseEntity(HttpStatus.UNAUTHORIZED);
+ }
+ rpcService.logRpcCall(currentUser, deviceId, body, oneWay, Optional.empty(), e);
+ response.setResult(entity);
+ }
+ });
+ return response;
+ } catch (IOException ioe) {
+ throw new ThingsboardException("Invalid request body", ioe, ThingsboardErrorCode.BAD_REQUEST_PARAMS);
+ }
+ }
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java
index 5a6b307..f73c5eb 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java
@@ -19,7 +19,6 @@ import com.google.protobuf.ByteString;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
-import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -27,7 +26,6 @@ import org.springframework.util.SerializationUtils;
import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg;
import org.thingsboard.server.actors.rpc.RpcSessionTellMsg;
-import org.thingsboard.server.actors.service.ActorService;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
@@ -35,21 +33,18 @@ import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
-import org.thingsboard.server.extensions.api.plugins.msg.ToDeviceRpcRequest;
+import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.extensions.api.plugins.msg.ToDeviceRpcRequestPluginMsg;
import org.thingsboard.server.extensions.api.plugins.msg.ToPluginRpcResponseDeviceMsg;
import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.gen.cluster.ClusterRpcServiceGrpc;
-import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
import org.thingsboard.server.service.cluster.discovery.ServerInstance;
import org.thingsboard.server.service.cluster.discovery.ServerInstanceService;
-import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
+import org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg;
-import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
-import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -138,7 +133,7 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
}
@Override
- public void tell(ServerAddress serverAddress, ToDeviceRpcRequestPluginMsg toForward) {
+ public void tell(ServerAddress serverAddress, ToDeviceRpcRequestMsg toForward) {
ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
.setToDeviceRpcRequestRpcMsg(toProtoMsg(toForward)).build();
tell(serverAddress, msg);
@@ -202,15 +197,10 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
).build();
}
- private static ClusterAPIProtos.ToDeviceRpcRequestRpcMessage toProtoMsg(ToDeviceRpcRequestPluginMsg msg) {
+ private static ClusterAPIProtos.ToDeviceRpcRequestRpcMessage toProtoMsg(ToDeviceRpcRequestMsg msg) {
ClusterAPIProtos.ToDeviceRpcRequestRpcMessage.Builder builder = ClusterAPIProtos.ToDeviceRpcRequestRpcMessage.newBuilder();
ToDeviceRpcRequest request = msg.getMsg();
- builder.setAddress(ClusterAPIProtos.PluginAddress.newBuilder()
- .setTenantId(toUid(msg.getPluginTenantId().getId()))
- .setPluginId(toUid(msg.getPluginId().getId()))
- .build());
-
builder.setDeviceTenantId(toUid(msg.getTenantId()));
builder.setDeviceId(toUid(msg.getDeviceId()));
@@ -227,11 +217,6 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
ClusterAPIProtos.ToPluginRpcResponseRpcMessage.Builder builder = ClusterAPIProtos.ToPluginRpcResponseRpcMessage.newBuilder();
FromDeviceRpcResponse request = msg.getResponse();
- builder.setAddress(ClusterAPIProtos.PluginAddress.newBuilder()
- .setTenantId(toUid(msg.getPluginTenantId().getId()))
- .setPluginId(toUid(msg.getPluginId().getId()))
- .build());
-
builder.setMsgId(toUid(request.getId()));
request.getResponse().ifPresent(builder::setResponse);
request.getError().ifPresent(e -> builder.setError(e.name()));
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
index 8c50bb7..b2b9e88 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
@@ -20,11 +20,13 @@ import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
+import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
import org.thingsboard.server.extensions.api.plugins.msg.ToDeviceRpcRequestPluginMsg;
import org.thingsboard.server.extensions.api.plugins.msg.ToPluginRpcResponseDeviceMsg;
import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
+import org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg;
import java.util.UUID;
@@ -41,7 +43,7 @@ public interface ClusterRpcService {
void tell(ServerAddress serverAddress, ToDeviceActorNotificationMsg toForward);
- void tell(ServerAddress serverAddress, ToDeviceRpcRequestPluginMsg toForward);
+ void tell(ServerAddress serverAddress, ToDeviceRpcRequestMsg toForward);
void tell(ServerAddress serverAddress, ToPluginRpcResponseDeviceMsg toForward);
@@ -50,4 +52,5 @@ public interface ClusterRpcService {
void broadcast(ToAllNodesMsg msg);
void onSessionCreated(UUID msgUid, StreamObserver<ClusterAPIProtos.ToRpcServerMessage> inputStream);
+
}
diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultRpcService.java
new file mode 100644
index 0000000..f81de99
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultRpcService.java
@@ -0,0 +1,201 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.rpc;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.stereotype.Service;
+import org.springframework.util.StringUtils;
+import org.springframework.web.context.request.async.DeferredResult;
+import org.thingsboard.server.actors.service.ActorService;
+import org.thingsboard.server.common.data.audit.ActionType;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.UUIDBased;
+import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
+import org.thingsboard.server.controller.BaseController;
+import org.thingsboard.server.dao.audit.AuditLogService;
+import org.thingsboard.server.extensions.api.plugins.PluginContext;
+import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
+import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
+import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
+import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
+import org.thingsboard.server.service.security.model.SecurityUser;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.io.IOException;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+
+/**
+ * Created by ashvayka on 27.03.18.
+ */
+@Service
+@Slf4j
+public class DefaultRpcService implements RpcService {
+
+ private static final ObjectMapper jsonMapper = new ObjectMapper();
+
+ @Autowired
+ private ClusterRoutingService routingService;
+
+ @Autowired
+ private ClusterRpcService rpcService;
+
+ @Autowired
+ private ActorService actorService;
+
+ @Autowired
+ private AuditLogService auditLogService;
+
+ private ScheduledExecutorService rpcCallBackExecutor;
+
+ private final ConcurrentMap<UUID, LocalRequestMetaData> localRpcRequests = new ConcurrentHashMap<>();
+
+
+ @PostConstruct
+ public void initExecutor() {
+ rpcCallBackExecutor = Executors.newSingleThreadScheduledExecutor();
+ }
+
+ @PreDestroy
+ public void shutdownExecutor() {
+ if (rpcCallBackExecutor != null) {
+ rpcCallBackExecutor.shutdownNow();
+ }
+ }
+
+ @Override
+ public void process(ToDeviceRpcRequest request, LocalRequestMetaData metaData) {
+ log.trace("[{}] Processing local rpc call for device [{}]", request.getTenantId(), request.getDeviceId());
+ sendRpcRequest(request);
+ UUID requestId = request.getId();
+ localRpcRequests.put(requestId, metaData);
+ long timeout = Math.max(0, System.currentTimeMillis() - request.getExpirationTime());
+ rpcCallBackExecutor.schedule(() -> {
+ LocalRequestMetaData localMetaData = localRpcRequests.remove(requestId);
+ if (localMetaData != null) {
+ reply(localMetaData, new FromDeviceRpcResponse(requestId, null, RpcError.TIMEOUT));
+ }
+ }, timeout, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void process(FromDeviceRpcResponse response) {
+ UUID requestId = response.getId();
+ LocalRequestMetaData md = localRpcRequests.remove(requestId);
+ if (md != null) {
+ log.trace("[{}] Processing local rpc response from device [{}]", requestId, md.getRequest().getDeviceId());
+ reply(md, response);
+ } else {
+ log.trace("[{}] Unknown or stale rpc response received [{}]", requestId, response);
+ }
+ }
+
+ public void reply(LocalRequestMetaData rpcRequest, FromDeviceRpcResponse response) {
+ Optional<RpcError> rpcError = response.getError();
+ DeferredResult<ResponseEntity> responseWriter = rpcRequest.getResponseWriter();
+ if (rpcError.isPresent()) {
+ logRpcCall(rpcRequest, rpcError, null);
+ RpcError error = rpcError.get();
+ switch (error) {
+ case TIMEOUT:
+ responseWriter.setResult(new ResponseEntity<>(HttpStatus.REQUEST_TIMEOUT));
+ break;
+ case NO_ACTIVE_CONNECTION:
+ responseWriter.setResult(new ResponseEntity<>(HttpStatus.CONFLICT));
+ break;
+ default:
+ responseWriter.setResult(new ResponseEntity<>(HttpStatus.REQUEST_TIMEOUT));
+ break;
+ }
+ } else {
+ Optional<String> responseData = response.getResponse();
+ if (responseData.isPresent() && !StringUtils.isEmpty(responseData.get())) {
+ String data = responseData.get();
+ try {
+ logRpcCall(rpcRequest, rpcError, null);
+ responseWriter.setResult(new ResponseEntity<>(jsonMapper.readTree(data), HttpStatus.OK));
+ } catch (IOException e) {
+ log.debug("Failed to decode device response: {}", data, e);
+ logRpcCall(rpcRequest, rpcError, e);
+ responseWriter.setResult(new ResponseEntity<>(HttpStatus.NOT_ACCEPTABLE));
+ }
+ } else {
+ logRpcCall(rpcRequest, rpcError, null);
+ responseWriter.setResult(new ResponseEntity<>(HttpStatus.OK));
+ }
+ }
+ }
+
+ private void sendRpcRequest(ToDeviceRpcRequest msg) {
+ log.trace("[{}] Forwarding msg {} to device actor!", msg.getDeviceId(), msg);
+ ToDeviceRpcRequestMsg rpcMsg = new ToDeviceRpcRequestMsg(msg);
+ forward(msg.getDeviceId(), rpcMsg, rpcService::tell);
+ }
+
+ private void forward(DeviceId deviceId, ToDeviceRpcRequestMsg msg, BiConsumer<ServerAddress, ToDeviceRpcRequestMsg> rpcFunction) {
+ Optional<ServerAddress> instance = routingService.resolveById(deviceId);
+ if (instance.isPresent()) {
+ log.trace("[{}] Forwarding msg {} to remote device actor!", msg.getTenantId(), msg);
+ rpcFunction.accept(instance.get(), msg);
+ } else {
+ log.trace("[{}] Forwarding msg {} to local device actor!", msg.getTenantId(), msg);
+ actorService.onMsg(msg);
+ }
+ }
+
+ private void logRpcCall(LocalRequestMetaData rpcRequest, Optional<RpcError> rpcError, Throwable e) {
+ logRpcCall(rpcRequest.getUser(), rpcRequest.getRequest().getDeviceId(), rpcRequest.getRequest().getBody(), rpcRequest.getRequest().isOneway(), rpcError, null);
+ }
+
+ @Override
+ public void logRpcCall(SecurityUser user, EntityId entityId, ToDeviceRpcRequestBody body, boolean oneWay, Optional<RpcError> rpcError, Throwable e) {
+ String rpcErrorStr = "";
+ if (rpcError.isPresent()) {
+ rpcErrorStr = "RPC Error: " + rpcError.get().name();
+ }
+ String method = body.getMethod();
+ String params = body.getParams();
+
+ auditLogService.logEntityAction(
+ user.getTenantId(),
+ user.getCustomerId(),
+ user.getId(),
+ user.getName(),
+ (UUIDBased & EntityId) entityId,
+ null,
+ ActionType.RPC_CALL,
+ BaseController.toException(e),
+ rpcErrorStr,
+ oneWay,
+ method,
+ params);
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/ToDeviceRpcRequestMsg.java b/application/src/main/java/org/thingsboard/server/service/rpc/ToDeviceRpcRequestMsg.java
new file mode 100644
index 0000000..4ff2a7c
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/rpc/ToDeviceRpcRequestMsg.java
@@ -0,0 +1,57 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.rpc;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
+import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
+
+import java.util.Optional;
+
+/**
+ * Created by ashvayka on 16.04.18.
+ */
+@ToString
+@RequiredArgsConstructor
+public class ToDeviceRpcRequestMsg implements ToDeviceActorNotificationMsg {
+
+ private final ServerAddress serverAddress;
+ @Getter
+ private final ToDeviceRpcRequest msg;
+
+ public ToDeviceRpcRequestMsg(ToDeviceRpcRequest msg) {
+ this(null, msg);
+ }
+
+ public Optional<ServerAddress> getServerAddress() {
+ return Optional.ofNullable(serverAddress);
+ }
+
+ @Override
+ public DeviceId getDeviceId() {
+ return msg.getDeviceId();
+ }
+
+ @Override
+ public TenantId getTenantId() {
+ return msg.getTenantId();
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java b/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java
index c1f3688..8487279 100644
--- a/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java
+++ b/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java
@@ -25,6 +25,7 @@ import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.async.DeferredResult;
import org.thingsboard.server.actors.plugin.ValidationResult;
+import org.thingsboard.server.common.data.BaseData;
import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.Tenant;
@@ -35,8 +36,10 @@ import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.rule.RuleChain;
+import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.controller.HttpValidationCallback;
import org.thingsboard.server.dao.alarm.AlarmService;
import org.thingsboard.server.dao.asset.AssetService;
@@ -140,7 +143,7 @@ public class AccessValidator {
return response;
}
- public <T> void validate(SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
+ public void validate(SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
switch (entityId.getEntityType()) {
case DEVICE:
validateDevice(currentUser, entityId, callback);
@@ -177,14 +180,14 @@ public class AccessValidator {
} else if (currentUser.isCustomerUser() && !device.getCustomerId().equals(currentUser.getCustomerId())) {
return ValidationResult.accessDenied("Device doesn't belong to the current Customer!");
} else {
- return ValidationResult.ok();
+ return ValidationResult.ok(device);
}
}
}), executor);
}
}
- private <T> void validateAsset(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
+ private void validateAsset(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
if (currentUser.isSystemAdmin()) {
callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
} else {
@@ -198,15 +201,14 @@ public class AccessValidator {
} else if (currentUser.isCustomerUser() && !asset.getCustomerId().equals(currentUser.getCustomerId())) {
return ValidationResult.accessDenied("Asset doesn't belong to the current Customer!");
} else {
- return ValidationResult.ok();
+ return ValidationResult.ok(asset);
}
}
}), executor);
}
}
-
- private <T> void validateRuleChain(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
+ private void validateRuleChain(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
if (currentUser.isCustomerUser()) {
callback.onSuccess(ValidationResult.accessDenied(CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
} else {
@@ -220,14 +222,40 @@ public class AccessValidator {
} else if (currentUser.isSystemAdmin() && !ruleChain.getTenantId().isNullUid()) {
return ValidationResult.accessDenied("Rule chain is not in system scope!");
} else {
- return ValidationResult.ok();
+ return ValidationResult.ok(ruleChain);
+ }
+ }
+ }), executor);
+ }
+ }
+
+ private void validateRule(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
+ if (currentUser.isCustomerUser()) {
+ callback.onSuccess(ValidationResult.accessDenied(CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
+ } else {
+ ListenableFuture<RuleNode> ruleNodeFuture = ruleChainService.findRuleNodeByIdAsync(new RuleNodeId(entityId.getId()));
+ Futures.addCallback(ruleNodeFuture, getCallback(callback, ruleNodeTmp -> {
+ RuleNode ruleNode = ruleNodeTmp;
+ if (ruleNode == null) {
+ return ValidationResult.entityNotFound("Rule node with requested id wasn't found!");
+ } else if (ruleNode.getRuleChainId() == null) {
+ return ValidationResult.entityNotFound("Rule chain with requested node id wasn't found!");
+ } else {
+ //TODO: make async
+ RuleChain ruleChain = ruleChainService.findRuleChainById(ruleNode.getRuleChainId());
+ if (currentUser.isTenantAdmin() && !ruleChain.getTenantId().equals(currentUser.getTenantId())) {
+ return ValidationResult.accessDenied("Rule chain doesn't belong to the current Tenant!");
+ } else if (currentUser.isSystemAdmin() && !ruleChain.getTenantId().isNullUid()) {
+ return ValidationResult.accessDenied("Rule chain is not in system scope!");
+ } else {
+ return ValidationResult.ok(ruleNode);
}
}
}), executor);
}
}
- private <T> void validateCustomer(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
+ private void validateCustomer(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
if (currentUser.isSystemAdmin()) {
callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
} else {
@@ -241,18 +269,18 @@ public class AccessValidator {
} else if (currentUser.isCustomerUser() && !customer.getId().equals(currentUser.getCustomerId())) {
return ValidationResult.accessDenied("Customer doesn't relate to the currently authorized customer user!");
} else {
- return ValidationResult.ok();
+ return ValidationResult.ok(customer);
}
}
}), executor);
}
}
- private <T> void validateTenant(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
+ private void validateTenant(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
if (currentUser.isCustomerUser()) {
callback.onSuccess(ValidationResult.accessDenied(CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
} else if (currentUser.isSystemAdmin()) {
- callback.onSuccess(ValidationResult.ok());
+ callback.onSuccess(ValidationResult.ok(null));
} else {
ListenableFuture<Tenant> tenantFuture = tenantService.findTenantByIdAsync(new TenantId(entityId.getId()));
Futures.addCallback(tenantFuture, getCallback(callback, tenant -> {
@@ -261,13 +289,13 @@ public class AccessValidator {
} else if (!tenant.getId().equals(currentUser.getTenantId())) {
return ValidationResult.accessDenied("Tenant doesn't relate to the currently authorized user!");
} else {
- return ValidationResult.ok();
+ return ValidationResult.ok(tenant);
}
}), executor);
}
}
- private <T> FutureCallback<T> getCallback(FutureCallback<ValidationResult> callback, Function<T, ValidationResult> transformer) {
+ private <T, V> FutureCallback<T> getCallback(FutureCallback<ValidationResult> callback, Function<T, ValidationResult<V>> transformer) {
return new FutureCallback<T>() {
@Override
public void onSuccess(@Nullable T result) {
diff --git a/application/src/main/proto/cluster.proto b/application/src/main/proto/cluster.proto
index e106d1b..3bb0db7 100644
--- a/application/src/main/proto/cluster.proto
+++ b/application/src/main/proto/cluster.proto
@@ -61,7 +61,6 @@ message ConnectRpcMessage {
}
message ToDeviceRpcRequestRpcMessage {
- PluginAddress address = 1;
Uid deviceTenantId = 2;
Uid deviceId = 3;
@@ -73,8 +72,6 @@ message ToDeviceRpcRequestRpcMessage {
}
message ToPluginRpcResponseRpcMessage {
- PluginAddress address = 1;
-
Uid msgId = 2;
string response = 3;
string error = 4;
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java
index fbc1103..808166b 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java
@@ -22,6 +22,7 @@ import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.HasName;
import org.thingsboard.server.common.data.SearchTextBasedWithAdditionalInfo;
+import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
@@ -32,6 +33,7 @@ public class RuleNode extends SearchTextBasedWithAdditionalInfo<RuleNodeId> impl
private static final long serialVersionUID = -5656679015121235465L;
+ private RuleChainId ruleChainId;
private String type;
private String name;
private boolean debugMode;
@@ -49,8 +51,10 @@ public class RuleNode extends SearchTextBasedWithAdditionalInfo<RuleNodeId> impl
public RuleNode(RuleNode ruleNode) {
super(ruleNode);
+ this.ruleChainId = ruleNode.getRuleChainId();
this.type = ruleNode.getType();
this.name = ruleNode.getName();
+ this.debugMode = ruleNode.isDebugMode();
this.setConfiguration(ruleNode.getConfiguration());
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java
index 8c34cd3..aff6381 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java
@@ -350,6 +350,7 @@ public class ModelConstants {
* Cassandra rule node constants.
*/
public static final String RULE_NODE_COLUMN_FAMILY_NAME = "rule_node";
+ public static final String RULE_NODE_CHAIN_ID_PROPERTY = "rule_chain_id";
public static final String RULE_NODE_TYPE_PROPERTY = "type";
public static final String RULE_NODE_NAME_PROPERTY = "name";
public static final String RULE_NODE_CONFIGURATION_PROPERTY = "configuration";
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/nosql/RuleNodeEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/nosql/RuleNodeEntity.java
index 8d3f3c3..ec13022 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/nosql/RuleNodeEntity.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/nosql/RuleNodeEntity.java
@@ -24,6 +24,7 @@ import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
+import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.dao.model.SearchTextEntity;
@@ -41,6 +42,8 @@ public class RuleNodeEntity implements SearchTextEntity<RuleNode> {
@PartitionKey
@Column(name = ID_PROPERTY)
private UUID id;
+ @Column(name = RULE_NODE_CHAIN_ID_PROPERTY)
+ private UUID ruleChainId;
@Column(name = RULE_NODE_TYPE_PROPERTY)
private String type;
@Column(name = RULE_NODE_NAME_PROPERTY)
@@ -56,7 +59,6 @@ public class RuleNodeEntity implements SearchTextEntity<RuleNode> {
@Column(name = DEBUG_MODE)
private boolean debugMode;
-
public RuleNodeEntity() {
}
@@ -64,6 +66,9 @@ public class RuleNodeEntity implements SearchTextEntity<RuleNode> {
if (ruleNode.getId() != null) {
this.id = ruleNode.getUuidId();
}
+ if (ruleNode.getRuleChainId() != null) {
+ this.ruleChainId = ruleNode.getRuleChainId().getId();
+ }
this.type = ruleNode.getType();
this.name = ruleNode.getName();
this.debugMode = ruleNode.isDebugMode();
@@ -92,6 +97,14 @@ public class RuleNodeEntity implements SearchTextEntity<RuleNode> {
this.id = id;
}
+ public UUID getRuleChainId() {
+ return ruleChainId;
+ }
+
+ public void setRuleChainId(UUID ruleChainId) {
+ this.ruleChainId = ruleChainId;
+ }
+
public String getType() {
return type;
}
@@ -132,6 +145,9 @@ public class RuleNodeEntity implements SearchTextEntity<RuleNode> {
public RuleNode toData() {
RuleNode ruleNode = new RuleNode(new RuleNodeId(id));
ruleNode.setCreatedTime(UUIDs.unixTimestamp(id));
+ if (this.ruleChainId != null) {
+ ruleNode.setRuleChainId(new RuleChainId(this.ruleChainId));
+ }
ruleNode.setType(this.type);
ruleNode.setName(this.name);
ruleNode.setDebugMode(this.debugMode);
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleNodeEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleNodeEntity.java
index 6a888c2..1d05433 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleNodeEntity.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleNodeEntity.java
@@ -21,8 +21,10 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import org.hibernate.annotations.Type;
import org.hibernate.annotations.TypeDef;
+import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.rule.RuleNode;
+import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.model.BaseSqlEntity;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.dao.model.SearchTextEntity;
@@ -39,6 +41,9 @@ import javax.persistence.Table;
@Table(name = ModelConstants.RULE_NODE_COLUMN_FAMILY_NAME)
public class RuleNodeEntity extends BaseSqlEntity<RuleNode> implements SearchTextEntity<RuleNode> {
+ @Column(name = ModelConstants.RULE_NODE_CHAIN_ID_PROPERTY)
+ private String ruleChainId;
+
@Column(name = ModelConstants.RULE_NODE_TYPE_PROPERTY)
private String type;
@@ -66,6 +71,9 @@ public class RuleNodeEntity extends BaseSqlEntity<RuleNode> implements SearchTex
if (ruleNode.getId() != null) {
this.setId(ruleNode.getUuidId());
}
+ if (ruleNode.getRuleChainId() != null) {
+ this.ruleChainId = toString(DaoUtil.getId(ruleNode.getRuleChainId()));
+ }
this.type = ruleNode.getType();
this.name = ruleNode.getName();
this.debugMode = ruleNode.isDebugMode();
@@ -88,6 +96,9 @@ public class RuleNodeEntity extends BaseSqlEntity<RuleNode> implements SearchTex
public RuleNode toData() {
RuleNode ruleNode = new RuleNode(new RuleNodeId(getId()));
ruleNode.setCreatedTime(UUIDs.unixTimestamp(getId()));
+ if (ruleChainId != null) {
+ ruleNode.setRuleChainId(new RuleChainId(toUUID(ruleChainId)));
+ }
ruleNode.setType(type);
ruleNode.setName(name);
ruleNode.setDebugMode(debugMode);
diff --git a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java
index cdb9a80..9ce1fbe 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java
@@ -13,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.thingsboard.server.dao.rule;
import com.google.common.util.concurrent.ListenableFuture;
@@ -124,6 +123,7 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
}
}
for (RuleNode node : toAddOrUpdate) {
+ node.setRuleChainId(ruleChain.getId());
RuleNode savedNode = ruleNodeDao.save(node);
try {
createRelation(new EntityRelation(ruleChainMetaData.getRuleChainId(), savedNode.getId(),
@@ -137,7 +137,7 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
nodes.set(index, savedNode);
ruleNodeIndexMap.put(savedNode.getId(), index);
}
- for (RuleNode node: toDelete) {
+ for (RuleNode node : toDelete) {
deleteRuleNode(node.getId());
}
RuleNodeId firstRuleNodeId = null;
@@ -234,6 +234,12 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
}
@Override
+ public ListenableFuture<RuleNode> findRuleNodeByIdAsync(RuleNodeId ruleNodeId) {
+ Validator.validateId(ruleNodeId, "Incorrect rule node id for search request.");
+ return ruleNodeDao.findByIdAsync(ruleNodeId.getId());
+ }
+
+ @Override
public RuleChain getRootTenantRuleChain(TenantId tenantId) {
Validator.validateId(tenantId, "Incorrect tenant id for search request.");
List<EntityRelation> relations = relationService.findByFrom(tenantId, RelationTypeGroup.RULE_CHAIN);
diff --git a/dao/src/main/java/org/thingsboard/server/dao/rule/RuleChainService.java b/dao/src/main/java/org/thingsboard/server/dao/rule/RuleChainService.java
index da7833d..d516e54 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/rule/RuleChainService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/rule/RuleChainService.java
@@ -46,6 +46,8 @@ public interface RuleChainService {
ListenableFuture<RuleChain> findRuleChainByIdAsync(RuleChainId ruleChainId);
+ ListenableFuture<RuleNode> findRuleNodeByIdAsync(RuleNodeId ruleNodeId);
+
RuleChain getRootTenantRuleChain(TenantId tenantId);
List<RuleNode> getRuleChainNodes(RuleChainId ruleChainId);
diff --git a/dao/src/main/resources/cassandra/schema.cql b/dao/src/main/resources/cassandra/schema.cql
index d0e62b2..9150587 100644
--- a/dao/src/main/resources/cassandra/schema.cql
+++ b/dao/src/main/resources/cassandra/schema.cql
@@ -684,6 +684,7 @@ CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.rule_chain_by_tenant_and_sear
CREATE TABLE IF NOT EXISTS thingsboard.rule_node (
id uuid,
+ rule_chain_id uuid,
type text,
name text,
debug_mode boolean,
diff --git a/dao/src/main/resources/sql/schema.sql b/dao/src/main/resources/sql/schema.sql
index d7a0978..5876fbb 100644
--- a/dao/src/main/resources/sql/schema.sql
+++ b/dao/src/main/resources/sql/schema.sql
@@ -270,6 +270,7 @@ CREATE TABLE IF NOT EXISTS rule_chain (
CREATE TABLE IF NOT EXISTS rule_node (
id varchar(31) NOT NULL CONSTRAINT rule_node_pkey PRIMARY KEY,
+ rule_chain_id varchar(31),
additional_info varchar,
configuration varchar(10000000),
type varchar(255),
diff --git a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/msg/ToDeviceRpcRequestPluginMsg.java b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/msg/ToDeviceRpcRequestPluginMsg.java
index faf1696..b71bcb9 100644
--- a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/msg/ToDeviceRpcRequestPluginMsg.java
+++ b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/msg/ToDeviceRpcRequestPluginMsg.java
@@ -22,6 +22,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.PluginId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
import java.util.Optional;
diff --git a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java
index 37f4c4a..4cd6456 100644
--- a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java
+++ b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java
@@ -22,8 +22,9 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvQuery;
import org.thingsboard.server.common.data.relation.EntityRelation;
-import org.thingsboard.server.common.data.relation.RelationTypeGroup;
+import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.extensions.api.plugins.msg.*;
import org.thingsboard.server.extensions.api.plugins.rpc.RpcMsg;
import org.thingsboard.server.extensions.api.plugins.ws.PluginWebsocketSessionRef;
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/messaging/DeviceMessagingRuleMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/messaging/DeviceMessagingRuleMsgHandler.java
index bd18b8b..5ef0650 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/messaging/DeviceMessagingRuleMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/messaging/DeviceMessagingRuleMsgHandler.java
@@ -26,15 +26,24 @@ import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.RuleId;
import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
import org.thingsboard.server.common.msg.core.ToServerRpcRequestMsg;
import org.thingsboard.server.common.msg.core.ToServerRpcResponseMsg;
+import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.extensions.api.plugins.PluginCallback;
import org.thingsboard.server.extensions.api.plugins.PluginContext;
import org.thingsboard.server.extensions.api.plugins.handlers.RuleMsgHandler;
-import org.thingsboard.server.extensions.api.plugins.msg.*;
+import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
+import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
+import org.thingsboard.server.extensions.api.plugins.msg.RpcResponsePluginToRuleMsg;
+import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg;
import org.thingsboard.server.extensions.api.rules.RuleException;
-import java.util.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
/**
* @author Andrew Shvayka
@@ -152,7 +161,7 @@ public class DeviceMessagingRuleMsgHandler implements RuleMsgHandler {
pendingMsgs.put(uid, requestMd);
log.trace("[{}] Forwarding {} to [{}]", uid, params, targetDeviceId);
ToDeviceRpcRequestBody requestBody = new ToDeviceRpcRequestBody(ON_MSG_METHOD_NAME, GSON.toJson(params.get("body")));
- ctx.sendRpcRequest(new ToDeviceRpcRequest(uid, null, targetDevice.getTenantId(), targetDeviceId, oneWay, System.currentTimeMillis() + timeout, requestBody));
+ ctx.sendRpcRequest(new ToDeviceRpcRequest(uid, targetDevice.getTenantId(), targetDeviceId, oneWay, System.currentTimeMillis() + timeout, requestBody));
} else {
replyWithError(ctx, requestMd, RpcError.FORBIDDEN);
}