diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
index b18ccc2..b2df8df 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
@@ -100,58 +101,71 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe
executor.submit(() -> {
long nextCleanupMs = 0L;
while (!stopped) {
- ConsumerRecords<String, byte[]> responses = responseTemplate.poll(Duration.ofMillis(pollInterval));
- if (responses.count() > 0) {
- log.trace("Polling responses completed, consumer records count [{}]", responses.count());
- }
- responses.forEach(response -> {
- log.trace("Received response to Kafka Template request: {}", response);
- Header requestIdHeader = response.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER);
- Response decodedResponse = null;
- UUID requestId = null;
- if (requestIdHeader == null) {
- try {
- decodedResponse = responseTemplate.decode(response);
- requestId = responseTemplate.extractRequestId(decodedResponse);
- } catch (IOException e) {
- log.error("Failed to decode response", e);
- }
- } else {
- requestId = bytesToUuid(requestIdHeader.value());
+ try {
+ ConsumerRecords<String, byte[]> responses = responseTemplate.poll(Duration.ofMillis(pollInterval));
+ if (responses.count() > 0) {
+ log.trace("Polling responses completed, consumer records count [{}]", responses.count());
}
- if (requestId == null) {
- log.error("[{}] Missing requestId in header and body", response);
- } else {
- log.trace("[{}] Response received", requestId);
- ResponseMetaData<Response> expectedResponse = pendingRequests.remove(requestId);
- if (expectedResponse == null) {
- log.trace("[{}] Invalid or stale request", requestId);
- } else {
+ responses.forEach(response -> {
+ log.trace("Received response to Kafka Template request: {}", response);
+ Header requestIdHeader = response.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER);
+ Response decodedResponse = null;
+ UUID requestId = null;
+ if (requestIdHeader == null) {
try {
- if (decodedResponse == null) {
- decodedResponse = responseTemplate.decode(response);
- }
- expectedResponse.future.set(decodedResponse);
+ decodedResponse = responseTemplate.decode(response);
+ requestId = responseTemplate.extractRequestId(decodedResponse);
} catch (IOException e) {
- expectedResponse.future.setException(e);
+ log.error("Failed to decode response", e);
}
+ } else {
+ requestId = bytesToUuid(requestIdHeader.value());
}
- }
- });
- tickTs = System.currentTimeMillis();
- tickSize = pendingRequests.size();
- if (nextCleanupMs < tickTs) {
- //cleanup;
- pendingRequests.entrySet().forEach(kv -> {
- if (kv.getValue().expTime < tickTs) {
- ResponseMetaData<Response> staleRequest = pendingRequests.remove(kv.getKey());
- if (staleRequest != null) {
- log.trace("[{}] Request timeout detected, expTime [{}], tickTs [{}]", kv.getKey(), staleRequest.expTime, tickTs);
- staleRequest.future.setException(new TimeoutException());
+ if (requestId == null) {
+ log.error("[{}] Missing requestId in header and body", response);
+ } else {
+ log.trace("[{}] Response received", requestId);
+ ResponseMetaData<Response> expectedResponse = pendingRequests.remove(requestId);
+ if (expectedResponse == null) {
+ log.trace("[{}] Invalid or stale request", requestId);
+ } else {
+ try {
+ if (decodedResponse == null) {
+ decodedResponse = responseTemplate.decode(response);
+ }
+ expectedResponse.future.set(decodedResponse);
+ } catch (IOException e) {
+ expectedResponse.future.setException(e);
+ }
}
}
});
- nextCleanupMs = tickTs + maxRequestTimeout;
+ tickTs = System.currentTimeMillis();
+ tickSize = pendingRequests.size();
+ if (nextCleanupMs < tickTs) {
+ //cleanup;
+ pendingRequests.entrySet().forEach(kv -> {
+ if (kv.getValue().expTime < tickTs) {
+ ResponseMetaData<Response> staleRequest = pendingRequests.remove(kv.getKey());
+ if (staleRequest != null) {
+ log.trace("[{}] Request timeout detected, expTime [{}], tickTs [{}]", kv.getKey(), staleRequest.expTime, tickTs);
+ staleRequest.future.setException(new TimeoutException());
+ }
+ }
+ });
+ nextCleanupMs = tickTs + maxRequestTimeout;
+ }
+ } catch (InterruptException ie) {
+ if (!stopped) {
+ log.warn("Fetching data from kafka was interrupted.", ie);
+ }
+ } catch (Throwable e) {
+ log.warn("Failed to obtain responses from queue.", e);
+ try {
+ Thread.sleep(pollInterval);
+ } catch (InterruptedException e2) {
+ log.trace("Failed to wait until the server has capacity to handle new responses", e2);
+ }
}
}
});