thingsboard-developers

TMP commit for HTTP transport

10/16/2018 7:20:40 AM

Details

diff --git a/application/pom.xml b/application/pom.xml
index 915a4ab..f8c3219 100644
--- a/application/pom.xml
+++ b/application/pom.xml
@@ -61,18 +61,18 @@
             <artifactId>transport</artifactId>
         </dependency>
         <!--<dependency>-->
-            <!--<groupId>org.thingsboard.transport</groupId>-->
-            <!--<artifactId>http</artifactId>-->
-        <!--</dependency>-->
-        <!--<dependency>-->
-            <!--<groupId>org.thingsboard.transport</groupId>-->
-            <!--<artifactId>coap</artifactId>-->
+        <!--<groupId>org.thingsboard.transport</groupId>-->
+        <!--<artifactId>coap</artifactId>-->
         <!--</dependency>-->
         <dependency>
             <groupId>org.thingsboard.common.transport</groupId>
             <artifactId>mqtt</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.thingsboard.common.transport</groupId>
+            <artifactId>http</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.thingsboard</groupId>
             <artifactId>dao</artifactId>
         </dependency>
@@ -542,7 +542,8 @@
                     <args>
                         <arg>-PprojectBuildDir=${project.build.directory}</arg>
                         <arg>-PprojectVersion=${project.version}</arg>
-                        <arg>-PmainJar=${project.build.directory}/${project.build.finalName}-boot.${project.packaging}</arg>
+                        <arg>-PmainJar=${project.build.directory}/${project.build.finalName}-boot.${project.packaging}
+                        </arg>
                         <arg>-PpkgName=${pkg.name}</arg>
                         <arg>-PpkgInstallFolder=${pkg.installFolder}</arg>
                         <arg>-PpkgLogFolder=${pkg.unixLogFolder}</arg>
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java
index ba5cc07..a7590c9 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java
@@ -26,6 +26,7 @@ import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.transport.SessionMsgListener;
 import org.thingsboard.server.common.transport.TransportService;
 import org.thingsboard.server.common.transport.TransportServiceCallback;
+import org.thingsboard.server.gen.transport.TransportProtos;
 import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg;
 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
 import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;
@@ -175,7 +176,7 @@ public class LocalTransportService implements TransportService, RuleEngineTransp
     }
 
     @Override
-    public void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener) {
+    public void registerSession(SessionInfoProto sessionInfo, TransportProtos.SessionType sessionType, SessionMsgListener listener) {
         sessions.putIfAbsent(toId(sessionInfo), listener);
         //TODO: monitor sessions periodically: PING REQ/RESP, etc.
     }
diff --git a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/HttpTransportContext.java b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/HttpTransportContext.java
new file mode 100644
index 0000000..87db06e
--- /dev/null
+++ b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/HttpTransportContext.java
@@ -0,0 +1,37 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.transport.http;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+import org.thingsboard.server.common.transport.TransportContext;
+
+/**
+ * Created by ashvayka on 04.10.18.
+ */
+@Slf4j
+@ConditionalOnProperty(prefix = "transport.mqtt", value = "enabled", havingValue = "true", matchIfMissing = true)
+@Component
+public class HttpTransportContext extends TransportContext {
+
+    @Getter
+    @Value("${http.request_timeout}")
+    private long defaultTimeout;
+
+}
diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java
index 9ae42e8..16f8981 100644
--- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java
+++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java
@@ -24,6 +24,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.stereotype.Component;
+import org.thingsboard.server.common.transport.TransportContext;
 import org.thingsboard.server.common.transport.TransportService;
 import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService;
 import org.thingsboard.server.kafka.TbNodeIdProvider;
@@ -40,48 +41,21 @@ import java.util.concurrent.Executors;
 @Slf4j
 @ConditionalOnProperty(prefix = "transport.mqtt", value = "enabled", havingValue = "true", matchIfMissing = true)
 @Component
-@Data
-public class MqttTransportContext {
-
-    private final ObjectMapper mapper = new ObjectMapper();
-
-    @Autowired
-    private TransportService transportService;
+public class MqttTransportContext extends TransportContext {
 
+    @Getter
     @Autowired(required = false)
     private MqttSslHandlerProvider sslHandlerProvider;
 
-    @Autowired(required = false)
-    private HostRequestsQuotaService quotaService;
-
+    @Getter
     @Autowired
     private MqttTransportAdaptor adaptor;
 
-    @Autowired
-    private TbNodeIdProvider nodeIdProvider;
-
+    @Getter
     @Value("${transport.mqtt.netty.max_payload_size}")
     private Integer maxPayloadSize;
 
-
-    private SslHandler sslHandler;
-
     @Getter
-    private ExecutorService executor;
-
-    @PostConstruct
-    public void init() {
-        executor = Executors.newCachedThreadPool();
-    }
-
-    @PreDestroy
-    public void stop() {
-        if (executor != null) {
-            executor.shutdownNow();
-        }
-    }
+    private SslHandler sslHandler;
 
-    public String getNodeId() {
-        return nodeIdProvider.getNodeId();
-    }
 }
diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index 6a5597b..30cc9c7 100644
--- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -506,7 +506,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
                     .setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB())
                     .build();
             transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.OPEN), null);
