thingsboard-developers
Changes
application/src/main/java/org/thingsboard/server/service/security/ValidationCallback.java 15(+15 -0)
application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java 293(+281 -12)
application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java 34(+29 -5)
application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java 26(+20 -6)
application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketMsgEndpoint.java 15(+15 -0)
application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketService.java 18(+18 -0)
application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketSessionRef.java 15(+15 -0)
application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketTextMsg.java 15(+15 -0)
application/src/main/java/org/thingsboard/server/service/telemetry/WsSessionMetaData.java 15(+15 -0)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/AttributesSubscriptionCmd.java 6(+3 -3)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/SubscriptionCmd.java 4(+2 -2)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/TimeseriesSubscriptionCmd.java 6(+3 -3)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java 2(+1 -1)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java 18(+13 -5)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java 33(+25 -8)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/Subscription.java 3(+2 -1)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/SubscriptionState.java 3(+2 -1)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/SubscriptionType.java 23(+0 -23)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index 3cc211c..21bc090 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -233,21 +233,6 @@ public class ActorSystemContext {
@Getter
private final Config config;
- @Getter
- private ExecutorService tsCallBackExecutor;
-
- @PostConstruct
- public void initExecutor() {
- tsCallBackExecutor = Executors.newSingleThreadExecutor();
- }
-
- @PreDestroy
- public void shutdownExecutor() {
- if (tsCallBackExecutor != null) {
- tsCallBackExecutor.shutdownNow();
- }
- }
-
public ActorSystemContext() {
config = ConfigFactory.parseResources(AKKA_CONF_FILE_NAME).withFallback(ConfigFactory.load());
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index b3b0072..7dca2c8 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,16 +17,10 @@ package org.thingsboard.server.actors.ruleChain;
import akka.actor.ActorRef;
import com.google.common.base.Function;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.rule.engine.api.ListeningExecutor;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.server.actors.ActorSystemContext;
-import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleNodeId;
-import org.thingsboard.server.common.data.kv.AttributeKvEntry;
-import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.dao.alarm.AlarmService;
@@ -41,7 +35,6 @@ import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.user.UserService;
import scala.concurrent.duration.Duration;
-import javax.annotation.Nullable;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -122,33 +115,6 @@ class DefaultTbContext implements TbContext {
}
@Override
- public void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback) {
- saveAndNotify(entityId, ts, 0L, callback);
- }
-
- @Override
- public void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback) {
- ListenableFuture<List<Void>> saveFuture = mainCtx.getTsService().save(entityId, ts, ttl);
- Futures.addCallback(saveFuture, new FutureCallback<List<Void>>() {
- @Override
- public void onSuccess(@Nullable List<Void> result) {
- mainCtx.getTsSubService().onLocalTimeseriesUpdate(entityId, ts);
- callback.onSuccess(null);
- }
-
- @Override
- public void onFailure(Throwable t) {
- callback.onFailure(t);
- }
- }, mainCtx.getTsCallBackExecutor());
- }
-
- @Override
- public void saveAndNotify(EntityId entityId, String scope, Set<AttributeKvEntry> attributes, FutureCallback<Void> callback) {
-
- }
-
- @Override
public ListeningExecutor getJsExecutor() {
return mainCtx.getJsExecutor();
}
diff --git a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java
index 800bc3b..ba5f9f9 100644
--- a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java
+++ b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
index c2a2df8..c1338e4 100644
--- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -89,15 +89,15 @@ import java.util.stream.Collectors;
public class TelemetryController extends BaseController {
@Autowired
- private TelemetrySubscriptionService subscriptionService;
-
- @Autowired
private AttributesService attributesService;
@Autowired
private TimeseriesService tsService;
@Autowired
+ private TelemetrySubscriptionService tsSubService;
+
+ @Autowired
private AccessValidator accessValidator;
private ExecutorService executor;
@@ -312,13 +312,11 @@ public class TelemetryController extends BaseController {
}
SecurityUser user = getCurrentUser();
return accessValidator.validateEntityAndCallback(getCurrentUser(), entityIdSrc, (result, entityId) -> {
- ListenableFuture<List<Void>> future = attributesService.save(entityId, scope, attributes);
- Futures.addCallback(future, new FutureCallback<List<Void>>() {
+ tsSubService.saveAndNotify(entityId, scope, attributes, new FutureCallback<Void>() {
@Override
- public void onSuccess(@Nullable List<Void> tmp) {
+ public void onSuccess(@Nullable Void tmp) {
logAttributesUpdated(user, entityId, scope, attributes, null);
result.setResult(new ResponseEntity(HttpStatus.OK));
- subscriptionService.onAttributesUpdateFromServer(entityId, scope, attributes);
}
@Override
@@ -327,7 +325,6 @@ public class TelemetryController extends BaseController {
AccessValidator.handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR);
}
});
- result.setResult(new ResponseEntity(HttpStatus.OK));
});
} else {
return getImmediateDeferredResult("Request is not a JSON object", HttpStatus.BAD_REQUEST);
@@ -358,12 +355,10 @@ public class TelemetryController extends BaseController {
}
SecurityUser user = getCurrentUser();
return accessValidator.validateEntityAndCallback(getCurrentUser(), entityIdSrc, (result, entityId) -> {
- ListenableFuture<List<Void>> future = tsService.save(entityId, entries, ttl);
- Futures.addCallback(future, new FutureCallback<List<Void>>() {
+ tsSubService.saveAndNotify(entityId, entries, ttl, new FutureCallback<Void>() {
@Override
- public void onSuccess(@Nullable List<Void> tmp) {
+ public void onSuccess(@Nullable Void tmp) {
result.setResult(new ResponseEntity(HttpStatus.OK));
- subscriptionService.onTimeseriesUpdateFromServer(entityId, entries);
}
@Override
@@ -371,7 +366,6 @@ public class TelemetryController extends BaseController {
AccessValidator.handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR);
}
});
- result.setResult(new ResponseEntity(HttpStatus.OK));
});
}
diff --git a/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java b/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java
index 02e12fe..e9bacfd 100644
--- a/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java
+++ b/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.server.service.security;
import com.google.common.base.Function;
diff --git a/application/src/main/java/org/thingsboard/server/service/security/ValidationCallback.java b/application/src/main/java/org/thingsboard/server/service/security/ValidationCallback.java
index 7feae03..2b91c60 100644
--- a/application/src/main/java/org/thingsboard/server/service/security/ValidationCallback.java
+++ b/application/src/main/java/org/thingsboard/server/service/security/ValidationCallback.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.server.service.security;
import com.google.common.util.concurrent.FutureCallback;
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
index 81cdac9..58bbec5 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
@@ -1,19 +1,55 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.server.service.telemetry;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import org.springframework.util.StringUtils;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
-import org.thingsboard.server.common.data.kv.KvEntry;
+import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.dao.attributes.AttributesService;
+import org.thingsboard.server.dao.timeseries.TimeseriesService;
+import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryFeature;
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription;
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
+import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionUpdate;
+import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
+import javax.annotation.Nullable;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
/**
* Created by ashvayka on 27.03.18.
@@ -25,42 +61,275 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
@Autowired
private TelemetryWebSocketService wsService;
+ @Autowired
+ private AttributesService attrService;
+
+ @Autowired
+ private TimeseriesService tsService;
+
+ @Autowired
+ private ClusterRoutingService routingService;
+
+ private ExecutorService tsCallBackExecutor;
+ private ExecutorService wsCallBackExecutor;
+
+ @PostConstruct
+ public void initExecutor() {
+ tsCallBackExecutor = Executors.newSingleThreadExecutor();
+ wsCallBackExecutor = Executors.newSingleThreadExecutor();
+ }
+
+ @PreDestroy
+ public void shutdownExecutor() {
+ if (tsCallBackExecutor != null) {
+ tsCallBackExecutor.shutdownNow();
+ }
+ if (wsCallBackExecutor != null) {
+ wsCallBackExecutor.shutdownNow();
+ }
+ }
+
private final Map<EntityId, Set<Subscription>> subscriptionsByEntityId = new HashMap<>();
private final Map<String, Map<Integer, Subscription>> subscriptionsByWsSessionId = new HashMap<>();
@Override
- public void onAttributesUpdateFromServer(EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
-
+ public void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub) {
+ Optional<ServerAddress> server = routingService.resolveById(entityId);
+ Subscription subscription;
+ if (server.isPresent()) {
+ ServerAddress address = server.get();
+ log.trace("[{}] Forwarding subscription [{}] for device [{}] to [{}]", sessionId, sub.getSubscriptionId(), entityId, address);
+ subscription = new Subscription(sub, true, address);
+// rpcHandler.onNewSubscription(ctx, address, sessionId, subscription);
+ } else {
+ log.trace("[{}] Registering local subscription [{}] for device [{}]", sessionId, sub.getSubscriptionId(), entityId);
+ subscription = new Subscription(sub, true);
+ }
+ registerSubscription(sessionId, entityId, subscription);
}
@Override
- public void onTimeseriesUpdateFromServer(EntityId entityId, List<TsKvEntry> entries) {
+ public void cleanupLocalWsSessionSubscriptions(TelemetryWebSocketSessionRef sessionRef, String sessionId) {
+ cleanupLocalWsSessionSubscriptions(sessionId);
+ }
+ @Override
+ public void removeSubscription(String sessionId, int subscriptionId) {
+ log.debug("[{}][{}] Going to remove subscription.", sessionId, subscriptionId);
+ Map<Integer, Subscription> sessionSubscriptions = subscriptionsByWsSessionId.get(sessionId);
+ if (sessionSubscriptions != null) {
+ Subscription subscription = sessionSubscriptions.remove(subscriptionId);
+ if (subscription != null) {
+ processSubscriptionRemoval(sessionId, sessionSubscriptions, subscription);
+ } else {
+ log.debug("[{}][{}] Subscription not found!", sessionId, subscriptionId);
+ }
+ } else {
+ log.debug("[{}] No session subscriptions found!", sessionId);
+ }
}
@Override
- public void cleanupLocalWsSessionSubscriptions(TelemetryWebSocketSessionRef sessionRef, String sessionId) {
+ public void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback) {
+ saveAndNotify(entityId, ts, 0L, callback);
+ }
+ @Override
+ public void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback) {
+ ListenableFuture<List<Void>> saveFuture = tsService.save(entityId, ts, ttl);
+ addMainCallback(saveFuture, callback);
+ addWsCallback(saveFuture, success -> onTimeseriesUpdate(entityId, ts));
}
@Override
- public void removeSubscription(String sessionId, int cmdId) {
+ public void saveAndNotify(EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback) {
+ ListenableFuture<List<Void>> saveFuture = attrService.save(entityId, scope, attributes);
+ addMainCallback(saveFuture, callback);
+ addWsCallback(saveFuture, success -> onAttributesUpdate(entityId, scope, attributes));
+ }
+ private void onAttributesUpdate(EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
+ Optional<ServerAddress> serverAddress = routingService.resolveById(entityId);
+ if (!serverAddress.isPresent()) {
+ onLocalAttributesUpdate(entityId, scope, attributes);
+ } else {
+// rpcHandler.onAttributesUpdate(ctx, serverAddress.get(), entityId, entries);
+ }
}
- @Override
- public void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub) {
+ private void onTimeseriesUpdate(EntityId entityId, List<TsKvEntry> ts) {
+ Optional<ServerAddress> serverAddress = routingService.resolveById(entityId);
+ if (!serverAddress.isPresent()) {
+ onLocalTimeseriesUpdate(entityId, ts);
+ } else {
+// rpcHandler.onTimeseriesUpdate(ctx, serverAddress.get(), entityId, entries);
+ }
+ }
+ private void onLocalAttributesUpdate(EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
+ onLocalSubUpdate(entityId, s -> TelemetryFeature.ATTRIBUTES == s.getType() && (StringUtils.isEmpty(s.getScope()) || scope.equals(s.getScope())), s -> {
+ List<TsKvEntry> subscriptionUpdate = null;
+ for (AttributeKvEntry kv : attributes) {
+ if (s.isAllKeys() || s.getKeyStates().containsKey(kv.getKey())) {
+ if (subscriptionUpdate == null) {
+ subscriptionUpdate = new ArrayList<>();
+ }
+ subscriptionUpdate.add(new BasicTsKvEntry(kv.getLastUpdateTs(), kv));
+ }
+ }
+ return subscriptionUpdate;
+ });
}
- @Override
- public void onLocalTimeseriesUpdate(EntityId entityId, Map<Long, List<KvEntry>> ts) {
+ private void onLocalTimeseriesUpdate(EntityId entityId, List<TsKvEntry> ts) {
+ onLocalSubUpdate(entityId, s -> TelemetryFeature.TIMESERIES == s.getType(), s -> {
+ List<TsKvEntry> subscriptionUpdate = null;
+ for (TsKvEntry kv : ts) {
+ if (s.isAllKeys() || s.getKeyStates().containsKey((kv.getKey()))) {
+ if (subscriptionUpdate == null) {
+ subscriptionUpdate = new ArrayList<>();
+ }
+ subscriptionUpdate.add(kv);
+ }
+ }
+ return subscriptionUpdate;
+ });
+ }
+ private void onLocalSubUpdate(EntityId entityId, Predicate<Subscription> filter, Function<Subscription, List<TsKvEntry>> f) {
+ Set<Subscription> deviceSubscriptions = subscriptionsByEntityId.get(entityId);
+ if (deviceSubscriptions != null) {
+ deviceSubscriptions.stream().filter(filter).forEach(s -> {
+ String sessionId = s.getWsSessionId();
+ List<TsKvEntry> subscriptionUpdate = f.apply(s);
+ if (subscriptionUpdate == null || !subscriptionUpdate.isEmpty()) {
+ SubscriptionUpdate update = new SubscriptionUpdate(s.getSubscriptionId(), subscriptionUpdate);
+ if (s.isLocal()) {
+ updateSubscriptionState(sessionId, s, update);
+ wsService.sendWsMsg(sessionId, update);
+ } else {
+ //TODO: ashvayka
+// rpcHandler.onSubscriptionUpdate(ctx, s.getServer(), sessionId, update);
+ }
+ }
+ });
+ } else {
+ log.debug("[{}] No device subscriptions to process!", entityId);
+ }
}
- @Override
- public void onLocalAttributesUpdate(EntityId entityId, String scope, Set<AttributeKvEntry> attributes) {
+ private void updateSubscriptionState(String sessionId, Subscription subState, SubscriptionUpdate update) {
+ log.trace("[{}] updating subscription state {} using onUpdate {}", sessionId, subState, update);
+ update.getLatestValues().entrySet().forEach(e -> subState.setKeyState(e.getKey(), e.getValue()));
+ }
+
+ private void registerSubscription(String sessionId, EntityId entityId, Subscription subscription) {
+ Set<Subscription> deviceSubscriptions = subscriptionsByEntityId.computeIfAbsent(entityId, k -> new HashSet<>());
+ deviceSubscriptions.add(subscription);
+ Map<Integer, Subscription> sessionSubscriptions = subscriptionsByWsSessionId.computeIfAbsent(sessionId, k -> new HashMap<>());
+ sessionSubscriptions.put(subscription.getSubscriptionId(), subscription);
+ }
+
+ public void cleanupLocalWsSessionSubscriptions(String sessionId) {
+ cleanupWsSessionSubscriptions(sessionId, true);
+ }
+
+ public void cleanupRemoteWsSessionSubscriptions(String sessionId) {
+ cleanupWsSessionSubscriptions(sessionId, false);
+ }
+
+ private void cleanupWsSessionSubscriptions(String sessionId, boolean localSession) {
+ log.debug("[{}] Removing all subscriptions for particular session.", sessionId);
+ Map<Integer, Subscription> sessionSubscriptions = subscriptionsByWsSessionId.get(sessionId);
+ if (sessionSubscriptions != null) {
+ int sessionSubscriptionSize = sessionSubscriptions.size();
+
+ for (Subscription subscription : sessionSubscriptions.values()) {
+ EntityId entityId = subscription.getEntityId();
+ Set<Subscription> deviceSubscriptions = subscriptionsByEntityId.get(entityId);
+ deviceSubscriptions.remove(subscription);
+ if (deviceSubscriptions.isEmpty()) {
+ subscriptionsByEntityId.remove(entityId);
+ }
+ }
+ subscriptionsByWsSessionId.remove(sessionId);
+ log.debug("[{}] Removed {} subscriptions for particular session.", sessionId, sessionSubscriptionSize);
+
+ if (localSession) {
+ notifyWsSubscriptionClosed(sessionId, sessionSubscriptions);
+ }
+ } else {
+ log.debug("[{}] No subscriptions found!", sessionId);
+ }
+ }
+
+ private void notifyWsSubscriptionClosed(String sessionId, Map<Integer, Subscription> sessionSubscriptions) {
+ Set<ServerAddress> affectedServers = new HashSet<>();
+ for (Subscription subscription : sessionSubscriptions.values()) {
+ if (subscription.getServer() != null) {
+ affectedServers.add(subscription.getServer());
+ }
+ }
+ for (ServerAddress address : affectedServers) {
+ log.debug("[{}] Going to onSubscriptionUpdate [{}] server about session close event", sessionId, address);
+// rpcHandler.onSessionClose(ctx, address, sessionId);
+ }
+ }
+
+ private void processSubscriptionRemoval(String sessionId, Map<Integer, Subscription> sessionSubscriptions, Subscription subscription) {
+ EntityId entityId = subscription.getEntityId();
+ if (subscription.isLocal() && subscription.getServer() != null) {
+// rpcHandler.onSubscriptionClose(ctx, subscription.getServer(), sessionId, subscription.getSubscriptionId());
+ }
+ if (sessionSubscriptions.isEmpty()) {
+ log.debug("[{}] Removed last subscription for particular session.", sessionId);
+ subscriptionsByWsSessionId.remove(sessionId);
+ } else {
+ log.debug("[{}] Removed session subscription.", sessionId);
+ }
+ Set<Subscription> deviceSubscriptions = subscriptionsByEntityId.get(entityId);
+ if (deviceSubscriptions != null) {
+ boolean result = deviceSubscriptions.remove(subscription);
+ if (result) {
+ if (deviceSubscriptions.size() == 0) {
+ log.debug("[{}] Removed last subscription for particular device.", sessionId);
+ subscriptionsByEntityId.remove(entityId);
+ } else {
+ log.debug("[{}] Removed device subscription.", sessionId);
+ }
+ } else {
+ log.debug("[{}] Subscription not found!", sessionId);
+ }
+ } else {
+ log.debug("[{}] No device subscriptions found!", sessionId);
+ }
+ }
+
+ private void addMainCallback(ListenableFuture<List<Void>> saveFuture, final FutureCallback<Void> callback) {
+ Futures.addCallback(saveFuture, new FutureCallback<List<Void>>() {
+ @Override
+ public void onSuccess(@Nullable List<Void> result) {
+ callback.onSuccess(null);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ callback.onFailure(t);
+ }
+ }, tsCallBackExecutor);
+ }
+
+ private void addWsCallback(ListenableFuture<List<Void>> saveFuture, Consumer<Void> callback) {
+ Futures.addCallback(saveFuture, new FutureCallback<List<Void>>() {
+ @Override
+ public void onSuccess(@Nullable List<Void> result) {
+ callback.accept(null);
+ }
+ @Override
+ public void onFailure(Throwable t) {
+ }
+ }, wsCallBackExecutor);
}
}
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
index 57dd93d..57f3876 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.server.service.telemetry;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -24,6 +39,7 @@ import org.thingsboard.server.common.data.kv.TsKvQuery;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.extensions.api.exception.UnauthorizedException;
+import org.thingsboard.server.extensions.api.plugins.PluginContext;
import org.thingsboard.server.extensions.api.plugins.ws.SessionEvent;
import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.AttributesSubscriptionCmd;
import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.GetHistoryCmd;
@@ -31,9 +47,9 @@ import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.SubscriptionC
import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TelemetryPluginCmd;
import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TelemetryPluginCmdsWrapper;
import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TimeseriesSubscriptionCmd;
+import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryFeature;
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionErrorCode;
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
-import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType;
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionUpdate;
import org.thingsboard.server.service.security.AccessValidator;
@@ -146,6 +162,14 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
}
}
+ @Override
+ public void sendWsMsg(String sessionId, SubscriptionUpdate update) {
+ WsSessionMetaData md = wsSessionsMap.get(sessionId);
+ if (md != null) {
+ sendWsMsg(md.getSessionRef(), update);
+ }
+ }
+
private void handleWsAttributesSubscriptionCmd(TelemetryWebSocketSessionRef sessionRef, AttributesSubscriptionCmd cmd) {
String sessionId = sessionRef.getSessionId();
log.debug("[{}] Processing: {}", sessionId, cmd);
@@ -180,7 +204,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
keys.forEach(key -> subState.put(key, 0L));
attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
- SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, false, subState, cmd.getScope());
+ SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, TelemetryFeature.ATTRIBUTES, false, subState, cmd.getScope());
subscriptionManager.addLocalWsSubscription(sessionId, entityId, sub);
}
@@ -267,7 +291,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
Map<String, Long> subState = new HashMap<>(attributesData.size());
attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
- SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, true, subState, cmd.getScope());
+ SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, TelemetryFeature.ATTRIBUTES, true, subState, cmd.getScope());
subscriptionManager.addLocalWsSubscription(sessionId, entityId, sub);
}
@@ -340,7 +364,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
sendWsMsg(sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
Map<String, Long> subState = new HashMap<>(data.size());
data.forEach(v -> subState.put(v.getKey(), v.getTs()));
- SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.TIMESERIES, true, subState, cmd.getScope());
+ SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, TelemetryFeature.TIMESERIES, true, subState, cmd.getScope());
subscriptionManager.addLocalWsSubscription(sessionId, entityId, sub);
}
@@ -370,7 +394,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
Map<String, Long> subState = new HashMap<>(keys.size());
keys.forEach(key -> subState.put(key, startTs));
data.forEach(v -> subState.put(v.getKey(), v.getTs()));
- SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.TIMESERIES, false, subState, cmd.getScope());
+ SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, TelemetryFeature.TIMESERIES, false, subState, cmd.getScope());
subscriptionManager.addLocalWsSubscription(sessionId, entityId, sub);
}
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java
index 44e8512..7bf223f 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java
@@ -1,5 +1,21 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.server.service.telemetry;
+import com.google.common.util.concurrent.FutureCallback;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
@@ -15,17 +31,15 @@ import java.util.Set;
*/
public interface TelemetrySubscriptionService {
- void onAttributesUpdateFromServer(EntityId entityId, String scope, List<AttributeKvEntry> attributes);
-
- void onTimeseriesUpdateFromServer(EntityId entityId, List<TsKvEntry> entries);
+ void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub);
void cleanupLocalWsSessionSubscriptions(TelemetryWebSocketSessionRef sessionRef, String sessionId);
void removeSubscription(String sessionId, int cmdId);
- void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub);
+ void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback);
- void onLocalTimeseriesUpdate(EntityId entityId, List<TsKvEntry> ts);
+ void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback);
- void onLocalAttributesUpdate(EntityId entityId, String scope, Set<AttributeKvEntry> attributes);
+ void saveAndNotify(EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback);
}
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketMsgEndpoint.java b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketMsgEndpoint.java
index a7e7cad..00fb80a 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketMsgEndpoint.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketMsgEndpoint.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.server.service.telemetry;
import java.io.IOException;
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketService.java
index 883a174..be6fc56 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketService.java
@@ -1,6 +1,22 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.server.service.telemetry;
import org.thingsboard.server.extensions.api.plugins.ws.SessionEvent;
+import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionUpdate;
/**
* Created by ashvayka on 27.03.18.
@@ -10,4 +26,6 @@ public interface TelemetryWebSocketService {
void handleWebSocketSessionEvent(TelemetryWebSocketSessionRef sessionRef, SessionEvent sessionEvent);
void handleWebSocketMsg(TelemetryWebSocketSessionRef sessionRef, String msg);
+
+ void sendWsMsg(String sessionId, SubscriptionUpdate update);
}
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketSessionRef.java b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketSessionRef.java
index 3fd4b19..53438c5 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketSessionRef.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketSessionRef.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.server.service.telemetry;
import lombok.Getter;
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketTextMsg.java b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketTextMsg.java
index 6d57122..5d4630c 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketTextMsg.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketTextMsg.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.server.service.telemetry;
import lombok.Data;
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/WsSessionMetaData.java b/application/src/main/java/org/thingsboard/server/service/telemetry/WsSessionMetaData.java
index c0b162b..dd15ed3 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/WsSessionMetaData.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/WsSessionMetaData.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.server.service.telemetry;
import org.thingsboard.server.extensions.api.plugins.ws.PluginWebsocketSessionRef;
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/AttributesSubscriptionCmd.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/AttributesSubscriptionCmd.java
index d116b2e..f30f1a5 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/AttributesSubscriptionCmd.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/AttributesSubscriptionCmd.java
@@ -16,7 +16,7 @@
package org.thingsboard.server.extensions.core.plugin.telemetry.cmd;
import lombok.NoArgsConstructor;
-import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType;
+import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryFeature;
/**
* @author Andrew Shvayka
@@ -25,8 +25,8 @@ import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionT
public class AttributesSubscriptionCmd extends SubscriptionCmd {
@Override
- public SubscriptionType getType() {
- return SubscriptionType.ATTRIBUTES;
+ public TelemetryFeature getType() {
+ return TelemetryFeature.ATTRIBUTES;
}
}
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/SubscriptionCmd.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/SubscriptionCmd.java
index b06476a..7f78abd 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/SubscriptionCmd.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/SubscriptionCmd.java
@@ -18,7 +18,7 @@ package org.thingsboard.server.extensions.core.plugin.telemetry.cmd;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
-import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType;
+import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryFeature;
@NoArgsConstructor
@AllArgsConstructor
@@ -32,7 +32,7 @@ public abstract class SubscriptionCmd implements TelemetryPluginCmd {
private String scope;
private boolean unsubscribe;
- public abstract SubscriptionType getType();
+ public abstract TelemetryFeature getType();
@Override
public String toString() {
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/TimeseriesSubscriptionCmd.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/TimeseriesSubscriptionCmd.java
index 4d64ca7..88ecb2d 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/TimeseriesSubscriptionCmd.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/TimeseriesSubscriptionCmd.java
@@ -18,7 +18,7 @@ package org.thingsboard.server.extensions.core.plugin.telemetry.cmd;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
-import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType;
+import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryFeature;
/**
* @author Andrew Shvayka
@@ -35,7 +35,7 @@ public class TimeseriesSubscriptionCmd extends SubscriptionCmd {
private String agg;
@Override
- public SubscriptionType getType() {
- return SubscriptionType.TIMESERIES;
+ public TelemetryFeature getType() {
+ return TelemetryFeature.TIMESERIES;
}
}
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java
index 158945c..1acc29d 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java
@@ -114,7 +114,7 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
}
Map<String, Long> statesMap = proto.getKeyStatesList().stream().collect(Collectors.toMap(SubscriptionKetStateProto::getKey, SubscriptionKetStateProto::getTs));
Subscription subscription = new Subscription(
- new SubscriptionState(proto.getSessionId(), proto.getSubscriptionId(), EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId()), SubscriptionType.valueOf(proto.getType()), proto.getAllKeys(), statesMap, proto.getScope()),
+ new SubscriptionState(proto.getSessionId(), proto.getSubscriptionId(), EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId()), TelemetryFeature.valueOf(proto.getType()), proto.getAllKeys(), statesMap, proto.getScope()),
false, msg.getServerAddress());
subscriptionManager.addRemoteWsSubscription(ctx, msg.getServerAddress(), proto.getSessionId(), subscription);
}
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java
index 4fdfe4a..9cb67fd 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java
@@ -24,7 +24,11 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
-import org.thingsboard.server.common.msg.core.*;
+import org.thingsboard.server.common.msg.core.BasicGetAttributesResponse;
+import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse;
+import org.thingsboard.server.common.msg.core.GetAttributesRequest;
+import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
+import org.thingsboard.server.common.msg.core.UpdateAttributesRequest;
import org.thingsboard.server.common.msg.kv.BasicAttributeKVMsg;
import org.thingsboard.server.extensions.api.plugins.PluginCallback;
import org.thingsboard.server.extensions.api.plugins.PluginContext;
@@ -35,9 +39,13 @@ import org.thingsboard.server.extensions.api.plugins.msg.TelemetryUploadRequestR
import org.thingsboard.server.extensions.api.plugins.msg.UpdateAttributesRequestRuleToPluginMsg;
import org.thingsboard.server.extensions.core.plugin.telemetry.SubscriptionManager;
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription;
-import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
import java.util.stream.Collectors;
@Slf4j
@@ -97,7 +105,7 @@ public class TelemetryRuleMsgHandler extends DefaultRuleMsgHandler {
@Override
public void onSuccess(PluginContext ctx, Void data) {
ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, BasicStatusCodeResponse.onSuccess(request.getMsgType(), request.getRequestId())));
- subscriptionManager.onLocalSubscriptionUpdate(ctx, msg.getDeviceId(), SubscriptionType.TIMESERIES, s ->
+ subscriptionManager.onLocalSubscriptionUpdate(ctx, msg.getDeviceId(), TelemetryFeature.TIMESERIES, s ->
prepareSubscriptionUpdate(request, s)
);
}
@@ -131,7 +139,7 @@ public class TelemetryRuleMsgHandler extends DefaultRuleMsgHandler {
public void onSuccess(PluginContext ctx, Void value) {
ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, BasicStatusCodeResponse.onSuccess(request.getMsgType(), request.getRequestId())));
- subscriptionManager.onLocalSubscriptionUpdate(ctx, msg.getDeviceId(), SubscriptionType.ATTRIBUTES, s -> {
+ subscriptionManager.onLocalSubscriptionUpdate(ctx, msg.getDeviceId(), TelemetryFeature.ATTRIBUTES, s -> {
List<TsKvEntry> subscriptionUpdate = new ArrayList<>();
for (AttributeKvEntry kv : request.getAttributes()) {
if (s.isAllKeys() || s.getKeyStates().containsKey(kv.getKey())) {
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
index 1374ef6..8c80e78 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
@@ -21,7 +21,12 @@ import org.springframework.util.StringUtils;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
-import org.thingsboard.server.common.data.kv.*;
+import org.thingsboard.server.common.data.kv.Aggregation;
+import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
+import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
+import org.thingsboard.server.common.data.kv.TsKvEntry;
+import org.thingsboard.server.common.data.kv.TsKvQuery;
import org.thingsboard.server.extensions.api.exception.UnauthorizedException;
import org.thingsboard.server.extensions.api.plugins.PluginCallback;
import org.thingsboard.server.extensions.api.plugins.PluginContext;
@@ -32,14 +37,26 @@ import org.thingsboard.server.extensions.api.plugins.ws.msg.BinaryPluginWebSocke
import org.thingsboard.server.extensions.api.plugins.ws.msg.PluginWebsocketMsg;
import org.thingsboard.server.extensions.api.plugins.ws.msg.TextPluginWebSocketMsg;
import org.thingsboard.server.extensions.core.plugin.telemetry.SubscriptionManager;
-import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.*;
+import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.AttributesSubscriptionCmd;
+import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.GetHistoryCmd;
+import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.SubscriptionCmd;
+import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TelemetryPluginCmd;
+import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TelemetryPluginCmdsWrapper;
+import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TimeseriesSubscriptionCmd;
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionErrorCode;
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
-import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType;
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionUpdate;
import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -131,7 +148,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
keys.forEach(key -> subState.put(key, 0L));
attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
- SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, false, subState, cmd.getScope());
+ SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, TelemetryFeature.ATTRIBUTES, false, subState, cmd.getScope());
subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub);
}
@@ -168,7 +185,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
Map<String, Long> subState = new HashMap<>(attributesData.size());
attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
- SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, true, subState, cmd.getScope());
+ SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, TelemetryFeature.ATTRIBUTES, true, subState, cmd.getScope());
subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub);
}
@@ -234,7 +251,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
Map<String, Long> subState = new HashMap<>(data.size());
data.forEach(v -> subState.put(v.getKey(), v.getTs()));
- SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.TIMESERIES, true, subState, cmd.getScope());
+ SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, TelemetryFeature.TIMESERIES, true, subState, cmd.getScope());
subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub);
}
@@ -262,7 +279,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
Map<String, Long> subState = new HashMap<>(keys.size());
keys.forEach(key -> subState.put(key, startTs));
data.forEach(v -> subState.put(v.getKey(), v.getTs()));
- SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.TIMESERIES, false, subState, cmd.getScope());
+ SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, TelemetryFeature.TIMESERIES, false, subState, cmd.getScope());
subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub);
}
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/Subscription.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/Subscription.java
index fc04713..98c7632 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/Subscription.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/Subscription.java
@@ -20,6 +20,7 @@ import lombok.Data;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryFeature;
import java.util.Map;
@@ -47,7 +48,7 @@ public class Subscription {
return getSub().getEntityId();
}
- public SubscriptionType getType() {
+ public TelemetryFeature getType() {
return getSub().getType();
}
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/SubscriptionState.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/SubscriptionState.java
index c9598ef..e4a0d26 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/SubscriptionState.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/SubscriptionState.java
@@ -18,6 +18,7 @@ package org.thingsboard.server.extensions.core.plugin.telemetry.sub;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryFeature;
import java.util.Map;
@@ -30,7 +31,7 @@ public class SubscriptionState {
@Getter private final String wsSessionId;
@Getter private final int subscriptionId;
@Getter private final EntityId entityId;
- @Getter private final SubscriptionType type;
+ @Getter private final TelemetryFeature type;
@Getter private final boolean allKeys;
@Getter private final Map<String, Long> keyStates;
@Getter private final String scope;
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java
index 9d9c2f3..0c3b174 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java
@@ -19,20 +19,30 @@ import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
import org.thingsboard.server.common.data.DataConstants;
-import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
-import org.thingsboard.server.common.data.kv.*;
+import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
+import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
+import org.thingsboard.server.common.data.kv.TsKvEntry;
+import org.thingsboard.server.common.data.kv.TsKvQuery;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.extensions.api.plugins.PluginCallback;
import org.thingsboard.server.extensions.api.plugins.PluginContext;
+import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryFeature;
import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryRpcMsgHandler;
import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryWebsocketMsgHandler;
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription;
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
-import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType;
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionUpdate;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -70,7 +80,7 @@ public class SubscriptionManager {
EntityId entityId = subscription.getEntityId();
log.trace("[{}] Registering remote subscription [{}] for device [{}] to [{}]", sessionId, subscription.getSubscriptionId(), entityId, address);
registerSubscription(sessionId, entityId, subscription);
- if (subscription.getType() == SubscriptionType.ATTRIBUTES) {
+ if (subscription.getType() == TelemetryFeature.ATTRIBUTES) {
final Map<String, Long> keyStates = subscription.getKeyStates();
ctx.loadAttributes(entityId, DataConstants.CLIENT_SCOPE, keyStates.keySet(), new PluginCallback<List<AttributeKvEntry>>() {
@Override
@@ -91,7 +101,7 @@ public class SubscriptionManager {
log.error("Failed to fetch missed updates.", e);
}
});
- } else if (subscription.getType() == SubscriptionType.TIMESERIES) {
+ } else if (subscription.getType() == TelemetryFeature.TIMESERIES) {
long curTs = System.currentTimeMillis();
List<TsKvQuery> queries = new ArrayList<>();
subscription.getKeyStates().entrySet().forEach(e -> {
@@ -175,7 +185,7 @@ public class SubscriptionManager {
}
}
- public void onLocalSubscriptionUpdate(PluginContext ctx, EntityId entityId, SubscriptionType type, Function<Subscription, List<TsKvEntry>> f) {
+ public void onLocalSubscriptionUpdate(PluginContext ctx, EntityId entityId, TelemetryFeature type, Function<Subscription, List<TsKvEntry>> f) {
onLocalSubscriptionUpdate(ctx, entityId, s -> type == s.getType(), f);
}
@@ -212,7 +222,7 @@ public class SubscriptionManager {
public void onAttributesUpdateFromServer(PluginContext ctx, EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
Optional<ServerAddress> serverAddress = ctx.resolve(entityId);
if (!serverAddress.isPresent()) {
- onLocalSubscriptionUpdate(ctx, entityId, s -> SubscriptionType.ATTRIBUTES == s.getType() && (StringUtils.isEmpty(s.getScope()) || scope.equals(s.getScope())), s -> {
+ onLocalSubscriptionUpdate(ctx, entityId, s -> TelemetryFeature.ATTRIBUTES == s.getType() && (StringUtils.isEmpty(s.getScope()) || scope.equals(s.getScope())), s -> {
List<TsKvEntry> subscriptionUpdate = new ArrayList<TsKvEntry>();
for (AttributeKvEntry kv : attributes) {
if (s.isAllKeys() || s.getKeyStates().containsKey(kv.getKey())) {
@@ -229,7 +239,7 @@ public class SubscriptionManager {
public void onTimeseriesUpdateFromServer(PluginContext ctx, EntityId entityId, List<TsKvEntry> entries) {
Optional<ServerAddress> serverAddress = ctx.resolve(entityId);
if (!serverAddress.isPresent()) {
- onLocalSubscriptionUpdate(ctx, entityId, SubscriptionType.TIMESERIES, s -> {
+ onLocalSubscriptionUpdate(ctx, entityId, TelemetryFeature.TIMESERIES, s -> {
List<TsKvEntry> subscriptionUpdate = new ArrayList<TsKvEntry>();
for (TsKvEntry kv : entries) {
if (s.isAllKeys() || s.getKeyStates().containsKey((kv.getKey()))) {
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
index 6b89470..44bd3f5 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -63,12 +63,6 @@ public interface TbContext {
void tellError(TbMsg msg, Throwable th);
- void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback);
-
- void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback);
-
- void saveAndNotify(EntityId entityId, String scope, Set<AttributeKvEntry> attributes, FutureCallback<Void> callback);
-
RuleNodeId getSelfId();
AttributesService getAttributesService();
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java
index 6df9830..46be7f8 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java
@@ -89,7 +89,7 @@ public class TbTransformMsgNodeTest {
@Test
public void payloadCanBeUpdated() throws TbNodeException {
- initWithScript("return msg.passed = msg.passed * metadata.temp; msg.bigObj.newProp = 'Ukraine' ");
+ initWithScript("msg.passed = msg.passed * metadata.temp; return msg.bigObj.newProp = 'Ukraine' ");
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("temp", "7");
metaData.putValue("humidity", "99");