thingsboard-aplcache

Rate Limiter for Kafka Consumer

10/30/2018 8:32:55 AM

Details

diff --git a/application/src/main/data/upgrade/2.2.0/schema_update.sql b/application/src/main/data/upgrade/2.2.0/schema_update.sql
new file mode 100644
index 0000000..1832b79
--- /dev/null
+++ b/application/src/main/data/upgrade/2.2.0/schema_update.sql
@@ -0,0 +1,17 @@
+--
+-- Copyright © 2016-2018 The Thingsboard Authors
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+ALTER TABLE component_descriptor ADD UNIQUE (clazz);
diff --git a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java
index 45eb66a..2486cd8 100644
--- a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java
+++ b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java
@@ -101,6 +101,10 @@ public class ThingsboardInstallService {
                         log.info("Upgrading ThingsBoard from version 2.1.1 to 2.1.2 ...");
 
                         databaseUpgradeService.upgradeDatabase("2.1.1");
+                    case "2.1.3":
+                        log.info("Upgrading ThingsBoard from version 2.1.3 to 2.2.0 ...");
+
+                        databaseUpgradeService.upgradeDatabase("2.1.3");
 
                         log.info("Updating system data...");
 
diff --git a/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java
index 5a1e559..545baf8 100644
--- a/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java
@@ -97,6 +97,11 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe
                     log.trace("Can't initialize component {}, due to {}", def.getBeanClassName(), e.getMessage(), e);
                     cause = e;
                     retryCount++;
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e1) {
+                        throw new RuntimeException(e1);
+                    }
                 }
             }
             if (cause != null && retryCount == MAX_OPTIMISITC_RETRIES) {
diff --git a/application/src/main/java/org/thingsboard/server/service/install/CassandraDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/CassandraDatabaseUpgradeService.java
index 5526953..a2aeefa 100644
--- a/application/src/main/java/org/thingsboard/server/service/install/CassandraDatabaseUpgradeService.java
+++ b/application/src/main/java/org/thingsboard/server/service/install/CassandraDatabaseUpgradeService.java
@@ -251,7 +251,8 @@ public class CassandraDatabaseUpgradeService implements DatabaseUpgradeService {
                 log.info("Entity views restored.");
 
                 break;
-
+            case "2.1.3":
+                break;
             default:
                 throw new RuntimeException("Unable to upgrade Cassandra database, unsupported fromVersion: " + fromVersion);
         }
diff --git a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java
index 2c7f89c..b4a725d 100644
--- a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java
+++ b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java
@@ -149,7 +149,14 @@ public class SqlDatabaseUpgradeService implements DatabaseUpgradeService {
                     log.info("Entity views restored.");
                 }
                 break;
-
+            case "2.1.3":
+                try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) {
+                    log.info("Updating schema ...");
+                    schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.2.0", SCHEMA_UPDATE_SQL);
+                    loadSql(schemaUpdateFile, conn);
+                    log.info("Schema updated.");
+                }
+                break;
             default:
                 throw new RuntimeException("Unable to upgrade SQL database, unsupported fromVersion: " + fromVersion);
         }
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
index 1e9e698..9beeebb 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
@@ -75,7 +75,7 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
     private int autoCommitInterval;
 
     @Value("${transport.remote.rule_engine.poll_records_pack_size}")
-    private long pollRecordsPackSize;
+    private int pollRecordsPackSize;
     @Value("${transport.remote.rule_engine.max_poll_records_per_second}")
     private long pollRecordsPerSecond;
     @Value("${transport.remote.rule_engine.max_poll_records_per_minute}")
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index c53f54b..51ddaa9 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -395,7 +395,7 @@ transport:
       auto_commit_interval: "${TB_RULE_ENGINE_AUTO_COMMIT_INTERVAL_MS:100}"
       poll_records_pack_size: "${TB_RULE_ENGINE_MAX_POLL_RECORDS:1000}"
       max_poll_records_per_second: "${TB_RULE_ENGINE_MAX_POLL_RECORDS_PER_SECOND:10000}"
-      max_poll_records_per_minute: "${TB_RULE_ENGINE_MAX_POLL_RECORDS_PER_SECOND:120000}"
+      max_poll_records_per_minute: "${TB_RULE_ENGINE_MAX_POLL_RECORDS_PER_MINUTE:120000}"
     notifications:
       topic: "${TB_TRANSPORT_NOTIFICATIONS_TOPIC:tb.transport.notifications}"
   sessions:
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java
index 549c25e..c8a1706 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java
@@ -47,7 +47,7 @@ public class TBKafkaConsumerTemplate<T> {
                                     TbKafkaRequestIdExtractor<T> requestIdExtractor,
                                     String clientId, String groupId, String topic,
                                     boolean autoCommit, int autoCommitIntervalMs,
-                                    long maxPollRecords) {
+                                    int maxPollRecords) {
         Properties props = settings.toProps();
         props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
         if (groupId != null) {
@@ -57,7 +57,9 @@ public class TBKafkaConsumerTemplate<T> {
         props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
+        if (maxPollRecords > 0) {
+            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
+        }
         this.consumer = new KafkaConsumer<>(props);
         this.decoder = decoder;
         this.requestIdExtractor = requestIdExtractor;
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/ComponentDescriptorEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/ComponentDescriptorEntity.java
index 66fbcc3..e4c6bb5 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/ComponentDescriptorEntity.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/ComponentDescriptorEntity.java
@@ -34,6 +34,7 @@ import javax.persistence.Entity;
 import javax.persistence.EnumType;
 import javax.persistence.Enumerated;
 import javax.persistence.Table;
+import javax.persistence.UniqueConstraint;
 
 @Data
 @EqualsAndHashCode(callSuper = true)
@@ -53,7 +54,7 @@ public class ComponentDescriptorEntity extends BaseSqlEntity<ComponentDescriptor
     @Column(name = ModelConstants.COMPONENT_DESCRIPTOR_NAME_PROPERTY)
     private String name;
 
-    @Column(name = ModelConstants.COMPONENT_DESCRIPTOR_CLASS_PROPERTY)
+    @Column(name = ModelConstants.COMPONENT_DESCRIPTOR_CLASS_PROPERTY, unique = true)
     private String clazz;
 
     @Type(type = "json")
diff --git a/dao/src/main/resources/sql/schema-entities.sql b/dao/src/main/resources/sql/schema-entities.sql
index 0b9c853..fc23832 100644
--- a/dao/src/main/resources/sql/schema-entities.sql
+++ b/dao/src/main/resources/sql/schema-entities.sql
@@ -78,7 +78,7 @@ CREATE TABLE IF NOT EXISTS attribute_kv (
 CREATE TABLE IF NOT EXISTS component_descriptor (
     id varchar(31) NOT NULL CONSTRAINT component_descriptor_pkey PRIMARY KEY,
     actions varchar(255),
-    clazz varchar,
+    clazz varchar UNIQUE,
     configuration_descriptor varchar,
     name varchar(255),
     scope varchar(255),