thingsboard-developers

Fixed compilation issues

4/1/2018 2:06:41 PM

Changes

extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/SubscriptionType.java 23(+0 -23)

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index 3cc211c..21bc090 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -233,21 +233,6 @@ public class ActorSystemContext {
     @Getter
     private final Config config;
 
-    @Getter
-    private ExecutorService tsCallBackExecutor;
-
-    @PostConstruct
-    public void initExecutor() {
-        tsCallBackExecutor = Executors.newSingleThreadExecutor();
-    }
-
-    @PreDestroy
-    public void shutdownExecutor() {
-        if (tsCallBackExecutor != null) {
-            tsCallBackExecutor.shutdownNow();
-        }
-    }
-
     public ActorSystemContext() {
         config = ConfigFactory.parseResources(AKKA_CONF_FILE_NAME).withFallback(ConfigFactory.load());
     }
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index b3b0072..7dca2c8 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,16 +17,10 @@ package org.thingsboard.server.actors.ruleChain;
 
 import akka.actor.ActorRef;
 import com.google.common.base.Function;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import org.thingsboard.rule.engine.api.ListeningExecutor;
 import org.thingsboard.rule.engine.api.TbContext;
 import org.thingsboard.server.actors.ActorSystemContext;
-import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.id.RuleNodeId;
-import org.thingsboard.server.common.data.kv.AttributeKvEntry;
-import org.thingsboard.server.common.data.kv.TsKvEntry;
 import org.thingsboard.server.common.msg.TbMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.dao.alarm.AlarmService;
@@ -41,7 +35,6 @@ import org.thingsboard.server.dao.timeseries.TimeseriesService;
 import org.thingsboard.server.dao.user.UserService;
 import scala.concurrent.duration.Duration;
 
-import javax.annotation.Nullable;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -122,33 +115,6 @@ class DefaultTbContext implements TbContext {
     }
 
     @Override
-    public void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback) {
-        saveAndNotify(entityId, ts, 0L, callback);
-    }
-
-    @Override
-    public void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback) {
-        ListenableFuture<List<Void>> saveFuture = mainCtx.getTsService().save(entityId, ts, ttl);
-        Futures.addCallback(saveFuture, new FutureCallback<List<Void>>() {
-            @Override
-            public void onSuccess(@Nullable List<Void> result) {
-                mainCtx.getTsSubService().onLocalTimeseriesUpdate(entityId, ts);
-                callback.onSuccess(null);
-            }
-
-            @Override
-            public void onFailure(Throwable t) {
-                callback.onFailure(t);
-            }
-        }, mainCtx.getTsCallBackExecutor());
-    }
-
-    @Override
-    public void saveAndNotify(EntityId entityId, String scope, Set<AttributeKvEntry> attributes, FutureCallback<Void> callback) {
-
-    }
-
-    @Override
     public ListeningExecutor getJsExecutor() {
         return mainCtx.getJsExecutor();
     }
