thingsboard-memoizeit

MQTT Gateway API Implementation

10/12/2018 5:18:41 AM

Details

diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
index e02bb4b..1fcd607 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
@@ -37,11 +37,16 @@ import org.thingsboard.server.common.msg.core.BasicRequest;
 import org.thingsboard.server.common.msg.core.BasicTelemetryUploadRequest;
 import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
 import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg;
-import org.thingsboard.server.common.msg.core.ToServerRpcRequestMsg;
-import org.thingsboard.server.common.msg.core.ToServerRpcResponseMsg;
 import org.thingsboard.server.common.msg.kv.AttributesKVMsg;
 import org.thingsboard.server.gen.transport.TransportProtos;
-import org.thingsboard.server.gen.transport.TransportProtos.*;
+import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.KeyValueProto;
+import org.thingsboard.server.gen.transport.TransportProtos.KeyValueType;
+import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.TsKvListProto;
+import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -446,4 +451,10 @@ public class JsonConverter {
         return error;
     }
 
+    public static JsonElement toGatewayJson(String deviceName, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {
+        JsonObject result = new JsonObject();
+        result.addProperty(DEVICE_PROPERTY, deviceName);
+        result.add("data", JsonConverter.toJson(rpcRequest, true));
+        return result;
+    }
 }
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
index 85dc813..4bde5a3 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
@@ -23,27 +23,23 @@ import com.google.gson.JsonSyntaxException;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.UnpooledByteBufAllocator;
-import io.netty.handler.codec.mqtt.*;
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
+import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 import org.springframework.util.StringUtils;
-import org.thingsboard.server.common.data.kv.AttributeKvEntry;
-import org.thingsboard.server.common.data.kv.KvEntry;
-import org.thingsboard.server.common.msg.core.*;
-import org.thingsboard.server.common.msg.kv.AttributesKVMsg;
-import org.thingsboard.server.common.msg.session.*;
 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
 import org.thingsboard.server.gen.transport.TransportProtos;
 import org.thingsboard.server.transport.mqtt.MqttTopics;
-import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
-import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
 import org.thingsboard.server.transport.mqtt.session.MqttDeviceAwareSessionContext;
 
 import java.nio.charset.Charset;
 import java.util.Arrays;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
@@ -169,45 +165,13 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
     }
 
     @Override
-    public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToServerRpcResponseMsg rpcResponse) {
-        return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC + rpcResponse.getRequestId(), JsonConverter.toJson(rpcResponse)));
+    public Optional<MqttMessage> convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException {
+        return Optional.of(createMqttPublishMsg(ctx, MqttTopics.GATEWAY_RPC_TOPIC, JsonConverter.toGatewayJson(deviceName, rpcRequest)));
     }
 
-    private MqttMessage convertResponseMsg(MqttDeviceAwareSessionContext ctx, ToDeviceMsg msg,
-                                           ResponseMsg<?> responseMsg, Optional<Exception> responseError) throws AdaptorException {
-        MqttMessage result = null;
-        SessionMsgType requestMsgType = responseMsg.getRequestMsgType();
-        Integer requestId = responseMsg.getRequestId();
-        if (requestId >= 0) {
-            if (requestMsgType == SessionMsgType.POST_ATTRIBUTES_REQUEST || requestMsgType == SessionMsgType.POST_TELEMETRY_REQUEST) {
-                result = MqttTransportHandler.createMqttPubAckMsg(requestId);
-            } else if (requestMsgType == SessionMsgType.GET_ATTRIBUTES_REQUEST) {
-                GetAttributesResponse response = (GetAttributesResponse) msg;
-                Optional<AttributesKVMsg> responseData = response.getData();
-                if (response.isSuccess() && responseData.isPresent()) {
-                    result = createMqttPublishMsg(ctx,
-                            MqttTopics.DEVICE_ATTRIBUTES_RESPONSE_TOPIC_PREFIX + requestId,
-                            responseData.get(), true);
-                } else {
-                    if (responseError.isPresent()) {
-                        throw new AdaptorException(responseError.get());
-                    }
-                }
-            }
-        }
-        return result;
-    }
-
-    private MqttPublishMessage createMqttPublishMsg(MqttDeviceAwareSessionContext ctx, String topic, AttributesKVMsg msg, boolean asMap) {
-        return createMqttPublishMsg(ctx, topic, JsonConverter.toJson(msg, asMap));
-    }
-
-    private MqttPublishMessage createMqttPublishMsg(MqttDeviceAwareSessionContext ctx, String topic, ToDeviceRpcRequestMsg msg) {
-        return createMqttPublishMsg(ctx, topic, JsonConverter.toJson(msg, false));
-    }
-
-    private MqttPublishMessage createMqttPublishMsg(MqttDeviceAwareSessionContext ctx, String topic, TransportProtos.ToServerRpcResponseMsg msg) {
-        return createMqttPublishMsg(ctx, topic, JsonConverter.toJson(msg));
+    @Override
+    public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToServerRpcResponseMsg rpcResponse) {
+        return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC + rpcResponse.getRequestId(), JsonConverter.toJson(rpcResponse)));
     }
 
     private MqttPublishMessage createMqttPublishMsg(MqttDeviceAwareSessionContext ctx, String topic, JsonElement json) {
@@ -219,39 +183,6 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
         return new MqttPublishMessage(mqttFixedHeader, header, payload);
     }
 
