thingsboard-memoizeit

Alarm Service implementation

5/24/2017 6:48:32 AM

Details

diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java b/common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java
index 21093ce..1f77904 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java
@@ -16,6 +16,7 @@
 package org.thingsboard.server.common.data.id;
 
 import org.thingsboard.server.common.data.EntityType;
+import org.thingsboard.server.common.data.alarm.AlarmId;
 
 import java.util.UUID;
 
@@ -50,6 +51,8 @@ public class EntityIdFactory {
                 return new DeviceId(uuid);
             case ASSET:
                 return new AssetId(uuid);
+            case ALARM:
+                return new AlarmId(uuid);
         }
         throw new IllegalArgumentException("EntityType " + type + " is not supported!");
     }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/AbstractModelDao.java b/dao/src/main/java/org/thingsboard/server/dao/AbstractModelDao.java
index 72ee9a2..587d2b6 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/AbstractModelDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/AbstractModelDao.java
@@ -127,7 +127,7 @@ public abstract class AbstractModelDao<T extends BaseEntity<?>> extends Abstract
         log.debug("Save entity {}", entity);
         if (entity.getId() == null) {
             entity.setId(UUIDs.timeBased());
-        } else {
+        } else if (isDeleteOnSave()) {
             removeById(entity.getId());
         }
         Statement saveStatement = getSaveQuery(entity);
@@ -136,6 +136,10 @@ public abstract class AbstractModelDao<T extends BaseEntity<?>> extends Abstract
         return new EntityResultSet<>(resultSet, entity);
     }
 
+    protected boolean isDeleteOnSave() {
+        return true;
+    }
+
     public T save(T entity) {
         return saveWithResult(entity).getEntity();
     }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/alarm/AlarmDaoImpl.java b/dao/src/main/java/org/thingsboard/server/dao/alarm/AlarmDaoImpl.java
index 05678cc..994fe80 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/alarm/AlarmDaoImpl.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/alarm/AlarmDaoImpl.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2017 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -23,6 +23,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
+import org.thingsboard.server.common.data.EntityType;
 import org.thingsboard.server.common.data.alarm.Alarm;
 import org.thingsboard.server.common.data.alarm.AlarmQuery;
 import org.thingsboard.server.common.data.id.EntityId;
@@ -59,6 +60,10 @@ public class AlarmDaoImpl extends AbstractModelDao<AlarmEntity> implements Alarm
         return ALARM_COLUMN_FAMILY_NAME;
     }
 
