thingsboard-aplcache

Details

diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
index b18ccc2..b2df8df 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
@@ -100,58 +101,71 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe
         executor.submit(() -> {
             long nextCleanupMs = 0L;
             while (!stopped) {
-                ConsumerRecords<String, byte[]> responses = responseTemplate.poll(Duration.ofMillis(pollInterval));
-                if (responses.count() > 0) {
-                    log.trace("Polling responses completed, consumer records count [{}]", responses.count());
-                }
-                responses.forEach(response -> {
-                    log.trace("Received response to Kafka Template request: {}", response);
-                    Header requestIdHeader = response.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER);
-                    Response decodedResponse = null;
-                    UUID requestId = null;
-                    if (requestIdHeader == null) {
-                        try {
-                            decodedResponse = responseTemplate.decode(response);
-                            requestId = responseTemplate.extractRequestId(decodedResponse);
-                        } catch (IOException e) {
-                            log.error("Failed to decode response", e);
-                        }
-                    } else {
-                        requestId = bytesToUuid(requestIdHeader.value());
+                try {
+                    ConsumerRecords<String, byte[]> responses = responseTemplate.poll(Duration.ofMillis(pollInterval));
+                    if (responses.count() > 0) {
+                        log.trace("Polling responses completed, consumer records count [{}]", responses.count());
                     }
-                    if (requestId == null) {
-                        log.error("[{}] Missing requestId in header and body", response);
-                    } else {
-                        log.trace("[{}] Response received", requestId);
-                        ResponseMetaData<Response> expectedResponse = pendingRequests.remove(requestId);
-                        if (expectedResponse == null) {
-                            log.trace("[{}] Invalid or stale request", requestId);
-                        } else {
+                    responses.forEach(response -> {
+                        log.trace("Received response to Kafka Template request: {}", response);
+                        Header requestIdHeader = response.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER);
+                        Response decodedResponse = null;
+                        UUID requestId = null;
+                        if (requestIdHeader == null) {
                             try {
-                                if (decodedResponse == null) {
-                                    decodedResponse = responseTemplate.decode(response);
-                                }
-                                expectedResponse.future.set(decodedResponse);
+                                decodedResponse = responseTemplate.decode(response);
+                                requestId = responseTemplate.extractRequestId(decodedResponse);
                             } catch (IOException e) {
-                                expectedResponse.future.setException(e);
+                                log.error("Failed to decode response", e);
                             }
+                        } else {
+                            requestId = bytesToUuid(requestIdHeader.value());
                         }
-                    }
-                });
-                tickTs = System.currentTimeMillis();
-                tickSize = pendingRequests.size();
-                if (nextCleanupMs < tickTs) {
-                    //cleanup;
-                    pendingRequests.entrySet().forEach(kv -> {
-                        if (kv.getValue().expTime < tickTs) {
-                            ResponseMetaData<Response> staleRequest = pendingRequests.remove(kv.getKey());
-                            if (staleRequest != null) {
-                                log.trace("[{}] Request timeout detected, expTime [{}], tickTs [{}]", kv.getKey(), staleRequest.expTime, tickTs);
-                                staleRequest.future.setException(new TimeoutException());
+                        if (requestId == null) {
+                            log.error("[{}] Missing requestId in header and body", response);
+                        } else {
+                            log.trace("[{}] Response received", requestId);
+                            ResponseMetaData<Response> expectedResponse = pendingRequests.remove(requestId);
+                            if (expectedResponse == null) {
+                                log.trace("[{}] Invalid or stale request", requestId);
+                            } else {
+                                try {
+                                    if (decodedResponse == null) {
+                                        decodedResponse = responseTemplate.decode(response);
+                                    }
+                                    expectedResponse.future.set(decodedResponse);
+                                } catch (IOException e) {
+                                    expectedResponse.future.setException(e);
+                                }
                             }
                         }
                     });
-                    nextCleanupMs = tickTs + maxRequestTimeout;
+                    tickTs = System.currentTimeMillis();
+                    tickSize = pendingRequests.size();
+                    if (nextCleanupMs < tickTs) {
+                        //cleanup;
+                        pendingRequests.entrySet().forEach(kv -> {
+                            if (kv.getValue().expTime < tickTs) {
+                                ResponseMetaData<Response> staleRequest = pendingRequests.remove(kv.getKey());
+                                if (staleRequest != null) {
+                                    log.trace("[{}] Request timeout detected, expTime [{}], tickTs [{}]", kv.getKey(), staleRequest.expTime, tickTs);
+                                    staleRequest.future.setException(new TimeoutException());
+                                }
+                            }
+                        });
+                        nextCleanupMs = tickTs + maxRequestTimeout;
+                    }
+                } catch (InterruptException ie) {
+                    if (!stopped) {
+                        log.warn("Fetching data from kafka was interrupted.", ie);
+                    }
+                } catch (Throwable e) {
+                    log.warn("Failed to obtain responses from queue.", e);
+                    try {
+                        Thread.sleep(pollInterval);
+                    } catch (InterruptedException e2) {
+                        log.trace("Failed to wait until the server has capacity to handle new responses", e2);
+                    }
                 }
             }
         });