thingsboard-memoizeit

Changes

Details

diff --git a/application/src/main/java/org/thingsboard/server/config/WebSocketConfiguration.java b/application/src/main/java/org/thingsboard/server/config/WebSocketConfiguration.java
index a75ecb1..d44c50e 100644
--- a/application/src/main/java/org/thingsboard/server/config/WebSocketConfiguration.java
+++ b/application/src/main/java/org/thingsboard/server/config/WebSocketConfiguration.java
@@ -19,7 +19,7 @@ import java.util.Map;
 
 import org.thingsboard.server.exception.ThingsboardErrorCode;
 import org.thingsboard.server.exception.ThingsboardException;
-import org.thingsboard.server.controller.plugin.PluginWebSocketHandler;
+import org.thingsboard.server.controller.plugin.TbWebSocketHandler;
 import org.thingsboard.server.service.security.model.SecurityUser;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -54,7 +54,7 @@ public class WebSocketConfiguration implements WebSocketConfigurer {
 
     @Override
     public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
-        registry.addHandler(pluginWsHandler(), WS_PLUGIN_MAPPING).setAllowedOrigins("*")
+        registry.addHandler(wsHandler(), WS_PLUGIN_MAPPING).setAllowedOrigins("*")
                 .addInterceptors(new HttpSessionHandshakeInterceptor(), new HandshakeInterceptor() {
 
                     @Override
@@ -82,8 +82,8 @@ public class WebSocketConfiguration implements WebSocketConfigurer {
     }
 
     @Bean
-    public WebSocketHandler pluginWsHandler() {
-        return new PluginWebSocketHandler();
+    public WebSocketHandler wsHandler() {
+        return new TbWebSocketHandler();
     }
 
     protected SecurityUser getCurrentUser() throws ThingsboardException {
diff --git a/application/src/main/java/org/thingsboard/server/controller/plugin/PluginApiController.java b/application/src/main/java/org/thingsboard/server/controller/plugin/PluginApiController.java
index 8e3cee4..045835e 100644
--- a/application/src/main/java/org/thingsboard/server/controller/plugin/PluginApiController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/plugin/PluginApiController.java
@@ -48,59 +48,59 @@ import javax.servlet.http.HttpServletRequest;
 @Slf4j
 public class PluginApiController extends BaseController {
 
-    @SuppressWarnings("rawtypes")
-    @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
-    @RequestMapping(value = "/{pluginToken}/**")
-    @ResponseStatus(value = HttpStatus.OK)
-    public DeferredResult<ResponseEntity> processRequest(
-            @PathVariable("pluginToken") String pluginToken,
-            RequestEntity<byte[]> requestEntity,
-            HttpServletRequest request)
-            throws ThingsboardException {
-        log.debug("[{}] Going to process requst uri: {}", pluginToken, requestEntity.getUrl());
-        DeferredResult<ResponseEntity> result = new DeferredResult<ResponseEntity>();
-        PluginMetaData pluginMd = pluginService.findPluginByApiToken(pluginToken);
-        if (pluginMd == null) {
-            result.setErrorResult(new PluginNotFoundException("Plugin with token: " + pluginToken + " not found!"));
-        } else {
-            TenantId tenantId = getCurrentUser().getTenantId();
-            CustomerId customerId = getCurrentUser().getCustomerId();
-            if (validatePluginAccess(pluginMd, tenantId, customerId)) {
-                if(tenantId != null && ModelConstants.NULL_UUID.equals(tenantId.getId())){
-                    tenantId = null;
-                }
-                UserId userId = getCurrentUser().getId();
-                String userName = getCurrentUser().getName();
-                PluginApiCallSecurityContext securityCtx = new PluginApiCallSecurityContext(pluginMd.getTenantId(), pluginMd.getId(),
-                        tenantId, customerId, userId, userName);
-                actorService.process(new BasicPluginRestMsg(securityCtx, new RestRequest(requestEntity, request), result));
-            } else {
-                result.setResult(new ResponseEntity<>(HttpStatus.FORBIDDEN));
-            }
-
-        }
-        return result;
-    }
-
-    public static boolean validatePluginAccess(PluginMetaData pluginMd, TenantId tenantId, CustomerId customerId) {
-        boolean systemAdministrator = tenantId == null || ModelConstants.NULL_UUID.equals(tenantId.getId());
-        boolean tenantAdministrator = !systemAdministrator && (customerId == null || ModelConstants.NULL_UUID.equals(customerId.getId()));
-        boolean systemPlugin = ModelConstants.NULL_UUID.equals(pluginMd.getTenantId().getId());
-
-        boolean validUser = false;
-        if (systemPlugin) {
-            if (pluginMd.isPublicAccess() || systemAdministrator) {
-                // All users can access public system plugins. Only system
-                // users can access private system plugins
-                validUser = true;
-            }
-        } else {
-            if ((pluginMd.isPublicAccess() || tenantAdministrator) && tenantId != null && tenantId.equals(pluginMd.getTenantId())) {
-                // All tenant users can access public tenant plugins. Only tenant
-                // administrator can access private tenant plugins
-                validUser = true;
-            }
-        }
-        return validUser;
-    }
+//    @SuppressWarnings("rawtypes")
+//    @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
+//    @RequestMapping(value = "/{pluginToken}/**")
+//    @ResponseStatus(value = HttpStatus.OK)
+//    public DeferredResult<ResponseEntity> processRequest(
+//            @PathVariable("pluginToken") String pluginToken,
+//            RequestEntity<byte[]> requestEntity,
+//            HttpServletRequest request)
+//            throws ThingsboardException {
+//        log.debug("[{}] Going to process requst uri: {}", pluginToken, requestEntity.getUrl());
+//        DeferredResult<ResponseEntity> result = new DeferredResult<ResponseEntity>();
+//        PluginMetaData pluginMd = pluginService.findPluginByApiToken(pluginToken);
+//        if (pluginMd == null) {
+//            result.setErrorResult(new PluginNotFoundException("Plugin with token: " + pluginToken + " not found!"));
+//        } else {
+//            TenantId tenantId = getCurrentUser().getTenantId();
+//            CustomerId customerId = getCurrentUser().getCustomerId();
+//            if (validatePluginAccess(pluginMd, tenantId, customerId)) {
+//                if(tenantId != null && ModelConstants.NULL_UUID.equals(tenantId.getId())){
+//                    tenantId = null;
+//                }
+//                UserId userId = getCurrentUser().getId();
+//                String userName = getCurrentUser().getName();
+//                PluginApiCallSecurityContext securityCtx = new PluginApiCallSecurityContext(pluginMd.getTenantId(), pluginMd.getId(),
+//                        tenantId, customerId, userId, userName);
+//                actorService.process(new BasicPluginRestMsg(securityCtx, new RestRequest(requestEntity, request), result));
+//            } else {
+//                result.setResult(new ResponseEntity<>(HttpStatus.FORBIDDEN));
+//            }
+//
+//        }
+//        return result;
+//    }
+//
+//    public static boolean validatePluginAccess(PluginMetaData pluginMd, TenantId tenantId, CustomerId customerId) {
+//        boolean systemAdministrator = tenantId == null || ModelConstants.NULL_UUID.equals(tenantId.getId());
+//        boolean tenantAdministrator = !systemAdministrator && (customerId == null || ModelConstants.NULL_UUID.equals(customerId.getId()));
+//        boolean systemPlugin = ModelConstants.NULL_UUID.equals(pluginMd.getTenantId().getId());
+//
+//        boolean validUser = false;
+//        if (systemPlugin) {
+//            if (pluginMd.isPublicAccess() || systemAdministrator) {
+//                // All users can access public system plugins. Only system
+//                // users can access private system plugins
+//                validUser = true;
+//            }
+//        } else {
+//            if ((pluginMd.isPublicAccess() || tenantAdministrator) && tenantId != null && tenantId.equals(pluginMd.getTenantId())) {
+//                // All tenant users can access public tenant plugins. Only tenant
+//                // administrator can access private tenant plugins
+//                validUser = true;
+//            }
+//        }
+//        return validUser;
+//    }
 }
diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
index 1944fa2..a2867ce 100644
--- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
@@ -1,9 +1,27 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.thingsboard.server.controller;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Function;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.HttpStatus;
@@ -11,47 +29,54 @@ import org.springframework.http.ResponseEntity;
 import org.springframework.security.access.prepost.PreAuthorize;
 import org.springframework.util.StringUtils;
 import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.RequestParam;
-import org.springframework.web.bind.annotation.ResponseStatus;
+import org.springframework.web.bind.annotation.ResponseBody;
 import org.springframework.web.bind.annotation.RestController;
 import org.springframework.web.context.request.async.DeferredResult;
-import org.thingsboard.server.actors.plugin.ValidationResult;
-import org.thingsboard.server.common.data.Customer;
 import org.thingsboard.server.common.data.DataConstants;
-import org.thingsboard.server.common.data.Device;
-import org.thingsboard.server.common.data.Tenant;
-import org.thingsboard.server.common.data.asset.Asset;
+import org.thingsboard.server.common.data.EntityType;
 import org.thingsboard.server.common.data.audit.ActionType;
-import org.thingsboard.server.common.data.id.AssetId;
-import org.thingsboard.server.common.data.id.CustomerId;
-import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.id.EntityIdFactory;
-import org.thingsboard.server.common.data.id.RuleChainId;
-import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.data.id.UUIDBased;
+import org.thingsboard.server.common.data.kv.Aggregation;
 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
+import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
+import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
+import org.thingsboard.server.common.data.kv.BooleanDataEntry;
+import org.thingsboard.server.common.data.kv.DoubleDataEntry;
 import org.thingsboard.server.common.data.kv.KvEntry;
+import org.thingsboard.server.common.data.kv.LongDataEntry;
+import org.thingsboard.server.common.data.kv.StringDataEntry;
 import org.thingsboard.server.common.data.kv.TsKvEntry;
-import org.thingsboard.server.common.data.rule.RuleChain;
+import org.thingsboard.server.common.data.kv.TsKvQuery;
+import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
+import org.thingsboard.server.common.transport.adaptor.JsonConverter;
 import org.thingsboard.server.dao.attributes.AttributesService;
 import org.thingsboard.server.dao.timeseries.TimeseriesService;
 import org.thingsboard.server.exception.ThingsboardException;
-import org.thingsboard.server.extensions.api.exception.ToErrorResponseEntity;
+import org.thingsboard.server.extensions.api.exception.InvalidParametersException;
+import org.thingsboard.server.extensions.api.exception.UncheckedApiException;
 import org.thingsboard.server.extensions.api.plugins.PluginConstants;
 import org.thingsboard.server.extensions.core.plugin.telemetry.AttributeData;
+import org.thingsboard.server.extensions.core.plugin.telemetry.TsData;
+import org.thingsboard.server.service.security.AccessValidator;
 import org.thingsboard.server.service.security.model.SecurityUser;
+import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
 
 import javax.annotation.Nullable;
 import javax.annotation.PreDestroy;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
 /**
@@ -62,9 +87,8 @@ import java.util.stream.Collectors;
 @Slf4j
 public class TelemetryController extends BaseController {
 
-    public static final String CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION = "Customer user is not allowed to perform this operation!";
-    public static final String SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION = "System administrator is not allowed to perform this operation!";
-    public static final String DEVICE_WITH_REQUESTED_ID_NOT_FOUND = "Device with requested id wasn't found!";
+    @Autowired
+    private TelemetrySubscriptionService subscriptionService;
 
     @Autowired
     private AttributesService attributesService;
@@ -72,6 +96,9 @@ public class TelemetryController extends BaseController {
     @Autowired
     private TimeseriesService tsService;
 
+    @Autowired
+    private AccessValidator accessValidator;
+
     private ExecutorService executor;
 
     public void initExecutor() {
@@ -87,117 +114,277 @@ public class TelemetryController extends BaseController {
 
     @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
     @RequestMapping(value = "/{entityType}/{entityId}/keys/ATTRIBUTES", method = RequestMethod.GET)
-    @ResponseStatus(value = HttpStatus.OK)
+    @ResponseBody
     public DeferredResult<ResponseEntity> getAttributeKeys(
             @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr) throws ThingsboardException {
-        return validateEntityAndCallback(entityType, entityIdStr,
-                this::getAttributeKeysCallback,
-                (result, t) -> handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR));
+        return accessValidator.validateEntityAndCallback(getCurrentUser(), entityType, entityIdStr, this::getAttributeKeysCallback);
     }
 
     @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
     @RequestMapping(value = "/{entityType}/{entityId}/keys/ATTRIBUTES/{scope}", method = RequestMethod.GET)
-    @ResponseStatus(value = HttpStatus.OK)
+    @ResponseBody
     public DeferredResult<ResponseEntity> getAttributeKeysByScope(
             @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr
             , @PathVariable("scope") String scope) throws ThingsboardException {
-        return validateEntityAndCallback(entityType, entityIdStr,
-                (result, entityId) -> getAttributeKeysCallback(result, entityId, scope),
-                (result, t) -> handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR));
+        return accessValidator.validateEntityAndCallback(getCurrentUser(), entityType, entityIdStr,
+                (result, entityId) -> getAttributeKeysCallback(result, entityId, scope));
     }
 
     @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
     @RequestMapping(value = "/{entityType}/{entityId}/values/ATTRIBUTES", method = RequestMethod.GET)
-    @ResponseStatus(value = HttpStatus.OK)
+    @ResponseBody
     public DeferredResult<ResponseEntity> getAttributes(
             @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
             @RequestParam(name = "keys", required = false) String keysStr) throws ThingsboardException {
         SecurityUser user = getCurrentUser();
-        return validateEntityAndCallback(entityType, entityIdStr,
-                (result, entityId) -> getAttributeValuesCallback(result, user, entityId, null, keysStr),
-                (result, t) -> handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR));
+        return accessValidator.validateEntityAndCallback(getCurrentUser(), entityType, entityIdStr,
+                (result, entityId) -> getAttributeValuesCallback(result, user, entityId, null, keysStr));
     }
 
     @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
     @RequestMapping(value = "/{entityType}/{entityId}/values/ATTRIBUTES/{scope}", method = RequestMethod.GET)
-    @ResponseStatus(value = HttpStatus.OK)
+    @ResponseBody
     public DeferredResult<ResponseEntity> getAttributesByScope(
             @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
             @PathVariable("scope") String scope,
             @RequestParam(name = "keys", required = false) String keysStr) throws ThingsboardException {
         SecurityUser user = getCurrentUser();
-        return validateEntityAndCallback(entityType, entityIdStr,
-                (result, entityId) -> getAttributeValuesCallback(result, user, entityId, scope, keysStr),
-                (result, t) -> handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR));
+        return accessValidator.validateEntityAndCallback(getCurrentUser(), entityType, entityIdStr,
+                (result, entityId) -> getAttributeValuesCallback(result, user, entityId, scope, keysStr));
     }
 
     @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
     @RequestMapping(value = "/{entityType}/{entityId}/keys/TIMESERIES", method = RequestMethod.GET)
-    @ResponseStatus(value = HttpStatus.OK)
+    @ResponseBody
     public DeferredResult<ResponseEntity> getTimeseriesKeys(
             @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr) throws ThingsboardException {
-        return validateEntityAndCallback(entityType, entityIdStr,
+        return accessValidator.validateEntityAndCallback(getCurrentUser(), entityType, entityIdStr,
                 (result, entityId) -> {
                     Futures.addCallback(tsService.findAllLatest(entityId), getTsKeysToResponseCallback(result));
-                },
-                (result, t) -> handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR));
+                });
     }
 
     @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
     @RequestMapping(value = "/{entityType}/{entityId}/values/TIMESERIES", method = RequestMethod.GET)
-    @ResponseStatus(value = HttpStatus.OK)
+    @ResponseBody
     public DeferredResult<ResponseEntity> getLatestTimeseries(
             @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
-            @PathVariable("scope") String scope,
             @RequestParam(name = "keys", required = false) String keysStr) throws ThingsboardException {
         SecurityUser user = getCurrentUser();
 
-        return validateEntityAndCallback(entityType, entityIdStr,
-                (result, entityId) -> getAttributeValuesCallback(result, user, entityId, scope, keysStr),
-                (result, t) -> handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR));
+        return accessValidator.validateEntityAndCallback(getCurrentUser(), entityType, entityIdStr,
+                (result, entityId) -> getLatestTimeseriesValuesCallback(result, user, entityId, keysStr));
     }
 
 
     @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
     @RequestMapping(value = "/{entityType}/{entityId}/values/TIMESERIES", method = RequestMethod.GET)
-    @ResponseStatus(value = HttpStatus.OK)
-    public DeferredResult<ResponseEntity> getLatestTimeseries(
+    @ResponseBody
+    public DeferredResult<ResponseEntity> getTimeseries(
             @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
-            @PathVariable("scope") String scope,
-            @RequestParam(name = "keys", required = false) String keysStr) throws ThingsboardException {
-        SecurityUser user = getCurrentUser();
+            @RequestParam(name = "keys") String keys,
+            @RequestParam(name = "startTs") Long startTs,
+            @RequestParam(name = "endTs") Long endTs,
+            @RequestParam(name = "interval", defaultValue = "0") Long interval,
+            @RequestParam(name = "limit", defaultValue = "100") Integer limit,
+            @RequestParam(name = "agg", defaultValue = "NONE") String aggStr
+    ) throws ThingsboardException {
+        return accessValidator.validateEntityAndCallback(getCurrentUser(), entityType, entityIdStr,
+                (result, entityId) -> {
+                    // If interval is 0, convert this to a NONE aggregation, which is probably what the user really wanted
+                    Aggregation agg = interval == 0L ? Aggregation.valueOf(Aggregation.NONE.name()) : Aggregation.valueOf(aggStr);
+                    List<TsKvQuery> queries = toKeysList(keys).stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, interval, limit, agg))
+                            .collect(Collectors.toList());
+
+                    Futures.addCallback(tsService.findAll(entityId, queries), getTsKvListCallback(result));
+                });
+    }
+
+    @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
+    @RequestMapping(value = "/{deviceId}/{scope}", method = RequestMethod.POST)
+    @ResponseBody
+    public DeferredResult<ResponseEntity> saveDeviceAttributes(@PathVariable("deviceId") String deviceIdStr, @PathVariable("scope") String scope,
+                                                               @RequestBody JsonNode request) throws ThingsboardException {
+        EntityId entityId = EntityIdFactory.getByTypeAndUuid(EntityType.DEVICE, deviceIdStr);
+        return saveAttributes(entityId, scope, request);
+    }
+
+    @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
+    @RequestMapping(value = "/{entityType}/{entityId}/{scope}", method = RequestMethod.POST)
+    @ResponseBody
+    public DeferredResult<ResponseEntity> saveEntityAttributesV1(@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
+                                                                 @PathVariable("scope") String scope,
+                                                                 @RequestBody JsonNode request) throws ThingsboardException {
+        EntityId entityId = EntityIdFactory.getByTypeAndId(entityType, entityIdStr);
+        return saveAttributes(entityId, scope, request);
+    }
 
-        return validateEntityAndCallback(entityType, entityIdStr,
-                (result, entityId) -> getAttributeValuesCallback(result, user, entityId, scope, keysStr),
-                (result, t) -> handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR));
+    @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
+    @RequestMapping(value = "/{entityType}/{entityId}/ATTRIBUTES/{scope}", method = RequestMethod.POST)
+    @ResponseBody
+    public DeferredResult<ResponseEntity> saveEntityAttributesV2(@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
+                                                                 @PathVariable("scope") String scope,
+                                                                 @RequestBody JsonNode request) throws ThingsboardException {
+        EntityId entityId = EntityIdFactory.getByTypeAndId(entityType, entityIdStr);
+        return saveAttributes(entityId, scope, request);
+    }
+
+    @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
+    @RequestMapping(value = "/{entityType}/{entityId}/TIMESERIES/{scope}", method = RequestMethod.POST)
+    @ResponseBody
+    public DeferredResult<ResponseEntity> saveEntityTelemetry(@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
+                                                              @PathVariable("scope") String scope,
+                                                              @RequestBody String requestBody) throws ThingsboardException {
+        EntityId entityId = EntityIdFactory.getByTypeAndId(entityType, entityIdStr);
+        return saveTelemetry(entityId, requestBody, 0L);
+    }
+
+    @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
+    @RequestMapping(value = "/{entityType}/{entityId}/TIMESERIES/{scope}/{ttl}", method = RequestMethod.POST)
+    @ResponseBody
+    public DeferredResult<ResponseEntity> saveEntityTelemetryWithTTL(@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
+                                                                     @PathVariable("scope") String scope, @PathVariable("ttl") Long ttl,
+                                                                     @RequestBody String requestBody) throws ThingsboardException {
+        EntityId entityId = EntityIdFactory.getByTypeAndId(entityType, entityIdStr);
+        return saveTelemetry(entityId, requestBody, ttl);
     }
 
-    private DeferredResult<ResponseEntity> validateEntityAndCallback(String entityType, String entityIdStr,
-                                                                     BiConsumer<DeferredResult<ResponseEntity>, EntityId> onSuccess, BiConsumer<DeferredResult<ResponseEntity>, Throwable> onFailure) throws ThingsboardException {
-        final DeferredResult<ResponseEntity> response = new DeferredResult<>();
+    @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
+    @RequestMapping(value = "/{deviceId}/{scope}", method = RequestMethod.DELETE)
+    @ResponseBody
+    public DeferredResult<ResponseEntity> deleteEntityAttributes(@PathVariable("deviceId") String deviceIdStr,
+                                                                 @PathVariable("scope") String scope,
+                                                                 @RequestParam(name = "keys") String keysStr) throws ThingsboardException {
+        EntityId entityId = EntityIdFactory.getByTypeAndUuid(EntityType.DEVICE, deviceIdStr);
+        return deleteAttributes(entityId, scope, keysStr);
+    }
+
+    @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
+    @RequestMapping(value = "/{entityType}/{entityId}/{scope}", method = RequestMethod.DELETE)
+    @ResponseBody
+    public DeferredResult<ResponseEntity> deleteEntityAttributes(@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
+                                                                 @PathVariable("scope") String scope,
+                                                                 @RequestParam(name = "keys") String keysStr) throws ThingsboardException {
         EntityId entityId = EntityIdFactory.getByTypeAndId(entityType, entityIdStr);
+        return deleteAttributes(entityId, scope, keysStr);
+    }
+
+    private DeferredResult<ResponseEntity> deleteAttributes(EntityId entityIdStr, String scope, String keysStr) throws ThingsboardException {
+        List<String> keys = toKeysList(keysStr);
+        if (keys.isEmpty()) {
+            return getImmediateDeferredResult("Empty keys: " + keysStr, HttpStatus.BAD_REQUEST);
+        }
+        SecurityUser user = getCurrentUser();
+        if (DataConstants.SERVER_SCOPE.equals(scope) ||
+                DataConstants.SHARED_SCOPE.equals(scope) ||
+                DataConstants.CLIENT_SCOPE.equals(scope)) {
+            return accessValidator.validateEntityAndCallback(getCurrentUser(), entityIdStr, (result, entityId) -> {
+                ListenableFuture<List<Void>> future = attributesService.removeAll(entityId, scope, keys);
+                Futures.addCallback(future, new FutureCallback<List<Void>>() {
+                    @Override
+                    public void onSuccess(@Nullable List<Void> tmp) {
+                        logAttributesDeleted(user, entityId, scope, keys, null);
+                        result.setResult(new ResponseEntity<>(HttpStatus.OK));
+                    }
 
-        validate(getCurrentUser(), entityId, new ValidationCallback(response,
-                new FutureCallback<DeferredResult<ResponseEntity>>() {
                     @Override
-                    public void onSuccess(@Nullable DeferredResult<ResponseEntity> result) {
-                        onSuccess.accept(response, entityId);
+                    public void onFailure(Throwable t) {
+                        logAttributesDeleted(user, entityId, scope, keys, t);
+                        result.setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
+                    }
+                }, executor);
+            });
+        } else {
+            return getImmediateDeferredResult("Invalid attribute scope: " + scope, HttpStatus.BAD_REQUEST);
+        }
+    }
+
+    private DeferredResult<ResponseEntity> saveAttributes(EntityId entityIdSrc, String scope, JsonNode json) throws ThingsboardException {
+        if (!DataConstants.SERVER_SCOPE.equals(scope) && !DataConstants.SHARED_SCOPE.equals(scope)) {
+            return getImmediateDeferredResult("Invalid scope: " + scope, HttpStatus.BAD_REQUEST);
+        }
+        if (json.isObject()) {
+            List<AttributeKvEntry> attributes = extractRequestAttributes(json);
+            if (attributes.isEmpty()) {
+                return getImmediateDeferredResult("No attributes data found in request body!", HttpStatus.BAD_REQUEST);
+            }
+            SecurityUser user = getCurrentUser();
+            return accessValidator.validateEntityAndCallback(getCurrentUser(), entityIdSrc, (result, entityId) -> {
+                ListenableFuture<List<Void>> future = attributesService.save(entityId, scope, attributes);
+                Futures.addCallback(future, new FutureCallback<List<Void>>() {
+                    @Override
+                    public void onSuccess(@Nullable List<Void> tmp) {
+                        logAttributesUpdated(user, entityId, scope, attributes, null);
+                        result.setResult(new ResponseEntity(HttpStatus.OK));
+                        subscriptionService.onAttributesUpdateFromServer(entityId, scope, attributes);
                     }
 
                     @Override
                     public void onFailure(Throwable t) {
-                        onFailure.accept(response, t);
+                        logAttributesUpdated(user, entityId, scope, attributes, t);
+                        AccessValidator.handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR);
                     }
-                }));
+                });
+                result.setResult(new ResponseEntity(HttpStatus.OK));
+            });
+        } else {
+            return getImmediateDeferredResult("Request is not a JSON object", HttpStatus.BAD_REQUEST);
+        }
+    }
+
+    private DeferredResult<ResponseEntity> saveTelemetry(EntityId entityIdSrc, String requestBody, long ttl) throws ThingsboardException {
+        TelemetryUploadRequest telemetryRequest;
+        JsonElement telemetryJson;
+        try {
+            telemetryJson = new JsonParser().parse(requestBody);
+        } catch (Exception e) {
+            return getImmediateDeferredResult("Unable to parse timeseries payload: Invalid JSON body!", HttpStatus.BAD_REQUEST);
+        }
+        try {
+            telemetryRequest = JsonConverter.convertToTelemetry(telemetryJson);
+        } catch (Exception e) {
+            return getImmediateDeferredResult("Unable to parse timeseries payload. Invalid JSON body: " + e.getMessage(), HttpStatus.BAD_REQUEST);
+        }
+        List<TsKvEntry> entries = new ArrayList<>();
+        for (Map.Entry<Long, List<KvEntry>> entry : telemetryRequest.getData().entrySet()) {
+            for (KvEntry kv : entry.getValue()) {
+                entries.add(new BasicTsKvEntry(entry.getKey(), kv));
+            }
+        }
+        if (entries.isEmpty()) {
+            return getImmediateDeferredResult("No timeseries data found in request body!", HttpStatus.BAD_REQUEST);
+        }
+        SecurityUser user = getCurrentUser();
+        return accessValidator.validateEntityAndCallback(getCurrentUser(), entityIdSrc, (result, entityId) -> {
+            ListenableFuture<List<Void>> future = tsService.save(entityId, entries, ttl);
+            Futures.addCallback(future, new FutureCallback<List<Void>>() {
+                @Override
+                public void onSuccess(@Nullable List<Void> tmp) {
+                    result.setResult(new ResponseEntity(HttpStatus.OK));
+                    subscriptionService.onTimeseriesUpdateFromServer(entityId, entries);
+                }
 
-        return response;
+                @Override
+                public void onFailure(Throwable t) {
+                    AccessValidator.handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR);
+                }
+            });
+            result.setResult(new ResponseEntity(HttpStatus.OK));
+        });
     }
 
-    private void getAttributeValuesCallback(@Nullable DeferredResult<ResponseEntity> result, SecurityUser user, EntityId entityId, String scope, String keys) {
-        List<String> keyList = null;
-        if (!StringUtils.isEmpty(keys)) {
-            keyList = Arrays.asList(keys.split(","));
+    private void getLatestTimeseriesValuesCallback(@Nullable DeferredResult<ResponseEntity> result, SecurityUser user, EntityId entityId, String keys) {
+        ListenableFuture<List<TsKvEntry>> future;
+        if (StringUtils.isEmpty(keys)) {
+            future = tsService.findAllLatest(entityId);
+        } else {
+            future = tsService.findLatest(entityId, toKeysList(keys));
         }
+        Futures.addCallback(future, getTsKvListCallback(result));
+    }
+
+    private void getAttributeValuesCallback(@Nullable DeferredResult<ResponseEntity> result, SecurityUser user, EntityId entityId, String scope, String keys) {
+        List<String> keyList = toKeysList(keys);
         FutureCallback<List<AttributeKvEntry>> callback = getAttributeValuesToResponseCallback(result, user, scope, entityId, keyList);
         if (!StringUtils.isEmpty(scope)) {
             if (keyList != null && !keyList.isEmpty()) {
@@ -247,7 +434,7 @@ public class TelemetryController extends BaseController {
             @Override
             public void onFailure(Throwable e) {
                 log.error("Failed to fetch attributes", e);
-                handleError(e, response, HttpStatus.INTERNAL_SERVER_ERROR);
+                AccessValidator.handleError(e, response, HttpStatus.INTERNAL_SERVER_ERROR);
             }
         };
     }
@@ -264,12 +451,13 @@ public class TelemetryController extends BaseController {
             @Override
             public void onFailure(Throwable e) {
                 log.error("Failed to fetch attributes", e);
-                handleError(e, response, HttpStatus.INTERNAL_SERVER_ERROR);
+                AccessValidator.handleError(e, response, HttpStatus.INTERNAL_SERVER_ERROR);
             }
         };
     }
 
-    private FutureCallback<List<AttributeKvEntry>> getAttributeValuesToResponseCallback(final DeferredResult<ResponseEntity> response, final SecurityUser user, final String scope,
+    private FutureCallback<List<AttributeKvEntry>> getAttributeValuesToResponseCallback(final DeferredResult<ResponseEntity> response,
+                                                                                        final SecurityUser user, final String scope,
                                                                                         final EntityId entityId, final List<String> keyList) {
         return new FutureCallback<List<AttributeKvEntry>>() {
             @Override
@@ -284,12 +472,32 @@ public class TelemetryController extends BaseController {
             public void onFailure(Throwable e) {
                 log.error("Failed to fetch attributes", e);
                 logAttributesRead(user, entityId, scope, keyList, e);
-                handleError(e, response, HttpStatus.INTERNAL_SERVER_ERROR);
+                AccessValidator.handleError(e, response, HttpStatus.INTERNAL_SERVER_ERROR);
             }
         };
     }
 
-    private void logAttributesRead(SecurityUser user, EntityId entityId, String scope, List<String> keys, Throwable e) {
+    private FutureCallback<List<TsKvEntry>> getTsKvListCallback(final DeferredResult<ResponseEntity> response) {
+        return new FutureCallback<List<TsKvEntry>>() {
+            @Override
+            public void onSuccess(List<TsKvEntry> data) {
+                Map<String, List<TsData>> result = new LinkedHashMap<>();
+                for (TsKvEntry entry : data) {
+                    result.computeIfAbsent(entry.getKey(), k -> new ArrayList<>())
+                            .add(new TsData(entry.getTs(), entry.getValueAsString()));
+                }
+                response.setResult(new ResponseEntity<>(result, HttpStatus.OK));
+            }
+
+            @Override
+            public void onFailure(Throwable e) {
+                log.error("Failed to fetch historical data", e);
+                AccessValidator.handleError(e, response, HttpStatus.INTERNAL_SERVER_ERROR);
+            }
+        };
+    }
+
+    private void logAttributesDeleted(SecurityUser user, EntityId entityId, String scope, List<String> keys, Throwable e) {
         auditLogService.logEntityAction(
                 user.getTenantId(),
                 user.getCustomerId(),
@@ -297,163 +505,39 @@ public class TelemetryController extends BaseController {
                 user.getName(),
                 (UUIDBased & EntityId) entityId,
                 null,
-                ActionType.ATTRIBUTES_READ,
+                ActionType.ATTRIBUTES_DELETED,
                 toException(e),
                 scope,
                 keys);
     }
 
-    private void handleError(Throwable e, final DeferredResult<ResponseEntity> response, HttpStatus defaultErrorStatus) {
-        ResponseEntity responseEntity;
-        if (e != null && e instanceof ToErrorResponseEntity) {
-            responseEntity = ((ToErrorResponseEntity) e).toErrorResponseEntity();
-        } else if (e != null && e instanceof IllegalArgumentException) {
-            responseEntity = new ResponseEntity<>(e.getMessage(), HttpStatus.BAD_REQUEST);
-        } else {
-            responseEntity = new ResponseEntity<>(defaultErrorStatus);
-        }
-        response.setResult(responseEntity);
-    }
-
-    private void validate(SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
-        switch (entityId.getEntityType()) {
-            case DEVICE:
-                validateDevice(currentUser, entityId, callback);
-                return;
-            case ASSET:
-                validateAsset(currentUser, entityId, callback);
-                return;
-            case RULE_CHAIN:
-                validateRuleChain(currentUser, entityId, callback);
-                return;
-            case CUSTOMER:
-                validateCustomer(currentUser, entityId, callback);
-                return;
-            case TENANT:
-                validateTenant(currentUser, entityId, callback);
-                return;
-            default:
-                //TODO: add support of other entities
-                throw new IllegalStateException("Not Implemented!");
-        }
-    }
-
-    private void validateDevice(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
-        if (currentUser.isSystemAdmin()) {
-            callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
-        } else {
-            ListenableFuture<Device> deviceFuture = deviceService.findDeviceByIdAsync(new DeviceId(entityId.getId()));
-            Futures.addCallback(deviceFuture, getCallback(callback, device -> {
-                if (device == null) {
-                    return ValidationResult.entityNotFound(DEVICE_WITH_REQUESTED_ID_NOT_FOUND);
-                } else {
-                    if (!device.getTenantId().equals(currentUser.getTenantId())) {
-                        return ValidationResult.accessDenied("Device doesn't belong to the current Tenant!");
-                    } else if (currentUser.isCustomerUser() && !device.getCustomerId().equals(currentUser.getCustomerId())) {
-                        return ValidationResult.accessDenied("Device doesn't belong to the current Customer!");
-                    } else {
-                        return ValidationResult.ok();
-                    }
-                }
-            }));
-        }
-    }
-
-    private void validateAsset(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
-        if (currentUser.isSystemAdmin()) {
-            callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
-        } else {
-            ListenableFuture<Asset> assetFuture = assetService.findAssetByIdAsync(new AssetId(entityId.getId()));
-            Futures.addCallback(assetFuture, getCallback(callback, asset -> {
-                if (asset == null) {
-                    return ValidationResult.entityNotFound("Asset with requested id wasn't found!");
-                } else {
-                    if (!asset.getTenantId().equals(currentUser.getTenantId())) {
-                        return ValidationResult.accessDenied("Asset doesn't belong to the current Tenant!");
-                    } else if (currentUser.isCustomerUser() && !asset.getCustomerId().equals(currentUser.getCustomerId())) {
-                        return ValidationResult.accessDenied("Asset doesn't belong to the current Customer!");
-                    } else {
-                        return ValidationResult.ok();
-                    }
-                }
-            }));
-        }
-    }
-
-
-    private void validateRuleChain(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
-        if (currentUser.isCustomerUser()) {
-            callback.onSuccess(ValidationResult.accessDenied(CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
-        } else {
-            ListenableFuture<RuleChain> ruleChainFuture = ruleChainService.findRuleChainByIdAsync(new RuleChainId(entityId.getId()));
-            Futures.addCallback(ruleChainFuture, getCallback(callback, ruleChain -> {
-                if (ruleChain == null) {
-                    return ValidationResult.entityNotFound("Rule chain with requested id wasn't found!");
-                } else {
-                    if (currentUser.isTenantAdmin() && !ruleChain.getTenantId().equals(currentUser.getTenantId())) {
-                        return ValidationResult.accessDenied("Rule chain doesn't belong to the current Tenant!");
-                    } else if (currentUser.isSystemAdmin() && !ruleChain.getTenantId().isNullUid()) {
-                        return ValidationResult.accessDenied("Rule chain is not in system scope!");
-                    } else {
-                        return ValidationResult.ok();
-                    }
-                }
-            }));
-        }
-    }
-
-    private void validateCustomer(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
-        if (currentUser.isSystemAdmin()) {
-            callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
-        } else {
-            ListenableFuture<Customer> customerFuture = customerService.findCustomerByIdAsync(new CustomerId(entityId.getId()));
-            Futures.addCallback(customerFuture, getCallback(callback, customer -> {
-                if (customer == null) {
-                    return ValidationResult.entityNotFound("Customer with requested id wasn't found!");
-                } else {
-                    if (!customer.getTenantId().equals(currentUser.getTenantId())) {
-                        return ValidationResult.accessDenied("Customer doesn't belong to the current Tenant!");
-                    } else if (currentUser.isCustomerUser() && !customer.getId().equals(currentUser.getCustomerId())) {
-                        return ValidationResult.accessDenied("Customer doesn't relate to the currently authorized customer user!");
-                    } else {
-                        return ValidationResult.ok();
-                    }
-                }
-            }));
-        }
-    }
-
-    private void validateTenant(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
-        if (currentUser.isCustomerUser()) {
-            callback.onSuccess(ValidationResult.accessDenied(CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
-        } else if (currentUser.isSystemAdmin()) {
-            callback.onSuccess(ValidationResult.ok());
-        } else {
-            ListenableFuture<Tenant> tenantFuture = tenantService.findTenantByIdAsync(new TenantId(entityId.getId()));
-            Futures.addCallback(tenantFuture, getCallback(callback, tenant -> {
-                if (tenant == null) {
-                    return ValidationResult.entityNotFound("Tenant with requested id wasn't found!");
-                } else if (!tenant.getId().equals(currentUser.getTenantId())) {
-                    return ValidationResult.accessDenied("Tenant doesn't relate to the currently authorized user!");
-                } else {
-                    return ValidationResult.ok();
-                }
-            }));
-        }
+    private void logAttributesUpdated(SecurityUser user, EntityId entityId, String scope, List<AttributeKvEntry> attributes, Throwable e) {
+        auditLogService.logEntityAction(
+                user.getTenantId(),
+                user.getCustomerId(),
+                user.getId(),
+                user.getName(),
+                (UUIDBased & EntityId) entityId,
+                null,
+                ActionType.ATTRIBUTES_UPDATED,
+                toException(e),
+                scope,
+                attributes);
     }
 
-    private <T> FutureCallback<T> getCallback(ValidationCallback callback, Function<T, ValidationResult> transformer) {
-        return new FutureCallback<T>() {
-            @Override
-            public void onSuccess(@Nullable T result) {
-                callback.onSuccess(transformer.apply(result));
-            }
 
-            @Override
-            public void onFailure(Throwable t) {
-                callback.onFailure(t);
-            }
-        };
+    private void logAttributesRead(SecurityUser user, EntityId entityId, String scope, List<String> keys, Throwable e) {
+        auditLogService.logEntityAction(
+                user.getTenantId(),
+                user.getCustomerId(),
+                user.getId(),
+                user.getName(),
+                (UUIDBased & EntityId) entityId,
+                null,
+                ActionType.ATTRIBUTES_READ,
+                toException(e),
+                scope,
+                keys);
     }
 
     private ListenableFuture<List<AttributeKvEntry>> mergeAllAttributesFutures(List<ListenableFuture<List<AttributeKvEntry>>> futures) {
@@ -467,4 +551,40 @@ public class TelemetryController extends BaseController {
                 }, executor);
     }
 
+    private List<String> toKeysList(String keys) {
+        List<String> keyList = null;
+        if (!StringUtils.isEmpty(keys)) {
+            keyList = Arrays.asList(keys.split(","));
+        }
+        return keyList;
+    }
+
+    private DeferredResult<ResponseEntity> getImmediateDeferredResult(String message, HttpStatus status) {
+        DeferredResult<ResponseEntity> result = new DeferredResult<>();
+        result.setResult(new ResponseEntity<>(message, status));
+        return result;
+    }
+
+    private List<AttributeKvEntry> extractRequestAttributes(JsonNode jsonNode) {
+        long ts = System.currentTimeMillis();
+        List<AttributeKvEntry> attributes = new ArrayList<>();
+        jsonNode.fields().forEachRemaining(entry -> {
+            String key = entry.getKey();
+            JsonNode value = entry.getValue();
+            if (entry.getValue().isTextual()) {
+                attributes.add(new BaseAttributeKvEntry(new StringDataEntry(key, value.textValue()), ts));
+            } else if (entry.getValue().isBoolean()) {
+                attributes.add(new BaseAttributeKvEntry(new BooleanDataEntry(key, value.booleanValue()), ts));
+            } else if (entry.getValue().isDouble()) {
+                attributes.add(new BaseAttributeKvEntry(new DoubleDataEntry(key, value.doubleValue()), ts));
+            } else if (entry.getValue().isNumber()) {
+                if (entry.getValue().isBigInteger()) {
+                    throw new UncheckedApiException(new InvalidParametersException("Big integer values are not supported!"));
+                } else {
+                    attributes.add(new BaseAttributeKvEntry(new LongDataEntry(key, value.longValue()), ts));
+                }
+            }
+        });
+        return attributes;
+    }
 }
diff --git a/application/src/main/java/org/thingsboard/server/controller/ValidationCallback.java b/application/src/main/java/org/thingsboard/server/controller/ValidationCallback.java
index ead90ea..6b2718f 100644
--- a/application/src/main/java/org/thingsboard/server/controller/ValidationCallback.java
+++ b/application/src/main/java/org/thingsboard/server/controller/ValidationCallback.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java b/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java
new file mode 100644
index 0000000..01bd238
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java
@@ -0,0 +1,262 @@
+package org.thingsboard.server.service.security;
+
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.stereotype.Component;
+import org.springframework.web.context.request.async.DeferredResult;
+import org.thingsboard.server.actors.plugin.ValidationResult;
+import org.thingsboard.server.common.data.Customer;
+import org.thingsboard.server.common.data.Device;
+import org.thingsboard.server.common.data.Tenant;
+import org.thingsboard.server.common.data.asset.Asset;
+import org.thingsboard.server.common.data.id.AssetId;
+import org.thingsboard.server.common.data.id.CustomerId;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.EntityIdFactory;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.rule.RuleChain;
+import org.thingsboard.server.controller.ValidationCallback;
+import org.thingsboard.server.dao.alarm.AlarmService;
+import org.thingsboard.server.dao.asset.AssetService;
+import org.thingsboard.server.dao.customer.CustomerService;
+import org.thingsboard.server.dao.device.DeviceService;
+import org.thingsboard.server.dao.rule.RuleChainService;
+import org.thingsboard.server.dao.tenant.TenantService;
+import org.thingsboard.server.dao.user.UserService;
+import org.thingsboard.server.exception.ThingsboardException;
+import org.thingsboard.server.extensions.api.exception.ToErrorResponseEntity;
+import org.thingsboard.server.service.security.model.SecurityUser;
+
+import javax.annotation.Nullable;
+import java.util.function.BiConsumer;
+
+/**
+ * Created by ashvayka on 27.03.18.
+ */
+@Component
+public class AccessValidator {
+
+    public static final String CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION = "Customer user is not allowed to perform this operation!";
+    public static final String SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION = "System administrator is not allowed to perform this operation!";
+    public static final String DEVICE_WITH_REQUESTED_ID_NOT_FOUND = "Device with requested id wasn't found!";
+
+    @Autowired
+    protected TenantService tenantService;
+
+    @Autowired
+    protected CustomerService customerService;
+
+    @Autowired
+    protected UserService userService;
+
+    @Autowired
+    protected DeviceService deviceService;
+
+    @Autowired
+    protected AssetService assetService;
+
+    @Autowired
+    protected AlarmService alarmService;
+
+    @Autowired
+    protected RuleChainService ruleChainService;
+
+    public DeferredResult<ResponseEntity> validateEntityAndCallback(SecurityUser currentUser, String entityType, String entityIdStr,
+                                                                    BiConsumer<DeferredResult<ResponseEntity>, EntityId> onSuccess) throws ThingsboardException {
+        return validateEntityAndCallback(currentUser, entityType, entityIdStr, onSuccess, (result, t) -> handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR));
+    }
+
+    public DeferredResult<ResponseEntity> validateEntityAndCallback(SecurityUser currentUser, String entityType, String entityIdStr,
+                                                                    BiConsumer<DeferredResult<ResponseEntity>, EntityId> onSuccess,
+                                                                    BiConsumer<DeferredResult<ResponseEntity>, Throwable> onFailure) throws ThingsboardException {
+        return validateEntityAndCallback(currentUser, EntityIdFactory.getByTypeAndId(entityType, entityIdStr),
+                onSuccess, onFailure);
+    }
+
+    public DeferredResult<ResponseEntity> validateEntityAndCallback(SecurityUser currentUser, EntityId entityId,
+                                                                    BiConsumer<DeferredResult<ResponseEntity>, EntityId> onSuccess) throws ThingsboardException {
+        return validateEntityAndCallback(currentUser, entityId, onSuccess, (result, t) -> handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR));
+    }
+
+    public DeferredResult<ResponseEntity> validateEntityAndCallback(SecurityUser currentUser, EntityId entityId,
+                                                                    BiConsumer<DeferredResult<ResponseEntity>, EntityId> onSuccess,
+                                                                    BiConsumer<DeferredResult<ResponseEntity>, Throwable> onFailure) throws ThingsboardException {
+
+        final DeferredResult<ResponseEntity> response = new DeferredResult<>();
+
+        validate(currentUser, entityId, new ValidationCallback(response,
+                new FutureCallback<DeferredResult<ResponseEntity>>() {
+                    @Override
+                    public void onSuccess(@Nullable DeferredResult<ResponseEntity> result) {
+                        onSuccess.accept(response, entityId);
+                    }
+
+                    @Override
+                    public void onFailure(Throwable t) {
+                        onFailure.accept(response, t);
+                    }
+                }));
+
+        return response;
+    }
+
+    public void validate(SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
+        switch (entityId.getEntityType()) {
+            case DEVICE:
+                validateDevice(currentUser, entityId, callback);
+                return;
+            case ASSET:
+                validateAsset(currentUser, entityId, callback);
+                return;
+            case RULE_CHAIN:
+                validateRuleChain(currentUser, entityId, callback);
+                return;
+            case CUSTOMER:
+                validateCustomer(currentUser, entityId, callback);
+                return;
+            case TENANT:
+                validateTenant(currentUser, entityId, callback);
+                return;
+            default:
+                //TODO: add support of other entities
+                throw new IllegalStateException("Not Implemented!");
+        }
+    }
+
+    private void validateDevice(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
+        if (currentUser.isSystemAdmin()) {
+            callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
+        } else {
+            ListenableFuture<Device> deviceFuture = deviceService.findDeviceByIdAsync(new DeviceId(entityId.getId()));
+            Futures.addCallback(deviceFuture, getCallback(callback, device -> {
+                if (device == null) {
+                    return ValidationResult.entityNotFound(DEVICE_WITH_REQUESTED_ID_NOT_FOUND);
+                } else {
+                    if (!device.getTenantId().equals(currentUser.getTenantId())) {
+                        return ValidationResult.accessDenied("Device doesn't belong to the current Tenant!");
+                    } else if (currentUser.isCustomerUser() && !device.getCustomerId().equals(currentUser.getCustomerId())) {
+                        return ValidationResult.accessDenied("Device doesn't belong to the current Customer!");
+                    } else {
+                        return ValidationResult.ok();
+                    }
+                }
+            }));
+        }
+    }
+
+    private void validateAsset(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
+        if (currentUser.isSystemAdmin()) {
+            callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
+        } else {
+            ListenableFuture<Asset> assetFuture = assetService.findAssetByIdAsync(new AssetId(entityId.getId()));
+            Futures.addCallback(assetFuture, getCallback(callback, asset -> {
+                if (asset == null) {
+                    return ValidationResult.entityNotFound("Asset with requested id wasn't found!");
+                } else {
+                    if (!asset.getTenantId().equals(currentUser.getTenantId())) {
+                        return ValidationResult.accessDenied("Asset doesn't belong to the current Tenant!");
+                    } else if (currentUser.isCustomerUser() && !asset.getCustomerId().equals(currentUser.getCustomerId())) {
+                        return ValidationResult.accessDenied("Asset doesn't belong to the current Customer!");
+                    } else {
+                        return ValidationResult.ok();
+                    }
+                }
+            }));
+        }
+    }
+
+
+    private void validateRuleChain(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
+        if (currentUser.isCustomerUser()) {
+            callback.onSuccess(ValidationResult.accessDenied(CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
+        } else {
+            ListenableFuture<RuleChain> ruleChainFuture = ruleChainService.findRuleChainByIdAsync(new RuleChainId(entityId.getId()));
+            Futures.addCallback(ruleChainFuture, getCallback(callback, ruleChain -> {
+                if (ruleChain == null) {
+                    return ValidationResult.entityNotFound("Rule chain with requested id wasn't found!");
+                } else {
+                    if (currentUser.isTenantAdmin() && !ruleChain.getTenantId().equals(currentUser.getTenantId())) {
+                        return ValidationResult.accessDenied("Rule chain doesn't belong to the current Tenant!");
+                    } else if (currentUser.isSystemAdmin() && !ruleChain.getTenantId().isNullUid()) {
+                        return ValidationResult.accessDenied("Rule chain is not in system scope!");
+                    } else {
+                        return ValidationResult.ok();
+                    }
+                }
+            }));
+        }
+    }
+
+    private void validateCustomer(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
+        if (currentUser.isSystemAdmin()) {
+            callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
+        } else {
+            ListenableFuture<Customer> customerFuture = customerService.findCustomerByIdAsync(new CustomerId(entityId.getId()));
+            Futures.addCallback(customerFuture, getCallback(callback, customer -> {
+                if (customer == null) {
+                    return ValidationResult.entityNotFound("Customer with requested id wasn't found!");
+                } else {
+                    if (!customer.getTenantId().equals(currentUser.getTenantId())) {
+                        return ValidationResult.accessDenied("Customer doesn't belong to the current Tenant!");
+                    } else if (currentUser.isCustomerUser() && !customer.getId().equals(currentUser.getCustomerId())) {
+                        return ValidationResult.accessDenied("Customer doesn't relate to the currently authorized customer user!");
+                    } else {
+                        return ValidationResult.ok();
+                    }
+                }
+            }));
+        }
+    }
+
+    private void validateTenant(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
+        if (currentUser.isCustomerUser()) {
+            callback.onSuccess(ValidationResult.accessDenied(CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
+        } else if (currentUser.isSystemAdmin()) {
+            callback.onSuccess(ValidationResult.ok());
+        } else {
+            ListenableFuture<Tenant> tenantFuture = tenantService.findTenantByIdAsync(new TenantId(entityId.getId()));
+            Futures.addCallback(tenantFuture, getCallback(callback, tenant -> {
+                if (tenant == null) {
+                    return ValidationResult.entityNotFound("Tenant with requested id wasn't found!");
+                } else if (!tenant.getId().equals(currentUser.getTenantId())) {
+                    return ValidationResult.accessDenied("Tenant doesn't relate to the currently authorized user!");
+                } else {
+                    return ValidationResult.ok();
+                }
+            }));
+        }
+    }
+
+    private <T> FutureCallback<T> getCallback(ValidationCallback callback, Function<T, ValidationResult> transformer) {
+        return new FutureCallback<T>() {
+            @Override
+            public void onSuccess(@Nullable T result) {
+                callback.onSuccess(transformer.apply(result));
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                callback.onFailure(t);
+            }
+        };
+    }
+
+    public static void handleError(Throwable e, final DeferredResult<ResponseEntity> response, HttpStatus defaultErrorStatus) {
+        ResponseEntity responseEntity;
+        if (e != null && e instanceof ToErrorResponseEntity) {
+            responseEntity = ((ToErrorResponseEntity) e).toErrorResponseEntity();
+        } else if (e != null && e instanceof IllegalArgumentException) {
+            responseEntity = new ResponseEntity<>(e.getMessage(), HttpStatus.BAD_REQUEST);
+        } else {
+            responseEntity = new ResponseEntity<>(defaultErrorStatus);
+        }
+        response.setResult(responseEntity);
+    }
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
new file mode 100644
index 0000000..359949e
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
@@ -0,0 +1,42 @@
+package org.thingsboard.server.service.telemetry;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.data.kv.TsKvEntry;
+import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Created by ashvayka on 27.03.18.
+ */
+@Service
+@Slf4j
+public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptionService {
+
+    @Autowired
+    private TelemetryWebSocketService wsService;
+
+
+    private final Map<EntityId, Set<Subscription>> subscriptionsByEntityId = new HashMap<>();
+
+    private final Map<String, Map<Integer, Subscription>> subscriptionsByWsSessionId = new HashMap<>();
+
+
+
+    @Override
+    public void onAttributesUpdateFromServer(EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
+
+    }
+
+    @Override
+    public void onTimeseriesUpdateFromServer(EntityId entityId, List<TsKvEntry> entries) {
+
+    }
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
new file mode 100644
index 0000000..6d6c33e
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
@@ -0,0 +1,261 @@
+package org.thingsboard.server.service.telemetry;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.FutureCallback;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.util.StringUtils;
+import org.thingsboard.server.common.data.DataConstants;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.EntityIdFactory;
+import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
+import org.thingsboard.server.common.data.kv.TsKvEntry;
+import org.thingsboard.server.dao.attributes.AttributesService;
+import org.thingsboard.server.dao.timeseries.TimeseriesService;
+import org.thingsboard.server.extensions.api.exception.UnauthorizedException;
+import org.thingsboard.server.extensions.api.plugins.PluginCallback;
+import org.thingsboard.server.extensions.api.plugins.PluginContext;
+import org.thingsboard.server.extensions.api.plugins.ws.PluginWebsocketSessionRef;
+import org.thingsboard.server.extensions.api.plugins.ws.SessionEvent;
+import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.AttributesSubscriptionCmd;
+import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.SubscriptionCmd;
+import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TelemetryPluginCmd;
+import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TelemetryPluginCmdsWrapper;
+import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionErrorCode;
+import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
+import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType;
+import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionUpdate;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+
+/**
+ * Created by ashvayka on 27.03.18.
+ */
+@Service
+@Slf4j
+public class DefaultTelemetryWebSocketService implements TelemetryWebSocketService {
+
+    private static final int UNKNOWN_SUBSCRIPTION_ID = 0;
+    private static final String PROCESSING_MSG = "[{}] Processing: {}";
+    private static final ObjectMapper jsonMapper = new ObjectMapper();
+    private static final String FAILED_TO_FETCH_DATA = "Failed to fetch data!";
+    private static final String FAILED_TO_FETCH_ATTRIBUTES = "Failed to fetch attributes!";
+    private static final String SESSION_META_DATA_NOT_FOUND = "Session meta-data not found!";
+
+    private final ConcurrentMap<String, WsSessionMetaData> wsSessionsMap = new ConcurrentHashMap<>();
+
+    @Autowired
+    private TelemetrySubscriptionService subscriptionManager;
+
+    @Autowired
+    private TelemetryWebSocketMsgEndpoint msgEndpoint;
+
+    @Autowired
+    private AttributesService attributesService;
+
+    @Autowired
+    private TimeseriesService tsService;
+
+    @Override
+    public void handleWebSocketSessionEvent(TelemetryWebSocketSessionRef sessionRef, SessionEvent event) {
+        String sessionId = sessionRef.getSessionId();
+        log.debug(PROCESSING_MSG, sessionId, event);
+        switch (event.getEventType()) {
+            case ESTABLISHED:
+                wsSessionsMap.put(sessionId, new WsSessionMetaData(sessionRef));
+                break;
+            case ERROR:
+                log.debug("[{}] Unknown websocket session error: {}. ", sessionId, event.getError().orElse(null));
+                break;
+            case CLOSED:
+                wsSessionsMap.remove(sessionId);
+                subscriptionManager.cleanupLocalWsSessionSubscriptions(sessionRef, sessionId);
+                break;
+        }
+    }
+
+    @Override
+    public void handleWebSocketMsg(TelemetryWebSocketSessionRef sessionRef, String msg) {
+        if (log.isTraceEnabled()) {
+            log.trace("[{}] Processing: {}", sessionRef.getSessionId(), msg);
+        }
+
+        try {
+            TelemetryPluginCmdsWrapper cmdsWrapper = jsonMapper.readValue(msg, TelemetryPluginCmdsWrapper.class);
+            if (cmdsWrapper != null) {
+                if (cmdsWrapper.getAttrSubCmds() != null) {
+                    cmdsWrapper.getAttrSubCmds().forEach(cmd -> handleWsAttributesSubscriptionCmd(sessionRef, cmd));
+                }
+                if (cmdsWrapper.getTsSubCmds() != null) {
+                    cmdsWrapper.getTsSubCmds().forEach(cmd -> handleWsTimeseriesSubscriptionCmd(sessionRef, cmd));
+                }
+                if (cmdsWrapper.getHistoryCmds() != null) {
+                    cmdsWrapper.getHistoryCmds().forEach(cmd -> handleWsHistoryCmd(sessionRef, cmd));
+                }
+            }
+        } catch (IOException e) {
+            log.warn("Failed to decode subscription cmd: {}", e.getMessage(), e);
+            SubscriptionUpdate update = new SubscriptionUpdate(UNKNOWN_SUBSCRIPTION_ID, SubscriptionErrorCode.INTERNAL_ERROR, SESSION_META_DATA_NOT_FOUND);
+            sendWsMsg(sessionRef, update);
+        }
+    }
+
+    private void handleWsAttributesSubscriptionCmd(TelemetryWebSocketSessionRef sessionRef, AttributesSubscriptionCmd cmd) {
+        String sessionId = sessionRef.getSessionId();
+        log.debug("[{}] Processing: {}", sessionId, cmd);
+
+        if (validateSessionMetadata(sessionRef, cmd, sessionId)) {
+            if (cmd.isUnsubscribe()) {
+                unsubscribe(sessionRef, cmd, sessionId);
+            } else if (validateSubscriptionCmd(sessionRef, cmd)) {
+                EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId());
+                log.debug("[{}] fetching latest attributes ({}) values for device: {}", sessionId, cmd.getKeys(), entityId);
+                Optional<Set<String>> keysOptional = getKeys(cmd);
+                if (keysOptional.isPresent()) {
+                    List<String> keys = new ArrayList<>(keysOptional.get());
+                    handleWsAttributesSubscriptionByKeys(sessionRef, cmd, sessionId, entityId, keys);
+                } else {
+                    handleWsAttributesSubscription(sessionRef, cmd, sessionId, entityId);
+                }
+            }
+        }
+    }
+
+    private void handleWsAttributesSubscriptionByKeys(TelemetryWebSocketSessionRef sessionRef,
+                                                      AttributesSubscriptionCmd cmd, String sessionId, EntityId entityId,
+                                                      List<String> keys) {
+        FutureCallback<List<AttributeKvEntry>> callback = new FutureCallback<List<AttributeKvEntry>>() {
+            @Override
+            public void onSuccess(List<AttributeKvEntry> data) {
+                List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
+                sendWsMsg(sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
+
+                Map<String, Long> subState = new HashMap<>(keys.size());
+                keys.forEach(key -> subState.put(key, 0L));
+                attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
+
+                SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, false, subState, cmd.getScope());
+                subscriptionManager.addLocalWsSubscription(sessionId, entityId, sub);
+            }
+
+            @Override
+            public void onFailure(Throwable e) {
+                log.error(FAILED_TO_FETCH_ATTRIBUTES, e);
+                SubscriptionUpdate update;
+                if (UnauthorizedException.class.isInstance(e)) {
+                    update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED,
+                            SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg());
+                } else {
+                    update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
+                            FAILED_TO_FETCH_ATTRIBUTES);
+                }
+                sendWsMsg(sessionRef, update);
+            }
+        };
+
+        if (StringUtils.isEmpty(cmd.getScope())) {
+            //ValidationCallback?
+            ctx.loadAttributes(entityId, Arrays.asList(DataConstants.allScopes()), keys, callback);
+        } else {
+            ctx.loadAttributes(entityId, cmd.getScope(), keys, callback);
+        }
+    }
+
+    private void handleWsAttributesSubscription(PluginContext ctx, PluginWebsocketSessionRef sessionRef,
+                                                AttributesSubscriptionCmd cmd, String sessionId, EntityId entityId) {
+        PluginCallback<List<AttributeKvEntry>> callback = new PluginCallback<List<AttributeKvEntry>>() {
+            @Override
+            public void onSuccess(PluginContext ctx, List<AttributeKvEntry> data) {
+                List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
+                sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
+
+                Map<String, Long> subState = new HashMap<>(attributesData.size());
+                attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
+
+                SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, true, subState, cmd.getScope());
+                subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub);
+            }
+
+            @Override
+            public void onFailure(PluginContext ctx, Exception e) {
+                log.error(FAILED_TO_FETCH_ATTRIBUTES, e);
+                SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
+                        FAILED_TO_FETCH_ATTRIBUTES);
+                sendWsMsg(ctx, sessionRef, update);
+            }
+        };
+
+        if (StringUtils.isEmpty(cmd.getScope())) {
+            ctx.loadAttributes(entityId, Arrays.asList(DataConstants.allScopes()), callback);
+        } else {
+            ctx.loadAttributes(entityId, cmd.getScope(), callback);
+        }
+    }
+
+    private void unsubscribe(TelemetryWebSocketSessionRef sessionRef, SubscriptionCmd cmd, String sessionId) {
+        if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty()) {
+            subscriptionManager.cleanupLocalWsSessionSubscriptions(sessionRef, sessionId);
+        } else {
+            subscriptionManager.removeSubscription(sessionId, cmd.getCmdId());
+        }
+    }
+
+    private boolean validateSubscriptionCmd(TelemetryWebSocketSessionRef sessionRef, SubscriptionCmd cmd) {
+        if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty()) {
+            SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST,
+                    "Device id is empty!");
+            sendWsMsg(sessionRef, update);
+            return false;
+        }
+        return true;
+    }
+
+    private boolean validateSessionMetadata(TelemetryWebSocketSessionRef sessionRef, SubscriptionCmd cmd, String sessionId) {
+        WsSessionMetaData sessionMD = wsSessionsMap.get(sessionId);
+        if (sessionMD == null) {
+            log.warn("[{}] Session meta data not found. ", sessionId);
+            SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
+                    SESSION_META_DATA_NOT_FOUND);
+            sendWsMsg(sessionRef, update);
+            return false;
+        } else {
+            return true;
+        }
+    }
+
+    private void sendWsMsg(TelemetryWebSocketSessionRef sessionRef, SubscriptionUpdate update) {
+        try {
+            msgEndpoint.send(sessionRef, jsonMapper.writeValueAsString(update));
+        } catch (JsonProcessingException e) {
+            log.warn("[{}] Failed to encode reply: {}", sessionRef.getSessionId(), update, e);
+        } catch (IOException e) {
+            log.warn("[{}] Failed to send reply: {}", sessionRef.getSessionId(), update, e);
+        }
+    }
+
+    private static Optional<Set<String>> getKeys(TelemetryPluginCmd cmd) {
+        if (!StringUtils.isEmpty(cmd.getKeys())) {
+            Set<String> keys = new HashSet<>();
+            Collections.addAll(keys, cmd.getKeys().split(","));
+            return Optional.of(keys);
+        } else {
+            return Optional.empty();
+        }
+    }
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java
new file mode 100644
index 0000000..9673629
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java
@@ -0,0 +1,24 @@
+package org.thingsboard.server.service.telemetry;
+
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.data.kv.TsKvEntry;
+import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
+
+import java.util.List;
+
+/**
+ * Created by ashvayka on 27.03.18.
+ */
+public interface TelemetrySubscriptionService {
+
+    void onAttributesUpdateFromServer(EntityId entityId, String scope, List<AttributeKvEntry> attributes);
+
+    void onTimeseriesUpdateFromServer(EntityId entityId, List<TsKvEntry> entries);
+
+    void cleanupLocalWsSessionSubscriptions(TelemetryWebSocketSessionRef sessionRef, String sessionId);
+
+    void removeSubscription(String sessionId, int cmdId);
+
+    void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub);
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketMsgEndpoint.java b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketMsgEndpoint.java
new file mode 100644
index 0000000..a7e7cad
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketMsgEndpoint.java
@@ -0,0 +1,14 @@
+package org.thingsboard.server.service.telemetry;
+
+import java.io.IOException;
+
+/**
+ * Created by ashvayka on 27.03.18.
+ */
+public interface TelemetryWebSocketMsgEndpoint {
+
+    void send(TelemetryWebSocketSessionRef sessionRef, String msg) throws IOException;
+
+    void close(TelemetryWebSocketSessionRef sessionRef) throws IOException;
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketService.java
new file mode 100644
index 0000000..883a174
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketService.java
@@ -0,0 +1,13 @@
+package org.thingsboard.server.service.telemetry;
+
+import org.thingsboard.server.extensions.api.plugins.ws.SessionEvent;
+
+/**
+ * Created by ashvayka on 27.03.18.
+ */
+public interface TelemetryWebSocketService {
+
+    void handleWebSocketSessionEvent(TelemetryWebSocketSessionRef sessionRef, SessionEvent sessionEvent);
+
+    void handleWebSocketMsg(TelemetryWebSocketSessionRef sessionRef, String msg);
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketSessionRef.java b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketSessionRef.java
new file mode 100644
index 0000000..3fd4b19
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketSessionRef.java
@@ -0,0 +1,53 @@
+package org.thingsboard.server.service.telemetry;
+
+import lombok.Getter;
+import org.thingsboard.server.service.security.model.SecurityUser;
+
+import java.net.InetSocketAddress;
+import java.util.Objects;
+
+/**
+ * Created by ashvayka on 27.03.18.
+ */
+public class TelemetryWebSocketSessionRef {
+
+    private static final long serialVersionUID = 1L;
+
+    @Getter
+    private final String sessionId;
+    @Getter
+    private final SecurityUser securityCtx;
+    @Getter
+    private final InetSocketAddress localAddress;
+    @Getter
+    private final InetSocketAddress remoteAddress;
+
+    public TelemetryWebSocketSessionRef(String sessionId, SecurityUser securityCtx, InetSocketAddress localAddress, InetSocketAddress remoteAddress) {
+        this.sessionId = sessionId;
+        this.securityCtx = securityCtx;
+        this.localAddress = localAddress;
+        this.remoteAddress = remoteAddress;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        TelemetryWebSocketSessionRef that = (TelemetryWebSocketSessionRef) o;
+        return Objects.equals(sessionId, that.sessionId);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(sessionId);
+    }
+
+    @Override
+    public String toString() {
+        return "TelemetryWebSocketSessionRef{" +
+                "sessionId='" + sessionId + '\'' +
+                ", localAddress=" + localAddress +
+                ", remoteAddress=" + remoteAddress +
+                '}';
+    }
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketTextMsg.java b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketTextMsg.java
new file mode 100644
index 0000000..6d57122
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketTextMsg.java
@@ -0,0 +1,19 @@
+package org.thingsboard.server.service.telemetry;
+
+import lombok.Data;
+import lombok.Getter;
+import org.thingsboard.server.service.security.model.SecurityUser;
+
+import java.net.InetSocketAddress;
+import java.util.Objects;
+
+/**
+ * Created by ashvayka on 27.03.18.
+ */
+@Data
+public class TelemetryWebSocketTextMsg {
+
+    private final TelemetryWebSocketSessionRef sessionRef;
+    private final String payload;
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/WsSessionMetaData.java b/application/src/main/java/org/thingsboard/server/service/telemetry/WsSessionMetaData.java
new file mode 100644
index 0000000..c0b162b
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/WsSessionMetaData.java
@@ -0,0 +1,38 @@
+package org.thingsboard.server.service.telemetry;
+
+import org.thingsboard.server.extensions.api.plugins.ws.PluginWebsocketSessionRef;
+
+/**
+ * Created by ashvayka on 27.03.18.
+ */
+public class WsSessionMetaData {
+    private TelemetryWebSocketSessionRef sessionRef;
+    private long lastActivityTime;
+
+    public WsSessionMetaData(TelemetryWebSocketSessionRef sessionRef) {
+        super();
+        this.sessionRef = sessionRef;
+        this.lastActivityTime = System.currentTimeMillis();
+    }
+
+    public TelemetryWebSocketSessionRef getSessionRef() {
+        return sessionRef;
+    }
+
+    public void setSessionRef(TelemetryWebSocketSessionRef sessionRef) {
+        this.sessionRef = sessionRef;
+    }
+
+    public long getLastActivityTime() {
+        return lastActivityTime;
+    }
+
+    public void setLastActivityTime(long lastActivityTime) {
+        this.lastActivityTime = lastActivityTime;
+    }
+
+    @Override
+    public String toString() {
+        return "WsSessionMetaData [sessionRef=" + sessionRef + ", lastActivityTime=" + lastActivityTime + "]";
+    }
+}
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java b/common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java
index 76b3e33..31c1cda 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java
@@ -33,6 +33,10 @@ public class EntityIdFactory {
         return getByTypeAndUuid(EntityType.valueOf(type), uuid);
     }
 
+    public static EntityId getByTypeAndUuid(EntityType type, String uuid) {
+        return getByTypeAndUuid(type, UUID.fromString(uuid));
+    }
+
     public static EntityId getByTypeAndUuid(EntityType type, UUID uuid) {
         switch (type) {
             case TENANT:
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java
index c6e7a54..158945c 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java
@@ -243,27 +243,19 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
         switch (attr.getDataType()) {
             case BOOLEAN:
                 Optional<Boolean> booleanValue = attr.getBooleanValue();
-                if (booleanValue.isPresent()) {
-                    dataBuilder.setBoolValue(booleanValue.get());
-                }
+                booleanValue.ifPresent(dataBuilder::setBoolValue);
                 break;
             case LONG:
                 Optional<Long> longValue = attr.getLongValue();
-                if (longValue.isPresent()) {
-                    dataBuilder.setLongValue(longValue.get());
-                }
+                longValue.ifPresent(dataBuilder::setLongValue);
                 break;
             case DOUBLE:
                 Optional<Double> doubleValue = attr.getDoubleValue();
-                if (doubleValue.isPresent()) {
-                    dataBuilder.setDoubleValue(doubleValue.get());
-                }
+                doubleValue.ifPresent(dataBuilder::setDoubleValue);
                 break;
             case STRING:
                 Optional<String> stringValue = attr.getStrValue();
-                if (stringValue.isPresent()) {
-                    dataBuilder.setStrValue(stringValue.get());
-                }
+                stringValue.ifPresent(dataBuilder::setStrValue);
                 break;
         }
         return dataBuilder;
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java
index ec00677..9d9c2f3 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java
@@ -344,9 +344,7 @@ public class SubscriptionManager {
     }
 
     private void checkSubsciptionsPrevAddress(Set<Subscription> subscriptions) {
-        Iterator<Subscription> subscriptionIterator = subscriptions.iterator();
-        while (subscriptionIterator.hasNext()) {
-            Subscription s = subscriptionIterator.next();
+        for (Subscription s : subscriptions) {
             if (s.isLocal()) {
                 if (s.getServer() != null) {
                     log.trace("[{}] Local subscription is no longer handled on remote server address [{}]", s.getWsSessionId(), s.getServer());