diff --git a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java
index 800bc3b..ba5f9f9 100644
--- a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java
+++ b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
index c2a2df8..c1338e4 100644
--- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -89,15 +89,15 @@ import java.util.stream.Collectors;
 public class TelemetryController extends BaseController {
 
     @Autowired
-    private TelemetrySubscriptionService subscriptionService;
-
-    @Autowired
     private AttributesService attributesService;
 
     @Autowired
     private TimeseriesService tsService;
 
     @Autowired
+    private TelemetrySubscriptionService tsSubService;
+
+    @Autowired
     private AccessValidator accessValidator;
 
     private ExecutorService executor;
@@ -312,13 +312,11 @@ public class TelemetryController extends BaseController {
             }
             SecurityUser user = getCurrentUser();
             return accessValidator.validateEntityAndCallback(getCurrentUser(), entityIdSrc, (result, entityId) -> {
-                ListenableFuture<List<Void>> future = attributesService.save(entityId, scope, attributes);
-                Futures.addCallback(future, new FutureCallback<List<Void>>() {
+                tsSubService.saveAndNotify(entityId, scope, attributes, new FutureCallback<Void>() {
                     @Override
-                    public void onSuccess(@Nullable List<Void> tmp) {
+                    public void onSuccess(@Nullable Void tmp) {
                         logAttributesUpdated(user, entityId, scope, attributes, null);
                         result.setResult(new ResponseEntity(HttpStatus.OK));
-                        subscriptionService.onAttributesUpdateFromServer(entityId, scope, attributes);
                     }
 
                     @Override
@@ -327,7 +325,6 @@ public class TelemetryController extends BaseController {
                         AccessValidator.handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR);
                     }
                 });
-                result.setResult(new ResponseEntity(HttpStatus.OK));
             });
         } else {
             return getImmediateDeferredResult("Request is not a JSON object", HttpStatus.BAD_REQUEST);
@@ -358,12 +355,10 @@ public class TelemetryController extends BaseController {
         }
         SecurityUser user = getCurrentUser();
         return accessValidator.validateEntityAndCallback(getCurrentUser(), entityIdSrc, (result, entityId) -> {
-            ListenableFuture<List<Void>> future = tsService.save(entityId, entries, ttl);
-            Futures.addCallback(future, new FutureCallback<List<Void>>() {
+            tsSubService.saveAndNotify(entityId, entries, ttl, new FutureCallback<Void>() {
                 @Override
-                public void onSuccess(@Nullable List<Void> tmp) {
+                public void onSuccess(@Nullable Void tmp) {
                     result.setResult(new ResponseEntity(HttpStatus.OK));
-                    subscriptionService.onTimeseriesUpdateFromServer(entityId, entries);
                 }
 
                 @Override
@@ -371,7 +366,6 @@ public class TelemetryController extends BaseController {
                     AccessValidator.handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR);
                 }
             });
-            result.setResult(new ResponseEntity(HttpStatus.OK));
         });
     }
 
diff --git a/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java b/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java
index 02e12fe..e9bacfd 100644
--- a/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java
+++ b/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.thingsboard.server.service.security;
 
 import com.google.common.base.Function;
diff --git a/application/src/main/java/org/thingsboard/server/service/security/ValidationCallback.java b/application/src/main/java/org/thingsboard/server/service/security/ValidationCallback.java
index 7feae03..2b91c60 100644
--- a/application/src/main/java/org/thingsboard/server/service/security/ValidationCallback.java
+++ b/application/src/main/java/org/thingsboard/server/service/security/ValidationCallback.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.thingsboard.server.service.security;
 
 import com.google.common.util.concurrent.FutureCallback;
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
index 81cdac9..58bbec5 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
@@ -1,19 +1,55 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.thingsboard.server.service.telemetry;
 
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
+import org.springframework.util.StringUtils;
 import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
-import org.thingsboard.server.common.data.kv.KvEntry;
+import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
 import org.thingsboard.server.common.data.kv.TsKvEntry;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.dao.attributes.AttributesService;
+import org.thingsboard.server.dao.timeseries.TimeseriesService;
+import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryFeature;
 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription;
 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
+import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionUpdate;
+import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
 
+import javax.annotation.Nullable;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
 
 /**
  * Created by ashvayka on 27.03.18.
@@ -25,42 +61,275 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
     @Autowired
     private TelemetryWebSocketService wsService;
 
+    @Autowired
+    private AttributesService attrService;
+
+    @Autowired
+    private TimeseriesService tsService;
+
+    @Autowired
+    private ClusterRoutingService routingService;
+
+    private ExecutorService tsCallBackExecutor;
+    private ExecutorService wsCallBackExecutor;
+
+    @PostConstruct
+    public void initExecutor() {
+        tsCallBackExecutor = Executors.newSingleThreadExecutor();
+        wsCallBackExecutor = Executors.newSingleThreadExecutor();
+    }
+
+    @PreDestroy
+    public void shutdownExecutor() {
+        if (tsCallBackExecutor != null) {
+            tsCallBackExecutor.shutdownNow();
+        }
+        if (wsCallBackExecutor != null) {
+            wsCallBackExecutor.shutdownNow();
+        }
+    }
+
     private final Map<EntityId, Set<Subscription>> subscriptionsByEntityId = new HashMap<>();
 
     private final Map<String, Map<Integer, Subscription>> subscriptionsByWsSessionId = new HashMap<>();
 
     @Override
-    public void onAttributesUpdateFromServer(EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
-
+    public void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub) {
+        Optional<ServerAddress> server = routingService.resolveById(entityId);
+        Subscription subscription;
+        if (server.isPresent()) {
+            ServerAddress address = server.get();
+            log.trace("[{}] Forwarding subscription [{}] for device [{}] to [{}]", sessionId, sub.getSubscriptionId(), entityId, address);
+            subscription = new Subscription(sub, true, address);
+//            rpcHandler.onNewSubscription(ctx, address, sessionId, subscription);
+        } else {
+            log.trace("[{}] Registering local subscription [{}] for device [{}]", sessionId, sub.getSubscriptionId(), entityId);
+            subscription = new Subscription(sub, true);
+        }
+        registerSubscription(sessionId, entityId, subscription);
     }
 
     @Override
-    public void onTimeseriesUpdateFromServer(EntityId entityId, List<TsKvEntry> entries) {
+    public void cleanupLocalWsSessionSubscriptions(TelemetryWebSocketSessionRef sessionRef, String sessionId) {
+        cleanupLocalWsSessionSubscriptions(sessionId);
+    }
 
+    @Override
+    public void removeSubscription(String sessionId, int subscriptionId) {
+        log.debug("[{}][{}] Going to remove subscription.", sessionId, subscriptionId);
+        Map<Integer, Subscription> sessionSubscriptions = subscriptionsByWsSessionId.get(sessionId);
+        if (sessionSubscriptions != null) {
+            Subscription subscription = sessionSubscriptions.remove(subscriptionId);
+            if (subscription != null) {
+                processSubscriptionRemoval(sessionId, sessionSubscriptions, subscription);
+            } else {
+                log.debug("[{}][{}] Subscription not found!", sessionId, subscriptionId);
+            }
+        } else {
+            log.debug("[{}] No session subscriptions found!", sessionId);
+        }
     }
 
     @Override
-    public void cleanupLocalWsSessionSubscriptions(TelemetryWebSocketSessionRef sessionRef, String sessionId) {
+    public void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback) {
+        saveAndNotify(entityId, ts, 0L, callback);
+    }
 
+    @Override
+    public void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback) {
+        ListenableFuture<List<Void>> saveFuture = tsService.save(entityId, ts, ttl);
+        addMainCallback(saveFuture, callback);
+        addWsCallback(saveFuture, success -> onTimeseriesUpdate(entityId, ts));
     }
 
     @Override
-    public void removeSubscription(String sessionId, int cmdId) {
+    public void saveAndNotify(EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback) {
+        ListenableFuture<List<Void>> saveFuture = attrService.save(entityId, scope, attributes);
+        addMainCallback(saveFuture, callback);
+        addWsCallback(saveFuture, success -> onAttributesUpdate(entityId, scope, attributes));
+    }
 
+    private void onAttributesUpdate(EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
+        Optional<ServerAddress> serverAddress = routingService.resolveById(entityId);
+        if (!serverAddress.isPresent()) {
+            onLocalAttributesUpdate(entityId, scope, attributes);
+        } else {
+//            rpcHandler.onAttributesUpdate(ctx, serverAddress.get(), entityId, entries);
+        }
     }
 
-    @Override
-    public void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub) {
+    private void onTimeseriesUpdate(EntityId entityId, List<TsKvEntry> ts) {
+        Optional<ServerAddress> serverAddress = routingService.resolveById(entityId);
+        if (!serverAddress.isPresent()) {
+            onLocalTimeseriesUpdate(entityId, ts);
+        } else {
+//            rpcHandler.onTimeseriesUpdate(ctx, serverAddress.get(), entityId, entries);
+        }
+    }
 
+    private void onLocalAttributesUpdate(EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
+        onLocalSubUpdate(entityId, s -> TelemetryFeature.ATTRIBUTES == s.getType() && (StringUtils.isEmpty(s.getScope()) || scope.equals(s.getScope())), s -> {
+            List<TsKvEntry> subscriptionUpdate = null;
+            for (AttributeKvEntry kv : attributes) {
+                if (s.isAllKeys() || s.getKeyStates().containsKey(kv.getKey())) {
+                    if (subscriptionUpdate == null) {
+                        subscriptionUpdate = new ArrayList<>();
+                    }
+                    subscriptionUpdate.add(new BasicTsKvEntry(kv.getLastUpdateTs(), kv));
+                }
+            }
+            return subscriptionUpdate;
+        });
     }
 
-    @Override
-    public void onLocalTimeseriesUpdate(EntityId entityId, Map<Long, List<KvEntry>> ts) {
+    private void onLocalTimeseriesUpdate(EntityId entityId, List<TsKvEntry> ts) {
+        onLocalSubUpdate(entityId, s -> TelemetryFeature.TIMESERIES == s.getType(), s -> {
+            List<TsKvEntry> subscriptionUpdate = null;
+            for (TsKvEntry kv : ts) {
+                if (s.isAllKeys() || s.getKeyStates().containsKey((kv.getKey()))) {
+                    if (subscriptionUpdate == null) {
+                        subscriptionUpdate = new ArrayList<>();
+                    }
+                    subscriptionUpdate.add(kv);
+                }
+            }
+            return subscriptionUpdate;
+        });
+    }
 
+    private void onLocalSubUpdate(EntityId entityId, Predicate<Subscription> filter, Function<Subscription, List<TsKvEntry>> f) {
+        Set<Subscription> deviceSubscriptions = subscriptionsByEntityId.get(entityId);
+        if (deviceSubscriptions != null) {
+            deviceSubscriptions.stream().filter(filter).forEach(s -> {
+                String sessionId = s.getWsSessionId();
+                List<TsKvEntry> subscriptionUpdate = f.apply(s);
+                if (subscriptionUpdate == null || !subscriptionUpdate.isEmpty()) {
+                    SubscriptionUpdate update = new SubscriptionUpdate(s.getSubscriptionId(), subscriptionUpdate);
+                    if (s.isLocal()) {
+                        updateSubscriptionState(sessionId, s, update);
+                        wsService.sendWsMsg(sessionId, update);
+                    } else {
+                        //TODO: ashvayka
+//                        rpcHandler.onSubscriptionUpdate(ctx, s.getServer(), sessionId, update);
+                    }
+                }
+            });
+        } else {
+            log.debug("[{}] No device subscriptions to process!", entityId);
+        }
     }
 
-    @Override
-    public void onLocalAttributesUpdate(EntityId entityId, String scope, Set<AttributeKvEntry> attributes) {
+    private void updateSubscriptionState(String sessionId, Subscription subState, SubscriptionUpdate update) {
+        log.trace("[{}] updating subscription state {} using onUpdate {}", sessionId, subState, update);
+        update.getLatestValues().entrySet().forEach(e -> subState.setKeyState(e.getKey(), e.getValue()));
+    }
+
+    private void registerSubscription(String sessionId, EntityId entityId, Subscription subscription) {
+        Set<Subscription> deviceSubscriptions = subscriptionsByEntityId.computeIfAbsent(entityId, k -> new HashSet<>());
+        deviceSubscriptions.add(subscription);
+        Map<Integer, Subscription> sessionSubscriptions = subscriptionsByWsSessionId.computeIfAbsent(sessionId, k -> new HashMap<>());
+        sessionSubscriptions.put(subscription.getSubscriptionId(), subscription);
+    }
+
+    public void cleanupLocalWsSessionSubscriptions(String sessionId) {
+        cleanupWsSessionSubscriptions(sessionId, true);
+    }
+
+    public void cleanupRemoteWsSessionSubscriptions(String sessionId) {
+        cleanupWsSessionSubscriptions(sessionId, false);
+    }
+
+    private void cleanupWsSessionSubscriptions(String sessionId, boolean localSession) {
+        log.debug("[{}] Removing all subscriptions for particular session.", sessionId);
+        Map<Integer, Subscription> sessionSubscriptions = subscriptionsByWsSessionId.get(sessionId);
+        if (sessionSubscriptions != null) {
+            int sessionSubscriptionSize = sessionSubscriptions.size();
+
+            for (Subscription subscription : sessionSubscriptions.values()) {
+                EntityId entityId = subscription.getEntityId();
+                Set<Subscription> deviceSubscriptions = subscriptionsByEntityId.get(entityId);
+                deviceSubscriptions.remove(subscription);
+                if (deviceSubscriptions.isEmpty()) {
+                    subscriptionsByEntityId.remove(entityId);
+                }
+            }
+            subscriptionsByWsSessionId.remove(sessionId);
+            log.debug("[{}] Removed {} subscriptions for particular session.", sessionId, sessionSubscriptionSize);
+
+            if (localSession) {
+                notifyWsSubscriptionClosed(sessionId, sessionSubscriptions);
+            }
+        } else {
+            log.debug("[{}] No subscriptions found!", sessionId);
+        }
+    }
+
+    private void notifyWsSubscriptionClosed(String sessionId, Map<Integer, Subscription> sessionSubscriptions) {
+        Set<ServerAddress> affectedServers = new HashSet<>();
+        for (Subscription subscription : sessionSubscriptions.values()) {
+            if (subscription.getServer() != null) {
+                affectedServers.add(subscription.getServer());
+            }
+        }
+        for (ServerAddress address : affectedServers) {
+            log.debug("[{}] Going to onSubscriptionUpdate [{}] server about session close event", sessionId, address);
+//            rpcHandler.onSessionClose(ctx, address, sessionId);
+        }
+    }
+
+    private void processSubscriptionRemoval(String sessionId, Map<Integer, Subscription> sessionSubscriptions, Subscription subscription) {
+        EntityId entityId = subscription.getEntityId();
+        if (subscription.isLocal() && subscription.getServer() != null) {
+//            rpcHandler.onSubscriptionClose(ctx, subscription.getServer(), sessionId, subscription.getSubscriptionId());
+        }
+        if (sessionSubscriptions.isEmpty()) {
+            log.debug("[{}] Removed last subscription for particular session.", sessionId);
+            subscriptionsByWsSessionId.remove(sessionId);
+        } else {
+            log.debug("[{}] Removed session subscription.", sessionId);
+        }
+        Set<Subscription> deviceSubscriptions = subscriptionsByEntityId.get(entityId);
+        if (deviceSubscriptions != null) {
+            boolean result = deviceSubscriptions.remove(subscription);
+            if (result) {
+                if (deviceSubscriptions.size() == 0) {
+                    log.debug("[{}] Removed last subscription for particular device.", sessionId);
+                    subscriptionsByEntityId.remove(entityId);
+                } else {
+                    log.debug("[{}] Removed device subscription.", sessionId);
+                }
+            } else {
+                log.debug("[{}] Subscription not found!", sessionId);
+            }
+        } else {
+            log.debug("[{}] No device subscriptions found!", sessionId);
+        }
+    }
+
+    private void addMainCallback(ListenableFuture<List<Void>> saveFuture, final FutureCallback<Void> callback) {
+        Futures.addCallback(saveFuture, new FutureCallback<List<Void>>() {
+            @Override
+            public void onSuccess(@Nullable List<Void> result) {
+                callback.onSuccess(null);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                callback.onFailure(t);
+            }
+        }, tsCallBackExecutor);
+    }
+
+    private void addWsCallback(ListenableFuture<List<Void>> saveFuture, Consumer<Void> callback) {
+        Futures.addCallback(saveFuture, new FutureCallback<List<Void>>() {
+            @Override
+            public void onSuccess(@Nullable List<Void> result) {
+                callback.accept(null);
+            }
 
+            @Override
+            public void onFailure(Throwable t) {
+            }
+        }, wsCallBackExecutor);
     }
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
index 57dd93d..57f3876 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.thingsboard.server.service.telemetry;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -24,6 +39,7 @@ import org.thingsboard.server.common.data.kv.TsKvQuery;
 import org.thingsboard.server.dao.attributes.AttributesService;
 import org.thingsboard.server.dao.timeseries.TimeseriesService;
 import org.thingsboard.server.extensions.api.exception.UnauthorizedException;
+import org.thingsboard.server.extensions.api.plugins.PluginContext;
 import org.thingsboard.server.extensions.api.plugins.ws.SessionEvent;
 import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.AttributesSubscriptionCmd;
 import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.GetHistoryCmd;
@@ -31,9 +47,9 @@ import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.SubscriptionC
 import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TelemetryPluginCmd;
 import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TelemetryPluginCmdsWrapper;
 import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TimeseriesSubscriptionCmd;
+import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryFeature;
 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionErrorCode;
 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
-import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType;
 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionUpdate;
 import org.thingsboard.server.service.security.AccessValidator;
 
@@ -146,6 +162,14 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
         }
     }
 
+    @Override
+    public void sendWsMsg(String sessionId, SubscriptionUpdate update) {
+        WsSessionMetaData md = wsSessionsMap.get(sessionId);
+        if (md != null) {
+            sendWsMsg(md.getSessionRef(), update);
+        }
+    }
+
     private void handleWsAttributesSubscriptionCmd(TelemetryWebSocketSessionRef sessionRef, AttributesSubscriptionCmd cmd) {
         String sessionId = sessionRef.getSessionId();
         log.debug("[{}] Processing: {}", sessionId, cmd);
@@ -180,7 +204,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
                 keys.forEach(key -> subState.put(key, 0L));
                 attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
 
-                SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, false, subState, cmd.getScope());
+                SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, TelemetryFeature.ATTRIBUTES, false, subState, cmd.getScope());
                 subscriptionManager.addLocalWsSubscription(sessionId, entityId, sub);
             }
 
@@ -267,7 +291,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
                 Map<String, Long> subState = new HashMap<>(attributesData.size());
                 attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
 
-                SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, true, subState, cmd.getScope());
+                SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, TelemetryFeature.ATTRIBUTES, true, subState, cmd.getScope());
                 subscriptionManager.addLocalWsSubscription(sessionId, entityId, sub);
             }
 
@@ -340,7 +364,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
                 sendWsMsg(sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
                 Map<String, Long> subState = new HashMap<>(data.size());
                 data.forEach(v -> subState.put(v.getKey(), v.getTs()));
-                SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.TIMESERIES, true, subState, cmd.getScope());
+                SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, TelemetryFeature.TIMESERIES, true, subState, cmd.getScope());
                 subscriptionManager.addLocalWsSubscription(sessionId, entityId, sub);
             }
 
@@ -370,7 +394,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
                 Map<String, Long> subState = new HashMap<>(keys.size());
                 keys.forEach(key -> subState.put(key, startTs));
                 data.forEach(v -> subState.put(v.getKey(), v.getTs()));
-                SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.TIMESERIES, false, subState, cmd.getScope());
+                SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, TelemetryFeature.TIMESERIES, false, subState, cmd.getScope());
                 subscriptionManager.addLocalWsSubscription(sessionId, entityId, sub);
             }
 
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java
index 44e8512..7bf223f 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java
@@ -1,5 +1,21 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.thingsboard.server.service.telemetry;
 
+import com.google.common.util.concurrent.FutureCallback;
 import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
 import org.thingsboard.server.common.data.kv.KvEntry;
@@ -15,17 +31,15 @@ import java.util.Set;
  */
 public interface TelemetrySubscriptionService {
 
-    void onAttributesUpdateFromServer(EntityId entityId, String scope, List<AttributeKvEntry> attributes);
-
-    void onTimeseriesUpdateFromServer(EntityId entityId, List<TsKvEntry> entries);
+    void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub);
 
     void cleanupLocalWsSessionSubscriptions(TelemetryWebSocketSessionRef sessionRef, String sessionId);
 
     void removeSubscription(String sessionId, int cmdId);
 
-    void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub);
+    void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback);
 
-    void onLocalTimeseriesUpdate(EntityId entityId, List<TsKvEntry> ts);
+    void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback);
 
-    void onLocalAttributesUpdate(EntityId entityId, String scope, Set<AttributeKvEntry> attributes);
+    void saveAndNotify(EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback);
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketMsgEndpoint.java b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketMsgEndpoint.java
index a7e7cad..00fb80a 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketMsgEndpoint.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketMsgEndpoint.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.thingsboard.server.service.telemetry;
 
 import java.io.IOException;
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketService.java
index 883a174..be6fc56 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketService.java
@@ -1,6 +1,22 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.thingsboard.server.service.telemetry;
 
 import org.thingsboard.server.extensions.api.plugins.ws.SessionEvent;
+import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionUpdate;
 
 /**
  * Created by ashvayka on 27.03.18.
@@ -10,4 +26,6 @@ public interface TelemetryWebSocketService {
     void handleWebSocketSessionEvent(TelemetryWebSocketSessionRef sessionRef, SessionEvent sessionEvent);
 
     void handleWebSocketMsg(TelemetryWebSocketSessionRef sessionRef, String msg);
+
+    void sendWsMsg(String sessionId, SubscriptionUpdate update);
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketSessionRef.java b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketSessionRef.java
index 3fd4b19..53438c5 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketSessionRef.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketSessionRef.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.thingsboard.server.service.telemetry;
 
 import lombok.Getter;
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketTextMsg.java b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketTextMsg.java
index 6d57122..5d4630c 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketTextMsg.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketTextMsg.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.thingsboard.server.service.telemetry;
 
 import lombok.Data;
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/WsSessionMetaData.java b/application/src/main/java/org/thingsboard/server/service/telemetry/WsSessionMetaData.java
index c0b162b..dd15ed3 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/WsSessionMetaData.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/WsSessionMetaData.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.thingsboard.server.service.telemetry;
 
 import org.thingsboard.server.extensions.api.plugins.ws.PluginWebsocketSessionRef;
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/AttributesSubscriptionCmd.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/AttributesSubscriptionCmd.java
index d116b2e..f30f1a5 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/AttributesSubscriptionCmd.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/AttributesSubscriptionCmd.java
@@ -16,7 +16,7 @@
 package org.thingsboard.server.extensions.core.plugin.telemetry.cmd;
 
 import lombok.NoArgsConstructor;
-import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType;
+import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryFeature;
 
 /**
  * @author Andrew Shvayka
@@ -25,8 +25,8 @@ import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionT
 public class AttributesSubscriptionCmd extends SubscriptionCmd {
 
     @Override
-    public SubscriptionType getType() {
-        return SubscriptionType.ATTRIBUTES;
+    public TelemetryFeature getType() {
+        return TelemetryFeature.ATTRIBUTES;
     }
 
 }
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/SubscriptionCmd.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/SubscriptionCmd.java
index b06476a..7f78abd 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/SubscriptionCmd.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/SubscriptionCmd.java
@@ -18,7 +18,7 @@ package org.thingsboard.server.extensions.core.plugin.telemetry.cmd;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
-import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType;
+import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryFeature;
 
 @NoArgsConstructor
 @AllArgsConstructor
@@ -32,7 +32,7 @@ public abstract class SubscriptionCmd implements TelemetryPluginCmd {
     private String scope;
     private boolean unsubscribe;
 
-    public abstract SubscriptionType getType();
+    public abstract TelemetryFeature getType();
 
     @Override
     public String toString() {
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/TimeseriesSubscriptionCmd.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/TimeseriesSubscriptionCmd.java
index 4d64ca7..88ecb2d 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/TimeseriesSubscriptionCmd.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/TimeseriesSubscriptionCmd.java
@@ -18,7 +18,7 @@ package org.thingsboard.server.extensions.core.plugin.telemetry.cmd;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
-import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType;
+import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryFeature;
 
 /**
  * @author Andrew Shvayka
@@ -35,7 +35,7 @@ public class TimeseriesSubscriptionCmd extends SubscriptionCmd {
     private String agg;
 
     @Override
-    public SubscriptionType getType() {
-        return SubscriptionType.TIMESERIES;
+    public TelemetryFeature getType() {
+        return TelemetryFeature.TIMESERIES;
     }
 }
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java
index 158945c..1acc29d 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java
@@ -114,7 +114,7 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
         }
         Map<String, Long> statesMap = proto.getKeyStatesList().stream().collect(Collectors.toMap(SubscriptionKetStateProto::getKey, SubscriptionKetStateProto::getTs));
         Subscription subscription = new Subscription(
-                new SubscriptionState(proto.getSessionId(), proto.getSubscriptionId(), EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId()), SubscriptionType.valueOf(proto.getType()), proto.getAllKeys(), statesMap, proto.getScope()),
+                new SubscriptionState(proto.getSessionId(), proto.getSubscriptionId(), EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId()), TelemetryFeature.valueOf(proto.getType()), proto.getAllKeys(), statesMap, proto.getScope()),
                 false, msg.getServerAddress());
         subscriptionManager.addRemoteWsSubscription(ctx, msg.getServerAddress(), proto.getSessionId(), subscription);
     }
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java
index 4fdfe4a..9cb67fd 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java
@@ -24,7 +24,11 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry;
 import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
 import org.thingsboard.server.common.data.kv.KvEntry;
 import org.thingsboard.server.common.data.kv.TsKvEntry;
-import org.thingsboard.server.common.msg.core.*;
+import org.thingsboard.server.common.msg.core.BasicGetAttributesResponse;
+import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse;
+import org.thingsboard.server.common.msg.core.GetAttributesRequest;
+import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
+import org.thingsboard.server.common.msg.core.UpdateAttributesRequest;
 import org.thingsboard.server.common.msg.kv.BasicAttributeKVMsg;
 import org.thingsboard.server.extensions.api.plugins.PluginCallback;
 import org.thingsboard.server.extensions.api.plugins.PluginContext;
@@ -35,9 +39,13 @@ import org.thingsboard.server.extensions.api.plugins.msg.TelemetryUploadRequestR
 import org.thingsboard.server.extensions.api.plugins.msg.UpdateAttributesRequestRuleToPluginMsg;
 import org.thingsboard.server.extensions.core.plugin.telemetry.SubscriptionManager;
 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription;
-import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 @Slf4j
@@ -97,7 +105,7 @@ public class TelemetryRuleMsgHandler extends DefaultRuleMsgHandler {
             @Override
             public void onSuccess(PluginContext ctx, Void data) {
                 ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, BasicStatusCodeResponse.onSuccess(request.getMsgType(), request.getRequestId())));
-                subscriptionManager.onLocalSubscriptionUpdate(ctx, msg.getDeviceId(), SubscriptionType.TIMESERIES, s ->
+                subscriptionManager.onLocalSubscriptionUpdate(ctx, msg.getDeviceId(), TelemetryFeature.TIMESERIES, s ->
                     prepareSubscriptionUpdate(request, s)
                 );
             }
@@ -131,7 +139,7 @@ public class TelemetryRuleMsgHandler extends DefaultRuleMsgHandler {
                     public void onSuccess(PluginContext ctx, Void value) {
                         ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, BasicStatusCodeResponse.onSuccess(request.getMsgType(), request.getRequestId())));
 
-                        subscriptionManager.onLocalSubscriptionUpdate(ctx, msg.getDeviceId(), SubscriptionType.ATTRIBUTES, s -> {
+                        subscriptionManager.onLocalSubscriptionUpdate(ctx, msg.getDeviceId(), TelemetryFeature.ATTRIBUTES, s -> {
                             List<TsKvEntry> subscriptionUpdate = new ArrayList<>();
                             for (AttributeKvEntry kv : request.getAttributes()) {
                                 if (s.isAllKeys() || s.getKeyStates().containsKey(kv.getKey())) {
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
index 1374ef6..8c80e78 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
@@ -21,7 +21,12 @@ import org.springframework.util.StringUtils;
 import org.thingsboard.server.common.data.DataConstants;
 import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.id.EntityIdFactory;
-import org.thingsboard.server.common.data.kv.*;
+import org.thingsboard.server.common.data.kv.Aggregation;
+import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
+import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
+import org.thingsboard.server.common.data.kv.TsKvEntry;
+import org.thingsboard.server.common.data.kv.TsKvQuery;
 import org.thingsboard.server.extensions.api.exception.UnauthorizedException;
 import org.thingsboard.server.extensions.api.plugins.PluginCallback;
 import org.thingsboard.server.extensions.api.plugins.PluginContext;
@@ -32,14 +37,26 @@ import org.thingsboard.server.extensions.api.plugins.ws.msg.BinaryPluginWebSocke
 import org.thingsboard.server.extensions.api.plugins.ws.msg.PluginWebsocketMsg;
 import org.thingsboard.server.extensions.api.plugins.ws.msg.TextPluginWebSocketMsg;
 import org.thingsboard.server.extensions.core.plugin.telemetry.SubscriptionManager;
-import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.*;
+import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.AttributesSubscriptionCmd;
+import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.GetHistoryCmd;
+import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.SubscriptionCmd;
+import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TelemetryPluginCmd;
+import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TelemetryPluginCmdsWrapper;
+import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TimeseriesSubscriptionCmd;
 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionErrorCode;
 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
-import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType;
 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionUpdate;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -131,7 +148,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
                 keys.forEach(key -> subState.put(key, 0L));
                 attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
 
-                SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, false, subState, cmd.getScope());
+                SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, TelemetryFeature.ATTRIBUTES, false, subState, cmd.getScope());
                 subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub);
             }
 
@@ -168,7 +185,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
                 Map<String, Long> subState = new HashMap<>(attributesData.size());
                 attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
 
-                SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, true, subState, cmd.getScope());
+                SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, TelemetryFeature.ATTRIBUTES, true, subState, cmd.getScope());
                 subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub);
             }
 
@@ -234,7 +251,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
                 sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
                 Map<String, Long> subState = new HashMap<>(data.size());
                 data.forEach(v -> subState.put(v.getKey(), v.getTs()));
-                SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.TIMESERIES, true, subState, cmd.getScope());
+                SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, TelemetryFeature.TIMESERIES, true, subState, cmd.getScope());
                 subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub);
             }
 
@@ -262,7 +279,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
                 Map<String, Long> subState = new HashMap<>(keys.size());
                 keys.forEach(key -> subState.put(key, startTs));
                 data.forEach(v -> subState.put(v.getKey(), v.getTs()));
-                SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.TIMESERIES, false, subState, cmd.getScope());
+                SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, TelemetryFeature.TIMESERIES, false, subState, cmd.getScope());
                 subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub);
             }
 
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/Subscription.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/Subscription.java
index fc04713..98c7632 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/Subscription.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/Subscription.java
@@ -20,6 +20,7 @@ import lombok.Data;
 import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryFeature;
 
 import java.util.Map;
 
@@ -47,7 +48,7 @@ public class Subscription {
         return getSub().getEntityId();
     }
 
-    public SubscriptionType getType() {
+    public TelemetryFeature getType() {
         return getSub().getType();
     }
 
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/SubscriptionState.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/SubscriptionState.java
index c9598ef..e4a0d26 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/SubscriptionState.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/SubscriptionState.java
@@ -18,6 +18,7 @@ package org.thingsboard.server.extensions.core.plugin.telemetry.sub;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryFeature;
 
 import java.util.Map;
 
@@ -30,7 +31,7 @@ public class SubscriptionState {
     @Getter private final String wsSessionId;
     @Getter private final int subscriptionId;
     @Getter private final EntityId entityId;
-    @Getter private final SubscriptionType type;
+    @Getter private final TelemetryFeature type;
     @Getter private final boolean allKeys;
     @Getter private final Map<String, Long> keyStates;
     @Getter private final String scope;
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java
index 9d9c2f3..0c3b174 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java
@@ -19,20 +19,30 @@ import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.util.StringUtils;
 import org.thingsboard.server.common.data.DataConstants;
-import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.EntityId;
-import org.thingsboard.server.common.data.kv.*;
+import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
+import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
+import org.thingsboard.server.common.data.kv.TsKvEntry;
+import org.thingsboard.server.common.data.kv.TsKvQuery;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.extensions.api.plugins.PluginCallback;
 import org.thingsboard.server.extensions.api.plugins.PluginContext;
+import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryFeature;
 import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryRpcMsgHandler;
 import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryWebsocketMsgHandler;
 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription;
 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
-import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType;
 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionUpdate;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.function.Predicate;
 
@@ -70,7 +80,7 @@ public class SubscriptionManager {
         EntityId entityId = subscription.getEntityId();
         log.trace("[{}] Registering remote subscription [{}] for device [{}] to [{}]", sessionId, subscription.getSubscriptionId(), entityId, address);
         registerSubscription(sessionId, entityId, subscription);
-        if (subscription.getType() == SubscriptionType.ATTRIBUTES) {
+        if (subscription.getType() == TelemetryFeature.ATTRIBUTES) {
             final Map<String, Long> keyStates = subscription.getKeyStates();
             ctx.loadAttributes(entityId, DataConstants.CLIENT_SCOPE, keyStates.keySet(), new PluginCallback<List<AttributeKvEntry>>() {
                 @Override
@@ -91,7 +101,7 @@ public class SubscriptionManager {
                     log.error("Failed to fetch missed updates.", e);
                 }
             });
-        } else if (subscription.getType() == SubscriptionType.TIMESERIES) {
+        } else if (subscription.getType() == TelemetryFeature.TIMESERIES) {
             long curTs = System.currentTimeMillis();
             List<TsKvQuery> queries = new ArrayList<>();
             subscription.getKeyStates().entrySet().forEach(e -> {
@@ -175,7 +185,7 @@ public class SubscriptionManager {
         }
     }
 
-    public void onLocalSubscriptionUpdate(PluginContext ctx, EntityId entityId, SubscriptionType type, Function<Subscription, List<TsKvEntry>> f) {
+    public void onLocalSubscriptionUpdate(PluginContext ctx, EntityId entityId, TelemetryFeature type, Function<Subscription, List<TsKvEntry>> f) {
         onLocalSubscriptionUpdate(ctx, entityId, s -> type == s.getType(), f);
     }
 
@@ -212,7 +222,7 @@ public class SubscriptionManager {
     public void onAttributesUpdateFromServer(PluginContext ctx, EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
         Optional<ServerAddress> serverAddress = ctx.resolve(entityId);
         if (!serverAddress.isPresent()) {
-            onLocalSubscriptionUpdate(ctx, entityId, s -> SubscriptionType.ATTRIBUTES == s.getType() && (StringUtils.isEmpty(s.getScope()) || scope.equals(s.getScope())), s -> {
+            onLocalSubscriptionUpdate(ctx, entityId, s -> TelemetryFeature.ATTRIBUTES == s.getType() && (StringUtils.isEmpty(s.getScope()) || scope.equals(s.getScope())), s -> {
                 List<TsKvEntry> subscriptionUpdate = new ArrayList<TsKvEntry>();
                 for (AttributeKvEntry kv : attributes) {
                     if (s.isAllKeys() || s.getKeyStates().containsKey(kv.getKey())) {
@@ -229,7 +239,7 @@ public class SubscriptionManager {
     public void onTimeseriesUpdateFromServer(PluginContext ctx, EntityId entityId, List<TsKvEntry> entries) {
         Optional<ServerAddress> serverAddress = ctx.resolve(entityId);
         if (!serverAddress.isPresent()) {
-            onLocalSubscriptionUpdate(ctx, entityId, SubscriptionType.TIMESERIES, s -> {
+            onLocalSubscriptionUpdate(ctx, entityId, TelemetryFeature.TIMESERIES, s -> {
                 List<TsKvEntry> subscriptionUpdate = new ArrayList<TsKvEntry>();
                 for (TsKvEntry kv : entries) {
                     if (s.isAllKeys() || s.getKeyStates().containsKey((kv.getKey()))) {
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
index 6b89470..44bd3f5 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -63,12 +63,6 @@ public interface TbContext {
 
     void tellError(TbMsg msg, Throwable th);
 
-    void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback);
-
-    void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback);
-
-    void saveAndNotify(EntityId entityId, String scope, Set<AttributeKvEntry> attributes, FutureCallback<Void> callback);
-
     RuleNodeId getSelfId();
 
     AttributesService getAttributesService();
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java
index 6df9830..46be7f8 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java
@@ -89,7 +89,7 @@ public class TbTransformMsgNodeTest {
 
     @Test
     public void payloadCanBeUpdated() throws TbNodeException {
-        initWithScript("return msg.passed = msg.passed * metadata.temp; msg.bigObj.newProp = 'Ukraine' ");
+        initWithScript("msg.passed = msg.passed * metadata.temp; return msg.bigObj.newProp = 'Ukraine' ");
         TbMsgMetaData metaData = new TbMsgMetaData();
         metaData.putValue("temp", "7");
         metaData.putValue("humidity", "99");