thingsboard-developers
Changes
application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java 306(+149 -157)
application/src/main/java/org/thingsboard/server/actors/device/RuleEngineQueuePutAckMsg.java 36(+0 -36)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java 40(+12 -28)
application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java 16(+0 -16)
application/src/main/java/org/thingsboard/server/service/transport/msg/TransportToDeviceActorMsgWrapper.java 16(+15 -1)
application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java 17(+17 -0)
application/src/main/java/org/thingsboard/server/service/transport/RuleEngineTransportService.java 9(+4 -5)
application/src/main/java/org/thingsboard/server/service/transport/ToTransportMsgEncoder.java 8(+4 -4)
common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicActorSystemToDeviceSessionActorMsg.java 52(+0 -52)
common/message/src/main/java/org/thingsboard/server/common/msg/timeout/DeviceActorQueueTimeoutMsg.java 36(+0 -36)
common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java 180(+172 -8)
common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java 8(+4 -4)
common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java 8(+4 -4)
common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgProcessor.java 2(+0 -2)
common/transport/src/main/java/org/thingsboard/server/common/transport/TransportAdaptor.java 1(+1 -0)
common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java 11(+7 -4)
common/transport/src/main/proto/transport.proto 36(+22 -14)
transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java 61(+61 -0)
transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java 13(+13 -0)
transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java 302(+178 -124)
transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java 4(+3 -1)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index 8ff61ac..1081ab2 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -200,10 +200,6 @@ public class ActorSystemContext {
@Autowired
@Getter
- private MsgQueueService msgQueueService;
-
- @Autowired
- @Getter
private DeviceStateService deviceStateService;
@Lazy
@@ -269,10 +265,6 @@ public class ActorSystemContext {
@Getter
@Setter
- private ActorRef sessionManagerActor;
-
- @Getter
- @Setter
private ActorRef statsActor;
@Getter
diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
index 14ca586..f4373db 100644
--- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
@@ -38,7 +38,6 @@ import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
-import org.thingsboard.server.common.msg.core.BasicActorSystemToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
import org.thingsboard.server.dao.model.ModelConstants;
@@ -113,19 +112,12 @@ public class AppActor extends RuleChainManagerActor {
case REMOTE_TO_RULE_CHAIN_TELL_NEXT_MSG:
onToDeviceActorMsg((TenantAwareMsg) msg);
break;
- case ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG:
- onToDeviceSessionMsg((BasicActorSystemToDeviceSessionActorMsg) msg);
- break;
default:
return false;
}
return true;
}
- private void onToDeviceSessionMsg(BasicActorSystemToDeviceSessionActorMsg msg) {
- systemContext.getSessionManagerActor().tell(msg, self());
- }
-
private void onPossibleClusterMsg(SendToClusterMsg msg) {
Optional<ServerAddress> address = systemContext.getRoutingService().resolveById(msg.getEntityId());
if (address.isPresent()) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
index 7fabe49..bd2a0f4 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
@@ -27,7 +27,6 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg;
-import org.thingsboard.server.common.msg.timeout.DeviceActorQueueTimeoutMsg;
import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg;
import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg;
@@ -74,12 +73,6 @@ public class DeviceActor extends ContextAwareActor {
case DEVICE_ACTOR_CLIENT_SIDE_RPC_TIMEOUT_MSG:
processor.processClientSideRpcTimeout(context(), (DeviceActorClientSideRpcTimeoutMsg) msg);
break;
- case DEVICE_ACTOR_QUEUE_TIMEOUT_MSG:
- processor.processQueueTimeout(context(), (DeviceActorQueueTimeoutMsg) msg);
- break;
- case RULE_ENGINE_QUEUE_PUT_ACK_MSG:
- processor.processQueueAck(context(), (RuleEngineQueuePutAckMsg) msg);
- break;
default:
return false;
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index 1313f60..0822766 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,7 +16,6 @@
package org.thingsboard.server.actors.device;
import akka.actor.ActorContext;
-import akka.actor.ActorRef;
import akka.event.LoggingAdapter;
import com.datastax.driver.core.utils.UUIDs;
import com.google.common.util.concurrent.FutureCallback;
@@ -46,29 +45,15 @@ import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.core.ActorSystemToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.core.AttributesUpdateNotification;
-import org.thingsboard.server.common.msg.core.AttributesUpdateRequest;
-import org.thingsboard.server.common.msg.core.BasicActorSystemToDeviceSessionActorMsg;
-import org.thingsboard.server.common.msg.core.BasicCommandAckResponse;
-import org.thingsboard.server.common.msg.core.BasicGetAttributesResponse;
-import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse;
-import org.thingsboard.server.common.msg.core.GetAttributesRequest;
import org.thingsboard.server.common.msg.core.RuleEngineError;
import org.thingsboard.server.common.msg.core.RuleEngineErrorMsg;
-import org.thingsboard.server.common.msg.core.SessionCloseMsg;
-import org.thingsboard.server.common.msg.core.SessionCloseNotification;
-import org.thingsboard.server.common.msg.core.SessionOpenMsg;
-import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceRpcResponseMsg;
-import org.thingsboard.server.common.msg.core.ToServerRpcRequestMsg;
import org.thingsboard.server.common.msg.kv.BasicAttributeKVMsg;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
-import org.thingsboard.server.common.msg.session.FromDeviceMsg;
import org.thingsboard.server.common.msg.session.SessionMsgType;
import org.thingsboard.server.common.msg.session.SessionType;
import org.thingsboard.server.common.msg.session.ToDeviceMsg;
import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg;
-import org.thingsboard.server.common.msg.timeout.DeviceActorQueueTimeoutMsg;
import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
@@ -88,9 +73,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
-import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.thingsboard.server.gen.transport.TransportProtos.*;
@@ -192,19 +175,6 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
}
- void processQueueAck(ActorContext context, RuleEngineQueuePutAckMsg msg) {
- PendingSessionMsgData data = pendingMsgs.remove(msg.getId());
- if (data != null && data.isReplyOnQueueAck()) {
- int remainingAcks = data.getAckMsgCount() - 1;
- data.setAckMsgCount(remainingAcks);
- logger.debug("[{}] Queue put [{}] ack detected. Remaining acks: {}!", deviceId, msg.getId(), remainingAcks);
- if (remainingAcks == 0) {
- ToDeviceMsg toDeviceMsg = BasicStatusCodeResponse.onSuccess(data.getSessionMsgType(), data.getRequestId());
- sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress());
- }
- }
- }
-
private void sendPendingRequests(ActorContext context, SessionId sessionId, SessionType type, Optional<ServerAddress> server) {
if (!toDeviceRpcPendingMap.isEmpty()) {
logger.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, toDeviceRpcPendingMap.size(), sessionId);
@@ -239,8 +209,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
body.getMethod(),
body.getParams()
);
- ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(rpcRequest, sessionId);
- sendMsgToSessionActor(response, server);
+// ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(rpcRequest, sessionId);
+// sendMsgToSessionActor(response, server);
};
}
@@ -292,57 +262,25 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
private void handleGetAttributesRequest(ActorContext context, SessionInfoProto sessionInfo, GetAttributeRequestMsg request) {
ListenableFuture<List<AttributeKvEntry>> clientAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.CLIENT_SCOPE, toOptionalSet(request.getClientAttributeNamesList()));
ListenableFuture<List<AttributeKvEntry>> sharedAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.SHARED_SCOPE, toOptionalSet(request.getSharedAttributeNamesList()));
-
- Futures.addCallback(Futures.allAsList(Arrays.asList(clientAttributesFuture, sharedAttributesFuture)), new FutureCallback<List<List<AttributeKvEntry>>>() {
- @Override
- public void onSuccess(@Nullable List<List<AttributeKvEntry>> result) {
- systemContext.getRuleEngineTransportService().process();
- BasicGetAttributesResponse response = BasicGetAttributesResponse.onSuccess(request.getMsgType(),
- request.getRequestId(), BasicAttributeKVMsg.from(result.get(0), result.get(1)));
- sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(response, src.getSessionId()), src.getServerAddress());
- }
-
- @Override
- public void onFailure(Throwable t) {
- if (t instanceof Exception) {
- ToDeviceMsg toDeviceMsg = BasicStatusCodeResponse.onError(SessionMsgType.GET_ATTRIBUTES_REQUEST, request.getRequestId(), (Exception) t);
- sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, src.getSessionId()), src.getServerAddress());
- } else {
- logger.error("[{}] Failed to process attributes request", deviceId, t);
- }
- }
- });
- }
-
- private Optional<Set<String>> toOptionalSet(List<String> strings) {
- if (strings == null || strings.isEmpty()) {
- return Optional.empty();
- } else {
- return Optional.of(new HashSet<>(strings));
- }
- }
-
- private void handleGetAttributesRequest(DeviceToDeviceActorMsg src) {
- GetAttributesRequest request = (GetAttributesRequest) src.getPayload();
- ListenableFuture<List<AttributeKvEntry>> clientAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.CLIENT_SCOPE, request.getClientAttributeNames());
- ListenableFuture<List<AttributeKvEntry>> sharedAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.SHARED_SCOPE, request.getSharedAttributeNames());
-
+ UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
+ int requestId = request.getRequestId();
Futures.addCallback(Futures.allAsList(Arrays.asList(clientAttributesFuture, sharedAttributesFuture)), new FutureCallback<List<List<AttributeKvEntry>>>() {
@Override
public void onSuccess(@Nullable List<List<AttributeKvEntry>> result) {
- BasicGetAttributesResponse response = BasicGetAttributesResponse.onSuccess(request.getMsgType(),
- request.getRequestId(), BasicAttributeKVMsg.from(result.get(0), result.get(1)));
- sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(response, src.getSessionId()), src.getServerAddress());
+ GetAttributeResponseMsg responseMsg = GetAttributeResponseMsg.newBuilder()
+ .setRequestId(requestId)
+ .addAllClientAttributeList(toTsKvProtos(result.get(0)))
+ .addAllSharedAttributeList(toTsKvProtos(result.get(1)))
+ .build();
+ sendToTransport(responseMsg, sessionId, sessionInfo);
}
@Override
public void onFailure(Throwable t) {
- if (t instanceof Exception) {
- ToDeviceMsg toDeviceMsg = BasicStatusCodeResponse.onError(SessionMsgType.GET_ATTRIBUTES_REQUEST, request.getRequestId(), (Exception) t);
- sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, src.getSessionId()), src.getServerAddress());
- } else {
- logger.error("[{}] Failed to process attributes request", deviceId, t);
- }
+ GetAttributeResponseMsg responseMsg = GetAttributeResponseMsg.newBuilder()
+ .setError(t.getMessage())
+ .build();
+ sendToTransport(responseMsg, sessionId, sessionInfo);
}
});
}
@@ -376,36 +314,36 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
}
- private void handleClientSideRPCRequest(ActorContext context, DeviceToDeviceActorMsg src) {
- ToServerRpcRequestMsg request = (ToServerRpcRequestMsg) src.getPayload();
-
- JsonObject json = new JsonObject();
- json.addProperty("method", request.getMethod());
- json.add("params", jsonParser.parse(request.getParams()));
-
- TbMsgMetaData requestMetaData = defaultMetaData.copy();
- requestMetaData.putValue("requestId", Integer.toString(request.getRequestId()));
- TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.TO_SERVER_RPC_REQUEST.name(), deviceId, requestMetaData, TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
- PendingSessionMsgData msgData = new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(), SessionMsgType.TO_SERVER_RPC_REQUEST, request.getRequestId(), false, 1);
- pushToRuleEngineWithTimeout(context, tbMsg, msgData);
-
- scheduleMsgWithDelay(context, new DeviceActorClientSideRpcTimeoutMsg(request.getRequestId(), systemContext.getClientSideRpcTimeout()), systemContext.getClientSideRpcTimeout());
- toServerRpcPendingMap.put(request.getRequestId(), new ToServerRpcRequestMetadata(src.getSessionId(), src.getSessionType(), src.getServerAddress()));
- }
+// private void handleClientSideRPCRequest(ActorContext context, DeviceToDeviceActorMsg src) {
+// ToServerRpcRequestMsg request = (ToServerRpcRequestMsg) src.getPayload();
+//
+// JsonObject json = new JsonObject();
+// json.addProperty("method", request.getMethod());
+// json.add("params", jsonParser.parse(request.getParams()));
+//
+// TbMsgMetaData requestMetaData = defaultMetaData.copy();
+// requestMetaData.putValue("requestId", Integer.toString(request.getRequestId()));
+// TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.TO_SERVER_RPC_REQUEST.name(), deviceId, requestMetaData, TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
+// PendingSessionMsgData msgData = new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(), SessionMsgType.TO_SERVER_RPC_REQUEST, request.getRequestId(), false, 1);
+// pushToRuleEngineWithTimeout(context, tbMsg, msgData);
+//
+// scheduleMsgWithDelay(context, new DeviceActorClientSideRpcTimeoutMsg(request.getRequestId(), systemContext.getClientSideRpcTimeout()), systemContext.getClientSideRpcTimeout());
+// toServerRpcPendingMap.put(request.getRequestId(), new ToServerRpcRequestMetadata(src.getSessionId(), src.getSessionType(), src.getServerAddress()));
+// }
public void processClientSideRpcTimeout(ActorContext context, DeviceActorClientSideRpcTimeoutMsg msg) {
ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(msg.getId());
if (data != null) {
logger.debug("[{}] Client side RPC request [{}] timeout detected!", deviceId, msg.getId());
ToDeviceMsg toDeviceMsg = new RuleEngineErrorMsg(SessionMsgType.TO_SERVER_RPC_REQUEST, RuleEngineError.TIMEOUT);
- sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServer());
+// sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServer());
}
}
void processToServerRPCResponse(ActorContext context, ToServerRpcResponseActorMsg msg) {
ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(msg.getMsg().getRequestId());
if (data != null) {
- sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(msg.getMsg(), data.getSessionId()), data.getServer());
+// sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(msg.getMsg(), data.getSessionId()), data.getServer());
}
}
@@ -433,68 +371,68 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
if (notification != null) {
ToDeviceMsg finalNotification = notification;
- attributeSubscriptions.entrySet().forEach(sub -> {
- ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(finalNotification, sub.getKey());
- sendMsgToSessionActor(response, sub.getValue().getServer());
- });
+// attributeSubscriptions.entrySet().forEach(sub -> {
+// ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(finalNotification, sub.getKey());
+// sendMsgToSessionActor(response, sub.getValue().getServer());
+// });
}
} else {
logger.debug("[{}] No registered attributes subscriptions to process!", deviceId);
}
}
- private void processRpcResponses(ActorContext context, DeviceToDeviceActorMsg msg) {
- SessionId sessionId = msg.getSessionId();
- FromDeviceMsg inMsg = msg.getPayload();
- if (inMsg.getMsgType() == SessionMsgType.TO_DEVICE_RPC_RESPONSE) {
- logger.debug("[{}] Processing rpc command response [{}]", deviceId, sessionId);
- ToDeviceRpcResponseMsg responseMsg = (ToDeviceRpcResponseMsg) inMsg;
- ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
- boolean success = requestMd != null;
- if (success) {
- systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
- requestMd.getMsg().getServerAddress(), responseMsg.getData(), null));
- } else {
- logger.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());
- }
- if (msg.getSessionType() == SessionType.SYNC) {
- BasicCommandAckResponse response = success
- ? BasicCommandAckResponse.onSuccess(SessionMsgType.TO_DEVICE_RPC_REQUEST, responseMsg.getRequestId())
- : BasicCommandAckResponse.onError(SessionMsgType.TO_DEVICE_RPC_REQUEST, responseMsg.getRequestId(), new TimeoutException());
- sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(response, msg.getSessionId()), msg.getServerAddress());
- }
- }
- }
+// private void processRpcResponses(ActorContext context, DeviceToDeviceActorMsg msg) {
+// SessionId sessionId = msg.getSessionId();
+// FromDeviceMsg inMsg = msg.getPayload();
+// if (inMsg.getMsgType() == SessionMsgType.TO_DEVICE_RPC_RESPONSE) {
+// logger.debug("[{}] Processing rpc command response [{}]", deviceId, sessionId);
+// ToDeviceRpcResponseMsg responseMsg = (ToDeviceRpcResponseMsg) inMsg;
+// ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
+// boolean success = requestMd != null;
+// if (success) {
+// systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
+// requestMd.getMsg().getServerAddress(), responseMsg.getData(), null));
+// } else {
+// logger.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());
+// }
+// if (msg.getSessionType() == SessionType.SYNC) {
+// BasicCommandAckResponse response = success
+// ? BasicCommandAckResponse.onSuccess(SessionMsgType.TO_DEVICE_RPC_REQUEST, responseMsg.getRequestId())
+// : BasicCommandAckResponse.onError(SessionMsgType.TO_DEVICE_RPC_REQUEST, responseMsg.getRequestId(), new TimeoutException());
+// sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(response, msg.getSessionId()), msg.getServerAddress());
+// }
+// }
+// }
void processClusterEventMsg(ClusterEventMsg msg) {
- if (!msg.isAdded()) {
- logger.debug("[{}] Clearing attributes/rpc subscription for server [{}]", deviceId, msg.getServerAddress());
- Predicate<Map.Entry<SessionId, SessionInfo>> filter = e -> e.getValue().getServer()
- .map(serverAddress -> serverAddress.equals(msg.getServerAddress())).orElse(false);
- attributeSubscriptions.entrySet().removeIf(filter);
- rpcSubscriptions.entrySet().removeIf(filter);
- }
+// if (!msg.isAdded()) {
+// logger.debug("[{}] Clearing attributes/rpc subscription for server [{}]", deviceId, msg.getServerAddress());
+// Predicate<Map.Entry<SessionId, SessionInfo>> filter = e -> e.getValue().getServer()
+// .map(serverAddress -> serverAddress.equals(msg.getServerAddress())).orElse(false);
+// attributeSubscriptions.entrySet().removeIf(filter);
+// rpcSubscriptions.entrySet().removeIf(filter);
+// }
}
- private void processSubscriptionCommands(ActorContext context, DeviceToDeviceActorMsg msg) {
- SessionId sessionId = msg.getSessionId();
- SessionType sessionType = msg.getSessionType();
- FromDeviceMsg inMsg = msg.getPayload();
- if (inMsg.getMsgType() == SessionMsgType.SUBSCRIBE_ATTRIBUTES_REQUEST) {
- logger.debug("[{}] Registering attributes subscription for session [{}]", deviceId, sessionId);
- attributeSubscriptions.put(sessionId, new SessionInfo(sessionType, msg.getServerAddress()));
- } else if (inMsg.getMsgType() == SessionMsgType.UNSUBSCRIBE_ATTRIBUTES_REQUEST) {
- logger.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId);
- attributeSubscriptions.remove(sessionId);
- } else if (inMsg.getMsgType() == SessionMsgType.SUBSCRIBE_RPC_COMMANDS_REQUEST) {
- logger.debug("[{}] Registering rpc subscription for session [{}][{}]", deviceId, sessionId, sessionType);
- rpcSubscriptions.put(sessionId, new SessionInfo(sessionType, msg.getServerAddress()));
- sendPendingRequests(context, sessionId, sessionType, msg.getServerAddress());
- } else if (inMsg.getMsgType() == SessionMsgType.UNSUBSCRIBE_RPC_COMMANDS_REQUEST) {
- logger.debug("[{}] Canceling rpc subscription for session [{}][{}]", deviceId, sessionId, sessionType);
- rpcSubscriptions.remove(sessionId);
- }
- }
+// private void processSubscriptionCommands(ActorContext context, DeviceToDeviceActorMsg msg) {
+// SessionId sessionId = msg.getSessionId();
+// SessionType sessionType = msg.getSessionType();
+// FromDeviceMsg inMsg = msg.getPayload();
+// if (inMsg.getMsgType() == SessionMsgType.SUBSCRIBE_ATTRIBUTES_REQUEST) {
+// logger.debug("[{}] Registering attributes subscription for session [{}]", deviceId, sessionId);
+// attributeSubscriptions.put(sessionId, new SessionInfo(sessionType, msg.getServerAddress()));
+// } else if (inMsg.getMsgType() == SessionMsgType.UNSUBSCRIBE_ATTRIBUTES_REQUEST) {
+// logger.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId);
+// attributeSubscriptions.remove(sessionId);
+// } else if (inMsg.getMsgType() == SessionMsgType.SUBSCRIBE_RPC_COMMANDS_REQUEST) {
+// logger.debug("[{}] Registering rpc subscription for session [{}][{}]", deviceId, sessionId, sessionType);
+// rpcSubscriptions.put(sessionId, new SessionInfo(sessionType, msg.getServerAddress()));
+// sendPendingRequests(context, sessionId, sessionType, msg.getServerAddress());
+// } else if (inMsg.getMsgType() == SessionMsgType.UNSUBSCRIBE_RPC_COMMANDS_REQUEST) {
+// logger.debug("[{}] Canceling rpc subscription for session [{}][{}]", deviceId, sessionId, sessionType);
+// rpcSubscriptions.remove(sessionId);
+// }
+// }
private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) {
UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
@@ -506,15 +444,11 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
closeSession(sessionIdToRemove, sessions.remove(sessionIdToRemove));
}
}
- sessions.put(sessionId, new SessionInfo(SessionType.ASYNC, msg.getServerAddress()));
+ sessions.put(sessionId, new SessionInfo(TransportProtos.SessionType.ASYNC, sessionInfo.getNodeId()));
if (sessions.size() == 1) {
reportSessionOpen();
}
- }
- FromDeviceMsg inMsg = msg.getPayload();
- if (inMsg instanceof SessionOpenMsg) {
- logger.debug("[{}] Processing new session [{}]", deviceId, sessionId);
- } else if (inMsg instanceof SessionCloseMsg) {
+ } else if (msg.getEvent() == SessionEvent.CLOSED) {
logger.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId);
sessions.remove(sessionId);
attributeSubscriptions.remove(sessionId);
@@ -532,7 +466,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
systemContext.getRpcService().tell(systemContext.getEncodingService()
.convertToProtoDataMessage(sessionAddress.get(), response));
} else {
- systemContext.getSessionManagerActor().tell(response, ActorRef.noSender());
+// systemContext.getSessionManagerActor().tell(response, ActorRef.noSender());
}
}
@@ -578,4 +512,62 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
return json;
}
+
+ private Optional<Set<String>> toOptionalSet(List<String> strings) {
+ if (strings == null || strings.isEmpty()) {
+ return Optional.empty();
+ } else {
+ return Optional.of(new HashSet<>(strings));
+ }
+ }
+
+ private void sendToTransport(GetAttributeResponseMsg responseMsg, UUID sessionId, SessionInfoProto sessionInfo) {
+ DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder()
+ .setSessionIdMSB(sessionId.getMostSignificantBits())
+ .setSessionIdLSB(sessionId.getLeastSignificantBits())
+ .setGetAttributesResponse(responseMsg).build();
+ systemContext.getRuleEngineTransportService().process(sessionInfo.getNodeId(), msg);
+ }
+
+ private List<TsKvProto> toTsKvProtos(@Nullable List<AttributeKvEntry> result) {
+ List<TsKvProto> clientAttributes;
+ if (result == null || result.isEmpty()) {
+ clientAttributes = Collections.emptyList();
+ } else {
+ clientAttributes = new ArrayList<>(result.size());
+ for (AttributeKvEntry attrEntry : result) {
+ clientAttributes.add(toTsKvProto(attrEntry));
+ }
+ }
+ return clientAttributes;
+ }
+
+ private TsKvProto toTsKvProto(AttributeKvEntry attrEntry) {
+ return TsKvProto.newBuilder().setTs(attrEntry.getLastUpdateTs())
+ .setKv(toKeyValueProto(attrEntry)).build();
+ }
+
+ private KeyValueProto toKeyValueProto(KvEntry kvEntry) {
+ KeyValueProto.Builder builder = KeyValueProto.newBuilder();
+ builder.setKey(kvEntry.getKey());
+ switch (kvEntry.getDataType()) {
+ case BOOLEAN:
+ builder.setType(KeyValueType.BOOLEAN_V);
+ builder.setBoolV(kvEntry.getBooleanValue().get());
+ break;
+ case DOUBLE:
+ builder.setType(KeyValueType.DOUBLE_V);
+ builder.setDoubleV(kvEntry.getDoubleValue().get());
+ break;
+ case LONG:
+ builder.setType(KeyValueType.LONG_V);
+ builder.setLongV(kvEntry.getLongValue().get());
+ break;
+ case STRING:
+ builder.setType(KeyValueType.STRING_V);
+ builder.setStringV(kvEntry.getStrValue().get());
+ break;
+ }
+ return builder.build();
+ }
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java b/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java
index 9faaade..32cb60d 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index fe02335..3da90d1 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -25,7 +25,6 @@ import java.util.Optional;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.device.DeviceActorToRuleEngineMsg;
-import org.thingsboard.server.actors.device.RuleEngineQueuePutAckMsg;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
import org.thingsboard.server.common.data.EntityType;
@@ -90,26 +89,12 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
}
initRoutes(ruleChain, ruleNodeList);
- reprocess(ruleNodeList);
started = true;
} else {
onUpdate(context);
}
}
- private void reprocess(List<RuleNode> ruleNodeList) {
- for (RuleNode ruleNode : ruleNodeList) {
- for (TbMsg tbMsg : queue.findUnprocessed(tenantId, ruleNode.getId().getId(), systemContext.getQueuePartitionId())) {
- pushMsgToNode(nodeActors.get(ruleNode.getId()), tbMsg, "");
- }
- }
- if (firstNode != null) {
- for (TbMsg tbMsg : queue.findUnprocessed(tenantId, entityId.getId(), systemContext.getQueuePartitionId())) {
- pushMsgToNode(firstNode, tbMsg, "");
- }
- }
- }
-
@Override
public void onUpdate(ActorContext context) throws Exception {
RuleChain ruleChain = service.findRuleChainById(entityId);
@@ -134,7 +119,6 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
});
initRoutes(ruleChain, ruleNodeList);
- reprocess(ruleNodeList);
}
@Override
@@ -188,17 +172,14 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg envelope) {
checkActive();
if (firstNode != null) {
- putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> pushMsgToNode(firstNode, msg, ""));
+ pushMsgToNode(firstNode, enrichWithRuleChainId(envelope.getTbMsg()), "");
}
}
void onDeviceActorToRuleEngineMsg(DeviceActorToRuleEngineMsg envelope) {
checkActive();
if (firstNode != null) {
- putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> {
- pushMsgToNode(firstNode, msg, "");
- envelope.getCallbackRef().tell(new RuleEngineQueuePutAckMsg(msg.getId()), self);
- });
+ pushMsgToNode(firstNode, enrichWithRuleChainId(envelope.getTbMsg()), "");
}
}
@@ -206,15 +187,16 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
checkActive();
if (envelope.isEnqueue()) {
if (firstNode != null) {
- putToQueue(enrichWithRuleChainId(envelope.getMsg()), msg -> pushMsgToNode(firstNode, msg, envelope.getFromRelationType()));
+ pushMsgToNode(firstNode, enrichWithRuleChainId(envelope.getMsg()), envelope.getFromRelationType());
}
} else {
if (firstNode != null) {
pushMsgToNode(firstNode, envelope.getMsg(), envelope.getFromRelationType());
} else {
- TbMsg msg = envelope.getMsg();
- EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
- queue.ack(tenantId, envelope.getMsg(), ackId.getId(), msg.getClusterPartition());
+// TODO: Ack this message in Kafka
+// TbMsg msg = envelope.getMsg();
+// EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
+// queue.ack(tenantId, envelope.getMsg(), ackId.getId(), msg.getClusterPartition());
}
}
}
@@ -249,7 +231,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
if (relationsCount == 0) {
if (ackId != null) {
- queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition());
+// TODO: Ack this message in Kafka
+// queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition());
}
} else if (relationsCount == 1) {
for (RuleNodeRelation relation : relations) {
@@ -269,7 +252,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
}
//TODO: Ideally this should happen in async way when all targets confirm that the copied messages are successfully written to corresponding target queues.
if (ackId != null) {
- queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition());
+// TODO: Ack this message in Kafka
+// queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition());
}
}
}
@@ -296,7 +280,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
RuleNodeId targetId = new RuleNodeId(target.getId());
RuleNodeCtx targetNodeCtx = nodeActors.get(targetId);
TbMsg copy = msg.copy(UUIDs.timeBased(), entityId, targetId, DEFAULT_CLUSTER_PARTITION);
- putToQueue(copy, queuedMsg -> pushMsgToNode(targetNodeCtx, queuedMsg, fromRelationType));
+ pushMsgToNode(targetNodeCtx, copy, fromRelationType);
}
private void pushToTarget(TbMsg msg, EntityId target, String fromRelationType) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
index 1e9e23b..b70d1ed 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
@@ -30,7 +30,6 @@ import org.thingsboard.server.actors.app.AppActor;
import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
import org.thingsboard.server.actors.rpc.RpcManagerActor;
import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg;
-import org.thingsboard.server.actors.session.SessionManagerActor;
import org.thingsboard.server.actors.stats.StatsActor;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.id.DeviceId;
@@ -90,8 +89,6 @@ public class DefaultActorService implements ActorService {
private ActorRef appActor;
- private ActorRef sessionManagerActor;
-
private ActorRef rpcManagerActor;
@PostConstruct
@@ -104,10 +101,6 @@ public class DefaultActorService implements ActorService {
appActor = system.actorOf(Props.create(new AppActor.ActorCreator(actorContext)).withDispatcher(APP_DISPATCHER_NAME), "appActor");
actorContext.setAppActor(appActor);
- sessionManagerActor = system.actorOf(Props.create(new SessionManagerActor.ActorCreator(actorContext)).withDispatcher(CORE_DISPATCHER_NAME),
- "sessionManagerActor");
- actorContext.setSessionManagerActor(sessionManagerActor);
-
rpcManagerActor = system.actorOf(Props.create(new RpcManagerActor.ActorCreator(actorContext)).withDispatcher(CORE_DISPATCHER_NAME),
"rpcManagerActor");
@@ -135,12 +128,6 @@ public class DefaultActorService implements ActorService {
}
@Override
- public void process(SessionAwareMsg msg) {
- log.debug("Processing session aware msg: {}", msg);
- sessionManagerActor.tell(msg, ActorRef.noSender());
- }
-
- @Override
public void onServerAdded(ServerInstance server) {
log.trace("Processing onServerAdded msg: {}", server);
broadcast(new ClusterEventMsg(server.getServerAddress(), true));
@@ -194,7 +181,6 @@ public class DefaultActorService implements ActorService {
private void broadcast(ClusterEventMsg msg) {
this.appActor.tell(msg, ActorRef.noSender());
- this.sessionManagerActor.tell(msg, ActorRef.noSender());
this.rpcManagerActor.tell(msg, ActorRef.noSender());
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
index 1cf8339..3441abc 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
@@ -35,14 +35,12 @@ public abstract class ComponentMsgProcessor<T extends EntityId> extends Abstract
protected final TenantId tenantId;
protected final T entityId;
- protected final MsgQueueService queue;
protected ComponentLifecycleState state;
protected ComponentMsgProcessor(ActorSystemContext systemContext, LoggingAdapter logger, TenantId tenantId, T id) {
super(systemContext, logger);
this.tenantId = tenantId;
this.entityId = id;
- this.queue = systemContext.getMsgQueueService();
}
public abstract void start(ActorContext context) throws Exception;
@@ -86,18 +84,4 @@ public abstract class ComponentMsgProcessor<T extends EntityId> extends Abstract
}
}
- protected void putToQueue(final TbMsg tbMsg, final Consumer<TbMsg> onSuccess) {
- EntityId entityId = tbMsg.getRuleNodeId() != null ? tbMsg.getRuleNodeId() : tbMsg.getRuleChainId();
- Futures.addCallback(queue.put(this.tenantId, tbMsg, entityId.getId(), tbMsg.getClusterPartition()), new FutureCallback<Void>() {
- @Override
- public void onSuccess(@Nullable Void result) {
- onSuccess.accept(tbMsg);
- }
-
- @Override
- public void onFailure(Throwable t) {
- logger.debug("Failed to push message [{}] to queue due to [{}]", tbMsg, t);
- }
- });
- }
}
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/msg/TransportToDeviceActorMsgWrapper.java b/application/src/main/java/org/thingsboard/server/service/transport/msg/TransportToDeviceActorMsgWrapper.java
index 3ee1858..570082d 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/msg/TransportToDeviceActorMsgWrapper.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/msg/TransportToDeviceActorMsgWrapper.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.server.service.transport.msg;
import lombok.Data;
@@ -7,7 +22,6 @@ import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.aware.DeviceAwareMsg;
import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
-import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
import java.io.Serializable;
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
index 6b82792..aa36b9b 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.server.service.transport;
import akka.actor.ActorRef;
@@ -30,6 +45,7 @@ import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.Duration;
import java.util.Optional;
+import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
@@ -136,6 +152,7 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
@Override
public void process(String nodeId, DeviceActorToTransportMsg msg, Runnable onSuccess, Consumer<Throwable> onFailure) {
notificationsProducer.send(notificationsTopic + "." + nodeId,
+ new UUID(msg.getSessionIdMSB(), msg.getSessionIdLSB()).toString(),
ToTransportMsg.newBuilder().setToDeviceSessionMsg(msg).build()
, new QueueCallbackAdaptor(onSuccess, onFailure));
}
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RuleEngineTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/RuleEngineTransportService.java
index e6b3dd3..8ca484f 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/RuleEngineTransportService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/RuleEngineTransportService.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,7 +16,6 @@
package org.thingsboard.server.service.transport;
import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg;
-import org.thingsboard.server.gen.transport.TransportProtos.;
import java.util.function.Consumer;
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/ToTransportMsgEncoder.java b/application/src/main/java/org/thingsboard/server/service/transport/ToTransportMsgEncoder.java
index 5f3c026..bb0f5b6 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/ToTransportMsgEncoder.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/ToTransportMsgEncoder.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
index dfd8f98..24758b5 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
@@ -91,8 +91,6 @@ public enum MsgType {
DEVICE_ACTOR_CLIENT_SIDE_RPC_TIMEOUT_MSG,
- DEVICE_ACTOR_QUEUE_TIMEOUT_MSG,
-
/**
* Message that is sent from the Device Actor to Rule Engine. Requires acknowledgement
*/
@@ -101,7 +99,6 @@ public enum MsgType {
/**
* Message that is sent from Rule Engine to the Device Actor when message is successfully pushed to queue.
*/
- RULE_ENGINE_QUEUE_PUT_ACK_MSG,
ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG,
TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG,
SESSION_TIMEOUT_MSG,
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java
index 90f15a4..86be3a3 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
index 8f4c095..610a490 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -106,6 +106,10 @@ public class TBKafkaProducerTemplate<T> {
return send(topic, key, value, null, headers, callback);
}
+ public Future<RecordMetadata> send(String topic, String key, T value, Callback callback) {
+ return send(topic, key, value, null, null, callback);
+ }
+
public Future<RecordMetadata> send(String topic, String key, T value, Long timestamp, Iterable<Header> headers, Callback callback) {
byte[] data = encoder.encode(value);
ProducerRecord<String, byte[]> record;
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
index ba08bf8..77ad033 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
@@ -160,7 +160,7 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe
SettableFuture<Response> future = SettableFuture.create();
pendingRequests.putIfAbsent(requestId, new ResponseMetaData<>(tickTs + maxRequestTimeout, future));
request = requestTemplate.enrich(request, responseTemplate.getTopic(), requestId);
- requestTemplate.send(key, request, headers);
+ requestTemplate.send(key, request, headers, null);
return future;
}
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java
index 6f9e8a7..4c23ac2 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
index e7c1734..6bb8eff 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
@@ -15,18 +15,38 @@
*/
package org.thingsboard.server.common.transport.adaptor;
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSyntaxException;
+import org.thingsboard.server.common.data.kv.AttributeKey;
+import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
+import org.thingsboard.server.common.data.kv.BooleanDataEntry;
+import org.thingsboard.server.common.data.kv.DoubleDataEntry;
+import org.thingsboard.server.common.data.kv.KvEntry;
+import org.thingsboard.server.common.data.kv.LongDataEntry;
+import org.thingsboard.server.common.data.kv.StringDataEntry;
+import org.thingsboard.server.common.msg.core.AttributesUpdateRequest;
+import org.thingsboard.server.common.msg.core.BasicAttributesUpdateRequest;
+import org.thingsboard.server.common.msg.core.BasicRequest;
+import org.thingsboard.server.common.msg.core.BasicTelemetryUploadRequest;
+import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
+import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg;
+import org.thingsboard.server.common.msg.core.ToServerRpcRequestMsg;
+import org.thingsboard.server.common.msg.core.ToServerRpcResponseMsg;
+import org.thingsboard.server.common.msg.kv.AttributesKVMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.*;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.function.Consumer;
import java.util.stream.Collectors;
-import com.google.gson.*;
-import org.thingsboard.server.common.msg.core.*;
-
-import org.thingsboard.server.common.data.kv.*;
-import org.thingsboard.server.common.msg.kv.AttributesKVMsg;
-
public class JsonConverter {
private static final Gson GSON = new Gson();
@@ -44,6 +64,109 @@ public class JsonConverter {
return convertToTelemetry(jsonObject, System.currentTimeMillis(), requestId);
}
+ public static PostTelemetryMsg convertToTelemetryProto(JsonElement jsonObject) throws JsonSyntaxException {
+ long systemTs = System.currentTimeMillis();
+ PostTelemetryMsg.Builder builder = PostTelemetryMsg.newBuilder();
+ if (jsonObject.isJsonObject()) {
+ parseObject(builder, systemTs, jsonObject);
+ } else if (jsonObject.isJsonArray()) {
+ jsonObject.getAsJsonArray().forEach(je -> {
+ if (je.isJsonObject()) {
+ parseObject(builder, systemTs, je.getAsJsonObject());
+ } else {
+ throw new JsonSyntaxException(CAN_T_PARSE_VALUE + je);
+ }
+ });
+ } else {
+ throw new JsonSyntaxException(CAN_T_PARSE_VALUE + jsonObject);
+ }
+ return builder.build();
+ }
+
+ public static PostAttributeMsg convertToAttributesProto(JsonElement jsonObject) throws JsonSyntaxException {
+ if (jsonObject.isJsonObject()) {
+ PostAttributeMsg.Builder result = PostAttributeMsg.newBuilder();
+ List<KeyValueProto> keyValueList = parseProtoValues(jsonObject.getAsJsonObject());
+ result.addAllKv(keyValueList);
+ return result.build();
+ } else {
+ throw new JsonSyntaxException(CAN_T_PARSE_VALUE + jsonObject);
+ }
+ }
+
+
+ private static void parseObject(PostTelemetryMsg.Builder builder, long systemTs, JsonElement jsonObject) {
+ JsonObject jo = jsonObject.getAsJsonObject();
+ if (jo.has("ts") && jo.has("values")) {
+ parseWithTs(builder, jo);
+ } else {
+ parseWithoutTs(builder, systemTs, jo);
+ }
+ }
+
+ private static void parseWithoutTs(PostTelemetryMsg.Builder request, long systemTs, JsonObject jo) {
+ TsKvListProto.Builder builder = TsKvListProto.newBuilder();
+ builder.setTs(systemTs);
+ builder.addAllKv(parseProtoValues(jo));
+ request.addTsKvList(builder.build());
+ }
+
+ public static void parseWithTs(PostTelemetryMsg.Builder request, JsonObject jo) {
+ TsKvListProto.Builder builder = TsKvListProto.newBuilder();
+ builder.setTs(jo.get("ts").getAsLong());
+ builder.addAllKv(parseProtoValues(jo.get("values").getAsJsonObject()));
+ request.addTsKvList(builder.build());
+ }
+
+ public static List<KeyValueProto> parseProtoValues(JsonObject valuesObject) {
+ List<KeyValueProto> result = new ArrayList<>();
+ for (Entry<String, JsonElement> valueEntry : valuesObject.entrySet()) {
+ JsonElement element = valueEntry.getValue();
+ if (element.isJsonPrimitive()) {
+ JsonPrimitive value = element.getAsJsonPrimitive();
+ if (value.isString()) {
+ result.add(KeyValueProto.newBuilder().setKey(valueEntry.getKey()).setType(KeyValueType.STRING_V)
+ .setStringV(value.getAsString()).build());
+ } else if (value.isBoolean()) {
+ result.add(KeyValueProto.newBuilder().setKey(valueEntry.getKey()).setType(KeyValueType.BOOLEAN_V)
+ .setBoolV(value.getAsBoolean()).build());
+ } else if (value.isNumber()) {
+ if (value.getAsString().contains(".")) {
+ result.add(KeyValueProto.newBuilder().setKey(valueEntry.getKey()).setType(KeyValueType.DOUBLE_V)
+ .setDoubleV(value.getAsDouble()).build());
+ } else {
+ try {
+ long longValue = Long.parseLong(value.getAsString());
+ result.add(KeyValueProto.newBuilder().setKey(valueEntry.getKey()).setType(KeyValueType.LONG_V)
+ .setLongV(longValue).build());
+ } catch (NumberFormatException e) {
+ throw new JsonSyntaxException("Big integer values are not supported!");
+ }
+ }
+ } else {
+ throw new JsonSyntaxException(CAN_T_PARSE_VALUE + value);
+ }
+ } else {
+ throw new JsonSyntaxException(CAN_T_PARSE_VALUE + element);
+ }
+ }
+ return result;
+ }
+
+ private static void parseNumericProto(List<KvEntry> result, Entry<String, JsonElement> valueEntry, JsonPrimitive value) {
+ if (value.getAsString().contains(".")) {
+ result.add(new DoubleDataEntry(valueEntry.getKey(), value.getAsDouble()));
+ } else {
+ try {
+ long longValue = Long.parseLong(value.getAsString());
+ result.add(new LongDataEntry(valueEntry.getKey(), longValue));
+ } catch (NumberFormatException e) {
+ throw new JsonSyntaxException("Big integer values are not supported!");
+ }
+ }
+ }
+
+
private static TelemetryUploadRequest convertToTelemetry(JsonElement jsonObject, long systemTs, int requestId) throws JsonSyntaxException {
BasicTelemetryUploadRequest request = new BasicTelemetryUploadRequest(requestId);
if (jsonObject.isJsonObject()) {
@@ -140,6 +263,26 @@ public class JsonConverter {
}
}
+ public static JsonObject toJson(GetAttributeResponseMsg payload) {
+ JsonObject result = new JsonObject();
+ if (payload.getClientAttributeListCount() > 0) {
+ JsonObject attrObject = new JsonObject();
+ payload.getClientAttributeListList().forEach(addToObjectFromProto(attrObject));
+ result.add("client", attrObject);
+ }
+ if (payload.getSharedAttributeListCount() > 0) {
+ JsonObject attrObject = new JsonObject();
+ payload.getSharedAttributeListList().forEach(addToObjectFromProto(attrObject));
+ result.add("shared", attrObject);
+ }
+ if (payload.getDeletedAttributeKeysCount() > 0) {
+ JsonArray attrObject = new JsonArray();
+ payload.getDeletedAttributeKeysList().forEach(attrObject::add);
+ result.add("deleted", attrObject);
+ }
+ return result;
+ }
+
public static JsonObject toJson(AttributesKVMsg payload, boolean asMap) {
JsonObject result = new JsonObject();
if (asMap) {
@@ -166,8 +309,29 @@ public class JsonConverter {
}
private static Consumer<AttributeKey> addToObject(JsonArray result) {
- return key -> {
- result.add(key.getAttributeKey());
+ return key -> result.add(key.getAttributeKey());
+ }
+
+ private static Consumer<TsKvProto> addToObjectFromProto(JsonObject result) {
+ return de -> {
+ JsonPrimitive value;
+ switch (de.getKv().getType()) {
+ case BOOLEAN_V:
+ value = new JsonPrimitive(de.getKv().getBoolV());
+ break;
+ case DOUBLE_V:
+ value = new JsonPrimitive(de.getKv().getDoubleV());
+ break;
+ case LONG_V:
+ value = new JsonPrimitive(de.getKv().getLongV());
+ break;
+ case STRING_V:
+ value = new JsonPrimitive(de.getKv().getStringV());
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported data type: " + de.getKv().getType());
+ }
+ result.add(de.getKv().getKey(), value);
};
}
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
index 3068974..b46cfb8 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java
index d0e6d18..44fcd2a 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgProcessor.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgProcessor.java
index 4cff643..80b2690 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgProcessor.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgProcessor.java
@@ -20,8 +20,6 @@ import org.thingsboard.server.common.msg.aware.SessionAwareMsg;
public interface SessionMsgProcessor {
- void process(SessionAwareMsg msg);
-
void onDeviceAdded(Device device);
}
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportAdaptor.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportAdaptor.java
index 080f874..86067d5 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportAdaptor.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportAdaptor.java
@@ -20,6 +20,7 @@ import org.thingsboard.server.common.msg.session.SessionMsgType;
import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg;
import org.thingsboard.server.common.msg.session.SessionContext;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
+import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.Optional;
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
index dc2e306..c3817a9 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,6 +22,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
/**
* Created by ashvayka on 04.10.18.
@@ -40,6 +41,8 @@ public interface TransportService {
void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback);
+ void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback);
+
void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener);
void deregisterSession(SessionInfoProto sessionInfo);
common/transport/src/main/proto/transport.proto 36(+22 -14)
diff --git a/common/transport/src/main/proto/transport.proto b/common/transport/src/main/proto/transport.proto
index f3b3ae8..d32bac9 100644
--- a/common/transport/src/main/proto/transport.proto
+++ b/common/transport/src/main/proto/transport.proto
@@ -23,12 +23,13 @@ option java_outer_classname = "TransportProtos";
* Data Structures;
*/
message SessionInfoProto {
- int64 sessionIdMSB = 1;
- int64 sessionIdLSB = 2;
- int64 tenantIdMSB = 3;
- int64 tenantIdLSB = 4;
- int64 deviceIdMSB = 5;
- int64 deviceIdLSB = 6;
+ string nodeId = 1;
+ int64 sessionIdMSB = 2;
+ int64 sessionIdLSB = 3;
+ int64 tenantIdMSB = 4;
+ int64 tenantIdLSB = 5;
+ int64 deviceIdMSB = 6;
+ int64 deviceIdLSB = 7;
}
enum SessionEvent {
@@ -57,6 +58,11 @@ message KeyValueProto {
string string_v = 6;
}
+message TsKvProto {
+ int64 ts = 1;
+ KeyValueProto kv = 2;
+}
+
message TsKvListProto {
int64 ts = 1;
repeated KeyValueProto kv = 2;
@@ -76,9 +82,8 @@ message DeviceInfoProto {
* Messages that use Data Structures;
*/
message SessionEventMsg {
- string nodeId = 1;
- SessionType sessionType = 2;
- SessionEvent event = 3;
+ SessionType sessionType = 1;
+ SessionEvent event = 2;
}
message PostTelemetryMsg {
@@ -90,14 +95,17 @@ message PostAttributeMsg {
}
message GetAttributeRequestMsg {
- repeated string clientAttributeNames = 1;
- repeated string sharedAttributeNames = 2;
+ int32 requestId = 1;
+ repeated string clientAttributeNames = 2;
+ repeated string sharedAttributeNames = 3;
}
message GetAttributeResponseMsg {
- repeated TsKvListProto clientAttributeList = 1;
- repeated TsKvListProto sharedAttributeList = 2;
- repeated string deletedAttributeKeys = 3;
+ int32 requestId = 1;
+ repeated TsKvProto clientAttributeList = 2;
+ repeated TsKvProto sharedAttributeList = 3;
+ repeated string deletedAttributeKeys = 4;
+ string error = 5;
}
message ValidateDeviceTokenRequestMsg {
diff --git a/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java b/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java
index e2fd7cd..ec69cec 100644
--- a/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java
+++ b/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java
@@ -106,30 +106,30 @@ public class CoapServerTest {
public static SessionMsgProcessor sessionMsgProcessor() {
return new SessionMsgProcessor() {
- @Override
- public void process(SessionAwareMsg toActorMsg) {
- if (toActorMsg instanceof TransportToDeviceSessionActorMsg) {
- AdaptorToSessionActorMsg sessionMsg = ((TransportToDeviceSessionActorMsg) toActorMsg).getSessionMsg();
- try {
- FromDeviceMsg deviceMsg = sessionMsg.getMsg();
- ToDeviceMsg toDeviceMsg = null;
- if (deviceMsg.getMsgType() == SessionMsgType.POST_TELEMETRY_REQUEST) {
- toDeviceMsg = BasicStatusCodeResponse.onSuccess(deviceMsg.getMsgType(), BasicRequest.DEFAULT_REQUEST_ID);
- } else if (deviceMsg.getMsgType() == SessionMsgType.GET_ATTRIBUTES_REQUEST) {
- List<AttributeKvEntry> data = new ArrayList<>();
- data.add(new BaseAttributeKvEntry(new StringDataEntry("key1", "value1"), System.currentTimeMillis()));
- data.add(new BaseAttributeKvEntry(new LongDataEntry("key2", 42L), System.currentTimeMillis()));
- BasicAttributeKVMsg kv = BasicAttributeKVMsg.fromClient(data);
- toDeviceMsg = BasicGetAttributesResponse.onSuccess(deviceMsg.getMsgType(), BasicRequest.DEFAULT_REQUEST_ID, kv);
- }
- if (toDeviceMsg != null) {
- sessionMsg.getSessionContext().onMsg(new BasicSessionActorToAdaptorMsg(sessionMsg.getSessionContext(), toDeviceMsg));
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
+// @Override
+// public void process(SessionAwareMsg toActorMsg) {
+// if (toActorMsg instanceof TransportToDeviceSessionActorMsg) {
+// AdaptorToSessionActorMsg sessionMsg = ((TransportToDeviceSessionActorMsg) toActorMsg).getSessionMsg();
+// try {
+// FromDeviceMsg deviceMsg = sessionMsg.getMsg();
+// ToDeviceMsg toDeviceMsg = null;
+// if (deviceMsg.getMsgType() == SessionMsgType.POST_TELEMETRY_REQUEST) {
+// toDeviceMsg = BasicStatusCodeResponse.onSuccess(deviceMsg.getMsgType(), BasicRequest.DEFAULT_REQUEST_ID);
+// } else if (deviceMsg.getMsgType() == SessionMsgType.GET_ATTRIBUTES_REQUEST) {
+// List<AttributeKvEntry> data = new ArrayList<>();
+// data.add(new BaseAttributeKvEntry(new StringDataEntry("key1", "value1"), System.currentTimeMillis()));
+// data.add(new BaseAttributeKvEntry(new LongDataEntry("key2", 42L), System.currentTimeMillis()));
+// BasicAttributeKVMsg kv = BasicAttributeKVMsg.fromClient(data);
+// toDeviceMsg = BasicGetAttributesResponse.onSuccess(deviceMsg.getMsgType(), BasicRequest.DEFAULT_REQUEST_ID, kv);
+// }
+// if (toDeviceMsg != null) {
+// sessionMsg.getSessionContext().onMsg(new BasicSessionActorToAdaptorMsg(sessionMsg.getSessionContext(), toDeviceMsg));
+// }
+// } catch (Exception e) {
+// e.printStackTrace();
+// }
+// }
+// }
@Override
public void onDeviceAdded(Device device) {
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
index c8baaf7..ee41d2c 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
@@ -17,6 +17,7 @@ package org.thingsboard.server.transport.mqtt.adaptors;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
import io.netty.buffer.ByteBuf;
@@ -25,12 +26,14 @@ import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.handler.codec.mqtt.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
import org.thingsboard.server.common.data.id.SessionId;
import org.thingsboard.server.common.msg.core.*;
import org.thingsboard.server.common.msg.kv.AttributesKVMsg;
import org.thingsboard.server.common.msg.session.*;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
+import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.transport.mqtt.MqttTopics;
import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
@@ -53,6 +56,64 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
private static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false);
@Override
+ public TransportProtos.PostTelemetryMsg convertToPostTelemetry(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException {
+ String payload = validatePayload(ctx.getSessionId(), inbound.payload());
+ try {
+ return JsonConverter.convertToTelemetryProto(new JsonParser().parse(payload));
+ } catch (IllegalStateException | JsonSyntaxException ex) {
+ throw new AdaptorException(ex);
+ }
+ }
+
+ @Override
+ public TransportProtos.PostAttributeMsg convertToPostAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException {
+ String payload = validatePayload(ctx.getSessionId(), inbound.payload());
+ try {
+ return JsonConverter.convertToAttributesProto(new JsonParser().parse(payload));
+ } catch (IllegalStateException | JsonSyntaxException ex) {
+ throw new AdaptorException(ex);
+ }
+ }
+
+ @Override
+ public TransportProtos.GetAttributeRequestMsg convertToGetAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException {
+ String topicName = inbound.variableHeader().topicName();
+ try {
+ TransportProtos.GetAttributeRequestMsg.Builder result = TransportProtos.GetAttributeRequestMsg.newBuilder();
+ result.setRequestId(Integer.valueOf(topicName.substring(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX.length())));
+ String payload = inbound.payload().toString(UTF8);
+ JsonElement requestBody = new JsonParser().parse(payload);
+ Set<String> clientKeys = toStringSet(requestBody, "clientKeys");
+ Set<String> sharedKeys = toStringSet(requestBody, "sharedKeys");
+ if (clientKeys != null) {
+ result.addAllClientAttributeNames(clientKeys);
+ }
+ if (sharedKeys != null) {
+ result.addAllSharedAttributeNames(sharedKeys);
+ }
+ return result.build();
+ } catch (RuntimeException e) {
+ log.warn("Failed to decode get attributes request", e);
+ throw new AdaptorException(e);
+ }
+ }
+
+ @Override
+ public Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException {
+ if (!StringUtils.isEmpty(responseMsg.getError())) {
+ throw new AdaptorException(responseMsg.getError());
+ } else {
+ Integer requestId = responseMsg.getRequestId();
+ if (requestId >= 0) {
+ return Optional.of(createMqttPublishMsg(ctx,
+ MqttTopics.DEVICE_ATTRIBUTES_RESPONSE_TOPIC_PREFIX + requestId,
+ JsonConverter.toJson(responseMsg)));
+ }
+ return Optional.empty();
+ }
+ }
+
+ @Override
public AdaptorToSessionActorMsg convertToActorMsg(DeviceSessionCtx ctx, SessionMsgType type, MqttMessage inbound) throws AdaptorException {
FromDeviceMsg msg;
switch (type) {
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java
index bf83a1f..54e7c5c 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java
@@ -16,11 +16,24 @@
package org.thingsboard.server.transport.mqtt.adaptors;
import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
import org.thingsboard.server.common.transport.TransportAdaptor;
+import org.thingsboard.server.common.transport.adaptor.AdaptorException;
+import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
+import java.util.Optional;
+
/**
* @author Andrew Shvayka
*/
public interface MqttTransportAdaptor extends TransportAdaptor<DeviceSessionCtx, MqttMessage, MqttMessage> {
+
+ TransportProtos.PostTelemetryMsg convertToPostTelemetry(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException;
+
+ TransportProtos.PostAttributeMsg convertToPostAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException;
+
+ TransportProtos.GetAttributeRequestMsg convertToGetAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException;
+
+ Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException;
}
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index d8f1d10..c64ab5d 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -26,18 +26,25 @@ import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
+import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
+import io.netty.handler.codec.mqtt.MqttTopicSubscription;
+import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
+import org.thingsboard.server.common.transport.SessionMsgListener;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
+import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.quota.QuotaService;
import org.thingsboard.server.dao.EncryptionUtil;
+import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
import org.thingsboard.server.gen.transport.TransportProtos.SessionEvent;
import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;
@@ -54,6 +61,7 @@ import javax.net.ssl.SSLPeerUnverifiedException;
import javax.security.cert.X509Certificate;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@@ -65,14 +73,16 @@ import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUS
import static io.netty.handler.codec.mqtt.MqttMessageType.CONNACK;
import static io.netty.handler.codec.mqtt.MqttMessageType.PUBACK;
import static io.netty.handler.codec.mqtt.MqttMessageType.SUBACK;
+import static io.netty.handler.codec.mqtt.MqttMessageType.UNSUBACK;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
+import static io.netty.handler.codec.mqtt.MqttQoS.FAILURE;
/**
* @author Andrew Shvayka
*/
@Slf4j
-public class MqttTransportHandler extends ChannelInboundHandlerAdapter implements GenericFutureListener<Future<? super Void>> {
+public class MqttTransportHandler extends ChannelInboundHandlerAdapter implements GenericFutureListener<Future<? super Void>>, SessionMsgListener {
public static final MqttQoS MAX_SUPPORTED_QOS_LVL = AT_LEAST_ONCE;
@@ -84,8 +94,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private final SslHandler sslHandler;
private final ConcurrentMap<String, Integer> mqttQoSMap;
- private final SessionInfoProto sessionInfo;
-
+ private volatile SessionInfoProto sessionInfo;
private volatile InetSocketAddress address;
private volatile DeviceSessionCtx deviceSessionCtx;
private volatile GatewaySessionCtx gatewaySessionCtx;
@@ -98,11 +107,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
this.quotaService = context.getQuotaService();
this.sslHandler = context.getSslHandler();
this.mqttQoSMap = new ConcurrentHashMap<>();
- this.sessionInfo = SessionInfoProto.newBuilder()
- .setNodeId(context.getNodeId())
- .setSessionIdMSB(sessionId.getMostSignificantBits())
- .setSessionIdLSB(sessionId.getLeastSignificantBits())
- .build();
this.deviceSessionCtx = new DeviceSessionCtx(mqttQoSMap);
}
@@ -135,15 +139,15 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
case CONNECT:
processConnect(ctx, (MqttConnectMessage) msg);
break;
-// case PUBLISH:
-// processPublish(ctx, (MqttPublishMessage) msg);
-// break;
-// case SUBSCRIBE:
-// processSubscribe(ctx, (MqttSubscribeMessage) msg);
-// break;
-// case UNSUBSCRIBE:
-// processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
-// break;
+ case PUBLISH:
+ processPublish(ctx, (MqttPublishMessage) msg);
+ break;
+ case SUBSCRIBE:
+ processSubscribe(ctx, (MqttSubscribeMessage) msg);
+ break;
+ case UNSUBSCRIBE:
+ processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
+ break;
// case PINGREQ:
// if (checkConnected(ctx)) {
// ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
@@ -160,24 +164,25 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
-// private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) {
-// if (!checkConnected(ctx)) {
-// return;
-// }
-// String topicName = mqttMsg.variableHeader().topicName();
-// int msgId = mqttMsg.variableHeader().packetId();
-// log.trace("[{}] Processing publish msg [{}][{}]!", sessionId, topicName, msgId);
-//
-// if (topicName.startsWith(BASE_GATEWAY_API_TOPIC)) {
-// if (gatewaySessionCtx != null) {
-// gatewaySessionCtx.setChannel(ctx);
+ private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) {
+ if (!checkConnected(ctx)) {
+ return;
+ }
+ String topicName = mqttMsg.variableHeader().topicName();
+ int msgId = mqttMsg.variableHeader().packetId();
+ log.trace("[{}] Processing publish msg [{}][{}]!", sessionId, topicName, msgId);
+
+ if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) {
+ if (gatewaySessionCtx != null) {
+ gatewaySessionCtx.setChannel(ctx);
// handleMqttPublishMsg(topicName, msgId, mqttMsg);
-// }
-// } else {
-// processDevicePublish(ctx, mqttMsg, topicName, msgId);
-// }
-// }
-//
+ }
+ } else {
+ processDevicePublish(ctx, mqttMsg, topicName, msgId);
+ }
+ }
+
+ //
// private void handleMqttPublishMsg(String topicName, int msgId, MqttPublishMessage mqttMsg) {
// try {
// switch (topicName) {
@@ -205,7 +210,23 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
// }
// }
//
-// private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
+ private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
+ try {
+ if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_TOPIC)) {
+ TransportProtos.PostTelemetryMsg postTelemetryMsg = adaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg);
+ transportService.process(sessionInfo, postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));
+ } else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_TOPIC)) {
+ TransportProtos.PostAttributeMsg postAttributeMsg = adaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg);
+ transportService.process(sessionInfo, postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg));
+ } else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
+ TransportProtos.GetAttributeRequestMsg getAttributeMsg = adaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg);
+ transportService.process(sessionInfo, getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));
+ }
+ } catch (AdaptorException e) {
+ log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
+ log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId);
+ ctx.close();
+ }
// AdaptorToSessionActorMsg msg = null;
// try {
// if (topicName.equals(DEVICE_TELEMETRY_TOPIC)) {
@@ -237,20 +258,38 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
// log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId);
// ctx.close();
// }
-// }
-//
-// private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) {
-// if (!checkConnected(ctx)) {
-// return;
-// }
-// log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
-// List<Integer> grantedQoSList = new ArrayList<>();
-// for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) {
-// String topic = subscription.topicName();
-// MqttQoS reqQoS = subscription.qualityOfService();
-// try {
-// switch (topic) {
-// case DEVICE_ATTRIBUTES_TOPIC: {
+ }
+
+ private <T> TransportServiceCallback<Void> getPubAckCallback(final ChannelHandlerContext ctx, final int msgId, final T msg) {
+ return new TransportServiceCallback<Void>() {
+ @Override
+ public void onSuccess(Void dummy) {
+ log.trace("[{}] Published msg: {}", sessionId, msg);
+ if (msgId > 0) {
+ ctx.writeAndFlush(createMqttPubAckMsg(msgId));
+ }
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ log.trace("[{}] Failed to publish msg: {}", sessionId, msg, e);
+ ctx.close();
+ }
+ };
+ }
+
+ private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) {
+ if (!checkConnected(ctx)) {
+ return;
+ }
+ log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
+ List<Integer> grantedQoSList = new ArrayList<>();
+ for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) {
+ String topic = subscription.topicName();
+ MqttQoS reqQoS = subscription.qualityOfService();
+ try {
+ switch (topic) {
+// case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: {
// AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
// registerSubQoS(topic, grantedQoSList, reqQoS);
@@ -267,37 +306,37 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
// case GATEWAY_RPC_TOPIC:
// registerSubQoS(topic, grantedQoSList, reqQoS);
// break;
-// case DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
-// deviceSessionCtx.setAllowAttributeResponses();
-// registerSubQoS(topic, grantedQoSList, reqQoS);
-// break;
-// default:
-// log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
-// grantedQoSList.add(FAILURE.value());
-// break;
-// }
-// } catch (AdaptorException e) {
-// log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
-// grantedQoSList.add(FAILURE.value());
-// }
-// }
-// ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList));
-// }
-//
-// private void registerSubQoS(String topic, List<Integer> grantedQoSList, MqttQoS reqQoS) {
-// grantedQoSList.add(getMinSupportedQos(reqQoS));
-// mqttQoSMap.put(topic, getMinSupportedQos(reqQoS));
-// }
-//
-// private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) {
-// if (!checkConnected(ctx)) {
-// return;
-// }
-// log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
-// for (String topicName : mqttMsg.payload().topics()) {
-// mqttQoSMap.remove(topicName);
-// try {
-// switch (topicName) {
+ case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
+ deviceSessionCtx.setAllowAttributeResponses();
+ registerSubQoS(topic, grantedQoSList, reqQoS);
+ break;
+ default:
+ log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
+ grantedQoSList.add(FAILURE.value());
+ break;
+ }
+ } catch (Exception e) {
+ log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
+ grantedQoSList.add(FAILURE.value());
+ }
+ }
+ ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList));
+ }
+
+ private void registerSubQoS(String topic, List<Integer> grantedQoSList, MqttQoS reqQoS) {
+ grantedQoSList.add(getMinSupportedQos(reqQoS));
+ mqttQoSMap.put(topic, getMinSupportedQos(reqQoS));
+ }
+
+ private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) {
+ if (!checkConnected(ctx)) {
+ return;
+ }
+ log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
+ for (String topicName : mqttMsg.payload().topics()) {
+ mqttQoSMap.remove(topicName);
+ try {
+ switch (topicName) {
// case DEVICE_ATTRIBUTES_TOPIC: {
// AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
@@ -308,23 +347,23 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
// break;
// }
-// case DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
-// deviceSessionCtx.setDisallowAttributeResponses();
-// break;
-// }
-// } catch (AdaptorException e) {
-// log.warn("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName);
-// }
-// }
-// ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId()));
-// }
-//
-// private MqttMessage createUnSubAckMessage(int msgId) {
-// MqttFixedHeader mqttFixedHeader =
-// new MqttFixedHeader(UNSUBACK, false, AT_LEAST_ONCE, false, 0);
-// MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(msgId);
-// return new MqttMessage(mqttFixedHeader, mqttMessageIdVariableHeader);
-// }
+ case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
+ deviceSessionCtx.setDisallowAttributeResponses();
+ break;
+ }
+ } catch (Exception e) {
+ log.warn("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName);
+ }
+ }
+ ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId()));
+ }
+
+ private MqttMessage createUnSubAckMessage(int msgId) {
+ MqttFixedHeader mqttFixedHeader =
+ new MqttFixedHeader(UNSUBACK, false, AT_LEAST_ONCE, false, 0);
+ MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(msgId);
+ return new MqttMessage(mqttFixedHeader, mqttMessageIdVariableHeader);
+ }
private void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
log.info("[{}] Processing connect msg for client: {}!", sessionId, msg.payload().clientIdentifier());
@@ -346,15 +385,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() {
@Override
public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {
- if (!msg.hasDeviceInfo()) {
- ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
- ctx.close();
- } else {
- ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
- deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
- transportService.process(deviceSessionCtx, getSessionEventMsg(SessionEvent.OPEN), null);
- checkGatewaySession();
- }
+ onValidateDeviceResponse(msg, ctx);
}
@Override
@@ -375,15 +406,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() {
@Override
public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {
- if (!msg.hasDeviceInfo()) {
- ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
- ctx.close();
- } else {
- ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
- deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
- transportService.process(deviceSessionCtx, getSessionEventMsg(SessionEvent.OPEN), null);
- checkGatewaySession();
- }
+ onValidateDeviceResponse(msg, ctx);
}
@Override
@@ -415,7 +438,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private void processDisconnect(ChannelHandlerContext ctx) {
ctx.close();
if (deviceSessionCtx.isConnected()) {
- transportService.process(deviceSessionCtx, getSessionEventMsg(SessionEvent.CLOSED), null);
+ transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.CLOSED), null);
+ transportService.deregisterSession(sessionInfo);
if (gatewaySessionCtx != null) {
gatewaySessionCtx.onGatewayDisconnect();
}
@@ -488,16 +512,46 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private SessionEventMsg getSessionEventMsg(SessionEvent event) {
return SessionEventMsg.newBuilder()
- .setSessionInfo(sessionInfo)
- .setDeviceIdMSB(deviceSessionCtx.getDeviceIdMSB())
- .setDeviceIdLSB(deviceSessionCtx.getDeviceIdLSB())
+ .setSessionType(TransportProtos.SessionType.ASYNC)
.setEvent(event).build();
}
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
if (deviceSessionCtx.isConnected()) {
- transportService.process(deviceSessionCtx, getSessionEventMsg(SessionEvent.CLOSED), null);
+ transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.CLOSED), null);
+ transportService.deregisterSession(sessionInfo);
+ }
+ }
+
+ private void onValidateDeviceResponse(ValidateDeviceCredentialsResponseMsg msg, ChannelHandlerContext ctx) {
+ if (!msg.hasDeviceInfo()) {
+ ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
+ ctx.close();
+ } else {
+ ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
+ deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
+ sessionInfo = SessionInfoProto.newBuilder()
+ .setNodeId(context.getNodeId())
+ .setSessionIdMSB(sessionId.getMostSignificantBits())
+ .setSessionIdLSB(sessionId.getLeastSignificantBits())
+ .setDeviceIdMSB(msg.getDeviceInfo().getDeviceIdMSB())
+ .setDeviceIdLSB(msg.getDeviceInfo().getDeviceIdLSB())
+ .setTenantIdMSB(msg.getDeviceInfo().getTenantIdMSB())
+ .setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB())
+ .build();
+ transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.OPEN), null);
+ transportService.registerSession(sessionInfo, this);
+ checkGatewaySession();
+ }
+ }
+
+ @Override
+ public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg response) {
+ try {
+ adaptor.convertToPublish(deviceSessionCtx, response).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
+ } catch (Exception e) {
+ log.trace("[{}] Failed to convert device attributes to MQTT msg", sessionId, e);
}
}
}
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java
index 6661cb1..d704634 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java
@@ -17,6 +17,7 @@ package org.thingsboard.server.transport.mqtt.session;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.*;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.id.SessionId;
import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg;
@@ -41,12 +42,13 @@ import java.util.concurrent.atomic.AtomicInteger;
public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
private final MqttSessionId sessionId;
+ @Getter
private ChannelHandlerContext channel;
private volatile boolean allowAttributeResponses;
private AtomicInteger msgIdSeq = new AtomicInteger(0);
public DeviceSessionCtx(ConcurrentMap<String, Integer> mqttQoSMap) {
- super(null, null, null);
+ super(null, null, mqttQoSMap);
this.sessionId = new MqttSessionId();
}
diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
index 76c1c1b..7a2f698 100644
--- a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
+++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -111,8 +111,8 @@ public class MqttTransportService implements TransportService {
TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TransportApiResponseMsg> responseBuilder = TBKafkaConsumerTemplate.builder();
responseBuilder.settings(kafkaSettings);
responseBuilder.topic(transportApiResponsesTopic + "." + transportContext.getNodeId());
- responseBuilder.clientId(transportContext.getNodeId());
- responseBuilder.groupId(null);
+ responseBuilder.clientId("transport-api-client-" + transportContext.getNodeId());
+ responseBuilder.groupId("transport-api-client");
responseBuilder.autoCommit(true);
responseBuilder.autoCommitIntervalMs(autoCommitInterval);
responseBuilder.decoder(new TransportApiResponseDecoder());
@@ -137,8 +137,8 @@ public class MqttTransportService implements TransportService {
TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<ToTransportMsg> mainConsumerBuilder = TBKafkaConsumerTemplate.builder();
mainConsumerBuilder.settings(kafkaSettings);
mainConsumerBuilder.topic(notificationsTopic + "." + transportContext.getNodeId());
- mainConsumerBuilder.clientId(transportContext.getNodeId());
- mainConsumerBuilder.groupId(null);
+ mainConsumerBuilder.clientId("transport-" + transportContext.getNodeId());
+ mainConsumerBuilder.groupId("transport");
mainConsumerBuilder.autoCommit(true);
mainConsumerBuilder.autoCommitIntervalMs(notificationsAutoCommitInterval);
mainConsumerBuilder.decoder(new ToTransportMsgResponseDecoder());
@@ -243,6 +243,15 @@ public class MqttTransportService implements TransportService {
}
@Override
+ public void process(SessionInfoProto sessionInfo, TransportProtos.GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
+ ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+ TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+ .setGetAttributes(msg).build()
+ ).build();
+ send(sessionInfo, toRuleEngineMsg, callback);
+ }
+
+ @Override
public void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener) {
sessions.putIfAbsent(toId(sessionInfo), listener);
//TODO: monitor sessions periodically: PING REQ/RESP, etc.
@@ -271,9 +280,13 @@ public class MqttTransportService implements TransportService {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
- callback.onSuccess(null);
+ if (callback != null) {
+ callback.onSuccess(null);
+ }
} else {
- callback.onError(exception);
+ if (callback != null) {
+ callback.onError(exception);
+ }
}
}
}
diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/ToRuleEngineMsgEncoder.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/ToRuleEngineMsgEncoder.java
index ee929ce..179a2a1 100644
--- a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/ToRuleEngineMsgEncoder.java
+++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/ToRuleEngineMsgEncoder.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.