thingsboard-aplcache

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index 4840f2a..94581b9 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -21,6 +21,9 @@ import akka.actor.Scheduler;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import lombok.Getter;
@@ -66,6 +69,7 @@ import org.thingsboard.server.service.script.JsSandboxService;
 import org.thingsboard.server.service.state.DeviceStateService;
 import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
@@ -314,22 +318,22 @@ public class ActorSystemContext {
     }
 
     public void persistDebugInput(TenantId tenantId, EntityId entityId, TbMsg tbMsg, String relationType) {
-        persistDebug(tenantId, entityId, "IN", tbMsg, relationType, null);
+        persistDebugAsync(tenantId, entityId, "IN", tbMsg, relationType, null);
     }
 
     public void persistDebugInput(TenantId tenantId, EntityId entityId, TbMsg tbMsg, String relationType, Throwable error) {
-        persistDebug(tenantId, entityId, "IN", tbMsg, relationType, error);
+        persistDebugAsync(tenantId, entityId, "IN", tbMsg, relationType, error);
     }
 
     public void persistDebugOutput(TenantId tenantId, EntityId entityId, TbMsg tbMsg, String relationType, Throwable error) {
-        persistDebug(tenantId, entityId, "OUT", tbMsg, relationType, error);
+        persistDebugAsync(tenantId, entityId, "OUT", tbMsg, relationType, error);
     }
 
     public void persistDebugOutput(TenantId tenantId, EntityId entityId, TbMsg tbMsg, String relationType) {
-        persistDebug(tenantId, entityId, "OUT", tbMsg, relationType, null);
+        persistDebugAsync(tenantId, entityId, "OUT", tbMsg, relationType, null);
     }
 
