thingsboard-aplcache
Changes
.gitignore 1(+1 -0)
application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java 142(+108 -34)
application/src/main/java/org/thingsboard/server/actors/device/SessionTimeoutCheckMsg.java 39(+39 -0)
application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java 24(+6 -18)
application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java 50(+50 -0)
application/src/main/java/org/thingsboard/server/service/session/DeviceSessionCacheService.java 30(+30 -0)
application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java 7(+7 -0)
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java 19(+8 -11)
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java 13(+9 -4)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java 90(+88 -2)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java 11(+11 -0)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java 17(+16 -1)
Details
.gitignore 1(+1 -0)
diff --git a/.gitignore b/.gitignore
index 9039e8e..da8569b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -32,3 +32,4 @@ pom.xml.versionsBackup
**/Californium.properties
**/.env
.instance_id
+rebuild-docker.sh
diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index 5e19d69..091b504 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -67,6 +67,7 @@ import org.thingsboard.server.service.mail.MailExecutorService;
import org.thingsboard.server.service.rpc.DeviceRpcService;
import org.thingsboard.server.service.script.JsExecutorService;
import org.thingsboard.server.service.script.JsInvokeService;
+import org.thingsboard.server.service.session.DeviceSessionCacheService;
import org.thingsboard.server.service.state.DeviceStateService;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import org.thingsboard.server.service.transport.RuleEngineTransportService;
@@ -201,6 +202,10 @@ public class ActorSystemContext {
@Getter
private DeviceStateService deviceStateService;
+ @Autowired
+ @Getter
+ private DeviceSessionCacheService deviceSessionCacheService;
+
@Lazy
@Autowired
@Getter
@@ -254,6 +259,14 @@ public class ActorSystemContext {
@Getter
private boolean allowSystemMailService;
+ @Value("${transport.sessions.inactivity_timeout}")
+ @Getter
+ private long sessionInactivityTimeout;
+
+ @Value("${transport.sessions.report_timeout}")
+ @Getter
+ private long sessionReportTimeout;
+
@Getter
@Setter
private ActorSystem actorSystem;
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
index bd2a0f4..7b412b1 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
@@ -44,11 +44,19 @@ public class DeviceActor extends ContextAwareActor {
}
@Override
+ public void preStart() {
+ logger.debug("[{}][{}] Starting device actor.", processor.tenantId, processor.deviceId);
+ try {
+ processor.initSessionTimeout(context());
+ logger.debug("[{}][{}] Device actor started.", processor.tenantId, processor.deviceId);
+ } catch (Exception e) {
+ logger.error(e, "[{}][{}] Unknown failure", processor.tenantId, processor.deviceId);
+ }
+ }
+
+ @Override
protected boolean process(TbActorMsg msg) {
switch (msg.getMsgType()) {
- case CLUSTER_EVENT_MSG:
- processor.processClusterEventMsg((ClusterEventMsg) msg);
- break;
case TRANSPORT_TO_DEVICE_ACTOR_MSG:
processor.process(context(), (TransportToDeviceActorMsgWrapper) msg);
break;
@@ -73,6 +81,9 @@ public class DeviceActor extends ContextAwareActor {
case DEVICE_ACTOR_CLIENT_SIDE_RPC_TIMEOUT_MSG:
processor.processClientSideRpcTimeout(context(), (DeviceActorClientSideRpcTimeoutMsg) msg);
break;
+ case SESSION_TIMEOUT_MSG:
+ processor.checkSessionsTimeout();
+ break;
default:
return false;
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index 021f262..08a4454 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -88,11 +88,11 @@ import java.util.stream.Collectors;
/**
* @author Andrew Shvayka
*/
-public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
+class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
- private final TenantId tenantId;
- private final DeviceId deviceId;
- private final Map<UUID, SessionInfo> sessions;
+ final TenantId tenantId;
+ final DeviceId deviceId;
+ private final Map<UUID, SessionInfoMetaData> sessions;
private final Map<UUID, SessionInfo> attributeSubscriptions;
private final Map<UUID, SessionInfo> rpcSubscriptions;
private final Map<Integer, ToDeviceRpcRequestMetadata> toDeviceRpcPendingMap;
@@ -116,6 +116,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
this.toDeviceRpcPendingMap = new HashMap<>();
this.toServerRpcPendingMap = new HashMap<>();
initAttributes();
+ restoreSessions();
}
private void initAttributes() {
@@ -160,7 +161,6 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
} else {
logger.debug("[{}] RPC request {} is NOT sent!", deviceId, request.getId());
}
-
}
private void registerPendingRpcRequest(ActorContext context, ToDeviceRpcRequestActorMsg msg, boolean sent, ToDeviceRpcRequestMsg rpcRequest, long timeout) {
@@ -174,7 +174,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
if (requestMd != null) {
logger.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId());
systemContext.getDeviceRpcService().processResponseToServerSideRPCRequestFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
- null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION));
+ null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION));
}
}
@@ -227,11 +227,11 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
if (msg.hasPostAttributes()) {
handlePostAttributesRequest(context, msg.getSessionInfo(), msg.getPostAttributes());
- reportActivity();
+ reportLogicalDeviceActivity();
}
if (msg.hasPostTelemetry()) {
handlePostTelemetryRequest(context, msg.getSessionInfo(), msg.getPostTelemetry());
- reportActivity();
+ reportLogicalDeviceActivity();
}
if (msg.hasGetAttributes()) {
handleGetAttributesRequest(context, msg.getSessionInfo(), msg.getGetAttributes());
@@ -241,11 +241,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
if (msg.hasToServerRPCCallRequest()) {
handleClientSideRPCRequest(context, msg.getSessionInfo(), msg.getToServerRPCCallRequest());
- reportActivity();
+ reportLogicalDeviceActivity();
+ }
+ if (msg.hasSubscriptionInfo()) {
+ handleSessionActivity(context, msg.getSessionInfo(), msg.getSubscriptionInfo());
}
}
- private void reportActivity() {
+ private void reportLogicalDeviceActivity() {
systemContext.getDeviceStateService().onDeviceActivity(deviceId);
}
@@ -406,28 +409,20 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
}
- void processClusterEventMsg(ClusterEventMsg msg) {
-// if (!msg.isAdded()) {
-// logger.debug("[{}] Clearing attributes/rpc subscription for server [{}]", deviceId, msg.getServerAddress());
-// Predicate<Map.Entry<SessionId, SessionInfo>> filter = e -> e.getValue().getServer()
-// .map(serverAddress -> serverAddress.equals(msg.getServerAddress())).orElse(false);
-// attributeSubscriptions.entrySet().removeIf(filter);
-// rpcSubscriptions.entrySet().removeIf(filter);
-// }
- }
-
private void processSubscriptionCommands(ActorContext context, SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg subscribeCmd) {
UUID sessionId = getSessionId(sessionInfo);
if (subscribeCmd.getUnsubscribe()) {
logger.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId);
attributeSubscriptions.remove(sessionId);
} else {
- SessionInfo session = sessions.get(sessionId);
- if (session == null) {
- session = new SessionInfo(TransportProtos.SessionType.SYNC, sessionInfo.getNodeId());
+ SessionInfoMetaData sessionMD = sessions.get(sessionId);
+ if (sessionMD == null) {
+ sessionMD = new SessionInfoMetaData(new SessionInfo(TransportProtos.SessionType.SYNC, sessionInfo.getNodeId()));
}
+ sessionMD.setSubscribedToAttributes(true);
logger.debug("[{}] Registering attributes subscription for session [{}]", deviceId, sessionId);
- attributeSubscriptions.put(sessionId, session);
+ attributeSubscriptions.put(sessionId, sessionMD.getSessionInfo());
+ dumpSessions();
}
}
@@ -441,20 +436,22 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
logger.debug("[{}] Canceling rpc subscription for session [{}]", deviceId, sessionId);
rpcSubscriptions.remove(sessionId);
} else {
- SessionInfo session = sessions.get(sessionId);
- if (session == null) {
- session = new SessionInfo(TransportProtos.SessionType.SYNC, sessionInfo.getNodeId());
+ SessionInfoMetaData sessionMD = sessions.get(sessionId);
+ if (sessionMD == null) {
+ sessionMD = new SessionInfoMetaData(new SessionInfo(TransportProtos.SessionType.SYNC, sessionInfo.getNodeId()));
}
+ sessionMD.setSubscribedToRPC(true);
logger.debug("[{}] Registering rpc subscription for session [{}]", deviceId, sessionId);
- rpcSubscriptions.put(sessionId, session);
+ rpcSubscriptions.put(sessionId, sessionMD.getSessionInfo());
sendPendingRequests(context, sessionId, sessionInfo);
+ dumpSessions();
}
}
private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) {
UUID sessionId = getSessionId(sessionInfo);
if (msg.getEvent() == SessionEvent.OPEN) {
- if(sessions.containsKey(sessionId)){
+ if (sessions.containsKey(sessionId)) {
logger.debug("[{}] Received duplicate session open event [{}]", deviceId, sessionId);
return;
}
@@ -462,13 +459,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
if (sessions.size() >= systemContext.getMaxConcurrentSessionsPerDevice()) {
UUID sessionIdToRemove = sessions.keySet().stream().findFirst().orElse(null);
if (sessionIdToRemove != null) {
- closeSession(sessionIdToRemove, sessions.remove(sessionIdToRemove));
+ notifyTransportAboutClosedSession(sessionIdToRemove, sessions.remove(sessionIdToRemove));
}
}
- sessions.put(sessionId, new SessionInfo(TransportProtos.SessionType.ASYNC, sessionInfo.getNodeId()));
+ sessions.put(sessionId, new SessionInfoMetaData(new SessionInfo(TransportProtos.SessionType.ASYNC, sessionInfo.getNodeId())));
if (sessions.size() == 1) {
reportSessionOpen();
}
+ dumpSessions();
} else if (msg.getEvent() == SessionEvent.CLOSED) {
logger.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId);
sessions.remove(sessionId);
@@ -477,21 +475,34 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
if (sessions.isEmpty()) {
reportSessionClose();
}
+ dumpSessions();
+ }
+ }
+
+ private void handleSessionActivity(ActorContext context, SessionInfoProto sessionInfo, TransportProtos.SubscriptionInfoProto subscriptionInfo) {
+ UUID sessionId = getSessionId(sessionInfo);
+ SessionInfoMetaData sessionMD = sessions.get(sessionId);
+ if (sessionMD != null) {
+ sessionMD.setLastActivityTime(subscriptionInfo.getLastActivityTime());
+ sessionMD.setSubscribedToAttributes(subscriptionInfo.getAttributeSubscription());
+ sessionMD.setSubscribedToRPC(subscriptionInfo.getRpcSubscription());
}
+ dumpSessions();
}
void processCredentialsUpdate() {
- sessions.forEach(this::closeSession);
+ sessions.forEach(this::notifyTransportAboutClosedSession);
attributeSubscriptions.clear();
rpcSubscriptions.clear();
+ dumpSessions();
}
- private void closeSession(UUID sessionId, SessionInfo sessionInfo) {
+ private void notifyTransportAboutClosedSession(UUID sessionId, SessionInfoMetaData sessionMd) {
DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder()
.setSessionIdMSB(sessionId.getMostSignificantBits())
.setSessionIdLSB(sessionId.getLeastSignificantBits())
.setSessionCloseNotification(SessionCloseNotificationProto.getDefaultInstance()).build();
- systemContext.getRuleEngineTransportService().process(sessionInfo.getNodeId(), msg);
+ systemContext.getRuleEngineTransportService().process(sessionMd.getSessionInfo().getNodeId(), msg);
}
void processNameOrTypeUpdate(DeviceNameOrTypeUpdateMsg msg) {
@@ -605,4 +616,67 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
return builder.build();
}
+
+ private void restoreSessions() {
+ TransportProtos.DeviceSessionsCacheEntry sessionsDump = systemContext.getDeviceSessionCacheService().get(deviceId);
+ if (sessionsDump.getSerializedSize() == 0) {
+ return;
+ }
+ for (TransportProtos.SessionSubscriptionInfoProto sessionSubscriptionInfoProto : sessionsDump.getSessionsList()) {
+ SessionInfoProto sessionInfoProto = sessionSubscriptionInfoProto.getSessionInfo();
+ UUID sessionId = getSessionId(sessionInfoProto);
+ SessionInfo sessionInfo = new SessionInfo(TransportProtos.SessionType.ASYNC, sessionInfoProto.getNodeId());
+ TransportProtos.SubscriptionInfoProto subInfo = sessionSubscriptionInfoProto.getSubscriptionInfo();
+ SessionInfoMetaData sessionInfoMetaData = new SessionInfoMetaData(sessionInfo, subInfo.getLastActivityTime());
+ sessions.put(sessionId, sessionInfoMetaData);
+ if (subInfo.getAttributeSubscription()) {
+ rpcSubscriptions.put(sessionId, sessionInfo);
+ }
+ if (subInfo.getAttributeSubscription()) {
+ attributeSubscriptions.put(sessionId, sessionInfo);
+ }
+ }
+ }
+
+ private void dumpSessions() {
+ List<TransportProtos.SessionSubscriptionInfoProto> sessionsList = new ArrayList<>(sessions.size());
+ sessions.forEach((uuid, sessionMD) -> {
+ if (sessionMD.getSessionInfo().getType() == TransportProtos.SessionType.SYNC) {
+ return;
+ }
+ SessionInfo sessionInfo = sessionMD.getSessionInfo();
+ TransportProtos.SubscriptionInfoProto subscriptionInfoProto = TransportProtos.SubscriptionInfoProto.newBuilder()
+ .setLastActivityTime(sessionMD.getLastActivityTime())
+ .setAttributeSubscription(sessionMD.isSubscribedToAttributes())
+ .setRpcSubscription(sessionMD.isSubscribedToRPC()).build();
+ TransportProtos.SessionInfoProto sessionInfoProto = TransportProtos.SessionInfoProto.newBuilder()
+ .setSessionIdMSB(uuid.getMostSignificantBits())
+ .setSessionIdLSB(uuid.getLeastSignificantBits())
+ .setNodeId(sessionInfo.getNodeId()).build();
+ sessionsList.add(TransportProtos.SessionSubscriptionInfoProto.newBuilder()
+ .setSessionInfo(sessionInfoProto)
+ .setSubscriptionInfo(subscriptionInfoProto).build());
+ });
+ systemContext.getDeviceSessionCacheService()
+ .put(deviceId, TransportProtos.DeviceSessionsCacheEntry.newBuilder()
+ .addAllSessions(sessionsList).build());
+ }
+
+ void initSessionTimeout(ActorContext context) {
+ schedulePeriodicMsgWithDelay(context, SessionTimeoutCheckMsg.instance(), systemContext.getSessionInactivityTimeout(), systemContext.getSessionInactivityTimeout());
+ }
+
+ void checkSessionsTimeout() {
+ long expTime = System.currentTimeMillis() - systemContext.getSessionInactivityTimeout();
+ Map<UUID, SessionInfoMetaData> sessionsToRemove = sessions.entrySet().stream().filter(kv -> kv.getValue().getLastActivityTime() < expTime).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ sessionsToRemove.forEach((sessionId, sessionMD) -> {
+ sessions.remove(sessionId);
+ rpcSubscriptions.remove(sessionId);
+ attributeSubscriptions.remove(sessionId);
+ notifyTransportAboutClosedSession(sessionId, sessionMD);
+ });
+ if (!sessionsToRemove.isEmpty()) {
+ dumpSessions();
+ }
+ }
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java b/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java
index 43ae592..fe09077 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java
@@ -25,4 +25,5 @@ import org.thingsboard.server.gen.transport.TransportProtos.SessionType;
public class SessionInfo {
private final SessionType type;
private final String nodeId;
+ private long lastActivityTime;
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/SessionInfoMetaData.java b/application/src/main/java/org/thingsboard/server/actors/device/SessionInfoMetaData.java
new file mode 100644
index 0000000..dd08394
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/device/SessionInfoMetaData.java
@@ -0,0 +1,39 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.device;
+
+import lombok.Data;
+import org.thingsboard.server.gen.transport.TransportProtos.SessionType;
+
+/**
+ * @author Andrew Shvayka
+ */
+@Data
+class SessionInfoMetaData {
+ private final SessionInfo sessionInfo;
+ private long lastActivityTime;
+ private boolean subscribedToAttributes;
+ private boolean subscribedToRPC;
+
+ SessionInfoMetaData(SessionInfo sessionInfo) {
+ this(sessionInfo, System.currentTimeMillis());
+ }
+
+ SessionInfoMetaData(SessionInfo sessionInfo, long lastActivityTime) {
+ this.sessionInfo = sessionInfo;
+ this.lastActivityTime = lastActivityTime;
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/SessionTimeoutCheckMsg.java b/application/src/main/java/org/thingsboard/server/actors/device/SessionTimeoutCheckMsg.java
new file mode 100644
index 0000000..d9172ae
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/device/SessionTimeoutCheckMsg.java
@@ -0,0 +1,39 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.device;
+
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.TbActorMsg;
+
+/**
+ * Created by ashvayka on 29.10.18.
+ */
+public class SessionTimeoutCheckMsg implements TbActorMsg {
+
+ private static final SessionTimeoutCheckMsg INSTANCE = new SessionTimeoutCheckMsg();
+
+ private SessionTimeoutCheckMsg() {
+ }
+
+ public static SessionTimeoutCheckMsg instance() {
+ return INSTANCE;
+ }
+
+ @Override
+ public MsgType getMsgType() {
+ return MsgType.SESSION_TIMEOUT_MSG;
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java
index 8864486..c809782 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java
@@ -40,43 +40,31 @@ public abstract class AbstractContextAwareMsgProcessor {
this.logger = logger;
}
- protected ActorRef getAppActor() {
- return systemContext.getAppActor();
- }
-
- protected Scheduler getScheduler() {
+ private Scheduler getScheduler() {
return systemContext.getScheduler();
}
- protected ExecutionContextExecutor getSystemDispatcher() {
+ private ExecutionContextExecutor getSystemDispatcher() {
return systemContext.getActorSystem().dispatcher();
}
protected void schedulePeriodicMsgWithDelay(ActorContext ctx, Object msg, long delayInMs, long periodInMs) {
- schedulePeriodicMsgWithDelay(ctx, msg, delayInMs, periodInMs, ctx.self());
+ schedulePeriodicMsgWithDelay(msg, delayInMs, periodInMs, ctx.self());
}
- protected void schedulePeriodicMsgWithDelay(ActorContext ctx, Object msg, long delayInMs, long periodInMs, ActorRef target) {
+ private void schedulePeriodicMsgWithDelay(Object msg, long delayInMs, long periodInMs, ActorRef target) {
logger.debug("Scheduling periodic msg {} every {} ms with delay {} ms", msg, periodInMs, delayInMs);
getScheduler().schedule(Duration.create(delayInMs, TimeUnit.MILLISECONDS), Duration.create(periodInMs, TimeUnit.MILLISECONDS), target, msg, getSystemDispatcher(), null);
}
-
protected void scheduleMsgWithDelay(ActorContext ctx, Object msg, long delayInMs) {
- scheduleMsgWithDelay(ctx, msg, delayInMs, ctx.self());
+ scheduleMsgWithDelay(msg, delayInMs, ctx.self());
}
- protected void scheduleMsgWithDelay(ActorContext ctx, Object msg, long delayInMs, ActorRef target) {
+ private void scheduleMsgWithDelay(Object msg, long delayInMs, ActorRef target) {
logger.debug("Scheduling msg {} with delay {} ms", msg, delayInMs);
getScheduler().scheduleOnce(Duration.create(delayInMs, TimeUnit.MILLISECONDS), target, msg, getSystemDispatcher(), null);
}
- @Data
- @AllArgsConstructor
- private static class ComponentConfiguration {
- private final String clazz;
- private final String name;
- private final String configuration;
- }
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
index 347483a..f822e4e 100644
--- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
@@ -127,7 +127,6 @@ public class TenantActor extends RuleChainManagerActor {
ruleChainManager.getOrCreateActor(context(), msg.getRuleChainId()).tell(msg, self());
}
-
private void onToDeviceActorMsg(DeviceAwareMsg msg) {
getOrCreateDeviceActor(msg.getDeviceId()).tell(msg, ActorRef.noSender());
}
diff --git a/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java b/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java
new file mode 100644
index 0000000..6201dab
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java
@@ -0,0 +1,50 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.session;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.cache.annotation.CachePut;
+import org.springframework.cache.annotation.Cacheable;
+import org.springframework.stereotype.Service;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.gen.transport.TransportProtos.DeviceSessionsCacheEntry;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+import static org.thingsboard.server.common.data.CacheConstants.SESSIONS_CACHE;
+
+/**
+ * Created by ashvayka on 29.10.18.
+ */
+@Service
+@Slf4j
+public class DefaultDeviceSessionCacheService implements DeviceSessionCacheService {
+
+ @Override
+ @Cacheable(cacheNames = SESSIONS_CACHE, key = "#deviceId")
+ public DeviceSessionsCacheEntry get(DeviceId deviceId) {
+ log.debug("[{}] Fetching session data from cache", deviceId);
+ return DeviceSessionsCacheEntry.newBuilder().addAllSessions(Collections.emptyList()).build();
+ }
+
+ @Override
+ @CachePut(cacheNames = SESSIONS_CACHE, key = "#deviceId")
+ public DeviceSessionsCacheEntry put(DeviceId deviceId, DeviceSessionsCacheEntry sessions) {
+ log.debug("[{}] Pushing session data from cache: {}", deviceId, sessions);
+ return sessions;
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/session/DeviceSessionCacheService.java b/application/src/main/java/org/thingsboard/server/service/session/DeviceSessionCacheService.java
new file mode 100644
index 0000000..0a2e6a5
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/session/DeviceSessionCacheService.java
@@ -0,0 +1,30 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.session;
+
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.gen.transport.TransportProtos.DeviceSessionsCacheEntry;
+
+/**
+ * Created by ashvayka on 29.10.18.
+ */
+public interface DeviceSessionCacheService {
+
+ DeviceSessionsCacheEntry get(DeviceId deviceId);
+
+ DeviceSessionsCacheEntry put(DeviceId deviceId, DeviceSessionsCacheEntry sessions);
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java
index 0f72230..d3782e8 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java
@@ -161,6 +161,13 @@ public class LocalTransportService extends AbstractTransportService implements R
}
@Override
+ public void process(SessionInfoProto sessionInfo, TransportProtos.SubscriptionInfoProto msg, TransportServiceCallback<Void> callback) {
+ if (checkLimits(sessionInfo, callback)) {
+ forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscriptionInfo(msg).build(), callback);
+ }
+ }
+
+ @Override
public void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
if (checkLimits(sessionInfo, callback)) {
forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToAttributes(msg).build(), callback);
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 54096ea..5824293 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -202,6 +202,9 @@ caffeine:
devices:
timeToLiveInMinutes: 1440
maxSize: 100000
+ sessions:
+ timeToLiveInMinutes: 1440
+ maxSize: 100000
assets:
timeToLiveInMinutes: 1440
maxSize: 100000
@@ -392,6 +395,9 @@ transport:
auto_commit_interval: "${TB_RULE_ENGINE_AUTO_COMMIT_INTERVAL_MS:100}"
notifications:
topic: "${TB_TRANSPORT_NOTIFICATIONS_TOPIC:tb.transport.notifications}"
+ sessions:
+ inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}"
+ report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}"
rate_limits:
enabled: "${TB_TRANSPORT_RATE_LIMITS_ENABLED:false}"
tenant: "${TB_TRANSPORT_RATE_LIMITS_TENANT:1000:1,20000:60}"
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java
index 698a69e..853caff 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java
@@ -19,6 +19,7 @@ public class CacheConstants {
public static final String DEVICE_CREDENTIALS_CACHE = "deviceCredentials";
public static final String RELATIONS_CACHE = "relations";
public static final String DEVICE_CACHE = "devices";
+ public static final String SESSIONS_CACHE = "sessions";
public static final String ASSET_CACHE = "assets";
public static final String ENTITY_VIEW_CACHE = "entityViews";
}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
index 24758b5..44a98d6 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
@@ -96,13 +96,8 @@ public enum MsgType {
*/
DEVICE_ACTOR_TO_RULE_ENGINE_MSG,
- /**
- * Message that is sent from Rule Engine to the Device Actor when message is successfully pushed to queue.
- */
- ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG,
- TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG,
SESSION_TIMEOUT_MSG,
- SESSION_CTRL_MSG,
+
STATS_PERSIST_TICK_MSG,
diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index da8b3a6..a178bdf 100644
--- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -43,10 +43,10 @@ import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.msg.EncryptionUtil;
+import org.thingsboard.server.common.transport.service.AbstractTransportService;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
import org.thingsboard.server.gen.transport.TransportProtos.SessionEvent;
-import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
@@ -141,9 +141,12 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
break;
case PINGREQ:
- //TODO: should we push the notification to the rule engine?
if (checkConnected(ctx)) {
ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
+ transportService.reportActivity(sessionInfo);
+ if (gatewaySessionHandler != null) {
+ gatewaySessionHandler.reportActivity();
+ }
}
break;
case DISCONNECT:
@@ -394,7 +397,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private void processDisconnect(ChannelHandlerContext ctx) {
ctx.close();
if (deviceSessionCtx.isConnected()) {
- transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.CLOSED), null);
+ transportService.process(sessionInfo, AbstractTransportService.getSessionEventMsg(SessionEvent.CLOSED), null);
transportService.deregisterSession(sessionInfo);
if (gatewaySessionHandler != null) {
gatewaySessionHandler.onGatewayDisconnect();
@@ -466,16 +469,10 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
}
- public static SessionEventMsg getSessionEventMsg(SessionEvent event) {
- return SessionEventMsg.newBuilder()
- .setSessionType(TransportProtos.SessionType.ASYNC)
- .setEvent(event).build();
- }
-
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
if (deviceSessionCtx.isConnected()) {
- transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.CLOSED), null);
+ transportService.process(sessionInfo, AbstractTransportService.getSessionEventMsg(SessionEvent.CLOSED), null);
transportService.deregisterSession(sessionInfo);
}
}
@@ -495,7 +492,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
.setTenantIdMSB(msg.getDeviceInfo().getTenantIdMSB())
.setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB())
.build();
- transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.OPEN), null);
+ transportService.process(sessionInfo, AbstractTransportService.getSessionEventMsg(SessionEvent.OPEN), null);
transportService.registerAsyncSession(sessionInfo, this);
checkGatewaySession();
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java
index d600059..ac33ba6 100644
--- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java
+++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java
@@ -34,6 +34,7 @@ import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
+import org.thingsboard.server.common.transport.service.AbstractTransportService;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;
@@ -118,7 +119,7 @@ public class GatewaySessionHandler {
GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), mqttQoSMap);
if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) {
SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo();
- transportService.process(deviceSessionInfo, MqttTransportHandler.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null);
+ transportService.process(deviceSessionInfo, AbstractTransportService.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null);
transportService.process(deviceSessionInfo, TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), null);
transportService.process(deviceSessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), null);
transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx);
@@ -334,7 +335,7 @@ public class GatewaySessionHandler {
private void deregisterSession(String deviceName, GatewayDeviceSessionCtx deviceSessionCtx) {
transportService.deregisterSession(deviceSessionCtx.getSessionInfo());
- transportService.process(deviceSessionCtx.getSessionInfo(), MqttTransportHandler.getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null);
+ transportService.process(deviceSessionCtx.getSessionInfo(), AbstractTransportService.getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null);
log.debug("[{}] Removed device [{}] from the gateway session", sessionId, deviceName);
}
@@ -360,11 +361,15 @@ public class GatewaySessionHandler {
return context;
}
- public MqttTransportAdaptor getAdaptor() {
+ MqttTransportAdaptor getAdaptor() {
return context.getAdaptor();
}
- public int nextMsgId() {
+ int nextMsgId() {
return deviceSessionCtx.nextMsgId();
}
+
+ public void reportActivity() {
+ devices.forEach((id, deviceCtx) -> transportService.reportActivity(deviceCtx.getSessionInfo()));
+ }
}
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java
index 265dacb..3bb84ec 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java
@@ -47,9 +47,14 @@ public abstract class AbstractTransportService implements TransportService {
private String perTenantLimitsConf;
@Value("${transport.rate_limits.tenant}")
private String perDevicesLimitsConf;
+ @Value("${transport.sessions.inactivity_timeout}")
+ private long sessionInactivityTimeout;
+ @Value("${transport.sessions.report_timeout}")
+ private long sessionReportTimeout;
protected ScheduledExecutorService schedulerExecutor;
protected ExecutorService transportCallbackExecutor;
+
private ConcurrentMap<UUID, SessionMetaData> sessions = new ConcurrentHashMap<>();
//TODO: Implement cleanup of this maps.
@@ -59,7 +64,81 @@ public abstract class AbstractTransportService implements TransportService {
@Override
public void registerAsyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener) {
sessions.putIfAbsent(toId(sessionInfo), new SessionMetaData(sessionInfo, TransportProtos.SessionType.ASYNC, listener));
- //TODO: monitor sessions periodically: PING REQ/RESP, etc.
+ }
+
+ @Override
+ public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SessionEventMsg msg, TransportServiceCallback<Void> callback) {
+ reportActivityInternal(sessionInfo);
+ }
+
+ @Override
+ public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
+ reportActivityInternal(sessionInfo);
+ }
+
+ @Override
+ public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
+ reportActivityInternal(sessionInfo);
+ }
+
+ @Override
+ public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
+ reportActivityInternal(sessionInfo);
+ }
+
+ @Override
+ public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
+ SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
+ sessionMetaData.setSubscribedToAttributes(!msg.getUnsubscribe());
+ }
+
+ @Override
+ public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
+ SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
+ sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe());
+ }
+
+ @Override
+ public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
+ reportActivityInternal(sessionInfo);
+ }
+
+ @Override
+ public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
+ reportActivityInternal(sessionInfo);
+ }
+
+ @Override
+ public void reportActivity(TransportProtos.SessionInfoProto sessionInfo) {
+ reportActivityInternal(sessionInfo);
+ }
+
+ private SessionMetaData reportActivityInternal(TransportProtos.SessionInfoProto sessionInfo) {
+ UUID sessionId = toId(sessionInfo);
+ SessionMetaData sessionMetaData = sessions.get(sessionId);
+ if (sessionMetaData != null) {
+ sessionMetaData.updateLastActivityTime();
+ }
+ return sessionMetaData;
+ }
+
+ private void checkInactivityAndReportActivity() {
+ long expTime = System.currentTimeMillis() - sessionInactivityTimeout;
+ sessions.forEach((uuid, sessionMD) -> {
+ if (sessionMD.getLastActivityTime() < expTime) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Session has expired due to last activity time: {}", toId(sessionMD.getSessionInfo()), sessionMD.getLastActivityTime());
+ }
+ process(sessionMD.getSessionInfo(), getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null);
+ sessions.remove(uuid);
+ sessionMD.getListener().onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance());
+ } else {
+ process(sessionMD.getSessionInfo(), TransportProtos.SubscriptionInfoProto.newBuilder()
+ .setAttributeSubscription(sessionMD.isSubscribedToAttributes())
+ .setRpcSubscription(sessionMD.isSubscribedToRPC())
+ .setLastActivityTime(sessionMD.getLastActivityTime()).build(), null);
+ }
+ });
}
@Override
@@ -131,7 +210,7 @@ public abstract class AbstractTransportService implements TransportService {
}
}
- protected UUID toId(TransportProtos.SessionInfoProto sessionInfo) {
+ private UUID toId(TransportProtos.SessionInfoProto sessionInfo) {
return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
}
@@ -147,6 +226,7 @@ public abstract class AbstractTransportService implements TransportService {
}
this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor();
this.transportCallbackExecutor = new ThreadPoolExecutor(0, 20, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
+ this.schedulerExecutor.scheduleAtFixedRate(this::checkInactivityAndReportActivity, sessionReportTimeout, sessionReportTimeout, TimeUnit.MILLISECONDS);
}
public void destroy() {
@@ -161,4 +241,10 @@ public abstract class AbstractTransportService implements TransportService {
transportCallbackExecutor.shutdownNow();
}
}
+
+ public static TransportProtos.SessionEventMsg getSessionEventMsg(TransportProtos.SessionEvent event) {
+ return TransportProtos.SessionEventMsg.newBuilder()
+ .setSessionType(TransportProtos.SessionType.ASYNC)
+ .setEvent(event).build();
+ }
}
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
index 6774942..0910961 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
@@ -228,6 +228,17 @@ public class RemoteTransportService extends AbstractTransportService {
}
@Override
+ public void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback<Void> callback) {
+ if (checkLimits(sessionInfo, callback)) {
+ ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+ TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+ .setSubscriptionInfo(msg).build()
+ ).build();
+ send(sessionInfo, toRuleEngineMsg, callback);
+ }
+ }
+
+ @Override
public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
if (checkLimits(sessionInfo, callback)) {
ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java
index 1de5711..8642e93 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java
@@ -23,10 +23,25 @@ import org.thingsboard.server.gen.transport.TransportProtos;
* Created by ashvayka on 15.10.18.
*/
@Data
-public class SessionMetaData {
+class SessionMetaData {
private final TransportProtos.SessionInfoProto sessionInfo;
private final TransportProtos.SessionType sessionType;
private final SessionMsgListener listener;
+ private volatile long lastActivityTime;
+ private volatile boolean subscribedToAttributes;
+ private volatile boolean subscribedToRPC;
+
+ SessionMetaData(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SessionType sessionType, SessionMsgListener listener) {
+ this.sessionInfo = sessionInfo;
+ this.sessionType = sessionType;
+ this.listener = listener;
+ this.lastActivityTime = System.currentTimeMillis();
+ }
+
+ void updateLastActivityTime() {
+ this.lastActivityTime = System.currentTimeMillis();
+ }
+
}
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java
index a47438f..8944e94 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java
@@ -61,10 +61,14 @@ public interface TransportService {
void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback);
+ void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscriptionInfoProto msg, TransportServiceCallback<Void> callback);
+
void registerAsyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener);
void registerSyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener, long timeout);
+ void reportActivity(SessionInfoProto sessionInfo);
+
void deregisterSession(SessionInfoProto sessionInfo);
}
diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml
index 30351b0..f96bcbe 100644
--- a/transport/coap/src/main/resources/tb-coap-transport.yml
+++ b/transport/coap/src/main/resources/tb-coap-transport.yml
@@ -23,6 +23,9 @@ transport:
bind_address: "${COAP_BIND_ADDRESS:0.0.0.0}"
bind_port: "${COAP_BIND_PORT:5683}"
timeout: "${COAP_TIMEOUT:10000}"
+ sessions:
+ inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}"
+ report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}"
rate_limits:
enabled: "${TB_TRANSPORT_RATE_LIMITS_ENABLED:false}"
tenant: "${TB_TRANSPORT_RATE_LIMITS_TENANT:1000:1,20000:60}"
diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml
index 001e08a..6d593ed 100644
--- a/transport/http/src/main/resources/tb-http-transport.yml
+++ b/transport/http/src/main/resources/tb-http-transport.yml
@@ -24,6 +24,9 @@ server:
transport:
http:
request_timeout: "${HTTP_REQUEST_TIMEOUT:60000}"
+ sessions:
+ inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}"
+ report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}"
rate_limits:
enabled: "${TB_TRANSPORT_RATE_LIMITS_ENABLED:false}"
tenant: "${TB_TRANSPORT_RATE_LIMITS_TENANT:1000:1,20000:60}"
diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml
index e37d14e..e7f8942 100644
--- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml
+++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml
@@ -44,8 +44,8 @@ transport:
# Type of the key store
key_store_type: "${MQTT_SSL_KEY_STORE_TYPE:JKS}"
sessions:
- max_per_tenant: "${TB_TRANSPORT_SESSIONS_MAX_PER_TENANT:1000}"
- max_per_device: "${TB_TRANSPORT_SESSIONS_MAX_PER_DEVICE:2}"
+ inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}"
+ report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}"
rate_limits:
enabled: "${TB_TRANSPORT_RATE_LIMITS_ENABLED:false}"
tenant: "${TB_TRANSPORT_RATE_LIMITS_TENANT:1000:1,20000:60}"