thingsboard-developers

Transport

10/5/2018 1:45:10 PM

Details

diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
new file mode 100644
index 0000000..7831b34
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
@@ -0,0 +1,14 @@
+package org.thingsboard.server.service.transport;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Service;
+
+/**
+ * Created by ashvayka on 05.10.18.
+ */
+@Slf4j
+@Service
+@ConditionalOnProperty(prefix = "quota.rule.tenant", value = "enabled", havingValue = "true", matchIfMissing = false)
+public class RemoteTransportApiService implements TransportApiService {
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/TransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/TransportApiService.java
new file mode 100644
index 0000000..28a769a
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/transport/TransportApiService.java
@@ -0,0 +1,7 @@
+package org.thingsboard.server.service.transport;
+
+/**
+ * Created by ashvayka on 05.10.18.
+ */
+public interface TransportApiService {
+}
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/AbstractTbKafkaTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/AbstractTbKafkaTemplate.java
new file mode 100644
index 0000000..8d851bc
--- /dev/null
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/AbstractTbKafkaTemplate.java
@@ -0,0 +1,50 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+
+/**
+ * Created by ashvayka on 25.09.18.
+ */
+@Slf4j
+public abstract class AbstractTbKafkaTemplate {
+    protected byte[] uuidToBytes(UUID uuid) {
+        ByteBuffer buf = ByteBuffer.allocate(16);
+        buf.putLong(uuid.getMostSignificantBits());
+        buf.putLong(uuid.getLeastSignificantBits());
+        return buf.array();
+    }
+
+    protected static UUID bytesToUuid(byte[] bytes) {
+        ByteBuffer bb = ByteBuffer.wrap(bytes);
+        long firstLong = bb.getLong();
+        long secondLong = bb.getLong();
+        return new UUID(firstLong, secondLong);
+    }
+
+    protected byte[] stringToBytes(String string) {
+        return string.getBytes(StandardCharsets.UTF_8);
+    }
+
+    protected String bytesToString(byte[] data) {
+        return new String(data, StandardCharsets.UTF_8);
+    }
+}
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
index 30b20e7..ba08bf8 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
@@ -23,24 +23,25 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.admin.CreateTopicsResult;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
 
 /**
  * Created by ashvayka on 25.09.18.
  */
 @Slf4j
-public class TbKafkaRequestTemplate<Request, Response> {
+public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTemplate {
 
     private final TBKafkaProducerTemplate<Request> requestTemplate;
     private final TBKafkaConsumerTemplate<Response> responseTemplate;
@@ -163,24 +164,6 @@ public class TbKafkaRequestTemplate<Request, Response> {
         return future;
     }
 
-    private byte[] uuidToBytes(UUID uuid) {
-        ByteBuffer buf = ByteBuffer.allocate(16);
-        buf.putLong(uuid.getMostSignificantBits());
-        buf.putLong(uuid.getLeastSignificantBits());
-        return buf.array();
-    }
-
-    private static UUID bytesToUuid(byte[] bytes) {
-        ByteBuffer bb = ByteBuffer.wrap(bytes);
-        long firstLong = bb.getLong();
-        long secondLong = bb.getLong();
-        return new UUID(firstLong, secondLong);
-    }
-
-    private byte[] stringToBytes(String string) {
-        return string.getBytes(StandardCharsets.UTF_8);
-    }
-
     private static class ResponseMetaData<T> {
         private final long expTime;
         private final SettableFuture<T> future;
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java
index 536c4b9..1f53387 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java
@@ -15,52 +15,45 @@
  */
 package org.thingsboard.server.kafka;
 
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
 import lombok.Builder;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.admin.CreateTopicsResult;
-import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
 import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Collections;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Created by ashvayka on 25.09.18.
  */
 @Slf4j
-public class TbKafkaResponseTemplate<Request, Response> {
+public class TbKafkaResponseTemplate<Request, Response> extends AbstractTbKafkaTemplate {
 
     private final TBKafkaConsumerTemplate<Request> requestTemplate;
     private final TBKafkaProducerTemplate<Response> responseTemplate;
     private final TbKafkaHandler<Request, Response> handler;
     private final ConcurrentMap<UUID, String> pendingRequests;
     private final ExecutorService executor;
-    private final long maxPendingRequests;
+    private final int maxPendingRequests;
 
     private final long pollInterval;
     private volatile boolean stopped = false;
+    //TODO:
+    private final AtomicInteger pendingRequestCount = new AtomicInteger();
 
     @Builder
     public TbKafkaResponseTemplate(TBKafkaConsumerTemplate<Request> requestTemplate,
                                    TBKafkaProducerTemplate<Response> responseTemplate,
                                    TbKafkaHandler<Request, Response> handler,
                                    long pollInterval,
-                                   long maxPendingRequests,
+                                   int maxPendingRequests,
                                    ExecutorService executor) {
         this.requestTemplate = requestTemplate;
         this.responseTemplate = responseTemplate;
@@ -75,8 +68,11 @@ public class TbKafkaResponseTemplate<Request, Response> {
         this.responseTemplate.init();
         requestTemplate.subscribe();
         executor.submit(() -> {
-            long nextCleanupMs = 0L;
             while (!stopped) {
+                if(pendingRequestCount.get() > maxPendingRequests){
+
+                }
+                //TODO: we need to protect from reading too much requests.
                 ConsumerRecords<String, byte[]> requests = requestTemplate.poll(Duration.ofMillis(pollInterval));
                 requests.forEach(request -> {
                     Header requestIdHeader = request.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER);
@@ -94,26 +90,15 @@ public class TbKafkaResponseTemplate<Request, Response> {
                         log.error("[{}] Missing response topic in header", request);
                         return;
                     }
-                    String responseTopic = bytesToUuid(responseTopicHeader.value());
-                    if (requestId == null) {
-                        log.error("[{}] Missing requestId in header and body", request);
-                        return;
-                    }
-
-                    Request decodedRequest = null;
-                    String responseTopic = null;
-
+                    String responseTopic = bytesToString(responseTopicHeader.value());
                     try {
-                        if (decodedRequest == null) {
-                            decodedRequest = requestTemplate.decode(request);
-                        }
-                        executor.submit(() -> {
-                            handler.handle(decodedRequest, );
-                        });
-                    } catch (IOException e) {
-                        expectedRequest.future.setException(e);
+                        Request decodedRequest = requestTemplate.decode(request);
+                        executor.submit(() -> handler.handle(decodedRequest,
+                                response -> reply(requestId, responseTopic, response),
+                                e -> log.error("[{}] Failed to process the request: {}", requestId, request, e)));
+                    } catch (Throwable e) {
+                        log.error("[{}] Failed to process the request: {}", requestId, request, e);
                     }
-
                 });
             }
         });
@@ -123,51 +108,8 @@ public class TbKafkaResponseTemplate<Request, Response> {
         stopped = true;
     }
 
-    public ListenableFuture<Response> post(String key, Request request) {
-        if (tickSize > maxPendingRequests) {
-            return Futures.immediateFailedFuture(new RuntimeException("Pending request map is full!"));
-        }
-        UUID requestId = UUID.randomUUID();
-        List<Header> headers = new ArrayList<>(2);
-        headers.add(new RecordHeader(TbKafkaSettings.REQUEST_ID_HEADER, uuidToBytes(requestId)));
-        headers.add(new RecordHeader(TbKafkaSettings.RESPONSE_TOPIC_HEADER, stringToBytes(responseTemplate.getTopic())));
-        SettableFuture<Response> future = SettableFuture.create();
-        pendingRequests.putIfAbsent(requestId, new ResponseMetaData<>(tickTs + maxRequestTimeout, future));
-        request = requestTemplate.enrich(request, responseTemplate.getTopic(), requestId);
-        requestTemplate.send(key, request, headers);
-        return future;
-    }
-
-    private byte[] uuidToBytes(UUID uuid) {
-        ByteBuffer buf = ByteBuffer.allocate(16);
-        buf.putLong(uuid.getMostSignificantBits());
-        buf.putLong(uuid.getLeastSignificantBits());
-        return buf.array();
-    }
-
-    private static UUID bytesToUuid(byte[] bytes) {
-        ByteBuffer bb = ByteBuffer.wrap(bytes);
-        long firstLong = bb.getLong();
-        long secondLong = bb.getLong();
-        return new UUID(firstLong, secondLong);
-    }
-
-    private byte[] stringToBytes(String string) {
-        return string.getBytes(StandardCharsets.UTF_8);
-    }
-
-    private String bytesToString(byte[] data) {
-        return new String(data, StandardCharsets.UTF_8);
-    }
-
-    private static class ResponseMetaData<T> {
-        private final long expTime;
-        private final SettableFuture<T> future;
-
-        ResponseMetaData(long ts, SettableFuture<T> future) {
-            this.expTime = ts;
-            this.future = future;
-        }
+    private void reply(UUID requestId, String topic, Response response) {
+        responseTemplate.send(topic, response, Collections.singletonList(new RecordHeader(TbKafkaSettings.REQUEST_ID_HEADER, uuidToBytes(requestId))));
     }
 
 }