thingsboard-aplcache

Rate limitter for Kafka consumer

10/30/2018 7:15:30 AM

Details

diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
index 38db1a4..1e9e698 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
@@ -17,6 +17,11 @@ package org.thingsboard.server.service.transport;
 
 import akka.actor.ActorRef;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import io.github.bucket4j.Bandwidth;
+import io.github.bucket4j.BlockingBucket;
+import io.github.bucket4j.Bucket4j;
+import io.github.bucket4j.local.LocalBucket;
+import io.github.bucket4j.local.LocalBucketBuilder;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.Callback;
@@ -49,6 +54,7 @@ import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
 /**
@@ -68,6 +74,13 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
     @Value("${transport.remote.rule_engine.auto_commit_interval}")
     private int autoCommitInterval;
 
+    @Value("${transport.remote.rule_engine.poll_records_pack_size}")
+    private long pollRecordsPackSize;
+    @Value("${transport.remote.rule_engine.max_poll_records_per_second}")
+    private long pollRecordsPerSecond;
+    @Value("${transport.remote.rule_engine.max_poll_records_per_minute}")
+    private long pollRecordsPerMinute;
+
     @Autowired
     private TbKafkaSettings kafkaSettings;
 
@@ -109,15 +122,29 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
         ruleEngineConsumerBuilder.groupId("tb-node");
         ruleEngineConsumerBuilder.autoCommit(true);
         ruleEngineConsumerBuilder.autoCommitIntervalMs(autoCommitInterval);
+        ruleEngineConsumerBuilder.maxPollRecords(pollRecordsPackSize);
         ruleEngineConsumerBuilder.decoder(new ToRuleEngineMsgDecoder());
 
         ruleEngineConsumer = ruleEngineConsumerBuilder.build();
         ruleEngineConsumer.subscribe();
 
+        LocalBucketBuilder builder = Bucket4j.builder();
+        builder.addLimit(Bandwidth.simple(pollRecordsPerSecond, Duration.ofSeconds(1)));
+        builder.addLimit(Bandwidth.simple(pollRecordsPerMinute, Duration.ofMinutes(1)));
+        LocalBucket pollRateBucket = builder.build();
+        BlockingBucket blockingPollRateBucket = pollRateBucket.asScheduler();
+
         mainConsumerExecutor.execute(() -> {
             while (!stopped) {
                 try {
                     ConsumerRecords<String, byte[]> records = ruleEngineConsumer.poll(Duration.ofMillis(pollDuration));
+                    int recordsCount = records.count();
+                    if (recordsCount > 0) {
+                        while (!blockingPollRateBucket.tryConsume(recordsCount, TimeUnit.SECONDS.toNanos(5))) {
+                            log.info("Rule Engine consumer is busy. Required tokens: [{}]. Available tokens: [{}].", recordsCount, pollRateBucket.getAvailableTokens());
+                        }
+                        log.trace("Processing {} records", recordsCount);
+                    }
                     records.forEach(record -> {
                         try {
                             ToRuleEngineMsg toRuleEngineMsg = ruleEngineConsumer.decode(record);
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 5824293..c53f54b 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -393,6 +393,9 @@ transport:
       topic: "${TB_RULE_ENGINE_TOPIC:tb.rule-engine}"
       poll_interval: "${TB_RULE_ENGINE_POLL_INTERVAL_MS:25}"
       auto_commit_interval: "${TB_RULE_ENGINE_AUTO_COMMIT_INTERVAL_MS:100}"
+      poll_records_pack_size: "${TB_RULE_ENGINE_MAX_POLL_RECORDS:1000}"
+      max_poll_records_per_second: "${TB_RULE_ENGINE_MAX_POLL_RECORDS_PER_SECOND:10000}"
+      max_poll_records_per_minute: "${TB_RULE_ENGINE_MAX_POLL_RECORDS_PER_SECOND:120000}"
     notifications:
       topic: "${TB_TRANSPORT_NOTIFICATIONS_TOPIC:tb.transport.notifications}"
   sessions:
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java
index 86be3a3..549c25e 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java
@@ -46,7 +46,8 @@ public class TBKafkaConsumerTemplate<T> {
     private TBKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder<T> decoder,
                                     TbKafkaRequestIdExtractor<T> requestIdExtractor,
                                     String clientId, String groupId, String topic,
-                                    boolean autoCommit, int autoCommitIntervalMs) {
+                                    boolean autoCommit, int autoCommitIntervalMs,
+                                    long maxPollRecords) {
         Properties props = settings.toProps();
         props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
         if (groupId != null) {
@@ -56,6 +57,7 @@ public class TBKafkaConsumerTemplate<T> {
         props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
         this.consumer = new KafkaConsumer<>(props);
         this.decoder = decoder;
         this.requestIdExtractor = requestIdExtractor;