thingsboard-memoizeit
Changes
application/src/main/java/org/thingsboard/server/controller/plugin/PluginApiController.java 110(+55 -55)
application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java 139(+73 -66)
application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java 42(+42 -0)
application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java 261(+261 -0)
application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java 24(+24 -0)
application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketMsgEndpoint.java 14(+14 -0)
application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketService.java 13(+13 -0)
application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketSessionRef.java 53(+53 -0)
application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketTextMsg.java 19(+19 -0)
application/src/main/java/org/thingsboard/server/service/telemetry/WsSessionMetaData.java 38(+38 -0)
Details
diff --git a/application/src/main/java/org/thingsboard/server/config/WebSocketConfiguration.java b/application/src/main/java/org/thingsboard/server/config/WebSocketConfiguration.java
index a75ecb1..d44c50e 100644
--- a/application/src/main/java/org/thingsboard/server/config/WebSocketConfiguration.java
+++ b/application/src/main/java/org/thingsboard/server/config/WebSocketConfiguration.java
@@ -19,7 +19,7 @@ import java.util.Map;
import org.thingsboard.server.exception.ThingsboardErrorCode;
import org.thingsboard.server.exception.ThingsboardException;
-import org.thingsboard.server.controller.plugin.PluginWebSocketHandler;
+import org.thingsboard.server.controller.plugin.TbWebSocketHandler;
import org.thingsboard.server.service.security.model.SecurityUser;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -54,7 +54,7 @@ public class WebSocketConfiguration implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
- registry.addHandler(pluginWsHandler(), WS_PLUGIN_MAPPING).setAllowedOrigins("*")
+ registry.addHandler(wsHandler(), WS_PLUGIN_MAPPING).setAllowedOrigins("*")
.addInterceptors(new HttpSessionHandshakeInterceptor(), new HandshakeInterceptor() {
@Override
@@ -82,8 +82,8 @@ public class WebSocketConfiguration implements WebSocketConfigurer {
}
@Bean
- public WebSocketHandler pluginWsHandler() {
- return new PluginWebSocketHandler();
+ public WebSocketHandler wsHandler() {
+ return new TbWebSocketHandler();
}
protected SecurityUser getCurrentUser() throws ThingsboardException {
diff --git a/application/src/main/java/org/thingsboard/server/controller/plugin/PluginApiController.java b/application/src/main/java/org/thingsboard/server/controller/plugin/PluginApiController.java
index 8e3cee4..045835e 100644
--- a/application/src/main/java/org/thingsboard/server/controller/plugin/PluginApiController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/plugin/PluginApiController.java
@@ -48,59 +48,59 @@ import javax.servlet.http.HttpServletRequest;
@Slf4j
public class PluginApiController extends BaseController {
- @SuppressWarnings("rawtypes")
- @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
- @RequestMapping(value = "/{pluginToken}/**")
- @ResponseStatus(value = HttpStatus.OK)
- public DeferredResult<ResponseEntity> processRequest(
- @PathVariable("pluginToken") String pluginToken,
- RequestEntity<byte[]> requestEntity,
- HttpServletRequest request)
- throws ThingsboardException {
- log.debug("[{}] Going to process requst uri: {}", pluginToken, requestEntity.getUrl());
- DeferredResult<ResponseEntity> result = new DeferredResult<ResponseEntity>();
- PluginMetaData pluginMd = pluginService.findPluginByApiToken(pluginToken);
- if (pluginMd == null) {
- result.setErrorResult(new PluginNotFoundException("Plugin with token: " + pluginToken + " not found!"));
- } else {
- TenantId tenantId = getCurrentUser().getTenantId();
- CustomerId customerId = getCurrentUser().getCustomerId();
- if (validatePluginAccess(pluginMd, tenantId, customerId)) {
- if(tenantId != null && ModelConstants.NULL_UUID.equals(tenantId.getId())){
- tenantId = null;
- }
- UserId userId = getCurrentUser().getId();
- String userName = getCurrentUser().getName();
- PluginApiCallSecurityContext securityCtx = new PluginApiCallSecurityContext(pluginMd.getTenantId(), pluginMd.getId(),
- tenantId, customerId, userId, userName);
- actorService.process(new BasicPluginRestMsg(securityCtx, new RestRequest(requestEntity, request), result));
- } else {
- result.setResult(new ResponseEntity<>(HttpStatus.FORBIDDEN));
- }
-
- }
- return result;
- }
-
- public static boolean validatePluginAccess(PluginMetaData pluginMd, TenantId tenantId, CustomerId customerId) {
- boolean systemAdministrator = tenantId == null || ModelConstants.NULL_UUID.equals(tenantId.getId());
- boolean tenantAdministrator = !systemAdministrator && (customerId == null || ModelConstants.NULL_UUID.equals(customerId.getId()));
- boolean systemPlugin = ModelConstants.NULL_UUID.equals(pluginMd.getTenantId().getId());
-
- boolean validUser = false;
- if (systemPlugin) {
- if (pluginMd.isPublicAccess() || systemAdministrator) {
- // All users can access public system plugins. Only system
- // users can access private system plugins
- validUser = true;
- }
- } else {
- if ((pluginMd.isPublicAccess() || tenantAdministrator) && tenantId != null && tenantId.equals(pluginMd.getTenantId())) {
- // All tenant users can access public tenant plugins. Only tenant
- // administrator can access private tenant plugins
- validUser = true;
- }
- }
- return validUser;
- }
+// @SuppressWarnings("rawtypes")
+// @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
+// @RequestMapping(value = "/{pluginToken}/**")
+// @ResponseStatus(value = HttpStatus.OK)
+// public DeferredResult<ResponseEntity> processRequest(
+// @PathVariable("pluginToken") String pluginToken,
+// RequestEntity<byte[]> requestEntity,
+// HttpServletRequest request)
+// throws ThingsboardException {
+// log.debug("[{}] Going to process requst uri: {}", pluginToken, requestEntity.getUrl());
+// DeferredResult<ResponseEntity> result = new DeferredResult<ResponseEntity>();
+// PluginMetaData pluginMd = pluginService.findPluginByApiToken(pluginToken);
+// if (pluginMd == null) {
+// result.setErrorResult(new PluginNotFoundException("Plugin with token: " + pluginToken + " not found!"));
+// } else {
+// TenantId tenantId = getCurrentUser().getTenantId();
+// CustomerId customerId = getCurrentUser().getCustomerId();
+// if (validatePluginAccess(pluginMd, tenantId, customerId)) {
+// if(tenantId != null && ModelConstants.NULL_UUID.equals(tenantId.getId())){
+// tenantId = null;
+// }
+// UserId userId = getCurrentUser().getId();
+// String userName = getCurrentUser().getName();
+// PluginApiCallSecurityContext securityCtx = new PluginApiCallSecurityContext(pluginMd.getTenantId(), pluginMd.getId(),
+// tenantId, customerId, userId, userName);
+// actorService.process(new BasicPluginRestMsg(securityCtx, new RestRequest(requestEntity, request), result));
+// } else {
+// result.setResult(new ResponseEntity<>(HttpStatus.FORBIDDEN));
+// }
+//
+// }
+// return result;
+// }
+//
+// public static boolean validatePluginAccess(PluginMetaData pluginMd, TenantId tenantId, CustomerId customerId) {
+// boolean systemAdministrator = tenantId == null || ModelConstants.NULL_UUID.equals(tenantId.getId());
+// boolean tenantAdministrator = !systemAdministrator && (customerId == null || ModelConstants.NULL_UUID.equals(customerId.getId()));
+// boolean systemPlugin = ModelConstants.NULL_UUID.equals(pluginMd.getTenantId().getId());
+//
+// boolean validUser = false;
+// if (systemPlugin) {
+// if (pluginMd.isPublicAccess() || systemAdministrator) {
+// // All users can access public system plugins. Only system
+// // users can access private system plugins
+// validUser = true;
+// }
+// } else {
+// if ((pluginMd.isPublicAccess() || tenantAdministrator) && tenantId != null && tenantId.equals(pluginMd.getTenantId())) {
+// // All tenant users can access public tenant plugins. Only tenant
+// // administrator can access private tenant plugins
+// validUser = true;
+// }
+// }
+// return validUser;
+// }
}
diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
index 1944fa2..a2867ce 100644
--- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
@@ -1,9 +1,27 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.server.controller;
+import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Function;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
@@ -11,47 +29,54 @@ import org.springframework.http.ResponseEntity;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
-import org.springframework.web.bind.annotation.ResponseStatus;
+import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
-import org.thingsboard.server.actors.plugin.ValidationResult;
-import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.DataConstants;
-import org.thingsboard.server.common.data.Device;
-import org.thingsboard.server.common.data.Tenant;
-import org.thingsboard.server.common.data.asset.Asset;
+import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.audit.ActionType;
-import org.thingsboard.server.common.data.id.AssetId;
-import org.thingsboard.server.common.data.id.CustomerId;
-import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
-import org.thingsboard.server.common.data.id.RuleChainId;
-import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UUIDBased;
+import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
+import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
+import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
+import org.thingsboard.server.common.data.kv.BooleanDataEntry;
+import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
+import org.thingsboard.server.common.data.kv.LongDataEntry;
+import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
-import org.thingsboard.server.common.data.rule.RuleChain;
+import org.thingsboard.server.common.data.kv.TsKvQuery;
+import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
+import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.exception.ThingsboardException;
-import org.thingsboard.server.extensions.api.exception.ToErrorResponseEntity;
+import org.thingsboard.server.extensions.api.exception.InvalidParametersException;
+import org.thingsboard.server.extensions.api.exception.UncheckedApiException;
import org.thingsboard.server.extensions.api.plugins.PluginConstants;
import org.thingsboard.server.extensions.core.plugin.telemetry.AttributeData;
+import org.thingsboard.server.extensions.core.plugin.telemetry.TsData;
+import org.thingsboard.server.service.security.AccessValidator;
import org.thingsboard.server.service.security.model.SecurityUser;
+import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import javax.annotation.Nullable;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.function.BiConsumer;
import java.util.stream.Collectors;
/**
@@ -62,9 +87,8 @@ import java.util.stream.Collectors;
@Slf4j
public class TelemetryController extends BaseController {
- public static final String CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION = "Customer user is not allowed to perform this operation!";
- public static final String SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION = "System administrator is not allowed to perform this operation!";
- public static final String DEVICE_WITH_REQUESTED_ID_NOT_FOUND = "Device with requested id wasn't found!";
+ @Autowired
+ private TelemetrySubscriptionService subscriptionService;
@Autowired
private AttributesService attributesService;
@@ -72,6 +96,9 @@ public class TelemetryController extends BaseController {
@Autowired
private TimeseriesService tsService;
+ @Autowired
+ private AccessValidator accessValidator;
+
private ExecutorService executor;
public void initExecutor() {
@@ -87,117 +114,277 @@ public class TelemetryController extends BaseController {
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/{entityType}/{entityId}/keys/ATTRIBUTES", method = RequestMethod.GET)
- @ResponseStatus(value = HttpStatus.OK)
+ @ResponseBody
public DeferredResult<ResponseEntity> getAttributeKeys(
@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr) throws ThingsboardException {
- return validateEntityAndCallback(entityType, entityIdStr,
- this::getAttributeKeysCallback,
- (result, t) -> handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR));
+ return accessValidator.validateEntityAndCallback(getCurrentUser(), entityType, entityIdStr, this::getAttributeKeysCallback);
}
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/{entityType}/{entityId}/keys/ATTRIBUTES/{scope}", method = RequestMethod.GET)
- @ResponseStatus(value = HttpStatus.OK)
+ @ResponseBody
public DeferredResult<ResponseEntity> getAttributeKeysByScope(
@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr
, @PathVariable("scope") String scope) throws ThingsboardException {
- return validateEntityAndCallback(entityType, entityIdStr,
- (result, entityId) -> getAttributeKeysCallback(result, entityId, scope),
- (result, t) -> handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR));
+ return accessValidator.validateEntityAndCallback(getCurrentUser(), entityType, entityIdStr,
+ (result, entityId) -> getAttributeKeysCallback(result, entityId, scope));
}
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/{entityType}/{entityId}/values/ATTRIBUTES", method = RequestMethod.GET)
- @ResponseStatus(value = HttpStatus.OK)
+ @ResponseBody
public DeferredResult<ResponseEntity> getAttributes(
@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
@RequestParam(name = "keys", required = false) String keysStr) throws ThingsboardException {
SecurityUser user = getCurrentUser();
- return validateEntityAndCallback(entityType, entityIdStr,
- (result, entityId) -> getAttributeValuesCallback(result, user, entityId, null, keysStr),
- (result, t) -> handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR));
+ return accessValidator.validateEntityAndCallback(getCurrentUser(), entityType, entityIdStr,
+ (result, entityId) -> getAttributeValuesCallback(result, user, entityId, null, keysStr));
}
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/{entityType}/{entityId}/values/ATTRIBUTES/{scope}", method = RequestMethod.GET)
- @ResponseStatus(value = HttpStatus.OK)
+ @ResponseBody
public DeferredResult<ResponseEntity> getAttributesByScope(
@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
@PathVariable("scope") String scope,
@RequestParam(name = "keys", required = false) String keysStr) throws ThingsboardException {
SecurityUser user = getCurrentUser();
- return validateEntityAndCallback(entityType, entityIdStr,
- (result, entityId) -> getAttributeValuesCallback(result, user, entityId, scope, keysStr),
- (result, t) -> handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR));
+ return accessValidator.validateEntityAndCallback(getCurrentUser(), entityType, entityIdStr,
+ (result, entityId) -> getAttributeValuesCallback(result, user, entityId, scope, keysStr));
}
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/{entityType}/{entityId}/keys/TIMESERIES", method = RequestMethod.GET)
- @ResponseStatus(value = HttpStatus.OK)
+ @ResponseBody
public DeferredResult<ResponseEntity> getTimeseriesKeys(
@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr) throws ThingsboardException {
- return validateEntityAndCallback(entityType, entityIdStr,
+ return accessValidator.validateEntityAndCallback(getCurrentUser(), entityType, entityIdStr,
(result, entityId) -> {
Futures.addCallback(tsService.findAllLatest(entityId), getTsKeysToResponseCallback(result));
- },
- (result, t) -> handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR));
+ });
}
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/{entityType}/{entityId}/values/TIMESERIES", method = RequestMethod.GET)
- @ResponseStatus(value = HttpStatus.OK)
+ @ResponseBody
public DeferredResult<ResponseEntity> getLatestTimeseries(
@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
- @PathVariable("scope") String scope,
@RequestParam(name = "keys", required = false) String keysStr) throws ThingsboardException {
SecurityUser user = getCurrentUser();
- return validateEntityAndCallback(entityType, entityIdStr,
- (result, entityId) -> getAttributeValuesCallback(result, user, entityId, scope, keysStr),
- (result, t) -> handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR));
+ return accessValidator.validateEntityAndCallback(getCurrentUser(), entityType, entityIdStr,
+ (result, entityId) -> getLatestTimeseriesValuesCallback(result, user, entityId, keysStr));
}
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/{entityType}/{entityId}/values/TIMESERIES", method = RequestMethod.GET)
- @ResponseStatus(value = HttpStatus.OK)
- public DeferredResult<ResponseEntity> getLatestTimeseries(
+ @ResponseBody
+ public DeferredResult<ResponseEntity> getTimeseries(
@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
- @PathVariable("scope") String scope,
- @RequestParam(name = "keys", required = false) String keysStr) throws ThingsboardException {
- SecurityUser user = getCurrentUser();
+ @RequestParam(name = "keys") String keys,
+ @RequestParam(name = "startTs") Long startTs,
+ @RequestParam(name = "endTs") Long endTs,
+ @RequestParam(name = "interval", defaultValue = "0") Long interval,
+ @RequestParam(name = "limit", defaultValue = "100") Integer limit,
+ @RequestParam(name = "agg", defaultValue = "NONE") String aggStr
+ ) throws ThingsboardException {
+ return accessValidator.validateEntityAndCallback(getCurrentUser(), entityType, entityIdStr,
+ (result, entityId) -> {
+ // If interval is 0, convert this to a NONE aggregation, which is probably what the user really wanted
+ Aggregation agg = interval == 0L ? Aggregation.valueOf(Aggregation.NONE.name()) : Aggregation.valueOf(aggStr);
+ List<TsKvQuery> queries = toKeysList(keys).stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, interval, limit, agg))
+ .collect(Collectors.toList());
+
+ Futures.addCallback(tsService.findAll(entityId, queries), getTsKvListCallback(result));
+ });
+ }
+
+ @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
+ @RequestMapping(value = "/{deviceId}/{scope}", method = RequestMethod.POST)
+ @ResponseBody
+ public DeferredResult<ResponseEntity> saveDeviceAttributes(@PathVariable("deviceId") String deviceIdStr, @PathVariable("scope") String scope,
+ @RequestBody JsonNode request) throws ThingsboardException {
+ EntityId entityId = EntityIdFactory.getByTypeAndUuid(EntityType.DEVICE, deviceIdStr);
+ return saveAttributes(entityId, scope, request);
+ }
+
+ @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
+ @RequestMapping(value = "/{entityType}/{entityId}/{scope}", method = RequestMethod.POST)
+ @ResponseBody
+ public DeferredResult<ResponseEntity> saveEntityAttributesV1(@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
+ @PathVariable("scope") String scope,
+ @RequestBody JsonNode request) throws ThingsboardException {
+ EntityId entityId = EntityIdFactory.getByTypeAndId(entityType, entityIdStr);
+ return saveAttributes(entityId, scope, request);
+ }
- return validateEntityAndCallback(entityType, entityIdStr,
- (result, entityId) -> getAttributeValuesCallback(result, user, entityId, scope, keysStr),
- (result, t) -> handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR));
+ @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
+ @RequestMapping(value = "/{entityType}/{entityId}/ATTRIBUTES/{scope}", method = RequestMethod.POST)
+ @ResponseBody
+ public DeferredResult<ResponseEntity> saveEntityAttributesV2(@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
+ @PathVariable("scope") String scope,
+ @RequestBody JsonNode request) throws ThingsboardException {
+ EntityId entityId = EntityIdFactory.getByTypeAndId(entityType, entityIdStr);
+ return saveAttributes(entityId, scope, request);
+ }
+
+ @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
+ @RequestMapping(value = "/{entityType}/{entityId}/TIMESERIES/{scope}", method = RequestMethod.POST)
+ @ResponseBody
+ public DeferredResult<ResponseEntity> saveEntityTelemetry(@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
+ @PathVariable("scope") String scope,
+ @RequestBody String requestBody) throws ThingsboardException {
+ EntityId entityId = EntityIdFactory.getByTypeAndId(entityType, entityIdStr);
+ return saveTelemetry(entityId, requestBody, 0L);
+ }
+
+ @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
+ @RequestMapping(value = "/{entityType}/{entityId}/TIMESERIES/{scope}/{ttl}", method = RequestMethod.POST)
+ @ResponseBody
+ public DeferredResult<ResponseEntity> saveEntityTelemetryWithTTL(@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
+ @PathVariable("scope") String scope, @PathVariable("ttl") Long ttl,
+ @RequestBody String requestBody) throws ThingsboardException {
+ EntityId entityId = EntityIdFactory.getByTypeAndId(entityType, entityIdStr);
+ return saveTelemetry(entityId, requestBody, ttl);
}
- private DeferredResult<ResponseEntity> validateEntityAndCallback(String entityType, String entityIdStr,
- BiConsumer<DeferredResult<ResponseEntity>, EntityId> onSuccess, BiConsumer<DeferredResult<ResponseEntity>, Throwable> onFailure) throws ThingsboardException {
- final DeferredResult<ResponseEntity> response = new DeferredResult<>();
+ @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
+ @RequestMapping(value = "/{deviceId}/{scope}", method = RequestMethod.DELETE)
+ @ResponseBody
+ public DeferredResult<ResponseEntity> deleteEntityAttributes(@PathVariable("deviceId") String deviceIdStr,
+ @PathVariable("scope") String scope,
+ @RequestParam(name = "keys") String keysStr) throws ThingsboardException {
+ EntityId entityId = EntityIdFactory.getByTypeAndUuid(EntityType.DEVICE, deviceIdStr);
+ return deleteAttributes(entityId, scope, keysStr);
+ }
+
+ @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
+ @RequestMapping(value = "/{entityType}/{entityId}/{scope}", method = RequestMethod.DELETE)
+ @ResponseBody
+ public DeferredResult<ResponseEntity> deleteEntityAttributes(@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
+ @PathVariable("scope") String scope,
+ @RequestParam(name = "keys") String keysStr) throws ThingsboardException {
EntityId entityId = EntityIdFactory.getByTypeAndId(entityType, entityIdStr);
+ return deleteAttributes(entityId, scope, keysStr);
+ }
+
+ private DeferredResult<ResponseEntity> deleteAttributes(EntityId entityIdStr, String scope, String keysStr) throws ThingsboardException {
+ List<String> keys = toKeysList(keysStr);
+ if (keys.isEmpty()) {
+ return getImmediateDeferredResult("Empty keys: " + keysStr, HttpStatus.BAD_REQUEST);
+ }
+ SecurityUser user = getCurrentUser();
+ if (DataConstants.SERVER_SCOPE.equals(scope) ||
+ DataConstants.SHARED_SCOPE.equals(scope) ||
+ DataConstants.CLIENT_SCOPE.equals(scope)) {
+ return accessValidator.validateEntityAndCallback(getCurrentUser(), entityIdStr, (result, entityId) -> {
+ ListenableFuture<List<Void>> future = attributesService.removeAll(entityId, scope, keys);
+ Futures.addCallback(future, new FutureCallback<List<Void>>() {
+ @Override
+ public void onSuccess(@Nullable List<Void> tmp) {
+ logAttributesDeleted(user, entityId, scope, keys, null);
+ result.setResult(new ResponseEntity<>(HttpStatus.OK));
+ }
- validate(getCurrentUser(), entityId, new ValidationCallback(response,
- new FutureCallback<DeferredResult<ResponseEntity>>() {
@Override
- public void onSuccess(@Nullable DeferredResult<ResponseEntity> result) {
- onSuccess.accept(response, entityId);
+ public void onFailure(Throwable t) {
+ logAttributesDeleted(user, entityId, scope, keys, t);
+ result.setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
+ }
+ }, executor);
+ });
+ } else {
+ return getImmediateDeferredResult("Invalid attribute scope: " + scope, HttpStatus.BAD_REQUEST);
+ }
+ }
+
+ private DeferredResult<ResponseEntity> saveAttributes(EntityId entityIdSrc, String scope, JsonNode json) throws ThingsboardException {
+ if (!DataConstants.SERVER_SCOPE.equals(scope) && !DataConstants.SHARED_SCOPE.equals(scope)) {
+ return getImmediateDeferredResult("Invalid scope: " + scope, HttpStatus.BAD_REQUEST);
+ }
+ if (json.isObject()) {
+ List<AttributeKvEntry> attributes = extractRequestAttributes(json);
+ if (attributes.isEmpty()) {
+ return getImmediateDeferredResult("No attributes data found in request body!", HttpStatus.BAD_REQUEST);
+ }
+ SecurityUser user = getCurrentUser();
+ return accessValidator.validateEntityAndCallback(getCurrentUser(), entityIdSrc, (result, entityId) -> {
+ ListenableFuture<List<Void>> future = attributesService.save(entityId, scope, attributes);
+ Futures.addCallback(future, new FutureCallback<List<Void>>() {
+ @Override
+ public void onSuccess(@Nullable List<Void> tmp) {
+ logAttributesUpdated(user, entityId, scope, attributes, null);
+ result.setResult(new ResponseEntity(HttpStatus.OK));
+ subscriptionService.onAttributesUpdateFromServer(entityId, scope, attributes);
}
@Override
public void onFailure(Throwable t) {
- onFailure.accept(response, t);
+ logAttributesUpdated(user, entityId, scope, attributes, t);
+ AccessValidator.handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR);
}
- }));
+ });
+ result.setResult(new ResponseEntity(HttpStatus.OK));
+ });
+ } else {
+ return getImmediateDeferredResult("Request is not a JSON object", HttpStatus.BAD_REQUEST);
+ }
+ }
+
+ private DeferredResult<ResponseEntity> saveTelemetry(EntityId entityIdSrc, String requestBody, long ttl) throws ThingsboardException {
+ TelemetryUploadRequest telemetryRequest;
+ JsonElement telemetryJson;
+ try {
+ telemetryJson = new JsonParser().parse(requestBody);
+ } catch (Exception e) {
+ return getImmediateDeferredResult("Unable to parse timeseries payload: Invalid JSON body!", HttpStatus.BAD_REQUEST);
+ }
+ try {
+ telemetryRequest = JsonConverter.convertToTelemetry(telemetryJson);
+ } catch (Exception e) {
+ return getImmediateDeferredResult("Unable to parse timeseries payload. Invalid JSON body: " + e.getMessage(), HttpStatus.BAD_REQUEST);
+ }
+ List<TsKvEntry> entries = new ArrayList<>();
+ for (Map.Entry<Long, List<KvEntry>> entry : telemetryRequest.getData().entrySet()) {
+ for (KvEntry kv : entry.getValue()) {
+ entries.add(new BasicTsKvEntry(entry.getKey(), kv));
+ }
+ }
+ if (entries.isEmpty()) {
+ return getImmediateDeferredResult("No timeseries data found in request body!", HttpStatus.BAD_REQUEST);
+ }
+ SecurityUser user = getCurrentUser();
+ return accessValidator.validateEntityAndCallback(getCurrentUser(), entityIdSrc, (result, entityId) -> {
+ ListenableFuture<List<Void>> future = tsService.save(entityId, entries, ttl);
+ Futures.addCallback(future, new FutureCallback<List<Void>>() {
+ @Override
+ public void onSuccess(@Nullable List<Void> tmp) {
+ result.setResult(new ResponseEntity(HttpStatus.OK));
+ subscriptionService.onTimeseriesUpdateFromServer(entityId, entries);
+ }
- return response;
+ @Override
+ public void onFailure(Throwable t) {
+ AccessValidator.handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR);
+ }
+ });
+ result.setResult(new ResponseEntity(HttpStatus.OK));
+ });
}
- private void getAttributeValuesCallback(@Nullable DeferredResult<ResponseEntity> result, SecurityUser user, EntityId entityId, String scope, String keys) {
- List<String> keyList = null;
- if (!StringUtils.isEmpty(keys)) {
- keyList = Arrays.asList(keys.split(","));
+ private void getLatestTimeseriesValuesCallback(@Nullable DeferredResult<ResponseEntity> result, SecurityUser user, EntityId entityId, String keys) {
+ ListenableFuture<List<TsKvEntry>> future;
+ if (StringUtils.isEmpty(keys)) {
+ future = tsService.findAllLatest(entityId);
+ } else {
+ future = tsService.findLatest(entityId, toKeysList(keys));
}
+ Futures.addCallback(future, getTsKvListCallback(result));
+ }
+
+ private void getAttributeValuesCallback(@Nullable DeferredResult<ResponseEntity> result, SecurityUser user, EntityId entityId, String scope, String keys) {
+ List<String> keyList = toKeysList(keys);
FutureCallback<List<AttributeKvEntry>> callback = getAttributeValuesToResponseCallback(result, user, scope, entityId, keyList);
if (!StringUtils.isEmpty(scope)) {
if (keyList != null && !keyList.isEmpty()) {
@@ -247,7 +434,7 @@ public class TelemetryController extends BaseController {
@Override
public void onFailure(Throwable e) {
log.error("Failed to fetch attributes", e);
- handleError(e, response, HttpStatus.INTERNAL_SERVER_ERROR);
+ AccessValidator.handleError(e, response, HttpStatus.INTERNAL_SERVER_ERROR);
}
};
}
@@ -264,12 +451,13 @@ public class TelemetryController extends BaseController {
@Override
public void onFailure(Throwable e) {
log.error("Failed to fetch attributes", e);
- handleError(e, response, HttpStatus.INTERNAL_SERVER_ERROR);
+ AccessValidator.handleError(e, response, HttpStatus.INTERNAL_SERVER_ERROR);
}
};
}
- private FutureCallback<List<AttributeKvEntry>> getAttributeValuesToResponseCallback(final DeferredResult<ResponseEntity> response, final SecurityUser user, final String scope,
+ private FutureCallback<List<AttributeKvEntry>> getAttributeValuesToResponseCallback(final DeferredResult<ResponseEntity> response,
+ final SecurityUser user, final String scope,
final EntityId entityId, final List<String> keyList) {
return new FutureCallback<List<AttributeKvEntry>>() {
@Override
@@ -284,12 +472,32 @@ public class TelemetryController extends BaseController {
public void onFailure(Throwable e) {
log.error("Failed to fetch attributes", e);
logAttributesRead(user, entityId, scope, keyList, e);
- handleError(e, response, HttpStatus.INTERNAL_SERVER_ERROR);
+ AccessValidator.handleError(e, response, HttpStatus.INTERNAL_SERVER_ERROR);
}
};
}
- private void logAttributesRead(SecurityUser user, EntityId entityId, String scope, List<String> keys, Throwable e) {
+ private FutureCallback<List<TsKvEntry>> getTsKvListCallback(final DeferredResult<ResponseEntity> response) {
+ return new FutureCallback<List<TsKvEntry>>() {
+ @Override
+ public void onSuccess(List<TsKvEntry> data) {
+ Map<String, List<TsData>> result = new LinkedHashMap<>();
+ for (TsKvEntry entry : data) {
+ result.computeIfAbsent(entry.getKey(), k -> new ArrayList<>())
+ .add(new TsData(entry.getTs(), entry.getValueAsString()));
+ }
+ response.setResult(new ResponseEntity<>(result, HttpStatus.OK));
+ }
+
+ @Override
+ public void onFailure(Throwable e) {
+ log.error("Failed to fetch historical data", e);
+ AccessValidator.handleError(e, response, HttpStatus.INTERNAL_SERVER_ERROR);
+ }
+ };
+ }
+
+ private void logAttributesDeleted(SecurityUser user, EntityId entityId, String scope, List<String> keys, Throwable e) {
auditLogService.logEntityAction(
user.getTenantId(),
user.getCustomerId(),
@@ -297,163 +505,39 @@ public class TelemetryController extends BaseController {
user.getName(),
(UUIDBased & EntityId) entityId,
null,
- ActionType.ATTRIBUTES_READ,
+ ActionType.ATTRIBUTES_DELETED,
toException(e),
scope,
keys);
}
- private void handleError(Throwable e, final DeferredResult<ResponseEntity> response, HttpStatus defaultErrorStatus) {
- ResponseEntity responseEntity;
- if (e != null && e instanceof ToErrorResponseEntity) {
- responseEntity = ((ToErrorResponseEntity) e).toErrorResponseEntity();
- } else if (e != null && e instanceof IllegalArgumentException) {
- responseEntity = new ResponseEntity<>(e.getMessage(), HttpStatus.BAD_REQUEST);
- } else {
- responseEntity = new ResponseEntity<>(defaultErrorStatus);
- }
- response.setResult(responseEntity);
- }
-
- private void validate(SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
- switch (entityId.getEntityType()) {
- case DEVICE:
- validateDevice(currentUser, entityId, callback);
- return;
- case ASSET:
- validateAsset(currentUser, entityId, callback);
- return;
- case RULE_CHAIN:
- validateRuleChain(currentUser, entityId, callback);
- return;
- case CUSTOMER:
- validateCustomer(currentUser, entityId, callback);
- return;
- case TENANT:
- validateTenant(currentUser, entityId, callback);
- return;
- default:
- //TODO: add support of other entities
- throw new IllegalStateException("Not Implemented!");
- }
- }
-
- private void validateDevice(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
- if (currentUser.isSystemAdmin()) {
- callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
- } else {
- ListenableFuture<Device> deviceFuture = deviceService.findDeviceByIdAsync(new DeviceId(entityId.getId()));
- Futures.addCallback(deviceFuture, getCallback(callback, device -> {
- if (device == null) {
- return ValidationResult.entityNotFound(DEVICE_WITH_REQUESTED_ID_NOT_FOUND);
- } else {
- if (!device.getTenantId().equals(currentUser.getTenantId())) {
- return ValidationResult.accessDenied("Device doesn't belong to the current Tenant!");
- } else if (currentUser.isCustomerUser() && !device.getCustomerId().equals(currentUser.getCustomerId())) {
- return ValidationResult.accessDenied("Device doesn't belong to the current Customer!");
- } else {
- return ValidationResult.ok();
- }
- }
- }));
- }
- }
-
- private void validateAsset(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
- if (currentUser.isSystemAdmin()) {
- callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
- } else {
- ListenableFuture<Asset> assetFuture = assetService.findAssetByIdAsync(new AssetId(entityId.getId()));
- Futures.addCallback(assetFuture, getCallback(callback, asset -> {
- if (asset == null) {
- return ValidationResult.entityNotFound("Asset with requested id wasn't found!");
- } else {
- if (!asset.getTenantId().equals(currentUser.getTenantId())) {
- return ValidationResult.accessDenied("Asset doesn't belong to the current Tenant!");
- } else if (currentUser.isCustomerUser() && !asset.getCustomerId().equals(currentUser.getCustomerId())) {
- return ValidationResult.accessDenied("Asset doesn't belong to the current Customer!");
- } else {
- return ValidationResult.ok();
- }
- }
- }));
- }
- }
-
-
- private void validateRuleChain(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
- if (currentUser.isCustomerUser()) {
- callback.onSuccess(ValidationResult.accessDenied(CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
- } else {
- ListenableFuture<RuleChain> ruleChainFuture = ruleChainService.findRuleChainByIdAsync(new RuleChainId(entityId.getId()));
- Futures.addCallback(ruleChainFuture, getCallback(callback, ruleChain -> {
- if (ruleChain == null) {
- return ValidationResult.entityNotFound("Rule chain with requested id wasn't found!");
- } else {
- if (currentUser.isTenantAdmin() && !ruleChain.getTenantId().equals(currentUser.getTenantId())) {
- return ValidationResult.accessDenied("Rule chain doesn't belong to the current Tenant!");
- } else if (currentUser.isSystemAdmin() && !ruleChain.getTenantId().isNullUid()) {
- return ValidationResult.accessDenied("Rule chain is not in system scope!");
- } else {
- return ValidationResult.ok();
- }
- }
- }));
- }
- }
-
- private void validateCustomer(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
- if (currentUser.isSystemAdmin()) {
- callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
- } else {
- ListenableFuture<Customer> customerFuture = customerService.findCustomerByIdAsync(new CustomerId(entityId.getId()));
- Futures.addCallback(customerFuture, getCallback(callback, customer -> {
- if (customer == null) {
- return ValidationResult.entityNotFound("Customer with requested id wasn't found!");
- } else {
- if (!customer.getTenantId().equals(currentUser.getTenantId())) {
- return ValidationResult.accessDenied("Customer doesn't belong to the current Tenant!");
- } else if (currentUser.isCustomerUser() && !customer.getId().equals(currentUser.getCustomerId())) {
- return ValidationResult.accessDenied("Customer doesn't relate to the currently authorized customer user!");
- } else {
- return ValidationResult.ok();
- }
- }
- }));
- }
- }
-
- private void validateTenant(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
- if (currentUser.isCustomerUser()) {
- callback.onSuccess(ValidationResult.accessDenied(CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
- } else if (currentUser.isSystemAdmin()) {
- callback.onSuccess(ValidationResult.ok());
- } else {
- ListenableFuture<Tenant> tenantFuture = tenantService.findTenantByIdAsync(new TenantId(entityId.getId()));
- Futures.addCallback(tenantFuture, getCallback(callback, tenant -> {
- if (tenant == null) {
- return ValidationResult.entityNotFound("Tenant with requested id wasn't found!");
- } else if (!tenant.getId().equals(currentUser.getTenantId())) {
- return ValidationResult.accessDenied("Tenant doesn't relate to the currently authorized user!");
- } else {
- return ValidationResult.ok();
- }
- }));
- }
+ private void logAttributesUpdated(SecurityUser user, EntityId entityId, String scope, List<AttributeKvEntry> attributes, Throwable e) {
+ auditLogService.logEntityAction(
+ user.getTenantId(),
+ user.getCustomerId(),
+ user.getId(),
+ user.getName(),
+ (UUIDBased & EntityId) entityId,
+ null,
+ ActionType.ATTRIBUTES_UPDATED,
+ toException(e),
+ scope,
+ attributes);
}
- private <T> FutureCallback<T> getCallback(ValidationCallback callback, Function<T, ValidationResult> transformer) {
- return new FutureCallback<T>() {
- @Override
- public void onSuccess(@Nullable T result) {
- callback.onSuccess(transformer.apply(result));
- }
- @Override
- public void onFailure(Throwable t) {
- callback.onFailure(t);
- }
- };
+ private void logAttributesRead(SecurityUser user, EntityId entityId, String scope, List<String> keys, Throwable e) {
+ auditLogService.logEntityAction(
+ user.getTenantId(),
+ user.getCustomerId(),
+ user.getId(),
+ user.getName(),
+ (UUIDBased & EntityId) entityId,
+ null,
+ ActionType.ATTRIBUTES_READ,
+ toException(e),
+ scope,
+ keys);
}
private ListenableFuture<List<AttributeKvEntry>> mergeAllAttributesFutures(List<ListenableFuture<List<AttributeKvEntry>>> futures) {
@@ -467,4 +551,40 @@ public class TelemetryController extends BaseController {
}, executor);
}
+ private List<String> toKeysList(String keys) {
+ List<String> keyList = null;
+ if (!StringUtils.isEmpty(keys)) {
+ keyList = Arrays.asList(keys.split(","));
+ }
+ return keyList;
+ }
+
+ private DeferredResult<ResponseEntity> getImmediateDeferredResult(String message, HttpStatus status) {
+ DeferredResult<ResponseEntity> result = new DeferredResult<>();
+ result.setResult(new ResponseEntity<>(message, status));
+ return result;
+ }
+
+ private List<AttributeKvEntry> extractRequestAttributes(JsonNode jsonNode) {
+ long ts = System.currentTimeMillis();
+ List<AttributeKvEntry> attributes = new ArrayList<>();
+ jsonNode.fields().forEachRemaining(entry -> {
+ String key = entry.getKey();
+ JsonNode value = entry.getValue();
+ if (entry.getValue().isTextual()) {
+ attributes.add(new BaseAttributeKvEntry(new StringDataEntry(key, value.textValue()), ts));
+ } else if (entry.getValue().isBoolean()) {
+ attributes.add(new BaseAttributeKvEntry(new BooleanDataEntry(key, value.booleanValue()), ts));
+ } else if (entry.getValue().isDouble()) {
+ attributes.add(new BaseAttributeKvEntry(new DoubleDataEntry(key, value.doubleValue()), ts));
+ } else if (entry.getValue().isNumber()) {
+ if (entry.getValue().isBigInteger()) {
+ throw new UncheckedApiException(new InvalidParametersException("Big integer values are not supported!"));
+ } else {
+ attributes.add(new BaseAttributeKvEntry(new LongDataEntry(key, value.longValue()), ts));
+ }
+ }
+ });
+ return attributes;
+ }
}
diff --git a/application/src/main/java/org/thingsboard/server/controller/ValidationCallback.java b/application/src/main/java/org/thingsboard/server/controller/ValidationCallback.java
index ead90ea..6b2718f 100644
--- a/application/src/main/java/org/thingsboard/server/controller/ValidationCallback.java
+++ b/application/src/main/java/org/thingsboard/server/controller/ValidationCallback.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java b/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java
new file mode 100644
index 0000000..01bd238
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java
@@ -0,0 +1,262 @@
+package org.thingsboard.server.service.security;
+
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.stereotype.Component;
+import org.springframework.web.context.request.async.DeferredResult;
+import org.thingsboard.server.actors.plugin.ValidationResult;
+import org.thingsboard.server.common.data.Customer;
+import org.thingsboard.server.common.data.Device;
+import org.thingsboard.server.common.data.Tenant;
+import org.thingsboard.server.common.data.asset.Asset;
+import org.thingsboard.server.common.data.id.AssetId;
+import org.thingsboard.server.common.data.id.CustomerId;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.EntityIdFactory;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.rule.RuleChain;
+import org.thingsboard.server.controller.ValidationCallback;
+import org.thingsboard.server.dao.alarm.AlarmService;
+import org.thingsboard.server.dao.asset.AssetService;
+import org.thingsboard.server.dao.customer.CustomerService;
+import org.thingsboard.server.dao.device.DeviceService;
+import org.thingsboard.server.dao.rule.RuleChainService;
+import org.thingsboard.server.dao.tenant.TenantService;
+import org.thingsboard.server.dao.user.UserService;
+import org.thingsboard.server.exception.ThingsboardException;
+import org.thingsboard.server.extensions.api.exception.ToErrorResponseEntity;
+import org.thingsboard.server.service.security.model.SecurityUser;
+
+import javax.annotation.Nullable;
+import java.util.function.BiConsumer;
+
+/**
+ * Created by ashvayka on 27.03.18.
+ */
+@Component
+public class AccessValidator {
+
+ public static final String CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION = "Customer user is not allowed to perform this operation!";
+ public static final String SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION = "System administrator is not allowed to perform this operation!";
+ public static final String DEVICE_WITH_REQUESTED_ID_NOT_FOUND = "Device with requested id wasn't found!";
+
+ @Autowired
+ protected TenantService tenantService;
+
+ @Autowired
+ protected CustomerService customerService;
+
+ @Autowired
+ protected UserService userService;
+
+ @Autowired
+ protected DeviceService deviceService;
+
+ @Autowired
+ protected AssetService assetService;
+
+ @Autowired
+ protected AlarmService alarmService;
+
+ @Autowired
+ protected RuleChainService ruleChainService;
+
+ public DeferredResult<ResponseEntity> validateEntityAndCallback(SecurityUser currentUser, String entityType, String entityIdStr,
+ BiConsumer<DeferredResult<ResponseEntity>, EntityId> onSuccess) throws ThingsboardException {
+ return validateEntityAndCallback(currentUser, entityType, entityIdStr, onSuccess, (result, t) -> handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR));
+ }
+
+ public DeferredResult<ResponseEntity> validateEntityAndCallback(SecurityUser currentUser, String entityType, String entityIdStr,
+ BiConsumer<DeferredResult<ResponseEntity>, EntityId> onSuccess,
+ BiConsumer<DeferredResult<ResponseEntity>, Throwable> onFailure) throws ThingsboardException {
+ return validateEntityAndCallback(currentUser, EntityIdFactory.getByTypeAndId(entityType, entityIdStr),
+ onSuccess, onFailure);
+ }
+
+ public DeferredResult<ResponseEntity> validateEntityAndCallback(SecurityUser currentUser, EntityId entityId,
+ BiConsumer<DeferredResult<ResponseEntity>, EntityId> onSuccess) throws ThingsboardException {
+ return validateEntityAndCallback(currentUser, entityId, onSuccess, (result, t) -> handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR));
+ }
+
+ public DeferredResult<ResponseEntity> validateEntityAndCallback(SecurityUser currentUser, EntityId entityId,
+ BiConsumer<DeferredResult<ResponseEntity>, EntityId> onSuccess,
+ BiConsumer<DeferredResult<ResponseEntity>, Throwable> onFailure) throws ThingsboardException {
+
+ final DeferredResult<ResponseEntity> response = new DeferredResult<>();
+
+ validate(currentUser, entityId, new ValidationCallback(response,
+ new FutureCallback<DeferredResult<ResponseEntity>>() {
+ @Override
+ public void onSuccess(@Nullable DeferredResult<ResponseEntity> result) {
+ onSuccess.accept(response, entityId);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ onFailure.accept(response, t);
+ }
+ }));
+
+ return response;
+ }
+
+ public void validate(SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
+ switch (entityId.getEntityType()) {
+ case DEVICE:
+ validateDevice(currentUser, entityId, callback);
+ return;
+ case ASSET:
+ validateAsset(currentUser, entityId, callback);
+ return;
+ case RULE_CHAIN:
+ validateRuleChain(currentUser, entityId, callback);
+ return;
+ case CUSTOMER:
+ validateCustomer(currentUser, entityId, callback);
+ return;
+ case TENANT:
+ validateTenant(currentUser, entityId, callback);
+ return;
+ default:
+ //TODO: add support of other entities
+ throw new IllegalStateException("Not Implemented!");
+ }
+ }
+
+ private void validateDevice(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
+ if (currentUser.isSystemAdmin()) {
+ callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
+ } else {
+ ListenableFuture<Device> deviceFuture = deviceService.findDeviceByIdAsync(new DeviceId(entityId.getId()));
+ Futures.addCallback(deviceFuture, getCallback(callback, device -> {
+ if (device == null) {
+ return ValidationResult.entityNotFound(DEVICE_WITH_REQUESTED_ID_NOT_FOUND);
+ } else {
+ if (!device.getTenantId().equals(currentUser.getTenantId())) {
+ return ValidationResult.accessDenied("Device doesn't belong to the current Tenant!");
+ } else if (currentUser.isCustomerUser() && !device.getCustomerId().equals(currentUser.getCustomerId())) {
+ return ValidationResult.accessDenied("Device doesn't belong to the current Customer!");
+ } else {
+ return ValidationResult.ok();
+ }
+ }
+ }));
+ }
+ }
+
+ private void validateAsset(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
+ if (currentUser.isSystemAdmin()) {
+ callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
+ } else {
+ ListenableFuture<Asset> assetFuture = assetService.findAssetByIdAsync(new AssetId(entityId.getId()));
+ Futures.addCallback(assetFuture, getCallback(callback, asset -> {
+ if (asset == null) {
+ return ValidationResult.entityNotFound("Asset with requested id wasn't found!");
+ } else {
+ if (!asset.getTenantId().equals(currentUser.getTenantId())) {
+ return ValidationResult.accessDenied("Asset doesn't belong to the current Tenant!");
+ } else if (currentUser.isCustomerUser() && !asset.getCustomerId().equals(currentUser.getCustomerId())) {
+ return ValidationResult.accessDenied("Asset doesn't belong to the current Customer!");
+ } else {
+ return ValidationResult.ok();
+ }
+ }
+ }));
+ }
+ }
+
+
+ private void validateRuleChain(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
+ if (currentUser.isCustomerUser()) {
+ callback.onSuccess(ValidationResult.accessDenied(CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
+ } else {
+ ListenableFuture<RuleChain> ruleChainFuture = ruleChainService.findRuleChainByIdAsync(new RuleChainId(entityId.getId()));
+ Futures.addCallback(ruleChainFuture, getCallback(callback, ruleChain -> {
+ if (ruleChain == null) {
+ return ValidationResult.entityNotFound("Rule chain with requested id wasn't found!");
+ } else {
+ if (currentUser.isTenantAdmin() && !ruleChain.getTenantId().equals(currentUser.getTenantId())) {
+ return ValidationResult.accessDenied("Rule chain doesn't belong to the current Tenant!");
+ } else if (currentUser.isSystemAdmin() && !ruleChain.getTenantId().isNullUid()) {
+ return ValidationResult.accessDenied("Rule chain is not in system scope!");
+ } else {
+ return ValidationResult.ok();
+ }
+ }
+ }));
+ }
+ }
+
+ private void validateCustomer(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
+ if (currentUser.isSystemAdmin()) {
+ callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
+ } else {
+ ListenableFuture<Customer> customerFuture = customerService.findCustomerByIdAsync(new CustomerId(entityId.getId()));
+ Futures.addCallback(customerFuture, getCallback(callback, customer -> {
+ if (customer == null) {
+ return ValidationResult.entityNotFound("Customer with requested id wasn't found!");
+ } else {
+ if (!customer.getTenantId().equals(currentUser.getTenantId())) {
+ return ValidationResult.accessDenied("Customer doesn't belong to the current Tenant!");
+ } else if (currentUser.isCustomerUser() && !customer.getId().equals(currentUser.getCustomerId())) {
+ return ValidationResult.accessDenied("Customer doesn't relate to the currently authorized customer user!");
+ } else {
+ return ValidationResult.ok();
+ }
+ }
+ }));
+ }
+ }
+
+ private void validateTenant(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
+ if (currentUser.isCustomerUser()) {
+ callback.onSuccess(ValidationResult.accessDenied(CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
+ } else if (currentUser.isSystemAdmin()) {
+ callback.onSuccess(ValidationResult.ok());
+ } else {
+ ListenableFuture<Tenant> tenantFuture = tenantService.findTenantByIdAsync(new TenantId(entityId.getId()));
+ Futures.addCallback(tenantFuture, getCallback(callback, tenant -> {
+ if (tenant == null) {
+ return ValidationResult.entityNotFound("Tenant with requested id wasn't found!");
+ } else if (!tenant.getId().equals(currentUser.getTenantId())) {
+ return ValidationResult.accessDenied("Tenant doesn't relate to the currently authorized user!");
+ } else {
+ return ValidationResult.ok();
+ }
+ }));
+ }
+ }
+
+ private <T> FutureCallback<T> getCallback(ValidationCallback callback, Function<T, ValidationResult> transformer) {
+ return new FutureCallback<T>() {
+ @Override
+ public void onSuccess(@Nullable T result) {
+ callback.onSuccess(transformer.apply(result));
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ callback.onFailure(t);
+ }
+ };
+ }
+
+ public static void handleError(Throwable e, final DeferredResult<ResponseEntity> response, HttpStatus defaultErrorStatus) {
+ ResponseEntity responseEntity;
+ if (e != null && e instanceof ToErrorResponseEntity) {
+ responseEntity = ((ToErrorResponseEntity) e).toErrorResponseEntity();
+ } else if (e != null && e instanceof IllegalArgumentException) {
+ responseEntity = new ResponseEntity<>(e.getMessage(), HttpStatus.BAD_REQUEST);
+ } else {
+ responseEntity = new ResponseEntity<>(defaultErrorStatus);
+ }
+ response.setResult(responseEntity);
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
new file mode 100644
index 0000000..359949e
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
@@ -0,0 +1,42 @@
+package org.thingsboard.server.service.telemetry;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.data.kv.TsKvEntry;
+import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Created by ashvayka on 27.03.18.
+ */
+@Service
+@Slf4j
+public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptionService {
+
+ @Autowired
+ private TelemetryWebSocketService wsService;
+
+
+ private final Map<EntityId, Set<Subscription>> subscriptionsByEntityId = new HashMap<>();
+
+ private final Map<String, Map<Integer, Subscription>> subscriptionsByWsSessionId = new HashMap<>();
+
+
+
+ @Override
+ public void onAttributesUpdateFromServer(EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
+
+ }
+
+ @Override
+ public void onTimeseriesUpdateFromServer(EntityId entityId, List<TsKvEntry> entries) {
+
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
new file mode 100644
index 0000000..6d6c33e
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
@@ -0,0 +1,261 @@
+package org.thingsboard.server.service.telemetry;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.FutureCallback;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.util.StringUtils;
+import org.thingsboard.server.common.data.DataConstants;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.EntityIdFactory;
+import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
+import org.thingsboard.server.common.data.kv.TsKvEntry;
+import org.thingsboard.server.dao.attributes.AttributesService;
+import org.thingsboard.server.dao.timeseries.TimeseriesService;
+import org.thingsboard.server.extensions.api.exception.UnauthorizedException;
+import org.thingsboard.server.extensions.api.plugins.PluginCallback;
+import org.thingsboard.server.extensions.api.plugins.PluginContext;
+import org.thingsboard.server.extensions.api.plugins.ws.PluginWebsocketSessionRef;
+import org.thingsboard.server.extensions.api.plugins.ws.SessionEvent;
+import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.AttributesSubscriptionCmd;
+import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.SubscriptionCmd;
+import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TelemetryPluginCmd;
+import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TelemetryPluginCmdsWrapper;
+import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionErrorCode;
+import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
+import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType;
+import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionUpdate;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+
+/**
+ * Created by ashvayka on 27.03.18.
+ */
+@Service
+@Slf4j
+public class DefaultTelemetryWebSocketService implements TelemetryWebSocketService {
+
+ private static final int UNKNOWN_SUBSCRIPTION_ID = 0;
+ private static final String PROCESSING_MSG = "[{}] Processing: {}";
+ private static final ObjectMapper jsonMapper = new ObjectMapper();
+ private static final String FAILED_TO_FETCH_DATA = "Failed to fetch data!";
+ private static final String FAILED_TO_FETCH_ATTRIBUTES = "Failed to fetch attributes!";
+ private static final String SESSION_META_DATA_NOT_FOUND = "Session meta-data not found!";
+
+ private final ConcurrentMap<String, WsSessionMetaData> wsSessionsMap = new ConcurrentHashMap<>();
+
+ @Autowired
+ private TelemetrySubscriptionService subscriptionManager;
+
+ @Autowired
+ private TelemetryWebSocketMsgEndpoint msgEndpoint;
+
+ @Autowired
+ private AttributesService attributesService;
+
+ @Autowired
+ private TimeseriesService tsService;
+
+ @Override
+ public void handleWebSocketSessionEvent(TelemetryWebSocketSessionRef sessionRef, SessionEvent event) {
+ String sessionId = sessionRef.getSessionId();
+ log.debug(PROCESSING_MSG, sessionId, event);
+ switch (event.getEventType()) {
+ case ESTABLISHED:
+ wsSessionsMap.put(sessionId, new WsSessionMetaData(sessionRef));
+ break;
+ case ERROR:
+ log.debug("[{}] Unknown websocket session error: {}. ", sessionId, event.getError().orElse(null));
+ break;
+ case CLOSED:
+ wsSessionsMap.remove(sessionId);
+ subscriptionManager.cleanupLocalWsSessionSubscriptions(sessionRef, sessionId);
+ break;
+ }
+ }
+
+ @Override
+ public void handleWebSocketMsg(TelemetryWebSocketSessionRef sessionRef, String msg) {
+ if (log.isTraceEnabled()) {
+ log.trace("[{}] Processing: {}", sessionRef.getSessionId(), msg);
+ }
+
+ try {
+ TelemetryPluginCmdsWrapper cmdsWrapper = jsonMapper.readValue(msg, TelemetryPluginCmdsWrapper.class);
+ if (cmdsWrapper != null) {
+ if (cmdsWrapper.getAttrSubCmds() != null) {
+ cmdsWrapper.getAttrSubCmds().forEach(cmd -> handleWsAttributesSubscriptionCmd(sessionRef, cmd));
+ }
+ if (cmdsWrapper.getTsSubCmds() != null) {
+ cmdsWrapper.getTsSubCmds().forEach(cmd -> handleWsTimeseriesSubscriptionCmd(sessionRef, cmd));
+ }
+ if (cmdsWrapper.getHistoryCmds() != null) {
+ cmdsWrapper.getHistoryCmds().forEach(cmd -> handleWsHistoryCmd(sessionRef, cmd));
+ }
+ }
+ } catch (IOException e) {
+ log.warn("Failed to decode subscription cmd: {}", e.getMessage(), e);
+ SubscriptionUpdate update = new SubscriptionUpdate(UNKNOWN_SUBSCRIPTION_ID, SubscriptionErrorCode.INTERNAL_ERROR, SESSION_META_DATA_NOT_FOUND);
+ sendWsMsg(sessionRef, update);
+ }
+ }
+
+ private void handleWsAttributesSubscriptionCmd(TelemetryWebSocketSessionRef sessionRef, AttributesSubscriptionCmd cmd) {
+ String sessionId = sessionRef.getSessionId();
+ log.debug("[{}] Processing: {}", sessionId, cmd);
+
+ if (validateSessionMetadata(sessionRef, cmd, sessionId)) {
+ if (cmd.isUnsubscribe()) {
+ unsubscribe(sessionRef, cmd, sessionId);
+ } else if (validateSubscriptionCmd(sessionRef, cmd)) {
+ EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId());
+ log.debug("[{}] fetching latest attributes ({}) values for device: {}", sessionId, cmd.getKeys(), entityId);
+ Optional<Set<String>> keysOptional = getKeys(cmd);
+ if (keysOptional.isPresent()) {
+ List<String> keys = new ArrayList<>(keysOptional.get());
+ handleWsAttributesSubscriptionByKeys(sessionRef, cmd, sessionId, entityId, keys);
+ } else {
+ handleWsAttributesSubscription(sessionRef, cmd, sessionId, entityId);
+ }
+ }
+ }
+ }
+
+ private void handleWsAttributesSubscriptionByKeys(TelemetryWebSocketSessionRef sessionRef,
+ AttributesSubscriptionCmd cmd, String sessionId, EntityId entityId,
+ List<String> keys) {
+ FutureCallback<List<AttributeKvEntry>> callback = new FutureCallback<List<AttributeKvEntry>>() {
+ @Override
+ public void onSuccess(List<AttributeKvEntry> data) {
+ List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
+ sendWsMsg(sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
+
+ Map<String, Long> subState = new HashMap<>(keys.size());
+ keys.forEach(key -> subState.put(key, 0L));
+ attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
+
+ SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, false, subState, cmd.getScope());
+ subscriptionManager.addLocalWsSubscription(sessionId, entityId, sub);
+ }
+
+ @Override
+ public void onFailure(Throwable e) {
+ log.error(FAILED_TO_FETCH_ATTRIBUTES, e);
+ SubscriptionUpdate update;
+ if (UnauthorizedException.class.isInstance(e)) {
+ update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED,
+ SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg());
+ } else {
+ update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
+ FAILED_TO_FETCH_ATTRIBUTES);
+ }
+ sendWsMsg(sessionRef, update);
+ }
+ };
+
+ if (StringUtils.isEmpty(cmd.getScope())) {
+ //ValidationCallback?
+ ctx.loadAttributes(entityId, Arrays.asList(DataConstants.allScopes()), keys, callback);
+ } else {
+ ctx.loadAttributes(entityId, cmd.getScope(), keys, callback);
+ }
+ }
+
+ private void handleWsAttributesSubscription(PluginContext ctx, PluginWebsocketSessionRef sessionRef,
+ AttributesSubscriptionCmd cmd, String sessionId, EntityId entityId) {
+ PluginCallback<List<AttributeKvEntry>> callback = new PluginCallback<List<AttributeKvEntry>>() {
+ @Override
+ public void onSuccess(PluginContext ctx, List<AttributeKvEntry> data) {
+ List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
+ sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
+
+ Map<String, Long> subState = new HashMap<>(attributesData.size());
+ attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
+
+ SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, true, subState, cmd.getScope());
+ subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub);
+ }
+
+ @Override
+ public void onFailure(PluginContext ctx, Exception e) {
+ log.error(FAILED_TO_FETCH_ATTRIBUTES, e);
+ SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
+ FAILED_TO_FETCH_ATTRIBUTES);
+ sendWsMsg(ctx, sessionRef, update);
+ }
+ };
+
+ if (StringUtils.isEmpty(cmd.getScope())) {
+ ctx.loadAttributes(entityId, Arrays.asList(DataConstants.allScopes()), callback);
+ } else {
+ ctx.loadAttributes(entityId, cmd.getScope(), callback);
+ }
+ }
+
+ private void unsubscribe(TelemetryWebSocketSessionRef sessionRef, SubscriptionCmd cmd, String sessionId) {
+ if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty()) {
+ subscriptionManager.cleanupLocalWsSessionSubscriptions(sessionRef, sessionId);
+ } else {
+ subscriptionManager.removeSubscription(sessionId, cmd.getCmdId());
+ }
+ }
+
+ private boolean validateSubscriptionCmd(TelemetryWebSocketSessionRef sessionRef, SubscriptionCmd cmd) {
+ if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty()) {
+ SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST,
+ "Device id is empty!");
+ sendWsMsg(sessionRef, update);
+ return false;
+ }
+ return true;
+ }
+
+ private boolean validateSessionMetadata(TelemetryWebSocketSessionRef sessionRef, SubscriptionCmd cmd, String sessionId) {
+ WsSessionMetaData sessionMD = wsSessionsMap.get(sessionId);
+ if (sessionMD == null) {
+ log.warn("[{}] Session meta data not found. ", sessionId);
+ SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
+ SESSION_META_DATA_NOT_FOUND);
+ sendWsMsg(sessionRef, update);
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+ private void sendWsMsg(TelemetryWebSocketSessionRef sessionRef, SubscriptionUpdate update) {
+ try {
+ msgEndpoint.send(sessionRef, jsonMapper.writeValueAsString(update));
+ } catch (JsonProcessingException e) {
+ log.warn("[{}] Failed to encode reply: {}", sessionRef.getSessionId(), update, e);
+ } catch (IOException e) {
+ log.warn("[{}] Failed to send reply: {}", sessionRef.getSessionId(), update, e);
+ }
+ }
+
+ private static Optional<Set<String>> getKeys(TelemetryPluginCmd cmd) {
+ if (!StringUtils.isEmpty(cmd.getKeys())) {
+ Set<String> keys = new HashSet<>();
+ Collections.addAll(keys, cmd.getKeys().split(","));
+ return Optional.of(keys);
+ } else {
+ return Optional.empty();
+ }
+ }
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java
new file mode 100644
index 0000000..9673629
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java
@@ -0,0 +1,24 @@
+package org.thingsboard.server.service.telemetry;
+
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.data.kv.TsKvEntry;
+import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
+
+import java.util.List;
+
+/**
+ * Created by ashvayka on 27.03.18.
+ */
+public interface TelemetrySubscriptionService {
+
+ void onAttributesUpdateFromServer(EntityId entityId, String scope, List<AttributeKvEntry> attributes);
+
+ void onTimeseriesUpdateFromServer(EntityId entityId, List<TsKvEntry> entries);
+
+ void cleanupLocalWsSessionSubscriptions(TelemetryWebSocketSessionRef sessionRef, String sessionId);
+
+ void removeSubscription(String sessionId, int cmdId);
+
+ void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub);
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketMsgEndpoint.java b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketMsgEndpoint.java
new file mode 100644
index 0000000..a7e7cad
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketMsgEndpoint.java
@@ -0,0 +1,14 @@
+package org.thingsboard.server.service.telemetry;
+
+import java.io.IOException;
+
+/**
+ * Created by ashvayka on 27.03.18.
+ */
+public interface TelemetryWebSocketMsgEndpoint {
+
+ void send(TelemetryWebSocketSessionRef sessionRef, String msg) throws IOException;
+
+ void close(TelemetryWebSocketSessionRef sessionRef) throws IOException;
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketService.java
new file mode 100644
index 0000000..883a174
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketService.java
@@ -0,0 +1,13 @@
+package org.thingsboard.server.service.telemetry;
+
+import org.thingsboard.server.extensions.api.plugins.ws.SessionEvent;
+
+/**
+ * Created by ashvayka on 27.03.18.
+ */
+public interface TelemetryWebSocketService {
+
+ void handleWebSocketSessionEvent(TelemetryWebSocketSessionRef sessionRef, SessionEvent sessionEvent);
+
+ void handleWebSocketMsg(TelemetryWebSocketSessionRef sessionRef, String msg);
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketSessionRef.java b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketSessionRef.java
new file mode 100644
index 0000000..3fd4b19
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketSessionRef.java
@@ -0,0 +1,53 @@
+package org.thingsboard.server.service.telemetry;
+
+import lombok.Getter;
+import org.thingsboard.server.service.security.model.SecurityUser;
+
+import java.net.InetSocketAddress;
+import java.util.Objects;
+
+/**
+ * Created by ashvayka on 27.03.18.
+ */
+public class TelemetryWebSocketSessionRef {
+
+ private static final long serialVersionUID = 1L;
+
+ @Getter
+ private final String sessionId;
+ @Getter
+ private final SecurityUser securityCtx;
+ @Getter
+ private final InetSocketAddress localAddress;
+ @Getter
+ private final InetSocketAddress remoteAddress;
+
+ public TelemetryWebSocketSessionRef(String sessionId, SecurityUser securityCtx, InetSocketAddress localAddress, InetSocketAddress remoteAddress) {
+ this.sessionId = sessionId;
+ this.securityCtx = securityCtx;
+ this.localAddress = localAddress;
+ this.remoteAddress = remoteAddress;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ TelemetryWebSocketSessionRef that = (TelemetryWebSocketSessionRef) o;
+ return Objects.equals(sessionId, that.sessionId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(sessionId);
+ }
+
+ @Override
+ public String toString() {
+ return "TelemetryWebSocketSessionRef{" +
+ "sessionId='" + sessionId + '\'' +
+ ", localAddress=" + localAddress +
+ ", remoteAddress=" + remoteAddress +
+ '}';
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketTextMsg.java b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketTextMsg.java
new file mode 100644
index 0000000..6d57122
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketTextMsg.java
@@ -0,0 +1,19 @@
+package org.thingsboard.server.service.telemetry;
+
+import lombok.Data;
+import lombok.Getter;
+import org.thingsboard.server.service.security.model.SecurityUser;
+
+import java.net.InetSocketAddress;
+import java.util.Objects;
+
+/**
+ * Created by ashvayka on 27.03.18.
+ */
+@Data
+public class TelemetryWebSocketTextMsg {
+
+ private final TelemetryWebSocketSessionRef sessionRef;
+ private final String payload;
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/WsSessionMetaData.java b/application/src/main/java/org/thingsboard/server/service/telemetry/WsSessionMetaData.java
new file mode 100644
index 0000000..c0b162b
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/WsSessionMetaData.java
@@ -0,0 +1,38 @@
+package org.thingsboard.server.service.telemetry;
+
+import org.thingsboard.server.extensions.api.plugins.ws.PluginWebsocketSessionRef;
+
+/**
+ * Created by ashvayka on 27.03.18.
+ */
+public class WsSessionMetaData {
+ private TelemetryWebSocketSessionRef sessionRef;
+ private long lastActivityTime;
+
+ public WsSessionMetaData(TelemetryWebSocketSessionRef sessionRef) {
+ super();
+ this.sessionRef = sessionRef;
+ this.lastActivityTime = System.currentTimeMillis();
+ }
+
+ public TelemetryWebSocketSessionRef getSessionRef() {
+ return sessionRef;
+ }
+
+ public void setSessionRef(TelemetryWebSocketSessionRef sessionRef) {
+ this.sessionRef = sessionRef;
+ }
+
+ public long getLastActivityTime() {
+ return lastActivityTime;
+ }
+
+ public void setLastActivityTime(long lastActivityTime) {
+ this.lastActivityTime = lastActivityTime;
+ }
+
+ @Override
+ public String toString() {
+ return "WsSessionMetaData [sessionRef=" + sessionRef + ", lastActivityTime=" + lastActivityTime + "]";
+ }
+}
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java b/common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java
index 76b3e33..31c1cda 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java
@@ -33,6 +33,10 @@ public class EntityIdFactory {
return getByTypeAndUuid(EntityType.valueOf(type), uuid);
}
+ public static EntityId getByTypeAndUuid(EntityType type, String uuid) {
+ return getByTypeAndUuid(type, UUID.fromString(uuid));
+ }
+
public static EntityId getByTypeAndUuid(EntityType type, UUID uuid) {
switch (type) {
case TENANT:
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java
index c6e7a54..158945c 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java
@@ -243,27 +243,19 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
switch (attr.getDataType()) {
case BOOLEAN:
Optional<Boolean> booleanValue = attr.getBooleanValue();
- if (booleanValue.isPresent()) {
- dataBuilder.setBoolValue(booleanValue.get());
- }
+ booleanValue.ifPresent(dataBuilder::setBoolValue);
break;
case LONG:
Optional<Long> longValue = attr.getLongValue();
- if (longValue.isPresent()) {
- dataBuilder.setLongValue(longValue.get());
- }
+ longValue.ifPresent(dataBuilder::setLongValue);
break;
case DOUBLE:
Optional<Double> doubleValue = attr.getDoubleValue();
- if (doubleValue.isPresent()) {
- dataBuilder.setDoubleValue(doubleValue.get());
- }
+ doubleValue.ifPresent(dataBuilder::setDoubleValue);
break;
case STRING:
Optional<String> stringValue = attr.getStrValue();
- if (stringValue.isPresent()) {
- dataBuilder.setStrValue(stringValue.get());
- }
+ stringValue.ifPresent(dataBuilder::setStrValue);
break;
}
return dataBuilder;
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java
index ec00677..9d9c2f3 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java
@@ -344,9 +344,7 @@ public class SubscriptionManager {
}
private void checkSubsciptionsPrevAddress(Set<Subscription> subscriptions) {
- Iterator<Subscription> subscriptionIterator = subscriptions.iterator();
- while (subscriptionIterator.hasNext()) {
- Subscription s = subscriptionIterator.next();
+ for (Subscription s : subscriptions) {
if (s.isLocal()) {
if (s.getServer() != null) {
log.trace("[{}] Local subscription is no longer handled on remote server address [{}]", s.getWsSessionId(), s.getServer());