thingsboard-aplcache
Changes
application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java 1(+0 -1)
application/src/main/java/org/thingsboard/server/actors/shared/plugin/SystemPluginManager.java 3(+1 -2)
application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java 8(+4 -4)
application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java 2(+1 -1)
application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java 5(+2 -3)
extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/plugin/KafkaPlugin.java 2(+1 -1)
extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/handlers/DefaultWebsocketMsgHandler.java 2(+1 -1)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/filter/MethodNameFilter.java 4(+3 -1)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/filter/MsgTypeFilter.java 2(+1 -1)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/mail/MailPlugin.java 2(+1 -1)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java 6(+3 -3)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
index c370616..8f05422 100644
--- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
@@ -121,7 +121,7 @@ public class AppActor extends ContextAwareActor {
private void broadcast(Object msg) {
pluginManager.broadcast(msg);
- tenantActors.values().stream().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
+ tenantActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
}
private void onToRuleMsg(ToRuleActorMsg msg) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/rule/SimpleRuleActorChain.java b/application/src/main/java/org/thingsboard/server/actors/rule/SimpleRuleActorChain.java
index 8112ac4..5a8b20a 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rule/SimpleRuleActorChain.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rule/SimpleRuleActorChain.java
@@ -16,7 +16,6 @@
package org.thingsboard.server.actors.rule;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Set;
@@ -26,7 +25,7 @@ public class SimpleRuleActorChain implements RuleActorChain {
public SimpleRuleActorChain(Set<RuleActorMetaData> ruleSet) {
rules = new ArrayList<>(ruleSet);
- Collections.sort(rules, RuleActorMetaData.RULE_ACTOR_MD_COMPARATOR);
+ rules.sort(RuleActorMetaData.RULE_ACTOR_MD_COMPARATOR);
}
public int size() {
diff --git a/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java
index eb812df..90b0cb3 100644
--- a/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java
@@ -111,7 +111,7 @@ class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor {
Optional<ServerAddress> newTargetServer = systemContext.getRoutingService().resolve(getDeviceId());
if (!newTargetServer.equals(currentTargetServer)) {
currentTargetServer = newTargetServer;
- pendingMap.values().stream().forEach(v -> {
+ pendingMap.values().forEach(v -> {
forwardToAppActor(context, v, currentTargetServer);
if (currentTargetServer.isPresent()) {
logger.debug("[{}] Forwarded msg to new server: {}", sessionId, currentTargetServer.get());
diff --git a/application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java
index 44eff16..c69946f 100644
--- a/application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java
@@ -66,7 +66,7 @@ public class SessionManagerActor extends ContextAwareActor {
}
private void broadcast(Object msg) {
- sessionActors.values().stream().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
+ sessionActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
}
private void onSessionTimeout(SessionTimeoutMsg msg) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java
index 1c7f687..a3141ee 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java
@@ -19,7 +19,6 @@ import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.Scheduler;
import akka.event.LoggingAdapter;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.AllArgsConstructor;
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/plugin/PluginManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/plugin/PluginManager.java
index c581c41..049accb 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/plugin/PluginManager.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/plugin/PluginManager.java
@@ -15,9 +15,9 @@
*/
package org.thingsboard.server.actors.shared.plugin;
-import java.util.HashMap;
-import java.util.Map;
-
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
+import akka.actor.Props;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.plugin.PluginActor;
@@ -29,12 +29,9 @@ import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.common.data.page.PageDataIterable.FetchFunction;
import org.thingsboard.server.common.data.plugin.PluginMetaData;
import org.thingsboard.server.dao.plugin.PluginService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import akka.actor.ActorContext;
-import akka.actor.ActorRef;
-import akka.actor.Props;
+import java.util.HashMap;
+import java.util.Map;
@Slf4j
public abstract class PluginManager {
@@ -64,17 +61,13 @@ public abstract class PluginManager {
abstract TenantId getTenantId();
public ActorRef getOrCreatePluginActor(ActorContext context, PluginId pluginId) {
- ActorRef pluginActor = pluginActors.get(pluginId);
- if (pluginActor == null) {
- pluginActor = context.actorOf(Props.create(new PluginActor.ActorCreator(systemContext, getTenantId(), pluginId))
- .withDispatcher(DefaultActorService.PLUGIN_DISPATCHER_NAME), pluginId.toString());
- pluginActors.put(pluginId, pluginActor);
- }
- return pluginActor;
+ return pluginActors.computeIfAbsent(pluginId, pId ->
+ context.actorOf(Props.create(new PluginActor.ActorCreator(systemContext, getTenantId(), pId))
+ .withDispatcher(DefaultActorService.PLUGIN_DISPATCHER_NAME), pId.toString()));
}
public void broadcast(Object msg) {
- pluginActors.values().stream().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
+ pluginActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
}
public void remove(PluginId id) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/plugin/SystemPluginManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/plugin/SystemPluginManager.java
index d8b58a0..a27e903 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/plugin/SystemPluginManager.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/plugin/SystemPluginManager.java
@@ -20,7 +20,6 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageDataIterable.FetchFunction;
import org.thingsboard.server.common.data.plugin.PluginMetaData;
import org.thingsboard.server.dao.plugin.BasePluginService;
-import org.thingsboard.server.dao.plugin.PluginService;
public class SystemPluginManager extends PluginManager {
@@ -30,7 +29,7 @@ public class SystemPluginManager extends PluginManager {
@Override
FetchFunction<PluginMetaData> getFetchPluginsFunction() {
- return link -> pluginService.findSystemPlugins(link);
+ return pluginService::findSystemPlugins;
}
@Override
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/rule/RuleManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/rule/RuleManager.java
index 67d44e9..dfe3f44 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/rule/RuleManager.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/rule/RuleManager.java
@@ -18,8 +18,7 @@ package org.thingsboard.server.actors.shared.rule;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.Props;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.rule.RuleActor;
import org.thingsboard.server.actors.rule.RuleActorChain;
@@ -38,10 +37,9 @@ import org.thingsboard.server.dao.rule.RuleService;
import java.util.*;
+@Slf4j
public abstract class RuleManager {
- protected static final Logger logger = LoggerFactory.getLogger(RuleManager.class);
-
protected final ActorSystemContext systemContext;
protected final RuleService ruleService;
protected final Map<RuleId, ActorRef> ruleActors;
@@ -63,11 +61,11 @@ public abstract class RuleManager {
ruleMap = new HashMap<>();
for (RuleMetaData rule : ruleIterator) {
- logger.debug("[{}] Creating rule actor {}", rule.getId(), rule);
+ log.debug("[{}] Creating rule actor {}", rule.getId(), rule);
ActorRef ref = getOrCreateRuleActor(context, rule.getId());
RuleActorMetaData actorMd = RuleActorMetaData.systemRule(rule.getId(), rule.getWeight(), ref);
ruleMap.put(rule, actorMd);
- logger.debug("[{}] Rule actor created.", rule.getId());
+ log.debug("[{}] Rule actor created.", rule.getId());
}
refreshRuleChain();
@@ -79,8 +77,11 @@ public abstract class RuleManager {
rule = systemContext.getRuleService().findRuleById(ruleId);
}
if (rule == null) {
- rule = ruleMap.keySet().stream().filter(r -> r.getId().equals(ruleId)).findFirst().orElse(null);
- rule.setState(ComponentLifecycleState.SUSPENDED);
+ rule = ruleMap.keySet().stream()
+ .filter(r -> r.getId().equals(ruleId))
+ .peek(r -> r.setState(ComponentLifecycleState.SUSPENDED))
+ .findFirst()
+ .orElse(null);
}
if (rule != null) {
RuleActorMetaData actorMd = ruleMap.get(rule);
@@ -92,7 +93,7 @@ public abstract class RuleManager {
refreshRuleChain();
return Optional.of(actorMd.getActorRef());
} else {
- logger.warn("[{}] Can't process unknown rule!", rule.getId());
+ log.warn("[{}] Can't process unknown rule!", ruleId);
return Optional.empty();
}
}
@@ -100,13 +101,9 @@ public abstract class RuleManager {
abstract FetchFunction<RuleMetaData> getFetchRulesFunction();
public ActorRef getOrCreateRuleActor(ActorContext context, RuleId ruleId) {
- ActorRef ruleActor = ruleActors.get(ruleId);
- if (ruleActor == null) {
- ruleActor = context.actorOf(Props.create(new RuleActor.ActorCreator(systemContext, tenantId, ruleId))
- .withDispatcher(DefaultActorService.RULE_DISPATCHER_NAME), ruleId.toString());
- ruleActors.put(ruleId, ruleActor);
- }
- return ruleActor;
+ return ruleActors.computeIfAbsent(ruleId, rId ->
+ context.actorOf(Props.create(new RuleActor.ActorCreator(systemContext, tenantId, rId))
+ .withDispatcher(DefaultActorService.RULE_DISPATCHER_NAME), rId.toString()));
}
public RuleActorChain getRuleChain() {
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/rule/SystemRuleManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/rule/SystemRuleManager.java
index 6d56832..7fac168 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/rule/SystemRuleManager.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/rule/SystemRuleManager.java
@@ -29,7 +29,7 @@ public class SystemRuleManager extends RuleManager {
@Override
FetchFunction<RuleMetaData> getFetchRulesFunction() {
- return link -> ruleService.findSystemRules(link);
+ return ruleService::findSystemRules;
}
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
index 965c652..c8d5243 100644
--- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
@@ -100,7 +100,7 @@ public class TenantActor extends ContextAwareActor {
private void broadcast(Object msg) {
pluginManager.broadcast(msg);
- deviceActors.values().stream().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
+ deviceActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
}
private void onToDeviceActorMsg(ToDeviceActorMsg msg) {
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
index 29e9b3c..dd784f0 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
@@ -166,7 +166,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
@Override
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
publishCurrentServer();
- getOtherServers().stream().forEach(
+ getOtherServers().forEach(
server -> log.info("Found active server: [{}:{}]", server.getHost(), server.getPort())
);
}
@@ -194,13 +194,13 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
log.info("Processing [{}] event for [{}:{}]", pathChildrenCacheEvent.getType(), instance.getHost(), instance.getPort());
switch (pathChildrenCacheEvent.getType()) {
case CHILD_ADDED:
- listeners.stream().forEach(listener -> listener.onServerAdded(instance));
+ listeners.forEach(listener -> listener.onServerAdded(instance));
break;
case CHILD_UPDATED:
- listeners.stream().forEach(listener -> listener.onServerUpdated(instance));
+ listeners.forEach(listener -> listener.onServerUpdated(instance));
break;
case CHILD_REMOVED:
- listeners.stream().forEach(listener -> listener.onServerRemoved(instance));
+ listeners.forEach(listener -> listener.onServerRemoved(instance));
break;
}
}
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java b/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java
index 3c9ecf8..7a3c7ac 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java
@@ -135,7 +135,7 @@ public class ConsistentClusterRoutingService implements ClusterRoutingService, D
private void logCircle() {
log.trace("Consistent Hash Circle Start");
- circle.entrySet().stream().forEach((e) -> log.debug("{} -> {}", e.getKey(), e.getValue().getServerAddress()));
+ circle.entrySet().forEach((e) -> log.debug("{} -> {}", e.getKey(), e.getValue().getServerAddress()));
log.trace("Consistent Hash Circle End");
}
diff --git a/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java
index a51464c..975b52a 100644
--- a/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java
@@ -31,7 +31,6 @@ import org.thingsboard.server.dao.component.ComponentDescriptorService;
import org.thingsboard.server.extensions.api.component.*;
import javax.annotation.PostConstruct;
-import java.io.IOException;
import java.lang.annotation.Annotation;
import java.util.*;
import java.util.stream.Collectors;
@@ -72,7 +71,7 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe
}
private void registerComponents(Collection<ComponentDescriptor> comps) {
- comps.stream().forEach(c -> components.put(c.getClazz(), c));
+ comps.forEach(c -> components.put(c.getClazz(), c));
}
private List<ComponentDescriptor> persist(Set<BeanDefinition> filterDefs, ComponentType type) {
@@ -119,7 +118,7 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe
throw new RuntimeException("Plugin " + def.getBeanClassName() + "action " + actionClazz.getName() + " has wrong component type!");
}
}
- scannedComponent.setActions(Arrays.asList(pluginAnnotation.actions()).stream().map(action -> action.getName()).collect(Collectors.joining(",")));
+ scannedComponent.setActions(Arrays.stream(pluginAnnotation.actions()).map(action -> action.getName()).collect(Collectors.joining(",")));
break;
default:
throw new RuntimeException(type + " is not supported yet!");
diff --git a/application/src/main/java/org/thingsboard/server/service/security/model/SecurityUser.java b/application/src/main/java/org/thingsboard/server/service/security/model/SecurityUser.java
index 83b87ab..6456968 100644
--- a/application/src/main/java/org/thingsboard/server/service/security/model/SecurityUser.java
+++ b/application/src/main/java/org/thingsboard/server/service/security/model/SecurityUser.java
@@ -20,9 +20,9 @@ import org.springframework.security.core.authority.SimpleGrantedAuthority;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.id.UserId;
-import java.util.Arrays;
import java.util.Collection;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
public class SecurityUser extends User {
@@ -46,7 +46,7 @@ public class SecurityUser extends User {
public Collection<? extends GrantedAuthority> getAuthorities() {
if (authorities == null) {
- authorities = Arrays.asList(SecurityUser.this.getAuthority()).stream()
+ authorities = Stream.of(SecurityUser.this.getAuthority())
.map(authority -> new SimpleGrantedAuthority(authority.name()))
.collect(Collectors.toList());
}
diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java
index 538e8f9..20ceb77 100644
--- a/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java
+++ b/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java
@@ -129,8 +129,10 @@ public abstract class AbstractControllerTest {
@Autowired
void setConverters(HttpMessageConverter<?>[] converters) {
- this.mappingJackson2HttpMessageConverter = Arrays.asList(converters).stream().filter(
- hmc -> hmc instanceof MappingJackson2HttpMessageConverter).findAny().get();
+ this.mappingJackson2HttpMessageConverter = Arrays.stream(converters)
+ .filter(hmc -> hmc instanceof MappingJackson2HttpMessageConverter)
+ .findAny()
+ .get();
Assert.assertNotNull("the JSON message converter must not be null",
this.mappingJackson2HttpMessageConverter);
diff --git a/application/src/test/java/org/thingsboard/server/mqtt/AbstractFeatureIntegrationTest.java b/application/src/test/java/org/thingsboard/server/mqtt/AbstractFeatureIntegrationTest.java
index 8d22343..db90b89 100644
--- a/application/src/test/java/org/thingsboard/server/mqtt/AbstractFeatureIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/mqtt/AbstractFeatureIntegrationTest.java
@@ -61,8 +61,10 @@ public class AbstractFeatureIntegrationTest {
@Autowired
void setConverters(HttpMessageConverter<?>[] converters) {
- this.mappingJackson2HttpMessageConverter = Arrays.asList(converters).stream().filter(
- hmc -> hmc instanceof MappingJackson2HttpMessageConverter).findAny().get();
+ this.mappingJackson2HttpMessageConverter = Arrays.stream(converters)
+ .filter(hmc -> hmc instanceof MappingJackson2HttpMessageConverter)
+ .findAny()
+ .get();
assertNotNull("the JSON message converter must not be null",
this.mappingJackson2HttpMessageConverter);
diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java
index 4c542e3..ce4dd81 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java
@@ -140,7 +140,7 @@ public class BaseAttributesDao extends AbstractDao implements AttributesDao {
List<Row> rows = resultSet.all();
List<AttributeKvEntry> entries = new ArrayList<>(rows.size());
if (!rows.isEmpty()) {
- rows.stream().forEach(row -> {
+ rows.forEach(row -> {
String key = row.getString(ModelConstants.ATTRIBUTE_KEY_COLUMN);
AttributeKvEntry kvEntry = convertResultToAttributesKvEntry(key, row);
if (kvEntry != null) {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
index 79134f7..851c770 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
@@ -143,7 +143,7 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
public List<TsKvEntry> convertResultToTsKvEntryList(List<Row> rows) {
List<TsKvEntry> entries = new ArrayList<>(rows.size());
if (!rows.isEmpty()) {
- rows.stream().forEach(row -> {
+ rows.forEach(row -> {
TsKvEntry kvEntry = convertResultToTsKvEntry(row);
if (kvEntry != null) {
entries.add(kvEntry);
diff --git a/extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/plugin/KafkaPlugin.java b/extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/plugin/KafkaPlugin.java
index 1642fb5..7321ad7 100644
--- a/extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/plugin/KafkaPlugin.java
+++ b/extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/plugin/KafkaPlugin.java
@@ -47,7 +47,7 @@ public class KafkaPlugin extends AbstractPlugin<KafkaPluginConfiguration> {
properties.put("buffer.memory", configuration.getBufferMemory());
if (configuration.getOtherProperties() != null) {
configuration.getOtherProperties()
- .stream().forEach(p -> properties.put(p.getKey(), p.getValue()));
+ .forEach(p -> properties.put(p.getKey(), p.getValue()));
}
init();
}
diff --git a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/handlers/DefaultWebsocketMsgHandler.java b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/handlers/DefaultWebsocketMsgHandler.java
index fab11bb..1633c7f 100644
--- a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/handlers/DefaultWebsocketMsgHandler.java
+++ b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/handlers/DefaultWebsocketMsgHandler.java
@@ -91,7 +91,7 @@ public class DefaultWebsocketMsgHandler implements WebsocketMsgHandler {
}
public void clear(PluginContext ctx) {
- wsSessionsMap.values().stream().forEach(v -> {
+ wsSessionsMap.values().forEach(v -> {
try {
ctx.close(v.getSessionRef());
} catch (IOException e) {
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/filter/MethodNameFilter.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/filter/MethodNameFilter.java
index 21180d7..7123f3e 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/filter/MethodNameFilter.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/filter/MethodNameFilter.java
@@ -40,7 +40,9 @@ public class MethodNameFilter extends SimpleRuleLifecycleComponent implements Ru
@Override
public void init(MethodNameFilterConfiguration configuration) {
- methods = Arrays.asList(configuration.getMethodNames()).stream().map(m -> m.getName()).collect(Collectors.toSet());
+ methods = Arrays.stream(configuration.getMethodNames())
+ .map(m -> m.getName())
+ .collect(Collectors.toSet());
}
@Override
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/filter/MsgTypeFilter.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/filter/MsgTypeFilter.java
index 84deea5..737bee6 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/filter/MsgTypeFilter.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/filter/MsgTypeFilter.java
@@ -39,7 +39,7 @@ public class MsgTypeFilter extends SimpleRuleLifecycleComponent implements RuleF
@Override
public void init(MsgTypeFilterConfiguration configuration) {
- msgTypes = Arrays.asList(configuration.getMessageTypes()).stream().map(type -> {
+ msgTypes = Arrays.stream(configuration.getMessageTypes()).map(type -> {
switch (type) {
case "GET_ATTRIBUTES":
return MsgType.GET_ATTRIBUTES_REQUEST;
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/mail/MailPlugin.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/mail/MailPlugin.java
index 52fd2e9..c8a7ad8 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/mail/MailPlugin.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/mail/MailPlugin.java
@@ -75,7 +75,7 @@ public class MailPlugin extends AbstractPlugin<MailPluginConfiguration> implemen
if (configuration.getOtherProperties() != null) {
Properties mailProperties = new Properties();
configuration.getOtherProperties()
- .stream().forEach(p -> mailProperties.put(p.getKey(), p.getValue()));
+ .forEach(p -> mailProperties.put(p.getKey(), p.getValue()));
mail.setJavaMailProperties(mailProperties);
}
mailSender = mail;
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java
index b166dae..06467fe 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java
@@ -97,7 +97,7 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
builder.setDeviceId(cmd.getDeviceId().toString());
builder.setType(cmd.getType().name());
builder.setAllKeys(cmd.isAllKeys());
- cmd.getKeyStates().entrySet().stream().forEach(e -> builder.addKeyStates(SubscriptionKetStateProto.newBuilder().setKey(e.getKey()).setTs(e.getValue()).build()));
+ cmd.getKeyStates().entrySet().forEach(e -> builder.addKeyStates(SubscriptionKetStateProto.newBuilder().setKey(e.getKey()).setTs(e.getValue()).build()));
ctx.sendPluginRpcMsg(new RpcMsg(address, SUBSCRIPTION_CLAZZ, builder.build().toByteArray()));
}
@@ -144,7 +144,7 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
if (update.getErrorMsg() != null) {
builder.setErrorMsg(update.getErrorMsg());
}
- update.getData().entrySet().stream().forEach(
+ update.getData().entrySet().forEach(
e -> {
SubscriptionUpdateValueListProto.Builder dataBuilder = SubscriptionUpdateValueListProto.newBuilder();
@@ -166,7 +166,7 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
return new SubscriptionUpdate(proto.getSubscriptionId(), SubscriptionErrorCode.forCode(proto.getErrorCode()), proto.getErrorMsg());
} else {
Map<String, List<Object>> data = new TreeMap<>();
- proto.getDataList().stream().forEach(v -> {
+ proto.getDataList().forEach(v -> {
List<Object> values = data.get(v.getKey());
if (values == null) {
values = new ArrayList<>();
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
index 6ea7489..8e2d62a 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
@@ -109,8 +109,8 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
Map<String, Long> subState = new HashMap<>(keys.size());
- keys.stream().forEach(key -> subState.put(key, 0L));
- attributesData.stream().forEach(v -> subState.put(v.getKey(), v.getTs()));
+ keys.forEach(key -> subState.put(key, 0L));
+ attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.ATTRIBUTES, false, subState);
} else {
@@ -119,7 +119,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
Map<String, Long> subState = new HashMap<>(attributesData.size());
- attributesData.stream().forEach(v -> subState.put(v.getKey(), v.getTs()));
+ attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.ATTRIBUTES, true, subState);
}
@@ -154,8 +154,8 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
Map<String, Long> subState = new HashMap<>(keys.size());
- keys.stream().forEach(key -> subState.put(key, startTs));
- data.stream().forEach(v -> subState.put(v.getKey(), v.getTs()));
+ keys.forEach(key -> subState.put(key, startTs));
+ data.forEach(v -> subState.put(v.getKey(), v.getTs()));
SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.TIMESERIES, false, subState);
subscriptionManager.addLocalWsSubscription(ctx, sessionId, deviceId, sub);
} else {
@@ -168,8 +168,8 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
Map<String, Long> subState = new HashMap<>(keys.size());
- keys.stream().forEach(key -> subState.put(key, startTs));
- data.stream().forEach(v -> subState.put(v.getKey(), v.getTs()));
+ keys.forEach(key -> subState.put(key, startTs));
+ data.forEach(v -> subState.put(v.getKey(), v.getTs()));
SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.TIMESERIES, false, subState);
subscriptionManager.addLocalWsSubscription(ctx, sessionId, deviceId, sub);
}
@@ -188,7 +188,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
public void onSuccess(PluginContext ctx, List<TsKvEntry> data) {
sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
Map<String, Long> subState = new HashMap<>(data.size());
- data.stream().forEach(v -> subState.put(v.getKey(), v.getTs()));
+ data.forEach(v -> subState.put(v.getKey(), v.getTs()));
SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.TIMESERIES, true, subState);
subscriptionManager.addLocalWsSubscription(ctx, sessionId, deviceId, sub);
}
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java
index 637500e..190d9ff 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java
@@ -68,7 +68,7 @@ public class SubscriptionManager {
registerSubscription(sessionId, deviceId, subscription);
List<TsKvEntry> missedUpdates = new ArrayList<>();
if (subscription.getType() == SubscriptionType.ATTRIBUTES) {
- subscription.getKeyStates().entrySet().stream().forEach(e -> {
+ subscription.getKeyStates().entrySet().forEach(e -> {
Optional<AttributeKvEntry> latestOpt = ctx.loadAttribute(deviceId, DataConstants.CLIENT_SCOPE, e.getKey());
if (latestOpt.isPresent()) {
AttributeKvEntry latestEntry = latestOpt.get();