-    private FromDeviceMsg convertToGetAttributesRequest(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
-        String topicName = inbound.variableHeader().topicName();
-        try {
-            Integer requestId = Integer.valueOf(topicName.substring(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX.length()));
-            String payload = inbound.payload().toString(UTF8);
-            JsonElement requestBody = new JsonParser().parse(payload);
-            Set<String> clientKeys = toStringSet(requestBody, "clientKeys");
-            Set<String> sharedKeys = toStringSet(requestBody, "sharedKeys");
-            if (clientKeys == null && sharedKeys == null) {
-                return new BasicGetAttributesRequest(requestId);
-            } else {
-                return new BasicGetAttributesRequest(requestId, clientKeys, sharedKeys);
-            }
-        } catch (RuntimeException e) {
-            log.warn("Failed to decode get attributes request", e);
-            throw new AdaptorException(e);
-        }
-    }
-
-    private FromDeviceMsg convertToRpcCommandResponse(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
-        String topicName = inbound.variableHeader().topicName();
-        try {
-            Integer requestId = Integer.valueOf(topicName.substring(MqttTopics.DEVICE_RPC_RESPONSE_TOPIC.length()));
-            String payload = inbound.payload().toString(UTF8);
-            return new ToDeviceRpcResponseMsg(
-                    requestId,
-                    payload);
-        } catch (RuntimeException e) {
-            log.warn("Failed to decode get attributes request", e);
-            throw new AdaptorException(e);
-        }
-    }
-
     private Set<String> toStringSet(JsonElement requestBody, String name) {
         JsonElement element = requestBody.getAsJsonObject().get(name);
         if (element != null) {
@@ -261,24 +192,6 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
         }
     }
 
-    private AttributesUpdateRequest convertToUpdateAttributesRequest(SessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
-        String payload = validatePayload(ctx.getSessionId(), inbound.payload());
-        try {
-            return JsonConverter.convertToAttributes(new JsonParser().parse(payload), inbound.variableHeader().packetId());
-        } catch (IllegalStateException | JsonSyntaxException ex) {
-            throw new AdaptorException(ex);
-        }
-    }
-
-    private TelemetryUploadRequest convertToTelemetryUploadRequest(SessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
-        String payload = validatePayload(ctx.getSessionId(), inbound.payload());
-        try {
-            return JsonConverter.convertToTelemetry(new JsonParser().parse(payload), inbound.variableHeader().packetId());
-        } catch (IllegalStateException | JsonSyntaxException ex) {
-            throw new AdaptorException(ex);
-        }
-    }
-
     public static JsonElement validateJsonPayload(UUID sessionId, ByteBuf payloadData) throws AdaptorException {
         String payload = validatePayload(sessionId, payloadData);
         try {
@@ -288,7 +201,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
         }
     }
 
