thingsboard-developers
Changes
application/pom.xml 54(+0 -54)
application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java 188(+111 -77)
common/queue/pom.xml 57(+0 -57)
dao/pom.xml 54(+0 -54)
docker-compose.yml 10(+2 -8)
pom.xml 5(+5 -0)
rule-engine/rule-engine-components/pom.xml 55(+0 -55)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractCustomerActionNode.java 9(+8 -1)
run.sh 2(+0 -2)
Details
application/pom.xml 54(+0 -54)
diff --git a/application/pom.xml b/application/pom.xml
index 6ca55bd..8b37284 100644
--- a/application/pom.xml
+++ b/application/pom.xml
@@ -264,11 +264,6 @@
<groupId>org.javadelight</groupId>
<artifactId>delight-nashorn-sandbox</artifactId>
</dependency>
- <dependency>
- <groupId>br.ufrgs.inf.prosoft.applicationtracer</groupId>
- <artifactId>ApplicationTracer</artifactId>
- <version>1.0</version>
- </dependency>
</dependencies>
<build>
@@ -611,55 +606,6 @@
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
</plugin>
-
-
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>aspectj-maven-plugin</artifactId>
- <version>1.11</version>
- <configuration>
- <showWeaveInfo>false</showWeaveInfo>
- <complianceLevel>1.8</complianceLevel>
- <source>1.8</source>
- <target>1.8</target>
- <Xlint>ignore</Xlint>
- <encoding>UTF-8</encoding>
- <verbose>false</verbose>
- <forceAjcCompile>true</forceAjcCompile>
- <sources/>
- <weaveDirectories>
- <weaveDirectory>${project.build.directory}/classes</weaveDirectory>
- </weaveDirectories>
- <aspectLibraries>
- <aspectLibrary>
- <groupId>br.ufrgs.inf.prosoft.applicationtracer</groupId>
- <artifactId>ApplicationTracer</artifactId>
- </aspectLibrary>
- </aspectLibraries>
- </configuration>
- <executions>
- <execution>
- <phase>process-classes</phase>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
- </executions>
- <dependencies>
- <dependency>
- <groupId>org.aspectj</groupId>
- <artifactId>aspectjrt</artifactId>
- <version>1.9.1</version>
- </dependency>
- <dependency>
- <groupId>org.aspectj</groupId>
- <artifactId>aspectjtools</artifactId>
- <version>1.9.1</version>
- </dependency>
- </dependencies>
- </plugin>
-
-
</plugins>
</build>
<repositories>
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index f86420e..3bccf57 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -87,6 +87,9 @@ import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Collectors;
+import br.ufrgs.inf.prosoft.cache.MultiCache;
+import br.ufrgs.inf.prosoft.cache.KeyNotFoundException;
+
/**
* @author Andrew Shvayka
*/
@@ -95,7 +98,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
final TenantId tenantId;
final DeviceId deviceId;
- private final Map<UUID, SessionInfoMetaData> sessions;
+ // private final Map<UUID, SessionInfoMetaData> sessions;
+
+private MultiCache<UUID, SessionInfoMetaData> sessions = new MultiCache<>("DeviceActorMessageProcessor.sessions");
+
private final Map<UUID, SessionInfo> attributeSubscriptions;
private final Map<UUID, SessionInfo> rpcSubscriptions;
private final Map<Integer, ToDeviceRpcRequestMetadata> toDeviceRpcPendingMap;
@@ -113,13 +119,13 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
super(systemContext);
this.tenantId = tenantId;
this.deviceId = deviceId;
- this.sessions = new LinkedHashMap<>();
+ // this.sessions = new LinkedHashMap<>();
this.attributeSubscriptions = new HashMap<>();
this.rpcSubscriptions = new HashMap<>();
this.toDeviceRpcPendingMap = new HashMap<>();
this.toServerRpcPendingMap = new HashMap<>();
initAttributes();
-// restoreSessions();
+ restoreSessions();
}
private void initAttributes() {
@@ -420,14 +426,17 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
log.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId);
attributeSubscriptions.remove(sessionId);
} else {
- SessionInfoMetaData sessionMD = sessions.get(sessionId);
+ SessionInfoMetaData sessionMD = null;
+ try {
+ sessionMD = sessions.get(sessionId);
+ } catch (KeyNotFoundException ex) {}
if (sessionMD == null) {
sessionMD = new SessionInfoMetaData(new SessionInfo(TransportProtos.SessionType.SYNC, sessionInfo.getNodeId()));
}
sessionMD.setSubscribedToAttributes(true);
log.debug("[{}] Registering attributes subscription for session [{}]", deviceId, sessionId);
attributeSubscriptions.put(sessionId, sessionMD.getSessionInfo());
- //dumpSessions();
+ dumpSessions();
}
}
@@ -441,7 +450,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
log.debug("[{}] Canceling rpc subscription for session [{}]", deviceId, sessionId);
rpcSubscriptions.remove(sessionId);
} else {
- SessionInfoMetaData sessionMD = sessions.get(sessionId);
+ SessionInfoMetaData sessionMD = null;
+ try {
+ sessionMD = sessions.get(sessionId);
+ } catch (KeyNotFoundException ex) {}
if (sessionMD == null) {
sessionMD = new SessionInfoMetaData(new SessionInfo(TransportProtos.SessionType.SYNC, sessionInfo.getNodeId()));
}
@@ -449,7 +461,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
log.debug("[{}] Registering rpc subscription for session [{}]", deviceId, sessionId);
rpcSubscriptions.put(sessionId, sessionMD.getSessionInfo());
sendPendingRequests(context, sessionId, sessionInfo);
-// dumpSessions();
+ dumpSessions();
}
}
@@ -461,33 +473,38 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
return;
}
log.debug("[{}] Processing new session [{}]", deviceId, sessionId);
- if (sessions.size() >= systemContext.getMaxConcurrentSessionsPerDevice()) {
- UUID sessionIdToRemove = sessions.keySet().stream().findFirst().orElse(null);
+ if (sessions.values().size() >= systemContext.getMaxConcurrentSessionsPerDevice()) {
+ UUID sessionIdToRemove = sessions.entrySet().stream().findFirst().orElse(null).getKey();
if (sessionIdToRemove != null) {
- notifyTransportAboutClosedSession(sessionIdToRemove, sessions.remove(sessionIdToRemove));
+ SessionInfoMetaData s = null;
+ try {
+ s = sessions.get(sessionIdToRemove);
+ sessions.invalidate(sessionIdToRemove);
+ } catch (KeyNotFoundException ex) {}
+ notifyTransportAboutClosedSession(sessionIdToRemove, s);
}
}
sessions.put(sessionId, new SessionInfoMetaData(new SessionInfo(TransportProtos.SessionType.ASYNC, sessionInfo.getNodeId())));
- if (sessions.size() == 1) {
+ if (sessions.values().size() == 1) {
reportSessionOpen();
}
-// dumpSessions();
+ dumpSessions();
} else if (msg.getEvent() == SessionEvent.CLOSED) {
log.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId);
- sessions.remove(sessionId);
+ sessions.invalidate(sessionId);
attributeSubscriptions.remove(sessionId);
rpcSubscriptions.remove(sessionId);
if (sessions.isEmpty()) {
reportSessionClose();
}
-// dumpSessions();
+ dumpSessions();
}
}
private void handleSessionActivity(ActorContext context, SessionInfoProto sessionInfoProto, TransportProtos.SubscriptionInfoProto subscriptionInfo) {
UUID sessionId = getSessionId(sessionInfoProto);
SessionInfoMetaData sessionMD = sessions.computeIfAbsent(sessionId,
- id -> new SessionInfoMetaData(new SessionInfo(TransportProtos.SessionType.ASYNC, sessionInfoProto.getNodeId()), 0L));
+ () -> new SessionInfoMetaData(new SessionInfo(TransportProtos.SessionType.ASYNC, sessionInfoProto.getNodeId()), 0L));
sessionMD.setLastActivityTime(subscriptionInfo.getLastActivityTime());
sessionMD.setSubscribedToAttributes(subscriptionInfo.getAttributeSubscription());
@@ -498,14 +515,14 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
if (subscriptionInfo.getRpcSubscription()) {
rpcSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
}
-// dumpSessions();
+ dumpSessions();
}
void processCredentialsUpdate() {
sessions.forEach(this::notifyTransportAboutClosedSession);
attributeSubscriptions.clear();
rpcSubscriptions.clear();
-// dumpSessions();
+ dumpSessions();
}
private void notifyTransportAboutClosedSession(UUID sessionId, SessionInfoMetaData sessionMd) {
@@ -628,64 +645,81 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
return builder.build();
}
-// private void restoreSessions() {
-// log.debug("[{}] Restoring sessions from cache", deviceId);
-// TransportProtos.DeviceSessionsCacheEntry sessionsDump = null;
-// try {
-// sessionsDump = TransportProtos.DeviceSessionsCacheEntry.parseFrom(systemContext.getDeviceSessionCacheService().get(deviceId));
-// } catch (InvalidProtocolBufferException e) {
-// log.warn("[{}] Failed to decode device sessions from cache", deviceId);
-// return;
-// }
-// if (sessionsDump.getSessionsCount() == 0) {
-// log.debug("[{}] No session information found", deviceId);
-// return;
-// }
-// for (TransportProtos.SessionSubscriptionInfoProto sessionSubscriptionInfoProto : sessionsDump.getSessionsList()) {
-// SessionInfoProto sessionInfoProto = sessionSubscriptionInfoProto.getSessionInfo();
-// UUID sessionId = getSessionId(sessionInfoProto);
-// SessionInfo sessionInfo = new SessionInfo(TransportProtos.SessionType.ASYNC, sessionInfoProto.getNodeId());
-// TransportProtos.SubscriptionInfoProto subInfo = sessionSubscriptionInfoProto.getSubscriptionInfo();
-// SessionInfoMetaData sessionMD = new SessionInfoMetaData(sessionInfo, subInfo.getLastActivityTime());
-// sessions.put(sessionId, sessionMD);
-// if (subInfo.getAttributeSubscription()) {
-// attributeSubscriptions.put(sessionId, sessionInfo);
-// sessionMD.setSubscribedToAttributes(true);
-// }
-// if (subInfo.getRpcSubscription()) {
-// rpcSubscriptions.put(sessionId, sessionInfo);
-// sessionMD.setSubscribedToRPC(true);
-// }
-// log.debug("[{}] Restored session: {}", deviceId, sessionMD);
-// }
-// log.debug("[{}] Restored sessions: {}, rpc subscriptions: {}, attribute subscriptions: {}", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size());
-// }
-
-// private void dumpSessions() {
-// log.debug("[{}] Dumping sessions: {}, rpc subscriptions: {}, attribute subscriptions: {} to cache", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size());
-// List<TransportProtos.SessionSubscriptionInfoProto> sessionsList = new ArrayList<>(sessions.size());
-// sessions.forEach((uuid, sessionMD) -> {
-// if (sessionMD.getSessionInfo().getType() == TransportProtos.SessionType.SYNC) {
-// return;
-// }
-// SessionInfo sessionInfo = sessionMD.getSessionInfo();
-// TransportProtos.SubscriptionInfoProto subscriptionInfoProto = TransportProtos.SubscriptionInfoProto.newBuilder()
-// .setLastActivityTime(sessionMD.getLastActivityTime())
-// .setAttributeSubscription(sessionMD.isSubscribedToAttributes())
-// .setRpcSubscription(sessionMD.isSubscribedToRPC()).build();
-// TransportProtos.SessionInfoProto sessionInfoProto = TransportProtos.SessionInfoProto.newBuilder()
-// .setSessionIdMSB(uuid.getMostSignificantBits())
-// .setSessionIdLSB(uuid.getLeastSignificantBits())
-// .setNodeId(sessionInfo.getNodeId()).build();
-// sessionsList.add(TransportProtos.SessionSubscriptionInfoProto.newBuilder()
-// .setSessionInfo(sessionInfoProto)
-// .setSubscriptionInfo(subscriptionInfoProto).build());
-// log.debug("[{}] Dumping session: {}", deviceId, sessionMD);
-// });
-// systemContext.getDeviceSessionCacheService()
-// .put(deviceId, TransportProtos.DeviceSessionsCacheEntry.newBuilder()
-// .addAllSessions(sessionsList).build().toByteArray());
-// }
+public static MultiCache<DeviceId, List<TransportProtos.SessionSubscriptionInfoProto>> deviceSessionCache = new MultiCache<>("ActorSystemContext.deviceSession");
+
+ private void restoreSessions() {
+ log.debug("[{}] Restoring sessions from cache", deviceId);
+
+ List<TransportProtos.SessionSubscriptionInfoProto> deviceList = null;
+
+ try {
+ deviceList = deviceSessionCache.get(deviceId);
+ } catch (KeyNotFoundException ex) {
+ return;
+ }
+ if (deviceList == null || deviceList.isEmpty()) {
+ return;
+ }
+
+ // TransportProtos.DeviceSessionsCacheEntry sessionsDump = null;
+ // try {
+ // sessionsDump = TransportProtos.DeviceSessionsCacheEntry.parseFrom(systemContext.getDeviceSessionCacheService().get(deviceId));
+ // } catch (InvalidProtocolBufferException e) {
+ // log.warn("[{}] Failed to decode device sessions from cache", deviceId);
+ // return;
+ // }
+ // if (sessionsDump.getSessionsCount() == 0) {
+ // log.debug("[{}] No session information found", deviceId);
+ // return;
+ // }
+ for (TransportProtos.SessionSubscriptionInfoProto sessionSubscriptionInfoProto : deviceList) {
+ SessionInfoProto sessionInfoProto = sessionSubscriptionInfoProto.getSessionInfo();
+ UUID sessionId = getSessionId(sessionInfoProto);
+ SessionInfo sessionInfo = new SessionInfo(TransportProtos.SessionType.ASYNC, sessionInfoProto.getNodeId());
+ TransportProtos.SubscriptionInfoProto subInfo = sessionSubscriptionInfoProto.getSubscriptionInfo();
+ SessionInfoMetaData sessionMD = new SessionInfoMetaData(sessionInfo, subInfo.getLastActivityTime());
+ sessions.put(sessionId, sessionMD);
+ if (subInfo.getAttributeSubscription()) {
+ attributeSubscriptions.put(sessionId, sessionInfo);
+ sessionMD.setSubscribedToAttributes(true);
+ }
+ if (subInfo.getRpcSubscription()) {
+ rpcSubscriptions.put(sessionId, sessionInfo);
+ sessionMD.setSubscribedToRPC(true);
+ }
+ log.debug("[{}] Restored session: {}", deviceId, sessionMD);
+ }
+ log.debug("[{}] Restored sessions: {}, rpc subscriptions: {}, attribute subscriptions: {}", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size());
+ }
+
+ private void dumpSessions() {
+ log.debug("[{}] Dumping sessions: {}, rpc subscriptions: {}, attribute subscriptions: {} to cache", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size());
+ List<TransportProtos.SessionSubscriptionInfoProto> sessionsList = new ArrayList<>(sessions.size());
+ sessions.forEach((uuid, sessionMD) -> {
+ if (sessionMD.getSessionInfo().getType() == TransportProtos.SessionType.SYNC) {
+ return;
+ }
+ SessionInfo sessionInfo = sessionMD.getSessionInfo();
+ TransportProtos.SubscriptionInfoProto subscriptionInfoProto = TransportProtos.SubscriptionInfoProto.newBuilder()
+ .setLastActivityTime(sessionMD.getLastActivityTime())
+ .setAttributeSubscription(sessionMD.isSubscribedToAttributes())
+ .setRpcSubscription(sessionMD.isSubscribedToRPC()).build();
+ TransportProtos.SessionInfoProto sessionInfoProto = TransportProtos.SessionInfoProto.newBuilder()
+ .setSessionIdMSB(uuid.getMostSignificantBits())
+ .setSessionIdLSB(uuid.getLeastSignificantBits())
+ .setNodeId(sessionInfo.getNodeId()).build();
+ sessionsList.add(TransportProtos.SessionSubscriptionInfoProto.newBuilder()
+ .setSessionInfo(sessionInfoProto)
+ .setSubscriptionInfo(subscriptionInfoProto).build());
+ log.debug("[{}] Dumping session: {}", deviceId, sessionMD);
+ });
+
+deviceSessionCache.put(deviceId, sessionsList);
+
+ // systemContext.getDeviceSessionCacheService()
+ // .put(deviceId, TransportProtos.DeviceSessionsCacheEntry.newBuilder()
+ // .addAllSessions(sessionsList).build().toByteArray());
+ }
void initSessionTimeout(ActorContext context) {
schedulePeriodicMsgWithDelay(context, SessionTimeoutCheckMsg.instance(), systemContext.getSessionInactivityTimeout(), systemContext.getSessionInactivityTimeout());
@@ -695,13 +729,13 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
long expTime = System.currentTimeMillis() - systemContext.getSessionInactivityTimeout();
Map<UUID, SessionInfoMetaData> sessionsToRemove = sessions.entrySet().stream().filter(kv -> kv.getValue().getLastActivityTime() < expTime).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
sessionsToRemove.forEach((sessionId, sessionMD) -> {
- sessions.remove(sessionId);
+ sessions.invalidate(sessionId);
rpcSubscriptions.remove(sessionId);
attributeSubscriptions.remove(sessionId);
notifyTransportAboutClosedSession(sessionId, sessionMD);
});
if (!sessionsToRemove.isEmpty()) {
-// dumpSessions();
+ dumpSessions();
}
}
}
common/queue/pom.xml 57(+0 -57)
diff --git a/common/queue/pom.xml b/common/queue/pom.xml
index 63e0854..b87c864 100644
--- a/common/queue/pom.xml
+++ b/common/queue/pom.xml
@@ -94,62 +94,5 @@
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>br.ufrgs.inf.prosoft.applicationtracer</groupId>
- <artifactId>ApplicationTracer</artifactId>
- <version>1.0</version>
- </dependency>
</dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>aspectj-maven-plugin</artifactId>
- <version>1.11</version>
- <configuration>
- <showWeaveInfo>false</showWeaveInfo>
- <complianceLevel>1.8</complianceLevel>
- <source>1.8</source>
- <target>1.8</target>
- <Xlint>ignore</Xlint>
- <encoding>UTF-8</encoding>
- <verbose>false</verbose>
- <forceAjcCompile>true</forceAjcCompile>
- <sources/>
- <weaveDirectories>
- <weaveDirectory>${project.build.directory}/classes</weaveDirectory>
- </weaveDirectories>
- <aspectLibraries>
- <aspectLibrary>
- <groupId>br.ufrgs.inf.prosoft.applicationtracer</groupId>
- <artifactId>ApplicationTracer</artifactId>
- </aspectLibrary>
- </aspectLibraries>
- </configuration>
- <executions>
- <execution>
- <phase>process-classes</phase>
- <goals>
- <goal>compile</goal>
- <goal>test-compile</goal>
- </goals>
- </execution>
- </executions>
- <dependencies>
- <dependency>
- <groupId>org.aspectj</groupId>
- <artifactId>aspectjrt</artifactId>
- <version>1.9.1</version>
- </dependency>
- <dependency>
- <groupId>org.aspectj</groupId>
- <artifactId>aspectjtools</artifactId>
- <version>1.9.1</version>
- </dependency>
- </dependencies>
- </plugin>
- </plugins>
- </build>
-
</project>
dao/pom.xml 54(+0 -54)
diff --git a/dao/pom.xml b/dao/pom.xml
index 6e016fe..63332eb 100644
--- a/dao/pom.xml
+++ b/dao/pom.xml
@@ -194,11 +194,6 @@
<groupId>org.elasticsearch.client</groupId>
<artifactId>rest</artifactId>
</dependency>
- <dependency>
- <groupId>br.ufrgs.inf.prosoft.applicationtracer</groupId>
- <artifactId>ApplicationTracer</artifactId>
- <version>1.0</version>
- </dependency>
</dependencies>
<build>
<plugins>
@@ -224,55 +219,6 @@
</execution>
</executions>
</plugin>
-
-
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>aspectj-maven-plugin</artifactId>
- <version>1.11</version>
- <configuration>
- <showWeaveInfo>false</showWeaveInfo>
- <complianceLevel>1.8</complianceLevel>
- <source>1.6</source>
- <target>1.6</target>
- <Xlint>ignore</Xlint>
- <encoding>UTF-8</encoding>
- <verbose>false</verbose>
- <forceAjcCompile>true</forceAjcCompile>
- <sources/>
- <weaveDirectories>
- <weaveDirectory>${project.build.directory}/classes</weaveDirectory>
- </weaveDirectories>
- <aspectLibraries>
- <aspectLibrary>
- <groupId>br.ufrgs.inf.prosoft.applicationtracer</groupId>
- <artifactId>ApplicationTracer</artifactId>
- </aspectLibrary>
- </aspectLibraries>
- </configuration>
- <executions>
- <execution>
- <phase>process-classes</phase>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
- </executions>
- <dependencies>
- <dependency>
- <groupId>org.aspectj</groupId>
- <artifactId>aspectjrt</artifactId>
- <version>1.9.1</version>
- </dependency>
- <dependency>
- <groupId>org.aspectj</groupId>
- <artifactId>aspectjtools</artifactId>
- <version>1.9.1</version>
- </dependency>
- </dependencies>
- </plugin>
-
-
</plugins>
</build>
</project>
diff --git a/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java b/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java
index 629aeb4..79fd06e 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java
@@ -62,6 +62,10 @@ import static org.thingsboard.server.dao.DaoUtil.toUUIDs;
import static org.thingsboard.server.dao.model.ModelConstants.NULL_UUID;
import static org.thingsboard.server.dao.service.Validator.*;
+import br.ufrgs.inf.prosoft.cache.MultiCache;
+import br.ufrgs.inf.prosoft.cache.KeyNotFoundException;
+import br.ufrgs.inf.prosoft.cache.Parameters;
+
@Service
@Slf4j
public class BaseAssetService extends AbstractEntityService implements AssetService {
@@ -99,13 +103,19 @@ public class BaseAssetService extends AbstractEntityService implements AssetServ
return assetDao.findByIdAsync(tenantId, assetId.getId());
}
+public static MultiCache<Parameters, Asset> findAssetByTenantIdAndNameCache = new MultiCache<>("BaseAssetService.findAssetByTenantIdAndName");
+
// @Cacheable(cacheNames = ASSET_CACHE, key = "{#tenantId, #name}")
@Override
public Asset findAssetByTenantIdAndName(TenantId tenantId, String name) {
+
+return findAssetByTenantIdAndNameCache.computeIfAbsent(new Parameters(tenantId, name), () -> {
+
log.trace("Executing findAssetByTenantIdAndName [{}][{}]", tenantId, name);
validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
return assetDao.findAssetsByTenantIdAndName(tenantId.getId(), name)
.orElse(null);
+}, 1440000);
}
// @CacheEvict(cacheNames = ASSET_CACHE, key = "{#asset.tenantId, #asset.name}")
@@ -113,7 +123,9 @@ public class BaseAssetService extends AbstractEntityService implements AssetServ
public Asset saveAsset(Asset asset) {
log.trace("Executing saveAsset [{}]", asset);
assetValidator.validate(asset, Asset::getTenantId);
- return assetDao.save(asset.getTenantId(), asset);
+ Asset save = assetDao.save(asset.getTenantId(), asset);
+findAssetByTenantIdAndNameCache.invalidate(new Parameters(asset.getTenantId(), asset.getName()));
+ return save;
}
@Override
@@ -147,11 +159,12 @@ public class BaseAssetService extends AbstractEntityService implements AssetServ
throw new RuntimeException("Exception while finding entity views for assetId [" + assetId + "]", e);
}
- List<Object> list = new ArrayList<>();
- list.add(asset.getTenantId());
- list.add(asset.getName());
+ // List<Object> list = new ArrayList<>();
+ // list.add(asset.getTenantId());
+ // list.add(asset.getName());
// Cache cache = cacheManager.getCache(ASSET_CACHE);
// cache.evict(list);
+findAssetByTenantIdAndNameCache.invalidate(new Parameters(asset.getTenantId(), asset.getName()));
assetDao.removeById(tenantId, assetId.getId());
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceCredentialsServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceCredentialsServiceImpl.java
index ff505e2..e078b88 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceCredentialsServiceImpl.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceCredentialsServiceImpl.java
@@ -36,6 +36,8 @@ import org.thingsboard.server.dao.service.DataValidator;
import static org.thingsboard.server.dao.service.Validator.validateId;
import static org.thingsboard.server.dao.service.Validator.validateString;
+import br.ufrgs.inf.prosoft.cache.*;
+
@Service
@Slf4j
public class DeviceCredentialsServiceImpl implements DeviceCredentialsService {
@@ -53,17 +55,22 @@ public class DeviceCredentialsServiceImpl implements DeviceCredentialsService {
return deviceCredentialsDao.findByDeviceId(tenantId, deviceId.getId());
}
+public static MultiCache<String, DeviceCredentials> findDeviceCredentialsByCredentialsIdCache = new MultiCache<>("DeviceCredentialsServiceImpl.findDeviceCredentialsByCredentialsId");
+
@Override
// @Cacheable(cacheNames = DEVICE_CREDENTIALS_CACHE, unless = "#result == null")
public DeviceCredentials findDeviceCredentialsByCredentialsId(String credentialsId) {
+return findDeviceCredentialsByCredentialsIdCache.computeIfAbsent(credentialsId, () -> {
log.trace("Executing findDeviceCredentialsByCredentialsId [{}]", credentialsId);
validateString(credentialsId, "Incorrect credentialsId " + credentialsId);
return deviceCredentialsDao.findByCredentialsId(new TenantId(EntityId.NULL_UUID), credentialsId);
+}, 1440000);
}
@Override
// @CacheEvict(cacheNames = DEVICE_CREDENTIALS_CACHE, keyGenerator = "previousDeviceCredentialsId", beforeInvocation = true)
public DeviceCredentials updateDeviceCredentials(TenantId tenantId, DeviceCredentials deviceCredentials) {
+ findDeviceCredentialsByCredentialsIdCache.invalidate(deviceCredentials.getCredentialsId());
return saveOrUpdate(tenantId, deviceCredentials);
}
@@ -93,6 +100,7 @@ public class DeviceCredentialsServiceImpl implements DeviceCredentialsService {
public void deleteDeviceCredentials(TenantId tenantId, DeviceCredentials deviceCredentials) {
log.trace("Executing deleteDeviceCredentials [{}]", deviceCredentials);
deviceCredentialsDao.removeById(tenantId, deviceCredentials.getUuidId());
+ findDeviceCredentialsByCredentialsIdCache.invalidate(deviceCredentials.getCredentialsId());
}
private DataValidator<DeviceCredentials> credentialsValidator =
diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java
index 250e719..d0c9b42 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java
@@ -69,6 +69,8 @@ import static org.thingsboard.server.dao.service.Validator.validateIds;
import static org.thingsboard.server.dao.service.Validator.validatePageLink;
import static org.thingsboard.server.dao.service.Validator.validateString;
+import br.ufrgs.inf.prosoft.cache.*;
+
@Service
@Slf4j
public class DeviceServiceImpl extends AbstractEntityService implements DeviceService {
@@ -109,13 +111,17 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe
return deviceDao.findByIdAsync(tenantId, deviceId.getId());
}
+public static MultiCache<Parameters, Device> findDeviceByTenantIdAndNameCache = new MultiCache<>("DeviceServiceImpl.findDeviceByTenantIdAndName");
+
// @Cacheable(cacheNames = DEVICE_CACHE, key = "{#tenantId, #name}")
@Override
public Device findDeviceByTenantIdAndName(TenantId tenantId, String name) {
+return findDeviceByTenantIdAndNameCache.computeIfAbsent(new Parameters(tenantId, name), () -> {
log.trace("Executing findDeviceByTenantIdAndName [{}][{}]", tenantId, name);
validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
Optional<Device> deviceOpt = deviceDao.findDeviceByTenantIdAndName(tenantId.getId(), name);
return deviceOpt.orElse(null);
+}, 1440000);
}
// @CacheEvict(cacheNames = DEVICE_CACHE, key = "{#device.tenantId, #device.name}")
@@ -131,6 +137,7 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe
deviceCredentials.setCredentialsId(RandomStringUtils.randomAlphanumeric(20));
deviceCredentialsService.createDeviceCredentials(device.getTenantId(), deviceCredentials);
}
+ findDeviceByTenantIdAndNameCache.invalidate(new Parameters(device.getTenantId(), device.getName()));
return savedDevice;
}
@@ -170,12 +177,14 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe
}
deleteEntityRelations(tenantId, deviceId);
- List<Object> list = new ArrayList<>();
- list.add(device.getTenantId());
- list.add(device.getName());
+ // List<Object> list = new ArrayList<>();
+ // list.add(device.getTenantId());
+ // list.add(device.getName());
// Cache cache = cacheManager.getCache(DEVICE_CACHE);
// cache.evict(list);
+ findDeviceByTenantIdAndNameCache.invalidate(new Parameters(device.getTenantId(), device.getName()));
+
deviceDao.removeById(tenantId, deviceId.getId());
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/entityview/EntityViewServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/entityview/EntityViewServiceImpl.java
index 549fa76..46a84b5 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/entityview/EntityViewServiceImpl.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/entityview/EntityViewServiceImpl.java
@@ -64,6 +64,8 @@ import static org.thingsboard.server.dao.service.Validator.validateId;
import static org.thingsboard.server.dao.service.Validator.validatePageLink;
import static org.thingsboard.server.dao.service.Validator.validateString;
+import br.ufrgs.inf.prosoft.cache.*;
+
/**
* Created by Victor Basanets on 8/28/2017.
*/
@@ -97,6 +99,7 @@ public class EntityViewServiceImpl extends AbstractEntityService implements Enti
log.trace("Executing save entity view [{}]", entityView);
entityViewValidator.validate(entityView, EntityView::getTenantId);
EntityView savedEntityView = entityViewDao.save(entityView.getTenantId(), entityView);
+findEntityViewByTenantIdAndNameCache.invalidate(new Parameters(entityView.getTenantId(), entityView.getName()));
return savedEntityView;
}
@@ -105,7 +108,9 @@ public class EntityViewServiceImpl extends AbstractEntityService implements Enti
public EntityView assignEntityViewToCustomer(TenantId tenantId, EntityViewId entityViewId, CustomerId customerId) {
EntityView entityView = findEntityViewById(tenantId, entityViewId);
entityView.setCustomerId(customerId);
- return saveEntityView(entityView);
+ EntityView save = saveEntityView(entityView);
+findEntityViewByIdCache.invalidate(entityViewId);
+ return save;
}
// @CacheEvict(cacheNames = ENTITY_VIEW_CACHE, key = "{#entityViewId}")
@@ -113,7 +118,9 @@ public class EntityViewServiceImpl extends AbstractEntityService implements Enti
public EntityView unassignEntityViewFromCustomer(TenantId tenantId, EntityViewId entityViewId) {
EntityView entityView = findEntityViewById(tenantId, entityViewId);
entityView.setCustomerId(null);
- return saveEntityView(entityView);
+ EntityView save = saveEntityView(entityView);
+findEntityViewByIdCache.invalidate(entityViewId);
+ return save;
}
@Override
@@ -124,21 +131,29 @@ public class EntityViewServiceImpl extends AbstractEntityService implements Enti
customerEntityViewsUnAssigner.removeEntities(tenantId, customerId);
}
+public static MultiCache<EntityViewId, EntityView> findEntityViewByIdCache = new MultiCache<>("EntityViewServiceImpl.findEntityViewById");
+
// @Cacheable(cacheNames = ENTITY_VIEW_CACHE, key = "{#entityViewId}")
@Override
public EntityView findEntityViewById(TenantId tenantId, EntityViewId entityViewId) {
+return findEntityViewByIdCache.computeIfAbsent(entityViewId, () -> {
log.trace("Executing findEntityViewById [{}]", entityViewId);
validateId(entityViewId, INCORRECT_ENTITY_VIEW_ID + entityViewId);
return entityViewDao.findById(tenantId, entityViewId.getId());
+}, 1440000);
}
+public static MultiCache<Parameters, EntityView> findEntityViewByTenantIdAndNameCache = new MultiCache<>("EntityViewServiceImpl.findEntityViewByTenantIdAndName");
+
// @Cacheable(cacheNames = ENTITY_VIEW_CACHE, key = "{#tenantId, #name}")
@Override
public EntityView findEntityViewByTenantIdAndName(TenantId tenantId, String name) {
+return findEntityViewByTenantIdAndNameCache.computeIfAbsent(new Parameters(tenantId, name), () -> {
log.trace("Executing findEntityViewByTenantIdAndName [{}][{}]", tenantId, name);
validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
Optional<EntityView> entityViewOpt = entityViewDao.findEntityViewByTenantIdAndName(tenantId.getId(), name);
return entityViewOpt.orElse(null);
+}, 1440000);
}
@Override
@@ -225,13 +240,20 @@ public class EntityViewServiceImpl extends AbstractEntityService implements Enti
validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
validateId(entityId.getId(), "Incorrect entityId" + entityId);
- List<Object> tenantIdAndEntityId = new ArrayList<>();
- tenantIdAndEntityId.add(tenantId);
- tenantIdAndEntityId.add(entityId);
+ // List<Object> tenantIdAndEntityId = new ArrayList<>();
+ // tenantIdAndEntityId.add(tenantId);
+ // tenantIdAndEntityId.add(entityId);
// Cache cache = cacheManager.getCache(ENTITY_VIEW_CACHE);
// List<EntityView> fromCache = cache.get(tenantIdAndEntityId, List.class);
+
List<EntityView> fromCache = null;
+try {
+ EntityView cachedview = findEntityViewByTenantIdAndNameCache.get(new Parameters(tenantId, entityId));
+ fromCache = new ArrayList<>();
+ fromCache.add(cachedview);
+} catch (KeyNotFoundException ex) {}
+
if (fromCache != null) {
return Futures.immediateFuture(fromCache);
} else {
@@ -241,6 +263,7 @@ public class EntityViewServiceImpl extends AbstractEntityService implements Enti
@Override
public void onSuccess(@Nullable List<EntityView> result) {
// cache.putIfAbsent(tenantIdAndEntityId, result);
+ findEntityViewByTenantIdAndNameCache.put(new Parameters(tenantId, entityId), result.get(0), 1440000);
}
@Override
@@ -261,6 +284,7 @@ public class EntityViewServiceImpl extends AbstractEntityService implements Enti
EntityView entityView = entityViewDao.findById(tenantId, entityViewId.getId());
// cacheManager.getCache(ENTITY_VIEW_CACHE).evict(Arrays.asList(entityView.getTenantId(), entityView.getEntityId()));
// cacheManager.getCache(ENTITY_VIEW_CACHE).evict(Arrays.asList(entityView.getTenantId(), entityView.getName()));
+findEntityViewByTenantIdAndNameCache.invalidate(new Parameters(entityView.getTenantId(), entityView.getName()));
entityViewDao.removeById(tenantId, entityViewId.getId());
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java
index 7627f11..3381850 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java
@@ -49,6 +49,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
+import br.ufrgs.inf.prosoft.cache.*;
+
//import static org.thingsboard.server.common.data.CacheConstants.RELATIONS_CACHE;
/**
@@ -74,14 +76,18 @@ public class BaseRelationService implements RelationService {
return relationDao.checkRelation(tenantId, from, to, relationType, typeGroup);
}
+public static MultiCache<Parameters, EntityRelation> getRelationCache = new MultiCache<>("BaseRelationService.getRelation");
+
// @Cacheable(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType, #typeGroup}")
@Override
public EntityRelation getRelation(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) {
+ return getRelationCache.computeIfAbsent(new Parameters(from, to, relationType, typeGroup), () -> {
try {
- return getRelationAsync(tenantId, from, to, relationType, typeGroup).get();
+ return getRelationAsync(tenantId, from, to, relationType, typeGroup).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
+ }, 1440000);
}
@Override
@@ -102,7 +108,13 @@ public class BaseRelationService implements RelationService {
public boolean saveRelation(TenantId tenantId, EntityRelation relation) {
log.trace("Executing saveRelation [{}]", relation);
validate(relation);
- return relationDao.saveRelation(tenantId, relation);
+ boolean save = relationDao.saveRelation(tenantId, relation);
+ getRelationCache.invalidate(new Parameters(relation.getFrom(), relation.getTo(), relation.getType(), relation.getTypeGroup()));
+ findByFromCache.invalidate(new Parameters(relation.getFrom(), relation.getType()));
+ findByFromAndTypeCache.invalidate(new Parameters(relation.getFrom(), relation.getTypeGroup(), relation.getType()));
+ findByToCache.invalidate(new Parameters(relation.getTo(), relation.getType()));
+ findByToAndTypeCache.invalidate(new Parameters(relation.getTo(), relation.getTypeGroup(), relation.getType()));
+ return save;
}
//
// @Caching(evict = {
@@ -116,7 +128,13 @@ public class BaseRelationService implements RelationService {
public ListenableFuture<Boolean> saveRelationAsync(TenantId tenantId, EntityRelation relation) {
log.trace("Executing saveRelationAsync [{}]", relation);
validate(relation);
- return relationDao.saveRelationAsync(tenantId, relation);
+ ListenableFuture<Boolean> save = relationDao.saveRelationAsync(tenantId, relation);
+ getRelationCache.invalidate(new Parameters(relation.getFrom(), relation.getTo(), relation.getType(), relation.getTypeGroup()));
+ findByFromCache.invalidate(new Parameters(relation.getFrom(), relation.getType()));
+ findByFromAndTypeCache.invalidate(new Parameters(relation.getFrom(), relation.getTypeGroup(), relation.getType()));
+ findByToCache.invalidate(new Parameters(relation.getTo(), relation.getType()));
+ findByToAndTypeCache.invalidate(new Parameters(relation.getTo(), relation.getTypeGroup(), relation.getType()));
+ return save;
}
// @Caching(evict = {
@@ -130,7 +148,13 @@ public class BaseRelationService implements RelationService {
public boolean deleteRelation(TenantId tenantId, EntityRelation relation) {
log.trace("Executing deleteRelation [{}]", relation);
validate(relation);
- return relationDao.deleteRelation(tenantId, relation);
+ boolean save = relationDao.deleteRelation(tenantId, relation);
+ getRelationCache.invalidate(new Parameters(relation.getFrom(), relation.getTo(), relation.getType(), relation.getTypeGroup()));
+ findByFromCache.invalidate(new Parameters(relation.getFrom(), relation.getType()));
+ findByFromAndTypeCache.invalidate(new Parameters(relation.getFrom(), relation.getTypeGroup(), relation.getType()));
+ findByToCache.invalidate(new Parameters(relation.getTo(), relation.getType()));
+ findByToAndTypeCache.invalidate(new Parameters(relation.getTo(), relation.getTypeGroup(), relation.getType()));
+ return save;
}
// @Caching(evict = {
@@ -144,7 +168,13 @@ public class BaseRelationService implements RelationService {
public ListenableFuture<Boolean> deleteRelationAsync(TenantId tenantId, EntityRelation relation) {
log.trace("Executing deleteRelationAsync [{}]", relation);
validate(relation);
- return relationDao.deleteRelationAsync(tenantId, relation);
+ ListenableFuture<Boolean> save = relationDao.deleteRelationAsync(tenantId, relation);
+ getRelationCache.invalidate(new Parameters(relation.getFrom(), relation.getTo(), relation.getType(), relation.getTypeGroup()));
+ findByFromCache.invalidate(new Parameters(relation.getFrom(), relation.getType()));
+ findByFromAndTypeCache.invalidate(new Parameters(relation.getFrom(), relation.getTypeGroup(), relation.getType()));
+ findByToCache.invalidate(new Parameters(relation.getTo(), relation.getType()));
+ findByToAndTypeCache.invalidate(new Parameters(relation.getTo(), relation.getTypeGroup(), relation.getType()));
+ return save;
}
// @Caching(evict = {
@@ -158,7 +188,9 @@ public class BaseRelationService implements RelationService {
public boolean deleteRelation(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) {
log.trace("Executing deleteRelation [{}][{}][{}][{}]", from, to, relationType, typeGroup);
validate(from, to, relationType, typeGroup);
- return relationDao.deleteRelation(tenantId, from, to, relationType, typeGroup);
+ boolean save = relationDao.deleteRelation(tenantId, from, to, relationType, typeGroup);
+ getRelationCache.invalidate(new Parameters(from, to, relationType, typeGroup));
+ return save;
}
// @Caching(evict = {
@@ -172,7 +204,13 @@ public class BaseRelationService implements RelationService {
public ListenableFuture<Boolean> deleteRelationAsync(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) {
log.trace("Executing deleteRelationAsync [{}][{}][{}][{}]", from, to, relationType, typeGroup);
validate(from, to, relationType, typeGroup);
- return relationDao.deleteRelationAsync(tenantId, from, to, relationType, typeGroup);
+ ListenableFuture<Boolean> save = relationDao.deleteRelationAsync(tenantId, from, to, relationType, typeGroup);
+ getRelationCache.invalidate(new Parameters(from, to, relationType, typeGroup));
+ findByFromCache.invalidate(new Parameters(from, typeGroup));
+ findByFromAndTypeCache.invalidate(new Parameters(from, relationType, typeGroup));
+ findByToCache.invalidate(new Parameters(to, typeGroup));
+ findByToAndTypeCache.invalidate(new Parameters(to, relationType, typeGroup));
+ return save;
}
@Override
@@ -239,43 +277,51 @@ public class BaseRelationService implements RelationService {
}
private void cacheEviction(EntityRelation relation, Cache cache) {
- List<Object> fromToTypeAndTypeGroup = new ArrayList<>();
- fromToTypeAndTypeGroup.add(relation.getFrom());
- fromToTypeAndTypeGroup.add(relation.getTo());
- fromToTypeAndTypeGroup.add(relation.getType());
- fromToTypeAndTypeGroup.add(relation.getTypeGroup());
+ getRelationCache.invalidate(new Parameters(relation.getFrom(), relation.getTo(), relation.getType(), relation.getTypeGroup()));
+ findByFromCache.invalidate(new Parameters(relation.getFrom(), relation.getType()));
+ findByFromAndTypeCache.invalidate(new Parameters(relation.getFrom(), relation.getTypeGroup(), relation.getType()));
+ findByToCache.invalidate(new Parameters(relation.getTo(), relation.getType()));
+ findByToAndTypeCache.invalidate(new Parameters(relation.getTo(), relation.getTypeGroup(), relation.getType()));
+ // List<Object> fromToTypeAndTypeGroup = new ArrayList<>();
+ // fromToTypeAndTypeGroup.add(relation.getFrom());
+ // fromToTypeAndTypeGroup.add(relation.getTo());
+ // fromToTypeAndTypeGroup.add(relation.getType());
+ // fromToTypeAndTypeGroup.add(relation.getTypeGroup());
// cache.evict(fromToTypeAndTypeGroup);
- List<Object> fromTypeAndTypeGroup = new ArrayList<>();
- fromTypeAndTypeGroup.add(relation.getFrom());
- fromTypeAndTypeGroup.add(relation.getType());
- fromTypeAndTypeGroup.add(relation.getTypeGroup());
- fromTypeAndTypeGroup.add(EntitySearchDirection.FROM.name());
+ // List<Object> fromTypeAndTypeGroup = new ArrayList<>();
+ // fromTypeAndTypeGroup.add(relation.getFrom());
+ // fromTypeAndTypeGroup.add(relation.getType());
+ // fromTypeAndTypeGroup.add(relation.getTypeGroup());
+ // fromTypeAndTypeGroup.add(EntitySearchDirection.FROM.name());
// cache.evict(fromTypeAndTypeGroup);
- List<Object> fromAndTypeGroup = new ArrayList<>();
- fromAndTypeGroup.add(relation.getFrom());
- fromAndTypeGroup.add(relation.getTypeGroup());
- fromAndTypeGroup.add(EntitySearchDirection.FROM.name());
+ // List<Object> fromAndTypeGroup = new ArrayList<>();
+ // fromAndTypeGroup.add(relation.getFrom());
+ // fromAndTypeGroup.add(relation.getTypeGroup());
+ // fromAndTypeGroup.add(EntitySearchDirection.FROM.name());
// cache.evict(fromAndTypeGroup);
- List<Object> toAndTypeGroup = new ArrayList<>();
- toAndTypeGroup.add(relation.getTo());
- toAndTypeGroup.add(relation.getTypeGroup());
- toAndTypeGroup.add(EntitySearchDirection.TO.name());
+ // List<Object> toAndTypeGroup = new ArrayList<>();
+ // toAndTypeGroup.add(relation.getTo());
+ // toAndTypeGroup.add(relation.getTypeGroup());
+ // toAndTypeGroup.add(EntitySearchDirection.TO.name());
// cache.evict(toAndTypeGroup);
- List<Object> toTypeAndTypeGroup = new ArrayList<>();
- toTypeAndTypeGroup.add(relation.getTo());
- toTypeAndTypeGroup.add(relation.getType());
- toTypeAndTypeGroup.add(relation.getTypeGroup());
- toTypeAndTypeGroup.add(EntitySearchDirection.TO.name());
+ // List<Object> toTypeAndTypeGroup = new ArrayList<>();
+ // toTypeAndTypeGroup.add(relation.getTo());
+ // toTypeAndTypeGroup.add(relation.getType());
+ // toTypeAndTypeGroup.add(relation.getTypeGroup());
+ // toTypeAndTypeGroup.add(EntitySearchDirection.TO.name());
// cache.evict(toTypeAndTypeGroup);
}
+public static MultiCache<Parameters, List<EntityRelation>> findByFromCache = new MultiCache<>("BaseRelationService.findByFrom");
+
// @Cacheable(cacheNames = RELATIONS_CACHE, key = "{#from, #typeGroup, 'FROM'}")
@Override
public List<EntityRelation> findByFrom(TenantId tenantId, EntityId from, RelationTypeGroup typeGroup) {
+return findByFromCache.computeIfAbsent(new Parameters(from, typeGroup), () -> {
validate(from);
validateTypeGroup(typeGroup);
try {
@@ -283,6 +329,7 @@ public class BaseRelationService implements RelationService {
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
+}, 1440000);
}
@Override
@@ -291,14 +338,20 @@ public class BaseRelationService implements RelationService {
validate(from);
validateTypeGroup(typeGroup);
- List<Object> fromAndTypeGroup = new ArrayList<>();
- fromAndTypeGroup.add(from);
- fromAndTypeGroup.add(typeGroup);
- fromAndTypeGroup.add(EntitySearchDirection.FROM.name());
+ // List<Object> fromAndTypeGroup = new ArrayList<>();
+ // fromAndTypeGroup.add(from);
+ // fromAndTypeGroup.add(typeGroup);
+ // fromAndTypeGroup.add(EntitySearchDirection.FROM.name());
// Cache cache = cacheManager.getCache(RELATIONS_CACHE);
// List<EntityRelation> fromCache = cache.get(fromAndTypeGroup, List.class);
+
List<EntityRelation> fromCache = null;
+try {
+ fromCache = findByFromCache.get(new Parameters(from, typeGroup));
+} catch (KeyNotFoundException ex){
+}
+
if (fromCache != null) {
return Futures.immediateFuture(fromCache);
} else {
@@ -308,6 +361,7 @@ public class BaseRelationService implements RelationService {
@Override
public void onSuccess(@Nullable List<EntityRelation> result) {
// cache.putIfAbsent(fromAndTypeGroup, result);
+ findByFromCache.put(new Parameters(from, typeGroup), result, 1440000);
}
@Override
public void onFailure(Throwable t) {}
@@ -334,14 +388,18 @@ public class BaseRelationService implements RelationService {
});
}
+public static MultiCache<Parameters, List<EntityRelation>> findByFromAndTypeCache = new MultiCache<>("BaseRelationService.findByFromAndType");
+
// @Cacheable(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType, #typeGroup, 'FROM'}")
@Override
public List<EntityRelation> findByFromAndType(TenantId tenantId, EntityId from, String relationType, RelationTypeGroup typeGroup) {
+return findByFromAndTypeCache.computeIfAbsent(new Parameters(from, relationType, typeGroup), () -> {
try {
return findByFromAndTypeAsync(tenantId, from, relationType, typeGroup).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
+}, 1440000);
}
@Override
@@ -353,9 +411,12 @@ public class BaseRelationService implements RelationService {
return relationDao.findAllByFromAndType(tenantId, from, relationType, typeGroup);
}
+public static MultiCache<Parameters, List<EntityRelation>> findByToCache = new MultiCache<>("BaseRelationService.findByTo");
+
// @Cacheable(cacheNames = RELATIONS_CACHE, key = "{#to, #typeGroup, 'TO'}")
@Override
public List<EntityRelation> findByTo(TenantId tenantId, EntityId to, RelationTypeGroup typeGroup) {
+return findByToCache.computeIfAbsent(new Parameters(to, typeGroup), () -> {
validate(to);
validateTypeGroup(typeGroup);
try {
@@ -363,6 +424,7 @@ public class BaseRelationService implements RelationService {
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
+}, 1440000);
}
@Override
@@ -371,14 +433,19 @@ public class BaseRelationService implements RelationService {
validate(to);
validateTypeGroup(typeGroup);
- List<Object> toAndTypeGroup = new ArrayList<>();
- toAndTypeGroup.add(to);
- toAndTypeGroup.add(typeGroup);
- toAndTypeGroup.add(EntitySearchDirection.TO.name());
+ // List<Object> toAndTypeGroup = new ArrayList<>();
+ // toAndTypeGroup.add(to);
+ // toAndTypeGroup.add(typeGroup);
+ // toAndTypeGroup.add(EntitySearchDirection.TO.name());
+
+ List<EntityRelation> fromCache = null;
+try{
+ fromCache = findByToCache.get(new Parameters(to, typeGroup));
+} catch (KeyNotFoundException ex){
+}
// Cache cache = cacheManager.getCache(RELATIONS_CACHE);
// List<EntityRelation> fromCache = cache.get(toAndTypeGroup, List.class);
- List<EntityRelation> fromCache = null;
if (fromCache != null) {
return Futures.immediateFuture(fromCache);
} else {
@@ -388,6 +455,7 @@ public class BaseRelationService implements RelationService {
@Override
public void onSuccess(@Nullable List<EntityRelation> result) {
// cache.putIfAbsent(toAndTypeGroup, result);
+ findByToCache.put(new Parameters(to, typeGroup), result, 1440000);
}
@Override
public void onFailure(Throwable t) {}
@@ -425,14 +493,18 @@ public class BaseRelationService implements RelationService {
});
}
+public static MultiCache<Parameters, List<EntityRelation>> findByToAndTypeCache = new MultiCache<>("BaseRelationService.findByToAndType");
+
// @Cacheable(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType, #typeGroup, 'TO'}")
@Override
public List<EntityRelation> findByToAndType(TenantId tenantId, EntityId to, String relationType, RelationTypeGroup typeGroup) {
+return findByToAndTypeCache.computeIfAbsent(new Parameters(to, relationType, typeGroup), () -> {
try {
return findByToAndTypeAsync(tenantId, to, relationType, typeGroup).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
+}, 1440000);
}
@Override
docker-compose.yml 10(+2 -8)
diff --git a/docker-compose.yml b/docker-compose.yml
index 2496297..fe84bd7 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -5,14 +5,8 @@ services:
container_name: thingsboard
environment:
- JAVA_OPTS=${JAVA_OPTS:-"-Xmx6124m"}
- - TRACER_ENABLE=${TRACER_ENABLE:-true}
- - TRACER_MINIMUM_EXECUTION_TIME=${TRACER_MINIMUM_EXECUTION_TIME:-1}
- - TRACER_SERIALISE_INTERNALS=false
- - TRACER_VERBOSE=true
- - TRACER_TRACES=/caching-approaches-comparison/applications/traces/thingsboard
- - TRACER_IGNORED_PACKAGES=/caching-approaches-comparison/applications/uncached/thingsboard/ignored
- - TRACER_WHITELIST=/caching-approaches-comparison/applications/uncached/thingsboard/whitelist
- - TRACER_LOG=/caching-approaches-comparison/applications/output/thingsboard-tracer.log
+ - CACHE_EVENTS=${CACHE_EVENTS:-/caching-approaches-comparison/applications/output/thingsboard-developers-cache}
+ - CACHE_REGISTER_SIZE=false
volumes:
- application:/application
- /root/.m2:/root/.m2
pom.xml 5(+5 -0)
diff --git a/pom.xml b/pom.xml
index f5e50dc..7315a86 100755
--- a/pom.xml
+++ b/pom.xml
@@ -327,6 +327,11 @@
<dependencies>
<dependency>
+ <groupId>br.ufrgs.inf.prosoft.cache</groupId>
+ <artifactId>Cache</artifactId>
+ <version>1.0</version>
+ </dependency>
+ <dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
rule-engine/rule-engine-components/pom.xml 55(+0 -55)
diff --git a/rule-engine/rule-engine-components/pom.xml b/rule-engine/rule-engine-components/pom.xml
index be747f0..f46ff9f 100644
--- a/rule-engine/rule-engine-components/pom.xml
+++ b/rule-engine/rule-engine-components/pom.xml
@@ -119,11 +119,6 @@
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>br.ufrgs.inf.prosoft.applicationtracer</groupId>
- <artifactId>ApplicationTracer</artifactId>
- <version>1.0</version>
- </dependency>
</dependencies>
<build>
@@ -160,56 +155,6 @@
</execution>
</executions>
</plugin>
-
-
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>aspectj-maven-plugin</artifactId>
- <version>1.11</version>
- <configuration>
- <showWeaveInfo>false</showWeaveInfo>
- <complianceLevel>1.8</complianceLevel>
- <source>1.8</source>
- <target>1.8</target>
- <Xlint>ignore</Xlint>
- <encoding>UTF-8</encoding>
- <verbose>false</verbose>
- <forceAjcCompile>true</forceAjcCompile>
- <sources/>
- <weaveDirectories>
- <weaveDirectory>${project.build.directory}/classes</weaveDirectory>
- </weaveDirectories>
- <aspectLibraries>
- <aspectLibrary>
- <groupId>br.ufrgs.inf.prosoft.applicationtracer</groupId>
- <artifactId>ApplicationTracer</artifactId>
- </aspectLibrary>
- </aspectLibraries>
- </configuration>
- <executions>
- <execution>
- <phase>process-classes</phase>
- <goals>
- <goal>compile</goal>
- <goal>test-compile</goal>
- </goals>
- </execution>
- </executions>
- <dependencies>
- <dependency>
- <groupId>org.aspectj</groupId>
- <artifactId>aspectjrt</artifactId>
- <version>1.9.1</version>
- </dependency>
- <dependency>
- <groupId>org.aspectj</groupId>
- <artifactId>aspectjtools</artifactId>
- <version>1.9.1</version>
- </dependency>
- </dependencies>
- </plugin>
-
-
</plugins>
</build>
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractCustomerActionNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractCustomerActionNode.java
index 1fff583..f492252 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractCustomerActionNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractCustomerActionNode.java
@@ -36,6 +36,8 @@ import org.thingsboard.server.dao.customer.CustomerService;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
+import br.ufrgs.inf.prosoft.cache.*;
+
import static org.thingsboard.rule.engine.api.util.DonAsynchron.withCallback;
@Slf4j
@@ -81,12 +83,17 @@ public abstract class TbAbstractCustomerActionNode<C extends TbAbstractCustomerA
protected abstract void doProcessCustomerAction(TbContext ctx, TbMsg msg, CustomerId customerId);
+private MultiCache<CustomerKey, Optional<CustomerId>> getCustomerCache = new MultiCache<>("TbAbstractCustomerActionNode.getCustomer");
+
protected ListenableFuture<CustomerId> getCustomer(TbContext ctx, TbMsg msg) {
String customerTitle = TbNodeUtils.processPattern(this.config.getCustomerNamePattern(), msg.getMetaData());
CustomerKey key = new CustomerKey(customerTitle);
return ctx.getDbCallbackExecutor().executeAsync(() -> {
// Optional<CustomerId> customerId = customerIdCache.get(key);
- Optional<CustomerId> customerId = customerLoader.load(key);
+ Optional<CustomerId> customerId = getCustomerCache.computeIfAbsent(key, () -> {
+ return customerLoader.load(key);
+ }, 1440000);
+
if (!customerId.isPresent()) {
throw new RuntimeException("No customer found with name '" + key.getCustomerTitle() + "'.");
}
run.sh 2(+0 -2)
diff --git a/run.sh b/run.sh
index b1cc836..0d4b734 100644
--- a/run.sh
+++ b/run.sh
@@ -1,12 +1,10 @@
#!/bin/bash
if [ ! -e compiled ]; then
- export TRACER_ENABLE=false
mvn clean install -DskipTests -Dlicense.skip=true
cd application/src/main/scripts/install
bash install_dev_db.sh
cd ../../../../..
touch compiled
fi
-export TRACER_ENABLE=${TRACER_ENABLE:-true}
java -jar application/target/thingsboard-2.2.0-boot.jar