thingsboard-developers

TTL for events in Cassandra DAO

11/20/2018 9:36:08 AM

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
index 46a76e6..816523d 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
@@ -83,7 +83,7 @@ public abstract class ComponentMsgProcessor<T extends EntityId> extends Abstract
 
     protected void checkActive() {
         if (state != ComponentLifecycleState.ACTIVE) {
-            log.warn("Rule chain is not active. Current state [{}] for processor [{}] tenant [{}]", state, tenantId, entityId);
+            log.debug("Component is not active. Current state [{}] for processor [{}][{}] tenant [{}]", state, entityId.getEntityType(), entityId, tenantId);
             throw new IllegalStateException("Rule chain is not active! " + entityId + " - " + tenantId);
         }
     }
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 9f60fb6..c19f694 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -163,6 +163,7 @@ cassandra:
     # Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS,INDEFINITE
     ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}"
     ts_key_value_ttl: "${TS_KV_TTL:0}"
+    events_ttl: "${TS_EVENTS_TTL:0}"
     buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}"
     concurrent_limit: "${CASSANDRA_QUERY_CONCURRENT_LIMIT:1000}"
     permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}"
diff --git a/dao/src/main/java/org/thingsboard/server/dao/event/CassandraBaseEventDao.java b/dao/src/main/java/org/thingsboard/server/dao/event/CassandraBaseEventDao.java
index 60af005..656fbd4 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/event/CassandraBaseEventDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/event/CassandraBaseEventDao.java
@@ -24,6 +24,7 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 import org.thingsboard.server.common.data.Event;
 import org.thingsboard.server.common.data.id.EntityId;
@@ -43,7 +44,9 @@ import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 
 import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.in;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.ttl;
 import static org.thingsboard.server.dao.model.ModelConstants.*;
 
 @Component
@@ -63,6 +66,9 @@ public class CassandraBaseEventDao extends CassandraAbstractSearchTimeDao<EventE
         return EVENT_COLUMN_FAMILY_NAME;
     }
 
+    @Value("${cassandra.query.events_ttl:0}")
+    private int eventsTtl;
+
     @Override
     public Event save(TenantId tenantId, Event event) {
         try {
@@ -85,7 +91,7 @@ public class CassandraBaseEventDao extends CassandraAbstractSearchTimeDao<EventE
         if (StringUtils.isEmpty(event.getUid())) {
             event.setUid(event.getId().toString());
         }
-        ListenableFuture<Optional<Event>> optionalSave = saveAsync(event.getTenantId(), new EventEntity(event), false);
+        ListenableFuture<Optional<Event>> optionalSave = saveAsync(event.getTenantId(), new EventEntity(event), false, eventsTtl);
         return Futures.transform(optionalSave, opt -> opt.orElse(null));
     }
 
@@ -98,7 +104,7 @@ public class CassandraBaseEventDao extends CassandraAbstractSearchTimeDao<EventE
         if (event.getId() == null) {
             event.setId(new EventId(UUIDs.timeBased()));
         }
-        return save(event.getTenantId(), new EventEntity(event), true);
+        return save(event.getTenantId(), new EventEntity(event), true, eventsTtl);
     }
 
     @Override
@@ -162,15 +168,15 @@ public class CassandraBaseEventDao extends CassandraAbstractSearchTimeDao<EventE
         return DaoUtil.convertDataList(entities);
     }
 
-    private Optional<Event> save(TenantId tenantId, EventEntity entity, boolean ifNotExists) {
+    private Optional<Event> save(TenantId tenantId, EventEntity entity, boolean ifNotExists, int ttl) {
         try {
-            return saveAsync(tenantId, entity, ifNotExists).get();
+            return saveAsync(tenantId, entity, ifNotExists, ttl).get();
         } catch (InterruptedException | ExecutionException e) {
             throw new IllegalStateException("Could not save EventEntity", e);
         }
     }
 
-    private ListenableFuture<Optional<Event>> saveAsync(TenantId tenantId, EventEntity entity, boolean ifNotExists) {
+    private ListenableFuture<Optional<Event>> saveAsync(TenantId tenantId, EventEntity entity, boolean ifNotExists, int ttl) {
         if (entity.getId() == null) {
             entity.setId(UUIDs.timeBased());
         }
@@ -185,6 +191,9 @@ public class CassandraBaseEventDao extends CassandraAbstractSearchTimeDao<EventE
         if (ifNotExists) {
             insert = insert.ifNotExists();
         }
+        if(ttl > 0){
+            insert.using(ttl(ttl));
+        }
         ResultSetFuture resultSetFuture = executeAsyncWrite(tenantId, insert);
         return Futures.transform(resultSetFuture, rs -> {
             if (rs.wasApplied()) {