thingsboard-developers
Changes
application/pom.xml 15(+8 -7)
application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java 3(+2 -1)
common/transport/http/pom.xml 10(+5 -5)
common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java 174(+135 -39)
common/transport/http/src/main/java/org/thingsboard/server/transport/http/HttpTransportContext.java 37(+37 -0)
common/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java 0(+0 -0)
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java 38(+6 -32)
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java 2(+1 -1)
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java 2(+1 -1)
common/transport/pom.xml 2(+1 -1)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java 22(+13 -9)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java 17(+17 -0)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportContext.java 53(+53 -0)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java 2(+1 -1)
pom.xml 2(+1 -1)
transport/pom.xml 1(+0 -1)
Details
application/pom.xml 15(+8 -7)
diff --git a/application/pom.xml b/application/pom.xml
index 915a4ab..f8c3219 100644
--- a/application/pom.xml
+++ b/application/pom.xml
@@ -61,18 +61,18 @@
<artifactId>transport</artifactId>
</dependency>
<!--<dependency>-->
- <!--<groupId>org.thingsboard.transport</groupId>-->
- <!--<artifactId>http</artifactId>-->
- <!--</dependency>-->
- <!--<dependency>-->
- <!--<groupId>org.thingsboard.transport</groupId>-->
- <!--<artifactId>coap</artifactId>-->
+ <!--<groupId>org.thingsboard.transport</groupId>-->
+ <!--<artifactId>coap</artifactId>-->
<!--</dependency>-->
<dependency>
<groupId>org.thingsboard.common.transport</groupId>
<artifactId>mqtt</artifactId>
</dependency>
<dependency>
+ <groupId>org.thingsboard.common.transport</groupId>
+ <artifactId>http</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.thingsboard</groupId>
<artifactId>dao</artifactId>
</dependency>
@@ -542,7 +542,8 @@
<args>
<arg>-PprojectBuildDir=${project.build.directory}</arg>
<arg>-PprojectVersion=${project.version}</arg>
- <arg>-PmainJar=${project.build.directory}/${project.build.finalName}-boot.${project.packaging}</arg>
+ <arg>-PmainJar=${project.build.directory}/${project.build.finalName}-boot.${project.packaging}
+ </arg>
<arg>-PpkgName=${pkg.name}</arg>
<arg>-PpkgInstallFolder=${pkg.installFolder}</arg>
<arg>-PpkgLogFolder=${pkg.unixLogFolder}</arg>
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java
index ba5cc07..a7590c9 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java
@@ -26,6 +26,7 @@ import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.transport.SessionMsgListener;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
+import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;
@@ -175,7 +176,7 @@ public class LocalTransportService implements TransportService, RuleEngineTransp
}
@Override
- public void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener) {
+ public void registerSession(SessionInfoProto sessionInfo, TransportProtos.SessionType sessionType, SessionMsgListener listener) {
sessions.putIfAbsent(toId(sessionInfo), listener);
//TODO: monitor sessions periodically: PING REQ/RESP, etc.
}
diff --git a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/HttpTransportContext.java b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/HttpTransportContext.java
new file mode 100644
index 0000000..87db06e
--- /dev/null
+++ b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/HttpTransportContext.java
@@ -0,0 +1,37 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.transport.http;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+import org.thingsboard.server.common.transport.TransportContext;
+
+/**
+ * Created by ashvayka on 04.10.18.
+ */
+@Slf4j
+@ConditionalOnProperty(prefix = "transport.mqtt", value = "enabled", havingValue = "true", matchIfMissing = true)
+@Component
+public class HttpTransportContext extends TransportContext {
+
+ @Getter
+ @Value("${http.request_timeout}")
+ private long defaultTimeout;
+
+}
diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java
index 9ae42e8..16f8981 100644
--- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java
+++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java
@@ -24,6 +24,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
+import org.thingsboard.server.common.transport.TransportContext;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService;
import org.thingsboard.server.kafka.TbNodeIdProvider;
@@ -40,48 +41,21 @@ import java.util.concurrent.Executors;
@Slf4j
@ConditionalOnProperty(prefix = "transport.mqtt", value = "enabled", havingValue = "true", matchIfMissing = true)
@Component
-@Data
-public class MqttTransportContext {
-
- private final ObjectMapper mapper = new ObjectMapper();
-
- @Autowired
- private TransportService transportService;
+public class MqttTransportContext extends TransportContext {
+ @Getter
@Autowired(required = false)
private MqttSslHandlerProvider sslHandlerProvider;
- @Autowired(required = false)
- private HostRequestsQuotaService quotaService;
-
+ @Getter
@Autowired
private MqttTransportAdaptor adaptor;
- @Autowired
- private TbNodeIdProvider nodeIdProvider;
-
+ @Getter
@Value("${transport.mqtt.netty.max_payload_size}")
private Integer maxPayloadSize;
-
- private SslHandler sslHandler;
-
@Getter
- private ExecutorService executor;
-
- @PostConstruct
- public void init() {
- executor = Executors.newCachedThreadPool();
- }
-
- @PreDestroy
- public void stop() {
- if (executor != null) {
- executor.shutdownNow();
- }
- }
+ private SslHandler sslHandler;
- public String getNodeId() {
- return nodeIdProvider.getNodeId();
- }
}
diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index 6a5597b..30cc9c7 100644
--- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -506,7 +506,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
.setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB())
.build();
transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.OPEN), null);
- transportService.registerSession(sessionInfo, this);
+ transportService.registerSession(sessionInfo, TransportProtos.SessionType.ASYNC, this);
checkGatewaySession();
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
}
diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java
index f233695..43f9978 100644
--- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java
+++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java
@@ -121,7 +121,7 @@ public class GatewaySessionHandler {
transportService.process(deviceSessionInfo, MqttTransportHandler.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null);
transportService.process(deviceSessionInfo, TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), null);
transportService.process(deviceSessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), null);
- transportService.registerSession(deviceSessionInfo, deviceSessionCtx);
+ transportService.registerSession(deviceSessionInfo, TransportProtos.SessionType.ASYNC, deviceSessionCtx);
}
future.set(devices.get(deviceName));
}
common/transport/pom.xml 2(+1 -1)
diff --git a/common/transport/pom.xml b/common/transport/pom.xml
index 69f64e6..c9b47e8 100644
--- a/common/transport/pom.xml
+++ b/common/transport/pom.xml
@@ -37,7 +37,7 @@
<modules>
<module>transport-api</module>
<module>mqtt</module>
- <!--module>http</module-->
+ <module>http</module>
<!--module>coap</module-->
</modules>
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
index ab52d41..eb6dbc5 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -85,7 +85,7 @@ public class RemoteTransportService implements TransportService {
@Value("${kafka.transport_api.response_auto_commit_interval}")
private int autoCommitInterval;
- private ConcurrentMap<UUID, SessionMsgListener> sessions = new ConcurrentHashMap<>();
+ private ConcurrentMap<UUID, SessionMetaData> sessions = new ConcurrentHashMap<>();
@Autowired
private TbKafkaSettings kafkaSettings;
@@ -159,8 +159,9 @@ public class RemoteTransportService implements TransportService {
if (toTransportMsg.hasToDeviceSessionMsg()) {
DeviceActorToTransportMsg toSessionMsg = toTransportMsg.getToDeviceSessionMsg();
UUID sessionId = new UUID(toSessionMsg.getSessionIdMSB(), toSessionMsg.getSessionIdLSB());
- SessionMsgListener listener = sessions.get(sessionId);
- if (listener != null) {
+ SessionMetaData md = sessions.get(sessionId);
+ if (md != null) {
+ SessionMsgListener listener = md.getListener();
transportCallbackExecutor.submit(() -> {
if (toSessionMsg.hasGetAttributesResponse()) {
listener.onGetAttributesResponse(toSessionMsg.getGetAttributesResponse());
@@ -178,6 +179,9 @@ public class RemoteTransportService implements TransportService {
listener.onToServerRpcResponse(toSessionMsg.getToServerResponse());
}
});
+ if (md.getSessionType() == SessionType.SYNC) {
+ deregisterSession(md.getSessionInfo());
+ }
} else {
//TODO: should we notify the device actor about missed session?
log.debug("[{}] Missing session.", sessionId);
@@ -311,8 +315,8 @@ public class RemoteTransportService implements TransportService {
}
@Override
- public void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener) {
- sessions.putIfAbsent(toId(sessionInfo), listener);
+ public void registerSession(SessionInfoProto sessionInfo, SessionType sessionType, SessionMsgListener listener) {
+ sessions.putIfAbsent(toId(sessionInfo), new SessionMetaData(sessionInfo, sessionType, listener));
//TODO: monitor sessions periodically: PING REQ/RESP, etc.
}
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java
new file mode 100644
index 0000000..dd52f77
--- /dev/null
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java
@@ -0,0 +1,17 @@
+package org.thingsboard.server.common.transport.service;
+
+import lombok.Data;
+import org.thingsboard.server.common.transport.SessionMsgListener;
+import org.thingsboard.server.gen.transport.TransportProtos;
+
+/**
+ * Created by ashvayka on 15.10.18.
+ */
+@Data
+public class SessionMetaData {
+
+ private final TransportProtos.SessionInfoProto sessionInfo;
+ private final TransportProtos.SessionType sessionType;
+ private final SessionMsgListener listener;
+
+}
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportContext.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportContext.java
new file mode 100644
index 0000000..f846cf0
--- /dev/null
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportContext.java
@@ -0,0 +1,53 @@
+package org.thingsboard.server.common.transport;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService;
+import org.thingsboard.server.kafka.TbNodeIdProvider;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Created by ashvayka on 15.10.18.
+ */
+@Slf4j
+@Data
+public class TransportContext {
+
+ protected final ObjectMapper mapper = new ObjectMapper();
+
+ @Autowired
+ private TransportService transportService;
+
+ @Autowired
+ private TbNodeIdProvider nodeIdProvider;
+
+ @Autowired(required = false)
+ private HostRequestsQuotaService quotaService;
+
+ @Getter
+ private ExecutorService executor;
+
+ @PostConstruct
+ public void init() {
+ executor = Executors.newCachedThreadPool();
+ }
+
+ @PreDestroy
+ public void stop() {
+ if (executor != null) {
+ executor.shutdownNow();
+ }
+ }
+
+ public String getNodeId() {
+ return nodeIdProvider.getNodeId();
+ }
+
+}
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java
index c9a04c4..9bbc40c 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java
@@ -59,7 +59,7 @@ public interface TransportService {
void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback);
- void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener);
+ void registerSession(SessionInfoProto sessionInfo, TransportProtos.SessionType sessionType, SessionMsgListener listener);
void deregisterSession(SessionInfoProto sessionInfo);
pom.xml 2(+1 -1)
diff --git a/pom.xml b/pom.xml
index e999a4c..f4c2957 100755
--- a/pom.xml
+++ b/pom.xml
@@ -365,7 +365,7 @@
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>org.thingsboard.transport</groupId>
+ <groupId>org.thingsboard.common.transport</groupId>
<artifactId>http</artifactId>
<version>${project.version}</version>
</dependency>
transport/pom.xml 1(+0 -1)
diff --git a/transport/pom.xml b/transport/pom.xml
index 9a7c2e4..96a5a08 100644
--- a/transport/pom.xml
+++ b/transport/pom.xml
@@ -34,7 +34,6 @@
</properties>
<modules>
- <module>http</module>
<!--<module>coap</module>-->
<module>mqtt-transport</module>
</modules>