thingsboard-developers
Changes
application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java 220(+123 -97)
application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java 122(+0 -122)
application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java 180(+0 -180)
application/src/main/java/org/thingsboard/server/service/transport/msg/TransportToDeviceActorMsgWrapper.java 36(+36 -0)
application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java 187(+187 -0)
application/src/main/java/org/thingsboard/server/service/transport/RuleEngineTransportService.java 32(+32 -0)
application/src/main/java/org/thingsboard/server/service/transport/ToRuleEngineMsgDecoder.java 17(+11 -6)
application/src/main/java/org/thingsboard/server/service/transport/ToTransportMsgEncoder.java 29(+29 -0)
common/message/src/main/java/org/thingsboard/server/common/msg/device/BasicDeviceToDeviceActorMsg.java 107(+0 -107)
common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java 35(+21 -14)
common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java 26(+26 -0)
common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java 34(+22 -12)
common/transport/src/main/proto/transport.proto 80(+57 -23)
transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java 67(+15 -52)
transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java 168(+159 -9)
transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/ToRuleEngineMsgEncoder.java 29(+29 -0)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index 8930f84..8ff61ac 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -31,6 +31,7 @@ import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import org.thingsboard.rule.engine.api.MailService;
import org.thingsboard.server.actors.service.ActorService;
@@ -69,6 +70,7 @@ import org.thingsboard.server.service.script.JsExecutorService;
import org.thingsboard.server.service.script.JsInvokeService;
import org.thingsboard.server.service.state.DeviceStateService;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
+import org.thingsboard.server.service.transport.RuleEngineTransportService;
import javax.annotation.Nullable;
import java.io.IOException;
@@ -204,6 +206,11 @@ public class ActorSystemContext {
@Getter
private DeviceStateService deviceStateService;
+ @Lazy
+ @Autowired
+ @Getter
+ private RuleEngineTransportService ruleEngineTransportService;
+
@Value("${cluster.partition_id}")
@Getter
private long queuePartitionId;
diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
index 6a78f78..14ca586 100644
--- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
@@ -39,7 +39,6 @@ import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.core.BasicActorSystemToDeviceSessionActorMsg;
-import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
import org.thingsboard.server.dao.model.ModelConstants;
@@ -105,7 +104,7 @@ public class AppActor extends RuleChainManagerActor {
case SERVICE_TO_RULE_ENGINE_MSG:
onServiceToRuleEngineMsg((ServiceToRuleEngineMsg) msg);
break;
- case DEVICE_SESSION_TO_DEVICE_ACTOR_MSG:
+ case TRANSPORT_TO_DEVICE_ACTOR_MSG:
case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
@@ -169,16 +168,6 @@ public class AppActor extends RuleChainManagerActor {
getOrCreateTenantActor(msg.getTenantId()).tell(msg, ActorRef.noSender());
}
- private void processDeviceMsg(DeviceToDeviceActorMsg deviceToDeviceActorMsg) {
- TenantId tenantId = deviceToDeviceActorMsg.getTenantId();
- ActorRef tenantActor = getOrCreateTenantActor(tenantId);
- if (deviceToDeviceActorMsg.getPayload().getMsgType().requiresRulesProcessing()) {
-// tenantActor.tell(new RuleChainDeviceMsg(deviceToDeviceActorMsg, ruleManager.getRuleChain(this.context())), context().self());
- } else {
- tenantActor.tell(deviceToDeviceActorMsg, context().self());
- }
- }
-
private ActorRef getOrCreateTenantActor(TenantId tenantId) {
return tenantActors.computeIfAbsent(tenantId, k -> context().actorOf(Props.create(new TenantActor.ActorCreator(systemContext, tenantId))
.withDispatcher(DefaultActorService.CORE_DISPATCHER_NAME), tenantId.toString()));
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
index 99d0045..7fabe49 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
@@ -26,12 +26,12 @@ import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
-import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg;
import org.thingsboard.server.common.msg.timeout.DeviceActorQueueTimeoutMsg;
import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg;
import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg;
+import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
public class DeviceActor extends ContextAwareActor {
@@ -50,8 +50,8 @@ public class DeviceActor extends ContextAwareActor {
case CLUSTER_EVENT_MSG:
processor.processClusterEventMsg((ClusterEventMsg) msg);
break;
- case DEVICE_SESSION_TO_DEVICE_ACTOR_MSG:
- processor.process(context(), (DeviceToDeviceActorMsg) msg);
+ case TRANSPORT_TO_DEVICE_ACTOR_MSG:
+ processor.process(context(), (TransportToDeviceActorMsgWrapper) msg);
break;
case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
processor.processAttributesUpdate(context(), (DeviceAttributesEventNotificationMsg) msg);
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index a2ea048..1313f60 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -61,7 +61,6 @@ import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg;
import org.thingsboard.server.common.msg.core.ToDeviceRpcResponseMsg;
import org.thingsboard.server.common.msg.core.ToServerRpcRequestMsg;
-import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
import org.thingsboard.server.common.msg.kv.BasicAttributeKVMsg;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.common.msg.session.FromDeviceMsg;
@@ -71,9 +70,11 @@ import org.thingsboard.server.common.msg.session.ToDeviceMsg;
import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg;
import org.thingsboard.server.common.msg.timeout.DeviceActorQueueTimeoutMsg;
import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg;
+import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg;
+import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
import javax.annotation.Nullable;
import java.util.ArrayList;
@@ -92,6 +93,8 @@ import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
+import org.thingsboard.server.gen.transport.TransportProtos.*;
+
/**
* @author Andrew Shvayka
*/
@@ -99,12 +102,11 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
private final TenantId tenantId;
private final DeviceId deviceId;
- private final Map<SessionId, SessionInfo> sessions;
- private final Map<SessionId, SessionInfo> attributeSubscriptions;
- private final Map<SessionId, SessionInfo> rpcSubscriptions;
+ private final Map<UUID, SessionInfo> sessions;
+ private final Map<UUID, SessionInfo> attributeSubscriptions;
+ private final Map<UUID, SessionInfo> rpcSubscriptions;
private final Map<Integer, ToDeviceRpcRequestMetadata> toDeviceRpcPendingMap;
private final Map<Integer, ToServerRpcRequestMetadata> toServerRpcPendingMap;
- private final Map<UUID, PendingSessionMsgData> pendingMsgs;
private final Gson gson = new Gson();
private final JsonParser jsonParser = new JsonParser();
@@ -123,7 +125,6 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
this.rpcSubscriptions = new HashMap<>();
this.toDeviceRpcPendingMap = new HashMap<>();
this.toServerRpcPendingMap = new HashMap<>();
- this.pendingMsgs = new HashMap<>();
initAttributes();
}
@@ -154,11 +155,11 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
boolean sent = rpcSubscriptions.size() > 0;
Set<SessionId> syncSessionSet = new HashSet<>();
rpcSubscriptions.entrySet().forEach(sub -> {
- ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(rpcRequest, sub.getKey());
- sendMsgToSessionActor(response, sub.getValue().getServer());
- if (SessionType.SYNC == sub.getValue().getType()) {
- syncSessionSet.add(sub.getKey());
- }
+// ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(rpcRequest, sub.getKey());
+// sendMsgToSessionActor(response, sub.getValue().getServer());
+// if (SessionType.SYNC == sub.getValue().getType()) {
+// syncSessionSet.add(sub.getKey());
+// }
});
syncSessionSet.forEach(rpcSubscriptions::remove);
@@ -191,15 +192,6 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
}
- void processQueueTimeout(ActorContext context, DeviceActorQueueTimeoutMsg msg) {
- PendingSessionMsgData data = pendingMsgs.remove(msg.getId());
- if (data != null) {
- logger.debug("[{}] Queue put [{}] timeout detected!", deviceId, msg.getId());
- ToDeviceMsg toDeviceMsg = new RuleEngineErrorMsg(data.getSessionMsgType(), RuleEngineError.QUEUE_PUT_TIMEOUT);
- sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress());
- }
- }
-
void processQueueAck(ActorContext context, RuleEngineQueuePutAckMsg msg) {
PendingSessionMsgData data = pendingMsgs.remove(msg.getId());
if (data != null && data.isReplyOnQueueAck()) {
@@ -252,31 +244,37 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
};
}
- void process(ActorContext context, DeviceToDeviceActorMsg msg) {
- processSubscriptionCommands(context, msg);
- processRpcResponses(context, msg);
- processSessionStateMsgs(msg);
+ void process(ActorContext context, TransportToDeviceActorMsgWrapper wrapper) {
+ TransportToDeviceActorMsg msg = wrapper.getMsg();
+// processSubscriptionCommands(context, msg);
+// processRpcResponses(context, msg);
+ if (msg.hasSessionEvent()) {
+ processSessionStateMsgs(msg.getSessionInfo(), msg.getSessionEvent());
+ }
- SessionMsgType sessionMsgType = msg.getPayload().getMsgType();
- if (sessionMsgType.requiresRulesProcessing()) {
- switch (sessionMsgType) {
- case GET_ATTRIBUTES_REQUEST:
- handleGetAttributesRequest(msg);
- break;
- case POST_ATTRIBUTES_REQUEST:
- handlePostAttributesRequest(context, msg);
- reportActivity();
- break;
- case POST_TELEMETRY_REQUEST:
- handlePostTelemetryRequest(context, msg);
- reportActivity();
- break;
- case TO_SERVER_RPC_REQUEST:
- handleClientSideRPCRequest(context, msg);
- reportActivity();
- break;
- }
+ if (msg.hasPostAttributes()) {
+ handlePostAttributesRequest(context, msg.getSessionInfo(), msg.getPostAttributes());
+ reportActivity();
}
+ if (msg.hasPostTelemetry()) {
+ handlePostTelemetryRequest(context, msg.getSessionInfo(), msg.getPostTelemetry());
+ reportActivity();
+ }
+ if (msg.hasGetAttributes()) {
+ handleGetAttributesRequest(context, msg.getSessionInfo(), msg.getGetAttributes());
+ }
+// SessionMsgType sessionMsgType = msg.getPayload().getMsgType();
+// if (sessionMsgType.requiresRulesProcessing()) {
+// switch (sessionMsgType) {
+// case GET_ATTRIBUTES_REQUEST:
+// handleGetAttributesRequest(msg);
+// break;
+// case TO_SERVER_RPC_REQUEST:
+// handleClientSideRPCRequest(context, msg);
+// reportActivity();
+// break;
+// }
+// }
}
private void reportActivity() {
@@ -291,6 +289,39 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
systemContext.getDeviceStateService().onDeviceDisconnect(deviceId);
}
+ private void handleGetAttributesRequest(ActorContext context, SessionInfoProto sessionInfo, GetAttributeRequestMsg request) {
+ ListenableFuture<List<AttributeKvEntry>> clientAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.CLIENT_SCOPE, toOptionalSet(request.getClientAttributeNamesList()));
+ ListenableFuture<List<AttributeKvEntry>> sharedAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.SHARED_SCOPE, toOptionalSet(request.getSharedAttributeNamesList()));
+
+ Futures.addCallback(Futures.allAsList(Arrays.asList(clientAttributesFuture, sharedAttributesFuture)), new FutureCallback<List<List<AttributeKvEntry>>>() {
+ @Override
+ public void onSuccess(@Nullable List<List<AttributeKvEntry>> result) {
+ systemContext.getRuleEngineTransportService().process();
+ BasicGetAttributesResponse response = BasicGetAttributesResponse.onSuccess(request.getMsgType(),
+ request.getRequestId(), BasicAttributeKVMsg.from(result.get(0), result.get(1)));
+ sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(response, src.getSessionId()), src.getServerAddress());
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ if (t instanceof Exception) {
+ ToDeviceMsg toDeviceMsg = BasicStatusCodeResponse.onError(SessionMsgType.GET_ATTRIBUTES_REQUEST, request.getRequestId(), (Exception) t);
+ sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, src.getSessionId()), src.getServerAddress());
+ } else {
+ logger.error("[{}] Failed to process attributes request", deviceId, t);
+ }
+ }
+ });
+ }
+
+ private Optional<Set<String>> toOptionalSet(List<String> strings) {
+ if (strings == null || strings.isEmpty()) {
+ return Optional.empty();
+ } else {
+ return Optional.of(new HashSet<>(strings));
+ }
+ }
+
private void handleGetAttributesRequest(DeviceToDeviceActorMsg src) {
GetAttributesRequest request = (GetAttributesRequest) src.getPayload();
ListenableFuture<List<AttributeKvEntry>> clientAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.CLIENT_SCOPE, request.getClientAttributeNames());
@@ -328,43 +359,20 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
}
- private void handlePostAttributesRequest(ActorContext context, DeviceToDeviceActorMsg src) {
- AttributesUpdateRequest request = (AttributesUpdateRequest) src.getPayload();
-
- JsonObject json = new JsonObject();
- for (AttributeKvEntry kv : request.getAttributes()) {
- kv.getBooleanValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
- kv.getLongValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
- kv.getDoubleValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
- kv.getStrValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
- }
-
- TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), deviceId, defaultMetaData.copy(), TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
- PendingSessionMsgData msgData = new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(),
- SessionMsgType.POST_ATTRIBUTES_REQUEST, request.getRequestId(), true, 1);
- pushToRuleEngineWithTimeout(context, tbMsg, msgData);
+ private void handlePostAttributesRequest(ActorContext context, SessionInfoProto sessionInfo, PostAttributeMsg postAttributes) {
+ JsonObject json = getJsonObject(postAttributes.getKvList());
+ TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), deviceId, defaultMetaData.copy(),
+ TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
+ pushToRuleEngine(context, tbMsg);
}
- private void handlePostTelemetryRequest(ActorContext context, DeviceToDeviceActorMsg src) {
- TelemetryUploadRequest request = (TelemetryUploadRequest) src.getPayload();
-
- Map<Long, List<KvEntry>> tsData = request.getData();
-
- PendingSessionMsgData msgData = new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(),
- SessionMsgType.POST_TELEMETRY_REQUEST, request.getRequestId(), true, tsData.size());
-
- for (Map.Entry<Long, List<KvEntry>> entry : tsData.entrySet()) {
- JsonObject json = new JsonObject();
- for (KvEntry kv : entry.getValue()) {
- kv.getBooleanValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
- kv.getLongValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
- kv.getDoubleValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
- kv.getStrValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
- }
+ private void handlePostTelemetryRequest(ActorContext context, SessionInfoProto sessionInfo, PostTelemetryMsg postTelemetry) {
+ for (TsKvListProto tsKv : postTelemetry.getTsKvListList()) {
+ JsonObject json = getJsonObject(tsKv.getKvList());
TbMsgMetaData metaData = defaultMetaData.copy();
- metaData.putValue("ts", entry.getKey() + "");
+ metaData.putValue("ts", tsKv.getTs() + "");
TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, metaData, TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
- pushToRuleEngineWithTimeout(context, tbMsg, msgData);
+ pushToRuleEngine(context, tbMsg);
}
}
@@ -401,16 +409,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
}
- private void pushToRuleEngineWithTimeout(ActorContext context, TbMsg tbMsg, PendingSessionMsgData pendingMsgData) {
- SessionMsgType sessionMsgType = pendingMsgData.getSessionMsgType();
- int requestId = pendingMsgData.getRequestId();
- if (systemContext.isQueuePersistenceEnabled()) {
- pendingMsgs.put(tbMsg.getId(), pendingMsgData);
- scheduleMsgWithDelay(context, new DeviceActorQueueTimeoutMsg(tbMsg.getId(), systemContext.getQueuePersistenceTimeout()), systemContext.getQueuePersistenceTimeout());
- } else {
- ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(BasicStatusCodeResponse.onSuccess(sessionMsgType, requestId), pendingMsgData.getSessionId());
- sendMsgToSessionActor(response, pendingMsgData.getServerAddress());
- }
+ private void pushToRuleEngine(ActorContext context, TbMsg tbMsg) {
context.parent().tell(new DeviceActorToRuleEngineMsg(context.self(), tbMsg), context.self());
}
@@ -497,13 +496,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
}
- private void processSessionStateMsgs(DeviceToDeviceActorMsg msg) {
- SessionId sessionId = msg.getSessionId();
- FromDeviceMsg inMsg = msg.getPayload();
- if (inMsg instanceof SessionOpenMsg) {
+ private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) {
+ UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
+ if (msg.getEvent() == SessionEvent.OPEN) {
logger.debug("[{}] Processing new session [{}]", deviceId, sessionId);
if (sessions.size() >= systemContext.getMaxConcurrentSessionsPerDevice()) {
- SessionId sessionIdToRemove = sessions.keySet().stream().findFirst().orElse(null);
+ UUID sessionIdToRemove = sessions.keySet().stream().findFirst().orElse(null);
if (sessionIdToRemove != null) {
closeSession(sessionIdToRemove, sessions.remove(sessionIdToRemove));
}
@@ -512,6 +510,10 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
if (sessions.size() == 1) {
reportSessionOpen();
}
+ }
+ FromDeviceMsg inMsg = msg.getPayload();
+ if (inMsg instanceof SessionOpenMsg) {
+ logger.debug("[{}] Processing new session [{}]", deviceId, sessionId);
} else if (inMsg instanceof SessionCloseMsg) {
logger.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId);
sessions.remove(sessionId);
@@ -540,8 +542,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
rpcSubscriptions.clear();
}
- private void closeSession(SessionId sessionId, SessionInfo sessionInfo) {
- sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(new SessionCloseNotification(), sessionId), sessionInfo.getServer());
+ private void closeSession(UUID sessionId, SessionInfo sessionInfo) {
+ DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder()
+ .setSessionIdMSB(sessionId.getMostSignificantBits())
+ .setSessionIdLSB(sessionId.getLeastSignificantBits())
+ .setSessionCloseNotification(SessionCloseNotificationProto.getDefaultInstance()).build();
+ systemContext.getRuleEngineTransportService().process(sessionInfo.getNodeId(), msg);
}
void processNameOrTypeUpdate(DeviceNameOrTypeUpdateMsg msg) {
@@ -552,4 +558,24 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
this.defaultMetaData.putValue("deviceType", deviceType);
}
+ private JsonObject getJsonObject(List<KeyValueProto> tsKv) {
+ JsonObject json = new JsonObject();
+ for (KeyValueProto kv : tsKv) {
+ switch (kv.getType()) {
+ case BOOLEAN_V:
+ json.addProperty(kv.getKey(), kv.getBoolV());
+ break;
+ case LONG_V:
+ json.addProperty(kv.getKey(), kv.getLongV());
+ break;
+ case DOUBLE_V:
+ json.addProperty(kv.getKey(), kv.getDoubleV());
+ break;
+ case STRING_V:
+ json.addProperty(kv.getKey(), kv.getStringV());
+ break;
+ }
+ }
+ return json;
+ }
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java b/application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java
index 23ad966..dfa07cf 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java
@@ -22,6 +22,7 @@ import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.session.SessionMsgType;
import java.util.Optional;
+import java.util.UUID;
/**
* Created by ashvayka on 17.04.18.
@@ -30,7 +31,7 @@ import java.util.Optional;
@AllArgsConstructor
public final class PendingSessionMsgData {
- private final SessionId sessionId;
+ private final UUID sessionId;
private final Optional<ServerAddress> serverAddress;
private final SessionMsgType sessionMsgType;
private final int requestId;
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java b/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java
index 04c457c..9faaade 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,10 +16,7 @@
package org.thingsboard.server.actors.device;
import lombok.Data;
-import org.thingsboard.server.common.msg.cluster.ServerAddress;
-import org.thingsboard.server.common.msg.session.SessionType;
-
-import java.util.Optional;
+import org.thingsboard.server.gen.transport.TransportProtos.SessionType;
/**
* @author Andrew Shvayka
@@ -27,5 +24,6 @@ import java.util.Optional;
@Data
public class SessionInfo {
private final SessionType type;
- private final Optional<ServerAddress> server;
+ private final String nodeId;
+
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
index 460b64c..347483a 100644
--- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
@@ -87,7 +87,7 @@ public class TenantActor extends RuleChainManagerActor {
case DEVICE_ACTOR_TO_RULE_ENGINE_MSG:
onDeviceActorToRuleEngineMsg((DeviceActorToRuleEngineMsg) msg);
break;
- case DEVICE_SESSION_TO_DEVICE_ACTOR_MSG:
+ case TRANSPORT_TO_DEVICE_ACTOR_MSG:
case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/msg/TransportToDeviceActorMsgWrapper.java b/application/src/main/java/org/thingsboard/server/service/transport/msg/TransportToDeviceActorMsgWrapper.java
new file mode 100644
index 0000000..3ee1858
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/transport/msg/TransportToDeviceActorMsgWrapper.java
@@ -0,0 +1,36 @@
+package org.thingsboard.server.service.transport.msg;
+
+import lombok.Data;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.TbActorMsg;
+import org.thingsboard.server.common.msg.aware.DeviceAwareMsg;
+import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
+
+import java.io.Serializable;
+import java.util.UUID;
+
+/**
+ * Created by ashvayka on 09.10.18.
+ */
+@Data
+public class TransportToDeviceActorMsgWrapper implements TbActorMsg, DeviceAwareMsg, TenantAwareMsg, Serializable {
+
+ private final TenantId tenantId;
+ private final DeviceId deviceId;
+ private final TransportToDeviceActorMsg msg;
+
+ public TransportToDeviceActorMsgWrapper(TransportToDeviceActorMsg msg) {
+ this.msg = msg;
+ this.tenantId = new TenantId(new UUID(msg.getSessionInfo().getTenantIdMSB(), msg.getSessionInfo().getTenantIdLSB()));
+ this.deviceId = new DeviceId(new UUID(msg.getSessionInfo().getDeviceIdMSB(), msg.getSessionInfo().getDeviceIdLSB()));
+ }
+
+ @Override
+ public MsgType getMsgType() {
+ return MsgType.TRANSPORT_TO_DEVICE_ACTOR_MSG;
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
new file mode 100644
index 0000000..6b82792
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
@@ -0,0 +1,187 @@
+package org.thingsboard.server.service.transport;
+
+import akka.actor.ActorRef;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Service;
+import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.actors.service.ActorService;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
+import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
+import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
+import org.thingsboard.server.kafka.TbKafkaSettings;
+import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
+import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
+import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
+import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
+import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
+/**
+ * Created by ashvayka on 09.10.18.
+ */
+@Slf4j
+@Service
+@ConditionalOnProperty(prefix = "transport.remote", value = "enabled", havingValue = "true")
+public class RemoteRuleEngineTransportService implements RuleEngineTransportService {
+
+ private static final ObjectMapper mapper = new ObjectMapper();
+
+ @Value("${transport.remote.rule_engine.topic}")
+ private String ruleEngineTopic;
+ @Value("${transport.remote.notifications.topic}")
+ private String notificationsTopic;
+ @Value("${transport.remote.rule_engine.poll_interval}")
+ private int pollDuration;
+ @Value("${transport.remote.rule_engine.auto_commit_interval}")
+ private int autoCommitInterval;
+
+ @Autowired
+ private TbKafkaSettings kafkaSettings;
+
+ @Autowired
+ private DiscoveryService discoveryService;
+
+ @Autowired
+ private ActorSystemContext actorContext;
+
+ @Autowired
+ private ActorService actorService;
+
+ //TODO: completely replace this routing with the Kafka routing by partition ids.
+ @Autowired
+ private ClusterRoutingService routingService;
+ @Autowired
+ private ClusterRpcService rpcService;
+ @Autowired
+ private DataDecodingEncodingService encodingService;
+
+ private TBKafkaConsumerTemplate<ToRuleEngineMsg> ruleEngineConsumer;
+ private TBKafkaProducerTemplate<ToTransportMsg> notificationsProducer;
+
+ private ExecutorService mainConsumerExecutor = Executors.newSingleThreadExecutor();
+
+ private volatile boolean stopped = false;
+
+ @PostConstruct
+ public void init() {
+ TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<ToTransportMsg> notificationsProducerBuilder = TBKafkaProducerTemplate.builder();
+ notificationsProducerBuilder.settings(kafkaSettings);
+ notificationsProducerBuilder.defaultTopic(notificationsTopic);
+ notificationsProducerBuilder.encoder(new ToTransportMsgEncoder());
+
+ notificationsProducer = notificationsProducerBuilder.build();
+ notificationsProducer.init();
+
+ TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<ToRuleEngineMsg> ruleEngineConsumerBuilder = TBKafkaConsumerTemplate.builder();
+ ruleEngineConsumerBuilder.settings(kafkaSettings);
+ ruleEngineConsumerBuilder.topic(ruleEngineTopic);
+ ruleEngineConsumerBuilder.clientId(discoveryService.getNodeId());
+ ruleEngineConsumerBuilder.groupId("tb-node");
+ ruleEngineConsumerBuilder.autoCommit(true);
+ ruleEngineConsumerBuilder.autoCommitIntervalMs(autoCommitInterval);
+ ruleEngineConsumerBuilder.decoder(new ToRuleEngineMsgDecoder());
+
+ ruleEngineConsumer = ruleEngineConsumerBuilder.build();
+ ruleEngineConsumer.subscribe();
+
+ mainConsumerExecutor.execute(() -> {
+ while (!stopped) {
+ try {
+ ConsumerRecords<String, byte[]> records = ruleEngineConsumer.poll(Duration.ofMillis(pollDuration));
+ records.forEach(record -> {
+ try {
+ ToRuleEngineMsg toRuleEngineMsg = ruleEngineConsumer.decode(record);
+ if (toRuleEngineMsg.hasToDeviceActorMsg()) {
+ forwardToDeviceActor(toRuleEngineMsg.getToDeviceActorMsg());
+ }
+ } catch (Throwable e) {
+ log.warn("Failed to process the notification.", e);
+ }
+ });
+ } catch (Exception e) {
+ log.warn("Failed to obtain messages from queue.", e);
+ try {
+ Thread.sleep(pollDuration);
+ } catch (InterruptedException e2) {
+ log.trace("Failed to wait until the server has capacity to handle new requests", e2);
+ }
+ }
+ }
+ });
+ }
+
+ @Override
+ public void process(String nodeId, DeviceActorToTransportMsg msg) {
+ process(nodeId, msg, null, null);
+ }
+
+ @Override
+ public void process(String nodeId, DeviceActorToTransportMsg msg, Runnable onSuccess, Consumer<Throwable> onFailure) {
+ notificationsProducer.send(notificationsTopic + "." + nodeId,
+ ToTransportMsg.newBuilder().setToDeviceSessionMsg(msg).build()
+ , new QueueCallbackAdaptor(onSuccess, onFailure));
+ }
+
+ private void forwardToDeviceActor(TransportToDeviceActorMsg toDeviceActorMsg) {
+ TransportToDeviceActorMsgWrapper wrapper = new TransportToDeviceActorMsgWrapper(toDeviceActorMsg);
+ Optional<ServerAddress> address = routingService.resolveById(wrapper.getDeviceId());
+ if (address.isPresent()) {
+ rpcService.tell(encodingService.convertToProtoDataMessage(address.get(), wrapper));
+ } else {
+ actorContext.getAppActor().tell(wrapper, ActorRef.noSender());
+ }
+ }
+
+ @PreDestroy
+ public void destroy() {
+ stopped = true;
+ if (ruleEngineConsumer != null) {
+ ruleEngineConsumer.unsubscribe();
+ }
+ if (mainConsumerExecutor != null) {
+ mainConsumerExecutor.shutdownNow();
+ }
+ }
+
+ private static class QueueCallbackAdaptor implements Callback {
+ private final Runnable onSuccess;
+ private final Consumer<Throwable> onFailure;
+
+ QueueCallbackAdaptor(Runnable onSuccess, Consumer<Throwable> onFailure) {
+ this.onSuccess = onSuccess;
+ this.onFailure = onFailure;
+ }
+
+ @Override
+ public void onCompletion(RecordMetadata metadata, Exception exception) {
+ if (exception == null) {
+ if (onSuccess != null) {
+ onSuccess.run();
+ }
+ } else {
+ if (onFailure != null) {
+ onFailure.accept(exception);
+ }
+ }
+ }
+ }
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RuleEngineTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/RuleEngineTransportService.java
new file mode 100644
index 0000000..e6b3dd3
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/transport/RuleEngineTransportService.java
@@ -0,0 +1,32 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.transport;
+
+import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.;
+
+import java.util.function.Consumer;
+
+/**
+ * Created by ashvayka on 05.10.18.
+ */
+public interface RuleEngineTransportService {
+
+ void process(String nodeId, DeviceActorToTransportMsg msg);
+
+ void process(String nodeId, DeviceActorToTransportMsg msg, Runnable onSuccess, Consumer<Throwable> onFailure);
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/ToTransportMsgEncoder.java b/application/src/main/java/org/thingsboard/server/service/transport/ToTransportMsgEncoder.java
new file mode 100644
index 0000000..5f3c026
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/transport/ToTransportMsgEncoder.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.transport;
+
+import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
+import org.thingsboard.server.kafka.TbKafkaEncoder;
+
+/**
+ * Created by ashvayka on 05.10.18.
+ */
+public class ToTransportMsgEncoder implements TbKafkaEncoder<ToTransportMsg> {
+ @Override
+ public byte[] encode(ToTransportMsg value) {
+ return value.toByteArray();
+ }
+}
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index c58f3a1..923976c 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -462,4 +462,8 @@ transport:
request_poll_interval: "${TB_TRANSPORT_RESPONSE_POLL_INTERVAL_MS:25}"
request_auto_commit_interval: "${TB_TRANSPORT_RESPONSE_AUTO_COMMIT_INTERVAL_MS:1000}"
rule_engine:
- topic: "${TB_RULE_ENGINE_TOPIC:tb.rule-engine}"
\ No newline at end of file
+ topic: "${TB_RULE_ENGINE_TOPIC:tb.rule-engine}"
+ poll_interval: "${TB_RULE_ENGINE_POLL_INTERVAL_MS:25}"
+ auto_commit_interval: "${TB_RULE_ENGINE_AUTO_COMMIT_INTERVAL_MS:100}"
+ notifications:
+ topic: "${TB_TRANSPORT_NOTIFICATIONS_TOPIC:tb.transport.notifications}"
\ No newline at end of file
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
index 60e5469..dfd8f98 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
@@ -77,11 +77,6 @@ public enum MsgType {
*/
RULE_TO_SELF_MSG,
- /**
- * Message that is sent by Session Actor to Device Actor. Represents messages from the device itself.
- */
- DEVICE_SESSION_TO_DEVICE_ACTOR_MSG,
-
DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG,
DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG,
@@ -111,6 +106,12 @@ public enum MsgType {
TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG,
SESSION_TIMEOUT_MSG,
SESSION_CTRL_MSG,
- STATS_PERSIST_TICK_MSG;
+ STATS_PERSIST_TICK_MSG,
+
+
+ /**
+ * Message that is sent by TransportRuleEngineService to Device Actor. Represents messages from the device itself.
+ */
+ TRANSPORT_TO_DEVICE_ACTOR_MSG;
}
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java
index 3adb1c3..90f15a4 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -49,7 +49,9 @@ public class TBKafkaConsumerTemplate<T> {
boolean autoCommit, int autoCommitIntervalMs) {
Properties props = settings.toProps();
props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
- props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ if (groupId != null) {
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ }
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
index 1e109d2..8f4c095 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,6 +20,7 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -89,28 +90,28 @@ public class TBKafkaProducerTemplate<T> {
}
}
- public Future<RecordMetadata> send(String key, T value) {
- return send(key, value, null, null);
+ public Future<RecordMetadata> send(String key, T value, Callback callback) {
+ return send(key, value, null, callback);
}
- public Future<RecordMetadata> send(String key, T value, Iterable<Header> headers) {
- return send(key, value, null, headers);
+ public Future<RecordMetadata> send(String key, T value, Iterable<Header> headers, Callback callback) {
+ return send(key, value, null, headers, callback);
}
- public Future<RecordMetadata> send(String key, T value, Long timestamp, Iterable<Header> headers) {
- return send(this.defaultTopic, key, value, timestamp, headers);
+ public Future<RecordMetadata> send(String key, T value, Long timestamp, Iterable<Header> headers, Callback callback) {
+ return send(this.defaultTopic, key, value, timestamp, headers, callback);
}
- public Future<RecordMetadata> send(String topic, String key, T value, Iterable<Header> headers) {
- return send(topic, key, value, null, headers);
+ public Future<RecordMetadata> send(String topic, String key, T value, Iterable<Header> headers, Callback callback) {
+ return send(topic, key, value, null, headers, callback);
}
- public Future<RecordMetadata> send(String topic, String key, T value, Long timestamp, Iterable<Header> headers) {
+ public Future<RecordMetadata> send(String topic, String key, T value, Long timestamp, Iterable<Header> headers, Callback callback) {
byte[] data = encoder.encode(value);
ProducerRecord<String, byte[]> record;
Integer partition = getPartition(topic, key, value, data);
record = new ProducerRecord<>(topic, partition, timestamp, key, data, headers);
- return producer.send(record);
+ return producer.send(record, callback);
}
private Integer getPartition(String topic, String key, T value, byte[] data) {
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java
index 0dbf45d..6f9e8a7 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -77,55 +77,64 @@ public class TbKafkaResponseTemplate<Request, Response> extends AbstractTbKafkaT
requestTemplate.subscribe();
loopExecutor.submit(() -> {
while (!stopped) {
- while (pendingRequestCount.get() >= maxPendingRequests) {
+ try {
+ while (pendingRequestCount.get() >= maxPendingRequests) {
+ try {
+ Thread.sleep(pollInterval);
+ } catch (InterruptedException e) {
+ log.trace("Failed to wait until the server has capacity to handle new requests", e);
+ }
+ }
+ ConsumerRecords<String, byte[]> requests = requestTemplate.poll(Duration.ofMillis(pollInterval));
+ requests.forEach(request -> {
+ Header requestIdHeader = request.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER);
+ if (requestIdHeader == null) {
+ log.error("[{}] Missing requestId in header", request);
+ return;
+ }
+ UUID requestId = bytesToUuid(requestIdHeader.value());
+ if (requestId == null) {
+ log.error("[{}] Missing requestId in header and body", request);
+ return;
+ }
+ Header responseTopicHeader = request.headers().lastHeader(TbKafkaSettings.RESPONSE_TOPIC_HEADER);
+ if (responseTopicHeader == null) {
+ log.error("[{}] Missing response topic in header", request);
+ return;
+ }
+ String responseTopic = bytesToString(responseTopicHeader.value());
+ try {
+ pendingRequestCount.getAndIncrement();
+ Request decodedRequest = requestTemplate.decode(request);
+ AsyncCallbackTemplate.withCallbackAndTimeout(handler.handle(decodedRequest),
+ response -> {
+ pendingRequestCount.decrementAndGet();
+ reply(requestId, responseTopic, response);
+ },
+ e -> {
+ pendingRequestCount.decrementAndGet();
+ if (e.getCause() != null && e.getCause() instanceof TimeoutException) {
+ log.warn("[{}] Timedout to process the request: {}", requestId, request, e);
+ } else {
+ log.trace("[{}] Failed to process the request: {}", requestId, request, e);
+ }
+ },
+ requestTimeout,
+ timeoutExecutor,
+ callbackExecutor);
+ } catch (Throwable e) {
+ pendingRequestCount.decrementAndGet();
+ log.warn("[{}] Failed to process the request: {}", requestId, request, e);
+ }
+ });
+ } catch (Throwable e) {
+ log.warn("Failed to obtain messages from queue.", e);
try {
Thread.sleep(pollInterval);
- } catch (InterruptedException e) {
- log.trace("Failed to wait until the server has capacity to handle new requests", e);
+ } catch (InterruptedException e2) {
+ log.trace("Failed to wait until the server has capacity to handle new requests", e2);
}
}
- ConsumerRecords<String, byte[]> requests = requestTemplate.poll(Duration.ofMillis(pollInterval));
- requests.forEach(request -> {
- Header requestIdHeader = request.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER);
- if (requestIdHeader == null) {
- log.error("[{}] Missing requestId in header", request);
- return;
- }
- UUID requestId = bytesToUuid(requestIdHeader.value());
- if (requestId == null) {
- log.error("[{}] Missing requestId in header and body", request);
- return;
- }
- Header responseTopicHeader = request.headers().lastHeader(TbKafkaSettings.RESPONSE_TOPIC_HEADER);
- if (responseTopicHeader == null) {
- log.error("[{}] Missing response topic in header", request);
- return;
- }
- String responseTopic = bytesToString(responseTopicHeader.value());
- try {
- pendingRequestCount.getAndIncrement();
- Request decodedRequest = requestTemplate.decode(request);
- AsyncCallbackTemplate.withCallbackAndTimeout(handler.handle(decodedRequest),
- response -> {
- pendingRequestCount.decrementAndGet();
- reply(requestId, responseTopic, response);
- },
- e -> {
- pendingRequestCount.decrementAndGet();
- if (e.getCause() != null && e.getCause() instanceof TimeoutException) {
- log.warn("[{}] Timedout to process the request: {}", requestId, request, e);
- } else {
- log.trace("[{}] Failed to process the request: {}", requestId, request, e);
- }
- },
- requestTimeout,
- timeoutExecutor,
- callbackExecutor);
- } catch (Throwable e) {
- pendingRequestCount.decrementAndGet();
- log.warn("[{}] Failed to process the request: {}", requestId, request, e);
- }
- });
}
});
}
@@ -141,7 +150,7 @@ public class TbKafkaResponseTemplate<Request, Response> extends AbstractTbKafkaT
}
private void reply(UUID requestId, String topic, Response response) {
- responseTemplate.send(topic, requestId.toString(), response, Collections.singletonList(new RecordHeader(TbKafkaSettings.REQUEST_ID_HEADER, uuidToBytes(requestId))));
+ responseTemplate.send(topic, requestId.toString(), response, Collections.singletonList(new RecordHeader(TbKafkaSettings.REQUEST_ID_HEADER, uuidToBytes(requestId))), null);
}
}
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
index 7b2c05e..3068974 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,17 +16,12 @@
package org.thingsboard.server.common.transport.session;
import lombok.Data;
-import lombok.extern.slf4j.Slf4j;
-import org.thingsboard.server.common.data.Device;
-import org.thingsboard.server.common.data.security.DeviceCredentialsFilter;
-import org.thingsboard.server.common.data.security.DeviceTokenCredentials;
+import lombok.Getter;
+import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.msg.session.SessionContext;
-import org.thingsboard.server.common.transport.SessionMsgProcessor;
-import org.thingsboard.server.common.transport.auth.DeviceAuthResult;
-import org.thingsboard.server.common.transport.auth.DeviceAuthService;
-import org.thingsboard.server.gen.transport.TransportProtos;
+import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
-import java.util.Optional;
+import java.util.UUID;
/**
* @author Andrew Shvayka
@@ -34,7 +29,9 @@ import java.util.Optional;
@Data
public abstract class DeviceAwareSessionContext implements SessionContext {
- private volatile TransportProtos.DeviceInfoProto deviceInfo;
+ @Getter
+ private volatile DeviceId deviceId;
+ private volatile DeviceInfoProto deviceInfo;
public long getDeviceIdMSB() {
return deviceInfo.getDeviceIdMSB();
@@ -44,6 +41,16 @@ public abstract class DeviceAwareSessionContext implements SessionContext {
return deviceInfo.getDeviceIdLSB();
}
+ public DeviceId getDeviceId() {
+ return deviceId;
+ }
+
+ public void setDeviceInfo(DeviceInfoProto deviceInfo) {
+ this.deviceInfo = deviceInfo;
+ this.deviceId = new DeviceId(new UUID(deviceInfo.getDeviceIdMSB(), deviceInfo.getDeviceIdLSB()));
+ }
+
+
public boolean isConnected() {
return deviceInfo != null;
}
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java
new file mode 100644
index 0000000..d0e6d18
--- /dev/null
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java
@@ -0,0 +1,26 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.transport;
+
+import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg;
+
+/**
+ * Created by ashvayka on 04.10.18.
+ */
+public interface SessionMsgListener {
+
+ void onGetAttributesResponse(GetAttributeResponseMsg getAttributesResponse);
+}
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
index 84d34e1..dc2e306 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,23 +15,33 @@
*/
package org.thingsboard.server.common.transport;
-import org.thingsboard.server.gen.transport.TransportProtos;
+import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
+import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
/**
* Created by ashvayka on 04.10.18.
*/
public interface TransportService {
- void process(TransportProtos.ValidateDeviceTokenRequestMsg msg,
- TransportServiceCallback<TransportProtos.ValidateDeviceCredentialsResponseMsg> callback);
+ void process(ValidateDeviceTokenRequestMsg msg,
+ TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback);
- void process(TransportProtos.ValidateDeviceX509CertRequestMsg msg,
- TransportServiceCallback<TransportProtos.ValidateDeviceCredentialsResponseMsg> callback);
+ void process(ValidateDeviceX509CertRequestMsg msg,
+ TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback);
- void process(TransportProtos.SessionEventMsg msg, TransportServiceCallback<Void> callback);
+ void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback);
- void process(TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback);
+ void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback);
- void process(TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback);
+ void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback);
+
+ void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener);
+
+ void deregisterSession(SessionInfoProto sessionInfo);
}
common/transport/src/main/proto/transport.proto 80(+57 -23)
diff --git a/common/transport/src/main/proto/transport.proto b/common/transport/src/main/proto/transport.proto
index e78f873..f3b3ae8 100644
--- a/common/transport/src/main/proto/transport.proto
+++ b/common/transport/src/main/proto/transport.proto
@@ -23,9 +23,12 @@ option java_outer_classname = "TransportProtos";
* Data Structures;
*/
message SessionInfoProto {
- string nodeId = 1;
- int64 sessionIdMSB = 2;
- int64 sessionIdLSB = 3;
+ int64 sessionIdMSB = 1;
+ int64 sessionIdLSB = 2;
+ int64 tenantIdMSB = 3;
+ int64 tenantIdLSB = 4;
+ int64 deviceIdMSB = 5;
+ int64 deviceIdLSB = 6;
}
enum SessionEvent {
@@ -33,12 +36,25 @@ enum SessionEvent {
CLOSED = 1;
}
+enum SessionType {
+ SYNC = 0;
+ ASYNC = 1;
+}
+
+enum KeyValueType {
+ BOOLEAN_V = 0;
+ LONG_V = 1;
+ DOUBLE_V = 2;
+ STRING_V = 3;
+}
+
message KeyValueProto {
string key = 1;
- bool bool_v = 2;
- int64 long_v = 3;
- double double_v = 4;
- string string_v = 5;
+ KeyValueType type = 2;
+ bool bool_v = 3;
+ int64 long_v = 4;
+ double double_v = 5;
+ string string_v = 6;
}
message TsKvListProto {
@@ -60,33 +76,28 @@ message DeviceInfoProto {
* Messages that use Data Structures;
*/
message SessionEventMsg {
- SessionInfoProto sessionInfo = 1;
- int64 deviceIdMSB = 2;
- int64 deviceIdLSB = 3;
- SessionEvent event = 4;
+ string nodeId = 1;
+ SessionType sessionType = 2;
+ SessionEvent event = 3;
}
message PostTelemetryMsg {
- SessionInfoProto sessionInfo = 1;
- repeated TsKvListProto tsKvList = 2;
+ repeated TsKvListProto tsKvList = 1;
}
message PostAttributeMsg {
- SessionInfoProto sessionInfo = 1;
- repeated TsKvListProto tsKvList = 2;
+ repeated KeyValueProto kv = 1;
}
message GetAttributeRequestMsg {
- SessionInfoProto sessionInfo = 1;
- repeated string clientAttributeNames = 2;
- repeated string sharedAttributeNames = 3;
+ repeated string clientAttributeNames = 1;
+ repeated string sharedAttributeNames = 2;
}
message GetAttributeResponseMsg {
- SessionInfoProto sessionInfo = 1;
- repeated TsKvListProto clientAttributeList = 2;
- repeated TsKvListProto sharedAttributeList = 3;
- repeated string deletedAttributeKeys = 4;
+ repeated TsKvListProto clientAttributeList = 1;
+ repeated TsKvListProto sharedAttributeList = 2;
+ repeated string deletedAttributeKeys = 3;
}
message ValidateDeviceTokenRequestMsg {
@@ -101,11 +112,34 @@ message ValidateDeviceCredentialsResponseMsg {
DeviceInfoProto deviceInfo = 1;
}
+message SessionCloseNotificationProto {
+ string message = 1;
+}
+
+message TransportToDeviceActorMsg {
+ SessionInfoProto sessionInfo = 1;
+ SessionEventMsg sessionEvent = 2;
+ PostTelemetryMsg postTelemetry = 3;
+ PostAttributeMsg postAttributes = 4;
+ GetAttributeRequestMsg getAttributes = 5;
+}
+
+message DeviceActorToTransportMsg {
+ int64 sessionIdMSB = 1;
+ int64 sessionIdLSB = 2;
+ SessionCloseNotificationProto sessionCloseNotification = 3;
+ GetAttributeResponseMsg getAttributesResponse = 4;
+}
+
/**
* Main messages;
*/
-message TransportToRuleEngineMsg {
+message ToRuleEngineMsg {
+ TransportToDeviceActorMsg toDeviceActorMsg = 1;
+}
+message ToTransportMsg {
+ DeviceActorToTransportMsg toDeviceSessionMsg = 1;
}
message TransportApiRequestMsg {
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index 23bc4cc..d8f1d10 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -26,32 +26,25 @@ import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
-import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
-import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
-import io.netty.handler.codec.mqtt.MqttTopicSubscription;
-import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
-import org.thingsboard.server.common.data.Device;
-import org.thingsboard.server.common.data.security.DeviceTokenCredentials;
-import org.thingsboard.server.common.data.security.DeviceX509Credentials;
-import org.thingsboard.server.common.msg.core.SessionOpenMsg;
-import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg;
-import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg;
-import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg;
-import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
-import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.quota.QuotaService;
import org.thingsboard.server.dao.EncryptionUtil;
-import org.thingsboard.server.gen.transport.TransportProtos;
+import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
+import org.thingsboard.server.gen.transport.TransportProtos.SessionEvent;
+import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx;
@@ -61,49 +54,19 @@ import javax.net.ssl.SSLPeerUnverifiedException;
import javax.security.cert.X509Certificate;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import org.thingsboard.server.gen.transport.TransportProtos.*;
-
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
import static io.netty.handler.codec.mqtt.MqttMessageType.CONNACK;
-import static io.netty.handler.codec.mqtt.MqttMessageType.PINGRESP;
import static io.netty.handler.codec.mqtt.MqttMessageType.PUBACK;
import static io.netty.handler.codec.mqtt.MqttMessageType.SUBACK;
-import static io.netty.handler.codec.mqtt.MqttMessageType.UNSUBACK;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
-import static io.netty.handler.codec.mqtt.MqttQoS.FAILURE;
-import static org.thingsboard.server.common.msg.session.SessionMsgType.GET_ATTRIBUTES_REQUEST;
-import static org.thingsboard.server.common.msg.session.SessionMsgType.POST_ATTRIBUTES_REQUEST;
-import static org.thingsboard.server.common.msg.session.SessionMsgType.POST_TELEMETRY_REQUEST;
-import static org.thingsboard.server.common.msg.session.SessionMsgType.SUBSCRIBE_ATTRIBUTES_REQUEST;
-import static org.thingsboard.server.common.msg.session.SessionMsgType.SUBSCRIBE_RPC_COMMANDS_REQUEST;
-import static org.thingsboard.server.common.msg.session.SessionMsgType.TO_DEVICE_RPC_RESPONSE;
-import static org.thingsboard.server.common.msg.session.SessionMsgType.TO_SERVER_RPC_REQUEST;
-import static org.thingsboard.server.common.msg.session.SessionMsgType.UNSUBSCRIBE_ATTRIBUTES_REQUEST;
-import static org.thingsboard.server.common.msg.session.SessionMsgType.UNSUBSCRIBE_RPC_COMMANDS_REQUEST;
-import static org.thingsboard.server.transport.mqtt.MqttTopics.BASE_GATEWAY_API_TOPIC;
-import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX;
-import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC;
-import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_ATTRIBUTES_TOPIC;
-import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC;
-import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_RPC_REQUESTS_TOPIC;
-import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC;
-import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_RPC_RESPONSE_TOPIC;
-import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_TELEMETRY_TOPIC;
-import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_ATTRIBUTES_REQUEST_TOPIC;
-import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_ATTRIBUTES_TOPIC;
-import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_CONNECT_TOPIC;
-import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_DISCONNECT_TOPIC;
-import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_RPC_TOPIC;
-import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_TELEMETRY_TOPIC;
/**
* @author Andrew Shvayka
@@ -389,7 +352,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
} else {
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
- transportService.process(getSessionEventMsg(SessionEvent.OPEN), null);
+ transportService.process(deviceSessionCtx, getSessionEventMsg(SessionEvent.OPEN), null);
checkGatewaySession();
}
}
@@ -418,7 +381,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
} else {
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
- transportService.process(getSessionEventMsg(SessionEvent.OPEN), null);
+ transportService.process(deviceSessionCtx, getSessionEventMsg(SessionEvent.OPEN), null);
checkGatewaySession();
}
}
@@ -452,7 +415,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private void processDisconnect(ChannelHandlerContext ctx) {
ctx.close();
if (deviceSessionCtx.isConnected()) {
- transportService.process(getSessionEventMsg(SessionEvent.CLOSED), null);
+ transportService.process(deviceSessionCtx, getSessionEventMsg(SessionEvent.CLOSED), null);
if (gatewaySessionCtx != null) {
gatewaySessionCtx.onGatewayDisconnect();
}
@@ -534,7 +497,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
if (deviceSessionCtx.isConnected()) {
- transportService.process(getSessionEventMsg(SessionEvent.CLOSED), null);
+ transportService.process(deviceSessionCtx, getSessionEventMsg(SessionEvent.CLOSED), null);
}
}
}
diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
index 4d945de..76c1c1b 100644
--- a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
+++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,21 +15,41 @@
*/
package org.thingsboard.server.mqtt.service;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
+import org.thingsboard.server.common.transport.SessionMsgListener;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
+import org.thingsboard.server.gen.transport.TransportProtos;
+import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
+import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
import org.thingsboard.server.kafka.AsyncCallbackTemplate;
import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
import org.thingsboard.server.kafka.TbKafkaRequestTemplate;
-import org.thingsboard.server.gen.transport.TransportProtos.*;
import org.thingsboard.server.kafka.TbKafkaSettings;
import org.thingsboard.server.transport.mqtt.MqttTransportContext;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -37,10 +57,17 @@ import java.util.concurrent.Executors;
* Created by ashvayka on 05.10.18.
*/
@Service
+@Slf4j
public class MqttTransportService implements TransportService {
@Value("${kafka.rule_engine.topic}")
private String ruleEngineTopic;
+ @Value("${kafka.notifications.topic}")
+ private String notificationsTopic;
+ @Value("${kafka.notifications.poll_interval}")
+ private int notificationsPollDuration;
+ @Value("${kafka.notifications.auto_commit_interval}")
+ private int notificationsAutoCommitInterval;
@Value("${kafka.transport_api.requests_topic}")
private String transportApiRequestsTopic;
@Value("${kafka.transport_api.responses_topic}")
@@ -54,6 +81,8 @@ public class MqttTransportService implements TransportService {
@Value("${kafka.transport_api.response_auto_commit_interval}")
private int autoCommitInterval;
+ private ConcurrentMap<UUID, SessionMsgListener> sessions = new ConcurrentHashMap<>();
+
@Autowired
private TbKafkaSettings kafkaSettings;
//We use this to get the node id. We should replace this with a component that provides the node id.
@@ -63,6 +92,12 @@ public class MqttTransportService implements TransportService {
private ExecutorService transportCallbackExecutor;
private TbKafkaRequestTemplate<TransportApiRequestMsg, TransportApiResponseMsg> transportApiTemplate;
+ private TBKafkaProducerTemplate<ToRuleEngineMsg> ruleEngineProducer;
+ private TBKafkaConsumerTemplate<ToTransportMsg> mainConsumer;
+
+ private ExecutorService mainConsumerExecutor = Executors.newSingleThreadExecutor();
+
+ private volatile boolean stopped = false;
@PostConstruct
public void init() {
@@ -77,7 +112,7 @@ public class MqttTransportService implements TransportService {
responseBuilder.settings(kafkaSettings);
responseBuilder.topic(transportApiResponsesTopic + "." + transportContext.getNodeId());
responseBuilder.clientId(transportContext.getNodeId());
- responseBuilder.groupId("transport-node");
+ responseBuilder.groupId(null);
responseBuilder.autoCommit(true);
responseBuilder.autoCommitIntervalMs(autoCommitInterval);
responseBuilder.decoder(new TransportApiResponseDecoder());
@@ -91,16 +126,79 @@ public class MqttTransportService implements TransportService {
builder.pollInterval(responsePollDuration);
transportApiTemplate = builder.build();
transportApiTemplate.init();
+
+ TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<ToRuleEngineMsg> ruleEngineProducerBuilder = TBKafkaProducerTemplate.builder();
+ ruleEngineProducerBuilder.settings(kafkaSettings);
+ ruleEngineProducerBuilder.defaultTopic(ruleEngineTopic);
+ ruleEngineProducerBuilder.encoder(new ToRuleEngineMsgEncoder());
+ ruleEngineProducer = ruleEngineProducerBuilder.build();
+ ruleEngineProducer.init();
+
+ TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<ToTransportMsg> mainConsumerBuilder = TBKafkaConsumerTemplate.builder();
+ mainConsumerBuilder.settings(kafkaSettings);
+ mainConsumerBuilder.topic(notificationsTopic + "." + transportContext.getNodeId());
+ mainConsumerBuilder.clientId(transportContext.getNodeId());
+ mainConsumerBuilder.groupId(null);
+ mainConsumerBuilder.autoCommit(true);
+ mainConsumerBuilder.autoCommitIntervalMs(notificationsAutoCommitInterval);
+ mainConsumerBuilder.decoder(new ToTransportMsgResponseDecoder());
+ mainConsumer = mainConsumerBuilder.build();
+ mainConsumer.subscribe();
+
+ mainConsumerExecutor.execute(() -> {
+ while (!stopped) {
+ try {
+ ConsumerRecords<String, byte[]> records = mainConsumer.poll(Duration.ofMillis(notificationsPollDuration));
+ records.forEach(record -> {
+ try {
+ ToTransportMsg toTransportMsg = mainConsumer.decode(record);
+ if (toTransportMsg.hasToDeviceSessionMsg()) {
+ TransportProtos.DeviceActorToTransportMsg toSessionMsg = toTransportMsg.getToDeviceSessionMsg();
+ UUID sessionId = new UUID(toSessionMsg.getSessionIdMSB(), toSessionMsg.getSessionIdLSB());
+ SessionMsgListener listener = sessions.get(sessionId);
+ if (listener != null) {
+ transportCallbackExecutor.submit(() -> {
+ if (toSessionMsg.hasGetAttributesResponse()) {
+ listener.onGetAttributesResponse(toSessionMsg.getGetAttributesResponse());
+ }
+ });
+ } else {
+ //TODO: should we notify the device actor about missed session?
+ log.debug("[{}] Missing session.", sessionId);
+ }
+
+ }
+ } catch (Throwable e) {
+ log.warn("Failed to process the notification.", e);
+ }
+ });
+ } catch (Exception e) {
+ log.warn("Failed to obtain messages from queue.", e);
+ try {
+ Thread.sleep(notificationsPollDuration);
+ } catch (InterruptedException e2) {
+ log.trace("Failed to wait until the server has capacity to handle new requests", e2);
+ }
+ }
+ }
+ });
}
@PreDestroy
public void destroy() {
+ stopped = true;
if (transportApiTemplate != null) {
transportApiTemplate.stop();
}
if (transportCallbackExecutor != null) {
transportCallbackExecutor.shutdownNow();
}
+ if (mainConsumer != null) {
+ mainConsumer.unsubscribe();
+ }
+ if (mainConsumerExecutor != null) {
+ mainConsumerExecutor.shutdownNow();
+ }
}
@Override
@@ -118,17 +216,69 @@ public class MqttTransportService implements TransportService {
}
@Override
- public void process(SessionEventMsg msg, TransportServiceCallback<Void> callback) {
+ public void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback) {
+ ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+ TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+ .setSessionEvent(msg).build()
+ ).build();
+ send(sessionInfo, toRuleEngineMsg, callback);
+ }
+ @Override
+ public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
+ ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+ TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+ .setPostTelemetry(msg).build()
+ ).build();
+ send(sessionInfo, toRuleEngineMsg, callback);
}
@Override
- public void process(PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
+ public void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
+ ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+ TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+ .setPostAttributes(msg).build()
+ ).build();
+ send(sessionInfo, toRuleEngineMsg, callback);
+ }
+ @Override
+ public void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener) {
+ sessions.putIfAbsent(toId(sessionInfo), listener);
+ //TODO: monitor sessions periodically: PING REQ/RESP, etc.
}
@Override
- public void process(PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
+ public void deregisterSession(SessionInfoProto sessionInfo) {
+ sessions.remove(toId(sessionInfo));
+ }
+
+ private UUID toId(SessionInfoProto sessionInfo) {
+ return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
+ }
+
+ private String getRoutingKey(SessionInfoProto sessionInfo) {
+ return new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()).toString();
+ }
+
+ private static class TransportCallbackAdaptor implements Callback {
+ private final TransportServiceCallback<Void> callback;
+
+ TransportCallbackAdaptor(TransportServiceCallback<Void> callback) {
+ this.callback = callback;
+ }
+
+ @Override
+ public void onCompletion(RecordMetadata metadata, Exception exception) {
+ if (exception == null) {
+ callback.onSuccess(null);
+ } else {
+ callback.onError(exception);
+ }
+ }
+ }
+ private void send(SessionInfoProto sessionInfo, ToRuleEngineMsg toRuleEngineMsg, TransportServiceCallback<Void> callback) {
+ ruleEngineProducer.send(getRoutingKey(sessionInfo), toRuleEngineMsg, new TransportCallbackAdaptor(callback));
}
}
diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/ToRuleEngineMsgEncoder.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/ToRuleEngineMsgEncoder.java
new file mode 100644
index 0000000..ee929ce
--- /dev/null
+++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/ToRuleEngineMsgEncoder.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.mqtt.service;
+
+import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
+import org.thingsboard.server.kafka.TbKafkaEncoder;
+
+/**
+ * Created by ashvayka on 05.10.18.
+ */
+public class ToRuleEngineMsgEncoder implements TbKafkaEncoder<ToRuleEngineMsg> {
+ @Override
+ public byte[] encode(ToRuleEngineMsg value) {
+ return value.toByteArray();
+ }
+}
diff --git a/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml
index 707fb4b..735eee0 100644
--- a/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml
+++ b/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml
@@ -82,3 +82,7 @@ kafka:
response_auto_commit_interval: "${TB_TRANSPORT_RESPONSE_AUTO_COMMIT_INTERVAL_MS:100}"
rule_engine:
topic: "${TB_RULE_ENGINE_TOPIC:tb.rule-engine}"
+ notifications:
+ topic: "${TB_TRANSPORT_NOTIFICATIONS_TOPIC:tb.transport.notifications}"
+ poll_interval: "${TB_TRANSPORT_NOTIFICATIONS_POLL_INTERVAL_MS:25}"
+ auto_commit_interval: "${TB_TRANSPORT_NOTIFICATIONS_AUTO_COMMIT_INTERVAL_MS:100}"