thingsboard-memoizeit
Changes
application/src/test/java/org/thingsboard/server/system/nosql/DeviceOfflineNoSqlTest.java 23(+23 -0)
common/data/src/main/java/org/thingsboard/server/common/data/device/DeviceStatusQuery.java 40(+40 -0)
transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java 8(+7 -1)
transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportService.java 6(+5 -1)
transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java 20(+13 -7)
transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java 25(+23 -2)
transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java 7(+5 -2)
Details
diff --git a/application/src/main/data/upgrade/1.5.0/schema_update.cql b/application/src/main/data/upgrade/1.5.0/schema_update.cql
index ab68846..5cdaede 100644
--- a/application/src/main/data/upgrade/1.5.0/schema_update.cql
+++ b/application/src/main/data/upgrade/1.5.0/schema_update.cql
@@ -93,3 +93,5 @@ CREATE TABLE IF NOT EXISTS thingsboard.rule_node (
PRIMARY KEY (id)
);
+ALTER TABLE thingsboard.device ADD last_connect bigint;
+ALTER TABLE thingsboard.device ADD last_update bigint;
\ No newline at end of file
diff --git a/application/src/main/data/upgrade/1.5.0/schema_update.sql b/application/src/main/data/upgrade/1.5.0/schema_update.sql
index 2bed6ad..ab91166 100644
--- a/application/src/main/data/upgrade/1.5.0/schema_update.sql
+++ b/application/src/main/data/upgrade/1.5.0/schema_update.sql
@@ -34,4 +34,7 @@ CREATE TABLE IF NOT EXISTS rule_node (
name varchar(255),
debug_mode boolean,
search_text varchar(255)
-);
\ No newline at end of file
+);
+
+ALTER TABLE device ADD COLUMN IF NOT EXISTS last_connect BIGINT;
+ALTER TABLE device ADD COLUMN IF NOT EXISTS last_update BIGINT;
\ No newline at end of file
diff --git a/application/src/main/java/org/thingsboard/server/controller/BaseController.java b/application/src/main/java/org/thingsboard/server/controller/BaseController.java
index c7574e8..29436c1 100644
--- a/application/src/main/java/org/thingsboard/server/controller/BaseController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/BaseController.java
@@ -45,6 +45,7 @@ import org.thingsboard.server.dao.audit.AuditLogService;
import org.thingsboard.server.dao.customer.CustomerService;
import org.thingsboard.server.dao.dashboard.DashboardService;
import org.thingsboard.server.dao.device.DeviceCredentialsService;
+import org.thingsboard.server.dao.device.DeviceOfflineService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.dao.exception.IncorrectParameterException;
@@ -133,6 +134,9 @@ public abstract class BaseController {
@Autowired
protected AuditLogService auditLogService;
+ @Autowired
+ protected DeviceOfflineService offlineService;
+
@ExceptionHandler(ThingsboardException.class)
public void handleThingsboardException(ThingsboardException ex, HttpServletResponse response) {
errorResponseHandler.handle(ex, response);
diff --git a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
index 1f4b02a..f97603e 100644
--- a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
@@ -25,6 +25,9 @@ import org.thingsboard.server.common.data.EntitySubtype;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.device.DeviceSearchQuery;
+import org.thingsboard.server.common.data.device.DeviceStatusQuery;
+import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
+import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
@@ -34,8 +37,6 @@ import org.thingsboard.server.common.data.security.Authority;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.dao.exception.IncorrectParameterException;
import org.thingsboard.server.dao.model.ModelConstants;
-import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
-import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.service.security.model.SecurityUser;
import java.util.ArrayList;
@@ -69,7 +70,7 @@ public class DeviceController extends BaseController {
device.setTenantId(getCurrentUser().getTenantId());
if (getCurrentUser().getAuthority() == Authority.CUSTOMER_USER) {
if (device.getId() == null || device.getId().isNullUid() ||
- device.getCustomerId() == null || device.getCustomerId().isNullUid()) {
+ device.getCustomerId() == null || device.getCustomerId().isNullUid()) {
throw new ThingsboardException("You don't have permission to perform this operation!",
ThingsboardErrorCode.PERMISSION_DENIED);
} else {
@@ -367,4 +368,32 @@ public class DeviceController extends BaseController {
throw handleException(e);
}
}
+
+ @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
+ @RequestMapping(value = "/device/offline", method = RequestMethod.GET)
+ @ResponseBody
+ public List<Device> getOfflineDevices(@RequestParam("contactType") DeviceStatusQuery.ContactType contactType,
+ @RequestParam("threshold") long threshold) throws ThingsboardException {
+ try {
+ TenantId tenantId = getCurrentUser().getTenantId();
+ ListenableFuture<List<Device>> offlineDevices = offlineService.findOfflineDevices(tenantId.getId(), contactType, threshold);
+ return checkNotNull(offlineDevices.get());
+ } catch (Exception e) {
+ throw handleException(e);
+ }
+ }
+
+ @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
+ @RequestMapping(value = "/device/online", method = RequestMethod.GET)
+ @ResponseBody
+ public List<Device> getOnlineDevices(@RequestParam("contactType") DeviceStatusQuery.ContactType contactType,
+ @RequestParam("threshold") long threshold) throws ThingsboardException {
+ try {
+ TenantId tenantId = getCurrentUser().getTenantId();
+ ListenableFuture<List<Device>> offlineDevices = offlineService.findOnlineDevices(tenantId.getId(), contactType, threshold);
+ return checkNotNull(offlineDevices.get());
+ } catch (Exception e) {
+ throw handleException(e);
+ }
+ }
}
diff --git a/application/src/test/java/org/thingsboard/server/system/BaseDeviceOfflineTest.java b/application/src/test/java/org/thingsboard/server/system/BaseDeviceOfflineTest.java
new file mode 100644
index 0000000..5a09a68
--- /dev/null
+++ b/application/src/test/java/org/thingsboard/server/system/BaseDeviceOfflineTest.java
@@ -0,0 +1,123 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.system;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.ImmutableMap;
+import org.junit.Before;
+import org.junit.Test;
+import org.thingsboard.server.common.data.Device;
+import org.thingsboard.server.common.data.security.DeviceCredentials;
+import org.thingsboard.server.controller.AbstractControllerTest;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+public class BaseDeviceOfflineTest extends AbstractControllerTest {
+
+ private Device deviceA;
+ private Device deviceB;
+ private DeviceCredentials credA;
+ private DeviceCredentials credB;
+
+ @Before
+ public void before() throws Exception {
+ loginTenantAdmin();
+ deviceA = createDevice("DevA", "VMS");
+ credA = getCredentials(deviceA.getUuidId());
+ deviceB = createDevice("DevB", "SOLAR");
+ credB = getCredentials(deviceB.getUuidId());
+ }
+
+ @Test
+ public void offlineDevicesCanBeFoundByLastConnectField() throws Exception {
+ makeDeviceContact(credA);
+ Thread.sleep(1000);
+ makeDeviceContact(credB);
+ Thread.sleep(100);
+ List<Device> devices = doGetTyped("/api/device/offline?contactType=CONNECT&threshold=700", new TypeReference<List<Device>>() {
+ });
+
+ assertEquals(devices.toString(),1, devices.size());
+ assertEquals("DevA", devices.get(0).getName());
+ }
+
+ @Test
+ public void offlineDevicesCanBeFoundByLastUpdateField() throws Exception {
+ makeDeviceUpdate(credA);
+ Thread.sleep(1000);
+ makeDeviceUpdate(credB);
+ makeDeviceContact(credA);
+ Thread.sleep(100);
+ List<Device> devices = doGetTyped("/api/device/offline?contactType=UPLOAD&threshold=700", new TypeReference<List<Device>>() {
+ });
+
+ assertEquals(devices.toString(),1, devices.size());
+ assertEquals("DevA", devices.get(0).getName());
+ }
+
+ @Test
+ public void onlineDevicesCanBeFoundByLastConnectField() throws Exception {
+ makeDeviceContact(credB);
+ Thread.sleep(1000);
+ makeDeviceContact(credA);
+ Thread.sleep(100);
+ List<Device> devices = doGetTyped("/api/device/online?contactType=CONNECT&threshold=700", new TypeReference<List<Device>>() {
+ });
+
+ assertEquals(devices.toString(),1, devices.size());
+ assertEquals("DevA", devices.get(0).getName());
+ }
+
+ @Test
+ public void onlineDevicesCanBeFoundByLastUpdateField() throws Exception {
+ makeDeviceUpdate(credB);
+ Thread.sleep(1000);
+ makeDeviceUpdate(credA);
+ makeDeviceContact(credB);
+ Thread.sleep(100);
+ List<Device> devices = doGetTyped("/api/device/online?contactType=UPLOAD&threshold=700", new TypeReference<List<Device>>() {
+ });
+
+ assertEquals(devices.toString(),1, devices.size());
+ assertEquals("DevA", devices.get(0).getName());
+ }
+
+ private Device createDevice(String name, String type) throws Exception {
+ Device device = new Device();
+ device.setName(name);
+ device.setType(type);
+ long currentTime = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(10);
+ device.setLastConnectTs(currentTime);
+ device.setLastUpdateTs(currentTime);
+ return doPost("/api/device", device, Device.class);
+ }
+
+ private DeviceCredentials getCredentials(UUID deviceId) throws Exception {
+ return doGet("/api/device/" + deviceId.toString() + "/credentials", DeviceCredentials.class);
+ }
+
+ private void makeDeviceUpdate(DeviceCredentials credentials) throws Exception {
+ doPost("/api/v1/" + credentials.getCredentialsId() + "/attributes", ImmutableMap.of("keyA", "valueA"), new String[]{});
+ }
+
+ private void makeDeviceContact(DeviceCredentials credentials) throws Exception {
+ doGet("/api/v1/" + credentials.getCredentialsId() + "/attributes?clientKeys=keyA,keyB,keyC");
+ }
+}
diff --git a/application/src/test/java/org/thingsboard/server/system/BaseHttpDeviceApiTest.java b/application/src/test/java/org/thingsboard/server/system/BaseHttpDeviceApiTest.java
index 4fa6162..c3e87c2 100644
--- a/application/src/test/java/org/thingsboard/server/system/BaseHttpDeviceApiTest.java
+++ b/application/src/test/java/org/thingsboard/server/system/BaseHttpDeviceApiTest.java
@@ -15,6 +15,7 @@
*/
package org.thingsboard.server.system;
+import com.google.common.collect.ImmutableMap;
import org.junit.Before;
import org.junit.Test;
import org.springframework.test.web.servlet.ResultActions;
@@ -28,6 +29,9 @@ import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.*;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.request;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@@ -48,6 +52,9 @@ public abstract class BaseHttpDeviceApiTest extends AbstractControllerTest {
device = new Device();
device.setName("My device");
device.setType("default");
+ long currentTime = System.currentTimeMillis();
+ device.setLastConnectTs(currentTime);
+ device.setLastUpdateTs(currentTime);
device = doPost("/api/device", device, Device.class);
deviceCredentials =
@@ -67,6 +74,34 @@ public abstract class BaseHttpDeviceApiTest extends AbstractControllerTest {
doGetAsync("/api/v1/" + deviceCredentials.getCredentialsId() + "/attributes?clientKeys=keyA,keyB,keyC").andExpect(status().isOk());
}
+ @Test
+ public void deviceLastContactAndUpdateFieldsAreUpdated() throws Exception {
+ Device actualDevice = doGet("/api/device/" + this.device.getId(), Device.class);
+ Long initConnectTs = actualDevice.getLastConnectTs();
+ Long initUpdateTs = actualDevice.getLastUpdateTs();
+ assertNotNull(initConnectTs);
+ assertNotNull(initUpdateTs);
+ Thread.sleep(50);
+
+ doPost("/api/v1/" + deviceCredentials.getCredentialsId() + "/attributes", ImmutableMap.of("keyA", "valueA"), new String[]{});
+ actualDevice = doGet("/api/device/" + this.device.getId(), Device.class);
+ Long postConnectTs = actualDevice.getLastConnectTs();
+ Long postUpdateTs = actualDevice.getLastUpdateTs();
+ System.out.println(postConnectTs + " - " + postUpdateTs + " -> " + (postConnectTs - initConnectTs) + " : " + (postUpdateTs - initUpdateTs));
+ assertTrue(postConnectTs > initConnectTs);
+ assertEquals(postConnectTs, postUpdateTs);
+ Thread.sleep(50);
+
+ doGet("/api/v1/" + deviceCredentials.getCredentialsId() + "/attributes?clientKeys=keyA,keyB,keyC");
+ Thread.sleep(50);
+ actualDevice = doGet("/api/device/" + this.device.getId(), Device.class);
+ Long getConnectTs = actualDevice.getLastConnectTs();
+ Long getUpdateTs = actualDevice.getLastUpdateTs();
+ assertTrue(getConnectTs > postConnectTs);
+ assertEquals(getUpdateTs, postUpdateTs);
+
+ }
+
protected ResultActions doGetAsync(String urlTemplate, Object... urlVariables) throws Exception {
MockHttpServletRequestBuilder getRequest;
getRequest = get(urlTemplate, urlVariables);
diff --git a/application/src/test/java/org/thingsboard/server/system/nosql/DeviceOfflineNoSqlTest.java b/application/src/test/java/org/thingsboard/server/system/nosql/DeviceOfflineNoSqlTest.java
new file mode 100644
index 0000000..f25ec27
--- /dev/null
+++ b/application/src/test/java/org/thingsboard/server/system/nosql/DeviceOfflineNoSqlTest.java
@@ -0,0 +1,23 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.system.nosql;
+
+import org.thingsboard.server.dao.service.DaoNoSqlTest;
+import org.thingsboard.server.system.BaseDeviceOfflineTest;
+
+@DaoNoSqlTest
+public class DeviceOfflineNoSqlTest extends BaseDeviceOfflineTest {
+}
diff --git a/application/src/test/java/org/thingsboard/server/system/sql/DeviceOfflineSqlTest.java b/application/src/test/java/org/thingsboard/server/system/sql/DeviceOfflineSqlTest.java
new file mode 100644
index 0000000..a9617bc
--- /dev/null
+++ b/application/src/test/java/org/thingsboard/server/system/sql/DeviceOfflineSqlTest.java
@@ -0,0 +1,23 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.system.sql;
+
+import org.thingsboard.server.dao.service.DaoSqlTest;
+import org.thingsboard.server.system.BaseDeviceOfflineTest;
+
+@DaoSqlTest
+public class DeviceOfflineSqlTest extends BaseDeviceOfflineTest {
+}
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/Device.java b/common/data/src/main/java/org/thingsboard/server/common/data/Device.java
index 95662c1..6d257fc 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/Device.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/Device.java
@@ -31,6 +31,8 @@ public class Device extends SearchTextBasedWithAdditionalInfo<DeviceId> implemen
private CustomerId customerId;
private String name;
private String type;
+ private Long lastConnectTs;
+ private Long lastUpdateTs;
public Device() {
super();
@@ -81,6 +83,22 @@ public class Device extends SearchTextBasedWithAdditionalInfo<DeviceId> implemen
this.type = type;
}
+ public Long getLastConnectTs() {
+ return lastConnectTs;
+ }
+
+ public void setLastConnectTs(Long lastConnectTs) {
+ this.lastConnectTs = lastConnectTs;
+ }
+
+ public Long getLastUpdateTs() {
+ return lastUpdateTs;
+ }
+
+ public void setLastUpdateTs(Long lastUpdateTs) {
+ this.lastUpdateTs = lastUpdateTs;
+ }
+
@Override
public String getSearchText() {
return getName();
@@ -101,6 +119,10 @@ public class Device extends SearchTextBasedWithAdditionalInfo<DeviceId> implemen
builder.append(getAdditionalInfo());
builder.append(", createdTime=");
builder.append(createdTime);
+ builder.append(", lastUpdateTs=");
+ builder.append(lastUpdateTs);
+ builder.append(", lastConnectTs=");
+ builder.append(lastConnectTs);
builder.append(", id=");
builder.append(id);
builder.append("]");
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/device/DeviceStatusQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/device/DeviceStatusQuery.java
new file mode 100644
index 0000000..0d0dad1
--- /dev/null
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/device/DeviceStatusQuery.java
@@ -0,0 +1,40 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.data.device;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.ToString;
+
+@Data
+@AllArgsConstructor
+@ToString
+public class DeviceStatusQuery {
+
+ private Status status;
+ private ContactType contactType;
+ private long threshold;
+
+
+ public enum Status {
+ ONLINE, OFFLINE
+ }
+
+ public enum ContactType {
+ CONNECT, UPLOAD
+ }
+
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/CassandraDeviceDao.java b/dao/src/main/java/org/thingsboard/server/dao/device/CassandraDeviceDao.java
index 641c464..0246da5 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/device/CassandraDeviceDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/device/CassandraDeviceDao.java
@@ -15,9 +15,9 @@
*/
package org.thingsboard.server.dao.device;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
-import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.querybuilder.Clause;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import com.datastax.driver.mapping.Result;
import com.google.common.base.Function;
@@ -28,9 +28,11 @@ import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EntitySubtype;
import org.thingsboard.server.common.data.EntityType;
+import org.thingsboard.server.common.data.device.DeviceStatusQuery;
import org.thingsboard.server.common.data.page.TextPageLink;
import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.model.EntitySubtypeEntity;
+import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.dao.model.nosql.DeviceEntity;
import org.thingsboard.server.dao.nosql.CassandraAbstractSearchTextDao;
import org.thingsboard.server.dao.util.NoSqlDao;
@@ -157,7 +159,7 @@ public class CassandraDeviceDao extends CassandraAbstractSearchTextDao<DeviceEnt
if (result != null) {
List<EntitySubtype> entitySubtypes = new ArrayList<>();
result.all().forEach((entitySubtypeEntity) ->
- entitySubtypes.add(entitySubtypeEntity.toEntitySubtype())
+ entitySubtypes.add(entitySubtypeEntity.toEntitySubtype())
);
return entitySubtypes;
} else {
@@ -167,4 +169,68 @@ public class CassandraDeviceDao extends CassandraAbstractSearchTextDao<DeviceEnt
});
}
+ @Override
+ public ListenableFuture<List<Device>> findDevicesByTenantIdAndStatus(UUID tenantId, DeviceStatusQuery statusQuery) {
+ log.debug("Try to find [{}] devices by tenantId [{}]", statusQuery.getStatus(), tenantId);
+
+ Select select = select().from(DEVICE_BY_TENANT_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME).allowFiltering();
+ Select.Where query = select.where();
+ query.and(eq(DEVICE_TENANT_ID_PROPERTY, tenantId));
+ Clause clause = statusClause(statusQuery);
+ query.and(clause);
+ return findListByStatementAsync(query);
+ }
+
+ @Override
+ public ListenableFuture<List<Device>> findDevicesByTenantIdTypeAndStatus(UUID tenantId, String type, DeviceStatusQuery statusQuery) {
+ log.debug("Try to find [{}] devices by tenantId [{}] and type [{}]", statusQuery.getStatus(), tenantId, type);
+
+ Select select = select().from(DEVICE_BY_TENANT_BY_TYPE_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME).allowFiltering();
+ Select.Where query = select.where()
+ .and(eq(DEVICE_TENANT_ID_PROPERTY, tenantId))
+ .and(eq(DEVICE_TYPE_PROPERTY, type));
+
+ query.and(statusClause(statusQuery));
+ return findListByStatementAsync(query);
+ }
+
+
+ @Override
+ public void saveDeviceStatus(Device device) {
+ PreparedStatement statement = prepare("insert into " +
+ "device (id, tenant_id, customer_id, type, last_connect, last_update) values (?, ?, ?, ?, ?, ?)");
+ BoundStatement boundStatement = statement.bind(device.getUuidId(), device.getTenantId().getId(), device.getCustomerId().getId(),
+ device.getType(), device.getLastConnectTs(), device.getLastUpdateTs());
+ ResultSetFuture resultSetFuture = executeAsyncWrite(boundStatement);
+ Futures.withFallback(resultSetFuture, t -> {
+ log.error("Can't update device status for [{}]", device, t);
+ throw new IllegalArgumentException("Can't update device status for {" + device + "}", t);
+ });
+ }
+
+ private String getStatusProperty(DeviceStatusQuery statusQuery) {
+ switch (statusQuery.getContactType()) {
+ case UPLOAD:
+ return DEVICE_LAST_UPDATE_PROPERTY;
+ case CONNECT:
+ return DEVICE_LAST_CONNECT_PROPERTY;
+ }
+ return null;
+ }
+
+ private Clause statusClause(DeviceStatusQuery statusQuery) {
+ long minTime = System.currentTimeMillis() - statusQuery.getThreshold();
+ String statusProperty = getStatusProperty(statusQuery);
+ if (statusProperty != null) {
+ switch (statusQuery.getStatus()) {
+ case ONLINE:
+ return gt(statusProperty, minTime);
+ case OFFLINE:
+ return lt(statusProperty, minTime);
+ }
+ }
+ log.error("Could not build status query from [{}]", statusQuery);
+ throw new IllegalStateException("Could not build status query for device []");
+ }
+
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceDao.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceDao.java
index dbc098e..2b9e522 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceDao.java
@@ -18,6 +18,7 @@ package org.thingsboard.server.dao.device;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EntitySubtype;
+import org.thingsboard.server.common.data.device.DeviceStatusQuery;
import org.thingsboard.server.common.data.page.TextPageLink;
import org.thingsboard.server.dao.Dao;
@@ -27,7 +28,6 @@ import java.util.UUID;
/**
* The Interface DeviceDao.
- *
*/
public interface DeviceDao extends Dao<Device> {
@@ -52,7 +52,7 @@ public interface DeviceDao extends Dao<Device> {
* Find devices by tenantId, type and page link.
*
* @param tenantId the tenantId
- * @param type the type
+ * @param type the type
* @param pageLink the page link
* @return the list of device objects
*/
@@ -61,7 +61,7 @@ public interface DeviceDao extends Dao<Device> {
/**
* Find devices by tenantId and devices Ids.
*
- * @param tenantId the tenantId
+ * @param tenantId the tenantId
* @param deviceIds the device Ids
* @return the list of device objects
*/
@@ -70,9 +70,9 @@ public interface DeviceDao extends Dao<Device> {
/**
* Find devices by tenantId, customerId and page link.
*
- * @param tenantId the tenantId
+ * @param tenantId the tenantId
* @param customerId the customerId
- * @param pageLink the page link
+ * @param pageLink the page link
* @return the list of device objects
*/
List<Device> findDevicesByTenantIdAndCustomerId(UUID tenantId, UUID customerId, TextPageLink pageLink);
@@ -80,10 +80,10 @@ public interface DeviceDao extends Dao<Device> {
/**
* Find devices by tenantId, customerId, type and page link.
*
- * @param tenantId the tenantId
+ * @param tenantId the tenantId
* @param customerId the customerId
- * @param type the type
- * @param pageLink the page link
+ * @param type the type
+ * @param pageLink the page link
* @return the list of device objects
*/
List<Device> findDevicesByTenantIdAndCustomerIdAndType(UUID tenantId, UUID customerId, String type, TextPageLink pageLink);
@@ -92,9 +92,9 @@ public interface DeviceDao extends Dao<Device> {
/**
* Find devices by tenantId, customerId and devices Ids.
*
- * @param tenantId the tenantId
+ * @param tenantId the tenantId
* @param customerId the customerId
- * @param deviceIds the device Ids
+ * @param deviceIds the device Ids
* @return the list of device objects
*/
ListenableFuture<List<Device>> findDevicesByTenantIdCustomerIdAndIdsAsync(UUID tenantId, UUID customerId, List<UUID> deviceIds);
@@ -103,7 +103,7 @@ public interface DeviceDao extends Dao<Device> {
* Find devices by tenantId and device name.
*
* @param tenantId the tenantId
- * @param name the device name
+ * @param name the device name
* @return the optional device object
*/
Optional<Device> findDeviceByTenantIdAndName(UUID tenantId, String name);
@@ -114,4 +114,31 @@ public interface DeviceDao extends Dao<Device> {
* @return the list of tenant device type objects
*/
ListenableFuture<List<EntitySubtype>> findTenantDeviceTypesAsync(UUID tenantId);
+
+ /**
+ * Find devices by tenantId, statusQuery and page link.
+ *
+ * @param tenantId the tenantId
+ * @param statusQuery the page link
+ * @return the list of device objects
+ */
+ ListenableFuture<List<Device>> findDevicesByTenantIdAndStatus(UUID tenantId, DeviceStatusQuery statusQuery);
+
+ /**
+ * Find devices by tenantId, type, statusQuery and page link.
+ *
+ * @param tenantId the tenantId
+ * @param type the type
+ * @param statusQuery the page link
+ * @return the list of device objects
+ */
+ ListenableFuture<List<Device>> findDevicesByTenantIdTypeAndStatus(UUID tenantId, String type, DeviceStatusQuery statusQuery);
+
+
+ /**
+ * Update device last contact and update timestamp async
+ *
+ * @param device the device object
+ */
+ void saveDeviceStatus(Device device);
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceOfflineService.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceOfflineService.java
new file mode 100644
index 0000000..3bf3662
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceOfflineService.java
@@ -0,0 +1,34 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.device;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.thingsboard.server.common.data.Device;
+import org.thingsboard.server.common.data.device.DeviceStatusQuery;
+
+import java.util.List;
+import java.util.UUID;
+
+public interface DeviceOfflineService {
+
+ void online(Device device, boolean isUpdate);
+
+ void offline(Device device);
+
+ ListenableFuture<List<Device>> findOfflineDevices(UUID tenantId, DeviceStatusQuery.ContactType contactType, long offlineThreshold);
+
+ ListenableFuture<List<Device>> findOnlineDevices(UUID tenantId, DeviceStatusQuery.ContactType contactType, long offlineThreshold);
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceOfflineServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceOfflineServiceImpl.java
new file mode 100644
index 0000000..f4d8e61
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceOfflineServiceImpl.java
@@ -0,0 +1,62 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.device;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.thingsboard.server.common.data.Device;
+import org.thingsboard.server.common.data.device.DeviceStatusQuery;
+
+import java.util.List;
+import java.util.UUID;
+
+import static org.thingsboard.server.common.data.device.DeviceStatusQuery.Status.OFFLINE;
+import static org.thingsboard.server.common.data.device.DeviceStatusQuery.Status.ONLINE;
+
+@Service
+public class DeviceOfflineServiceImpl implements DeviceOfflineService {
+
+ @Autowired
+ private DeviceDao deviceDao;
+
+ @Override
+ public void online(Device device, boolean isUpdate) {
+ long current = System.currentTimeMillis();
+ device.setLastConnectTs(current);
+ if(isUpdate) {
+ device.setLastUpdateTs(current);
+ }
+ deviceDao.saveDeviceStatus(device);
+ }
+
+ @Override
+ public void offline(Device device) {
+ online(device, false);
+ }
+
+ @Override
+ public ListenableFuture<List<Device>> findOfflineDevices(UUID tenantId, DeviceStatusQuery.ContactType contactType, long offlineThreshold) {
+ DeviceStatusQuery statusQuery = new DeviceStatusQuery(OFFLINE, contactType, offlineThreshold);
+ return deviceDao.findDevicesByTenantIdAndStatus(tenantId, statusQuery);
+ }
+
+ @Override
+ public ListenableFuture<List<Device>> findOnlineDevices(UUID tenantId, DeviceStatusQuery.ContactType contactType, long offlineThreshold) {
+ DeviceStatusQuery statusQuery = new DeviceStatusQuery(ONLINE, contactType, offlineThreshold);
+ return deviceDao.findDevicesByTenantIdAndStatus(tenantId, statusQuery);
+ }
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java
index 8c34cd3..52b15ef 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java
@@ -133,6 +133,8 @@ public class ModelConstants {
public static final String DEVICE_NAME_PROPERTY = "name";
public static final String DEVICE_TYPE_PROPERTY = "type";
public static final String DEVICE_ADDITIONAL_INFO_PROPERTY = ADDITIONAL_INFO_PROPERTY;
+ public static final String DEVICE_LAST_CONNECT_PROPERTY = "last_connect";
+ public static final String DEVICE_LAST_UPDATE_PROPERTY = "last_update";
public static final String DEVICE_BY_TENANT_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "device_by_tenant_and_search_text";
public static final String DEVICE_BY_TENANT_BY_TYPE_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "device_by_tenant_by_type_and_search_text";
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/nosql/DeviceEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/nosql/DeviceEntity.java
index ef0c5fe..7458e56 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/nosql/DeviceEntity.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/nosql/DeviceEntity.java
@@ -63,6 +63,12 @@ public final class DeviceEntity implements SearchTextEntity<Device> {
@Column(name = DEVICE_ADDITIONAL_INFO_PROPERTY, codec = JsonCodec.class)
private JsonNode additionalInfo;
+ @Column(name = DEVICE_LAST_CONNECT_PROPERTY)
+ private Long lastConnectTs;
+
+ @Column(name = DEVICE_LAST_UPDATE_PROPERTY)
+ private Long lastUpdateTs;
+
public DeviceEntity() {
super();
}
@@ -80,6 +86,8 @@ public final class DeviceEntity implements SearchTextEntity<Device> {
this.name = device.getName();
this.type = device.getType();
this.additionalInfo = device.getAdditionalInfo();
+ this.lastConnectTs = device.getLastConnectTs();
+ this.lastUpdateTs = device.getLastUpdateTs();
}
public UUID getId() {
@@ -129,7 +137,23 @@ public final class DeviceEntity implements SearchTextEntity<Device> {
public void setAdditionalInfo(JsonNode additionalInfo) {
this.additionalInfo = additionalInfo;
}
-
+
+ public Long getLastConnectTs() {
+ return lastConnectTs;
+ }
+
+ public void setLastConnectTs(Long lastConnectTs) {
+ this.lastConnectTs = lastConnectTs;
+ }
+
+ public Long getLastUpdateTs() {
+ return lastUpdateTs;
+ }
+
+ public void setLastUpdateTs(Long lastUpdateTs) {
+ this.lastUpdateTs = lastUpdateTs;
+ }
+
@Override
public String getSearchTextSource() {
return getName();
@@ -157,6 +181,8 @@ public final class DeviceEntity implements SearchTextEntity<Device> {
device.setName(name);
device.setType(type);
device.setAdditionalInfo(additionalInfo);
+ device.setLastConnectTs(lastConnectTs);
+ device.setLastUpdateTs(lastUpdateTs);
return device;
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/DeviceEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/DeviceEntity.java
index 7aaf0ae..e831c6e 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/DeviceEntity.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/DeviceEntity.java
@@ -34,6 +34,9 @@ import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Table;
+import static org.thingsboard.server.dao.model.ModelConstants.DEVICE_LAST_CONNECT_PROPERTY;
+import static org.thingsboard.server.dao.model.ModelConstants.DEVICE_LAST_UPDATE_PROPERTY;
+
@Data
@EqualsAndHashCode(callSuper = true)
@Entity
@@ -60,6 +63,12 @@ public final class DeviceEntity extends BaseSqlEntity<Device> implements SearchT
@Column(name = ModelConstants.DEVICE_ADDITIONAL_INFO_PROPERTY)
private JsonNode additionalInfo;
+ @Column(name = DEVICE_LAST_CONNECT_PROPERTY)
+ private Long lastConnectTs;
+
+ @Column(name = DEVICE_LAST_UPDATE_PROPERTY)
+ private Long lastUpdateTs;
+
public DeviceEntity() {
super();
}
@@ -77,6 +86,8 @@ public final class DeviceEntity extends BaseSqlEntity<Device> implements SearchT
this.name = device.getName();
this.type = device.getType();
this.additionalInfo = device.getAdditionalInfo();
+ this.lastConnectTs = device.getLastConnectTs();
+ this.lastUpdateTs = device.getLastUpdateTs();
}
@Override
@@ -102,6 +113,8 @@ public final class DeviceEntity extends BaseSqlEntity<Device> implements SearchT
device.setName(name);
device.setType(type);
device.setAdditionalInfo(additionalInfo);
+ device.setLastConnectTs(lastConnectTs);
+ device.setLastUpdateTs(lastUpdateTs);
return device;
}
}
\ No newline at end of file
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/device/DeviceRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/device/DeviceRepository.java
index 3bab1c5..a48805b 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/device/DeviceRepository.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/device/DeviceRepository.java
@@ -79,4 +79,28 @@ public interface DeviceRepository extends CrudRepository<DeviceEntity, String> {
List<DeviceEntity> findDevicesByTenantIdAndCustomerIdAndIdIn(String tenantId, String customerId, List<String> deviceIds);
List<DeviceEntity> findDevicesByTenantIdAndIdIn(String tenantId, List<String> deviceIds);
+
+ @Query("SELECT d FROM DeviceEntity d WHERE d.tenantId = :tenantId AND d.lastConnectTs > :time")
+ List<DeviceEntity> findConnectOnlineByTenantId(@Param("tenantId") String tenantId, @Param("time") long time);
+
+ @Query("SELECT d FROM DeviceEntity d WHERE d.tenantId = :tenantId AND d.lastConnectTs < :time")
+ List<DeviceEntity> findConnectOfflineByTenantId(@Param("tenantId") String tenantId, @Param("time") long time);
+
+ @Query("SELECT d FROM DeviceEntity d WHERE d.tenantId = :tenantId AND d.lastUpdateTs > :time")
+ List<DeviceEntity> findUpdateOnlineByTenantId(@Param("tenantId") String tenantId, @Param("time") long time);
+
+ @Query("SELECT d FROM DeviceEntity d WHERE d.tenantId = :tenantId AND d.lastUpdateTs < :time")
+ List<DeviceEntity> findUpdateOfflineByTenantId(@Param("tenantId") String tenantId, @Param("time") long time);
+
+ @Query("SELECT d FROM DeviceEntity d WHERE d.tenantId = :tenantId AND d.lastConnectTs > :time AND d.type = :type")
+ List<DeviceEntity> findConnectOnlineByTenantIdAndType(@Param("tenantId") String tenantId, @Param("time") long time, @Param("type") String type);
+
+ @Query("SELECT d FROM DeviceEntity d WHERE d.tenantId = :tenantId AND d.lastConnectTs < :time AND d.type = :type")
+ List<DeviceEntity> findConnectOfflineByTenantIdAndType(@Param("tenantId") String tenantId, @Param("time") long time, @Param("type") String type);
+
+ @Query("SELECT d FROM DeviceEntity d WHERE d.tenantId = :tenantId AND d.lastUpdateTs > :time AND d.type = :type")
+ List<DeviceEntity> findUpdateOnlineByTenantIdAndType(@Param("tenantId") String tenantId, @Param("time") long time, @Param("type") String type);
+
+ @Query("SELECT d FROM DeviceEntity d WHERE d.tenantId = :tenantId AND d.lastUpdateTs < :time AND d.type = :type")
+ List<DeviceEntity> findUpdateOfflineByTenantIdAndType(@Param("tenantId") String tenantId, @Param("time") long time, @Param("type") String type);
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/device/JpaDeviceDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/device/JpaDeviceDao.java
index 4f3cd7d..baba659 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/device/JpaDeviceDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/device/JpaDeviceDao.java
@@ -15,7 +15,9 @@
*/
package org.thingsboard.server.dao.sql.device;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.repository.CrudRepository;
@@ -24,6 +26,7 @@ import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EntitySubtype;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.UUIDConverter;
+import org.thingsboard.server.common.data.device.DeviceStatusQuery;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.TextPageLink;
import org.thingsboard.server.dao.DaoUtil;
@@ -43,6 +46,7 @@ import static org.thingsboard.server.dao.model.ModelConstants.NULL_UUID_STR;
*/
@Component
@SqlDao
+@Slf4j
public class JpaDeviceDao extends JpaAbstractSearchTextDao<DeviceEntity, Device> implements DeviceDao {
@Autowired
@@ -124,6 +128,73 @@ public class JpaDeviceDao extends JpaAbstractSearchTextDao<DeviceEntity, Device>
return service.submit(() -> convertTenantDeviceTypesToDto(tenantId, deviceRepository.findTenantDeviceTypes(fromTimeUUID(tenantId))));
}
+ @Override
+ public ListenableFuture<List<Device>> findDevicesByTenantIdAndStatus(UUID tenantId, DeviceStatusQuery statusQuery) {
+ String strTenantId = fromTimeUUID(tenantId);
+ long minTime = System.currentTimeMillis() - statusQuery.getThreshold();
+ switch (statusQuery.getStatus()) {
+ case OFFLINE: {
+ switch (statusQuery.getContactType()) {
+ case UPLOAD:
+ return service.submit(() -> DaoUtil.convertDataList(deviceRepository.findUpdateOfflineByTenantId(strTenantId, minTime)));
+ case CONNECT:
+ return service.submit(() -> DaoUtil.convertDataList(deviceRepository.findConnectOfflineByTenantId(strTenantId, minTime)));
+ }
+ break;
+ }
+ case ONLINE: {
+ switch (statusQuery.getContactType()) {
+ case UPLOAD:
+ return service.submit(() -> DaoUtil.convertDataList(deviceRepository.findUpdateOnlineByTenantId(strTenantId, minTime)));
+ case CONNECT:
+ return service.submit(() -> DaoUtil.convertDataList(deviceRepository.findConnectOnlineByTenantId(strTenantId, minTime)));
+ }
+ break;
+ }
+ }
+
+ log.error("Could not build status query from [{}]", statusQuery);
+ throw new IllegalStateException("Could not build status query for device []");
+ }
+
+ @Override
+ public ListenableFuture<List<Device>> findDevicesByTenantIdTypeAndStatus(UUID tenantId, String type, DeviceStatusQuery statusQuery) {
+ String strTenantId = fromTimeUUID(tenantId);
+ long minTime = System.currentTimeMillis() - statusQuery.getThreshold();
+ switch (statusQuery.getStatus()) {
+ case OFFLINE: {
+ switch (statusQuery.getContactType()) {
+ case UPLOAD:
+ return service.submit(() -> DaoUtil.convertDataList(deviceRepository.findUpdateOfflineByTenantIdAndType(strTenantId, minTime, type)));
+ case CONNECT:
+ return service.submit(() -> DaoUtil.convertDataList(deviceRepository.findConnectOfflineByTenantIdAndType(strTenantId, minTime, type)));
+ }
+ break;
+ }
+ case ONLINE: {
+ switch (statusQuery.getContactType()) {
+ case UPLOAD:
+ return service.submit(() -> DaoUtil.convertDataList(deviceRepository.findUpdateOnlineByTenantIdAndType(strTenantId, minTime, type)));
+ case CONNECT:
+ return service.submit(() -> DaoUtil.convertDataList(deviceRepository.findConnectOnlineByTenantIdAndType(strTenantId, minTime, type)));
+ }
+ break;
+ }
+ }
+
+ log.error("Could not build status query from [{}]", statusQuery);
+ throw new IllegalStateException("Could not build status query for device []");
+ }
+
+ @Override
+ public void saveDeviceStatus(Device device) {
+ ListenableFuture<Device> future = service.submit(() -> save(device));
+ Futures.withFallback(future, t -> {
+ log.error("Can't update device status for [{}]", device, t);
+ throw new IllegalArgumentException("Can't update device status for {" + device + "}", t);
+ });
+ }
+
private List<EntitySubtype> convertTenantDeviceTypesToDto(UUID tenantId, List<String> types) {
List<EntitySubtype> list = Collections.emptyList();
if (types != null && !types.isEmpty()) {
diff --git a/dao/src/main/resources/cassandra/schema.cql b/dao/src/main/resources/cassandra/schema.cql
index d0e62b2..fb0619b 100644
--- a/dao/src/main/resources/cassandra/schema.cql
+++ b/dao/src/main/resources/cassandra/schema.cql
@@ -159,6 +159,8 @@ CREATE TABLE IF NOT EXISTS thingsboard.device (
type text,
search_text text,
additional_info text,
+ last_connect bigint,
+ last_update bigint,
PRIMARY KEY (id, tenant_id, customer_id, type)
);
diff --git a/dao/src/main/resources/sql/schema.sql b/dao/src/main/resources/sql/schema.sql
index d7a0978..08fe7fa 100644
--- a/dao/src/main/resources/sql/schema.sql
+++ b/dao/src/main/resources/sql/schema.sql
@@ -118,7 +118,9 @@ CREATE TABLE IF NOT EXISTS device (
type varchar(255),
name varchar(255),
search_text varchar(255),
- tenant_id varchar(31)
+ tenant_id varchar(31),
+ last_connect bigint,
+ last_update bigint
);
CREATE TABLE IF NOT EXISTS device_credentials (
diff --git a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java
index 7ba5e36..af07737 100644
--- a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java
+++ b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java
@@ -35,6 +35,7 @@ import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.common.transport.quota.QuotaService;
+import org.thingsboard.server.dao.device.DeviceOfflineService;
import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor;
import org.thingsboard.server.transport.coap.session.CoapExchangeObserverProxy;
import org.thingsboard.server.transport.coap.session.CoapSessionCtx;
@@ -53,15 +54,17 @@ public class CoapTransportResource extends CoapResource {
private final SessionMsgProcessor processor;
private final DeviceAuthService authService;
private final QuotaService quotaService;
+ private final DeviceOfflineService offlineService;
private final Field observerField;
private final long timeout;
public CoapTransportResource(SessionMsgProcessor processor, DeviceAuthService authService, CoapTransportAdaptor adaptor, String name,
- long timeout, QuotaService quotaService) {
+ long timeout, QuotaService quotaService, DeviceOfflineService offlineService) {
super(name);
this.processor = processor;
this.authService = authService;
this.quotaService = quotaService;
+ this.offlineService = offlineService;
this.adaptor = adaptor;
this.timeout = timeout;
// This is important to turn off existing observable logic in
@@ -168,6 +171,7 @@ public class CoapTransportResource extends CoapResource {
case TO_SERVER_RPC_REQUEST:
ctx.setSessionType(SessionType.SYNC);
msg = adaptor.convertToActorMsg(ctx, type, request);
+ offlineService.online(ctx.getDevice(), true);
break;
case SUBSCRIBE_ATTRIBUTES_REQUEST:
case SUBSCRIBE_RPC_COMMANDS_REQUEST:
@@ -175,11 +179,13 @@ public class CoapTransportResource extends CoapResource {
advanced.setObserver(new CoapExchangeObserverProxy(systemObserver, ctx));
ctx.setSessionType(SessionType.ASYNC);
msg = adaptor.convertToActorMsg(ctx, type, request);
+ offlineService.online(ctx.getDevice(), false);
break;
case UNSUBSCRIBE_ATTRIBUTES_REQUEST:
case UNSUBSCRIBE_RPC_COMMANDS_REQUEST:
ctx.setSessionType(SessionType.ASYNC);
msg = adaptor.convertToActorMsg(ctx, type, request);
+ offlineService.online(ctx.getDevice(), false);
break;
default:
log.trace("[{}] Unsupported msg type: {}", ctx.getSessionId(), type);
diff --git a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportService.java b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportService.java
index 15706d4..4037ee7 100644
--- a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportService.java
+++ b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportService.java
@@ -27,6 +27,7 @@ import org.springframework.stereotype.Service;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.common.transport.quota.QuotaService;
+import org.thingsboard.server.dao.device.DeviceOfflineService;
import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor;
import javax.annotation.PostConstruct;
@@ -57,6 +58,9 @@ public class CoapTransportService {
@Autowired(required = false)
private QuotaService quotaService;
+ @Autowired(required = false)
+ private DeviceOfflineService offlineService;
+
@Value("${coap.bind_address}")
private String host;
@@ -86,7 +90,7 @@ public class CoapTransportService {
private void createResources() {
CoapResource api = new CoapResource(API);
- api.add(new CoapTransportResource(processor, authService, adaptor, V1, timeout, quotaService));
+ api.add(new CoapTransportResource(processor, authService, adaptor, V1, timeout, quotaService, offlineService));
server.add(api);
}
diff --git a/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java b/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java
index 072c735..fd0346a 100644
--- a/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java
+++ b/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java
@@ -15,6 +15,7 @@
*/
package org.thingsboard.server.transport.coap;
+import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.CoapClient;
import org.eclipse.californium.core.CoapResponse;
@@ -31,6 +32,7 @@ import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.annotation.DirtiesContext.ClassMode;
import org.springframework.test.context.junit4.SpringRunner;
import org.thingsboard.server.common.data.Device;
+import org.thingsboard.server.common.data.device.DeviceStatusQuery;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
@@ -51,6 +53,7 @@ import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.auth.DeviceAuthResult;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.common.transport.quota.QuotaService;
+import org.thingsboard.server.dao.device.DeviceOfflineService;
import java.util.ArrayList;
import java.util.List;
@@ -137,6 +140,31 @@ public class CoapServerTest {
public static QuotaService quotaService() {
return key -> false;
}
+
+ @Bean
+ public static DeviceOfflineService offlineService() {
+ return new DeviceOfflineService() {
+ @Override
+ public void online(Device device, boolean isUpdate) {
+
+ }
+
+ @Override
+ public void offline(Device device) {
+
+ }
+
+ @Override
+ public ListenableFuture<List<Device>> findOfflineDevices(UUID tenantId, DeviceStatusQuery.ContactType contactType, long offlineThreshold) {
+ return null;
+ }
+
+ @Override
+ public ListenableFuture<List<Device>> findOnlineDevices(UUID tenantId, DeviceStatusQuery.ContactType contactType, long offlineThreshold) {
+ return null;
+ }
+ };
+ }
}
@Autowired
diff --git a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java b/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
index 4d90b5f..03a4201 100644
--- a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
+++ b/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
@@ -26,6 +26,7 @@ import org.springframework.http.ResponseEntity;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult;
+import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.security.DeviceTokenCredentials;
import org.thingsboard.server.common.msg.core.*;
import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg;
@@ -36,6 +37,7 @@ import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.common.transport.quota.QuotaService;
+import org.thingsboard.server.dao.device.DeviceOfflineService;
import org.thingsboard.server.transport.http.session.HttpSessionCtx;
import javax.servlet.http.HttpServletRequest;
@@ -63,6 +65,9 @@ public class DeviceApiController {
@Autowired(required = false)
private QuotaService quotaService;
+ @Autowired(required = false)
+ private DeviceOfflineService offlineService;
+
@RequestMapping(value = "/{deviceToken}/attributes", method = RequestMethod.GET, produces = "application/json")
public DeferredResult<ResponseEntity> getDeviceAttributes(@PathVariable("deviceToken") String deviceToken,
@RequestParam(value = "clientKeys", required = false, defaultValue = "") String clientKeys,
@@ -82,7 +87,7 @@ public class DeviceApiController {
Set<String> sharedKeySet = !StringUtils.isEmpty(sharedKeys) ? new HashSet<>(Arrays.asList(sharedKeys.split(","))) : null;
request = new BasicGetAttributesRequest(0, clientKeySet, sharedKeySet);
}
- process(ctx, request);
+ process(ctx, request, false);
} else {
responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
}
@@ -100,7 +105,7 @@ public class DeviceApiController {
HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
try {
- process(ctx, JsonConverter.convertToAttributes(new JsonParser().parse(json)));
+ process(ctx, JsonConverter.convertToAttributes(new JsonParser().parse(json)), true);
} catch (IllegalStateException | JsonSyntaxException ex) {
responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
}
@@ -120,7 +125,7 @@ public class DeviceApiController {
HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
try {
- process(ctx, JsonConverter.convertToTelemetry(new JsonParser().parse(json)));
+ process(ctx, JsonConverter.convertToTelemetry(new JsonParser().parse(json)), true);
} catch (IllegalStateException | JsonSyntaxException ex) {
responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
}
@@ -150,7 +155,7 @@ public class DeviceApiController {
if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
try {
JsonObject response = new JsonParser().parse(json).getAsJsonObject();
- process(ctx, new ToDeviceRpcResponseMsg(requestId, response.toString()));
+ process(ctx, new ToDeviceRpcResponseMsg(requestId, response.toString()), true);
} catch (IllegalStateException | JsonSyntaxException ex) {
responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
}
@@ -173,7 +178,7 @@ public class DeviceApiController {
JsonObject request = new JsonParser().parse(json).getAsJsonObject();
process(ctx, new ToServerRpcRequestMsg(0,
request.get("method").getAsString(),
- request.get("params").toString()));
+ request.get("params").toString()), true);
} catch (IllegalStateException | JsonSyntaxException ex) {
responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
}
@@ -199,7 +204,7 @@ public class DeviceApiController {
HttpSessionCtx ctx = getHttpSessionCtx(responseWriter, timeout);
if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
try {
- process(ctx, msg);
+ process(ctx, msg, false);
} catch (IllegalStateException | JsonSyntaxException ex) {
responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
}
@@ -217,9 +222,10 @@ public class DeviceApiController {
return new HttpSessionCtx(processor, authService, responseWriter, timeout != 0 ? timeout : defaultTimeout);
}
- private void process(HttpSessionCtx ctx, FromDeviceMsg request) {
+ private void process(HttpSessionCtx ctx, FromDeviceMsg request, boolean isUpdate) {
AdaptorToSessionActorMsg msg = new BasicAdaptorToSessionActorMsg(ctx, request);
processor.process(new BasicToDeviceActorSessionMsg(ctx.getDevice(), msg));
+ offlineService.online(ctx.getDevice(), isUpdate);
}
private boolean quotaExceeded(HttpServletRequest request, DeferredResult<ResponseEntity> responseWriter) {
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index 8766599..8d475a4 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -37,6 +37,7 @@ import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.common.transport.quota.QuotaService;
import org.thingsboard.server.dao.EncryptionUtil;
+import org.thingsboard.server.dao.device.DeviceOfflineService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
@@ -72,13 +73,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private final DeviceAuthService authService;
private final RelationService relationService;
private final QuotaService quotaService;
+ private final DeviceOfflineService offlineService;
private final SslHandler sslHandler;
private volatile boolean connected;
private volatile InetSocketAddress address;
private volatile GatewaySessionCtx gatewaySessionCtx;
public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService,
- MqttTransportAdaptor adaptor, SslHandler sslHandler, QuotaService quotaService) {
+ MqttTransportAdaptor adaptor, SslHandler sslHandler, QuotaService quotaService, DeviceOfflineService offlineService) {
this.processor = processor;
this.deviceService = deviceService;
this.relationService = relationService;
@@ -88,6 +90,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
this.sessionId = deviceSessionCtx.getSessionId().toUidStr();
this.sslHandler = sslHandler;
this.quotaService = quotaService;
+ this.offlineService = offlineService;
}
@Override
@@ -129,11 +132,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
case PINGREQ:
if (checkConnected(ctx)) {
ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
+ offlineService.online(deviceSessionCtx.getDevice(), false);
}
break;
case DISCONNECT:
if (checkConnected(ctx)) {
processDisconnect(ctx);
+ offlineService.offline(deviceSessionCtx.getDevice());
}
break;
default:
@@ -185,23 +190,28 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
try {
if (topicName.equals(DEVICE_TELEMETRY_TOPIC)) {
msg = adaptor.convertToActorMsg(deviceSessionCtx, POST_TELEMETRY_REQUEST, mqttMsg);
+ offlineService.online(deviceSessionCtx.getDevice(), true);
} else if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
msg = adaptor.convertToActorMsg(deviceSessionCtx, POST_ATTRIBUTES_REQUEST, mqttMsg);
+ offlineService.online(deviceSessionCtx.getDevice(), true);
} else if (topicName.startsWith(DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
msg = adaptor.convertToActorMsg(deviceSessionCtx, GET_ATTRIBUTES_REQUEST, mqttMsg);
if (msgId >= 0) {
ctx.writeAndFlush(createMqttPubAckMsg(msgId));
}
+ offlineService.online(deviceSessionCtx.getDevice(), false);
} else if (topicName.startsWith(DEVICE_RPC_RESPONSE_TOPIC)) {
msg = adaptor.convertToActorMsg(deviceSessionCtx, TO_DEVICE_RPC_RESPONSE, mqttMsg);
if (msgId >= 0) {
ctx.writeAndFlush(createMqttPubAckMsg(msgId));
}
+ offlineService.online(deviceSessionCtx.getDevice(), true);
} else if (topicName.startsWith(DEVICE_RPC_REQUESTS_TOPIC)) {
msg = adaptor.convertToActorMsg(deviceSessionCtx, TO_SERVER_RPC_REQUEST, mqttMsg);
if (msgId >= 0) {
ctx.writeAndFlush(createMqttPubAckMsg(msgId));
}
+ offlineService.online(deviceSessionCtx.getDevice(), true);
}
} catch (AdaptorException e) {
log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
@@ -250,6 +260,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
}
ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList));
+ offlineService.online(deviceSessionCtx.getDevice(), false);
}
private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) {
@@ -273,6 +284,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
}
ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId()));
+ offlineService.online(deviceSessionCtx.getDevice(), false);
}
private MqttMessage createUnSubAckMessage(int msgId) {
@@ -304,6 +316,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
connected = true;
checkGatewaySession();
+ offlineService.online(deviceSessionCtx.getDevice(), false);
}
}
@@ -315,6 +328,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
connected = true;
checkGatewaySession();
+ offlineService.online(deviceSessionCtx.getDevice(), false);
} else {
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
ctx.close();
@@ -365,6 +379,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("[{}] Unexpected Exception", sessionId, cause);
ctx.close();
+ if(deviceSessionCtx.getDevice() != null) {
+ offlineService.offline(deviceSessionCtx.getDevice());
+ }
}
private static MqttSubAckMessage createSubAckMessage(Integer msgId, List<Integer> grantedQoSList) {
@@ -403,7 +420,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
if (infoNode != null) {
JsonNode gatewayNode = infoNode.get("gateway");
if (gatewayNode != null && gatewayNode.asBoolean()) {
- gatewaySessionCtx = new GatewaySessionCtx(processor, deviceService, authService, relationService, deviceSessionCtx);
+ gatewaySessionCtx = new GatewaySessionCtx(processor, deviceService, authService,
+ relationService, deviceSessionCtx, offlineService);
}
}
}
@@ -411,5 +429,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
processor.process(SessionCloseMsg.onError(deviceSessionCtx.getSessionId()));
+ if(deviceSessionCtx.getDevice() != null) {
+ offlineService.offline(deviceSessionCtx.getDevice());
+ }
}
}
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java
index 976d8ba..94cf940 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java
@@ -24,6 +24,7 @@ import io.netty.handler.ssl.SslHandler;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.common.transport.quota.QuotaService;
+import org.thingsboard.server.dao.device.DeviceOfflineService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
@@ -42,10 +43,11 @@ public class MqttTransportServerInitializer extends ChannelInitializer<SocketCha
private final MqttTransportAdaptor adaptor;
private final MqttSslHandlerProvider sslHandlerProvider;
private final QuotaService quotaService;
+ private final DeviceOfflineService offlineService;
public MqttTransportServerInitializer(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService,
MqttTransportAdaptor adaptor, MqttSslHandlerProvider sslHandlerProvider,
- QuotaService quotaService) {
+ QuotaService quotaService, DeviceOfflineService offlineService) {
this.processor = processor;
this.deviceService = deviceService;
this.authService = authService;
@@ -53,6 +55,7 @@ public class MqttTransportServerInitializer extends ChannelInitializer<SocketCha
this.adaptor = adaptor;
this.sslHandlerProvider = sslHandlerProvider;
this.quotaService = quotaService;
+ this.offlineService = offlineService;
}
@Override
@@ -67,7 +70,7 @@ public class MqttTransportServerInitializer extends ChannelInitializer<SocketCha
pipeline.addLast("encoder", MqttEncoder.INSTANCE);
MqttTransportHandler handler = new MqttTransportHandler(processor, deviceService, authService, relationService,
- adaptor, sslHandler, quotaService);
+ adaptor, sslHandler, quotaService, offlineService);
pipeline.addLast(handler);
ch.closeFuture().addListener(handler);
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
index 1ae7d38..90b4591 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
@@ -30,6 +30,7 @@ import org.springframework.stereotype.Service;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.common.transport.quota.QuotaService;
+import org.thingsboard.server.dao.device.DeviceOfflineService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
@@ -69,6 +70,9 @@ public class MqttTransportService {
@Autowired(required = false)
private QuotaService quotaService;
+ @Autowired(required = false)
+ private DeviceOfflineService offlineService;
+
@Value("${mqtt.bind_address}")
private String host;
@Value("${mqtt.bind_port}")
@@ -106,7 +110,7 @@ public class MqttTransportService {
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new MqttTransportServerInitializer(processor, deviceService, authService, relationService,
- adaptor, sslHandlerProvider, quotaService));
+ adaptor, sslHandlerProvider, quotaService, offlineService));
serverChannel = b.bind(host, port).sync().channel();
log.info("Mqtt transport started!");
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
index 7eda5bd..b4dd8db 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
@@ -36,6 +36,7 @@ import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
+import org.thingsboard.server.dao.device.DeviceOfflineService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
@@ -61,14 +62,17 @@ public class GatewaySessionCtx {
private final DeviceService deviceService;
private final DeviceAuthService authService;
private final RelationService relationService;
+ private final DeviceOfflineService offlineService;
private final Map<String, GatewayDeviceSessionCtx> devices;
private ChannelHandlerContext channel;
- public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, DeviceSessionCtx gatewaySessionCtx) {
+ public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService,
+ RelationService relationService, DeviceSessionCtx gatewaySessionCtx, DeviceOfflineService offlineService) {
this.processor = processor;
this.deviceService = deviceService;
this.authService = authService;
this.relationService = relationService;
+ this.offlineService = offlineService;
this.gateway = gatewaySessionCtx.getDevice();
this.gatewaySessionId = gatewaySessionCtx.getSessionId();
this.devices = new HashMap<>();
@@ -98,6 +102,7 @@ public class GatewaySessionCtx {
log.debug("[{}] Added device [{}] to the gateway session", gatewaySessionId, deviceName);
processor.process(new BasicToDeviceActorSessionMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new AttributesSubscribeMsg())));
processor.process(new BasicToDeviceActorSessionMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new RpcSubscribeMsg())));
+ offlineService.online(device, false);
}
}
@@ -107,6 +112,7 @@ public class GatewaySessionCtx {
if (deviceSessionCtx != null) {
processor.process(SessionCloseMsg.onDisconnect(deviceSessionCtx.getSessionId()));
deviceSessionCtx.setClosed(true);
+ offlineService.offline(deviceSessionCtx.getDevice());
log.debug("[{}] Removed device [{}] from the gateway session", gatewaySessionId, deviceName);
} else {
log.debug("[{}] Device [{}] was already removed from the gateway session", gatewaySessionId, deviceName);
@@ -117,6 +123,7 @@ public class GatewaySessionCtx {
public void onGatewayDisconnect() {
devices.forEach((k, v) -> {
processor.process(SessionCloseMsg.onDisconnect(v.getSessionId()));
+ offlineService.offline(v.getDevice());
});
}
@@ -138,6 +145,7 @@ public class GatewaySessionCtx {
GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
+ offlineService.online(deviceSessionCtx.getDevice(), true);
}
} else {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
@@ -154,6 +162,7 @@ public class GatewaySessionCtx {
GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new ToDeviceRpcResponseMsg(requestId, data))));
+ offlineService.online(deviceSessionCtx.getDevice(), true);
} else {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
}
@@ -176,6 +185,7 @@ public class GatewaySessionCtx {
GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
+ offlineService.online(deviceSessionCtx.getDevice(), true);
}
} else {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
@@ -210,6 +220,7 @@ public class GatewaySessionCtx {
processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
ack(msg);
+ offlineService.online(deviceSessionCtx.getDevice(), false);
} else {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
}