-    public static String validatePayload(UUID sessionId, ByteBuf payloadData) throws AdaptorException {
+    private static String validatePayload(UUID sessionId, ByteBuf payloadData) throws AdaptorException {
         try {
             String payload = payloadData.toString(UTF8);
             if (payload == null) {
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java
index 37b5395..a653e4c 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java
@@ -18,6 +18,7 @@ package org.thingsboard.server.transport.mqtt.adaptors;
 import io.netty.handler.codec.mqtt.MqttMessage;
 import io.netty.handler.codec.mqtt.MqttPublishMessage;
 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
+import org.thingsboard.server.gen.transport.TransportProtos;
 import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg;
@@ -56,5 +57,7 @@ public interface MqttTransportAdaptor {
 
     Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException;
 
+    Optional<MqttMessage> convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException;
+
     Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, ToServerRpcResponseMsg rpcResponse) throws AdaptorException;
 }
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index 21d3335..1f1288c 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -85,7 +85,7 @@ import static io.netty.handler.codec.mqtt.MqttQoS.FAILURE;
 @Slf4j
 public class MqttTransportHandler extends ChannelInboundHandlerAdapter implements GenericFutureListener<Future<? super Void>>, SessionMsgListener {
 
-    public static final MqttQoS MAX_SUPPORTED_QOS_LVL = AT_LEAST_ONCE;
+    private static final MqttQoS MAX_SUPPORTED_QOS_LVL = AT_LEAST_ONCE;
 
     private final UUID sessionId;
     private final MqttTransportContext context;
@@ -100,7 +100,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
     private volatile DeviceSessionCtx deviceSessionCtx;
     private volatile GatewaySessionCtx gatewaySessionCtx;
 
-    public MqttTransportHandler(MqttTransportContext context) {
+    MqttTransportHandler(MqttTransportContext context) {
         this.sessionId = UUID.randomUUID();
         this.context = context;
         this.transportService = context.getTransportService();
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java
index 387d9cf..32d9e55 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java
@@ -15,43 +15,14 @@
  */
 package org.thingsboard.server.transport.mqtt.session;
 
-import com.google.gson.Gson;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.sun.xml.internal.bind.v2.TODO;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.UnpooledByteBufAllocator;
-import io.netty.handler.codec.mqtt.MqttFixedHeader;
-import io.netty.handler.codec.mqtt.MqttMessage;
-import io.netty.handler.codec.mqtt.MqttMessageType;
-import io.netty.handler.codec.mqtt.MqttPublishMessage;
-import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
 import lombok.extern.slf4j.Slf4j;
-import org.thingsboard.server.common.data.kv.AttributeKvEntry;
-import org.thingsboard.server.common.data.kv.KvEntry;
-import org.thingsboard.server.common.msg.core.AttributesUpdateNotification;
-import org.thingsboard.server.common.msg.core.GetAttributesResponse;
-import org.thingsboard.server.common.msg.core.ResponseMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg;
-import org.thingsboard.server.common.msg.kv.AttributesKVMsg;
-import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg;
-import org.thingsboard.server.common.msg.session.SessionMsgType;
-import org.thingsboard.server.common.msg.session.ToDeviceMsg;
 import org.thingsboard.server.common.transport.SessionMsgListener;
-import org.thingsboard.server.common.transport.adaptor.JsonConverter;
 import org.thingsboard.server.gen.transport.TransportProtos;
 import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
 import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
-import org.thingsboard.server.transport.mqtt.MqttTopics;
-import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
 
-import java.nio.charset.Charset;
-import java.util.List;
-import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Created by ashvayka on 19.01.17.
@@ -59,15 +30,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 @Slf4j
 public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext implements SessionMsgListener {
 
-    private static final Gson GSON = new Gson();
-    private static final Charset UTF8 = Charset.forName("UTF-8");
-    private static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false);
-
     private final GatewaySessionCtx parent;
     private final UUID sessionId;
     private final SessionInfoProto sessionInfo;
-    private volatile boolean closed;
-    private AtomicInteger msgIdSeq = new AtomicInteger(0);
 
     public GatewayDeviceSessionCtx(GatewaySessionCtx parent, DeviceInfoProto deviceInfo, ConcurrentMap<String, Integer> mqttQoSMap) {
         super(mqttQoSMap);
@@ -90,109 +55,9 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple
         return sessionId;
     }
 
-    private Optional<MqttMessage> getToDeviceMsg(SessionActorToAdaptorMsg sessionMsg) {
-        ToDeviceMsg msg = sessionMsg.getMsg();
-        switch (msg.getSessionMsgType()) {
-            case STATUS_CODE_RESPONSE:
-                ResponseMsg<?> responseMsg = (ResponseMsg) msg;
-                if (responseMsg.isSuccess()) {
-                    SessionMsgType requestMsgType = responseMsg.getRequestMsgType();
-                    Integer requestId = responseMsg.getRequestId();
-                    if (requestId >= 0 && (requestMsgType == SessionMsgType.POST_ATTRIBUTES_REQUEST || requestMsgType == SessionMsgType.POST_TELEMETRY_REQUEST)) {
-                        return Optional.of(MqttTransportHandler.createMqttPubAckMsg(requestId));
-                    }
-                }
-                break;
-            case GET_ATTRIBUTES_RESPONSE:
-                GetAttributesResponse response = (GetAttributesResponse) msg;
-                if (response.isSuccess()) {
-                    return Optional.of(createMqttPublishMsg(MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, response));
-                } else {
-                    //TODO: push error handling to the gateway
-                }
-                break;
-            case ATTRIBUTES_UPDATE_NOTIFICATION:
-                AttributesUpdateNotification notification = (AttributesUpdateNotification) msg;
-                return Optional.of(createMqttPublishMsg(MqttTopics.GATEWAY_ATTRIBUTES_TOPIC, notification.getData()));
-            case TO_DEVICE_RPC_REQUEST:
-                ToDeviceRpcRequestMsg rpcRequest = (ToDeviceRpcRequestMsg) msg;
-                return Optional.of(createMqttPublishMsg(MqttTopics.GATEWAY_RPC_TOPIC, rpcRequest));
-            default:
-                break;
-        }
-        return Optional.empty();
-    }
-
-    private MqttMessage createMqttPublishMsg(String topic, GetAttributesResponse response) {
-        JsonObject result = new JsonObject();
-        result.addProperty("id", response.getRequestId());
-//        result.addProperty(DEVICE_PROPERTY, device.getName());
-        Optional<AttributesKVMsg> responseData = response.getData();
-        if (responseData.isPresent()) {
-            AttributesKVMsg msg = responseData.get();
-            if (msg.getClientAttributes() != null) {
-                addValues(result, msg.getClientAttributes());
-            }
-            if (msg.getSharedAttributes() != null) {
-                addValues(result, msg.getSharedAttributes());
-            }
-        }
-        return createMqttPublishMsg(topic, result);
-    }
-
-    private void addValues(JsonObject result, List<AttributeKvEntry> kvList) {
-        if (kvList.size() == 1) {
-            addValueToJson(result, "value", kvList.get(0));
-        } else {
-            JsonObject values;
-            if (result.has("values")) {
-                values = result.get("values").getAsJsonObject();
-            } else {
-                values = new JsonObject();
-                result.add("values", values);
-            }
-            kvList.forEach(value -> addValueToJson(values, value.getKey(), value));
-        }
-    }
-
-    private void addValueToJson(JsonObject json, String name, KvEntry entry) {
-        switch (entry.getDataType()) {
-            case BOOLEAN:
-                entry.getBooleanValue().ifPresent(aBoolean -> json.addProperty(name, aBoolean));
-                break;
-            case STRING:
-                entry.getStrValue().ifPresent(aString -> json.addProperty(name, aString));
-                break;
-            case DOUBLE:
-                entry.getDoubleValue().ifPresent(aDouble -> json.addProperty(name, aDouble));
-                break;
-            case LONG:
-                entry.getLongValue().ifPresent(aLong -> json.addProperty(name, aLong));
-                break;
-        }
-    }
-
-    private MqttMessage createMqttPublishMsg(String topic, AttributesKVMsg data) {
-        JsonObject result = new JsonObject();
-//        result.addProperty(DEVICE_PROPERTY, device.getName());
-        result.add("data", JsonConverter.toJson(data, false));
-        return createMqttPublishMsg(topic, result);
-    }
-
-    private MqttMessage createMqttPublishMsg(String topic, ToDeviceRpcRequestMsg data) {
-        JsonObject result = new JsonObject();
-//        result.addProperty(DEVICE_PROPERTY, device.getName());
-        result.add("data", JsonConverter.toJson(data, true));
-        return createMqttPublishMsg(topic, result);
-    }
-
-    private MqttPublishMessage createMqttPublishMsg(String topic, JsonElement json) {
-        MqttFixedHeader mqttFixedHeader =
-                new MqttFixedHeader(MqttMessageType.PUBLISH, false, getQoSForTopic(topic), false, 0);
-        MqttPublishVariableHeader header = new MqttPublishVariableHeader(topic, msgIdSeq.incrementAndGet());
-        ByteBuf payload = ALLOCATOR.buffer();
-        payload.writeBytes(GSON.toJson(json).getBytes(UTF8));
-        return new MqttPublishMessage(mqttFixedHeader, header, payload);
+    @Override
+    public int nextMsgId() {
+        return parent.nextMsgId();
     }
 
     SessionInfoProto getSessionInfo() {
@@ -223,12 +88,16 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple
     }
 
     @Override
-    public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg toDeviceRequest) {
-
+    public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg request) {
+        try {
+            parent.getAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), request).ifPresent(parent::writeAndFlush);
+        } catch (Exception e) {
+            log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
+        }
     }
 
     @Override
     public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg toServerResponse) {
-        TODO
+        // This feature is not supported in the TB IoT Gateway yet.
     }
 }
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
index 3788d57..e038a4d 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
@@ -52,6 +52,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Created by ashvayka on 19.01.17.
@@ -70,10 +71,12 @@ public class GatewaySessionCtx {
     private final Map<String, GatewayDeviceSessionCtx> devices;
     private final ConcurrentMap<String, Integer> mqttQoSMap;
     private final ChannelHandlerContext channel;
+    private final DeviceSessionCtx deviceSessionCtx;
 
     public GatewaySessionCtx(MqttTransportContext context, DeviceSessionCtx deviceSessionCtx, UUID sessionId) {
         this.context = context;
         this.transportService = context.getTransportService();
+        this.deviceSessionCtx = deviceSessionCtx;
         this.gateway = deviceSessionCtx.getDeviceInfo();
         this.sessionId = sessionId;
         this.devices = new ConcurrentHashMap<>();
@@ -357,4 +360,8 @@ public class GatewaySessionCtx {
     public MqttTransportAdaptor getAdaptor() {
         return context.getAdaptor();
     }
+
+    public int nextMsgId() {
+        return deviceSessionCtx.nextMsgId();
+    }
 }