thingsboard-memoizeit
Changes
application/src/main/java/org/thingsboard/server/service/transport/LocalTransportApiService.java 173(+173 -0)
application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java 249(+249 -0)
application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java 5(+1 -4)
application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java 121(+17 -104)
transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java 1(+0 -1)
transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java 2(+1 -1)
Details
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportApiService.java
new file mode 100644
index 0000000..5c39f8a
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportApiService.java
@@ -0,0 +1,173 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.transport;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Service;
+import org.thingsboard.server.common.data.Device;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.relation.EntityRelation;
+import org.thingsboard.server.common.data.security.DeviceCredentials;
+import org.thingsboard.server.common.data.security.DeviceCredentialsType;
+import org.thingsboard.server.dao.device.DeviceCredentialsService;
+import org.thingsboard.server.dao.device.DeviceService;
+import org.thingsboard.server.dao.relation.RelationService;
+import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
+import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
+import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
+import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
+import org.thingsboard.server.kafka.TbKafkaResponseTemplate;
+import org.thingsboard.server.kafka.TbKafkaSettings;
+import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
+import org.thingsboard.server.service.executors.DbCallbackExecutorService;
+import org.thingsboard.server.service.state.DeviceStateService;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Created by ashvayka on 05.10.18.
+ */
+@Slf4j
+@Service
+public class LocalTransportApiService implements TransportApiService {
+
+ private static final ObjectMapper mapper = new ObjectMapper();
+
+ @Autowired
+ private DeviceService deviceService;
+
+ @Autowired
+ private RelationService relationService;
+
+ @Autowired
+ private DeviceCredentialsService deviceCredentialsService;
+
+ @Autowired
+ private DeviceStateService deviceStateService;
+
+ @Autowired
+ private DbCallbackExecutorService dbCallbackExecutorService;
+
+ private ReentrantLock deviceCreationLock = new ReentrantLock();
+
+ @Override
+ public ListenableFuture<TransportApiResponseMsg> handle(TransportApiRequestMsg transportApiRequestMsg) {
+ if (transportApiRequestMsg.hasValidateTokenRequestMsg()) {
+ ValidateDeviceTokenRequestMsg msg = transportApiRequestMsg.getValidateTokenRequestMsg();
+ return validateCredentials(msg.getToken(), DeviceCredentialsType.ACCESS_TOKEN);
+ } else if (transportApiRequestMsg.hasValidateX509CertRequestMsg()) {
+ ValidateDeviceX509CertRequestMsg msg = transportApiRequestMsg.getValidateX509CertRequestMsg();
+ return validateCredentials(msg.getHash(), DeviceCredentialsType.X509_CERTIFICATE);
+ } else if (transportApiRequestMsg.hasGetOrCreateDeviceRequestMsg()) {
+ return handle(transportApiRequestMsg.getGetOrCreateDeviceRequestMsg());
+ }
+ return getEmptyTransportApiResponseFuture();
+ }
+
+ private ListenableFuture<TransportApiResponseMsg> validateCredentials(String credentialsId, DeviceCredentialsType credentialsType) {
+ //TODO: Make async and enable caching
+ DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(credentialsId);
+ if (credentials != null && credentials.getCredentialsType() == credentialsType) {
+ return getDeviceInfo(credentials.getDeviceId());
+ } else {
+ return getEmptyTransportApiResponseFuture();
+ }
+ }
+
+ private ListenableFuture<TransportApiResponseMsg> handle(GetOrCreateDeviceFromGatewayRequestMsg requestMsg) {
+ DeviceId gatewayId = new DeviceId(new UUID(requestMsg.getGatewayIdMSB(), requestMsg.getGatewayIdLSB()));
+ ListenableFuture<Device> gatewayFuture = deviceService.findDeviceByIdAsync(gatewayId);
+ return Futures.transform(gatewayFuture, gateway -> {
+ deviceCreationLock.lock();
+ try {
+ Device device = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), requestMsg.getDeviceName());
+ if (device == null) {
+ device = new Device();
+ device.setTenantId(gateway.getTenantId());
+ device.setName(requestMsg.getDeviceName());
+ device.setType(requestMsg.getDeviceType());
+ device.setCustomerId(gateway.getCustomerId());
+ device = deviceService.saveDevice(device);
+ relationService.saveRelationAsync(new EntityRelation(gateway.getId(), device.getId(), "Created"));
+ deviceStateService.onDeviceAdded(device);
+ }
+ return TransportApiResponseMsg.newBuilder()
+ .setGetOrCreateDeviceResponseMsg(GetOrCreateDeviceFromGatewayResponseMsg.newBuilder().setDeviceInfo(getDeviceInfoProto(device)).build()).build();
+ } catch (JsonProcessingException e) {
+ log.warn("[{}] Failed to lookup device by gateway id and name", gatewayId, requestMsg.getDeviceName(), e);
+ throw new RuntimeException(e);
+ } finally {
+ deviceCreationLock.unlock();
+ }
+ }, dbCallbackExecutorService);
+ }
+
+
+ private ListenableFuture<TransportApiResponseMsg> getDeviceInfo(DeviceId deviceId) {
+ return Futures.transform(deviceService.findDeviceByIdAsync(deviceId), device -> {
+ if (device == null) {
+ log.trace("[{}] Failed to lookup device by id", deviceId);
+ return getEmptyTransportApiResponse();
+ }
+ try {
+ return TransportApiResponseMsg.newBuilder()
+ .setValidateTokenResponseMsg(ValidateDeviceCredentialsResponseMsg.newBuilder().setDeviceInfo(getDeviceInfoProto(device)).build()).build();
+ } catch (JsonProcessingException e) {
+ log.warn("[{}] Failed to lookup device by id", deviceId, e);
+ return getEmptyTransportApiResponse();
+ }
+ });
+ }
+
+ private DeviceInfoProto getDeviceInfoProto(Device device) throws JsonProcessingException {
+ return DeviceInfoProto.newBuilder()
+ .setTenantIdMSB(device.getTenantId().getId().getMostSignificantBits())
+ .setTenantIdLSB(device.getTenantId().getId().getLeastSignificantBits())
+ .setDeviceIdMSB(device.getId().getId().getMostSignificantBits())
+ .setDeviceIdLSB(device.getId().getId().getLeastSignificantBits())
+ .setDeviceName(device.getName())
+ .setDeviceType(device.getType())
+ .setAdditionalInfo(mapper.writeValueAsString(device.getAdditionalInfo()))
+ .build();
+ }
+
+ private ListenableFuture<TransportApiResponseMsg> getEmptyTransportApiResponseFuture() {
+ return Futures.immediateFuture(getEmptyTransportApiResponse());
+ }
+
+ private TransportApiResponseMsg getEmptyTransportApiResponse() {
+ return TransportApiResponseMsg.newBuilder()
+ .setValidateTokenResponseMsg(ValidateDeviceCredentialsResponseMsg.getDefaultInstance()).build();
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java
new file mode 100644
index 0000000..ba5cc07
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java
@@ -0,0 +1,249 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.transport;
+
+import akka.actor.ActorRef;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Service;
+import org.thingsboard.rule.engine.api.util.DonAsynchron;
+import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.common.transport.SessionMsgListener;
+import org.thingsboard.server.common.transport.TransportService;
+import org.thingsboard.server.common.transport.TransportServiceCallback;
+import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
+import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
+import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
+import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
+import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
+import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+/**
+ * Created by ashvayka on 12.10.18.
+ */
+@Slf4j
+@Service
+@ConditionalOnProperty(prefix = "transport", value = "type", havingValue = "local")
+public class LocalTransportService implements TransportService, RuleEngineTransportService {
+
+ private ConcurrentMap<UUID, SessionMsgListener> sessions = new ConcurrentHashMap<>();
+
+ private ExecutorService transportCallbackExecutor;
+
+ @Autowired
+ private TransportApiService transportApiService;
+
+ @Autowired
+ private ActorSystemContext actorContext;
+
+ //TODO: completely replace this routing with the Kafka routing by partition ids.
+ @Autowired
+ private ClusterRoutingService routingService;
+ @Autowired
+ private ClusterRpcService rpcService;
+ @Autowired
+ private DataDecodingEncodingService encodingService;
+
+ @PostConstruct
+ public void init() {
+ this.transportCallbackExecutor = new ThreadPoolExecutor(0, 100, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
+ }
+
+ @PreDestroy
+ public void destroy() {
+ if (transportCallbackExecutor != null) {
+ transportCallbackExecutor.shutdownNow();
+ }
+ }
+
+ @Override
+ public void process(ValidateDeviceTokenRequestMsg msg, TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback) {
+ DonAsynchron.withCallback(
+ transportApiService.handle(TransportApiRequestMsg.newBuilder().setValidateTokenRequestMsg(msg).build()),
+ transportApiResponseMsg -> {
+ if (callback != null) {
+ callback.onSuccess(transportApiResponseMsg.getValidateTokenResponseMsg());
+ }
+ },
+ getThrowableConsumer(callback), transportCallbackExecutor);
+ }
+
+ @Override
+ public void process(ValidateDeviceX509CertRequestMsg msg, TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback) {
+ DonAsynchron.withCallback(
+ transportApiService.handle(TransportApiRequestMsg.newBuilder().setValidateX509CertRequestMsg(msg).build()),
+ transportApiResponseMsg -> {
+ if (callback != null) {
+ callback.onSuccess(transportApiResponseMsg.getValidateTokenResponseMsg());
+ }
+ },
+ getThrowableConsumer(callback), transportCallbackExecutor);
+ }
+
+ @Override
+ public void process(GetOrCreateDeviceFromGatewayRequestMsg msg, TransportServiceCallback<GetOrCreateDeviceFromGatewayResponseMsg> callback) {
+ DonAsynchron.withCallback(
+ transportApiService.handle(TransportApiRequestMsg.newBuilder().setGetOrCreateDeviceRequestMsg(msg).build()),
+ transportApiResponseMsg -> {
+ if (callback != null) {
+ callback.onSuccess(transportApiResponseMsg.getGetOrCreateDeviceResponseMsg());
+ }
+ },
+ getThrowableConsumer(callback), transportCallbackExecutor);
+ }
+
+ @Override
+ public void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback) {
+ forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSessionEvent(msg).build(), callback);
+ }
+
+ @Override
+ public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
+ forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostTelemetry(msg).build(), callback);
+ }
+
+ @Override
+ public void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
+ forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostAttributes(msg).build(), callback);
+ }
+
+ @Override
+ public void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
+ forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setGetAttributes(msg).build(), callback);
+ }
+
+ @Override
+ public void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
+ forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToAttributes(msg).build(), callback);
+ }
+
+ @Override
+ public void process(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
+ forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToRPC(msg).build(), callback);
+ }
+
+ @Override
+ public void process(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
+ forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToDeviceRPCCallResponse(msg).build(), callback);
+ }
+
+ @Override
+ public void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
+ forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToServerRPCCallRequest(msg).build(), callback);
+ }
+
+ @Override
+ public void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener) {
+ sessions.putIfAbsent(toId(sessionInfo), listener);
+ //TODO: monitor sessions periodically: PING REQ/RESP, etc.
+ }
+
+ @Override
+ public void deregisterSession(SessionInfoProto sessionInfo) {
+ sessions.remove(toId(sessionInfo));
+ }
+
+ private UUID toId(SessionInfoProto sessionInfo) {
+ return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
+ }
+
+ @Override
+ public void process(String nodeId, DeviceActorToTransportMsg msg) {
+ process(nodeId, msg, null, null);
+ }
+
+ @Override
+ public void process(String nodeId, DeviceActorToTransportMsg msg, Runnable onSuccess, Consumer<Throwable> onFailure) {
+ UUID sessionId = new UUID(msg.getSessionIdMSB(), msg.getSessionIdLSB());
+ SessionMsgListener listener = sessions.get(sessionId);
+ if (listener != null) {
+ transportCallbackExecutor.submit(() -> {
+ if (msg.hasGetAttributesResponse()) {
+ listener.onGetAttributesResponse(msg.getGetAttributesResponse());
+ }
+ if (msg.hasAttributeUpdateNotification()) {
+ listener.onAttributeUpdate(msg.getAttributeUpdateNotification());
+ }
+ if (msg.hasSessionCloseNotification()) {
+ listener.onRemoteSessionCloseCommand(msg.getSessionCloseNotification());
+ }
+ if (msg.hasToDeviceRequest()) {
+ listener.onToDeviceRpcRequest(msg.getToDeviceRequest());
+ }
+ if (msg.hasToServerResponse()) {
+ listener.onToServerRpcResponse(msg.getToServerResponse());
+ }
+ });
+ } else {
+ //TODO: should we notify the device actor about missed session?
+ log.debug("[{}] Missing session.", sessionId);
+ }
+ if (onSuccess != null) {
+ onSuccess.run();
+ }
+ }
+
+ private void forwardToDeviceActor(TransportToDeviceActorMsg toDeviceActorMsg, TransportServiceCallback<Void> callback) {
+ TransportToDeviceActorMsgWrapper wrapper = new TransportToDeviceActorMsgWrapper(toDeviceActorMsg);
+ Optional<ServerAddress> address = routingService.resolveById(wrapper.getDeviceId());
+ if (address.isPresent()) {
+ rpcService.tell(encodingService.convertToProtoDataMessage(address.get(), wrapper));
+ } else {
+ actorContext.getAppActor().tell(wrapper, ActorRef.noSender());
+ }
+ if (callback != null) {
+ callback.onSuccess(null);
+ }
+ }
+
+ private <T> Consumer<Throwable> getThrowableConsumer(TransportServiceCallback<T> callback) {
+ return e -> {
+ if (callback != null) {
+ callback.onError(e);
+ }
+ };
+ }
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
index aa36b9b..a0791ad 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
@@ -55,7 +55,7 @@ import java.util.function.Consumer;
*/
@Slf4j
@Service
-@ConditionalOnProperty(prefix = "transport.remote", value = "enabled", havingValue = "true")
+@ConditionalOnProperty(prefix = "transport", value = "type", havingValue = "remote")
public class RemoteRuleEngineTransportService implements RuleEngineTransportService {
private static final ObjectMapper mapper = new ObjectMapper();
@@ -78,9 +78,6 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
@Autowired
private ActorSystemContext actorContext;
- @Autowired
- private ActorService actorService;
-
//TODO: completely replace this routing with the Kafka routing by partition ids.
@Autowired
private ClusterRoutingService routingService;
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
index 66a8226..ebd35a4 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
@@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.id.DeviceId;
@@ -48,20 +49,22 @@ import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
import org.thingsboard.server.service.state.DeviceStateService;
import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
* Created by ashvayka on 05.10.18.
*/
@Slf4j
-@Service
-@ConditionalOnProperty(prefix = "transport.remote", value = "enabled", havingValue = "true")
-public class RemoteTransportApiService implements TransportApiService {
-
- private static final ObjectMapper mapper = new ObjectMapper();
+@Component
+@ConditionalOnProperty(prefix = "transport", value = "type", havingValue = "remote")
+public class RemoteTransportApiService {
@Value("${transport.remote.transport_api.requests_topic}")
private String transportApiRequestsTopic;
@@ -83,26 +86,15 @@ public class RemoteTransportApiService implements TransportApiService {
private DiscoveryService discoveryService;
@Autowired
- private DeviceService deviceService;
-
- @Autowired
- private RelationService relationService;
-
- @Autowired
- private DeviceCredentialsService deviceCredentialsService;
-
- @Autowired
- private DeviceStateService deviceStateService;
+ private TransportApiService transportApiService;
private ExecutorService transportCallbackExecutor;
private TbKafkaResponseTemplate<TransportApiRequestMsg, TransportApiResponseMsg> transportApiTemplate;
- private ReentrantLock deviceCreationLock = new ReentrantLock();
-
@PostConstruct
public void init() {
- this.transportCallbackExecutor = Executors.newCachedThreadPool();
+ this.transportCallbackExecutor = new ThreadPoolExecutor(0, 100, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TransportApiResponseMsg> responseBuilder = TBKafkaProducerTemplate.builder();
responseBuilder.settings(kafkaSettings);
@@ -126,98 +118,19 @@ public class RemoteTransportApiService implements TransportApiService {
builder.requestTimeout(requestTimeout);
builder.pollInterval(responsePollDuration);
builder.executor(transportCallbackExecutor);
- builder.handler(this);
+ builder.handler(transportApiService);
transportApiTemplate = builder.build();
transportApiTemplate.init();
}
- @Override
- public ListenableFuture<TransportApiResponseMsg> handle(TransportApiRequestMsg transportApiRequestMsg) throws Exception {
- if (transportApiRequestMsg.hasValidateTokenRequestMsg()) {
- ValidateDeviceTokenRequestMsg msg = transportApiRequestMsg.getValidateTokenRequestMsg();
- return validateCredentials(msg.getToken(), DeviceCredentialsType.ACCESS_TOKEN);
- } else if (transportApiRequestMsg.hasValidateX509CertRequestMsg()) {
- ValidateDeviceX509CertRequestMsg msg = transportApiRequestMsg.getValidateX509CertRequestMsg();
- return validateCredentials(msg.getHash(), DeviceCredentialsType.X509_CERTIFICATE);
- } else if (transportApiRequestMsg.hasGetOrCreateDeviceRequestMsg()) {
- return handle(transportApiRequestMsg.getGetOrCreateDeviceRequestMsg());
+ @PreDestroy
+ public void destroy() {
+ if (transportApiTemplate != null) {
+ transportApiTemplate.stop();
}
- return getEmptyTransportApiResponseFuture();
- }
-
- private ListenableFuture<TransportApiResponseMsg> validateCredentials(String credentialsId, DeviceCredentialsType credentialsType) {
- //TODO: Make async and enable caching
- DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(credentialsId);
- if (credentials != null && credentials.getCredentialsType() == credentialsType) {
- return getDeviceInfo(credentials.getDeviceId());
- } else {
- return getEmptyTransportApiResponseFuture();
+ if (transportCallbackExecutor != null) {
+ transportCallbackExecutor.shutdownNow();
}
}
- private ListenableFuture<TransportApiResponseMsg> handle(GetOrCreateDeviceFromGatewayRequestMsg requestMsg) {
- DeviceId gatewayId = new DeviceId(new UUID(requestMsg.getGatewayIdMSB(), requestMsg.getGatewayIdLSB()));
- ListenableFuture<Device> gatewayFuture = deviceService.findDeviceByIdAsync(gatewayId);
- return Futures.transform(gatewayFuture, gateway -> {
- deviceCreationLock.lock();
- try {
- Device device = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), requestMsg.getDeviceName());
- if (device == null) {
- device = new Device();
- device.setTenantId(gateway.getTenantId());
- device.setName(requestMsg.getDeviceName());
- device.setType(requestMsg.getDeviceType());
- device.setCustomerId(gateway.getCustomerId());
- device = deviceService.saveDevice(device);
- relationService.saveRelationAsync(new EntityRelation(gateway.getId(), device.getId(), "Created"));
- deviceStateService.onDeviceAdded(device);
- }
- return TransportApiResponseMsg.newBuilder()
- .setGetOrCreateDeviceResponseMsg(GetOrCreateDeviceFromGatewayResponseMsg.newBuilder().setDeviceInfo(getDeviceInfoProto(device)).build()).build();
- } catch (JsonProcessingException e) {
- log.warn("[{}] Failed to lookup device by gateway id and name", gatewayId, requestMsg.getDeviceName(), e);
- throw new RuntimeException(e);
- } finally {
- deviceCreationLock.unlock();
- }
- }, transportCallbackExecutor);
- }
-
-
- private ListenableFuture<TransportApiResponseMsg> getDeviceInfo(DeviceId deviceId) {
- return Futures.transform(deviceService.findDeviceByIdAsync(deviceId), device -> {
- if (device == null) {
- log.trace("[{}] Failed to lookup device by id", deviceId);
- return getEmptyTransportApiResponse();
- }
- try {
- return TransportApiResponseMsg.newBuilder()
- .setValidateTokenResponseMsg(ValidateDeviceCredentialsResponseMsg.newBuilder().setDeviceInfo(getDeviceInfoProto(device)).build()).build();
- } catch (JsonProcessingException e) {
- log.warn("[{}] Failed to lookup device by id", deviceId, e);
- return getEmptyTransportApiResponse();
- }
- });
- }
-
- private DeviceInfoProto getDeviceInfoProto(Device device) throws JsonProcessingException {
- return DeviceInfoProto.newBuilder()
- .setTenantIdMSB(device.getTenantId().getId().getMostSignificantBits())
- .setTenantIdLSB(device.getTenantId().getId().getLeastSignificantBits())
- .setDeviceIdMSB(device.getId().getId().getMostSignificantBits())
- .setDeviceIdLSB(device.getId().getId().getLeastSignificantBits())
- .setDeviceName(device.getName())
- .setDeviceType(device.getType())
- .setAdditionalInfo(mapper.writeValueAsString(device.getAdditionalInfo()))
- .build();
- }
-
- private ListenableFuture<TransportApiResponseMsg> getEmptyTransportApiResponseFuture() {
- return Futures.immediateFuture(getEmptyTransportApiResponse());
- }
-
- private TransportApiResponseMsg getEmptyTransportApiResponse() {
- return TransportApiResponseMsg.newBuilder()
- .setValidateTokenResponseMsg(ValidateDeviceCredentialsResponseMsg.getDefaultInstance()).build();
- }
}
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 923976c..529ab25 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -452,8 +452,8 @@ js:
max_errors: "${REMOTE_JS_SANDBOX_MAX_ERRORS:3}"
transport:
+ type: "${TRANSPORT_TYPE:remote}" # local or remote
remote:
- enabled: "${REMOTE_TRANSPORT_ENABLED:true}"
transport_api:
requests_topic: "${TB_TRANSPORT_API_REQUEST_TOPIC:tb.transport.api.requests}"
responses_topic: "${TB_TRANSPORT_API_RESPONSE_TOPIC:tb.transport.api.responses}"
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaHandler.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaHandler.java
index a64d28e..9f86155 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaHandler.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaHandler.java
@@ -22,6 +22,6 @@ import com.google.common.util.concurrent.ListenableFuture;
*/
public interface TbKafkaHandler<Request, Response> {
- ListenableFuture<Response> handle(Request request) throws Exception;
+ ListenableFuture<Response> handle(Request request);
}
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java
index 4742b3b..173fe8a 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java
@@ -48,7 +48,6 @@ public class MqttTransportContext {
private final ObjectMapper mapper = new ObjectMapper();
@Autowired
- @Lazy
private TransportService transportService;
@Autowired(required = false)
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index b1b176d..f9427a8 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -494,7 +494,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
ctx.close();
} else {
- ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
sessionInfo = SessionInfoProto.newBuilder()
.setNodeId(context.getNodeId())
@@ -508,6 +507,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.OPEN), null);
transportService.registerSession(sessionInfo, this);
checkGatewaySession();
+ ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
}
}
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java
index b1504b1..7ef832f 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java
@@ -87,10 +87,12 @@ public class GatewaySessionHandler {
JsonElement json = getJson(msg);
String deviceName = checkDeviceName(getDeviceName(json));
String deviceType = getDeviceType(json);
+ log.trace("[{}] onDeviceConnect: {}", sessionId, deviceName);
Futures.addCallback(onDeviceConnect(deviceName, deviceType), new FutureCallback<GatewayDeviceSessionCtx>() {
@Override
public void onSuccess(@Nullable GatewayDeviceSessionCtx result) {
ack(msg);
+ log.trace("[{}] onDeviceConnectOk: {}", sessionId, deviceName);
}
@Override
diff --git a/transport/mqtt-transport/src/main/resources/logback.xml b/transport/mqtt-transport/src/main/resources/logback.xml
new file mode 100644
index 0000000..18864a9
--- /dev/null
+++ b/transport/mqtt-transport/src/main/resources/logback.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+
+ Copyright © 2016-2018 The Thingsboard Authors
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<!DOCTYPE configuration>
+<configuration scan="true" scanPeriod="10 seconds">
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{ISO8601} [%thread] %-5level %logger{36} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <logger name="org.thingsboard.server" level="TRACE" />
+
+ <root level="INFO">
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+</configuration>
\ No newline at end of file