+    protected boolean isDeleteOnSave() {
+        return false;
+    }
+
     @Override
     public AlarmEntity save(Alarm alarm) {
         log.debug("Save asset [{}] ", alarm);
@@ -92,7 +97,7 @@ public class AlarmDaoImpl extends AbstractModelDao<AlarmEntity> implements Alarm
         log.trace("Try to find alarms by entity [{}], status [{}] and pageLink [{}]", query.getAffectedEntityId(), query.getStatus(), query.getPageLink());
         EntityId affectedEntity = query.getAffectedEntityId();
         String relationType = query.getStatus() == null ? BaseAlarmService.ALARM_RELATION : BaseAlarmService.ALARM_RELATION_PREFIX + query.getStatus().name();
-        ListenableFuture<List<EntityRelation>> relations = relationDao.findRelations(affectedEntity, relationType, query.getPageLink());
+        ListenableFuture<List<EntityRelation>> relations = relationDao.findRelations(affectedEntity, relationType, EntityType.ALARM, query.getPageLink());
         return Futures.transform(relations, (AsyncFunction<List<EntityRelation>, List<Alarm>>) input -> {
             List<ListenableFuture<Alarm>> alarmFutures = new ArrayList<>(input.size());
             for (EntityRelation relation : input) {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java
index d274ea9..2675fa2 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2017 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -41,8 +41,12 @@ import org.thingsboard.server.dao.service.DataValidator;
 import org.thingsboard.server.dao.tenant.TenantDao;
 
 import javax.annotation.Nullable;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
 import static org.thingsboard.server.dao.DaoUtil.*;
@@ -64,6 +68,21 @@ public class BaseAlarmService extends BaseEntityService implements AlarmService 
     @Autowired
     private RelationService relationService;
 
+    protected ExecutorService readResultsProcessingExecutor;
+
+    @PostConstruct
+    public void startExecutor() {
+        readResultsProcessingExecutor = Executors.newCachedThreadPool();
+    }
+
+    @PreDestroy
+    public void stopExecutor() {
+        if (readResultsProcessingExecutor != null) {
+            readResultsProcessingExecutor.shutdownNow();
+        }
+    }
+
+
     @Override
     public Alarm createOrUpdateAlarm(Alarm alarm) {
         alarmDataValidator.validate(alarm);
@@ -85,6 +104,8 @@ public class BaseAlarmService extends BaseEntityService implements AlarmService 
                     createRelation(new EntityRelation(parentId, saved.getId(), ALARM_RELATION));
                     createRelation(new EntityRelation(parentId, saved.getId(), ALARM_RELATION_PREFIX + saved.getStatus().name()));
                 }
+                createRelation(new EntityRelation(alarm.getOriginator(), saved.getId(), ALARM_RELATION));
+                createRelation(new EntityRelation(alarm.getOriginator(), saved.getId(), ALARM_RELATION_PREFIX + saved.getStatus().name()));
                 return saved;
             } else {
                 log.debug("Alarm before merge: {}", alarm);
@@ -218,6 +239,8 @@ public class BaseAlarmService extends BaseEntityService implements AlarmService 
                 deleteRelation(new EntityRelation(parentId, alarm.getId(), ALARM_RELATION_PREFIX + oldStatus.name()));
                 createRelation(new EntityRelation(parentId, alarm.getId(), ALARM_RELATION_PREFIX + newStatus.name()));
             }
+            deleteRelation(new EntityRelation(alarm.getOriginator(), alarm.getId(), ALARM_RELATION_PREFIX + oldStatus.name()));
+            createRelation(new EntityRelation(alarm.getOriginator(), alarm.getId(), ALARM_RELATION_PREFIX + newStatus.name()));
         } catch (ExecutionException | InterruptedException e) {
             log.warn("[{}] Failed to update relations. Old status: [{}], New status: [{}]", alarm.getId(), oldStatus, newStatus);
             throw new RuntimeException(e);
@@ -227,7 +250,7 @@ public class BaseAlarmService extends BaseEntityService implements AlarmService 
     private ListenableFuture<Boolean> getAndUpdate(AlarmId alarmId, Function<Alarm, Boolean> function) {
         validateId(alarmId, "Alarm id should be specified!");
         ListenableFuture<Alarm> entity = alarmDao.findAlarmByIdAsync(alarmId.getId());
-        return Futures.transform(entity, function);
+        return Futures.transform(entity, function, readResultsProcessingExecutor);
     }
 
     private DataValidator<Alarm> alarmDataValidator =
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java
index 0c68c39..2648b49 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java
@@ -170,6 +170,7 @@ public class ModelConstants {
     public static final String RELATION_TO_TYPE_PROPERTY = "to_type";
     public static final String RELATION_TYPE_PROPERTY = "relation_type";
 
+    public static final String RELATION_BY_TYPE_AND_CHILD_TYPE_VIEW_NAME = "relation_by_type_and_child_type";
     public static final String RELATION_REVERSE_VIEW_NAME = "reverse_relation";
 
 
diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java
index b179a1e..1b05866 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java
@@ -24,6 +24,7 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
+import org.thingsboard.server.common.data.EntityType;
 import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.id.EntityIdFactory;
 import org.thingsboard.server.common.data.page.TimePageLink;
@@ -154,12 +155,13 @@ public class BaseRelationDao extends AbstractAsyncDao implements RelationDao {
     }
 
     @Override
-    public ListenableFuture<List<EntityRelation>> findRelations(EntityId from, String relationType, TimePageLink pageLink) {
-        Select.Where query = AbstractSearchTimeDao.buildQuery(RELATION_COLUMN_FAMILY_NAME,
+    public ListenableFuture<List<EntityRelation>> findRelations(EntityId from, String relationType, EntityType childType, TimePageLink pageLink) {
+        Select.Where query = AbstractSearchTimeDao.buildQuery(ModelConstants.RELATION_BY_TYPE_AND_CHILD_TYPE_VIEW_NAME,
                 Arrays.asList(eq(ModelConstants.RELATION_FROM_ID_PROPERTY, from.getId()),
                         eq(ModelConstants.RELATION_FROM_TYPE_PROPERTY, from.getEntityType().name()),
-                        eq(ModelConstants.RELATION_TYPE_PROPERTY, relationType)),
-                QueryBuilder.asc(ModelConstants.RELATION_TYPE_PROPERTY),
+                        eq(ModelConstants.RELATION_TYPE_PROPERTY, relationType),
+                        eq(ModelConstants.RELATION_TO_TYPE_PROPERTY, childType.name())),
+                Arrays.asList(QueryBuilder.asc(ModelConstants.RELATION_TYPE_PROPERTY), QueryBuilder.asc(ModelConstants.RELATION_TO_TYPE_PROPERTY)),
                 pageLink, ModelConstants.RELATION_TO_ID_PROPERTY);
         return getFuture(executeAsyncRead(query), rs -> getEntityRelations(rs));
     }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/RelationDao.java b/dao/src/main/java/org/thingsboard/server/dao/relation/RelationDao.java
index 1a4838a..bec0320 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/relation/RelationDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/relation/RelationDao.java
@@ -16,6 +16,7 @@
 package org.thingsboard.server.dao.relation;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import org.thingsboard.server.common.data.EntityType;
 import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.page.TimePageLink;
 import org.thingsboard.server.common.data.relation.EntityRelation;
@@ -45,6 +46,6 @@ public interface RelationDao {
 
     ListenableFuture<Boolean> deleteOutboundRelations(EntityId entity);
 
-    ListenableFuture<List<EntityRelation>> findRelations(EntityId from, String relationType, TimePageLink pageLink);
+    ListenableFuture<List<EntityRelation>> findRelations(EntityId from, String relationType, EntityType toType, TimePageLink pageLink);
 
 }
diff --git a/dao/src/main/resources/schema.cql b/dao/src/main/resources/schema.cql
index 071697e..44afed9 100644
--- a/dao/src/main/resources/schema.cql
+++ b/dao/src/main/resources/schema.cql
@@ -271,6 +271,13 @@ CREATE TABLE IF NOT EXISTS thingsboard.relation (
 	PRIMARY KEY ((from_id, from_type), relation_type, to_id, to_type)
 ) WITH CLUSTERING ORDER BY ( relation_type ASC, to_id ASC, to_type ASC);
 
+CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.relation_by_type_and_child_type AS
+    SELECT *
+    from thingsboard.relation
+    WHERE from_id IS NOT NULL AND from_type IS NOT NULL AND relation_type IS NOT NULL AND to_id IS NOT NULL AND to_type IS NOT NULL
+    PRIMARY KEY ((from_id, from_type), relation_type, to_type, to_id)
+    WITH CLUSTERING ORDER BY ( relation_type ASC, from_type ASC, from_id ASC);
+
 CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.reverse_relation AS
     SELECT *
     from thingsboard.relation
diff --git a/dao/src/test/java/org/thingsboard/server/dao/DaoTestSuite.java b/dao/src/test/java/org/thingsboard/server/dao/DaoTestSuite.java
index 19def01..b150259 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/DaoTestSuite.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/DaoTestSuite.java
@@ -25,7 +25,6 @@ import java.util.Arrays;
 
 @RunWith(ClasspathSuite.class)
 @ClassnameFilters({
-//        "org.thingsboard.server.dao.service.AlarmServiceTest"
         "org.thingsboard.server.dao.service.*Test",
         "org.thingsboard.server.dao.kv.*Test",
         "org.thingsboard.server.dao.plugin.*Test",
diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/AlarmServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/AlarmServiceTest.java
index ca4cdef..4264cbd 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/service/AlarmServiceTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/AlarmServiceTest.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2017 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -97,14 +97,76 @@ public class AlarmServiceTest extends AbstractServiceTest {
 
         Alarm fetched = alarmService.findAlarmById(created.getId()).get();
         Assert.assertEquals(created, fetched);
+    }
+
+    @Test
+    public void testFindAlarm() throws ExecutionException, InterruptedException {
+        AssetId parentId = new AssetId(UUIDs.timeBased());
+        AssetId childId = new AssetId(UUIDs.timeBased());
+
+        EntityRelation relation = new EntityRelation(parentId, childId, EntityRelation.CONTAINS_TYPE);
+
+        Assert.assertTrue(relationService.saveRelation(relation).get());
+
+        long ts = System.currentTimeMillis();
+        Alarm alarm = Alarm.builder().tenantId(tenantId).originator(childId)
+                .type(TEST_ALARM)
+                .severity(AlarmSeverity.CRITICAL).status(AlarmStatus.ACTIVE_UNACK)
+                .startTs(ts).build();
+
+        Alarm created = alarmService.createOrUpdateAlarm(alarm);
 
-//        TimePageData<Alarm> alarms = alarmService.findAlarms(AlarmQuery.builder().tenantId(tenantId)
-//                .affectedEntityId(parentId)
-//                .status(AlarmStatus.ACTIVE_UNACK).pageLink(
-//                        new TimePageLink(1, 0L, Long.MAX_VALUE, true)
-//                ).build()).get();
-//        Assert.assertNotNull(alarms.getData());
-//        Assert.assertEquals(1, alarms.getData().size());
-//        Assert.assertEquals(created, alarms.getData().get(0));
+        // Check child relation
+        TimePageData<Alarm> alarms = alarmService.findAlarms(AlarmQuery.builder().tenantId(tenantId)
+                .affectedEntityId(childId)
+                .status(AlarmStatus.ACTIVE_UNACK).pageLink(
+                        new TimePageLink(1, 0L, System.currentTimeMillis(), true)
+                ).build()).get();
+        Assert.assertNotNull(alarms.getData());
+        Assert.assertEquals(1, alarms.getData().size());
+        Assert.assertEquals(created, alarms.getData().get(0));
+
+        // Check parent relation
+        alarms = alarmService.findAlarms(AlarmQuery.builder().tenantId(tenantId)
+                .affectedEntityId(parentId)
+                .status(AlarmStatus.ACTIVE_UNACK).pageLink(
+                        new TimePageLink(1, 0L, System.currentTimeMillis(), true)
+                ).build()).get();
+        Assert.assertNotNull(alarms.getData());
+        Assert.assertEquals(1, alarms.getData().size());
+        Assert.assertEquals(created, alarms.getData().get(0));
+
+        alarmService.ackAlarm(created.getId(), System.currentTimeMillis()).get();
+        created = alarmService.findAlarmById(created.getId()).get();
+
+        alarms = alarmService.findAlarms(AlarmQuery.builder().tenantId(tenantId)
+                .affectedEntityId(childId)
+                .status(AlarmStatus.ACTIVE_ACK).pageLink(
+                        new TimePageLink(1, 0L, System.currentTimeMillis(), true)
+                ).build()).get();
+        Assert.assertNotNull(alarms.getData());
+        Assert.assertEquals(1, alarms.getData().size());
+        Assert.assertEquals(created, alarms.getData().get(0));
+
+        // Check not existing relation
+        alarms = alarmService.findAlarms(AlarmQuery.builder().tenantId(tenantId)
+                .affectedEntityId(childId)
+                .status(AlarmStatus.ACTIVE_UNACK).pageLink(
+                        new TimePageLink(1, 0L, System.currentTimeMillis(), true)
+                ).build()).get();
+        Assert.assertNotNull(alarms.getData());
+        Assert.assertEquals(0, alarms.getData().size());
+
+        alarmService.clearAlarm(created.getId(), System.currentTimeMillis()).get();
+        created = alarmService.findAlarmById(created.getId()).get();
+
+        alarms = alarmService.findAlarms(AlarmQuery.builder().tenantId(tenantId)
+                .affectedEntityId(childId)
+                .status(AlarmStatus.CLEARED_ACK).pageLink(
+                        new TimePageLink(1, 0L, System.currentTimeMillis(), true)
+                ).build()).get();
+        Assert.assertNotNull(alarms.getData());
+        Assert.assertEquals(1, alarms.getData().size());
+        Assert.assertEquals(created, alarms.getData().get(0));
     }
 }