thingsboard-aplcache

Details

diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
index 45f8433..4b4fec3 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
@@ -112,7 +112,6 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
     public void init() {
         TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<ToTransportMsg> notificationsProducerBuilder = TBKafkaProducerTemplate.builder();
         notificationsProducerBuilder.settings(kafkaSettings);
-        notificationsProducerBuilder.defaultTopic(notificationsTopic);
         notificationsProducerBuilder.encoder(new ToTransportMsgEncoder());
 
         notificationsProducer = notificationsProducerBuilder.build();
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
index 88033aa..365413c 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
@@ -43,8 +43,6 @@ public class RemoteTransportApiService {
 
     @Value("${transport.remote.transport_api.requests_topic}")
     private String transportApiRequestsTopic;
-    @Value("${transport.remote.transport_api.responses_topic}")
-    private String transportApiResponsesTopic;
     @Value("${transport.remote.transport_api.max_pending_requests}")
     private int maxPendingRequests;
     @Value("${transport.remote.transport_api.request_timeout}")
@@ -73,7 +71,6 @@ public class RemoteTransportApiService {
 
         TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TransportApiResponseMsg> responseBuilder = TBKafkaProducerTemplate.builder();
         responseBuilder.settings(kafkaSettings);
-        responseBuilder.defaultTopic(transportApiResponsesTopic);
         responseBuilder.encoder(new TransportApiResponseEncoder());
 
         TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TransportApiRequestMsg> requestBuilder = TBKafkaConsumerTemplate.builder();
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 52f6395..ed18a0a 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -55,6 +55,8 @@ rpc:
 
 # Clustering properties related to consistent-hashing. See architecture docs for more details.
 cluster:
+  # Unique id for this node (autogenerated if empty)
+  node_id: "${CLUSTER_NODE_ID:}"
   # Name of hash function used for consistent hash ring.
   hash_function_name: "${CLUSTER_HASH_FUNCTION_NAME:murmur3_128}"
   # Amount of virtual nodes in consistent hash ring.
@@ -392,7 +394,6 @@ transport:
   remote:
     transport_api:
       requests_topic: "${TB_TRANSPORT_API_REQUEST_TOPIC:tb.transport.api.requests}"
-      responses_topic: "${TB_TRANSPORT_API_RESPONSE_TOPIC:tb.transport.api.responses}"
       max_pending_requests: "${TB_TRANSPORT_MAX_PENDING_REQUESTS:10000}"
       request_timeout: "${TB_TRANSPORT_MAX_REQUEST_TIMEOUT:10000}"
       request_poll_interval: "${TB_TRANSPORT_RESPONSE_POLL_INTERVAL_MS:25}"
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaAdmin.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaAdmin.java
index f6bafef..7cc90a0 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaAdmin.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaAdmin.java
@@ -15,15 +15,17 @@
  */
 package org.thingsboard.server.kafka;
 
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.CreateTopicsResult;
-import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.*;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaFuture;
 
 import java.time.Duration;
 import java.util.Collections;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /**
  * Created by ashvayka on 24.09.18.
@@ -36,7 +38,32 @@ public class TBKafkaAdmin {
         client = AdminClient.create(settings.toProps());
     }
 
+    public void waitForTopic(String topic, long timeout, TimeUnit timeoutUnit) throws InterruptedException, TimeoutException {
+        synchronized (this) {
+            long timeoutExpiredMs = System.currentTimeMillis() + timeoutUnit.toMillis(timeout);
+            while (!topicExists(topic)) {
+                long waitMs = timeoutExpiredMs - System.currentTimeMillis();
+                if (waitMs <= 0) {
+                    throw new TimeoutException("Timeout occurred while waiting for topic [" + topic + "] to be available!");
+                } else {
+                    wait(1000);
+                }
+            }
+        }
+    }
+
     public CreateTopicsResult createTopic(NewTopic topic){
         return client.createTopics(Collections.singletonList(topic));
     }
+
+    private boolean topicExists(String topic) throws InterruptedException {
+        KafkaFuture<TopicDescription> topicDescriptionFuture = client.describeTopics(Collections.singleton(topic)).values().get(topic);
+        try {
+            topicDescriptionFuture.get();
+            return true;
+        } catch (ExecutionException e) {
+            return false;
+        }
+    }
+
 }
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
index ee652f4..6cf507f 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
@@ -20,14 +20,17 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.admin.CreateTopicsResult;
 import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.header.Header;
+import org.springframework.util.StringUtils;
 
 import java.util.List;
 import java.util.Properties;
@@ -35,6 +38,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Created by ashvayka on 24.09.18.
@@ -71,21 +75,19 @@ public class TBKafkaProducerTemplate<T> {
     }
 
     public void init() {
-        try {
-            TBKafkaAdmin admin = new TBKafkaAdmin(this.settings);
-            CreateTopicsResult result = admin.createTopic(new NewTopic(defaultTopic, 100, (short) 1));
-            result.all().get();
-        } catch (Exception e) {
-            if ((e instanceof TopicExistsException) || (e.getCause() != null && e.getCause() instanceof TopicExistsException)) {
-                log.trace("[{}] Topic already exists.", defaultTopic);
-            } else {
-                log.info("[{}] Failed to create topic: {}", defaultTopic, e.getMessage(), e);
+        this.partitionInfoMap = new ConcurrentHashMap<>();
+        if (!StringUtils.isEmpty(defaultTopic)) {
+            try {
+                TBKafkaAdmin admin = new TBKafkaAdmin(this.settings);
+                admin.waitForTopic(defaultTopic, 30, TimeUnit.SECONDS);
+                log.info("[{}] Topic exists.", defaultTopic);
+            } catch (Exception e) {
+                log.info("[{}] Failed to wait for topic: {}", defaultTopic, e.getMessage(), e);
                 throw new RuntimeException(e);
             }
+            //Maybe this should not be cached, but we don't plan to change size of partitions
+            this.partitionInfoMap.putIfAbsent(defaultTopic, producer.partitionsFor(defaultTopic));
         }
-        //Maybe this should not be cached, but we don't plan to change size of partitions
-        this.partitionInfoMap = new ConcurrentHashMap<>();
-        this.partitionInfoMap.putIfAbsent(defaultTopic, producer.partitionsFor(defaultTopic));
     }
 
     T enrich(T value, String responseTopic, UUID requestId) {
@@ -105,7 +107,11 @@ public class TBKafkaProducerTemplate<T> {
     }
 
     public Future<RecordMetadata> send(String key, T value, Long timestamp, Iterable<Header> headers, Callback callback) {
-        return send(this.defaultTopic, key, value, timestamp, headers, callback);
+        if (!StringUtils.isEmpty(this.defaultTopic)) {
+            return send(this.defaultTopic, key, value, timestamp, headers, callback);
+        } else {
+            throw new RuntimeException("Failed to send message! Default topic is not specified!");
+        }
     }
 
     public Future<RecordMetadata> send(String topic, String key, T value, Iterable<Header> headers, Callback callback) {
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
index 267d560..944ed66 100644
--- a/docker/docker-compose.yml
+++ b/docker/docker-compose.yml
@@ -56,6 +56,7 @@ services:
         max-file: "30"
     environment:
       TB_HOST: tb1
+      CLUSTER_NODE_ID: tb1
     env_file:
       - tb-node.env
     volumes:
@@ -77,6 +78,7 @@ services:
         max-file: "30"
     environment:
       TB_HOST: tb2
+      CLUSTER_NODE_ID: tb2
     env_file:
       - tb-node.env
     volumes:
@@ -93,6 +95,7 @@ services:
       - "1883"
     environment:
       TB_HOST: tb-mqtt-transport1
+      CLUSTER_NODE_ID: tb-mqtt-transport1
     env_file:
       - tb-mqtt-transport.env
     volumes:
@@ -107,6 +110,7 @@ services:
       - "1883"
     environment:
       TB_HOST: tb-mqtt-transport2
+      CLUSTER_NODE_ID: tb-mqtt-transport2
     env_file:
       - tb-mqtt-transport.env
     volumes:
@@ -121,6 +125,7 @@ services:
       - "8081"
     environment:
       TB_HOST: tb-http-transport1
+      CLUSTER_NODE_ID: tb-http-transport1
     env_file:
       - tb-http-transport.env
     volumes:
@@ -135,6 +140,7 @@ services:
       - "8081"
     environment:
       TB_HOST: tb-http-transport2
+      CLUSTER_NODE_ID: tb-http-transport2
     env_file:
       - tb-http-transport.env
     volumes:
@@ -149,6 +155,7 @@ services:
       - "5683:5683/udp"
     environment:
       TB_HOST: tb-coap-transport
+      CLUSTER_NODE_ID: tb-coap-transport
     env_file:
       - tb-coap-transport.env
     volumes:

docker/kafka.env 2(+1 -1)

diff --git a/docker/kafka.env b/docker/kafka.env
index 87dad07..485b3c0 100644
--- a/docker/kafka.env
+++ b/docker/kafka.env
@@ -4,7 +4,7 @@ KAFKA_LISTENERS=INSIDE://:9093,OUTSIDE://:9092
 KAFKA_ADVERTISED_LISTENERS=INSIDE://:9093,OUTSIDE://kafka:9092
 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
 KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE
-KAFKA_CREATE_TOPICS=js.eval.requests:100:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600,tb.transport.api.requests:30:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600,tb.rule-engine:30:1
+KAFKA_CREATE_TOPICS=js.eval.requests:100:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600,tb.transport.api.requests:30:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600,tb.rule-engine:30:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600
 KAFKA_AUTO_CREATE_TOPICS_ENABLE=false
 KAFKA_LOG_RETENTION_BYTES=1073741824
 KAFKA_LOG_SEGMENT_BYTES=268435456
diff --git a/docker/tb-node.env b/docker/tb-node.env
index ca945ab..963943d 100644
--- a/docker/tb-node.env
+++ b/docker/tb-node.env
@@ -8,3 +8,5 @@ JS_EVALUATOR=remote
 TRANSPORT_TYPE=remote
 CACHE_TYPE=redis
 REDIS_HOST=redis
+
+HTTP_LOG_CONTROLLER_ERROR_STACK_TRACE=false
diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java
index 9918963..324f3cf 100644
--- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java
+++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java
@@ -66,7 +66,7 @@ public class MqttClientTest extends AbstractContainerTest {
 
         WsClient wsClient = subscribeToWebSocket(device.getId(), "LATEST_TELEMETRY", CmdsType.TS_SUB_CMDS);
         MqttClient mqttClient = getMqttClient(deviceCredentials, null);
-        mqttClient.publish("v1/devices/me/telemetry", Unpooled.wrappedBuffer(createPayload().toString().getBytes()));
+        mqttClient.publish("v1/devices/me/telemetry", Unpooled.wrappedBuffer(createPayload().toString().getBytes())).get();
         WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage();
         log.info("Received telemetry: {}", actualLatestTelemetry);
         wsClient.closeBlocking();
@@ -93,7 +93,7 @@ public class MqttClientTest extends AbstractContainerTest {
 
         WsClient wsClient = subscribeToWebSocket(device.getId(), "LATEST_TELEMETRY", CmdsType.TS_SUB_CMDS);
         MqttClient mqttClient = getMqttClient(deviceCredentials, null);
-        mqttClient.publish("v1/devices/me/telemetry", Unpooled.wrappedBuffer(createPayload(ts).toString().getBytes()));
+        mqttClient.publish("v1/devices/me/telemetry", Unpooled.wrappedBuffer(createPayload(ts).toString().getBytes())).get();
         WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage();
         log.info("Received telemetry: {}", actualLatestTelemetry);
         wsClient.closeBlocking();
@@ -123,7 +123,7 @@ public class MqttClientTest extends AbstractContainerTest {
         clientAttributes.addProperty("attr2", true);
         clientAttributes.addProperty("attr3", 42.0);
         clientAttributes.addProperty("attr4", 73);
-        mqttClient.publish("v1/devices/me/attributes", Unpooled.wrappedBuffer(clientAttributes.toString().getBytes()));
+        mqttClient.publish("v1/devices/me/attributes", Unpooled.wrappedBuffer(clientAttributes.toString().getBytes())).get();
         WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage();
         log.info("Received telemetry: {}", actualLatestTelemetry);
         wsClient.closeBlocking();
@@ -146,6 +146,7 @@ public class MqttClientTest extends AbstractContainerTest {
         Device device = createDevice("mqtt_");
         DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId());
 
+        WsClient wsClient = subscribeToWebSocket(device.getId(), "CLIENT_SCOPE", CmdsType.ATTR_SUB_CMDS);
         MqttMessageListener listener = new MqttMessageListener();
         MqttClient mqttClient = getMqttClient(deviceCredentials, listener);
 
@@ -153,7 +154,17 @@ public class MqttClientTest extends AbstractContainerTest {
         JsonObject clientAttributes = new JsonObject();
         String clientAttributeValue = RandomStringUtils.randomAlphanumeric(8);
         clientAttributes.addProperty("clientAttr", clientAttributeValue);
-        mqttClient.publish("v1/devices/me/attributes", Unpooled.wrappedBuffer(clientAttributes.toString().getBytes()));
+        mqttClient.publish("v1/devices/me/attributes", Unpooled.wrappedBuffer(clientAttributes.toString().getBytes())).get();
+
+        WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage();
+        log.info("Received ws telemetry: {}", actualLatestTelemetry);
+        wsClient.closeBlocking();
+
+        Assert.assertEquals(1, actualLatestTelemetry.getData().size());
+        Assert.assertEquals(Sets.newHashSet("clientAttr"),
+                actualLatestTelemetry.getLatestValues().keySet());
+
+        Assert.assertTrue(verify(actualLatestTelemetry, "clientAttr", clientAttributeValue));
 
         // Add a new shared attribute
         JsonObject sharedAttributes = new JsonObject();
@@ -166,12 +177,16 @@ public class MqttClientTest extends AbstractContainerTest {
         Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful());
 
         // Subscribe to attributes response
-        mqttClient.on("v1/devices/me/attributes/response/+", listener, MqttQoS.AT_LEAST_ONCE);
+        mqttClient.on("v1/devices/me/attributes/response/+", listener, MqttQoS.AT_LEAST_ONCE).get();
+
+        // Wait until subscription is processed
+        TimeUnit.SECONDS.sleep(3);
+
         // Request attributes
         JsonObject request = new JsonObject();
         request.addProperty("clientKeys", "clientAttr");
         request.addProperty("sharedKeys", "sharedAttr");
-        mqttClient.publish("v1/devices/me/attributes/request/" + new Random().nextInt(100), Unpooled.wrappedBuffer(request.toString().getBytes()));
+        mqttClient.publish("v1/devices/me/attributes/request/" + new Random().nextInt(100), Unpooled.wrappedBuffer(request.toString().getBytes())).get();
         MqttEvent event = listener.getEvents().poll(10, TimeUnit.SECONDS);
         AttributesResponse attributes = mapper.readValue(Objects.requireNonNull(event).getMessage(), AttributesResponse.class);
         log.info("Received telemetry: {}", attributes);
@@ -193,7 +208,10 @@ public class MqttClientTest extends AbstractContainerTest {
 
         MqttMessageListener listener = new MqttMessageListener();
         MqttClient mqttClient = getMqttClient(deviceCredentials, listener);
-        mqttClient.on("v1/devices/me/attributes", listener, MqttQoS.AT_LEAST_ONCE);
+        mqttClient.on("v1/devices/me/attributes", listener, MqttQoS.AT_LEAST_ONCE).get();
+
+        // Wait until subscription is processed
+        TimeUnit.SECONDS.sleep(3);
 
         String sharedAttributeName = "sharedAttr";
 
@@ -236,7 +254,10 @@ public class MqttClientTest extends AbstractContainerTest {
 
         MqttMessageListener listener = new MqttMessageListener();
         MqttClient mqttClient = getMqttClient(deviceCredentials, listener);
-        mqttClient.on("v1/devices/me/rpc/request/+", listener, MqttQoS.AT_LEAST_ONCE);
+        mqttClient.on("v1/devices/me/rpc/request/+", listener, MqttQoS.AT_LEAST_ONCE).get();
+
+        // Wait until subscription is processed
+        TimeUnit.SECONDS.sleep(3);
 
         // Send an RPC from the server
         JsonObject serverRpcPayload = new JsonObject();
@@ -263,7 +284,7 @@ public class MqttClientTest extends AbstractContainerTest {
         JsonObject clientResponse = new JsonObject();
         clientResponse.addProperty("response", "someResponse");
         // Send a response to the server's RPC request
-        mqttClient.publish("v1/devices/me/rpc/response/" + requestId, Unpooled.wrappedBuffer(clientResponse.toString().getBytes()));
+        mqttClient.publish("v1/devices/me/rpc/response/" + requestId, Unpooled.wrappedBuffer(clientResponse.toString().getBytes())).get();
 
         ResponseEntity serverResponse = future.get(5, TimeUnit.SECONDS);
         Assert.assertTrue(serverResponse.getStatusCode().is2xxSuccessful());
@@ -280,7 +301,7 @@ public class MqttClientTest extends AbstractContainerTest {
 
         MqttMessageListener listener = new MqttMessageListener();
         MqttClient mqttClient = getMqttClient(deviceCredentials, listener);
-        mqttClient.on("v1/devices/me/rpc/request/+", listener, MqttQoS.AT_LEAST_ONCE);
+        mqttClient.on("v1/devices/me/rpc/request/+", listener, MqttQoS.AT_LEAST_ONCE).get();
 
         // Get the default rule chain id to make it root again after test finished
         RuleChainId defaultRuleChainId = getDefaultRuleChainId();
@@ -294,7 +315,7 @@ public class MqttClientTest extends AbstractContainerTest {
         clientRequest.addProperty("method", "getResponse");
         clientRequest.addProperty("params", true);
         Integer requestId = 42;
-        mqttClient.publish("v1/devices/me/rpc/request/" + requestId, Unpooled.wrappedBuffer(clientRequest.toString().getBytes()));
+        mqttClient.publish("v1/devices/me/rpc/request/" + requestId, Unpooled.wrappedBuffer(clientRequest.toString().getBytes())).get();
 
         // Check the response from the server
         TimeUnit.SECONDS.sleep(1);
diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml
index 95e7b75..17f996e 100644
--- a/transport/coap/src/main/resources/tb-coap-transport.yml
+++ b/transport/coap/src/main/resources/tb-coap-transport.yml
@@ -17,7 +17,12 @@
 spring.main.web-environment: false
 spring.main.web-application-type: none
 
-# MQTT server parameters
+# Clustering properties
+cluster:
+  # Unique id for this node (autogenerated if empty)
+  node_id: "${CLUSTER_NODE_ID:}"
+
+# COAP server parameters
 transport:
   coap:
     bind_address: "${COAP_BIND_ADDRESS:0.0.0.0}"
diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml
index bb1d3c4..31e0194 100644
--- a/transport/http/src/main/resources/tb-http-transport.yml
+++ b/transport/http/src/main/resources/tb-http-transport.yml
@@ -20,6 +20,11 @@ server:
   # Server bind port
   port: "${HTTP_BIND_PORT:8081}"
 
+# Clustering properties
+cluster:
+  # Unique id for this node (autogenerated if empty)
+  node_id: "${CLUSTER_NODE_ID:}"
+
 # HTTP server parameters
 transport:
   http:
diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml
index 719530c..46e9eb9 100644
--- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml
+++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml
@@ -17,6 +17,11 @@
 spring.main.web-environment: false
 spring.main.web-application-type: none
 
+# Clustering properties
+cluster:
+  # Unique id for this node (autogenerated if empty)
+  node_id: "${CLUSTER_NODE_ID:}"
+
 # MQTT server parameters
 transport:
   mqtt: