thingsboard-aplcache
Changes
common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java 12(+0 -12)
common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportService.java 2(+0 -2)
common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java 31(+0 -31)
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java 1(+0 -1)
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java 10(+0 -10)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/quota/AbstractQuotaService.java 67(+0 -67)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/quota/Clock.java 45(+0 -45)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/quota/host/HostIntervalRegistryCleaner.java 29(+0 -29)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/quota/host/HostIntervalRegistryLogger.java 52(+0 -52)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/quota/host/HostRequestIntervalRegistry.java 37(+0 -37)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/quota/host/HostRequestLimitPolicy.java 33(+0 -33)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/quota/host/HostRequestsQuotaService.java 37(+0 -37)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalCount.java 68(+0 -68)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryCleaner.java 62(+0 -62)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryLogger.java 79(+0 -79)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/KeyBasedIntervalRegistry.java 73(+0 -73)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/quota/QuotaService.java 25(+0 -25)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/quota/RequestLimitPolicy.java 30(+0 -30)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantIntervalRegistryCleaner.java 31(+0 -31)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantIntervalRegistryLogger.java 54(+0 -54)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantMsgsIntervalRegistry.java 33(+0 -33)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantQuotaService.java 32(+0 -32)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantRequestLimitPolicy.java 30(+0 -30)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportContext.java 4(+0 -4)
common/transport/transport-api/src/test/java/org/thingsboard/server/common/transport/quota/ClockTest.java 66(+0 -66)
common/transport/transport-api/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestLimitPolicyTest.java 47(+0 -47)
common/transport/transport-api/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaServiceTest.java 78(+0 -78)
common/transport/transport-api/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistryTest.java 83(+0 -83)
common/transport/transport-api/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalCountTest.java 65(+0 -65)
Details
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 17123b5..6cb3621 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -82,46 +82,6 @@ dashboard:
# Maximum allowed datapoints fetched by widgets
max_datapoints_limit: "${DASHBOARD_MAX_DATAPOINTS_LIMIT:50000}"
-#Quota parameters
-quota:
- host:
- # Max allowed number of API requests in interval for single host
- limit: "${QUOTA_HOST_LIMIT:10000}"
- # Interval duration
- intervalMs: "${QUOTA_HOST_INTERVAL_MS:60000}"
- # Maximum silence duration for host after which Host removed from QuotaService. Must be bigger than intervalMs
- ttlMs: "${QUOTA_HOST_TTL_MS:60000}"
- # Interval for scheduled task that cleans expired records. TTL is used for expiring
- cleanPeriodMs: "${QUOTA_HOST_CLEAN_PERIOD_MS:300000}"
- # Enable Host API Limits
- enabled: "${QUOTA_HOST_ENABLED:false}"
- # Array of whitelist hosts
- whitelist: "${QUOTA_HOST_WHITELIST:localhost,127.0.0.1}"
- # Array of blacklist hosts
- blacklist: "${QUOTA_HOST_BLACKLIST:}"
- log:
- topSize: "${QUOTA_HOST_LOG_TOP_SIZE:10}"
- intervalMin: "${QUOTA_HOST_LOG_INTERVAL_MIN:2}"
- rule:
- tenant:
- # Max allowed number of API requests in interval for single tenant
- limit: "${QUOTA_TENANT_LIMIT:100000}"
- # Interval duration
- intervalMs: "${QUOTA_TENANT_INTERVAL_MS:60000}"
- # Maximum silence duration for tenant after which Tenant removed from QuotaService. Must be bigger than intervalMs
- ttlMs: "${QUOTA_TENANT_TTL_MS:60000}"
- # Interval for scheduled task that cleans expired records. TTL is used for expiring
- cleanPeriodMs: "${QUOTA_TENANT_CLEAN_PERIOD_MS:300000}"
- # Enable Host API Limits
- enabled: "${QUOTA_TENANT_ENABLED:true}"
- # Array of whitelist tenants
- whitelist: "${QUOTA_TENANT_WHITELIST:}"
- # Array of blacklist tenants
- blacklist: "${QUOTA_HOST_TENANT_BLACKLIST:}"
- log:
- topSize: "${QUOTA_TENANT_LOG_TOP_SIZE:10}"
- intervalMin: "${QUOTA_TENANT_LOG_INTERVAL_MIN:2}"
-
database:
entities:
type: "${DATABASE_ENTITIES_TYPE:sql}" # cassandra OR sql
diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java
index 0e2a794..9cf36e3 100644
--- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java
+++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java
@@ -72,12 +72,6 @@ public class CoapTransportResource extends CoapResource {
@Override
public void handleGET(CoapExchange exchange) {
- if (transportContext.getQuotaService().isQuotaExceeded(exchange.getSourceAddress().getHostAddress())) {
- log.warn("CoAP Quota exceeded for [{}:{}] . Disconnect", exchange.getSourceAddress().getHostAddress(), exchange.getSourcePort());
- exchange.respond(ResponseCode.BAD_REQUEST);
- return;
- }
-
Optional<FeatureType> featureType = getFeatureType(exchange.advanced().getRequest());
if (!featureType.isPresent()) {
log.trace("Missing feature type parameter");
@@ -108,12 +102,6 @@ public class CoapTransportResource extends CoapResource {
@Override
public void handlePOST(CoapExchange exchange) {
- if (transportContext.getQuotaService().isQuotaExceeded(exchange.getSourceAddress().getHostAddress())) {
- log.warn("CoAP Quota exceeded for [{}:{}] . Disconnect", exchange.getSourceAddress().getHostAddress(), exchange.getSourcePort());
- exchange.respond(ResponseCode.BAD_REQUEST);
- return;
- }
-
Optional<FeatureType> featureType = getFeatureType(exchange.advanced().getRequest());
if (!featureType.isPresent()) {
log.trace("Missing feature type parameter");
diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportService.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportService.java
index 278ca6d..c0bd7f9 100644
--- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportService.java
+++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportService.java
@@ -27,8 +27,6 @@ import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
-import org.thingsboard.server.common.transport.quota.QuotaService;
-import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService;
import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor;
import javax.annotation.PostConstruct;
diff --git a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
index b40fbc1..18458d1 100644
--- a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
+++ b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
@@ -75,9 +75,6 @@ public class DeviceApiController {
@RequestParam(value = "sharedKeys", required = false, defaultValue = "") String sharedKeys,
HttpServletRequest httpRequest) {
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<>();
- if (quotaExceeded(httpRequest, responseWriter)) {
- return responseWriter;
- }
transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
GetAttributeRequestMsg.Builder request = GetAttributeRequestMsg.newBuilder().setRequestId(0);
@@ -100,9 +97,6 @@ public class DeviceApiController {
public DeferredResult<ResponseEntity> postDeviceAttributes(@PathVariable("deviceToken") String deviceToken,
@RequestBody String json, HttpServletRequest request) {
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<>();
- if (quotaExceeded(request, responseWriter)) {
- return responseWriter;
- }
transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
TransportService transportService = transportContext.getTransportService();
@@ -116,9 +110,6 @@ public class DeviceApiController {
public DeferredResult<ResponseEntity> postTelemetry(@PathVariable("deviceToken") String deviceToken,
@RequestBody String json, HttpServletRequest request) {
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
- if (quotaExceeded(request, responseWriter)) {
- return responseWriter;
- }
transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
TransportService transportService = transportContext.getTransportService();
@@ -133,9 +124,6 @@ public class DeviceApiController {
@RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout,
HttpServletRequest httpRequest) {
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<>();
- if (quotaExceeded(httpRequest, responseWriter)) {
- return responseWriter;
- }
transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
TransportService transportService = transportContext.getTransportService();
@@ -153,9 +141,6 @@ public class DeviceApiController {
@PathVariable("requestId") Integer requestId,
@RequestBody String json, HttpServletRequest request) {
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
- if (quotaExceeded(request, responseWriter)) {
- return responseWriter;
- }
transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
TransportService transportService = transportContext.getTransportService();
@@ -168,9 +153,6 @@ public class DeviceApiController {
public DeferredResult<ResponseEntity> postRpcRequest(@PathVariable("deviceToken") String deviceToken,
@RequestBody String json, HttpServletRequest httpRequest) {
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
- if (quotaExceeded(httpRequest, responseWriter)) {
- return responseWriter;
- }
transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
JsonObject request = new JsonParser().parse(json).getAsJsonObject();
@@ -189,9 +171,6 @@ public class DeviceApiController {
@RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout,
HttpServletRequest httpRequest) {
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<>();
- if (quotaExceeded(httpRequest, responseWriter)) {
- return responseWriter;
- }
transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
TransportService transportService = transportContext.getTransportService();
@@ -204,16 +183,6 @@ public class DeviceApiController {
return responseWriter;
}
- private boolean quotaExceeded(HttpServletRequest request, DeferredResult<ResponseEntity> responseWriter) {
- if (transportContext.getQuotaService().isQuotaExceeded(request.getRemoteAddr())) {
- log.warn("REST Quota exceeded for [{}] . Disconnect", request.getRemoteAddr());
- responseWriter.setResult(new ResponseEntity<>(HttpStatus.BANDWIDTH_LIMIT_EXCEEDED));
- return true;
- } else {
- return false;
- }
- }
-
private static class DeviceAuthCallback implements TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> {
private final TransportContext transportContext;
private final DeferredResult<ResponseEntity> responseWriter;
diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java
index 538daa1..184ac49 100644
--- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java
+++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java
@@ -28,7 +28,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.transport.TransportContext;
import org.thingsboard.server.common.transport.TransportService;
-import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService;
import org.thingsboard.server.kafka.TbNodeIdProvider;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index 46a890f..da8b3a6 100644
--- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -42,7 +42,6 @@ import org.thingsboard.server.common.transport.SessionMsgListener;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
-import org.thingsboard.server.common.transport.quota.QuotaService;
import org.thingsboard.server.common.msg.EncryptionUtil;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
@@ -92,7 +91,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private final MqttTransportContext context;
private final MqttTransportAdaptor adaptor;
private final TransportService transportService;
- private final QuotaService quotaService;
private final SslHandler sslHandler;
private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap;
@@ -106,7 +104,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
this.context = context;
this.transportService = context.getTransportService();
this.adaptor = context.getAdaptor();
- this.quotaService = context.getQuotaService();
this.sslHandler = context.getSslHandler();
this.mqttQoSMap = new ConcurrentHashMap<>();
this.deviceSessionCtx = new DeviceSessionCtx(sessionId, mqttQoSMap);
@@ -129,13 +126,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
processDisconnect(ctx);
return;
}
-
- if (quotaService.isQuotaExceeded(address.getHostName())) {
- log.warn("MQTT Quota exceeded for [{}:{}] . Disconnect", address.getHostName(), address.getPort());
- processDisconnect(ctx);
- return;
- }
-
deviceSessionCtx.setChannel(ctx);
switch (msg.fixedHeader().messageType()) {
case CONNECT:
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportContext.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportContext.java
index ab42982..58aaa78 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportContext.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportContext.java
@@ -20,7 +20,6 @@ import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
-import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService;
import org.thingsboard.server.kafka.TbNodeIdProvider;
import javax.annotation.PostConstruct;
@@ -43,9 +42,6 @@ public class TransportContext {
@Autowired
private TbNodeIdProvider nodeIdProvider;
- @Autowired(required = false)
- private HostRequestsQuotaService quotaService;
-
@Getter
private ExecutorService executor;
diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml
index 822d104..30351b0 100644
--- a/transport/coap/src/main/resources/tb-coap-transport.yml
+++ b/transport/coap/src/main/resources/tb-coap-transport.yml
@@ -28,27 +28,6 @@ transport:
tenant: "${TB_TRANSPORT_RATE_LIMITS_TENANT:1000:1,20000:60}"
device: "${TB_TRANSPORT_RATE_LIMITS_DEVICE:10:1,300:60}"
-#Quota parameters
-quota:
- host:
- # Max allowed number of API requests in interval for single host
- limit: "${QUOTA_HOST_LIMIT:10000}"
- # Interval duration
- intervalMs: "${QUOTA_HOST_INTERVAL_MS:60000}"
- # Maximum silence duration for host after which Host removed from QuotaService. Must be bigger than intervalMs
- ttlMs: "${QUOTA_HOST_TTL_MS:60000}"
- # Interval for scheduled task that cleans expired records. TTL is used for expiring
- cleanPeriodMs: "${QUOTA_HOST_CLEAN_PERIOD_MS:300000}"
- # Enable Host API Limits
- enabled: "${QUOTA_HOST_ENABLED:true}"
- # Array of whitelist hosts
- whitelist: "${QUOTA_HOST_WHITELIST:localhost,127.0.0.1}"
- # Array of blacklist hosts
- blacklist: "${QUOTA_HOST_BLACKLIST:}"
- log:
- topSize: 10
- intervalMin: 2
-
kafka:
enabled: true
bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml
index 650bfdf..001e08a 100644
--- a/transport/http/src/main/resources/tb-http-transport.yml
+++ b/transport/http/src/main/resources/tb-http-transport.yml
@@ -29,27 +29,6 @@ transport:
tenant: "${TB_TRANSPORT_RATE_LIMITS_TENANT:1000:1,20000:60}"
device: "${TB_TRANSPORT_RATE_LIMITS_DEVICE:10:1,300:60}"
-#Quota parameters
-quota:
- host:
- # Max allowed number of API requests in interval for single host
- limit: "${QUOTA_HOST_LIMIT:10000}"
- # Interval duration
- intervalMs: "${QUOTA_HOST_INTERVAL_MS:60000}"
- # Maximum silence duration for host after which Host removed from QuotaService. Must be bigger than intervalMs
- ttlMs: "${QUOTA_HOST_TTL_MS:60000}"
- # Interval for scheduled task that cleans expired records. TTL is used for expiring
- cleanPeriodMs: "${QUOTA_HOST_CLEAN_PERIOD_MS:300000}"
- # Enable Host API Limits
- enabled: "${QUOTA_HOST_ENABLED:true}"
- # Array of whitelist hosts
- whitelist: "${QUOTA_HOST_WHITELIST:localhost,127.0.0.1}"
- # Array of blacklist hosts
- blacklist: "${QUOTA_HOST_BLACKLIST:}"
- log:
- topSize: 10
- intervalMin: 2
-
kafka:
enabled: true
bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml
index a0e86bb..e37d14e 100644
--- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml
+++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml
@@ -51,27 +51,6 @@ transport:
tenant: "${TB_TRANSPORT_RATE_LIMITS_TENANT:1000:1,20000:60}"
device: "${TB_TRANSPORT_RATE_LIMITS_DEVICE:10:1,300:60}"
-#Quota parameters
-quota:
- host:
- # Max allowed number of API requests in interval for single host
- limit: "${QUOTA_HOST_LIMIT:10000}"
- # Interval duration
- intervalMs: "${QUOTA_HOST_INTERVAL_MS:60000}"
- # Maximum silence duration for host after which Host removed from QuotaService. Must be bigger than intervalMs
- ttlMs: "${QUOTA_HOST_TTL_MS:60000}"
- # Interval for scheduled task that cleans expired records. TTL is used for expiring
- cleanPeriodMs: "${QUOTA_HOST_CLEAN_PERIOD_MS:300000}"
- # Enable Host API Limits
- enabled: "${QUOTA_HOST_ENABLED:true}"
- # Array of whitelist hosts
- whitelist: "${QUOTA_HOST_WHITELIST:localhost,127.0.0.1}"
- # Array of blacklist hosts
- blacklist: "${QUOTA_HOST_BLACKLIST:}"
- log:
- topSize: 10
- intervalMin: 2
-
kafka:
enabled: true
bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"