thingsboard-developers
Changes
application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java 153(+152 -1)
application/src/main/java/org/thingsboard/server/service/transport/TransportApiRequestDecoder.java 31(+31 -0)
application/src/main/java/org/thingsboard/server/service/transport/TransportApiResponseEncoder.java 30(+30 -0)
application/src/main/java/org/thingsboard/server/service/transport/TransportApiService.java 20(+19 -1)
transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java 10(+5 -5)
transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java 4(+2 -2)
transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java 134(+67 -67)
transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java 2(+1 -1)
transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java 15(+15 -0)
transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java 10(+5 -5)
transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java 30(+23 -7)
transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiRequestEncoder.java 15(+15 -0)
Details
diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java
index 9275847..416a4d6 100644
--- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java
+++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java
@@ -49,7 +49,7 @@ public class DefaultMsgQueueService implements MsgQueueService {
@Autowired
private MsgQueue msgQueue;
- @Autowired
+ @Autowired(required = false)
private TenantQuotaService quotaService;
private ScheduledExecutorService cleanupExecutor;
@@ -74,7 +74,7 @@ public class DefaultMsgQueueService implements MsgQueueService {
@Override
public ListenableFuture<Void> put(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition) {
- if(quotaService.isQuotaExceeded(tenantId.getId().toString())) {
+ if(quotaService != null && quotaService.isQuotaExceeded(tenantId.getId().toString())) {
log.warn("Tenant TbMsg Quota exceeded for [{}:{}] . Reject", tenantId.getId());
return Futures.immediateFailedFuture(new RuntimeException("Tenant TbMsg Quota exceeded"));
}
diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java
index 37a6211..b334ff1 100644
--- a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java
+++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java
@@ -103,9 +103,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
responseBuilder.autoCommit(true);
responseBuilder.autoCommitIntervalMs(autoCommitInterval);
responseBuilder.decoder(new RemoteJsResponseDecoder());
- responseBuilder.requestIdExtractor((response) -> {
- return new UUID(response.getRequestIdMSB(), response.getRequestIdLSB());
- });
+ responseBuilder.requestIdExtractor((response) -> new UUID(response.getRequestIdMSB(), response.getRequestIdLSB()));
TbKafkaRequestTemplate.TbKafkaRequestTemplateBuilder
<JsInvokeProtos.RemoteJsRequest, JsInvokeProtos.RemoteJsResponse> builder = TbKafkaRequestTemplate.builder();
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
index 7831b34..0c7b5f4 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
@@ -1,14 +1,165 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.server.service.transport;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.security.DeviceCredentials;
+import org.thingsboard.server.common.data.security.DeviceCredentialsType;
+import org.thingsboard.server.dao.device.DeviceCredentialsService;
+import org.thingsboard.server.dao.device.DeviceService;
+import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
+import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenResponseMsg;
+import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
+import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
+import org.thingsboard.server.kafka.TbKafkaResponseTemplate;
+import org.thingsboard.server.kafka.TbKafkaSettings;
+import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
+
+import javax.annotation.PostConstruct;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
/**
* Created by ashvayka on 05.10.18.
*/
@Slf4j
@Service
-@ConditionalOnProperty(prefix = "quota.rule.tenant", value = "enabled", havingValue = "true", matchIfMissing = false)
+@ConditionalOnProperty(prefix = "transport.remote", value = "enabled", havingValue = "true")
public class RemoteTransportApiService implements TransportApiService {
+
+ private static final ObjectMapper mapper = new ObjectMapper();
+
+ @Value("${transport.remote.transport_api.requests_topic}")
+ private String transportApiRequestsTopic;
+ @Value("${transport.remote.transport_api.responses_topic}")
+ private String transportApiResponsesTopic;
+ @Value("${transport.remote.transport_api.max_pending_requests}")
+ private int maxPendingRequests;
+ @Value("${transport.remote.transport_api.request_timeout}")
+ private long requestTimeout;
+ @Value("${transport.remote.transport_api.request_poll_interval}")
+ private int responsePollDuration;
+ @Value("${transport.remote.transport_api.request_auto_commit_interval}")
+ private int autoCommitInterval;
+
+ @Autowired
+ private TbKafkaSettings kafkaSettings;
+
+ @Autowired
+ private DiscoveryService discoveryService;
+
+ @Autowired
+ private DeviceService deviceService;
+
+ @Autowired
+ private DeviceCredentialsService deviceCredentialsService;
+
+ private ExecutorService transportCallbackExecutor;
+
+ private TbKafkaResponseTemplate<TransportApiRequestMsg, TransportApiResponseMsg> transportApiTemplate;
+
+ @PostConstruct
+ public void init() {
+ this.transportCallbackExecutor = Executors.newCachedThreadPool();
+
+ TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TransportApiResponseMsg> responseBuilder = TBKafkaProducerTemplate.builder();
+ responseBuilder.settings(kafkaSettings);
+ responseBuilder.defaultTopic(transportApiResponsesTopic);
+ responseBuilder.encoder(new TransportApiResponseEncoder());
+
+ TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TransportApiRequestMsg> requestBuilder = TBKafkaConsumerTemplate.builder();
+ requestBuilder.settings(kafkaSettings);
+ requestBuilder.topic(transportApiRequestsTopic);
+ requestBuilder.clientId(discoveryService.getNodeId());
+ requestBuilder.groupId("tb-node");
+ requestBuilder.autoCommit(true);
+ requestBuilder.autoCommitIntervalMs(autoCommitInterval);
+ requestBuilder.decoder(new TransportApiRequestDecoder());
+
+ TbKafkaResponseTemplate.TbKafkaResponseTemplateBuilder
+ <TransportApiRequestMsg, TransportApiResponseMsg> builder = TbKafkaResponseTemplate.builder();
+ builder.requestTemplate(requestBuilder.build());
+ builder.responseTemplate(responseBuilder.build());
+ builder.maxPendingRequests(maxPendingRequests);
+ builder.requestTimeout(requestTimeout);
+ builder.pollInterval(responsePollDuration);
+ builder.executor(transportCallbackExecutor);
+ builder.handler(this);
+ transportApiTemplate = builder.build();
+ transportApiTemplate.init();
+ }
+
+ @Override
+ public ListenableFuture<TransportApiResponseMsg> handle(TransportApiRequestMsg transportApiRequestMsg) throws Exception {
+ if (transportApiRequestMsg.hasValidateTokenRequestMsg()) {
+ ValidateDeviceTokenRequestMsg msg = transportApiRequestMsg.getValidateTokenRequestMsg();
+ //TODO: Make async and enable caching
+ DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(msg.getToken());
+ if (credentials != null && credentials.getCredentialsType() == DeviceCredentialsType.ACCESS_TOKEN) {
+ return getDeviceInfo(credentials.getDeviceId());
+ } else {
+ return getEmptyTransportApiResponseFuture();
+ }
+ }
+ return getEmptyTransportApiResponseFuture();
+ }
+
+ private ListenableFuture<TransportApiResponseMsg> getDeviceInfo(DeviceId deviceId) {
+ return Futures.transform(deviceService.findDeviceByIdAsync(deviceId), device -> {
+ if (device == null) {
+ log.trace("[{}] Failed to lookup device by id", deviceId);
+ return getEmptyTransportApiResponse();
+ }
+ try {
+ DeviceInfoProto deviceInfoProto = DeviceInfoProto.newBuilder()
+ .setTenantIdMSB(device.getTenantId().getId().getMostSignificantBits())
+ .setTenantIdLSB(device.getTenantId().getId().getLeastSignificantBits())
+ .setDeviceIdMSB(device.getId().getId().getMostSignificantBits())
+ .setDeviceIdLSB(device.getId().getId().getLeastSignificantBits())
+ .setDeviceName(device.getName())
+ .setDeviceType(device.getType())
+ .setAdditionalInfo(mapper.writeValueAsString(device.getAdditionalInfo()))
+ .build();
+ return TransportApiResponseMsg.newBuilder()
+ .setValidateTokenResponseMsg(ValidateDeviceTokenResponseMsg.newBuilder().setDeviceInfo(deviceInfoProto).build()).build();
+ } catch (JsonProcessingException e) {
+ log.warn("[{}] Failed to lookup device by id", deviceId, e);
+ return getEmptyTransportApiResponse();
+ }
+ });
+ }
+
+ private ListenableFuture<TransportApiResponseMsg> getEmptyTransportApiResponseFuture() {
+ return Futures.immediateFuture(getEmptyTransportApiResponse());
+ }
+
+ private TransportApiResponseMsg getEmptyTransportApiResponse() {
+ return TransportApiResponseMsg.newBuilder()
+ .setValidateTokenResponseMsg(ValidateDeviceTokenResponseMsg.getDefaultInstance()).build();
+ }
}
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/TransportApiRequestDecoder.java b/application/src/main/java/org/thingsboard/server/service/transport/TransportApiRequestDecoder.java
new file mode 100644
index 0000000..c54f1a0
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/transport/TransportApiRequestDecoder.java
@@ -0,0 +1,31 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.transport;
+
+import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
+import org.thingsboard.server.kafka.TbKafkaDecoder;
+
+import java.io.IOException;
+
+/**
+ * Created by ashvayka on 05.10.18.
+ */
+public class TransportApiRequestDecoder implements TbKafkaDecoder<TransportApiRequestMsg> {
+ @Override
+ public TransportApiRequestMsg decode(byte[] data) throws IOException {
+ return TransportApiRequestMsg.parseFrom(data);
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/TransportApiResponseEncoder.java b/application/src/main/java/org/thingsboard/server/service/transport/TransportApiResponseEncoder.java
new file mode 100644
index 0000000..175a92d
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/transport/TransportApiResponseEncoder.java
@@ -0,0 +1,30 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.transport;
+
+import org.thingsboard.server.kafka.TbKafkaEncoder;
+
+import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
+
+/**
+ * Created by ashvayka on 05.10.18.
+ */
+public class TransportApiResponseEncoder implements TbKafkaEncoder<TransportApiResponseMsg> {
+ @Override
+ public byte[] encode(TransportApiResponseMsg value) {
+ return value.toByteArray();
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/TransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/TransportApiService.java
index 28a769a..cc6f5b9 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/TransportApiService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/TransportApiService.java
@@ -1,7 +1,25 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.server.service.transport;
+import org.thingsboard.server.gen.transport.TransportProtos;
+import org.thingsboard.server.kafka.TbKafkaHandler;
+
/**
* Created by ashvayka on 05.10.18.
*/
-public interface TransportApiService {
+public interface TransportApiService extends TbKafkaHandler<TransportProtos.TransportApiRequestMsg, TransportProtos.TransportApiResponseMsg> {
}
diff --git a/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java b/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java
index c072311..751bde6 100644
--- a/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java
+++ b/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index f35df67..c58f3a1 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -84,7 +84,7 @@ http:
# MQTT server parameters
mqtt:
# Enable/disable mqtt transport protocol.
- enabled: "${MQTT_ENABLED:true}"
+ enabled: "${MQTT_ENABLED:false}"
bind_address: "${MQTT_BIND_ADDRESS:0.0.0.0}"
bind_port: "${MQTT_BIND_PORT:1883}"
adaptor: "${MQTT_ADAPTOR_NAME:JsonMqttAdaptor}"
@@ -149,7 +149,7 @@ quota:
# Interval for scheduled task that cleans expired records. TTL is used for expiring
cleanPeriodMs: "${QUOTA_TENANT_CLEAN_PERIOD_MS:300000}"
# Enable Host API Limits
- enabled: "${QUOTA_TENANT_ENABLED:false}"
+ enabled: "${QUOTA_TENANT_ENABLED:true}"
# Array of whitelist tenants
whitelist: "${QUOTA_TENANT_WHITELIST:}"
# Array of blacklist tenants
@@ -450,3 +450,16 @@ js:
response_auto_commit_interval: "${REMOTE_JS_RESPONSE_AUTO_COMMIT_INTERVAL_MS:100}"
# Maximum allowed JavaScript execution errors before JavaScript will be blacklisted
max_errors: "${REMOTE_JS_SANDBOX_MAX_ERRORS:3}"
+
+transport:
+ remote:
+ enabled: "${REMOTE_TRANSPORT_ENABLED:true}"
+ transport_api:
+ requests_topic: "${TB_TRANSPORT_API_REQUEST_TOPIC:tb.transport.api.requests}"
+ responses_topic: "${TB_TRANSPORT_API_RESPONSE_TOPIC:tb.transport.api.responses}"
+ max_pending_requests: "${TB_TRANSPORT_MAX_PENDING_REQUESTS:10000}"
+ request_timeout: "${TB_TRANSPORT_MAX_REQUEST_TIMEOUT:10000}"
+ request_poll_interval: "${TB_TRANSPORT_RESPONSE_POLL_INTERVAL_MS:25}"
+ request_auto_commit_interval: "${TB_TRANSPORT_RESPONSE_AUTO_COMMIT_INTERVAL_MS:1000}"
+ rule_engine:
+ topic: "${TB_RULE_ENGINE_TOPIC:tb.rule-engine}"
\ No newline at end of file
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/AbstractTbKafkaTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/AbstractTbKafkaTemplate.java
index 8d851bc..386bfc5 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/AbstractTbKafkaTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/AbstractTbKafkaTemplate.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaHandler.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaHandler.java
index 66d53c3..a64d28e 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaHandler.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaHandler.java
@@ -1,12 +1,27 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.server.kafka;
-import java.util.function.Consumer;
+import com.google.common.util.concurrent.ListenableFuture;
/**
* Created by ashvayka on 05.10.18.
*/
public interface TbKafkaHandler<Request, Response> {
- void handle(Request request, Consumer<Response> onSuccess, Consumer<Throwable> onFailure);
+ ListenableFuture<Response> handle(Request request) throws Exception;
}
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
index 2611e94..1e109d2 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -101,6 +101,10 @@ public class TBKafkaProducerTemplate<T> {
return send(this.defaultTopic, key, value, timestamp, headers);
}
+ public Future<RecordMetadata> send(String topic, String key, T value, Iterable<Header> headers) {
+ return send(topic, key, value, null, headers);
+ }
+
public Future<RecordMetadata> send(String topic, String key, T value, Long timestamp, Iterable<Header> headers) {
byte[] data = encoder.encode(value);
ProducerRecord<String, byte[]> record;
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java
index 1f53387..0dbf45d 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,13 +21,15 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
-import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -40,12 +42,14 @@ public class TbKafkaResponseTemplate<Request, Response> extends AbstractTbKafkaT
private final TBKafkaProducerTemplate<Response> responseTemplate;
private final TbKafkaHandler<Request, Response> handler;
private final ConcurrentMap<UUID, String> pendingRequests;
- private final ExecutorService executor;
+ private final ExecutorService loopExecutor;
+ private final ScheduledExecutorService timeoutExecutor;
+ private final ExecutorService callbackExecutor;
private final int maxPendingRequests;
+ private final long requestTimeout;
private final long pollInterval;
private volatile boolean stopped = false;
- //TODO:
private final AtomicInteger pendingRequestCount = new AtomicInteger();
@Builder
@@ -53,6 +57,7 @@ public class TbKafkaResponseTemplate<Request, Response> extends AbstractTbKafkaT
TBKafkaProducerTemplate<Response> responseTemplate,
TbKafkaHandler<Request, Response> handler,
long pollInterval,
+ long requestTimeout,
int maxPendingRequests,
ExecutorService executor) {
this.requestTemplate = requestTemplate;
@@ -61,18 +66,24 @@ public class TbKafkaResponseTemplate<Request, Response> extends AbstractTbKafkaT
this.pendingRequests = new ConcurrentHashMap<>();
this.maxPendingRequests = maxPendingRequests;
this.pollInterval = pollInterval;
- this.executor = executor;
+ this.requestTimeout = requestTimeout;
+ this.callbackExecutor = executor;
+ this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
+ this.loopExecutor = Executors.newSingleThreadExecutor();
}
public void init() {
this.responseTemplate.init();
requestTemplate.subscribe();
- executor.submit(() -> {
+ loopExecutor.submit(() -> {
while (!stopped) {
- if(pendingRequestCount.get() > maxPendingRequests){
-
+ while (pendingRequestCount.get() >= maxPendingRequests) {
+ try {
+ Thread.sleep(pollInterval);
+ } catch (InterruptedException e) {
+ log.trace("Failed to wait until the server has capacity to handle new requests", e);
+ }
}
- //TODO: we need to protect from reading too much requests.
ConsumerRecords<String, byte[]> requests = requestTemplate.poll(Duration.ofMillis(pollInterval));
requests.forEach(request -> {
Header requestIdHeader = request.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER);
@@ -92,12 +103,27 @@ public class TbKafkaResponseTemplate<Request, Response> extends AbstractTbKafkaT
}
String responseTopic = bytesToString(responseTopicHeader.value());
try {
+ pendingRequestCount.getAndIncrement();
Request decodedRequest = requestTemplate.decode(request);
- executor.submit(() -> handler.handle(decodedRequest,
- response -> reply(requestId, responseTopic, response),
- e -> log.error("[{}] Failed to process the request: {}", requestId, request, e)));
+ AsyncCallbackTemplate.withCallbackAndTimeout(handler.handle(decodedRequest),
+ response -> {
+ pendingRequestCount.decrementAndGet();
+ reply(requestId, responseTopic, response);
+ },
+ e -> {
+ pendingRequestCount.decrementAndGet();
+ if (e.getCause() != null && e.getCause() instanceof TimeoutException) {
+ log.warn("[{}] Timedout to process the request: {}", requestId, request, e);
+ } else {
+ log.trace("[{}] Failed to process the request: {}", requestId, request, e);
+ }
+ },
+ requestTimeout,
+ timeoutExecutor,
+ callbackExecutor);
} catch (Throwable e) {
- log.error("[{}] Failed to process the request: {}", requestId, request, e);
+ pendingRequestCount.decrementAndGet();
+ log.warn("[{}] Failed to process the request: {}", requestId, request, e);
}
});
}
@@ -106,10 +132,16 @@ public class TbKafkaResponseTemplate<Request, Response> extends AbstractTbKafkaT
public void stop() {
stopped = true;
+ if (timeoutExecutor != null) {
+ timeoutExecutor.shutdownNow();
+ }
+ if (loopExecutor != null) {
+ loopExecutor.shutdownNow();
+ }
}
private void reply(UUID requestId, String topic, Response response) {
- responseTemplate.send(topic, response, Collections.singletonList(new RecordHeader(TbKafkaSettings.REQUEST_ID_HEADER, uuidToBytes(requestId))));
+ responseTemplate.send(topic, requestId.toString(), response, Collections.singletonList(new RecordHeader(TbKafkaSettings.REQUEST_ID_HEADER, uuidToBytes(requestId))));
}
}
diff --git a/common/transport/src/main/proto/transport.proto b/common/transport/src/main/proto/transport.proto
index f0cc3c7..029bb14 100644
--- a/common/transport/src/main/proto/transport.proto
+++ b/common/transport/src/main/proto/transport.proto
@@ -47,11 +47,13 @@ message TsKvListProto {
}
message DeviceInfoProto {
- int64 deviceIdMSB = 1;
- int64 deviceIdLSB = 2;
- string deviceName = 3;
- string deviceType = 4;
- string additionalInfo = 5;
+ int64 tenantIdMSB = 1;
+ int64 tenantIdLSB = 2;
+ int64 deviceIdMSB = 3;
+ int64 deviceIdLSB = 4;
+ string deviceName = 5;
+ string deviceType = 6;
+ string additionalInfo = 7;
}
/**
diff --git a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java
index 8a596d7..0853124 100644
--- a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java
+++ b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java
@@ -157,10 +157,10 @@ public class CoapTransportResource extends CoapResource {
CoapSessionCtx ctx = new CoapSessionCtx(exchange, adaptor, processor, authService, timeout);
- if (!ctx.login(credentials.get())) {
- exchange.respond(ResponseCode.UNAUTHORIZED);
- return Optional.empty();
- }
+// if (!ctx.login(credentials.get())) {
+// exchange.respond(ResponseCode.UNAUTHORIZED);
+// return Optional.empty();
+// }
AdaptorToSessionActorMsg msg;
try {
@@ -190,7 +190,7 @@ public class CoapTransportResource extends CoapResource {
throw new IllegalArgumentException("Unsupported msg type: " + type);
}
log.trace("Processing msg: {}", msg);
- processor.process(new BasicTransportToDeviceSessionActorMsg(ctx.getDevice(), msg));
+// processor.process(new BasicTransportToDeviceSessionActorMsg(ctx.getDevice(), msg));
} catch (AdaptorException e) {
log.debug("Failed to decode payload {}", e);
exchange.respond(ResponseCode.BAD_REQUEST, e.getMessage());
diff --git a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java
index 7a703b2..c43a4de 100644
--- a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java
+++ b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java
@@ -49,7 +49,7 @@ public class CoapSessionCtx extends DeviceAwareSessionContext {
private final AtomicInteger seqNumber = new AtomicInteger(2);
public CoapSessionCtx(CoapExchange exchange, CoapTransportAdaptor adaptor, SessionMsgProcessor processor, DeviceAuthService authService, long timeout) {
- super(processor, authService);
+ super();
Request request = exchange.advanced().getRequest();
this.token = request.getTokenString();
this.sessionId = new CoapSessionId(request.getSource().getHostAddress(), request.getSourcePort(), this.token);
@@ -112,7 +112,7 @@ public class CoapSessionCtx extends DeviceAwareSessionContext {
public void close() {
log.info("[{}] Closing processing context. Timeout: {}", sessionId, exchange.advanced().isTimedOut());
- processor.process(exchange.advanced().isTimedOut() ? SessionCloseMsg.onTimeout(sessionId) : SessionCloseMsg.onError(sessionId));
+// processor.process(exchange.advanced().isTimedOut() ? SessionCloseMsg.onTimeout(sessionId) : SessionCloseMsg.onError(sessionId));
}
@Override
diff --git a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java b/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
index d26d076..add89b1 100644
--- a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
+++ b/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
@@ -73,20 +73,20 @@ public class DeviceApiController {
if (quotaExceeded(httpRequest, responseWriter)) {
return responseWriter;
}
- HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
- if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
- GetAttributesRequest request;
- if (StringUtils.isEmpty(clientKeys) && StringUtils.isEmpty(sharedKeys)) {
- request = new BasicGetAttributesRequest(0);
- } else {
- Set<String> clientKeySet = !StringUtils.isEmpty(clientKeys) ? new HashSet<>(Arrays.asList(clientKeys.split(","))) : null;
- Set<String> sharedKeySet = !StringUtils.isEmpty(sharedKeys) ? new HashSet<>(Arrays.asList(sharedKeys.split(","))) : null;
- request = new BasicGetAttributesRequest(0, clientKeySet, sharedKeySet);
- }
- process(ctx, request);
- } else {
- responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
- }
+// HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
+// if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
+// GetAttributesRequest request;
+// if (StringUtils.isEmpty(clientKeys) && StringUtils.isEmpty(sharedKeys)) {
+// request = new BasicGetAttributesRequest(0);
+// } else {
+// Set<String> clientKeySet = !StringUtils.isEmpty(clientKeys) ? new HashSet<>(Arrays.asList(clientKeys.split(","))) : null;
+// Set<String> sharedKeySet = !StringUtils.isEmpty(sharedKeys) ? new HashSet<>(Arrays.asList(sharedKeys.split(","))) : null;
+// request = new BasicGetAttributesRequest(0, clientKeySet, sharedKeySet);
+// }
+// process(ctx, request);
+// } else {
+// responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
+// }
return responseWriter;
}
@@ -98,16 +98,16 @@ public class DeviceApiController {
if (quotaExceeded(request, responseWriter)) {
return responseWriter;
}
- HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
- if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
- try {
- process(ctx, JsonConverter.convertToAttributes(new JsonParser().parse(json)));
- } catch (IllegalStateException | JsonSyntaxException ex) {
- responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
- }
- } else {
- responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
- }
+// HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
+// if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
+// try {
+// process(ctx, JsonConverter.convertToAttributes(new JsonParser().parse(json)));
+// } catch (IllegalStateException | JsonSyntaxException ex) {
+// responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
+// }
+// } else {
+// responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
+// }
return responseWriter;
}
@@ -119,15 +119,15 @@ public class DeviceApiController {
return responseWriter;
}
HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
- if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
- try {
- process(ctx, JsonConverter.convertToTelemetry(new JsonParser().parse(json)));
- } catch (IllegalStateException | JsonSyntaxException ex) {
- responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
- }
- } else {
- responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
- }
+// if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
+// try {
+// process(ctx, JsonConverter.convertToTelemetry(new JsonParser().parse(json)));
+// } catch (IllegalStateException | JsonSyntaxException ex) {
+// responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
+// }
+// } else {
+// responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
+// }
return responseWriter;
}
@@ -147,17 +147,17 @@ public class DeviceApiController {
if (quotaExceeded(request, responseWriter)) {
return responseWriter;
}
- HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
- if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
- try {
- JsonObject response = new JsonParser().parse(json).getAsJsonObject();
- process(ctx, new ToDeviceRpcResponseMsg(requestId, response.toString()));
- } catch (IllegalStateException | JsonSyntaxException ex) {
- responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
- }
- } else {
- responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
- }
+// HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
+// if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
+// try {
+// JsonObject response = new JsonParser().parse(json).getAsJsonObject();
+// process(ctx, new ToDeviceRpcResponseMsg(requestId, response.toString()));
+// } catch (IllegalStateException | JsonSyntaxException ex) {
+// responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
+// }
+// } else {
+// responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
+// }
return responseWriter;
}
@@ -169,18 +169,18 @@ public class DeviceApiController {
return responseWriter;
}
HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
- if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
- try {
- JsonObject request = new JsonParser().parse(json).getAsJsonObject();
- process(ctx, new ToServerRpcRequestMsg(0,
- request.get("method").getAsString(),
- request.get("params").toString()));
- } catch (IllegalStateException | JsonSyntaxException ex) {
- responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
- }
- } else {
- responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
- }
+// if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
+// try {
+// JsonObject request = new JsonParser().parse(json).getAsJsonObject();
+// process(ctx, new ToServerRpcRequestMsg(0,
+// request.get("method").getAsString(),
+// request.get("params").toString()));
+// } catch (IllegalStateException | JsonSyntaxException ex) {
+// responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
+// }
+// } else {
+// responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
+// }
return responseWriter;
}
@@ -197,16 +197,16 @@ public class DeviceApiController {
if (quotaExceeded(httpRequest, responseWriter)) {
return responseWriter;
}
- HttpSessionCtx ctx = getHttpSessionCtx(responseWriter, timeout);
- if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
- try {
- process(ctx, msg);
- } catch (IllegalStateException | JsonSyntaxException ex) {
- responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
- }
- } else {
- responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
- }
+// HttpSessionCtx ctx = getHttpSessionCtx(responseWriter, timeout);
+// if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
+// try {
+// process(ctx, msg);
+// } catch (IllegalStateException | JsonSyntaxException ex) {
+// responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
+// }
+// } else {
+// responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
+// }
return responseWriter;
}
@@ -220,7 +220,7 @@ public class DeviceApiController {
private void process(HttpSessionCtx ctx, FromDeviceMsg request) {
AdaptorToSessionActorMsg msg = new BasicAdaptorToSessionActorMsg(ctx, request);
- processor.process(new BasicTransportToDeviceSessionActorMsg(ctx.getDevice(), msg));
+// processor.process(new BasicTransportToDeviceSessionActorMsg(ctx.getDevice(), msg));
}
private boolean quotaExceeded(HttpServletRequest request, DeferredResult<ResponseEntity> responseWriter) {
diff --git a/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java b/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java
index 4732785..e503409 100644
--- a/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java
+++ b/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java
@@ -43,7 +43,7 @@ public class HttpSessionCtx extends DeviceAwareSessionContext {
private final DeferredResult<ResponseEntity> responseWriter;
public HttpSessionCtx(SessionMsgProcessor processor, DeviceAuthService authService, DeferredResult<ResponseEntity> responseWriter, long timeout) {
- super(processor, authService);
+ super();
this.sessionId = new HttpSessionId();
this.responseWriter = responseWriter;
this.timeout = timeout;
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java
index cc40ee0..b500a31 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.server.transport.mqtt;
import com.fasterxml.jackson.databind.ObjectMapper;
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index 8c64e7e..a318b30 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -388,7 +388,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
ctx.close();
} else {
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
- deviceSessionCtx.setDeviceInfo(deviceSessionCtx.getDeviceInfo());
+ deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
transportService.process(getSessionEventMsg(SessionEvent.OPEN), null);
checkGatewaySession();
}
diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
index a1641f1..f5fd690 100644
--- a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
+++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.server.mqtt.service;
import org.springframework.beans.factory.annotation.Autowired;
@@ -5,6 +20,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
+import org.thingsboard.server.kafka.AsyncCallbackTemplate;
import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
import org.thingsboard.server.kafka.TbKafkaRequestTemplate;
@@ -23,19 +39,19 @@ import java.util.concurrent.Executors;
@Service
public class MqttTransportService implements TransportService {
- @Value("${kafka.rule-engine.topic}")
+ @Value("${kafka.rule_engine.topic}")
private String ruleEngineTopic;
- @Value("${kafka.transport-api.requests-topic}")
+ @Value("${kafka.transport_api.requests_topic}")
private String transportApiRequestsTopic;
- @Value("${kafka.transport-api.responses-topic}")
+ @Value("${kafka.transport_api.responses_topic}")
private String transportApiResponsesTopic;
- @Value("${kafka.transport-api.max_pending_requests}")
+ @Value("${kafka.transport_api.max_pending_requests}")
private long maxPendingRequests;
- @Value("${kafka.transport-api.max_requests_timeout}")
+ @Value("${kafka.transport_api.max_requests_timeout}")
private long maxRequestsTimeout;
- @Value("${kafka.transport-api.response_poll_interval}")
+ @Value("${kafka.transport_api.response_poll_interval}")
private int responsePollDuration;
- @Value("${kafka.transport-api.response_auto_commit_interval}")
+ @Value("${kafka.transport_api.response_auto_commit_interval}")
private int autoCommitInterval;
@Autowired
diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiRequestEncoder.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiRequestEncoder.java
index f931db6..8ae4d17 100644
--- a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiRequestEncoder.java
+++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiRequestEncoder.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.server.mqtt.service;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiResponseDecoder.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiResponseDecoder.java
index 22e1647..66e64b5 100644
--- a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiResponseDecoder.java
+++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiResponseDecoder.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.server.mqtt.service;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
diff --git a/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml
index a2a0d54..707fb4b 100644
--- a/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml
+++ b/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml
@@ -73,13 +73,12 @@ kafka:
batch.size: "${TB_KAFKA_BATCH_SIZE:16384}"
linger.ms: "${TB_KAFKA_LINGER_MS:1}"
buffer.memory: "${TB_BUFFER_MEMORY:33554432}"
- transport-api:
- requests-topic: "${TB_TRANSPORT_API_REQUEST_TOPIC:tb.transport.api.requests}"
- responses-topic: "${TB_TRANSPORT_API_RESPONSE_TOPIC:tb.transport.api.responses}"
+ transport_api:
+ requests_topic: "${TB_TRANSPORT_API_REQUEST_TOPIC:tb.transport.api.requests}"
+ responses_topic: "${TB_TRANSPORT_API_RESPONSE_TOPIC:tb.transport.api.responses}"
max_pending_requests: "${TB_TRANSPORT_MAX_PENDING_REQUESTS:10000}"
max_requests_timeout: "${TB_TRANSPORT_MAX_REQUEST_TIMEOUT:10000}"
response_poll_interval: "${TB_TRANSPORT_RESPONSE_POLL_INTERVAL_MS:25}"
response_auto_commit_interval: "${TB_TRANSPORT_RESPONSE_AUTO_COMMIT_INTERVAL_MS:100}"
- # Maximum allowed JavaScript execution errors before JavaScript will be blacklisted
- rule-engine:
+ rule_engine:
topic: "${TB_RULE_ENGINE_TOPIC:tb.rule-engine}"