thingsboard-memoizeit

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index 9e02946..3cc211c 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -61,12 +61,17 @@ import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
 import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
 import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
 import org.thingsboard.server.service.component.ComponentDiscoveryService;
+import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
 
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.nio.charset.StandardCharsets;
 import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 @Slf4j
 @Component
@@ -158,6 +163,10 @@ public class ActorSystemContext {
 
     @Autowired
     @Getter
+    private TelemetrySubscriptionService tsSubService;
+
+    @Autowired
+    @Getter
     @Setter
     private PluginWebSocketMsgEndpoint wsMsgEndpoint;
 
@@ -224,6 +233,21 @@ public class ActorSystemContext {
     @Getter
     private final Config config;
 
+    @Getter
+    private ExecutorService tsCallBackExecutor;
+
+    @PostConstruct
+    public void initExecutor() {
+        tsCallBackExecutor = Executors.newSingleThreadExecutor();
+    }
+
+    @PreDestroy
+    public void shutdownExecutor() {
+        if (tsCallBackExecutor != null) {
+            tsCallBackExecutor.shutdownNow();
+        }
+    }
+
     public ActorSystemContext() {
         config = ConfigFactory.parseResources(AKKA_CONF_FILE_NAME).withFallback(ConfigFactory.load());
     }
@@ -345,7 +369,7 @@ public class ActorSystemContext {
         return Exception.class.isInstance(error) ? (Exception) error : new Exception(error);
     }
 
-    public ListeningExecutor getExecutor() {
+    public ListeningExecutor getJsExecutor() {
         //TODO: take thread count from yml.
         return new JsExecutorService(1);
     }
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index 1d45b61..b3b0072 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,12 +15,18 @@
  */
 package org.thingsboard.server.actors.ruleChain;
 
-import akka.actor.ActorContext;
 import akka.actor.ActorRef;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.thingsboard.rule.engine.api.ListeningExecutor;
 import org.thingsboard.rule.engine.api.TbContext;
 import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.id.RuleNodeId;
+import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.data.kv.TsKvEntry;
 import org.thingsboard.server.common.msg.TbMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.dao.alarm.AlarmService;
@@ -35,6 +41,8 @@ import org.thingsboard.server.dao.timeseries.TimeseriesService;
 import org.thingsboard.server.dao.user.UserService;
 import scala.concurrent.duration.Duration;
 
+import javax.annotation.Nullable;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
@@ -43,6 +51,7 @@ import java.util.concurrent.TimeUnit;
  */
 class DefaultTbContext implements TbContext {
 
+    private static final Function<? super List<Void>, ? extends Void> LIST_VOID_FUNCTION = v -> null;
     private final ActorSystemContext mainCtx;
     private final RuleNodeCtx nodeCtx;
 
@@ -113,8 +122,35 @@ class DefaultTbContext implements TbContext {
     }
 
     @Override
+    public void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback) {
+        saveAndNotify(entityId, ts, 0L, callback);
+    }
+
+    @Override
+    public void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback) {
+        ListenableFuture<List<Void>> saveFuture = mainCtx.getTsService().save(entityId, ts, ttl);
+        Futures.addCallback(saveFuture, new FutureCallback<List<Void>>() {
+            @Override
+            public void onSuccess(@Nullable List<Void> result) {
+                mainCtx.getTsSubService().onLocalTimeseriesUpdate(entityId, ts);
+                callback.onSuccess(null);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                callback.onFailure(t);
+            }
+        }, mainCtx.getTsCallBackExecutor());
+    }
+
+    @Override
+    public void saveAndNotify(EntityId entityId, String scope, Set<AttributeKvEntry> attributes, FutureCallback<Void> callback) {
+
+    }
+
+    @Override
     public ListeningExecutor getJsExecutor() {
-        return mainCtx.getExecutor();
+        return mainCtx.getJsExecutor();
     }
 
     @Override
diff --git a/application/src/main/java/org/thingsboard/server/controller/HttpValidationCallback.java b/application/src/main/java/org/thingsboard/server/controller/HttpValidationCallback.java
new file mode 100644
index 0000000..fb1f3e7
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/controller/HttpValidationCallback.java
@@ -0,0 +1,32 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.controller;
+
+import com.google.common.util.concurrent.FutureCallback;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.context.request.async.DeferredResult;
+import org.thingsboard.server.service.security.ValidationCallback;
+
+/**
+ * Created by ashvayka on 21.02.17.
+ */
+public class HttpValidationCallback extends ValidationCallback<DeferredResult<ResponseEntity>> {
+
+    public HttpValidationCallback(DeferredResult<ResponseEntity> response, FutureCallback<DeferredResult<ResponseEntity>> action) {
+       super(response, action);
+    }
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
index a2867ce..c2a2df8 100644
--- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
@@ -69,6 +69,7 @@ import org.thingsboard.server.service.security.model.SecurityUser;
 import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
 
 import javax.annotation.Nullable;
+import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -101,6 +102,7 @@ public class TelemetryController extends BaseController {
 
     private ExecutorService executor;
 
+    @PostConstruct
     public void initExecutor() {
         executor = Executors.newSingleThreadExecutor();
     }
diff --git a/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java b/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java
index 01bd238..02e12fe 100644
--- a/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java
+++ b/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java
@@ -22,7 +22,7 @@ import org.thingsboard.server.common.data.id.EntityIdFactory;
 import org.thingsboard.server.common.data.id.RuleChainId;
 import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.data.rule.RuleChain;
-import org.thingsboard.server.controller.ValidationCallback;
+import org.thingsboard.server.controller.HttpValidationCallback;
 import org.thingsboard.server.dao.alarm.AlarmService;
 import org.thingsboard.server.dao.asset.AssetService;
 import org.thingsboard.server.dao.customer.CustomerService;
@@ -91,7 +91,7 @@ public class AccessValidator {
 
         final DeferredResult<ResponseEntity> response = new DeferredResult<>();
 
-        validate(currentUser, entityId, new ValidationCallback(response,
+        validate(currentUser, entityId, new HttpValidationCallback(response,
                 new FutureCallback<DeferredResult<ResponseEntity>>() {
                     @Override
                     public void onSuccess(@Nullable DeferredResult<ResponseEntity> result) {
@@ -107,7 +107,7 @@ public class AccessValidator {
         return response;
     }
 
-    public void validate(SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
+    public <T> void validate(SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
         switch (entityId.getEntityType()) {
             case DEVICE:
                 validateDevice(currentUser, entityId, callback);
@@ -130,7 +130,7 @@ public class AccessValidator {
         }
     }
 
-    private void validateDevice(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
+    private void validateDevice(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
         if (currentUser.isSystemAdmin()) {
             callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
         } else {
@@ -151,7 +151,7 @@ public class AccessValidator {
         }
     }
 
-    private void validateAsset(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
+    private <T> void validateAsset(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
         if (currentUser.isSystemAdmin()) {
             callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
         } else {
@@ -173,7 +173,7 @@ public class AccessValidator {
     }
 
 
-    private void validateRuleChain(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
+    private <T> void validateRuleChain(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
         if (currentUser.isCustomerUser()) {
             callback.onSuccess(ValidationResult.accessDenied(CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
         } else {
@@ -194,7 +194,7 @@ public class AccessValidator {
         }
     }
 
-    private void validateCustomer(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
+    private <T> void validateCustomer(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
         if (currentUser.isSystemAdmin()) {
             callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
         } else {
@@ -215,7 +215,7 @@ public class AccessValidator {
         }
     }
 
-    private void validateTenant(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
+    private <T> void validateTenant(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
         if (currentUser.isCustomerUser()) {
             callback.onSuccess(ValidationResult.accessDenied(CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
         } else if (currentUser.isSystemAdmin()) {
@@ -234,7 +234,7 @@ public class AccessValidator {
         }
     }
 
-    private <T> FutureCallback<T> getCallback(ValidationCallback callback, Function<T, ValidationResult> transformer) {
+    private <T> FutureCallback<T> getCallback(FutureCallback<ValidationResult> callback, Function<T, ValidationResult> transformer) {
         return new FutureCallback<T>() {
             @Override
             public void onSuccess(@Nullable T result) {
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
index 359949e..81cdac9 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
@@ -5,8 +5,10 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.data.kv.KvEntry;
 import org.thingsboard.server.common.data.kv.TsKvEntry;
 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription;
+import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
 
 import java.util.HashMap;
 import java.util.List;
@@ -23,13 +25,10 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
     @Autowired
     private TelemetryWebSocketService wsService;
 
-
     private final Map<EntityId, Set<Subscription>> subscriptionsByEntityId = new HashMap<>();
 
     private final Map<String, Map<Integer, Subscription>> subscriptionsByWsSessionId = new HashMap<>();
 
-
-
     @Override
     public void onAttributesUpdateFromServer(EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
 
@@ -39,4 +38,29 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
     public void onTimeseriesUpdateFromServer(EntityId entityId, List<TsKvEntry> entries) {
 
     }
+
+    @Override
+    public void cleanupLocalWsSessionSubscriptions(TelemetryWebSocketSessionRef sessionRef, String sessionId) {
+
+    }
+
+    @Override
+    public void removeSubscription(String sessionId, int cmdId) {
+
+    }
+
+    @Override
+    public void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub) {
+
+    }
+
+    @Override
+    public void onLocalTimeseriesUpdate(EntityId entityId, Map<Long, List<KvEntry>> ts) {
+
+    }
+
+    @Override
+    public void onLocalAttributesUpdate(EntityId entityId, String scope, Set<AttributeKvEntry> attributes) {
+
+    }
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
index 6d6c33e..57dd93d 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
@@ -2,36 +2,46 @@ package org.thingsboard.server.service.telemetry;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Function;
 import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.hazelcast.util.function.Consumer;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.util.StringUtils;
+import org.thingsboard.server.actors.plugin.ValidationResult;
 import org.thingsboard.server.common.data.DataConstants;
 import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.id.EntityIdFactory;
+import org.thingsboard.server.common.data.kv.Aggregation;
 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
 import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
 import org.thingsboard.server.common.data.kv.TsKvEntry;
+import org.thingsboard.server.common.data.kv.TsKvQuery;
 import org.thingsboard.server.dao.attributes.AttributesService;
 import org.thingsboard.server.dao.timeseries.TimeseriesService;
 import org.thingsboard.server.extensions.api.exception.UnauthorizedException;
-import org.thingsboard.server.extensions.api.plugins.PluginCallback;
-import org.thingsboard.server.extensions.api.plugins.PluginContext;
-import org.thingsboard.server.extensions.api.plugins.ws.PluginWebsocketSessionRef;
 import org.thingsboard.server.extensions.api.plugins.ws.SessionEvent;
 import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.AttributesSubscriptionCmd;
+import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.GetHistoryCmd;
 import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.SubscriptionCmd;
 import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TelemetryPluginCmd;
 import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TelemetryPluginCmdsWrapper;
+import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TimeseriesSubscriptionCmd;
 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionErrorCode;
 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType;
 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionUpdate;
+import org.thingsboard.server.service.security.AccessValidator;
 
+import javax.annotation.Nullable;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -41,6 +51,8 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
 /**
@@ -50,6 +62,8 @@ import java.util.stream.Collectors;
 @Slf4j
 public class DefaultTelemetryWebSocketService implements TelemetryWebSocketService {
 
+    public static final int DEFAULT_LIMIT = 100;
+    public static final Aggregation DEFAULT_AGGREGATION = Aggregation.NONE;
     private static final int UNKNOWN_SUBSCRIPTION_ID = 0;
     private static final String PROCESSING_MSG = "[{}] Processing: {}";
     private static final ObjectMapper jsonMapper = new ObjectMapper();
@@ -66,11 +80,28 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
     private TelemetryWebSocketMsgEndpoint msgEndpoint;
 
     @Autowired
+    private AccessValidator accessValidator;
+
+    @Autowired
     private AttributesService attributesService;
 
     @Autowired
     private TimeseriesService tsService;
 
+    private ExecutorService executor;
+
+    @PostConstruct
+    public void initExecutor() {
+        executor = Executors.newSingleThreadExecutor();
+    }
+
+    @PreDestroy
+    public void shutdownExecutor() {
+        if (executor != null) {
+            executor.shutdownNow();
+        }
+    }
+
     @Override
     public void handleWebSocketSessionEvent(TelemetryWebSocketSessionRef sessionRef, SessionEvent event) {
         String sessionId = sessionRef.getSessionId();
@@ -169,44 +200,190 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
         };
 
         if (StringUtils.isEmpty(cmd.getScope())) {
-            //ValidationCallback?
-            ctx.loadAttributes(entityId, Arrays.asList(DataConstants.allScopes()), keys, callback);
+            accessValidator.validate(sessionRef.getSecurityCtx(), entityId, getAttributesFetchCallback(entityId, keys, callback));
         } else {
-            ctx.loadAttributes(entityId, cmd.getScope(), keys, callback);
+            accessValidator.validate(sessionRef.getSecurityCtx(), entityId, getAttributesFetchCallback(entityId, cmd.getScope(), keys, callback));
+        }
+    }
+
+    private void handleWsHistoryCmd(TelemetryWebSocketSessionRef sessionRef, GetHistoryCmd cmd) {
+        String sessionId = sessionRef.getSessionId();
+        WsSessionMetaData sessionMD = wsSessionsMap.get(sessionId);
+        if (sessionMD == null) {
+            log.warn("[{}] Session meta data not found. ", sessionId);
+            SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
+                    SESSION_META_DATA_NOT_FOUND);
+            sendWsMsg(sessionRef, update);
+            return;
+        }
+        if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty() || cmd.getEntityType() == null || cmd.getEntityType().isEmpty()) {
+            SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST,
+                    "Device id is empty!");
+            sendWsMsg(sessionRef, update);
+            return;
+        }
+        if (cmd.getKeys() == null || cmd.getKeys().isEmpty()) {
+            SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST,
+                    "Keys are empty!");
+            sendWsMsg(sessionRef, update);
+            return;
         }
+        EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId());
+        List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
+        List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg())))
+                .collect(Collectors.toList());
+
+        FutureCallback<List<TsKvEntry>> callback = new FutureCallback<List<TsKvEntry>>() {
+            @Override
+            public void onSuccess(List<TsKvEntry> data) {
+                sendWsMsg(sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
+            }
+
+            @Override
+            public void onFailure(Throwable e) {
+                SubscriptionUpdate update;
+                if (UnauthorizedException.class.isInstance(e)) {
+                    update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED,
+                            SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg());
+                } else {
+                    update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
+                            FAILED_TO_FETCH_DATA);
+                }
+                sendWsMsg(sessionRef, update);
+            }
+        };
+        accessValidator.validate(sessionRef.getSecurityCtx(), entityId,
+                on(r -> Futures.addCallback(tsService.findAll(entityId, queries), callback, executor), callback::onFailure));
     }
 
-    private void handleWsAttributesSubscription(PluginContext ctx, PluginWebsocketSessionRef sessionRef,
+    private void handleWsAttributesSubscription(TelemetryWebSocketSessionRef sessionRef,
                                                 AttributesSubscriptionCmd cmd, String sessionId, EntityId entityId) {
-        PluginCallback<List<AttributeKvEntry>> callback = new PluginCallback<List<AttributeKvEntry>>() {
+        FutureCallback<List<AttributeKvEntry>> callback = new FutureCallback<List<AttributeKvEntry>>() {
             @Override
-            public void onSuccess(PluginContext ctx, List<AttributeKvEntry> data) {
+            public void onSuccess(List<AttributeKvEntry> data) {
                 List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
-                sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
+                sendWsMsg(sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
 
                 Map<String, Long> subState = new HashMap<>(attributesData.size());
                 attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
 
                 SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, true, subState, cmd.getScope());
-                subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub);
+                subscriptionManager.addLocalWsSubscription(sessionId, entityId, sub);
             }
 
             @Override
-            public void onFailure(PluginContext ctx, Exception e) {
+            public void onFailure(Throwable e) {
                 log.error(FAILED_TO_FETCH_ATTRIBUTES, e);
                 SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
                         FAILED_TO_FETCH_ATTRIBUTES);
-                sendWsMsg(ctx, sessionRef, update);
+                sendWsMsg(sessionRef, update);
             }
         };
 
+
         if (StringUtils.isEmpty(cmd.getScope())) {
-            ctx.loadAttributes(entityId, Arrays.asList(DataConstants.allScopes()), callback);
+            accessValidator.validate(sessionRef.getSecurityCtx(), entityId, getAttributesFetchCallback(entityId, callback));
+        } else {
+            accessValidator.validate(sessionRef.getSecurityCtx(), entityId, getAttributesFetchCallback(entityId, cmd.getScope(), callback));
+        }
+    }
+
+    private void handleWsTimeseriesSubscriptionCmd(TelemetryWebSocketSessionRef sessionRef, TimeseriesSubscriptionCmd cmd) {
+        String sessionId = sessionRef.getSessionId();
+        log.debug("[{}] Processing: {}", sessionId, cmd);
+
+        if (validateSessionMetadata(sessionRef, cmd, sessionId)) {
+            if (cmd.isUnsubscribe()) {
+                unsubscribe(sessionRef, cmd, sessionId);
+            } else if (validateSubscriptionCmd(sessionRef, cmd)) {
+                EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId());
+                Optional<Set<String>> keysOptional = getKeys(cmd);
+
+                if (keysOptional.isPresent()) {
+                    handleWsTimeseriesSubscriptionByKeys(sessionRef, cmd, sessionId, entityId);
+                } else {
+                    handleWsTimeseriesSubscription(sessionRef, cmd, sessionId, entityId);
+                }
+            }
+        }
+    }
+
+    private void handleWsTimeseriesSubscriptionByKeys(TelemetryWebSocketSessionRef sessionRef,
+                                                      TimeseriesSubscriptionCmd cmd, String sessionId, EntityId entityId) {
+        long startTs;
+        if (cmd.getTimeWindow() > 0) {
+            List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
+            log.debug("[{}] fetching timeseries data for last {} ms for keys: ({}) for device : {}", sessionId, cmd.getTimeWindow(), cmd.getKeys(), entityId);
+            startTs = cmd.getStartTs();
+            long endTs = cmd.getStartTs() + cmd.getTimeWindow();
+            List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, cmd.getInterval(),
+                    getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()))).collect(Collectors.toList());
+
+            final FutureCallback<List<TsKvEntry>> callback = getSubscriptionCallback(sessionRef, cmd, sessionId, entityId, startTs, keys);
+            accessValidator.validate(sessionRef.getSecurityCtx(), entityId,
+                    on(r -> Futures.addCallback(tsService.findAll(entityId, queries), callback, executor), callback::onFailure));
         } else {
-            ctx.loadAttributes(entityId, cmd.getScope(), callback);
+            List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
+            startTs = System.currentTimeMillis();
+            log.debug("[{}] fetching latest timeseries data for keys: ({}) for device : {}", sessionId, cmd.getKeys(), entityId);
+            final FutureCallback<List<TsKvEntry>> callback = getSubscriptionCallback(sessionRef, cmd, sessionId, entityId, startTs, keys);
+            accessValidator.validate(sessionRef.getSecurityCtx(), entityId,
+                    on(r -> Futures.addCallback(tsService.findLatest(entityId, keys), callback, executor), callback::onFailure));
         }
     }
 
+    private void handleWsTimeseriesSubscription(TelemetryWebSocketSessionRef sessionRef,
+                                                TimeseriesSubscriptionCmd cmd, String sessionId, EntityId entityId) {
+        FutureCallback<List<TsKvEntry>> callback = new FutureCallback<List<TsKvEntry>>() {
+            @Override
+            public void onSuccess(List<TsKvEntry> data) {
+                sendWsMsg(sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
+                Map<String, Long> subState = new HashMap<>(data.size());
+                data.forEach(v -> subState.put(v.getKey(), v.getTs()));
+                SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.TIMESERIES, true, subState, cmd.getScope());
+                subscriptionManager.addLocalWsSubscription(sessionId, entityId, sub);
+            }
+
+            @Override
+            public void onFailure(Throwable e) {
+                SubscriptionUpdate update;
+                if (UnauthorizedException.class.isInstance(e)) {
+                    update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED,
+                            SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg());
+                } else {
+                    update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
+                            FAILED_TO_FETCH_DATA);
+                }
+                sendWsMsg(sessionRef, update);
+            }
+        };
+        accessValidator.validate(sessionRef.getSecurityCtx(), entityId,
+                on(r -> Futures.addCallback(tsService.findAllLatest(entityId), callback, executor), callback::onFailure));
+    }
+
+    private FutureCallback<List<TsKvEntry>> getSubscriptionCallback(final TelemetryWebSocketSessionRef sessionRef, final TimeseriesSubscriptionCmd cmd, final String sessionId, final EntityId entityId, final long startTs, final List<String> keys) {
+        return new FutureCallback<List<TsKvEntry>>() {
+            @Override
+            public void onSuccess(List<TsKvEntry> data) {
+                sendWsMsg(sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
+
+                Map<String, Long> subState = new HashMap<>(keys.size());
+                keys.forEach(key -> subState.put(key, startTs));
+                data.forEach(v -> subState.put(v.getKey(), v.getTs()));
+                SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.TIMESERIES, false, subState, cmd.getScope());
+                subscriptionManager.addLocalWsSubscription(sessionId, entityId, sub);
+            }
+
+            @Override
+            public void onFailure(Throwable e) {
+                log.error(FAILED_TO_FETCH_DATA, e);
+                SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
+                        FAILED_TO_FETCH_DATA);
+                sendWsMsg(sessionRef, update);
+            }
+        };
+    }
+
     private void unsubscribe(TelemetryWebSocketSessionRef sessionRef, SubscriptionCmd cmd, String sessionId) {
         if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty()) {
             subscriptionManager.cleanupLocalWsSessionSubscriptions(sessionRef, sessionId);
@@ -258,4 +435,105 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
         }
     }
 
+    private ListenableFuture<List<AttributeKvEntry>> mergeAllAttributesFutures(List<ListenableFuture<List<AttributeKvEntry>>> futures) {
+        return Futures.transform(Futures.successfulAsList(futures),
+                (Function<? super List<List<AttributeKvEntry>>, ? extends List<AttributeKvEntry>>) input -> {
+                    List<AttributeKvEntry> tmp = new ArrayList<>();
+                    if (input != null) {
+                        input.forEach(tmp::addAll);
+                    }
+                    return tmp;
+                }, executor);
+    }
+
+    private <T> FutureCallback<ValidationResult> getAttributesFetchCallback(final EntityId entityId, final List<String> keys, final FutureCallback<List<AttributeKvEntry>> callback) {
+        return new FutureCallback<ValidationResult>() {
+            @Override
+            public void onSuccess(@Nullable ValidationResult result) {
+                List<ListenableFuture<List<AttributeKvEntry>>> futures = new ArrayList<>();
+                for (String scope : DataConstants.allScopes()) {
+                    futures.add(attributesService.find(entityId, scope, keys));
+                }
+
+                ListenableFuture<List<AttributeKvEntry>> future = mergeAllAttributesFutures(futures);
+                Futures.addCallback(future, callback);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                callback.onFailure(t);
+            }
+        };
+    }
+
+    private <T> FutureCallback<ValidationResult> getAttributesFetchCallback(final EntityId entityId, final String scope, final List<String> keys, final FutureCallback<List<AttributeKvEntry>> callback) {
+        return new FutureCallback<ValidationResult>() {
+            @Override
+            public void onSuccess(@Nullable ValidationResult result) {
+                Futures.addCallback(attributesService.find(entityId, scope, keys), callback);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                callback.onFailure(t);
+            }
+        };
+    }
+
+    private <T> FutureCallback<ValidationResult> getAttributesFetchCallback(final EntityId entityId, final FutureCallback<List<AttributeKvEntry>> callback) {
+        return new FutureCallback<ValidationResult>() {
+            @Override
+            public void onSuccess(@Nullable ValidationResult result) {
+                List<ListenableFuture<List<AttributeKvEntry>>> futures = new ArrayList<>();
+                for (String scope : DataConstants.allScopes()) {
+                    futures.add(attributesService.findAll(entityId, scope));
+                }
+
+                ListenableFuture<List<AttributeKvEntry>> future = mergeAllAttributesFutures(futures);
+                Futures.addCallback(future, callback);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                callback.onFailure(t);
+            }
+        };
+    }
+
+    private <T> FutureCallback<ValidationResult> getAttributesFetchCallback(final EntityId entityId, final String scope, final FutureCallback<List<AttributeKvEntry>> callback) {
+        return new FutureCallback<ValidationResult>() {
+            @Override
+            public void onSuccess(@Nullable ValidationResult result) {
+                Futures.addCallback(attributesService.findAll(entityId, scope), callback);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                callback.onFailure(t);
+            }
+        };
+    }
+
+    private FutureCallback<ValidationResult> on(Consumer<ValidationResult> success, Consumer<Throwable> failure) {
+        return new FutureCallback<ValidationResult>() {
+            @Override
+            public void onSuccess(@Nullable ValidationResult result) {
+                success.accept(result);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                failure.accept(t);
+            }
+        };
+    }
+
+
+    private static Aggregation getAggregation(String agg) {
+        return StringUtils.isEmpty(agg) ? DEFAULT_AGGREGATION : Aggregation.valueOf(agg);
+    }
+
+    private int getLimit(int limit) {
+        return limit == 0 ? DEFAULT_LIMIT : limit;
+    }
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java
index 9673629..44e8512 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java
@@ -2,10 +2,13 @@ package org.thingsboard.server.service.telemetry;
 
 import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.data.kv.KvEntry;
 import org.thingsboard.server.common.data.kv.TsKvEntry;
 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
 
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 /**
  * Created by ashvayka on 27.03.18.
@@ -21,4 +24,8 @@ public interface TelemetrySubscriptionService {
     void removeSubscription(String sessionId, int cmdId);
 
     void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub);
+
+    void onLocalTimeseriesUpdate(EntityId entityId, List<TsKvEntry> ts);
+
+    void onLocalAttributesUpdate(EntityId entityId, String scope, Set<AttributeKvEntry> attributes);
 }
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
index 46fa1d2..6b89470 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,7 +15,12 @@
  */
 package org.thingsboard.rule.engine.api;
 
+import com.google.common.util.concurrent.FutureCallback;
+import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.id.RuleNodeId;
+import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.data.kv.KvEntry;
+import org.thingsboard.server.common.data.kv.TsKvEntry;
 import org.thingsboard.server.common.msg.TbMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.dao.alarm.AlarmService;
@@ -30,6 +35,8 @@ import org.thingsboard.server.dao.rule.RuleService;
 import org.thingsboard.server.dao.timeseries.TimeseriesService;
 import org.thingsboard.server.dao.user.UserService;
 
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
@@ -56,6 +63,12 @@ public interface TbContext {
 
     void tellError(TbMsg msg, Throwable th);
 
+    void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback);
+
+    void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback);
+
+    void saveAndNotify(EntityId entityId, String scope, Set<AttributeKvEntry> attributes, FutureCallback<Void> callback);
+
     RuleNodeId getSelfId();
 
     AttributesService getAttributesService();