Details
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
index 38db1a4..1e9e698 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
@@ -17,6 +17,11 @@ package org.thingsboard.server.service.transport;
import akka.actor.ActorRef;
import com.fasterxml.jackson.databind.ObjectMapper;
+import io.github.bucket4j.Bandwidth;
+import io.github.bucket4j.BlockingBucket;
+import io.github.bucket4j.Bucket4j;
+import io.github.bucket4j.local.LocalBucket;
+import io.github.bucket4j.local.LocalBucketBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Callback;
@@ -49,6 +54,7 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
@@ -68,6 +74,13 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
@Value("${transport.remote.rule_engine.auto_commit_interval}")
private int autoCommitInterval;
+ @Value("${transport.remote.rule_engine.poll_records_pack_size}")
+ private long pollRecordsPackSize;
+ @Value("${transport.remote.rule_engine.max_poll_records_per_second}")
+ private long pollRecordsPerSecond;
+ @Value("${transport.remote.rule_engine.max_poll_records_per_minute}")
+ private long pollRecordsPerMinute;
+
@Autowired
private TbKafkaSettings kafkaSettings;
@@ -109,15 +122,29 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
ruleEngineConsumerBuilder.groupId("tb-node");
ruleEngineConsumerBuilder.autoCommit(true);
ruleEngineConsumerBuilder.autoCommitIntervalMs(autoCommitInterval);
+ ruleEngineConsumerBuilder.maxPollRecords(pollRecordsPackSize);
ruleEngineConsumerBuilder.decoder(new ToRuleEngineMsgDecoder());
ruleEngineConsumer = ruleEngineConsumerBuilder.build();
ruleEngineConsumer.subscribe();
+ LocalBucketBuilder builder = Bucket4j.builder();
+ builder.addLimit(Bandwidth.simple(pollRecordsPerSecond, Duration.ofSeconds(1)));
+ builder.addLimit(Bandwidth.simple(pollRecordsPerMinute, Duration.ofMinutes(1)));
+ LocalBucket pollRateBucket = builder.build();
+ BlockingBucket blockingPollRateBucket = pollRateBucket.asScheduler();
+
mainConsumerExecutor.execute(() -> {
while (!stopped) {
try {
ConsumerRecords<String, byte[]> records = ruleEngineConsumer.poll(Duration.ofMillis(pollDuration));
+ int recordsCount = records.count();
+ if (recordsCount > 0) {
+ while (!blockingPollRateBucket.tryConsume(recordsCount, TimeUnit.SECONDS.toNanos(5))) {
+ log.info("Rule Engine consumer is busy. Required tokens: [{}]. Available tokens: [{}].", recordsCount, pollRateBucket.getAvailableTokens());
+ }
+ log.trace("Processing {} records", recordsCount);
+ }
records.forEach(record -> {
try {
ToRuleEngineMsg toRuleEngineMsg = ruleEngineConsumer.decode(record);
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 5824293..c53f54b 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -393,6 +393,9 @@ transport:
topic: "${TB_RULE_ENGINE_TOPIC:tb.rule-engine}"
poll_interval: "${TB_RULE_ENGINE_POLL_INTERVAL_MS:25}"
auto_commit_interval: "${TB_RULE_ENGINE_AUTO_COMMIT_INTERVAL_MS:100}"
+ poll_records_pack_size: "${TB_RULE_ENGINE_MAX_POLL_RECORDS:1000}"
+ max_poll_records_per_second: "${TB_RULE_ENGINE_MAX_POLL_RECORDS_PER_SECOND:10000}"
+ max_poll_records_per_minute: "${TB_RULE_ENGINE_MAX_POLL_RECORDS_PER_SECOND:120000}"
notifications:
topic: "${TB_TRANSPORT_NOTIFICATIONS_TOPIC:tb.transport.notifications}"
sessions:
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java
index 86be3a3..549c25e 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java
@@ -46,7 +46,8 @@ public class TBKafkaConsumerTemplate<T> {
private TBKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder<T> decoder,
TbKafkaRequestIdExtractor<T> requestIdExtractor,
String clientId, String groupId, String topic,
- boolean autoCommit, int autoCommitIntervalMs) {
+ boolean autoCommit, int autoCommitIntervalMs,
+ long maxPollRecords) {
Properties props = settings.toProps();
props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
if (groupId != null) {
@@ -56,6 +57,7 @@ public class TBKafkaConsumerTemplate<T> {
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
this.consumer = new KafkaConsumer<>(props);
this.decoder = decoder;
this.requestIdExtractor = requestIdExtractor;