-    private void persistDebug(TenantId tenantId, EntityId entityId, String type, TbMsg tbMsg, String relationType, Throwable error) {
+    private void persistDebugAsync(TenantId tenantId, EntityId entityId, String type, TbMsg tbMsg, String relationType, Throwable error) {
         try {
             Event event = new Event();
             event.setTenantId(tenantId);
@@ -355,7 +359,18 @@ public class ActorSystemContext {
             }
 
             event.setBody(node);
-            eventService.save(event);
+            ListenableFuture<Event> future = eventService.saveAsync(event);
+            Futures.addCallback(future, new FutureCallback<Event>() {
+                @Override
+                public void onSuccess(@Nullable Event event) {
+
+                }
+
+                @Override
+                public void onFailure(Throwable th) {
+                    log.error("Could not save debug Event for Node", th);
+                }
+            });
         } catch (IOException ex) {
             log.warn("Failed to persist rule node debug message", ex);
         }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/event/BaseEventService.java b/dao/src/main/java/org/thingsboard/server/dao/event/BaseEventService.java
index 55da480..7dddec1 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/event/BaseEventService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/event/BaseEventService.java
@@ -15,6 +15,7 @@
  */
 package org.thingsboard.server.dao.event;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -44,6 +45,12 @@ public class BaseEventService implements EventService {
     }
 
     @Override
+    public ListenableFuture<Event> saveAsync(Event event) {
+        eventValidator.validate(event);
+        return eventDao.saveAsync(event);
+    }
+
+    @Override
     public Optional<Event> saveIfNotExists(Event event) {
         eventValidator.validate(event);
         if (StringUtils.isEmpty(event.getUid())) {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/event/CassandraBaseEventDao.java b/dao/src/main/java/org/thingsboard/server/dao/event/CassandraBaseEventDao.java
index 23655bb..7549e40 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/event/CassandraBaseEventDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/event/CassandraBaseEventDao.java
@@ -15,11 +15,13 @@
  */
 package org.thingsboard.server.dao.event;
 
-import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
 import com.datastax.driver.core.querybuilder.Insert;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
 import com.datastax.driver.core.querybuilder.Select;
 import com.datastax.driver.core.utils.UUIDs;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.stereotype.Component;
@@ -38,13 +40,11 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
 
 import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
-import static org.thingsboard.server.dao.model.ModelConstants.EVENT_BY_ID_VIEW_NAME;
-import static org.thingsboard.server.dao.model.ModelConstants.EVENT_BY_TYPE_AND_ID_VIEW_NAME;
-import static org.thingsboard.server.dao.model.ModelConstants.EVENT_COLUMN_FAMILY_NAME;
-import static org.thingsboard.server.dao.model.ModelConstants.NULL_UUID;
+import static org.thingsboard.server.dao.model.ModelConstants.*;
 
 @Component
 @Slf4j
@@ -65,6 +65,15 @@ public class CassandraBaseEventDao extends CassandraAbstractSearchTimeDao<EventE
 
     @Override
     public Event save(Event event) {
+        try {
+            return saveAsync(event).get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new IllegalStateException("Could not save EventEntity", e);
+        }
+    }
+
+    @Override
+    public ListenableFuture<Event> saveAsync(Event event) {
         log.debug("Save event [{}] ", event);
         if (event.getTenantId() == null) {
             log.trace("Save system event with predefined id {}", systemTenantId);
@@ -76,7 +85,8 @@ public class CassandraBaseEventDao extends CassandraAbstractSearchTimeDao<EventE
         if (StringUtils.isEmpty(event.getUid())) {
             event.setUid(event.getId().toString());
         }
-        return save(new EventEntity(event), false).orElse(null);
+        ListenableFuture<Optional<Event>> optionalSave = saveAsync(new EventEntity(event), false);
+        return Futures.transform(optionalSave, opt -> opt.orElse(null));
     }
 
     @Override
@@ -153,6 +163,14 @@ public class CassandraBaseEventDao extends CassandraAbstractSearchTimeDao<EventE
     }
 
     private Optional<Event> save(EventEntity entity, boolean ifNotExists) {
+        try {
+            return saveAsync(entity, ifNotExists).get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new IllegalStateException("Could not save EventEntity", e);
+        }
+    }
+
+    private ListenableFuture<Optional<Event>> saveAsync(EventEntity entity, boolean ifNotExists) {
         if (entity.getId() == null) {
             entity.setId(UUIDs.timeBased());
         }
@@ -167,11 +185,13 @@ public class CassandraBaseEventDao extends CassandraAbstractSearchTimeDao<EventE
         if (ifNotExists) {
             insert = insert.ifNotExists();
         }
-        ResultSet rs = executeWrite(insert);
-        if (rs.wasApplied()) {
-            return Optional.of(DaoUtil.getData(entity));
-        } else {
-            return Optional.empty();
-        }
+        ResultSetFuture resultSetFuture = executeAsyncWrite(insert);
+        return Futures.transform(resultSetFuture, rs -> {
+            if (rs.wasApplied()) {
+                return Optional.of(DaoUtil.getData(entity));
+            } else {
+                return Optional.empty();
+            }
+        });
     }
 }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/event/EventDao.java b/dao/src/main/java/org/thingsboard/server/dao/event/EventDao.java
index eb0fdbc..9469c61 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/event/EventDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/event/EventDao.java
@@ -15,6 +15,7 @@
  */
 package org.thingsboard.server.dao.event;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import org.thingsboard.server.common.data.Event;
 import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.page.TimePageLink;
@@ -38,6 +39,14 @@ public interface EventDao extends Dao<Event> {
     Event save(Event event);
 
     /**
+     * Save or update event object async
+     *
+     * @param event the event object
+     * @return saved event object future
+     */
+    ListenableFuture<Event> saveAsync(Event event);
+
+    /**
      * Save event object if it is not yet saved
      *
      * @param event the event object
diff --git a/dao/src/main/java/org/thingsboard/server/dao/event/EventService.java b/dao/src/main/java/org/thingsboard/server/dao/event/EventService.java
index 64f823d..0698c6b 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/event/EventService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/event/EventService.java
@@ -15,6 +15,7 @@
  */
 package org.thingsboard.server.dao.event;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import org.thingsboard.server.common.data.Event;
 import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.id.TenantId;
@@ -28,6 +29,8 @@ public interface EventService {
 
     Event save(Event event);
 
+    ListenableFuture<Event> saveAsync(Event event);
+
     Optional<Event> saveIfNotExists(Event event);
 
     Optional<Event> findEvent(TenantId tenantId, EntityId entityId, String eventType, String eventUid);
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java
index 01183a2..5a63ed8 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java
@@ -16,6 +16,7 @@
 package org.thingsboard.server.dao.sql.event;
 
 import com.datastax.driver.core.utils.UUIDs;
+import com.google.common.util.concurrent.ListenableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -82,6 +83,18 @@ public class JpaBaseEventDao extends JpaAbstractSearchTimeDao<EventEntity, Event
     }
 
     @Override
+    public ListenableFuture<Event> saveAsync(Event event) {
+        log.debug("Save event [{}] ", event);
+        if (event.getId() == null) {
+            event.setId(new EventId(UUIDs.timeBased()));
+        }
+        if (StringUtils.isEmpty(event.getUid())) {
+            event.setUid(event.getId().toString());
+        }
+        return service.submit(() -> save(new EventEntity(event), false).orElse(null));
+    }
+
+    @Override
     public Optional<Event> saveIfNotExists(Event event) {
         return save(new EventEntity(event), true);
     }
@@ -89,7 +102,7 @@ public class JpaBaseEventDao extends JpaAbstractSearchTimeDao<EventEntity, Event
     @Override
     public Event findEvent(UUID tenantId, EntityId entityId, String eventType, String eventUid) {
         return DaoUtil.getData(eventRepository.findByTenantIdAndEntityTypeAndEntityIdAndEventTypeAndEventUid(
-                UUIDConverter.fromTimeUUID(tenantId), entityId.getEntityType(),  UUIDConverter.fromTimeUUID(entityId.getId()), eventType, eventUid));
+                UUIDConverter.fromTimeUUID(tenantId), entityId.getEntityType(), UUIDConverter.fromTimeUUID(entityId.getId()), eventType, eventUid));
     }
 
     @Override