-            transportService.registerSession(sessionInfo, this);
+            transportService.registerSession(sessionInfo, TransportProtos.SessionType.ASYNC, this);
             checkGatewaySession();
             ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
         }
diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java
index f233695..43f9978 100644
--- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java
+++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java
@@ -121,7 +121,7 @@ public class GatewaySessionHandler {
                                 transportService.process(deviceSessionInfo, MqttTransportHandler.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null);
                                 transportService.process(deviceSessionInfo, TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), null);
                                 transportService.process(deviceSessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), null);
-                                transportService.registerSession(deviceSessionInfo, deviceSessionCtx);
+                                transportService.registerSession(deviceSessionInfo, TransportProtos.SessionType.ASYNC, deviceSessionCtx);
                             }
                             future.set(devices.get(deviceName));
                         }
diff --git a/common/transport/pom.xml b/common/transport/pom.xml
index 69f64e6..c9b47e8 100644
--- a/common/transport/pom.xml
+++ b/common/transport/pom.xml
@@ -37,7 +37,7 @@
     <modules>
         <module>transport-api</module>
         <module>mqtt</module>
-        <!--module>http</module-->
+        <module>http</module>
         <!--module>coap</module-->
     </modules>
 
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
index ab52d41..eb6dbc5 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -85,7 +85,7 @@ public class RemoteTransportService implements TransportService {
     @Value("${kafka.transport_api.response_auto_commit_interval}")
     private int autoCommitInterval;
 
-    private ConcurrentMap<UUID, SessionMsgListener> sessions = new ConcurrentHashMap<>();
+    private ConcurrentMap<UUID, SessionMetaData> sessions = new ConcurrentHashMap<>();
 
     @Autowired
     private TbKafkaSettings kafkaSettings;
@@ -159,8 +159,9 @@ public class RemoteTransportService implements TransportService {
                             if (toTransportMsg.hasToDeviceSessionMsg()) {
                                 DeviceActorToTransportMsg toSessionMsg = toTransportMsg.getToDeviceSessionMsg();
                                 UUID sessionId = new UUID(toSessionMsg.getSessionIdMSB(), toSessionMsg.getSessionIdLSB());
-                                SessionMsgListener listener = sessions.get(sessionId);
-                                if (listener != null) {
+                                SessionMetaData md = sessions.get(sessionId);
+                                if (md != null) {
+                                    SessionMsgListener listener = md.getListener();
                                     transportCallbackExecutor.submit(() -> {
                                         if (toSessionMsg.hasGetAttributesResponse()) {
                                             listener.onGetAttributesResponse(toSessionMsg.getGetAttributesResponse());
@@ -178,6 +179,9 @@ public class RemoteTransportService implements TransportService {
                                             listener.onToServerRpcResponse(toSessionMsg.getToServerResponse());
                                         }
                                     });
+                                    if (md.getSessionType() == SessionType.SYNC) {
+                                        deregisterSession(md.getSessionInfo());
+                                    }
                                 } else {
                                     //TODO: should we notify the device actor about missed session?
                                     log.debug("[{}] Missing session.", sessionId);
@@ -311,8 +315,8 @@ public class RemoteTransportService implements TransportService {
     }
 
     @Override
-    public void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener) {
-        sessions.putIfAbsent(toId(sessionInfo), listener);
+    public void registerSession(SessionInfoProto sessionInfo, SessionType sessionType, SessionMsgListener listener) {
+        sessions.putIfAbsent(toId(sessionInfo), new SessionMetaData(sessionInfo, sessionType, listener));
         //TODO: monitor sessions periodically: PING REQ/RESP, etc.
     }
 
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java
new file mode 100644
index 0000000..dd52f77
--- /dev/null
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java
@@ -0,0 +1,17 @@
+package org.thingsboard.server.common.transport.service;
+
+import lombok.Data;
+import org.thingsboard.server.common.transport.SessionMsgListener;
+import org.thingsboard.server.gen.transport.TransportProtos;
+
+/**
+ * Created by ashvayka on 15.10.18.
+ */
+@Data
+public class SessionMetaData {
+
+    private final TransportProtos.SessionInfoProto sessionInfo;
+    private final TransportProtos.SessionType sessionType;
+    private final SessionMsgListener listener;
+
+}
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportContext.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportContext.java
new file mode 100644
index 0000000..f846cf0
--- /dev/null
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportContext.java
@@ -0,0 +1,53 @@
+package org.thingsboard.server.common.transport;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService;
+import org.thingsboard.server.kafka.TbNodeIdProvider;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Created by ashvayka on 15.10.18.
+ */
+@Slf4j
+@Data
+public class TransportContext {
+
+    protected final ObjectMapper mapper = new ObjectMapper();
+
+    @Autowired
+    private TransportService transportService;
+
+    @Autowired
+    private TbNodeIdProvider nodeIdProvider;
+
+    @Autowired(required = false)
+    private HostRequestsQuotaService quotaService;
+
+    @Getter
+    private ExecutorService executor;
+
+    @PostConstruct
+    public void init() {
+        executor = Executors.newCachedThreadPool();
+    }
+
+    @PreDestroy
+    public void stop() {
+        if (executor != null) {
+            executor.shutdownNow();
+        }
+    }
+
+    public String getNodeId() {
+        return nodeIdProvider.getNodeId();
+    }
+
+}
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java
index c9a04c4..9bbc40c 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java
@@ -59,7 +59,7 @@ public interface TransportService {
 
     void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback);
 
-    void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener);
+    void registerSession(SessionInfoProto sessionInfo, TransportProtos.SessionType sessionType, SessionMsgListener listener);
 
     void deregisterSession(SessionInfoProto sessionInfo);
 

pom.xml 2(+1 -1)

diff --git a/pom.xml b/pom.xml
index e999a4c..f4c2957 100755
--- a/pom.xml
+++ b/pom.xml
@@ -365,7 +365,7 @@
                 <version>${project.version}</version>
             </dependency>
             <dependency>
-                <groupId>org.thingsboard.transport</groupId>
+                <groupId>org.thingsboard.common.transport</groupId>
                 <artifactId>http</artifactId>
                 <version>${project.version}</version>
             </dependency>
diff --git a/transport/pom.xml b/transport/pom.xml
index 9a7c2e4..96a5a08 100644
--- a/transport/pom.xml
+++ b/transport/pom.xml
@@ -34,7 +34,6 @@
     </properties>
 
     <modules>
-        <module>http</module>
         <!--<module>coap</module>-->
         <module>mqtt-transport</module>
     </modules>