thingsboard-aplcache
Changes
application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java 93(+66 -27)
application/src/main/java/org/thingsboard/server/actors/device/ToDeviceRpcRequestMetadata.java 4(+2 -2)
application/src/main/java/org/thingsboard/server/actors/device/ToServerRpcRequestMetadata.java 33(+33 -0)
application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java 6(+3 -3)
application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java 4(+2 -2)
application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java 104(+23 -81)
application/src/main/java/org/thingsboard/server/service/rpc/ToDeviceRpcRequestActorMsg.java 4(+2 -2)
application/src/main/java/org/thingsboard/server/service/rpc/ToServerRpcResponseActorMsg.java 60(+60 -0)
common/message/src/main/java/org/thingsboard/server/common/msg/core/RuleEngineErrorMsg.java 2(+2 -0)
common/message/src/main/java/org/thingsboard/server/common/msg/timeout/DeviceActorClientSideRpcTimeoutMsg.java 6(+3 -3)
common/message/src/main/java/org/thingsboard/server/common/msg/timeout/DeviceActorServerSideRpcTimeoutMsg.java 33(+33 -0)
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java 36(+36 -0)
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcResponse.java 37(+37 -0)
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java 30(+30 -0)
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbRelationTypes.java 26(+26 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNode.java 66(+66 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRpcReplyNodeConfiguration.java 33(+33 -0)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index 6302963..c1fd878 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -199,6 +199,10 @@ public class ActorSystemContext {
@Getter
private long queuePersistenceTimeout;
+ @Value("${actors.client_side_rpc.timeout}")
+ @Getter
+ private long clientSideRpcTimeout;
+
@Value("${actors.rule.chain.error_persist_frequency}")
@Getter
private long ruleChainErrorPersistFrequency;
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
index dfae339..93df180 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
@@ -25,12 +25,13 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
+import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg;
import org.thingsboard.server.common.msg.timeout.DeviceActorQueueTimeoutMsg;
-import org.thingsboard.server.common.msg.timeout.DeviceActorRpcTimeoutMsg;
+import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg;
import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotificationMsg;
import org.thingsboard.server.extensions.api.device.DeviceNameOrTypeUpdateMsg;
-import org.thingsboard.server.common.msg.timeout.TimeoutMsg;
-import org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg;
+import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
+import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg;
public class DeviceActor extends ContextAwareActor {
@@ -62,10 +63,16 @@ public class DeviceActor extends ContextAwareActor {
processor.processNameOrTypeUpdate((DeviceNameOrTypeUpdateMsg) msg);
break;
case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
- processor.processRpcRequest(context(), (ToDeviceRpcRequestMsg) msg);
+ processor.processRpcRequest(context(), (ToDeviceRpcRequestActorMsg) msg);
break;
- case DEVICE_ACTOR_RPC_TIMEOUT_MSG:
- processor.processRpcTimeout(context(), (DeviceActorRpcTimeoutMsg) msg);
+ case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
+ processor.processToServerRPCResponse(context(), (ToServerRpcResponseActorMsg) msg);
+ break;
+ case DEVICE_ACTOR_SERVER_SIDE_RPC_TIMEOUT_MSG:
+ processor.processServerSideRpcTimeout(context(), (DeviceActorServerSideRpcTimeoutMsg) msg);
+ break;
+ case DEVICE_ACTOR_CLIENT_SIDE_RPC_TIMEOUT_MSG:
+ processor.processClientSideRpcTimeout(context(), (DeviceActorClientSideRpcTimeoutMsg) msg);
break;
case DEVICE_ACTOR_QUEUE_TIMEOUT_MSG:
processor.processQueueTimeout(context(), (DeviceActorQueueTimeoutMsg) msg);
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index 26c0a28..5c15241 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor;
import org.thingsboard.server.common.data.DataConstants;
@@ -53,28 +54,28 @@ import org.thingsboard.server.common.msg.core.RuleEngineErrorMsg;
import org.thingsboard.server.common.msg.core.SessionCloseMsg;
import org.thingsboard.server.common.msg.core.SessionCloseNotification;
import org.thingsboard.server.common.msg.core.SessionOpenMsg;
-import org.thingsboard.server.common.msg.core.StatusCodeResponse;
import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg;
import org.thingsboard.server.common.msg.core.ToDeviceRpcResponseMsg;
import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
+import org.thingsboard.server.common.msg.core.ToServerRpcRequestMsg;
import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
import org.thingsboard.server.common.msg.kv.BasicAttributeKVMsg;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.common.msg.session.FromDeviceMsg;
import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg;
import org.thingsboard.server.common.msg.session.SessionMsgType;
-import org.thingsboard.server.common.msg.session.SessionMsgType;
import org.thingsboard.server.common.msg.session.SessionType;
import org.thingsboard.server.common.msg.session.ToDeviceMsg;
+import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg;
import org.thingsboard.server.common.msg.timeout.DeviceActorQueueTimeoutMsg;
-import org.thingsboard.server.common.msg.timeout.DeviceActorRpcTimeoutMsg;
+import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg;
import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotificationMsg;
import org.thingsboard.server.extensions.api.device.DeviceNameOrTypeUpdateMsg;
-import org.thingsboard.server.extensions.api.plugins.PluginCallback;
-import org.thingsboard.server.extensions.api.plugins.PluginContext;
import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
+import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
+import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg;
import javax.annotation.Nullable;
import java.util.ArrayList;
@@ -87,7 +88,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Predicate;
@@ -103,10 +103,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
private final Map<SessionId, SessionInfo> sessions;
private final Map<SessionId, SessionInfo> attributeSubscriptions;
private final Map<SessionId, SessionInfo> rpcSubscriptions;
- private final Map<Integer, ToDeviceRpcRequestMetadata> rpcPendingMap;
+ private final Map<Integer, ToDeviceRpcRequestMetadata> toDeviceRpcPendingMap;
+ private final Map<Integer, ToServerRpcRequestMetadata> toServerRpcPendingMap;
private final Map<UUID, PendingSessionMsgData> pendingMsgs;
private final Gson gson = new Gson();
+ private final JsonParser jsonParser = new JsonParser();
private int rpcSeq = 0;
private String deviceName;
@@ -120,7 +122,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
this.sessions = new HashMap<>();
this.attributeSubscriptions = new HashMap<>();
this.rpcSubscriptions = new HashMap<>();
- this.rpcPendingMap = new HashMap<>();
+ this.toDeviceRpcPendingMap = new HashMap<>();
+ this.toServerRpcPendingMap = new HashMap<>();
this.pendingMsgs = new HashMap<>();
initAttributes();
}
@@ -134,7 +137,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
this.defaultMetaData.putValue("deviceType", deviceType);
}
- void processRpcRequest(ActorContext context, org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg msg) {
+ void processRpcRequest(ActorContext context, ToDeviceRpcRequestActorMsg msg) {
ToDeviceRpcRequest request = msg.getMsg();
ToDeviceRpcRequestBody body = request.getBody();
ToDeviceRpcRequestMsg rpcRequest = new ToDeviceRpcRequestMsg(
@@ -174,14 +177,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
- private void registerPendingRpcRequest(ActorContext context, org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg msg, boolean sent, ToDeviceRpcRequestMsg rpcRequest, long timeout) {
- rpcPendingMap.put(rpcRequest.getRequestId(), new ToDeviceRpcRequestMetadata(msg, sent));
- DeviceActorRpcTimeoutMsg timeoutMsg = new DeviceActorRpcTimeoutMsg(rpcRequest.getRequestId(), timeout);
+ private void registerPendingRpcRequest(ActorContext context, ToDeviceRpcRequestActorMsg msg, boolean sent, ToDeviceRpcRequestMsg rpcRequest, long timeout) {
+ toDeviceRpcPendingMap.put(rpcRequest.getRequestId(), new ToDeviceRpcRequestMetadata(msg, sent));
+ DeviceActorServerSideRpcTimeoutMsg timeoutMsg = new DeviceActorServerSideRpcTimeoutMsg(rpcRequest.getRequestId(), timeout);
scheduleMsgWithDelay(context, timeoutMsg, timeoutMsg.getTimeout());
}
- void processRpcTimeout(ActorContext context, DeviceActorRpcTimeoutMsg msg) {
- ToDeviceRpcRequestMetadata requestMd = rpcPendingMap.remove(msg.getId());
+ void processServerSideRpcTimeout(ActorContext context, DeviceActorServerSideRpcTimeoutMsg msg) {
+ ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(msg.getId());
if (requestMd != null) {
logger.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId());
systemContext.getDeviceRpcService().process(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
@@ -200,7 +203,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
void processQueueAck(ActorContext context, RuleEngineQueuePutAckMsg msg) {
PendingSessionMsgData data = pendingMsgs.remove(msg.getId());
- if (data != null) {
+ if (data != null && data.isReplyOnQueueAck()) {
logger.debug("[{}] Queue put [{}] ack detected!", deviceId, msg.getId());
ToDeviceMsg toDeviceMsg = BasicStatusCodeResponse.onSuccess(data.getSessionMsgType(), data.getRequestId());
sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress());
@@ -208,8 +211,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
private void sendPendingRequests(ActorContext context, SessionId sessionId, SessionType type, Optional<ServerAddress> server) {
- if (!rpcPendingMap.isEmpty()) {
- logger.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, rpcPendingMap.size(), sessionId);
+ if (!toDeviceRpcPendingMap.isEmpty()) {
+ logger.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, toDeviceRpcPendingMap.size(), sessionId);
if (type == SessionType.SYNC) {
logger.debug("[{}] Cleanup sync rpc session [{}]", deviceId, sessionId);
rpcSubscriptions.remove(sessionId);
@@ -219,12 +222,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
Set<Integer> sentOneWayIds = new HashSet<>();
if (type == SessionType.ASYNC) {
- rpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, server, sentOneWayIds));
+ toDeviceRpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, server, sentOneWayIds));
} else {
- rpcPendingMap.entrySet().stream().findFirst().ifPresent(processPendingRpc(context, sessionId, server, sentOneWayIds));
+ toDeviceRpcPendingMap.entrySet().stream().findFirst().ifPresent(processPendingRpc(context, sessionId, server, sentOneWayIds));
}
- sentOneWayIds.forEach(rpcPendingMap::remove);
+ sentOneWayIds.forEach(toDeviceRpcPendingMap::remove);
}
private Consumer<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> processPendingRpc(ActorContext context, SessionId sessionId, Optional<ServerAddress> server, Set<Integer> sentOneWayIds) {
@@ -263,8 +266,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
handlePostTelemetryRequest(context, msg);
break;
case TO_SERVER_RPC_REQUEST:
+ handleClientSideRPCRequest(context, msg);
break;
- //TODO: push to queue and start processing!
}
}
}
@@ -345,11 +348,47 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
pushToRuleEngineWithTimeout(context, tbMsg, src, request);
}
+ private void handleClientSideRPCRequest(ActorContext context, DeviceToDeviceActorMsg src) {
+ ToServerRpcRequestMsg request = (ToServerRpcRequestMsg) src.getPayload();
+
+ JsonObject json = new JsonObject();
+ json.addProperty("method", request.getMethod());
+ json.add("params", jsonParser.parse(request.getParams()));
+
+ TbMsgMetaData requestMetaData = defaultMetaData.copy();
+ requestMetaData.putValue("requestId", Integer.toString(request.getRequestId()));
+ TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.TO_SERVER_RPC_REQUEST.name(), deviceId, requestMetaData, TbMsgDataType.JSON, gson.toJson(json));
+ pushToRuleEngineWithTimeout(context, tbMsg, src, request);
+
+ scheduleMsgWithDelay(context, new DeviceActorClientSideRpcTimeoutMsg(request.getRequestId(), systemContext.getClientSideRpcTimeout()), systemContext.getClientSideRpcTimeout());
+ toServerRpcPendingMap.put(request.getRequestId(), new ToServerRpcRequestMetadata(src.getSessionId(), src.getSessionType(), src.getServerAddress()));
+ }
+
+ public void processClientSideRpcTimeout(ActorContext context, DeviceActorClientSideRpcTimeoutMsg msg) {
+ ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(msg.getId());
+ if (data != null) {
+ logger.debug("[{}] Client side RPC request [{}] timeout detected!", deviceId, msg.getId());
+ ToDeviceMsg toDeviceMsg = new RuleEngineErrorMsg(SessionMsgType.TO_SERVER_RPC_REQUEST, RuleEngineError.TIMEOUT);
+ sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServer());
+ }
+ }
+
+ void processToServerRPCResponse(ActorContext context, ToServerRpcResponseActorMsg msg) {
+ ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(msg.getMsg().getRequestId());
+ if (data != null) {
+ sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(msg.getMsg(), data.getSessionId()), data.getServer());
+ }
+ }
+
private void pushToRuleEngineWithTimeout(ActorContext context, TbMsg tbMsg, DeviceToDeviceActorMsg src, FromDeviceRequestMsg fromDeviceRequestMsg) {
+ pushToRuleEngineWithTimeout(context, tbMsg, src, fromDeviceRequestMsg, true);
+ }
+
+ private void pushToRuleEngineWithTimeout(ActorContext context, TbMsg tbMsg, DeviceToDeviceActorMsg src, FromDeviceRequestMsg fromDeviceRequestMsg, boolean replyOnAck) {
SessionMsgType sessionMsgType = fromDeviceRequestMsg.getMsgType();
int requestId = fromDeviceRequestMsg.getRequestId();
if (systemContext.isQueuePersistenceEnabled()) {
- pendingMsgs.put(tbMsg.getId(), new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(), sessionMsgType, requestId));
+ pendingMsgs.put(tbMsg.getId(), new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(), sessionMsgType, requestId, replyOnAck));
scheduleMsgWithDelay(context, new DeviceActorQueueTimeoutMsg(tbMsg.getId(), systemContext.getQueuePersistenceTimeout()), systemContext.getQueuePersistenceTimeout());
} else {
ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(BasicStatusCodeResponse.onSuccess(sessionMsgType, requestId), src.getSessionId());
@@ -394,7 +433,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
if (inMsg.getMsgType() == SessionMsgType.TO_DEVICE_RPC_RESPONSE) {
logger.debug("[{}] Processing rpc command response [{}]", deviceId, sessionId);
ToDeviceRpcResponseMsg responseMsg = (ToDeviceRpcResponseMsg) inMsg;
- ToDeviceRpcRequestMetadata requestMd = rpcPendingMap.remove(responseMsg.getRequestId());
+ ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
boolean success = requestMd != null;
if (success) {
systemContext.getDeviceRpcService().process(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(), responseMsg.getData(), null));
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java b/application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java
index b3f381d..2ce05b2 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java
@@ -32,5 +32,6 @@ public final class PendingSessionMsgData {
private final Optional<ServerAddress> serverAddress;
private final SessionMsgType sessionMsgType;
private final int requestId;
+ private final boolean replyOnQueueAck;
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/ToDeviceRpcRequestMetadata.java b/application/src/main/java/org/thingsboard/server/actors/device/ToDeviceRpcRequestMetadata.java
index 4ee9a20..8a4262c 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/ToDeviceRpcRequestMetadata.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/ToDeviceRpcRequestMetadata.java
@@ -16,13 +16,13 @@
package org.thingsboard.server.actors.device;
import lombok.Data;
-import org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg;
+import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
/**
* @author Andrew Shvayka
*/
@Data
public class ToDeviceRpcRequestMetadata {
- private final ToDeviceRpcRequestMsg msg;
+ private final ToDeviceRpcRequestActorMsg msg;
private final boolean sent;
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/ToServerRpcRequestMetadata.java b/application/src/main/java/org/thingsboard/server/actors/device/ToServerRpcRequestMetadata.java
new file mode 100644
index 0000000..f82a8c2
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/device/ToServerRpcRequestMetadata.java
@@ -0,0 +1,33 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.device;
+
+import lombok.Data;
+import org.thingsboard.server.common.data.id.SessionId;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.common.msg.session.SessionType;
+
+import java.util.Optional;
+
+/**
+ * @author Andrew Shvayka
+ */
+@Data
+public class ToServerRpcRequestMetadata {
+ private final SessionId sessionId;
+ private final SessionType type;
+ private final Optional<ServerAddress> server;
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
index 65cbc22..bc36dc8 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
@@ -37,7 +37,7 @@ import org.thingsboard.server.extensions.api.plugins.rpc.RpcMsg;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.service.cluster.rpc.GrpcSession;
import org.thingsboard.server.service.cluster.rpc.GrpcSessionListener;
-import org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg;
+import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
import java.io.Serializable;
import java.util.UUID;
@@ -142,14 +142,14 @@ public class BasicRpcSessionListener implements GrpcSessionListener {
return new UUID(uid.getPluginUuidMsb(), uid.getPluginUuidLsb());
}
- private static ToDeviceRpcRequestMsg deserialize(ServerAddress serverAddress, ClusterAPIProtos.ToDeviceRpcRequestRpcMessage msg) {
+ private static ToDeviceRpcRequestActorMsg deserialize(ServerAddress serverAddress, ClusterAPIProtos.ToDeviceRpcRequestRpcMessage msg) {
TenantId deviceTenantId = new TenantId(toUUID(msg.getDeviceTenantId()));
DeviceId deviceId = new DeviceId(toUUID(msg.getDeviceId()));
ToDeviceRpcRequestBody requestBody = new ToDeviceRpcRequestBody(msg.getMethod(), msg.getParams());
ToDeviceRpcRequest request = new ToDeviceRpcRequest(toUUID(msg.getMsgId()), deviceTenantId, deviceId, msg.getOneway(), msg.getExpTime(), requestBody);
- return new ToDeviceRpcRequestMsg(serverAddress, request);
+ return new ToDeviceRpcRequestActorMsg(serverAddress, request);
}
private static ToPluginRpcResponseDeviceMsg deserialize(ServerAddress serverAddress, ClusterAPIProtos.ToPluginRpcResponseRpcMessage msg) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index 8ffd378..e1331f9 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,15 +16,20 @@
package org.thingsboard.server.actors.ruleChain;
import akka.actor.ActorRef;
-import akka.actor.Cancellable;
+import com.datastax.driver.core.utils.UUIDs;
import com.google.common.base.Function;
import org.thingsboard.rule.engine.api.*;
import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.dao.alarm.AlarmService;
import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.dao.attributes.AttributesService;
@@ -41,6 +46,7 @@ import scala.concurrent.duration.Duration;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
/**
* Created by ashvayka on 19.03.18.
@@ -113,6 +119,11 @@ class DefaultTbContext implements TbContext {
}
@Override
+ public TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data) {
+ return new TbMsg(UUIDs.timeBased(), type, originator, metaData, data);
+ }
+
+ @Override
public RuleNodeId getSelfId() {
return nodeCtx.getSelf().getId();
}
@@ -206,4 +217,29 @@ class DefaultTbContext implements TbContext {
public MailService getMailService() {
return mainCtx.getMailService();
}
+
+ @Override
+ public RuleEngineRpcService getRpcService() {
+ return new RuleEngineRpcService() {
+ @Override
+ public void sendRpcReply(DeviceId deviceId, int requestId, String body) {
+ mainCtx.getDeviceRpcService().sendRpcReplyToDevice(nodeCtx.getTenantId(), deviceId, requestId, body);
+ }
+
+ @Override
+ public void sendRpcRequest(RuleEngineDeviceRpcRequest src, Consumer<RuleEngineDeviceRpcResponse> consumer) {
+ ToDeviceRpcRequest request = new ToDeviceRpcRequest(UUIDs.timeBased(), nodeCtx.getTenantId(), src.getDeviceId(),
+ src.isOneway(), System.currentTimeMillis() + src.getTimeout(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()));
+ mainCtx.getDeviceRpcService().process(request, response -> {
+ consumer.accept(RuleEngineDeviceRpcResponse.builder()
+ .deviceId(src.getDeviceId())
+ .requestId(src.getRequestId())
+ .error(response.getError())
+ .response(response.getResponse())
+ .build());
+ });
+ }
+
+ };
+ }
}
diff --git a/application/src/main/java/org/thingsboard/server/controller/RpcController.java b/application/src/main/java/org/thingsboard/server/controller/RpcController.java
index 2c7d2e6..28261be 100644
--- a/application/src/main/java/org/thingsboard/server/controller/RpcController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/RpcController.java
@@ -23,6 +23,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.security.access.prepost.PreAuthorize;
+import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
@@ -30,18 +31,22 @@ import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
-import org.thingsboard.server.actors.plugin.ValidationResult;
+import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.id.UUIDBased;
+import org.thingsboard.server.common.data.rpc.RpcRequest;
import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.extensions.api.exception.ToErrorResponseEntity;
import org.thingsboard.server.extensions.api.plugins.PluginConstants;
-import org.thingsboard.server.common.data.rpc.RpcRequest;
-import org.thingsboard.server.service.rpc.LocalRequestMetaData;
+import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
+import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
import org.thingsboard.server.service.rpc.DeviceRpcService;
+import org.thingsboard.server.service.rpc.LocalRequestMetaData;
import org.thingsboard.server.service.security.AccessValidator;
import org.thingsboard.server.service.security.model.SecurityUser;
@@ -53,6 +58,7 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.function.Consumer;
/**
* Created by ashvayka on 22.03.18.
@@ -117,7 +123,6 @@ public class RpcController extends BaseController {
accessValidator.validate(currentUser, deviceId, new HttpValidationCallback(response, new FutureCallback<DeferredResult<ResponseEntity>>() {
@Override
public void onSuccess(@Nullable DeferredResult<ResponseEntity> result) {
-
ToDeviceRpcRequest rpcRequest = new ToDeviceRpcRequest(UUID.randomUUID(),
tenantId,
deviceId,
@@ -125,7 +130,13 @@ public class RpcController extends BaseController {
timeout,
body
);
- deviceRpcService.process(rpcRequest, new LocalRequestMetaData(rpcRequest, currentUser, result));
+ deviceRpcService.process(rpcRequest, new Consumer<FromDeviceRpcResponse>(){
+
+ @Override
+ public void accept(FromDeviceRpcResponse fromDeviceRpcResponse) {
+ reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse);
+ }
+ });
}
@Override
@@ -136,7 +147,7 @@ public class RpcController extends BaseController {
} else {
entity = new ResponseEntity(HttpStatus.UNAUTHORIZED);
}
- deviceRpcService.logRpcCall(currentUser, deviceId, body, oneWay, Optional.empty(), e);
+ logRpcCall(currentUser, deviceId, body, oneWay, Optional.empty(), e);
response.setResult(entity);
}
}));
@@ -146,4 +157,69 @@ public class RpcController extends BaseController {
}
}
+ public void reply(LocalRequestMetaData rpcRequest, FromDeviceRpcResponse response) {
+ Optional<RpcError> rpcError = response.getError();
+ DeferredResult<ResponseEntity> responseWriter = rpcRequest.getResponseWriter();
+ if (rpcError.isPresent()) {
+ logRpcCall(rpcRequest, rpcError, null);
+ RpcError error = rpcError.get();
+ switch (error) {
+ case TIMEOUT:
+ responseWriter.setResult(new ResponseEntity<>(HttpStatus.REQUEST_TIMEOUT));
+ break;
+ case NO_ACTIVE_CONNECTION:
+ responseWriter.setResult(new ResponseEntity<>(HttpStatus.CONFLICT));
+ break;
+ default:
+ responseWriter.setResult(new ResponseEntity<>(HttpStatus.REQUEST_TIMEOUT));
+ break;
+ }
+ } else {
+ Optional<String> responseData = response.getResponse();
+ if (responseData.isPresent() && !StringUtils.isEmpty(responseData.get())) {
+ String data = responseData.get();
+ try {
+ logRpcCall(rpcRequest, rpcError, null);
+ responseWriter.setResult(new ResponseEntity<>(jsonMapper.readTree(data), HttpStatus.OK));
+ } catch (IOException e) {
+ log.debug("Failed to decode device response: {}", data, e);
+ logRpcCall(rpcRequest, rpcError, e);
+ responseWriter.setResult(new ResponseEntity<>(HttpStatus.NOT_ACCEPTABLE));
+ }
+ } else {
+ logRpcCall(rpcRequest, rpcError, null);
+ responseWriter.setResult(new ResponseEntity<>(HttpStatus.OK));
+ }
+ }
+ }
+
+ private void logRpcCall(LocalRequestMetaData rpcRequest, Optional<RpcError> rpcError, Throwable e) {
+ logRpcCall(rpcRequest.getUser(), rpcRequest.getRequest().getDeviceId(), rpcRequest.getRequest().getBody(), rpcRequest.getRequest().isOneway(), rpcError, null);
+ }
+
+
+ private void logRpcCall(SecurityUser user, EntityId entityId, ToDeviceRpcRequestBody body, boolean oneWay, Optional<RpcError> rpcError, Throwable e) {
+ String rpcErrorStr = "";
+ if (rpcError.isPresent()) {
+ rpcErrorStr = "RPC Error: " + rpcError.get().name();
+ }
+ String method = body.getMethod();
+ String params = body.getParams();
+
+ auditLogService.logEntityAction(
+ user.getTenantId(),
+ user.getCustomerId(),
+ user.getId(),
+ user.getName(),
+ (UUIDBased & EntityId) entityId,
+ null,
+ ActionType.RPC_CALL,
+ BaseController.toException(e),
+ rpcErrorStr,
+ oneWay,
+ method,
+ params);
+ }
+
+
}
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java
index cdf8842..27334c6 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java
@@ -40,7 +40,7 @@ import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.gen.cluster.ClusterRpcServiceGrpc;
import org.thingsboard.server.service.cluster.discovery.ServerInstance;
import org.thingsboard.server.service.cluster.discovery.ServerInstanceService;
-import org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg;
+import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
import javax.annotation.PreDestroy;
import java.io.IOException;
@@ -132,7 +132,7 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
}
@Override
- public void tell(ServerAddress serverAddress, ToDeviceRpcRequestMsg toForward) {
+ public void tell(ServerAddress serverAddress, ToDeviceRpcRequestActorMsg toForward) {
ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
.setToDeviceRpcRequestRpcMsg(toProtoMsg(toForward)).build();
tell(serverAddress, msg);
@@ -196,7 +196,7 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
).build();
}
- private static ClusterAPIProtos.ToDeviceRpcRequestRpcMessage toProtoMsg(ToDeviceRpcRequestMsg msg) {
+ private static ClusterAPIProtos.ToDeviceRpcRequestRpcMessage toProtoMsg(ToDeviceRpcRequestActorMsg msg) {
ClusterAPIProtos.ToDeviceRpcRequestRpcMessage.Builder builder = ClusterAPIProtos.ToDeviceRpcRequestRpcMessage.newBuilder();
ToDeviceRpcRequest request = msg.getMsg();
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
index e5a0434..6aefe46 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
@@ -24,7 +24,7 @@ import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg
import org.thingsboard.server.extensions.api.plugins.msg.ToPluginRpcResponseDeviceMsg;
import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
-import org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg;
+import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
import java.util.UUID;
@@ -41,7 +41,7 @@ public interface ClusterRpcService {
void tell(ServerAddress serverAddress, ToDeviceActorNotificationMsg toForward);
- void tell(ServerAddress serverAddress, ToDeviceRpcRequestMsg toForward);
+ void tell(ServerAddress serverAddress, ToDeviceRpcRequestActorMsg toForward);
void tell(ServerAddress serverAddress, ToPluginRpcResponseDeviceMsg toForward);
diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java
index 4a730db..fcb9c26 100644
--- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,7 +15,6 @@
*/
package org.thingsboard.server.service.rpc;
-import akka.actor.ActorRef;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -28,12 +27,15 @@ import org.thingsboard.server.actors.service.ActorService;
import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UUIDBased;
import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.common.msg.core.ToServerRpcResponseMsg;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.controller.BaseController;
import org.thingsboard.server.dao.audit.AuditLogService;
+import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
@@ -51,6 +53,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
+import java.util.function.Consumer;
/**
* Created by ashvayka on 27.03.18.
@@ -59,8 +62,6 @@ import java.util.function.BiConsumer;
@Slf4j
public class DefaultDeviceRpcService implements DeviceRpcService {
- private static final ObjectMapper jsonMapper = new ObjectMapper();
-
@Autowired
private ClusterRoutingService routingService;
@@ -75,7 +76,7 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
private ScheduledExecutorService rpcCallBackExecutor;
- private final ConcurrentMap<UUID, LocalRequestMetaData> localRpcRequests = new ConcurrentHashMap<>();
+ private final ConcurrentMap<UUID, Consumer<FromDeviceRpcResponse>> localRpcRequests = new ConcurrentHashMap<>();
@PostConstruct
@@ -91,18 +92,18 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
}
@Override
- public void process(ToDeviceRpcRequest request, LocalRequestMetaData metaData) {
+ public void process(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer) {
log.trace("[{}] Processing local rpc call for device [{}]", request.getTenantId(), request.getDeviceId());
sendRpcRequest(request);
UUID requestId = request.getId();
- localRpcRequests.put(requestId, metaData);
+ localRpcRequests.put(requestId, responseConsumer);
long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis());
log.error("[{}] processing the request: [{}]", this.hashCode(), requestId);
rpcCallBackExecutor.schedule(() -> {
log.error("[{}] timeout the request: [{}]", this.hashCode(), requestId);
- LocalRequestMetaData localMetaData = localRpcRequests.remove(requestId);
- if (localMetaData != null) {
- reply(localMetaData, new FromDeviceRpcResponse(requestId, null, RpcError.TIMEOUT));
+ Consumer<FromDeviceRpcResponse> consumer = localRpcRequests.remove(requestId);
+ if (consumer != null) {
+ consumer.accept(new FromDeviceRpcResponse(requestId, null, RpcError.TIMEOUT));
}
}, timeout, TimeUnit.MILLISECONDS);
}
@@ -123,58 +124,27 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
log.error("[{}] response the request: [{}]", this.hashCode(), response.getId());
//TODO: send to another server if needed.
UUID requestId = response.getId();
- LocalRequestMetaData md = localRpcRequests.remove(requestId);
- if (md != null) {
- log.trace("[{}] Processing local rpc response from device [{}]", requestId, md.getRequest().getDeviceId());
- reply(md, response);
+ Consumer<FromDeviceRpcResponse> consumer = localRpcRequests.remove(requestId);
+ if (consumer != null) {
+ consumer.accept(response);
} else {
log.trace("[{}] Unknown or stale rpc response received [{}]", requestId, response);
}
}
- public void reply(LocalRequestMetaData rpcRequest, FromDeviceRpcResponse response) {
- Optional<RpcError> rpcError = response.getError();
- DeferredResult<ResponseEntity> responseWriter = rpcRequest.getResponseWriter();
- if (rpcError.isPresent()) {
- logRpcCall(rpcRequest, rpcError, null);
- RpcError error = rpcError.get();
- switch (error) {
- case TIMEOUT:
- responseWriter.setResult(new ResponseEntity<>(HttpStatus.REQUEST_TIMEOUT));
- break;
- case NO_ACTIVE_CONNECTION:
- responseWriter.setResult(new ResponseEntity<>(HttpStatus.CONFLICT));
- break;
- default:
- responseWriter.setResult(new ResponseEntity<>(HttpStatus.REQUEST_TIMEOUT));
- break;
- }
- } else {
- Optional<String> responseData = response.getResponse();
- if (responseData.isPresent() && !StringUtils.isEmpty(responseData.get())) {
- String data = responseData.get();
- try {
- logRpcCall(rpcRequest, rpcError, null);
- responseWriter.setResult(new ResponseEntity<>(jsonMapper.readTree(data), HttpStatus.OK));
- } catch (IOException e) {
- log.debug("Failed to decode device response: {}", data, e);
- logRpcCall(rpcRequest, rpcError, e);
- responseWriter.setResult(new ResponseEntity<>(HttpStatus.NOT_ACCEPTABLE));
- }
- } else {
- logRpcCall(rpcRequest, rpcError, null);
- responseWriter.setResult(new ResponseEntity<>(HttpStatus.OK));
- }
- }
+ @Override
+ public void sendRpcReplyToDevice(TenantId tenantId, DeviceId deviceId, int requestId, String body) {
+ ToServerRpcResponseActorMsg rpcMsg = new ToServerRpcResponseActorMsg(tenantId, deviceId, new ToServerRpcResponseMsg(requestId, body));
+ forward(deviceId, rpcMsg, rpcService::tell);
}
private void sendRpcRequest(ToDeviceRpcRequest msg) {
log.trace("[{}] Forwarding msg {} to device actor!", msg.getDeviceId(), msg);
- ToDeviceRpcRequestMsg rpcMsg = new ToDeviceRpcRequestMsg(msg);
+ ToDeviceRpcRequestActorMsg rpcMsg = new ToDeviceRpcRequestActorMsg(msg);
forward(msg.getDeviceId(), rpcMsg, rpcService::tell);
}
- private void forward(DeviceId deviceId, ToDeviceRpcRequestMsg msg, BiConsumer<ServerAddress, ToDeviceRpcRequestMsg> rpcFunction) {
+ private <T extends ToDeviceActorNotificationMsg> void forward(DeviceId deviceId, T msg, BiConsumer<ServerAddress, T> rpcFunction) {
Optional<ServerAddress> instance = routingService.resolveById(deviceId);
if (instance.isPresent()) {
log.trace("[{}] Forwarding msg {} to remote device actor!", msg.getTenantId(), msg);
@@ -184,32 +154,4 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
actorService.onMsg(msg);
}
}
-
- private void logRpcCall(LocalRequestMetaData rpcRequest, Optional<RpcError> rpcError, Throwable e) {
- logRpcCall(rpcRequest.getUser(), rpcRequest.getRequest().getDeviceId(), rpcRequest.getRequest().getBody(), rpcRequest.getRequest().isOneway(), rpcError, null);
- }
-
- @Override
- public void logRpcCall(SecurityUser user, EntityId entityId, ToDeviceRpcRequestBody body, boolean oneWay, Optional<RpcError> rpcError, Throwable e) {
- String rpcErrorStr = "";
- if (rpcError.isPresent()) {
- rpcErrorStr = "RPC Error: " + rpcError.get().name();
- }
- String method = body.getMethod();
- String params = body.getParams();
-
- auditLogService.logEntityAction(
- user.getTenantId(),
- user.getCustomerId(),
- user.getId(),
- user.getName(),
- (UUIDBased & EntityId) entityId,
- null,
- ActionType.RPC_CALL,
- BaseController.toException(e),
- rpcErrorStr,
- oneWay,
- method,
- params);
- }
}
diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DeviceRpcService.java
index f58db7f..4299120 100644
--- a/application/src/main/java/org/thingsboard/server/service/rpc/DeviceRpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/rpc/DeviceRpcService.java
@@ -15,26 +15,25 @@
*/
package org.thingsboard.server.service.rpc;
-import org.thingsboard.server.common.data.id.EntityId;
-import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
-import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
-import org.thingsboard.server.service.security.model.SecurityUser;
-import java.util.Optional;
+import java.util.function.Consumer;
/**
* Created by ashvayka on 16.04.18.
*/
public interface DeviceRpcService {
- void process(ToDeviceRpcRequest request, LocalRequestMetaData metaData);
+ void process(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer);
void process(ToDeviceRpcRequest request, ServerAddress originator);
void process(FromDeviceRpcResponse response);
- void logRpcCall(SecurityUser user, EntityId entityId, ToDeviceRpcRequestBody body, boolean oneWay, Optional<RpcError> rpcError, Throwable e);
+ void sendRpcReplyToDevice(TenantId tenantId, DeviceId deviceId, int requestId, String body);
+
}
diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/ToServerRpcResponseActorMsg.java b/application/src/main/java/org/thingsboard/server/service/rpc/ToServerRpcResponseActorMsg.java
new file mode 100644
index 0000000..f3183ec
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/rpc/ToServerRpcResponseActorMsg.java
@@ -0,0 +1,60 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.rpc;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.common.msg.core.ToServerRpcResponseMsg;
+import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
+
+import java.util.Optional;
+
+/**
+ * Created by ashvayka on 16.04.18.
+ */
+@ToString
+@RequiredArgsConstructor
+public class ToServerRpcResponseActorMsg implements ToDeviceActorNotificationMsg {
+
+ private final ServerAddress serverAddress;
+
+ @Getter
+ private final TenantId tenantId;
+
+ @Getter
+ private final DeviceId deviceId;
+
+ @Getter
+ private final ToServerRpcResponseMsg msg;
+
+ public ToServerRpcResponseActorMsg(TenantId tenantId, DeviceId deviceId, ToServerRpcResponseMsg msg) {
+ this(null, tenantId, deviceId, msg);
+ }
+
+ public Optional<ServerAddress> getServerAddress() {
+ return Optional.ofNullable(serverAddress);
+ }
+
+ @Override
+ public MsgType getMsgType() {
+ return MsgType.SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG;
+ }
+}
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 043b8da..d9e9012 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -229,6 +229,8 @@ actors:
enabled: "${ACTORS_QUEUE_ENABLED:true}"
# Maximum allowed timeout for persistence into the queue
timeout: "${ACTORS_QUEUE_PERSISTENCE_TIMEOUT:30000}"
+ client_side_rpc:
+ timeout: "${CLIENT_SIDE_RPC_TIMEOUT:60000}"
cache:
# caffeine or redis
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/core/RuleEngineError.java b/common/message/src/main/java/org/thingsboard/server/common/msg/core/RuleEngineError.java
index f5e249c..dcfde0f 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/core/RuleEngineError.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/core/RuleEngineError.java
@@ -21,7 +21,7 @@ package org.thingsboard.server.common.msg.core;
public enum RuleEngineError {
- QUEUE_PUT_TIMEOUT(true), SERVER_ERROR(true);
+ QUEUE_PUT_TIMEOUT(true), SERVER_ERROR(true), TIMEOUT;
private final boolean critical;
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/core/RuleEngineErrorMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/core/RuleEngineErrorMsg.java
index 61e5cf5..e0ff23b 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/core/RuleEngineErrorMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/core/RuleEngineErrorMsg.java
@@ -44,6 +44,8 @@ public class RuleEngineErrorMsg implements ToDeviceMsg {
return "Timeout during persistence of the message to the queue!";
case SERVER_ERROR:
return "Error during processing of message by the server!";
+ case TIMEOUT:
+ return "Timeout during processing of message by the server!";
default:
throw new RuntimeException("Error " + error + " is not supported!");
}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
index 6bde7fa..74c08da 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
@@ -75,7 +75,11 @@ public enum MsgType {
DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG,
- DEVICE_ACTOR_RPC_TIMEOUT_MSG,
+ SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG,
+
+ DEVICE_ACTOR_SERVER_SIDE_RPC_TIMEOUT_MSG,
+
+ DEVICE_ACTOR_CLIENT_SIDE_RPC_TIMEOUT_MSG,
DEVICE_ACTOR_QUEUE_TIMEOUT_MSG,
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/timeout/DeviceActorServerSideRpcTimeoutMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/timeout/DeviceActorServerSideRpcTimeoutMsg.java
new file mode 100644
index 0000000..cd86892
--- /dev/null
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/timeout/DeviceActorServerSideRpcTimeoutMsg.java
@@ -0,0 +1,33 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.msg.timeout;
+
+import org.thingsboard.server.common.msg.MsgType;
+
+/**
+ * @author Andrew Shvayka
+ */
+public final class DeviceActorServerSideRpcTimeoutMsg extends TimeoutMsg<Integer> {
+
+ public DeviceActorServerSideRpcTimeoutMsg(Integer id, long timeout) {
+ super(id, timeout);
+ }
+
+ @Override
+ public MsgType getMsgType() {
+ return MsgType.DEVICE_ACTOR_SERVER_SIDE_RPC_TIMEOUT_MSG;
+ }
+}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java
new file mode 100644
index 0000000..b3d724f
--- /dev/null
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java
@@ -0,0 +1,36 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.api;
+
+import lombok.Builder;
+import lombok.Data;
+import org.thingsboard.server.common.data.id.DeviceId;
+
+/**
+ * Created by ashvayka on 02.04.18.
+ */
+@Data
+@Builder
+public final class RuleEngineDeviceRpcRequest {
+
+ private final DeviceId deviceId;
+ private final int requestId;
+ private final boolean oneway;
+ private final String method;
+ private final String body;
+ private final long timeout;
+
+}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcResponse.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcResponse.java
new file mode 100644
index 0000000..bd0f3bb
--- /dev/null
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcResponse.java
@@ -0,0 +1,37 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.api;
+
+import lombok.Builder;
+import lombok.Data;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
+
+import java.util.Optional;
+
+/**
+ * Created by ashvayka on 02.04.18.
+ */
+@Data
+@Builder
+public final class RuleEngineDeviceRpcResponse {
+
+ private final DeviceId deviceId;
+ private final int requestId;
+ private final Optional<String> response;
+ private final Optional<RpcError> error;
+
+}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java
new file mode 100644
index 0000000..df9d8d9
--- /dev/null
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java
@@ -0,0 +1,30 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.api;
+
+import org.thingsboard.server.common.data.id.DeviceId;
+import java.util.function.Consumer;
+
+/**
+ * Created by ashvayka on 02.04.18.
+ */
+public interface RuleEngineRpcService {
+
+ void sendRpcReply(DeviceId deviceId, int requestId, String body);
+
+ void sendRpcRequest(RuleEngineDeviceRpcRequest request, Consumer<RuleEngineDeviceRpcResponse> consumer);
+
+}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
index 6038e6d..bd0a062 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
@@ -15,10 +15,12 @@
*/
package org.thingsboard.rule.engine.api;
+import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.dao.alarm.AlarmService;
import org.thingsboard.server.dao.asset.AssetService;
@@ -58,6 +60,8 @@ public interface TbContext {
void updateSelf(RuleNode self);
+ TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data);
+
RuleNodeId getSelfId();
TenantId getTenantId();
@@ -78,6 +82,8 @@ public interface TbContext {
RuleChainService getRuleChainService();
+ RuleEngineRpcService getRpcService();
+
RuleEngineTelemetryService getTelemetryService();
TimeseriesService getTimeseriesService();
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbRelationTypes.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbRelationTypes.java
new file mode 100644
index 0000000..e6455da
--- /dev/null
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbRelationTypes.java
@@ -0,0 +1,26 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.api;
+
+/**
+ * Created by ashvayka on 19.01.18.
+ */
+public final class TbRelationTypes {
+
+ public static String SUCCESS = "Success";
+ public static String FAILURE = "Failure";
+
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNode.java
new file mode 100644
index 0000000..9649768
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNode.java
@@ -0,0 +1,66 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.rpc;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.util.StringUtils;
+import org.thingsboard.rule.engine.TbNodeUtils;
+import org.thingsboard.rule.engine.api.RuleNode;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.rule.engine.api.TbNode;
+import org.thingsboard.rule.engine.api.TbNodeConfiguration;
+import org.thingsboard.rule.engine.api.TbNodeException;
+import org.thingsboard.server.common.data.EntityType;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.plugin.ComponentType;
+import org.thingsboard.server.common.msg.TbMsg;
+
+@Slf4j
+@RuleNode(
+ type = ComponentType.ACTION,
+ name = "rpc call reply",
+ configClazz = TbSendRpcReplyNodeConfiguration.class,
+ nodeDescription = "Sends reply to the RPC call from device",
+ nodeDetails = "Expects messages with any message type. Will forward message body to the device."
+)
+public class TbSendRPCReplyNode implements TbNode {
+
+ private TbSendRpcReplyNodeConfiguration config;
+
+ @Override
+ public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
+ this.config = TbNodeUtils.convert(configuration, TbSendRpcReplyNodeConfiguration.class);
+ }
+
+ @Override
+ public void onMsg(TbContext ctx, TbMsg msg) {
+ String requestIdStr = msg.getMetaData().getValue(config.getRequestIdMetaDataAttribute());
+ if (msg.getOriginator().getEntityType() != EntityType.DEVICE) {
+ ctx.tellError(msg, new RuntimeException("Message originator is not a device entity!"));
+ } else if (StringUtils.isEmpty(requestIdStr)) {
+ ctx.tellError(msg, new RuntimeException("Request id is not present in the metadata!"));
+ } else if (StringUtils.isEmpty(msg.getData())) {
+ ctx.tellError(msg, new RuntimeException("Request body is empty!"));
+ } else {
+ ctx.getRpcService().sendRpcReply(new DeviceId(msg.getOriginator().getId()), Integer.parseInt(requestIdStr), msg.getData());
+ }
+ }
+
+ @Override
+ public void destroy() {
+ }
+
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRpcReplyNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRpcReplyNodeConfiguration.java
new file mode 100644
index 0000000..402a33b
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRpcReplyNodeConfiguration.java
@@ -0,0 +1,33 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.rpc;
+
+import lombok.Data;
+import org.thingsboard.rule.engine.api.NodeConfiguration;
+import org.thingsboard.server.common.data.DataConstants;
+
+@Data
+public class TbSendRpcReplyNodeConfiguration implements NodeConfiguration<TbSendRpcReplyNodeConfiguration> {
+
+ private String requestIdMetaDataAttribute;
+
+ @Override
+ public TbSendRpcReplyNodeConfiguration defaultConfiguration() {
+ TbSendRpcReplyNodeConfiguration configuration = new TbSendRpcReplyNodeConfiguration();
+ configuration.setRequestIdMetaDataAttribute("requestId");
+ return configuration;
+ }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java
new file mode 100644
index 0000000..937ced6
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java
@@ -0,0 +1,101 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.rpc;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.rule.engine.TbNodeUtils;
+import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcRequest;
+import org.thingsboard.rule.engine.api.RuleNode;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.rule.engine.api.TbNode;
+import org.thingsboard.rule.engine.api.TbNodeConfiguration;
+import org.thingsboard.rule.engine.api.TbNodeException;
+import org.thingsboard.rule.engine.api.TbRelationTypes;
+import org.thingsboard.server.common.data.EntityType;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.plugin.ComponentType;
+import org.thingsboard.server.common.msg.TbMsg;
+
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+@RuleNode(
+ type = ComponentType.ACTION,
+ name = "rpc call request",
+ configClazz = TbSendRpcReplyNodeConfiguration.class,
+ nodeDescription = "Sends one-way RPC call to device",
+ nodeDetails = "Expects messages with \"method\" and \"params\". Will forward response from device to next nodes."
+)
+public class TbSendRPCRequestNode implements TbNode {
+
+ private Random random = new Random();
+ private Gson gson = new Gson();
+ private JsonParser jsonParser = new JsonParser();
+ private TbSendRpcRequestNodeConfiguration config;
+
+ @Override
+ public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
+ this.config = TbNodeUtils.convert(configuration, TbSendRpcRequestNodeConfiguration.class);
+ }
+
+ @Override
+ public void onMsg(TbContext ctx, TbMsg msg) {
+ JsonObject json = jsonParser.parse(msg.getData()).getAsJsonObject();
+
+ if (msg.getOriginator().getEntityType() != EntityType.DEVICE) {
+ ctx.tellError(msg, new RuntimeException("Message originator is not a device entity!"));
+ } else if (!json.has("method")) {
+ ctx.tellError(msg, new RuntimeException("Method is not present in the message!"));
+ } else if (!json.has("params")) {
+ ctx.tellError(msg, new RuntimeException("Params are not present in the message!"));
+ } else {
+ int requestId = json.has("requestId") ? json.get("requestId").getAsInt() : random.nextInt();
+ RuleEngineDeviceRpcRequest request = RuleEngineDeviceRpcRequest.builder()
+ .method(gson.toJson(json.get("method")))
+ .body(gson.toJson(json.get("params")))
+ .deviceId(new DeviceId(msg.getOriginator().getId()))
+ .requestId(requestId)
+ .timeout(TimeUnit.SECONDS.toMillis(config.getTimeoutInSeconds()))
+ .build();
+
+ ctx.getRpcService().sendRpcRequest(request, ruleEngineDeviceRpcResponse -> {
+ if (!ruleEngineDeviceRpcResponse.getError().isPresent()) {
+ TbMsg next = ctx.newMsg(msg.getType(), msg.getOriginator(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().get());
+ ctx.tellNext(next, TbRelationTypes.SUCCESS);
+ } else {
+ TbMsg next = ctx.newMsg(msg.getType(), msg.getOriginator(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name()));
+ ctx.tellNext(next, TbRelationTypes.FAILURE);
+ ctx.tellError(msg, new RuntimeException(ruleEngineDeviceRpcResponse.getError().get().name()));
+ }
+ });
+ }
+ }
+
+ @Override
+ public void destroy() {
+ }
+
+ private String wrap(String name, String body) {
+ JsonObject json = new JsonObject();
+ json.addProperty(name, body);
+ return gson.toJson(json);
+ }
+
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRpcRequestNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRpcRequestNodeConfiguration.java
new file mode 100644
index 0000000..214ce65
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRpcRequestNodeConfiguration.java
@@ -0,0 +1,32 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.rpc;
+
+import lombok.Data;
+import org.thingsboard.rule.engine.api.NodeConfiguration;
+
+@Data
+public class TbSendRpcRequestNodeConfiguration implements NodeConfiguration<TbSendRpcRequestNodeConfiguration> {
+
+ private int timeoutInSeconds;
+
+ @Override
+ public TbSendRpcRequestNodeConfiguration defaultConfiguration() {
+ TbSendRpcRequestNodeConfiguration configuration = new TbSendRpcRequestNodeConfiguration();
+ configuration.setTimeoutInSeconds(60);
+ return configuration;
+ }
+}