thingsboard-developers
Changes
application/pom.xml 2(+1 -1)
common/transport/pom.xml 5(+5 -0)
common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantIntervalRegistryCleaner.java 2(+2 -0)
common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantIntervalRegistryLogger.java 2(+2 -0)
common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantMsgsIntervalRegistry.java 2(+2 -0)
common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantQuotaService.java 2(+2 -0)
common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantRequestLimitPolicy.java 2(+2 -0)
common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java 8(+4 -4)
common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java 15(+15 -0)
common/transport/src/main/java/org/thingsboard/server/common/transport/TransportServiceCallback.java 17(+16 -1)
common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaServiceTest.java 6(+5 -1)
pom.xml 2(+1 -1)
transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java 280(+0 -280)
transport/mqtt-common/pom.xml 4(+2 -2)
transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java 0(+0 -0)
transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java 0(+0 -0)
transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttSslHandlerProvider.java 0(+0 -0)
transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java 0(+0 -0)
transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java 6(+4 -2)
transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java 0(+0 -0)
transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java 0(+0 -0)
transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java 12(+6 -6)
transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java 6(+3 -3)
transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java 284(+284 -0)
transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java 4(+2 -2)
transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttSessionId.java 0(+0 -0)
transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/util/SslUtil.java 0(+0 -0)
transport/mqtt-transport/pom.xml 28(+4 -24)
transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/AsyncCallbackTemplate.java 45(+45 -0)
transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java 110(+110 -0)
transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiRequestEncoder.java 14(+14 -0)
transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiResponseDecoder.java 16(+16 -0)
transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/ThingsboardMqttTransportApplication.java 3(+1 -2)
transport/pom.xml 2(+1 -1)
Details
application/pom.xml 2(+1 -1)
diff --git a/application/pom.xml b/application/pom.xml
index c4766f2..bff96ec 100644
--- a/application/pom.xml
+++ b/application/pom.xml
@@ -70,7 +70,7 @@
</dependency>
<dependency>
<groupId>org.thingsboard.transport</groupId>
- <artifactId>mqtt</artifactId>
+ <artifactId>mqtt-common</artifactId>
</dependency>
<dependency>
<groupId>org.thingsboard</groupId>
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java
index 3972264..3adb1c3 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java
@@ -43,7 +43,8 @@ public class TBKafkaConsumerTemplate<T> {
private final String topic;
@Builder
- private TBKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder<T> decoder, TbKafkaRequestIdExtractor<T> requestIdExtractor,
+ private TBKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder<T> decoder,
+ TbKafkaRequestIdExtractor<T> requestIdExtractor,
String clientId, String groupId, String topic,
boolean autoCommit, int autoCommitIntervalMs) {
Properties props = settings.toProps();
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaHandler.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaHandler.java
new file mode 100644
index 0000000..66d53c3
--- /dev/null
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaHandler.java
@@ -0,0 +1,12 @@
+package org.thingsboard.server.kafka;
+
+import java.util.function.Consumer;
+
+/**
+ * Created by ashvayka on 05.10.18.
+ */
+public interface TbKafkaHandler<Request, Response> {
+
+ void handle(Request request, Consumer<Response> onSuccess, Consumer<Throwable> onFailure);
+
+}
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
index 7e24ad0..2611e94 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -27,13 +27,12 @@ import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.header.Header;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
-import java.util.function.BiConsumer;
/**
* Created by ashvayka on 24.09.18.
@@ -48,7 +47,7 @@ public class TBKafkaProducerTemplate<T> {
private TbKafkaEnricher<T> enricher = ((value, responseTopic, requestId) -> value);
private final TbKafkaPartitioner<T> partitioner;
- private List<PartitionInfo> partitionInfoList;
+ private ConcurrentMap<String, List<PartitionInfo>> partitionInfoMap;
@Getter
private final String defaultTopic;
@@ -78,11 +77,16 @@ public class TBKafkaProducerTemplate<T> {
log.trace("Failed to create topic: {}", e.getMessage(), e);
}
//Maybe this should not be cached, but we don't plan to change size of partitions
- this.partitionInfoList = producer.partitionsFor(defaultTopic);
+ this.partitionInfoMap = new ConcurrentHashMap<>();
+ this.partitionInfoMap.putIfAbsent(defaultTopic, producer.partitionsFor(defaultTopic));
}
- public T enrich(T value, String responseTopic, UUID requestId) {
- return enricher.enrich(value, responseTopic, requestId);
+ T enrich(T value, String responseTopic, UUID requestId) {
+ if (enricher != null) {
+ return enricher.enrich(value, responseTopic, requestId);
+ } else {
+ return value;
+ }
}
public Future<RecordMetadata> send(String key, T value) {
@@ -101,7 +105,7 @@ public class TBKafkaProducerTemplate<T> {
byte[] data = encoder.encode(value);
ProducerRecord<String, byte[]> record;
Integer partition = getPartition(topic, key, value, data);
- record = new ProducerRecord<>(this.defaultTopic, partition, timestamp, key, data, headers);
+ record = new ProducerRecord<>(topic, partition, timestamp, key, data, headers);
return producer.send(record);
}
@@ -109,7 +113,7 @@ public class TBKafkaProducerTemplate<T> {
if (partitioner == null) {
return null;
} else {
- return partitioner.partition(this.defaultTopic, key, value, data, partitionInfoList);
+ return partitioner.partition(topic, key, value, data, partitionInfoMap.computeIfAbsent(topic, producer::partitionsFor));
}
}
}
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
index 8a0f529..30b20e7 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
@@ -93,13 +93,12 @@ public class TbKafkaRequestTemplate<Request, Response> {
ConsumerRecords<String, byte[]> responses = responseTemplate.poll(Duration.ofMillis(pollInterval));
responses.forEach(response -> {
Header requestIdHeader = response.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER);
- Response decocedResponse = null;
+ Response decodedResponse = null;
UUID requestId = null;
if (requestIdHeader == null) {
try {
- decocedResponse = responseTemplate.decode(response);
- requestId = responseTemplate.extractRequestId(decocedResponse);
-
+ decodedResponse = responseTemplate.decode(response);
+ requestId = responseTemplate.extractRequestId(decodedResponse);
} catch (IOException e) {
log.error("Failed to decode response", e);
}
@@ -107,17 +106,17 @@ public class TbKafkaRequestTemplate<Request, Response> {
requestId = bytesToUuid(requestIdHeader.value());
}
if (requestId == null) {
- log.error("[{}] Missing requestId in header and response", response);
+ log.error("[{}] Missing requestId in header and body", response);
} else {
ResponseMetaData<Response> expectedResponse = pendingRequests.remove(requestId);
if (expectedResponse == null) {
log.trace("[{}] Invalid or stale request", requestId);
} else {
try {
- if (decocedResponse == null) {
- decocedResponse = responseTemplate.decode(response);
+ if (decodedResponse == null) {
+ decodedResponse = responseTemplate.decode(response);
}
- expectedResponse.future.set(decocedResponse);
+ expectedResponse.future.set(decodedResponse);
} catch (IOException e) {
expectedResponse.future.setException(e);
}
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java
new file mode 100644
index 0000000..536c4b9
--- /dev/null
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java
@@ -0,0 +1,173 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.kafka;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import lombok.Builder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Created by ashvayka on 25.09.18.
+ */
+@Slf4j
+public class TbKafkaResponseTemplate<Request, Response> {
+
+ private final TBKafkaConsumerTemplate<Request> requestTemplate;
+ private final TBKafkaProducerTemplate<Response> responseTemplate;
+ private final TbKafkaHandler<Request, Response> handler;
+ private final ConcurrentMap<UUID, String> pendingRequests;
+ private final ExecutorService executor;
+ private final long maxPendingRequests;
+
+ private final long pollInterval;
+ private volatile boolean stopped = false;
+
+ @Builder
+ public TbKafkaResponseTemplate(TBKafkaConsumerTemplate<Request> requestTemplate,
+ TBKafkaProducerTemplate<Response> responseTemplate,
+ TbKafkaHandler<Request, Response> handler,
+ long pollInterval,
+ long maxPendingRequests,
+ ExecutorService executor) {
+ this.requestTemplate = requestTemplate;
+ this.responseTemplate = responseTemplate;
+ this.handler = handler;
+ this.pendingRequests = new ConcurrentHashMap<>();
+ this.maxPendingRequests = maxPendingRequests;
+ this.pollInterval = pollInterval;
+ this.executor = executor;
+ }
+
+ public void init() {
+ this.responseTemplate.init();
+ requestTemplate.subscribe();
+ executor.submit(() -> {
+ long nextCleanupMs = 0L;
+ while (!stopped) {
+ ConsumerRecords<String, byte[]> requests = requestTemplate.poll(Duration.ofMillis(pollInterval));
+ requests.forEach(request -> {
+ Header requestIdHeader = request.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER);
+ if (requestIdHeader == null) {
+ log.error("[{}] Missing requestId in header", request);
+ return;
+ }
+ UUID requestId = bytesToUuid(requestIdHeader.value());
+ if (requestId == null) {
+ log.error("[{}] Missing requestId in header and body", request);
+ return;
+ }
+ Header responseTopicHeader = request.headers().lastHeader(TbKafkaSettings.RESPONSE_TOPIC_HEADER);
+ if (responseTopicHeader == null) {
+ log.error("[{}] Missing response topic in header", request);
+ return;
+ }
+ String responseTopic = bytesToUuid(responseTopicHeader.value());
+ if (requestId == null) {
+ log.error("[{}] Missing requestId in header and body", request);
+ return;
+ }
+
+ Request decodedRequest = null;
+ String responseTopic = null;
+
+ try {
+ if (decodedRequest == null) {
+ decodedRequest = requestTemplate.decode(request);
+ }
+ executor.submit(() -> {
+ handler.handle(decodedRequest, );
+ });
+ } catch (IOException e) {
+ expectedRequest.future.setException(e);
+ }
+
+ });
+ }
+ });
+ }
+
+ public void stop() {
+ stopped = true;
+ }
+
+ public ListenableFuture<Response> post(String key, Request request) {
+ if (tickSize > maxPendingRequests) {
+ return Futures.immediateFailedFuture(new RuntimeException("Pending request map is full!"));
+ }
+ UUID requestId = UUID.randomUUID();
+ List<Header> headers = new ArrayList<>(2);
+ headers.add(new RecordHeader(TbKafkaSettings.REQUEST_ID_HEADER, uuidToBytes(requestId)));
+ headers.add(new RecordHeader(TbKafkaSettings.RESPONSE_TOPIC_HEADER, stringToBytes(responseTemplate.getTopic())));
+ SettableFuture<Response> future = SettableFuture.create();
+ pendingRequests.putIfAbsent(requestId, new ResponseMetaData<>(tickTs + maxRequestTimeout, future));
+ request = requestTemplate.enrich(request, responseTemplate.getTopic(), requestId);
+ requestTemplate.send(key, request, headers);
+ return future;
+ }
+
+ private byte[] uuidToBytes(UUID uuid) {
+ ByteBuffer buf = ByteBuffer.allocate(16);
+ buf.putLong(uuid.getMostSignificantBits());
+ buf.putLong(uuid.getLeastSignificantBits());
+ return buf.array();
+ }
+
+ private static UUID bytesToUuid(byte[] bytes) {
+ ByteBuffer bb = ByteBuffer.wrap(bytes);
+ long firstLong = bb.getLong();
+ long secondLong = bb.getLong();
+ return new UUID(firstLong, secondLong);
+ }
+
+ private byte[] stringToBytes(String string) {
+ return string.getBytes(StandardCharsets.UTF_8);
+ }
+
+ private String bytesToString(byte[] data) {
+ return new String(data, StandardCharsets.UTF_8);
+ }
+
+ private static class ResponseMetaData<T> {
+ private final long expTime;
+ private final SettableFuture<T> future;
+
+ ResponseMetaData(long ts, SettableFuture<T> future) {
+ this.expTime = ts;
+ this.future = future;
+ }
+ }
+
+}
common/transport/pom.xml 5(+5 -0)
diff --git a/common/transport/pom.xml b/common/transport/pom.xml
index b8a7ab5..46efbf4 100644
--- a/common/transport/pom.xml
+++ b/common/transport/pom.xml
@@ -79,6 +79,11 @@
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-web</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantIntervalRegistryCleaner.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantIntervalRegistryCleaner.java
index c481170..47b4278 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantIntervalRegistryCleaner.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantIntervalRegistryCleaner.java
@@ -16,10 +16,12 @@
package org.thingsboard.server.common.transport.quota.tenant;
import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryCleaner;
@Component
+@ConditionalOnProperty(prefix = "quota.rule.tenant", value = "enabled", havingValue = "true", matchIfMissing = false)
public class TenantIntervalRegistryCleaner extends IntervalRegistryCleaner {
public TenantIntervalRegistryCleaner(TenantMsgsIntervalRegistry intervalRegistry,
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantIntervalRegistryLogger.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantIntervalRegistryLogger.java
index c56f457..3bdcf93 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantIntervalRegistryLogger.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantIntervalRegistryLogger.java
@@ -17,6 +17,7 @@ package org.thingsboard.server.common.transport.quota.tenant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryLogger;
@@ -25,6 +26,7 @@ import java.util.concurrent.TimeUnit;
@Slf4j
@Component
+@ConditionalOnProperty(prefix = "quota.rule.tenant", value = "enabled", havingValue = "true", matchIfMissing = false)
public class TenantIntervalRegistryLogger extends IntervalRegistryLogger {
private final long logIntervalMin;
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantMsgsIntervalRegistry.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantMsgsIntervalRegistry.java
index 6e8402c..3402e62 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantMsgsIntervalRegistry.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantMsgsIntervalRegistry.java
@@ -16,10 +16,12 @@
package org.thingsboard.server.common.transport.quota.tenant;
import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.transport.quota.inmemory.KeyBasedIntervalRegistry;
@Component
+@ConditionalOnProperty(prefix = "quota.rule.tenant", value = "enabled", havingValue = "true", matchIfMissing = false)
public class TenantMsgsIntervalRegistry extends KeyBasedIntervalRegistry {
public TenantMsgsIntervalRegistry(@Value("${quota.rule.tenant.intervalMs}") long intervalDurationMs,
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantQuotaService.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantQuotaService.java
index a68860a..875a666 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantQuotaService.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantQuotaService.java
@@ -16,10 +16,12 @@
package org.thingsboard.server.common.transport.quota.tenant;
import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.transport.quota.AbstractQuotaService;
@Component
+@ConditionalOnProperty(prefix = "quota.rule.tenant", value = "enabled", havingValue = "true", matchIfMissing = false)
public class TenantQuotaService extends AbstractQuotaService {
public TenantQuotaService(TenantMsgsIntervalRegistry requestRegistry, TenantRequestLimitPolicy requestsPolicy,
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantRequestLimitPolicy.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantRequestLimitPolicy.java
index cc32c81..c5dfc40 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantRequestLimitPolicy.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantRequestLimitPolicy.java
@@ -16,10 +16,12 @@
package org.thingsboard.server.common.transport.quota.tenant;
import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.transport.quota.RequestLimitPolicy;
@Component
+@ConditionalOnProperty(prefix = "quota.rule.tenant", value = "enabled", havingValue = "true", matchIfMissing = false)
public class TenantRequestLimitPolicy extends RequestLimitPolicy {
public TenantRequestLimitPolicy(@Value("${quota.rule.tenant.limit}") long limit) {
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
index 2826945..7b2c05e 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
index ff0debc..e72a626 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.server.common.transport;
import org.thingsboard.server.gen.transport.TransportProtos;
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportServiceCallback.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportServiceCallback.java
index bfc7ac8..5443f7e 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportServiceCallback.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportServiceCallback.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.server.common.transport;
/**
@@ -6,6 +21,6 @@ package org.thingsboard.server.common.transport;
public interface TransportServiceCallback<T> {
void onSuccess(T msg);
- void onError(Exception e);
+ void onError(Throwable e);
}
diff --git a/common/transport/src/main/proto/transport.proto b/common/transport/src/main/proto/transport.proto
index 65d1468..f0cc3c7 100644
--- a/common/transport/src/main/proto/transport.proto
+++ b/common/transport/src/main/proto/transport.proto
@@ -95,3 +95,17 @@ message ValidateDeviceTokenResponseMsg {
DeviceInfoProto deviceInfo = 1;
}
+/**
+ * Main messages;
+ */
+message TransportToRuleEngineMsg {
+
+}
+
+message TransportApiRequestMsg {
+ ValidateDeviceTokenRequestMsg validateTokenRequestMsg = 1;
+}
+
+message TransportApiResponseMsg {
+ ValidateDeviceTokenResponseMsg validateTokenResponseMsg = 1;
+}
\ No newline at end of file
diff --git a/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaServiceTest.java b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaServiceTest.java
index 20f8a55..37441e9 100644
--- a/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaServiceTest.java
+++ b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaServiceTest.java
@@ -17,7 +17,11 @@ package org.thingsboard.server.common.transport.quota;
import org.junit.Before;
import org.junit.Test;
-import org.thingsboard.server.common.transport.quota.host.*;
+import org.thingsboard.server.common.transport.quota.host.HostIntervalRegistryCleaner;
+import org.thingsboard.server.common.transport.quota.host.HostIntervalRegistryLogger;
+import org.thingsboard.server.common.transport.quota.host.HostRequestIntervalRegistry;
+import org.thingsboard.server.common.transport.quota.host.HostRequestLimitPolicy;
+import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
pom.xml 2(+1 -1)
diff --git a/pom.xml b/pom.xml
index d6db123..62fda36 100755
--- a/pom.xml
+++ b/pom.xml
@@ -366,7 +366,7 @@
</dependency>
<dependency>
<groupId>org.thingsboard.transport</groupId>
- <artifactId>mqtt</artifactId>
+ <artifactId>mqtt-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
new file mode 100644
index 0000000..fafbc4e
--- /dev/null
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
@@ -0,0 +1,284 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.transport.mqtt.session;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonNull;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonSyntaxException;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.util.StringUtils;
+import org.thingsboard.server.common.data.Device;
+import org.thingsboard.server.common.data.id.SessionId;
+import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
+import org.thingsboard.server.common.data.relation.EntityRelation;
+import org.thingsboard.server.common.msg.core.*;
+import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg;
+import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg;
+import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
+import org.thingsboard.server.common.transport.SessionMsgProcessor;
+import org.thingsboard.server.common.transport.adaptor.AdaptorException;
+import org.thingsboard.server.common.transport.adaptor.JsonConverter;
+import org.thingsboard.server.common.transport.auth.DeviceAuthService;
+import org.thingsboard.server.dao.device.DeviceService;
+import org.thingsboard.server.dao.relation.RelationService;
+import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
+import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+
+import static org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor.validateJsonPayload;
+
+/**
+ * Created by ashvayka on 19.01.17.
+ */
+@Slf4j
+public class GatewaySessionCtx {
+
+ private static final String DEFAULT_DEVICE_TYPE = "default";
+ public static final String CAN_T_PARSE_VALUE = "Can't parse value: ";
+ public static final String DEVICE_PROPERTY = "device";
+// private final Device gateway;
+// private final SessionId gatewaySessionId;
+// private final SessionMsgProcessor processor;
+// private final DeviceService deviceService;
+// private final DeviceAuthService authService;
+// private final RelationService relationService;
+// private final Map<String, GatewayDeviceSessionCtx> devices;
+// private final ConcurrentMap<String, Integer> mqttQoSMap;
+ private ChannelHandlerContext channel;
+
+// public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, DeviceSessionCtx gatewaySessionCtx) {
+// this.processor = processor;
+// this.deviceService = deviceService;
+// this.authService = authService;
+// this.relationService = relationService;
+// this.gateway = gatewaySessionCtx.getDevice();
+// this.gatewaySessionId = gatewaySessionCtx.getSessionId();
+// this.devices = new HashMap<>();
+// this.mqttQoSMap = gatewaySessionCtx.getMqttQoSMap();
+// }
+
+ public GatewaySessionCtx(DeviceSessionCtx deviceSessionCtx) {
+
+ }
+
+ public void onDeviceConnect(MqttPublishMessage msg) throws AdaptorException {
+ JsonElement json = getJson(msg);
+ String deviceName = checkDeviceName(getDeviceName(json));
+ String deviceType = getDeviceType(json);
+ onDeviceConnect(deviceName, deviceType);
+ ack(msg);
+ }
+
+ private void onDeviceConnect(String deviceName, String deviceType) {
+// if (!devices.containsKey(deviceName)) {
+// Device device = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), deviceName);
+// if (device == null) {
+// device = new Device();
+// device.setTenantId(gateway.getTenantId());
+// device.setName(deviceName);
+// device.setType(deviceType);
+// device.setCustomerId(gateway.getCustomerId());
+// device = deviceService.saveDevice(device);
+// relationService.saveRelationAsync(new EntityRelation(gateway.getId(), device.getId(), "Created"));
+// processor.onDeviceAdded(device);
+// }
+// GatewayDeviceSessionCtx ctx = new GatewayDeviceSessionCtx(this, device, mqttQoSMap);
+// devices.put(deviceName, ctx);
+// log.debug("[{}] Added device [{}] to the gateway session", gatewaySessionId, deviceName);
+// processor.process(new BasicTransportToDeviceSessionActorMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new AttributesSubscribeMsg())));
+// processor.process(new BasicTransportToDeviceSessionActorMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new RpcSubscribeMsg())));
+// }
+ }
+
+ public void onDeviceDisconnect(MqttPublishMessage msg) throws AdaptorException {
+// String deviceName = checkDeviceName(getDeviceName(getJson(msg)));
+// GatewayDeviceSessionCtx deviceSessionCtx = devices.remove(deviceName);
+// if (deviceSessionCtx != null) {
+// processor.process(SessionCloseMsg.onDisconnect(deviceSessionCtx.getSessionId()));
+// deviceSessionCtx.setClosed(true);
+// log.debug("[{}] Removed device [{}] from the gateway session", gatewaySessionId, deviceName);
+// } else {
+// log.debug("[{}] Device [{}] was already removed from the gateway session", gatewaySessionId, deviceName);
+// }
+// ack(msg);
+ }
+
+ public void onGatewayDisconnect() {
+// devices.forEach((k, v) -> {
+// processor.process(SessionCloseMsg.onDisconnect(v.getSessionId()));
+// });
+ }
+
+ public void onDeviceTelemetry(MqttPublishMessage mqttMsg) throws AdaptorException {
+// JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload());
+// int requestId = mqttMsg.variableHeader().messageId();
+// if (json.isJsonObject()) {
+// JsonObject jsonObj = json.getAsJsonObject();
+// for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) {
+// String deviceName = checkDeviceConnected(deviceEntry.getKey());
+// if (!deviceEntry.getValue().isJsonArray()) {
+// throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
+// }
+// BasicTelemetryUploadRequest request = new BasicTelemetryUploadRequest(requestId);
+// JsonArray deviceData = deviceEntry.getValue().getAsJsonArray();
+// for (JsonElement element : deviceData) {
+// JsonConverter.parseWithTs(request, element.getAsJsonObject());
+// }
+// GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
+// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
+// new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
+// }
+// } else {
+// throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
+// }
+ }
+
+ public void onDeviceRpcResponse(MqttPublishMessage mqttMsg) throws AdaptorException {
+// JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload());
+// if (json.isJsonObject()) {
+// JsonObject jsonObj = json.getAsJsonObject();
+// String deviceName = checkDeviceConnected(jsonObj.get(DEVICE_PROPERTY).getAsString());
+// Integer requestId = jsonObj.get("id").getAsInt();
+// String data = jsonObj.get("data").toString();
+// GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
+// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
+// new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new ToDeviceRpcResponseMsg(requestId, data))));
+// ack(mqttMsg);
+// } else {
+// throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
+// }
+ }
+
+ public void onDeviceAttributes(MqttPublishMessage mqttMsg) throws AdaptorException {
+// JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload());
+// int requestId = mqttMsg.variableHeader().messageId();
+// if (json.isJsonObject()) {
+// JsonObject jsonObj = json.getAsJsonObject();
+// for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) {
+// String deviceName = checkDeviceConnected(deviceEntry.getKey());
+// if (!deviceEntry.getValue().isJsonObject()) {
+// throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
+// }
+// long ts = System.currentTimeMillis();
+// BasicAttributesUpdateRequest request = new BasicAttributesUpdateRequest(requestId);
+// JsonObject deviceData = deviceEntry.getValue().getAsJsonObject();
+// request.add(JsonConverter.parseValues(deviceData).stream().map(kv -> new BaseAttributeKvEntry(kv, ts)).collect(Collectors.toList()));
+// GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
+// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
+// new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
+// }
+// } else {
+// throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
+// }
+ }
+
+ public void onDeviceAttributesRequest(MqttPublishMessage msg) throws AdaptorException {
+// JsonElement json = validateJsonPayload(gatewaySessionId, msg.payload());
+// if (json.isJsonObject()) {
+// JsonObject jsonObj = json.getAsJsonObject();
+// int requestId = jsonObj.get("id").getAsInt();
+// String deviceName = jsonObj.get(DEVICE_PROPERTY).getAsString();
+// boolean clientScope = jsonObj.get("client").getAsBoolean();
+// Set<String> keys;
+// if (jsonObj.has("key")) {
+// keys = Collections.singleton(jsonObj.get("key").getAsString());
+// } else {
+// JsonArray keysArray = jsonObj.get("keys").getAsJsonArray();
+// keys = new HashSet<>();
+// for (JsonElement keyObj : keysArray) {
+// keys.add(keyObj.getAsString());
+// }
+// }
+//
+// BasicGetAttributesRequest request;
+// if (clientScope) {
+// request = new BasicGetAttributesRequest(requestId, keys, null);
+// } else {
+// request = new BasicGetAttributesRequest(requestId, null, keys);
+// }
+// GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
+// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
+// new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
+// ack(msg);
+// } else {
+// throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
+// }
+ }
+
+ private String checkDeviceConnected(String deviceName) {
+// if (!devices.containsKey(deviceName)) {
+// log.debug("[{}] Missing device [{}] for the gateway session", gatewaySessionId, deviceName);
+// onDeviceConnect(deviceName, DEFAULT_DEVICE_TYPE);
+// }
+// return deviceName;
+ return null;
+ }
+
+ private String checkDeviceName(String deviceName) {
+ if (StringUtils.isEmpty(deviceName)) {
+ throw new RuntimeException("Device name is empty!");
+ } else {
+ return deviceName;
+ }
+ }
+
+ private String getDeviceName(JsonElement json) throws AdaptorException {
+ return json.getAsJsonObject().get(DEVICE_PROPERTY).getAsString();
+ }
+
+ private String getDeviceType(JsonElement json) throws AdaptorException {
+ JsonElement type = json.getAsJsonObject().get("type");
+ return type == null || type instanceof JsonNull ? DEFAULT_DEVICE_TYPE : type.getAsString();
+ }
+
+ private JsonElement getJson(MqttPublishMessage mqttMsg) throws AdaptorException {
+// return JsonMqttAdaptor.validateJsonPayload(gatewaySessionId, mqttMsg.payload());
+ return null;
+ }
+
+ protected SessionMsgProcessor getProcessor() {
+// return processor;
+ return null;
+ }
+
+ DeviceAuthService getAuthService() {
+// return authService;
+ return null;
+ }
+
+ public void setChannel(ChannelHandlerContext channel) {
+ this.channel = channel;
+ }
+
+ private void ack(MqttPublishMessage msg) {
+ if (msg.variableHeader().messageId() > 0) {
+ writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(msg.variableHeader().messageId()));
+ }
+ }
+
+ void writeAndFlush(MqttMessage mqttMessage) {
+ channel.writeAndFlush(mqttMessage);
+ }
+
+}
transport/mqtt-transport/pom.xml 28(+4 -24)
diff --git a/transport/mqtt-transport/pom.xml b/transport/mqtt-transport/pom.xml
index 8d8b24d..012aabd 100644
--- a/transport/mqtt-transport/pom.xml
+++ b/transport/mqtt-transport/pom.xml
@@ -41,32 +41,12 @@
<artifactId>transport</artifactId>
</dependency>
<dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-all</artifactId>
+ <groupId>org.thingsboard.transport</groupId>
+ <artifactId>mqtt-common</artifactId>
</dependency>
<dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-context</artifactId>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>log4j-over-slf4j</artifactId>
- </dependency>
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-core</artifactId>
- </dependency>
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- </dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
+ <groupId>org.thingsboard.common</groupId>
+ <artifactId>queue</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/AsyncCallbackTemplate.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/AsyncCallbackTemplate.java
new file mode 100644
index 0000000..493c0c8
--- /dev/null
+++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/AsyncCallbackTemplate.java
@@ -0,0 +1,45 @@
+package org.thingsboard.server.mqtt.service;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+
+/**
+ * Created by ashvayka on 05.10.18.
+ */
+public class AsyncCallbackTemplate {
+
+ public static <T> void withCallback(ListenableFuture<T> future, Consumer<T> onSuccess,
+ Consumer<Throwable> onFailure) {
+ withCallback(future, onSuccess, onFailure, null);
+ }
+
+ public static <T> void withCallback(ListenableFuture<T> future, Consumer<T> onSuccess,
+ Consumer<Throwable> onFailure, Executor executor) {
+ FutureCallback<T> callback = new FutureCallback<T>() {
+ @Override
+ public void onSuccess(@Nullable T result) {
+ try {
+ onSuccess.accept(result);
+ } catch (Throwable th) {
+ onFailure(th);
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ onFailure.accept(t);
+ }
+ };
+ if (executor != null) {
+ Futures.addCallback(future, callback, executor);
+ } else {
+ Futures.addCallback(future, callback);
+ }
+ }
+
+}
diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
new file mode 100644
index 0000000..a1641f1
--- /dev/null
+++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
@@ -0,0 +1,110 @@
+package org.thingsboard.server.mqtt.service;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+import org.thingsboard.server.common.transport.TransportService;
+import org.thingsboard.server.common.transport.TransportServiceCallback;
+import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
+import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
+import org.thingsboard.server.kafka.TbKafkaRequestTemplate;
+import org.thingsboard.server.gen.transport.TransportProtos.*;
+import org.thingsboard.server.kafka.TbKafkaSettings;
+import org.thingsboard.server.transport.mqtt.MqttTransportContext;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Created by ashvayka on 05.10.18.
+ */
+@Service
+public class MqttTransportService implements TransportService {
+
+ @Value("${kafka.rule-engine.topic}")
+ private String ruleEngineTopic;
+ @Value("${kafka.transport-api.requests-topic}")
+ private String transportApiRequestsTopic;
+ @Value("${kafka.transport-api.responses-topic}")
+ private String transportApiResponsesTopic;
+ @Value("${kafka.transport-api.max_pending_requests}")
+ private long maxPendingRequests;
+ @Value("${kafka.transport-api.max_requests_timeout}")
+ private long maxRequestsTimeout;
+ @Value("${kafka.transport-api.response_poll_interval}")
+ private int responsePollDuration;
+ @Value("${kafka.transport-api.response_auto_commit_interval}")
+ private int autoCommitInterval;
+
+ @Autowired
+ private TbKafkaSettings kafkaSettings;
+ //We use this to get the node id. We should replace this with a component that provides the node id.
+ @Autowired
+ private MqttTransportContext transportContext;
+
+ private ExecutorService transportCallbackExecutor;
+
+ private TbKafkaRequestTemplate<TransportApiRequestMsg, TransportApiResponseMsg> transportApiTemplate;
+
+ @PostConstruct
+ public void init() {
+ this.transportCallbackExecutor = Executors.newCachedThreadPool();
+
+ TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TransportApiRequestMsg> requestBuilder = TBKafkaProducerTemplate.builder();
+ requestBuilder.settings(kafkaSettings);
+ requestBuilder.defaultTopic(transportApiRequestsTopic);
+ requestBuilder.encoder(new TransportApiRequestEncoder());
+
+ TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TransportApiResponseMsg> responseBuilder = TBKafkaConsumerTemplate.builder();
+ responseBuilder.settings(kafkaSettings);
+ responseBuilder.topic(transportApiResponsesTopic + "." + transportContext.getNodeId());
+ responseBuilder.clientId(transportContext.getNodeId());
+ responseBuilder.groupId("transport-node");
+ responseBuilder.autoCommit(true);
+ responseBuilder.autoCommitIntervalMs(autoCommitInterval);
+ responseBuilder.decoder(new TransportApiResponseDecoder());
+
+ TbKafkaRequestTemplate.TbKafkaRequestTemplateBuilder
+ <TransportApiRequestMsg, TransportApiResponseMsg> builder = TbKafkaRequestTemplate.builder();
+ builder.requestTemplate(requestBuilder.build());
+ builder.responseTemplate(responseBuilder.build());
+ builder.maxPendingRequests(maxPendingRequests);
+ builder.maxRequestTimeout(maxRequestsTimeout);
+ builder.pollInterval(responsePollDuration);
+ transportApiTemplate = builder.build();
+ transportApiTemplate.init();
+ }
+
+ @PreDestroy
+ public void destroy() {
+ if (transportApiTemplate != null) {
+ transportApiTemplate.stop();
+ }
+ if (transportCallbackExecutor != null) {
+ transportCallbackExecutor.shutdownNow();
+ }
+ }
+
+ @Override
+ public void process(ValidateDeviceTokenRequestMsg msg, TransportServiceCallback<ValidateDeviceTokenResponseMsg> callback) {
+ AsyncCallbackTemplate.withCallback(transportApiTemplate.post(msg.getToken(), TransportApiRequestMsg.newBuilder().setValidateTokenRequestMsg(msg).build()),
+ response -> callback.onSuccess(response.getValidateTokenResponseMsg()), callback::onError, transportCallbackExecutor);
+ }
+
+ @Override
+ public void process(SessionEventMsg msg, TransportServiceCallback<Void> callback) {
+
+ }
+
+ @Override
+ public void process(PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
+
+ }
+
+ @Override
+ public void process(PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
+
+ }
+}
diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiRequestEncoder.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiRequestEncoder.java
new file mode 100644
index 0000000..f931db6
--- /dev/null
+++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiRequestEncoder.java
@@ -0,0 +1,14 @@
+package org.thingsboard.server.mqtt.service;
+
+import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
+import org.thingsboard.server.kafka.TbKafkaEncoder;
+
+/**
+ * Created by ashvayka on 05.10.18.
+ */
+public class TransportApiRequestEncoder implements TbKafkaEncoder<TransportApiRequestMsg> {
+ @Override
+ public byte[] encode(TransportApiRequestMsg value) {
+ return value.toByteArray();
+ }
+}
diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiResponseDecoder.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiResponseDecoder.java
new file mode 100644
index 0000000..22e1647
--- /dev/null
+++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiResponseDecoder.java
@@ -0,0 +1,16 @@
+package org.thingsboard.server.mqtt.service;
+
+import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
+import org.thingsboard.server.kafka.TbKafkaDecoder;
+
+import java.io.IOException;
+
+/**
+ * Created by ashvayka on 05.10.18.
+ */
+public class TransportApiResponseDecoder implements TbKafkaDecoder<TransportApiResponseMsg> {
+ @Override
+ public TransportApiResponseMsg decode(byte[] data) throws IOException {
+ return TransportApiResponseMsg.parseFrom(data);
+ }
+}
diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/ThingsboardMqttTransportApplication.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/ThingsboardMqttTransportApplication.java
index 3c54938..4740342 100644
--- a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/ThingsboardMqttTransportApplication.java
+++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/ThingsboardMqttTransportApplication.java
@@ -19,14 +19,13 @@ import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
-import springfox.documentation.swagger2.annotations.EnableSwagger2;
import java.util.Arrays;
@SpringBootConfiguration
@EnableAsync
@EnableScheduling
-@ComponentScan({"org.thingsboard.server"})
+@ComponentScan({"org.thingsboard.server.mqtt", "org.thingsboard.server.common", "org.thingsboard.server.transport.mqtt", "org.thingsboard.server.kafka"})
public class ThingsboardMqttTransportApplication {
private static final String SPRING_CONFIG_NAME_KEY = "--spring.config.name";
diff --git a/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml
index c12f25e..a2a0d54 100644
--- a/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml
+++ b/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml
@@ -44,6 +44,27 @@ mqtt:
# Type of the key store
key_store_type: "${MQTT_SSL_KEY_STORE_TYPE:JKS}"
+#Quota parameters
+quota:
+ host:
+ # Max allowed number of API requests in interval for single host
+ limit: "${QUOTA_HOST_LIMIT:10000}"
+ # Interval duration
+ intervalMs: "${QUOTA_HOST_INTERVAL_MS:60000}"
+ # Maximum silence duration for host after which Host removed from QuotaService. Must be bigger than intervalMs
+ ttlMs: "${QUOTA_HOST_TTL_MS:60000}"
+ # Interval for scheduled task that cleans expired records. TTL is used for expiring
+ cleanPeriodMs: "${QUOTA_HOST_CLEAN_PERIOD_MS:300000}"
+ # Enable Host API Limits
+ enabled: "${QUOTA_HOST_ENABLED:false}"
+ # Array of whitelist hosts
+ whitelist: "${QUOTA_HOST_WHITELIST:localhost,127.0.0.1}"
+ # Array of blacklist hosts
+ blacklist: "${QUOTA_HOST_BLACKLIST:}"
+ log:
+ topSize: 10
+ intervalMin: 2
+
kafka:
enabled: true
bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
@@ -52,6 +73,13 @@ kafka:
batch.size: "${TB_KAFKA_BATCH_SIZE:16384}"
linger.ms: "${TB_KAFKA_LINGER_MS:1}"
buffer.memory: "${TB_BUFFER_MEMORY:33554432}"
- topic:
- telemetry: "${TB_TELEMETRY_TOPIC:tb.transport.telemetry}"
- requests: "${TB_TELEMETRY_TOPIC:tb.transport.requests}"
\ No newline at end of file
+ transport-api:
+ requests-topic: "${TB_TRANSPORT_API_REQUEST_TOPIC:tb.transport.api.requests}"
+ responses-topic: "${TB_TRANSPORT_API_RESPONSE_TOPIC:tb.transport.api.responses}"
+ max_pending_requests: "${TB_TRANSPORT_MAX_PENDING_REQUESTS:10000}"
+ max_requests_timeout: "${TB_TRANSPORT_MAX_REQUEST_TIMEOUT:10000}"
+ response_poll_interval: "${TB_TRANSPORT_RESPONSE_POLL_INTERVAL_MS:25}"
+ response_auto_commit_interval: "${TB_TRANSPORT_RESPONSE_AUTO_COMMIT_INTERVAL_MS:100}"
+ # Maximum allowed JavaScript execution errors before JavaScript will be blacklisted
+ rule-engine:
+ topic: "${TB_RULE_ENGINE_TOPIC:tb.rule-engine}"
transport/pom.xml 2(+1 -1)
diff --git a/transport/pom.xml b/transport/pom.xml
index a01c7ac..2401d3f 100644
--- a/transport/pom.xml
+++ b/transport/pom.xml
@@ -37,7 +37,7 @@
<modules>
<module>http</module>
<module>coap</module>
- <module>mqtt</module>
+ <module>mqtt-common</module>
<module>mqtt-transport</module>
</modules>