// common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.kafka;
import lombok.Builder;
import lombok.Getter;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
/**
* Created by ashvayka on 24.09.18.
*/
public class TBKafkaConsumerTemplate<T> {
private final KafkaConsumer<String, byte[]> consumer;
private final TbKafkaDecoder<T> decoder;
@Builder.Default
private TbKafkaRequestIdExtractor<T> requestIdExtractor = ((response) -> null);
@Getter
private final String topic;
@Builder
private TBKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder<T> decoder,
TbKafkaRequestIdExtractor<T> requestIdExtractor,
String clientId, String groupId, String topic,
boolean autoCommit, int autoCommitIntervalMs,
int maxPollRecords) {
Properties props = settings.toProps();
props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
if (groupId != null) {
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
}
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
if (maxPollRecords > 0) {
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
}
this.consumer = new KafkaConsumer<>(props);
this.decoder = decoder;
this.requestIdExtractor = requestIdExtractor;
this.topic = topic;
}
public void subscribe() {
consumer.subscribe(Collections.singletonList(topic));
}
public void unsubscribe() {
consumer.unsubscribe();
}
public ConsumerRecords<String, byte[]> poll(Duration duration) {
return consumer.poll(duration);
}
public T decode(ConsumerRecord<String, byte[]> record) throws IOException {
return decoder.decode(record.value());
}
public UUID extractRequestId(T value) {
return requestIdExtractor.extractRequestId(value);
}
}