thingsboard-aplcache
Changes
application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java 6(+1 -5)
application/src/test/java/org/thingsboard/server/system/nosql/DeviceOfflineNoSqlTest.java 23(+0 -23)
common/data/src/main/java/org/thingsboard/server/common/data/device/DeviceStatusQuery.java 40(+0 -40)
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNodeTest.java 9(+6 -3)
transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java 8(+1 -7)
transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportService.java 6(+1 -5)
transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java 20(+7 -13)
transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java 25(+2 -23)
transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java 7(+2 -5)
Details
diff --git a/application/src/main/data/upgrade/1.5.0/schema_update.cql b/application/src/main/data/upgrade/1.5.0/schema_update.cql
index 9481fc0..0a3f42b 100644
--- a/application/src/main/data/upgrade/1.5.0/schema_update.cql
+++ b/application/src/main/data/upgrade/1.5.0/schema_update.cql
@@ -94,5 +94,3 @@ CREATE TABLE IF NOT EXISTS thingsboard.rule_node (
PRIMARY KEY (id)
);
-ALTER TABLE thingsboard.device ADD last_connect bigint;
-ALTER TABLE thingsboard.device ADD last_update bigint;
\ No newline at end of file
diff --git a/application/src/main/data/upgrade/1.5.0/schema_update.sql b/application/src/main/data/upgrade/1.5.0/schema_update.sql
index ef5f6db..c3c5ade 100644
--- a/application/src/main/data/upgrade/1.5.0/schema_update.sql
+++ b/application/src/main/data/upgrade/1.5.0/schema_update.sql
@@ -35,7 +35,4 @@ CREATE TABLE IF NOT EXISTS rule_node (
name varchar(255),
debug_mode boolean,
search_text varchar(255)
-);
-
-ALTER TABLE device ADD COLUMN IF NOT EXISTS last_connect BIGINT;
-ALTER TABLE device ADD COLUMN IF NOT EXISTS last_update BIGINT;
\ No newline at end of file
+);
\ No newline at end of file
diff --git a/application/src/main/java/org/thingsboard/server/controller/BaseController.java b/application/src/main/java/org/thingsboard/server/controller/BaseController.java
index a35e5dc..f24c77a 100644
--- a/application/src/main/java/org/thingsboard/server/controller/BaseController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/BaseController.java
@@ -45,7 +45,6 @@ import org.thingsboard.server.dao.audit.AuditLogService;
import org.thingsboard.server.dao.customer.CustomerService;
import org.thingsboard.server.dao.dashboard.DashboardService;
import org.thingsboard.server.dao.device.DeviceCredentialsService;
-import org.thingsboard.server.dao.device.DeviceOfflineService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.dao.exception.IncorrectParameterException;
@@ -136,9 +135,6 @@ public abstract class BaseController {
protected AuditLogService auditLogService;
@Autowired
- protected DeviceOfflineService offlineService;
-
- @Autowired
protected DeviceStateService deviceStateService;
@ExceptionHandler(ThingsboardException.class)
diff --git a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
index 5322b2c..e1023ca 100644
--- a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
@@ -25,9 +25,6 @@ import org.thingsboard.server.common.data.EntitySubtype;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.device.DeviceSearchQuery;
-import org.thingsboard.server.common.data.device.DeviceStatusQuery;
-import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
-import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
@@ -37,6 +34,8 @@ import org.thingsboard.server.common.data.security.Authority;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.dao.exception.IncorrectParameterException;
import org.thingsboard.server.dao.model.ModelConstants;
+import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
+import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.service.security.model.SecurityUser;
import java.util.ArrayList;
@@ -70,7 +69,7 @@ public class DeviceController extends BaseController {
device.setTenantId(getCurrentUser().getTenantId());
if (getCurrentUser().getAuthority() == Authority.CUSTOMER_USER) {
if (device.getId() == null || device.getId().isNullUid() ||
- device.getCustomerId() == null || device.getCustomerId().isNullUid()) {
+ device.getCustomerId() == null || device.getCustomerId().isNullUid()) {
throw new ThingsboardException("You don't have permission to perform this operation!",
ThingsboardErrorCode.PERMISSION_DENIED);
} else {
@@ -374,32 +373,4 @@ public class DeviceController extends BaseController {
throw handleException(e);
}
}
-
- @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
- @RequestMapping(value = "/device/offline", method = RequestMethod.GET)
- @ResponseBody
- public List<Device> getOfflineDevices(@RequestParam("contactType") DeviceStatusQuery.ContactType contactType,
- @RequestParam("threshold") long threshold) throws ThingsboardException {
- try {
- TenantId tenantId = getCurrentUser().getTenantId();
- ListenableFuture<List<Device>> offlineDevices = offlineService.findOfflineDevices(tenantId.getId(), contactType, threshold);
- return checkNotNull(offlineDevices.get());
- } catch (Exception e) {
- throw handleException(e);
- }
- }
-
- @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
- @RequestMapping(value = "/device/online", method = RequestMethod.GET)
- @ResponseBody
- public List<Device> getOnlineDevices(@RequestParam("contactType") DeviceStatusQuery.ContactType contactType,
- @RequestParam("threshold") long threshold) throws ThingsboardException {
- try {
- TenantId tenantId = getCurrentUser().getTenantId();
- ListenableFuture<List<Device>> offlineDevices = offlineService.findOnlineDevices(tenantId.getId(), contactType, threshold);
- return checkNotNull(offlineDevices.get());
- } catch (Exception e) {
- throw handleException(e);
- }
- }
}
diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java
index fc17ccd..b65e9c1 100644
--- a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java
+++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java
@@ -262,7 +262,7 @@ public class DefaultDeviceStateService implements DeviceStateService {
state.setInactivityTimeout(inactivityTimeout);
boolean oldActive = state.isActive();
state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout());
- if (!oldActive && state.isActive()) {
+ if (!oldActive && state.isActive() || oldActive && !state.isActive()) {
saveAttribute(deviceId, ACTIVITY_STATE, state.isActive());
}
}
@@ -333,10 +333,6 @@ public class DefaultDeviceStateService implements DeviceStateService {
});
}
- private long getLastPersistTime(List<AttributeKvEntry> attributes) {
- return attributes.stream().map(AttributeKvEntry::getLastUpdateTs).max(Long::compare).orElse(0L);
- }
-
private long getAttributeValue(List<AttributeKvEntry> attributes, String attributeName, long defaultValue) {
for (AttributeKvEntry attribute : attributes) {
if (attribute.getKey().equals(attributeName)) {
diff --git a/application/src/test/java/org/thingsboard/server/system/BaseHttpDeviceApiTest.java b/application/src/test/java/org/thingsboard/server/system/BaseHttpDeviceApiTest.java
index c3e87c2..4fa6162 100644
--- a/application/src/test/java/org/thingsboard/server/system/BaseHttpDeviceApiTest.java
+++ b/application/src/test/java/org/thingsboard/server/system/BaseHttpDeviceApiTest.java
@@ -15,7 +15,6 @@
*/
package org.thingsboard.server.system;
-import com.google.common.collect.ImmutableMap;
import org.junit.Before;
import org.junit.Test;
import org.springframework.test.web.servlet.ResultActions;
@@ -29,9 +28,6 @@ import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.*;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.request;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@@ -52,9 +48,6 @@ public abstract class BaseHttpDeviceApiTest extends AbstractControllerTest {
device = new Device();
device.setName("My device");
device.setType("default");
- long currentTime = System.currentTimeMillis();
- device.setLastConnectTs(currentTime);
- device.setLastUpdateTs(currentTime);
device = doPost("/api/device", device, Device.class);
deviceCredentials =
@@ -74,34 +67,6 @@ public abstract class BaseHttpDeviceApiTest extends AbstractControllerTest {
doGetAsync("/api/v1/" + deviceCredentials.getCredentialsId() + "/attributes?clientKeys=keyA,keyB,keyC").andExpect(status().isOk());
}
- @Test
- public void deviceLastContactAndUpdateFieldsAreUpdated() throws Exception {
- Device actualDevice = doGet("/api/device/" + this.device.getId(), Device.class);
- Long initConnectTs = actualDevice.getLastConnectTs();
- Long initUpdateTs = actualDevice.getLastUpdateTs();
- assertNotNull(initConnectTs);
- assertNotNull(initUpdateTs);
- Thread.sleep(50);
-
- doPost("/api/v1/" + deviceCredentials.getCredentialsId() + "/attributes", ImmutableMap.of("keyA", "valueA"), new String[]{});
- actualDevice = doGet("/api/device/" + this.device.getId(), Device.class);
- Long postConnectTs = actualDevice.getLastConnectTs();
- Long postUpdateTs = actualDevice.getLastUpdateTs();
- System.out.println(postConnectTs + " - " + postUpdateTs + " -> " + (postConnectTs - initConnectTs) + " : " + (postUpdateTs - initUpdateTs));
- assertTrue(postConnectTs > initConnectTs);
- assertEquals(postConnectTs, postUpdateTs);
- Thread.sleep(50);
-
- doGet("/api/v1/" + deviceCredentials.getCredentialsId() + "/attributes?clientKeys=keyA,keyB,keyC");
- Thread.sleep(50);
- actualDevice = doGet("/api/device/" + this.device.getId(), Device.class);
- Long getConnectTs = actualDevice.getLastConnectTs();
- Long getUpdateTs = actualDevice.getLastUpdateTs();
- assertTrue(getConnectTs > postConnectTs);
- assertEquals(getUpdateTs, postUpdateTs);
-
- }
-
protected ResultActions doGetAsync(String urlTemplate, Object... urlVariables) throws Exception {
MockHttpServletRequestBuilder getRequest;
getRequest = get(urlTemplate, urlVariables);
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/Device.java b/common/data/src/main/java/org/thingsboard/server/common/data/Device.java
index 6d257fc..95662c1 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/Device.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/Device.java
@@ -31,8 +31,6 @@ public class Device extends SearchTextBasedWithAdditionalInfo<DeviceId> implemen
private CustomerId customerId;
private String name;
private String type;
- private Long lastConnectTs;
- private Long lastUpdateTs;
public Device() {
super();
@@ -83,22 +81,6 @@ public class Device extends SearchTextBasedWithAdditionalInfo<DeviceId> implemen
this.type = type;
}
- public Long getLastConnectTs() {
- return lastConnectTs;
- }
-
- public void setLastConnectTs(Long lastConnectTs) {
- this.lastConnectTs = lastConnectTs;
- }
-
- public Long getLastUpdateTs() {
- return lastUpdateTs;
- }
-
- public void setLastUpdateTs(Long lastUpdateTs) {
- this.lastUpdateTs = lastUpdateTs;
- }
-
@Override
public String getSearchText() {
return getName();
@@ -119,10 +101,6 @@ public class Device extends SearchTextBasedWithAdditionalInfo<DeviceId> implemen
builder.append(getAdditionalInfo());
builder.append(", createdTime=");
builder.append(createdTime);
- builder.append(", lastUpdateTs=");
- builder.append(lastUpdateTs);
- builder.append(", lastConnectTs=");
- builder.append(lastConnectTs);
builder.append(", id=");
builder.append(id);
builder.append("]");
diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/CassandraDeviceDao.java b/dao/src/main/java/org/thingsboard/server/dao/device/CassandraDeviceDao.java
index 0246da5..641c464 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/device/CassandraDeviceDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/device/CassandraDeviceDao.java
@@ -15,9 +15,9 @@
*/
package org.thingsboard.server.dao.device;
-import com.datastax.driver.core.*;
-import com.datastax.driver.core.querybuilder.Clause;
-import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.Select;
import com.datastax.driver.mapping.Result;
import com.google.common.base.Function;
@@ -28,11 +28,9 @@ import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EntitySubtype;
import org.thingsboard.server.common.data.EntityType;
-import org.thingsboard.server.common.data.device.DeviceStatusQuery;
import org.thingsboard.server.common.data.page.TextPageLink;
import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.model.EntitySubtypeEntity;
-import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.dao.model.nosql.DeviceEntity;
import org.thingsboard.server.dao.nosql.CassandraAbstractSearchTextDao;
import org.thingsboard.server.dao.util.NoSqlDao;
@@ -159,7 +157,7 @@ public class CassandraDeviceDao extends CassandraAbstractSearchTextDao<DeviceEnt
if (result != null) {
List<EntitySubtype> entitySubtypes = new ArrayList<>();
result.all().forEach((entitySubtypeEntity) ->
- entitySubtypes.add(entitySubtypeEntity.toEntitySubtype())
+ entitySubtypes.add(entitySubtypeEntity.toEntitySubtype())
);
return entitySubtypes;
} else {
@@ -169,68 +167,4 @@ public class CassandraDeviceDao extends CassandraAbstractSearchTextDao<DeviceEnt
});
}
- @Override
- public ListenableFuture<List<Device>> findDevicesByTenantIdAndStatus(UUID tenantId, DeviceStatusQuery statusQuery) {
- log.debug("Try to find [{}] devices by tenantId [{}]", statusQuery.getStatus(), tenantId);
-
- Select select = select().from(DEVICE_BY_TENANT_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME).allowFiltering();
- Select.Where query = select.where();
- query.and(eq(DEVICE_TENANT_ID_PROPERTY, tenantId));
- Clause clause = statusClause(statusQuery);
- query.and(clause);
- return findListByStatementAsync(query);
- }
-
- @Override
- public ListenableFuture<List<Device>> findDevicesByTenantIdTypeAndStatus(UUID tenantId, String type, DeviceStatusQuery statusQuery) {
- log.debug("Try to find [{}] devices by tenantId [{}] and type [{}]", statusQuery.getStatus(), tenantId, type);
-
- Select select = select().from(DEVICE_BY_TENANT_BY_TYPE_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME).allowFiltering();
- Select.Where query = select.where()
- .and(eq(DEVICE_TENANT_ID_PROPERTY, tenantId))
- .and(eq(DEVICE_TYPE_PROPERTY, type));
-
- query.and(statusClause(statusQuery));
- return findListByStatementAsync(query);
- }
-
-
- @Override
- public void saveDeviceStatus(Device device) {
- PreparedStatement statement = prepare("insert into " +
- "device (id, tenant_id, customer_id, type, last_connect, last_update) values (?, ?, ?, ?, ?, ?)");
- BoundStatement boundStatement = statement.bind(device.getUuidId(), device.getTenantId().getId(), device.getCustomerId().getId(),
- device.getType(), device.getLastConnectTs(), device.getLastUpdateTs());
- ResultSetFuture resultSetFuture = executeAsyncWrite(boundStatement);
- Futures.withFallback(resultSetFuture, t -> {
- log.error("Can't update device status for [{}]", device, t);
- throw new IllegalArgumentException("Can't update device status for {" + device + "}", t);
- });
- }
-
- private String getStatusProperty(DeviceStatusQuery statusQuery) {
- switch (statusQuery.getContactType()) {
- case UPLOAD:
- return DEVICE_LAST_UPDATE_PROPERTY;
- case CONNECT:
- return DEVICE_LAST_CONNECT_PROPERTY;
- }
- return null;
- }
-
- private Clause statusClause(DeviceStatusQuery statusQuery) {
- long minTime = System.currentTimeMillis() - statusQuery.getThreshold();
- String statusProperty = getStatusProperty(statusQuery);
- if (statusProperty != null) {
- switch (statusQuery.getStatus()) {
- case ONLINE:
- return gt(statusProperty, minTime);
- case OFFLINE:
- return lt(statusProperty, minTime);
- }
- }
- log.error("Could not build status query from [{}]", statusQuery);
- throw new IllegalStateException("Could not build status query for device []");
- }
-
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceDao.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceDao.java
index 2b9e522..dbc098e 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceDao.java
@@ -18,7 +18,6 @@ package org.thingsboard.server.dao.device;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EntitySubtype;
-import org.thingsboard.server.common.data.device.DeviceStatusQuery;
import org.thingsboard.server.common.data.page.TextPageLink;
import org.thingsboard.server.dao.Dao;
@@ -28,6 +27,7 @@ import java.util.UUID;
/**
* The Interface DeviceDao.
+ *
*/
public interface DeviceDao extends Dao<Device> {
@@ -52,7 +52,7 @@ public interface DeviceDao extends Dao<Device> {
* Find devices by tenantId, type and page link.
*
* @param tenantId the tenantId
- * @param type the type
+ * @param type the type
* @param pageLink the page link
* @return the list of device objects
*/
@@ -61,7 +61,7 @@ public interface DeviceDao extends Dao<Device> {
/**
* Find devices by tenantId and devices Ids.
*
- * @param tenantId the tenantId
+ * @param tenantId the tenantId
* @param deviceIds the device Ids
* @return the list of device objects
*/
@@ -70,9 +70,9 @@ public interface DeviceDao extends Dao<Device> {
/**
* Find devices by tenantId, customerId and page link.
*
- * @param tenantId the tenantId
+ * @param tenantId the tenantId
* @param customerId the customerId
- * @param pageLink the page link
+ * @param pageLink the page link
* @return the list of device objects
*/
List<Device> findDevicesByTenantIdAndCustomerId(UUID tenantId, UUID customerId, TextPageLink pageLink);
@@ -80,10 +80,10 @@ public interface DeviceDao extends Dao<Device> {
/**
* Find devices by tenantId, customerId, type and page link.
*
- * @param tenantId the tenantId
+ * @param tenantId the tenantId
* @param customerId the customerId
- * @param type the type
- * @param pageLink the page link
+ * @param type the type
+ * @param pageLink the page link
* @return the list of device objects
*/
List<Device> findDevicesByTenantIdAndCustomerIdAndType(UUID tenantId, UUID customerId, String type, TextPageLink pageLink);
@@ -92,9 +92,9 @@ public interface DeviceDao extends Dao<Device> {
/**
* Find devices by tenantId, customerId and devices Ids.
*
- * @param tenantId the tenantId
+ * @param tenantId the tenantId
* @param customerId the customerId
- * @param deviceIds the device Ids
+ * @param deviceIds the device Ids
* @return the list of device objects
*/
ListenableFuture<List<Device>> findDevicesByTenantIdCustomerIdAndIdsAsync(UUID tenantId, UUID customerId, List<UUID> deviceIds);
@@ -103,7 +103,7 @@ public interface DeviceDao extends Dao<Device> {
* Find devices by tenantId and device name.
*
* @param tenantId the tenantId
- * @param name the device name
+ * @param name the device name
* @return the optional device object
*/
Optional<Device> findDeviceByTenantIdAndName(UUID tenantId, String name);
@@ -114,31 +114,4 @@ public interface DeviceDao extends Dao<Device> {
* @return the list of tenant device type objects
*/
ListenableFuture<List<EntitySubtype>> findTenantDeviceTypesAsync(UUID tenantId);
-
- /**
- * Find devices by tenantId, statusQuery and page link.
- *
- * @param tenantId the tenantId
- * @param statusQuery the page link
- * @return the list of device objects
- */
- ListenableFuture<List<Device>> findDevicesByTenantIdAndStatus(UUID tenantId, DeviceStatusQuery statusQuery);
-
- /**
- * Find devices by tenantId, type, statusQuery and page link.
- *
- * @param tenantId the tenantId
- * @param type the type
- * @param statusQuery the page link
- * @return the list of device objects
- */
- ListenableFuture<List<Device>> findDevicesByTenantIdTypeAndStatus(UUID tenantId, String type, DeviceStatusQuery statusQuery);
-
-
- /**
- * Update device last contact and update timestamp async
- *
- * @param device the device object
- */
- void saveDeviceStatus(Device device);
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java
index 37b9158..aff6381 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java
@@ -133,8 +133,6 @@ public class ModelConstants {
public static final String DEVICE_NAME_PROPERTY = "name";
public static final String DEVICE_TYPE_PROPERTY = "type";
public static final String DEVICE_ADDITIONAL_INFO_PROPERTY = ADDITIONAL_INFO_PROPERTY;
- public static final String DEVICE_LAST_CONNECT_PROPERTY = "last_connect";
- public static final String DEVICE_LAST_UPDATE_PROPERTY = "last_update";
public static final String DEVICE_BY_TENANT_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "device_by_tenant_and_search_text";
public static final String DEVICE_BY_TENANT_BY_TYPE_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "device_by_tenant_by_type_and_search_text";
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/nosql/DeviceEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/nosql/DeviceEntity.java
index 7458e56..ef0c5fe 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/nosql/DeviceEntity.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/nosql/DeviceEntity.java
@@ -63,12 +63,6 @@ public final class DeviceEntity implements SearchTextEntity<Device> {
@Column(name = DEVICE_ADDITIONAL_INFO_PROPERTY, codec = JsonCodec.class)
private JsonNode additionalInfo;
- @Column(name = DEVICE_LAST_CONNECT_PROPERTY)
- private Long lastConnectTs;
-
- @Column(name = DEVICE_LAST_UPDATE_PROPERTY)
- private Long lastUpdateTs;
-
public DeviceEntity() {
super();
}
@@ -86,8 +80,6 @@ public final class DeviceEntity implements SearchTextEntity<Device> {
this.name = device.getName();
this.type = device.getType();
this.additionalInfo = device.getAdditionalInfo();
- this.lastConnectTs = device.getLastConnectTs();
- this.lastUpdateTs = device.getLastUpdateTs();
}
public UUID getId() {
@@ -137,23 +129,7 @@ public final class DeviceEntity implements SearchTextEntity<Device> {
public void setAdditionalInfo(JsonNode additionalInfo) {
this.additionalInfo = additionalInfo;
}
-
- public Long getLastConnectTs() {
- return lastConnectTs;
- }
-
- public void setLastConnectTs(Long lastConnectTs) {
- this.lastConnectTs = lastConnectTs;
- }
-
- public Long getLastUpdateTs() {
- return lastUpdateTs;
- }
-
- public void setLastUpdateTs(Long lastUpdateTs) {
- this.lastUpdateTs = lastUpdateTs;
- }
-
+
@Override
public String getSearchTextSource() {
return getName();
@@ -181,8 +157,6 @@ public final class DeviceEntity implements SearchTextEntity<Device> {
device.setName(name);
device.setType(type);
device.setAdditionalInfo(additionalInfo);
- device.setLastConnectTs(lastConnectTs);
- device.setLastUpdateTs(lastUpdateTs);
return device;
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/DeviceEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/DeviceEntity.java
index e831c6e..7aaf0ae 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/DeviceEntity.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/DeviceEntity.java
@@ -34,9 +34,6 @@ import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Table;
-import static org.thingsboard.server.dao.model.ModelConstants.DEVICE_LAST_CONNECT_PROPERTY;
-import static org.thingsboard.server.dao.model.ModelConstants.DEVICE_LAST_UPDATE_PROPERTY;
-
@Data
@EqualsAndHashCode(callSuper = true)
@Entity
@@ -63,12 +60,6 @@ public final class DeviceEntity extends BaseSqlEntity<Device> implements SearchT
@Column(name = ModelConstants.DEVICE_ADDITIONAL_INFO_PROPERTY)
private JsonNode additionalInfo;
- @Column(name = DEVICE_LAST_CONNECT_PROPERTY)
- private Long lastConnectTs;
-
- @Column(name = DEVICE_LAST_UPDATE_PROPERTY)
- private Long lastUpdateTs;
-
public DeviceEntity() {
super();
}
@@ -86,8 +77,6 @@ public final class DeviceEntity extends BaseSqlEntity<Device> implements SearchT
this.name = device.getName();
this.type = device.getType();
this.additionalInfo = device.getAdditionalInfo();
- this.lastConnectTs = device.getLastConnectTs();
- this.lastUpdateTs = device.getLastUpdateTs();
}
@Override
@@ -113,8 +102,6 @@ public final class DeviceEntity extends BaseSqlEntity<Device> implements SearchT
device.setName(name);
device.setType(type);
device.setAdditionalInfo(additionalInfo);
- device.setLastConnectTs(lastConnectTs);
- device.setLastUpdateTs(lastUpdateTs);
return device;
}
}
\ No newline at end of file
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/device/DeviceRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/device/DeviceRepository.java
index a48805b..3bab1c5 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/device/DeviceRepository.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/device/DeviceRepository.java
@@ -79,28 +79,4 @@ public interface DeviceRepository extends CrudRepository<DeviceEntity, String> {
List<DeviceEntity> findDevicesByTenantIdAndCustomerIdAndIdIn(String tenantId, String customerId, List<String> deviceIds);
List<DeviceEntity> findDevicesByTenantIdAndIdIn(String tenantId, List<String> deviceIds);
-
- @Query("SELECT d FROM DeviceEntity d WHERE d.tenantId = :tenantId AND d.lastConnectTs > :time")
- List<DeviceEntity> findConnectOnlineByTenantId(@Param("tenantId") String tenantId, @Param("time") long time);
-
- @Query("SELECT d FROM DeviceEntity d WHERE d.tenantId = :tenantId AND d.lastConnectTs < :time")
- List<DeviceEntity> findConnectOfflineByTenantId(@Param("tenantId") String tenantId, @Param("time") long time);
-
- @Query("SELECT d FROM DeviceEntity d WHERE d.tenantId = :tenantId AND d.lastUpdateTs > :time")
- List<DeviceEntity> findUpdateOnlineByTenantId(@Param("tenantId") String tenantId, @Param("time") long time);
-
- @Query("SELECT d FROM DeviceEntity d WHERE d.tenantId = :tenantId AND d.lastUpdateTs < :time")
- List<DeviceEntity> findUpdateOfflineByTenantId(@Param("tenantId") String tenantId, @Param("time") long time);
-
- @Query("SELECT d FROM DeviceEntity d WHERE d.tenantId = :tenantId AND d.lastConnectTs > :time AND d.type = :type")
- List<DeviceEntity> findConnectOnlineByTenantIdAndType(@Param("tenantId") String tenantId, @Param("time") long time, @Param("type") String type);
-
- @Query("SELECT d FROM DeviceEntity d WHERE d.tenantId = :tenantId AND d.lastConnectTs < :time AND d.type = :type")
- List<DeviceEntity> findConnectOfflineByTenantIdAndType(@Param("tenantId") String tenantId, @Param("time") long time, @Param("type") String type);
-
- @Query("SELECT d FROM DeviceEntity d WHERE d.tenantId = :tenantId AND d.lastUpdateTs > :time AND d.type = :type")
- List<DeviceEntity> findUpdateOnlineByTenantIdAndType(@Param("tenantId") String tenantId, @Param("time") long time, @Param("type") String type);
-
- @Query("SELECT d FROM DeviceEntity d WHERE d.tenantId = :tenantId AND d.lastUpdateTs < :time AND d.type = :type")
- List<DeviceEntity> findUpdateOfflineByTenantIdAndType(@Param("tenantId") String tenantId, @Param("time") long time, @Param("type") String type);
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/device/JpaDeviceDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/device/JpaDeviceDao.java
index baba659..4f3cd7d 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/device/JpaDeviceDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/device/JpaDeviceDao.java
@@ -15,9 +15,7 @@
*/
package org.thingsboard.server.dao.sql.device;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.repository.CrudRepository;
@@ -26,7 +24,6 @@ import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EntitySubtype;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.UUIDConverter;
-import org.thingsboard.server.common.data.device.DeviceStatusQuery;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.TextPageLink;
import org.thingsboard.server.dao.DaoUtil;
@@ -46,7 +43,6 @@ import static org.thingsboard.server.dao.model.ModelConstants.NULL_UUID_STR;
*/
@Component
@SqlDao
-@Slf4j
public class JpaDeviceDao extends JpaAbstractSearchTextDao<DeviceEntity, Device> implements DeviceDao {
@Autowired
@@ -128,73 +124,6 @@ public class JpaDeviceDao extends JpaAbstractSearchTextDao<DeviceEntity, Device>
return service.submit(() -> convertTenantDeviceTypesToDto(tenantId, deviceRepository.findTenantDeviceTypes(fromTimeUUID(tenantId))));
}
- @Override
- public ListenableFuture<List<Device>> findDevicesByTenantIdAndStatus(UUID tenantId, DeviceStatusQuery statusQuery) {
- String strTenantId = fromTimeUUID(tenantId);
- long minTime = System.currentTimeMillis() - statusQuery.getThreshold();
- switch (statusQuery.getStatus()) {
- case OFFLINE: {
- switch (statusQuery.getContactType()) {
- case UPLOAD:
- return service.submit(() -> DaoUtil.convertDataList(deviceRepository.findUpdateOfflineByTenantId(strTenantId, minTime)));
- case CONNECT:
- return service.submit(() -> DaoUtil.convertDataList(deviceRepository.findConnectOfflineByTenantId(strTenantId, minTime)));
- }
- break;
- }
- case ONLINE: {
- switch (statusQuery.getContactType()) {
- case UPLOAD:
- return service.submit(() -> DaoUtil.convertDataList(deviceRepository.findUpdateOnlineByTenantId(strTenantId, minTime)));
- case CONNECT:
- return service.submit(() -> DaoUtil.convertDataList(deviceRepository.findConnectOnlineByTenantId(strTenantId, minTime)));
- }
- break;
- }
- }
-
- log.error("Could not build status query from [{}]", statusQuery);
- throw new IllegalStateException("Could not build status query for device []");
- }
-
- @Override
- public ListenableFuture<List<Device>> findDevicesByTenantIdTypeAndStatus(UUID tenantId, String type, DeviceStatusQuery statusQuery) {
- String strTenantId = fromTimeUUID(tenantId);
- long minTime = System.currentTimeMillis() - statusQuery.getThreshold();
- switch (statusQuery.getStatus()) {
- case OFFLINE: {
- switch (statusQuery.getContactType()) {
- case UPLOAD:
- return service.submit(() -> DaoUtil.convertDataList(deviceRepository.findUpdateOfflineByTenantIdAndType(strTenantId, minTime, type)));
- case CONNECT:
- return service.submit(() -> DaoUtil.convertDataList(deviceRepository.findConnectOfflineByTenantIdAndType(strTenantId, minTime, type)));
- }
- break;
- }
- case ONLINE: {
- switch (statusQuery.getContactType()) {
- case UPLOAD:
- return service.submit(() -> DaoUtil.convertDataList(deviceRepository.findUpdateOnlineByTenantIdAndType(strTenantId, minTime, type)));
- case CONNECT:
- return service.submit(() -> DaoUtil.convertDataList(deviceRepository.findConnectOnlineByTenantIdAndType(strTenantId, minTime, type)));
- }
- break;
- }
- }
-
- log.error("Could not build status query from [{}]", statusQuery);
- throw new IllegalStateException("Could not build status query for device []");
- }
-
- @Override
- public void saveDeviceStatus(Device device) {
- ListenableFuture<Device> future = service.submit(() -> save(device));
- Futures.withFallback(future, t -> {
- log.error("Can't update device status for [{}]", device, t);
- throw new IllegalArgumentException("Can't update device status for {" + device + "}", t);
- });
- }
-
private List<EntitySubtype> convertTenantDeviceTypesToDto(UUID tenantId, List<String> types) {
List<EntitySubtype> list = Collections.emptyList();
if (types != null && !types.isEmpty()) {
diff --git a/dao/src/main/resources/cassandra/schema.cql b/dao/src/main/resources/cassandra/schema.cql
index 24b2117..9150587 100644
--- a/dao/src/main/resources/cassandra/schema.cql
+++ b/dao/src/main/resources/cassandra/schema.cql
@@ -159,8 +159,6 @@ CREATE TABLE IF NOT EXISTS thingsboard.device (
type text,
search_text text,
additional_info text,
- last_connect bigint,
- last_update bigint,
PRIMARY KEY (id, tenant_id, customer_id, type)
);
diff --git a/dao/src/main/resources/sql/schema.sql b/dao/src/main/resources/sql/schema.sql
index f2458e1..5876fbb 100644
--- a/dao/src/main/resources/sql/schema.sql
+++ b/dao/src/main/resources/sql/schema.sql
@@ -118,9 +118,7 @@ CREATE TABLE IF NOT EXISTS device (
type varchar(255),
name varchar(255),
search_text varchar(255),
- tenant_id varchar(31),
- last_connect bigint,
- last_update bigint
+ tenant_id varchar(31)
);
CREATE TABLE IF NOT EXISTS device_credentials (
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNodeTest.java
index 6280827..19d4a49 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNodeTest.java
@@ -59,6 +59,9 @@ public class TbMsgToEmailNodeTest {
initWithScript();
metaData.putValue("username", "oreo");
metaData.putValue("userEmail", "user@email.io");
+ metaData.putValue("name", "temp");
+ metaData.putValue("passed", "5");
+ metaData.putValue("count", "100");
TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
emailNode.onMsg(ctx, msg);
@@ -91,9 +94,9 @@ public class TbMsgToEmailNodeTest {
try {
TbMsgToEmailNodeConfiguration config = new TbMsgToEmailNodeConfiguration();
config.setFromTemplate("test@mail.org");
- config.setToTemplate("$metadata.userEmail");
- config.setSubjectTemplate("Hi $metadata.username there");
- config.setBodyTemplate("$msg.name is to high. Current $msg.passed and $msg.complex.count");
+ config.setToTemplate("${userEmail}");
+ config.setSubjectTemplate("Hi ${username} there");
+ config.setBodyTemplate("${name} is to high. Current ${passed} and ${count}");
ObjectMapper mapper = new ObjectMapper();
TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config));
diff --git a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java
index d445aba..7674549 100644
--- a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java
+++ b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java
@@ -35,7 +35,6 @@ import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.common.transport.quota.QuotaService;
-import org.thingsboard.server.dao.device.DeviceOfflineService;
import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor;
import org.thingsboard.server.transport.coap.session.CoapExchangeObserverProxy;
import org.thingsboard.server.transport.coap.session.CoapSessionCtx;
@@ -54,17 +53,15 @@ public class CoapTransportResource extends CoapResource {
private final SessionMsgProcessor processor;
private final DeviceAuthService authService;
private final QuotaService quotaService;
- private final DeviceOfflineService offlineService;
private final Field observerField;
private final long timeout;
public CoapTransportResource(SessionMsgProcessor processor, DeviceAuthService authService, CoapTransportAdaptor adaptor, String name,
- long timeout, QuotaService quotaService, DeviceOfflineService offlineService) {
+ long timeout, QuotaService quotaService) {
super(name);
this.processor = processor;
this.authService = authService;
this.quotaService = quotaService;
- this.offlineService = offlineService;
this.adaptor = adaptor;
this.timeout = timeout;
// This is important to turn off existing observable logic in
@@ -171,7 +168,6 @@ public class CoapTransportResource extends CoapResource {
case TO_SERVER_RPC_REQUEST:
ctx.setSessionType(SessionType.SYNC);
msg = adaptor.convertToActorMsg(ctx, type, request);
- offlineService.online(ctx.getDevice(), true);
break;
case SUBSCRIBE_ATTRIBUTES_REQUEST:
case SUBSCRIBE_RPC_COMMANDS_REQUEST:
@@ -179,13 +175,11 @@ public class CoapTransportResource extends CoapResource {
advanced.setObserver(new CoapExchangeObserverProxy(systemObserver, ctx));
ctx.setSessionType(SessionType.ASYNC);
msg = adaptor.convertToActorMsg(ctx, type, request);
- offlineService.online(ctx.getDevice(), false);
break;
case UNSUBSCRIBE_ATTRIBUTES_REQUEST:
case UNSUBSCRIBE_RPC_COMMANDS_REQUEST:
ctx.setSessionType(SessionType.ASYNC);
msg = adaptor.convertToActorMsg(ctx, type, request);
- offlineService.online(ctx.getDevice(), false);
break;
default:
log.trace("[{}] Unsupported msg type: {}", ctx.getSessionId(), type);
diff --git a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportService.java b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportService.java
index 4037ee7..15706d4 100644
--- a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportService.java
+++ b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportService.java
@@ -27,7 +27,6 @@ import org.springframework.stereotype.Service;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.common.transport.quota.QuotaService;
-import org.thingsboard.server.dao.device.DeviceOfflineService;
import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor;
import javax.annotation.PostConstruct;
@@ -58,9 +57,6 @@ public class CoapTransportService {
@Autowired(required = false)
private QuotaService quotaService;
- @Autowired(required = false)
- private DeviceOfflineService offlineService;
-
@Value("${coap.bind_address}")
private String host;
@@ -90,7 +86,7 @@ public class CoapTransportService {
private void createResources() {
CoapResource api = new CoapResource(API);
- api.add(new CoapTransportResource(processor, authService, adaptor, V1, timeout, quotaService, offlineService));
+ api.add(new CoapTransportResource(processor, authService, adaptor, V1, timeout, quotaService));
server.add(api);
}
diff --git a/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java b/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java
index df8545e..60b2220 100644
--- a/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java
+++ b/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java
@@ -15,7 +15,6 @@
*/
package org.thingsboard.server.transport.coap;
-import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.CoapClient;
import org.eclipse.californium.core.CoapResponse;
@@ -32,7 +31,6 @@ import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.annotation.DirtiesContext.ClassMode;
import org.springframework.test.context.junit4.SpringRunner;
import org.thingsboard.server.common.data.Device;
-import org.thingsboard.server.common.data.device.DeviceStatusQuery;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
@@ -53,7 +51,6 @@ import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.auth.DeviceAuthResult;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.common.transport.quota.QuotaService;
-import org.thingsboard.server.dao.device.DeviceOfflineService;
import java.util.ArrayList;
import java.util.List;
@@ -140,31 +137,6 @@ public class CoapServerTest {
public static QuotaService quotaService() {
return key -> false;
}
-
- @Bean
- public static DeviceOfflineService offlineService() {
- return new DeviceOfflineService() {
- @Override
- public void online(Device device, boolean isUpdate) {
-
- }
-
- @Override
- public void offline(Device device) {
-
- }
-
- @Override
- public ListenableFuture<List<Device>> findOfflineDevices(UUID tenantId, DeviceStatusQuery.ContactType contactType, long offlineThreshold) {
- return null;
- }
-
- @Override
- public ListenableFuture<List<Device>> findOnlineDevices(UUID tenantId, DeviceStatusQuery.ContactType contactType, long offlineThreshold) {
- return null;
- }
- };
- }
}
@Autowired
diff --git a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java b/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
index 03a4201..4d90b5f 100644
--- a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
+++ b/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
@@ -26,7 +26,6 @@ import org.springframework.http.ResponseEntity;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult;
-import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.security.DeviceTokenCredentials;
import org.thingsboard.server.common.msg.core.*;
import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg;
@@ -37,7 +36,6 @@ import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.common.transport.quota.QuotaService;
-import org.thingsboard.server.dao.device.DeviceOfflineService;
import org.thingsboard.server.transport.http.session.HttpSessionCtx;
import javax.servlet.http.HttpServletRequest;
@@ -65,9 +63,6 @@ public class DeviceApiController {
@Autowired(required = false)
private QuotaService quotaService;
- @Autowired(required = false)
- private DeviceOfflineService offlineService;
-
@RequestMapping(value = "/{deviceToken}/attributes", method = RequestMethod.GET, produces = "application/json")
public DeferredResult<ResponseEntity> getDeviceAttributes(@PathVariable("deviceToken") String deviceToken,
@RequestParam(value = "clientKeys", required = false, defaultValue = "") String clientKeys,
@@ -87,7 +82,7 @@ public class DeviceApiController {
Set<String> sharedKeySet = !StringUtils.isEmpty(sharedKeys) ? new HashSet<>(Arrays.asList(sharedKeys.split(","))) : null;
request = new BasicGetAttributesRequest(0, clientKeySet, sharedKeySet);
}
- process(ctx, request, false);
+ process(ctx, request);
} else {
responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
}
@@ -105,7 +100,7 @@ public class DeviceApiController {
HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
try {
- process(ctx, JsonConverter.convertToAttributes(new JsonParser().parse(json)), true);
+ process(ctx, JsonConverter.convertToAttributes(new JsonParser().parse(json)));
} catch (IllegalStateException | JsonSyntaxException ex) {
responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
}
@@ -125,7 +120,7 @@ public class DeviceApiController {
HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
try {
- process(ctx, JsonConverter.convertToTelemetry(new JsonParser().parse(json)), true);
+ process(ctx, JsonConverter.convertToTelemetry(new JsonParser().parse(json)));
} catch (IllegalStateException | JsonSyntaxException ex) {
responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
}
@@ -155,7 +150,7 @@ public class DeviceApiController {
if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
try {
JsonObject response = new JsonParser().parse(json).getAsJsonObject();
- process(ctx, new ToDeviceRpcResponseMsg(requestId, response.toString()), true);
+ process(ctx, new ToDeviceRpcResponseMsg(requestId, response.toString()));
} catch (IllegalStateException | JsonSyntaxException ex) {
responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
}
@@ -178,7 +173,7 @@ public class DeviceApiController {
JsonObject request = new JsonParser().parse(json).getAsJsonObject();
process(ctx, new ToServerRpcRequestMsg(0,
request.get("method").getAsString(),
- request.get("params").toString()), true);
+ request.get("params").toString()));
} catch (IllegalStateException | JsonSyntaxException ex) {
responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
}
@@ -204,7 +199,7 @@ public class DeviceApiController {
HttpSessionCtx ctx = getHttpSessionCtx(responseWriter, timeout);
if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
try {
- process(ctx, msg, false);
+ process(ctx, msg);
} catch (IllegalStateException | JsonSyntaxException ex) {
responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
}
@@ -222,10 +217,9 @@ public class DeviceApiController {
return new HttpSessionCtx(processor, authService, responseWriter, timeout != 0 ? timeout : defaultTimeout);
}
- private void process(HttpSessionCtx ctx, FromDeviceMsg request, boolean isUpdate) {
+ private void process(HttpSessionCtx ctx, FromDeviceMsg request) {
AdaptorToSessionActorMsg msg = new BasicAdaptorToSessionActorMsg(ctx, request);
processor.process(new BasicToDeviceActorSessionMsg(ctx.getDevice(), msg));
- offlineService.online(ctx.getDevice(), isUpdate);
}
private boolean quotaExceeded(HttpServletRequest request, DeferredResult<ResponseEntity> responseWriter) {
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index 4fa32c6..5ccce35 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -37,7 +37,6 @@ import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.common.transport.quota.QuotaService;
import org.thingsboard.server.dao.EncryptionUtil;
-import org.thingsboard.server.dao.device.DeviceOfflineService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
@@ -73,14 +72,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private final DeviceAuthService authService;
private final RelationService relationService;
private final QuotaService quotaService;
- private final DeviceOfflineService offlineService;
private final SslHandler sslHandler;
private volatile boolean connected;
private volatile InetSocketAddress address;
private volatile GatewaySessionCtx gatewaySessionCtx;
public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService,
- MqttTransportAdaptor adaptor, SslHandler sslHandler, QuotaService quotaService, DeviceOfflineService offlineService) {
+ MqttTransportAdaptor adaptor, SslHandler sslHandler, QuotaService quotaService) {
this.processor = processor;
this.deviceService = deviceService;
this.relationService = relationService;
@@ -90,7 +88,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
this.sessionId = deviceSessionCtx.getSessionId().toUidStr();
this.sslHandler = sslHandler;
this.quotaService = quotaService;
- this.offlineService = offlineService;
}
@Override
@@ -132,13 +129,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
case PINGREQ:
if (checkConnected(ctx)) {
ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
- offlineService.online(deviceSessionCtx.getDevice(), false);
}
break;
case DISCONNECT:
if (checkConnected(ctx)) {
processDisconnect(ctx);
- offlineService.offline(deviceSessionCtx.getDevice());
}
break;
default:
@@ -190,28 +185,23 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
try {
if (topicName.equals(DEVICE_TELEMETRY_TOPIC)) {
msg = adaptor.convertToActorMsg(deviceSessionCtx, POST_TELEMETRY_REQUEST, mqttMsg);
- offlineService.online(deviceSessionCtx.getDevice(), true);
} else if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
msg = adaptor.convertToActorMsg(deviceSessionCtx, POST_ATTRIBUTES_REQUEST, mqttMsg);
- offlineService.online(deviceSessionCtx.getDevice(), true);
} else if (topicName.startsWith(DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
msg = adaptor.convertToActorMsg(deviceSessionCtx, GET_ATTRIBUTES_REQUEST, mqttMsg);
if (msgId >= 0) {
ctx.writeAndFlush(createMqttPubAckMsg(msgId));
}
- offlineService.online(deviceSessionCtx.getDevice(), false);
} else if (topicName.startsWith(DEVICE_RPC_RESPONSE_TOPIC)) {
msg = adaptor.convertToActorMsg(deviceSessionCtx, TO_DEVICE_RPC_RESPONSE, mqttMsg);
if (msgId >= 0) {
ctx.writeAndFlush(createMqttPubAckMsg(msgId));
}
- offlineService.online(deviceSessionCtx.getDevice(), true);
} else if (topicName.startsWith(DEVICE_RPC_REQUESTS_TOPIC)) {
msg = adaptor.convertToActorMsg(deviceSessionCtx, TO_SERVER_RPC_REQUEST, mqttMsg);
if (msgId >= 0) {
ctx.writeAndFlush(createMqttPubAckMsg(msgId));
}
- offlineService.online(deviceSessionCtx.getDevice(), true);
}
} catch (AdaptorException e) {
log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
@@ -260,7 +250,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
}
ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList));
- offlineService.online(deviceSessionCtx.getDevice(), false);
}
private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) {
@@ -284,7 +273,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
}
ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId()));
- offlineService.online(deviceSessionCtx.getDevice(), false);
}
private MqttMessage createUnSubAckMessage(int msgId) {
@@ -316,7 +304,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
connected = true;
checkGatewaySession();
- offlineService.online(deviceSessionCtx.getDevice(), false);
}
}
@@ -328,7 +315,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
connected = true;
checkGatewaySession();
- offlineService.online(deviceSessionCtx.getDevice(), false);
} else {
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
ctx.close();
@@ -379,9 +365,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("[{}] Unexpected Exception", sessionId, cause);
ctx.close();
- if(deviceSessionCtx.getDevice() != null) {
- offlineService.offline(deviceSessionCtx.getDevice());
- }
}
private static MqttSubAckMessage createSubAckMessage(Integer msgId, List<Integer> grantedQoSList) {
@@ -420,8 +403,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
if (infoNode != null) {
JsonNode gatewayNode = infoNode.get("gateway");
if (gatewayNode != null && gatewayNode.asBoolean()) {
- gatewaySessionCtx = new GatewaySessionCtx(processor, deviceService, authService,
- relationService, deviceSessionCtx, offlineService);
+ gatewaySessionCtx = new GatewaySessionCtx(processor, deviceService, authService, relationService, deviceSessionCtx);
}
}
}
@@ -429,8 +411,5 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
processor.process(SessionCloseMsg.onError(deviceSessionCtx.getSessionId()));
- if(deviceSessionCtx.getDevice() != null) {
- offlineService.offline(deviceSessionCtx.getDevice());
- }
}
}
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java
index 94cf940..976d8ba 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java
@@ -24,7 +24,6 @@ import io.netty.handler.ssl.SslHandler;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.common.transport.quota.QuotaService;
-import org.thingsboard.server.dao.device.DeviceOfflineService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
@@ -43,11 +42,10 @@ public class MqttTransportServerInitializer extends ChannelInitializer<SocketCha
private final MqttTransportAdaptor adaptor;
private final MqttSslHandlerProvider sslHandlerProvider;
private final QuotaService quotaService;
- private final DeviceOfflineService offlineService;
public MqttTransportServerInitializer(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService,
MqttTransportAdaptor adaptor, MqttSslHandlerProvider sslHandlerProvider,
- QuotaService quotaService, DeviceOfflineService offlineService) {
+ QuotaService quotaService) {
this.processor = processor;
this.deviceService = deviceService;
this.authService = authService;
@@ -55,7 +53,6 @@ public class MqttTransportServerInitializer extends ChannelInitializer<SocketCha
this.adaptor = adaptor;
this.sslHandlerProvider = sslHandlerProvider;
this.quotaService = quotaService;
- this.offlineService = offlineService;
}
@Override
@@ -70,7 +67,7 @@ public class MqttTransportServerInitializer extends ChannelInitializer<SocketCha
pipeline.addLast("encoder", MqttEncoder.INSTANCE);
MqttTransportHandler handler = new MqttTransportHandler(processor, deviceService, authService, relationService,
- adaptor, sslHandler, quotaService, offlineService);
+ adaptor, sslHandler, quotaService);
pipeline.addLast(handler);
ch.closeFuture().addListener(handler);
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
index 90b4591..1ae7d38 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
@@ -30,7 +30,6 @@ import org.springframework.stereotype.Service;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.common.transport.quota.QuotaService;
-import org.thingsboard.server.dao.device.DeviceOfflineService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
@@ -70,9 +69,6 @@ public class MqttTransportService {
@Autowired(required = false)
private QuotaService quotaService;
- @Autowired(required = false)
- private DeviceOfflineService offlineService;
-
@Value("${mqtt.bind_address}")
private String host;
@Value("${mqtt.bind_port}")
@@ -110,7 +106,7 @@ public class MqttTransportService {
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new MqttTransportServerInitializer(processor, deviceService, authService, relationService,
- adaptor, sslHandlerProvider, quotaService, offlineService));
+ adaptor, sslHandlerProvider, quotaService));
serverChannel = b.bind(host, port).sync().channel();
log.info("Mqtt transport started!");
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
index f35434a..2056452 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
@@ -36,7 +36,6 @@ import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
-import org.thingsboard.server.dao.device.DeviceOfflineService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
@@ -62,17 +61,14 @@ public class GatewaySessionCtx {
private final DeviceService deviceService;
private final DeviceAuthService authService;
private final RelationService relationService;
- private final DeviceOfflineService offlineService;
private final Map<String, GatewayDeviceSessionCtx> devices;
private ChannelHandlerContext channel;
- public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService,
- RelationService relationService, DeviceSessionCtx gatewaySessionCtx, DeviceOfflineService offlineService) {
+ public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, DeviceSessionCtx gatewaySessionCtx) {
this.processor = processor;
this.deviceService = deviceService;
this.authService = authService;
this.relationService = relationService;
- this.offlineService = offlineService;
this.gateway = gatewaySessionCtx.getDevice();
this.gatewaySessionId = gatewaySessionCtx.getSessionId();
this.devices = new HashMap<>();
@@ -102,7 +98,6 @@ public class GatewaySessionCtx {
log.debug("[{}] Added device [{}] to the gateway session", gatewaySessionId, deviceName);
processor.process(new BasicToDeviceActorSessionMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new AttributesSubscribeMsg())));
processor.process(new BasicToDeviceActorSessionMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new RpcSubscribeMsg())));
- offlineService.online(device, false);
}
}
@@ -112,7 +107,6 @@ public class GatewaySessionCtx {
if (deviceSessionCtx != null) {
processor.process(SessionCloseMsg.onDisconnect(deviceSessionCtx.getSessionId()));
deviceSessionCtx.setClosed(true);
- offlineService.offline(deviceSessionCtx.getDevice());
log.debug("[{}] Removed device [{}] from the gateway session", gatewaySessionId, deviceName);
} else {
log.debug("[{}] Device [{}] was already removed from the gateway session", gatewaySessionId, deviceName);
@@ -123,7 +117,6 @@ public class GatewaySessionCtx {
public void onGatewayDisconnect() {
devices.forEach((k, v) -> {
processor.process(SessionCloseMsg.onDisconnect(v.getSessionId()));
- offlineService.offline(v.getDevice());
});
}
@@ -145,7 +138,6 @@ public class GatewaySessionCtx {
GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
- offlineService.online(deviceSessionCtx.getDevice(), true);
}
} else {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
@@ -162,7 +154,6 @@ public class GatewaySessionCtx {
GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new ToDeviceRpcResponseMsg(requestId, data))));
- offlineService.online(deviceSessionCtx.getDevice(), true);
} else {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
}
@@ -185,7 +176,6 @@ public class GatewaySessionCtx {
GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
- offlineService.online(deviceSessionCtx.getDevice(), true);
}
} else {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
@@ -220,7 +210,6 @@ public class GatewaySessionCtx {
processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
ack(msg);
- offlineService.online(deviceSessionCtx.getDevice(), false);
} else {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
}