JsonCoapAdaptor.java

262 lines | 11.376 kB Blame History Raw Download
/**
 * Copyright © 2016 The Thingsboard Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.thingsboard.server.transport.coap.adaptors;

import java.util.*;

import com.google.gson.JsonElement;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.coap.CoAP.ResponseCode;
import org.eclipse.californium.core.coap.Request;
import org.eclipse.californium.core.coap.Response;
import org.springframework.util.StringUtils;
import org.thingsboard.server.common.msg.core.*;
import org.thingsboard.server.common.msg.kv.AttributesKVMsg;
import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg;
import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg;
import org.thingsboard.server.common.msg.session.FromDeviceMsg;
import org.thingsboard.server.common.msg.session.MsgType;
import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg;
import org.thingsboard.server.common.msg.session.SessionContext;
import org.thingsboard.server.common.msg.session.ToDeviceMsg;
import org.thingsboard.server.common.msg.session.ex.ProcessingTimeoutException;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.springframework.stereotype.Component;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
import org.thingsboard.server.transport.coap.CoapTransportResource;
import org.thingsboard.server.transport.coap.session.CoapSessionCtx;

/**
 * CoAP transport adaptor that exchanges JSON payloads with devices.
 *
 * <p>Inbound CoAP {@link Request}s are converted to {@link FromDeviceMsg} instances wrapped in an
 * {@link AdaptorToSessionActorMsg}; outbound {@link ToDeviceMsg}s are converted to CoAP
 * {@link Response}s. Malformed device input is reported via {@link AdaptorException}.
 */
@Component("JsonCoapAdaptor")
@Slf4j
public class JsonCoapAdaptor implements CoapTransportAdaptor {

    /**
     * Converts an inbound CoAP request to a session actor message according to the message type.
     *
     * @param ctx     session context the request belongs to
     * @param type    message type that selects the conversion
     * @param inbound inbound CoAP request
     * @return the converted message bound to {@code ctx}
     * @throws AdaptorException if the type is not supported or the request cannot be converted
     */
    @Override
    public AdaptorToSessionActorMsg convertToActorMsg(CoapSessionCtx ctx, MsgType type, Request inbound) throws AdaptorException {
        FromDeviceMsg msg;
        switch (type) {
            case POST_TELEMETRY_REQUEST:
                msg = convertToTelemetryUploadRequest(ctx, inbound);
                break;
            case POST_ATTRIBUTES_REQUEST:
                msg = convertToUpdateAttributesRequest(ctx, inbound);
                break;
            case GET_ATTRIBUTES_REQUEST:
                msg = convertToGetAttributesRequest(ctx, inbound);
                break;
            case SUBSCRIBE_RPC_COMMANDS_REQUEST:
                msg = new RpcSubscribeMsg();
                break;
            case UNSUBSCRIBE_RPC_COMMANDS_REQUEST:
                msg = new RpcUnsubscribeMsg();
                break;
            case TO_DEVICE_RPC_RESPONSE:
                msg = convertToDeviceRpcResponse(ctx, inbound);
                break;
            case TO_SERVER_RPC_REQUEST:
                msg = convertToServerRpcRequest(ctx, inbound);
                break;
            case SUBSCRIBE_ATTRIBUTES_REQUEST:
                msg = new AttributesSubscribeMsg();
                break;
            case UNSUBSCRIBE_ATTRIBUTES_REQUEST:
                msg = new AttributesUnsubscribeMsg();
                break;
            default:
                log.warn("[{}] Unsupported msg type: {}!", ctx.getSessionId(), type);
                throw new AdaptorException(new IllegalArgumentException("Unsupported msg type: " + type + "!"));
        }
        return new BasicAdaptorToSessionActorMsg(ctx, msg);
    }

    /**
     * Builds the device's reply to a server-initiated RPC from the JSON payload.
     *
     * <p>The payload must be a JSON object containing a {@code "response"} member; the request id
     * is resolved by {@link CoapTransportResource#getRequestId(Request)}.
     *
     * @throws AdaptorException if the payload is empty, is not a JSON object, lacks the
     *                          {@code "response"} member, or the request id is missing
     */
    private FromDeviceMsg convertToDeviceRpcResponse(CoapSessionCtx ctx, Request inbound) throws AdaptorException {
        Optional<Integer> requestId = CoapTransportResource.getRequestId(inbound);
        String payload = validatePayload(ctx, inbound);
        JsonObject response;
        try {
            // getAsJsonObject() throws IllegalStateException when the payload is valid JSON but not an object.
            response = new JsonParser().parse(payload).getAsJsonObject();
        } catch (IllegalStateException | JsonSyntaxException ex) {
            throw new AdaptorException(ex);
        }
        Integer id = requestId.orElseThrow(() -> new AdaptorException("Request id is missing!"));
        JsonElement body = response.get("response");
        if (body == null) {
            // Without this check a missing member would surface as a NullPointerException.
            throw new AdaptorException(new IllegalArgumentException("Response field is missing!"));
        }
        return new ToDeviceRpcResponseMsg(id, body.toString());
    }

    /**
     * Builds a device-to-server RPC request from the JSON payload, using request id {@code 0}.
     *
     * @throws AdaptorException if the payload is empty or cannot be parsed
     */
    private FromDeviceMsg convertToServerRpcRequest(CoapSessionCtx ctx, Request inbound) throws AdaptorException {
        String payload = validatePayload(ctx, inbound);
        try {
            return JsonConverter.convertToServerRpcRequest(new JsonParser().parse(payload), 0);
        } catch (IllegalStateException | JsonSyntaxException ex) {
            // Same wrapping as the telemetry/attributes converters so callers only see AdaptorException.
            throw new AdaptorException(ex);
        }
    }

    /**
     * Converts an outbound session actor message to a CoAP response.
     *
     * @param ctx    session context the message is addressed to
     * @param source message produced by the session actor
     * @return the CoAP response; every supported message type produces a non-empty {@link Optional}
     * @throws AdaptorException if the message type is not supported
     */
    @Override
    public Optional<Response> convertToAdaptorMsg(CoapSessionCtx ctx, SessionActorToAdaptorMsg source) throws AdaptorException {
        ToDeviceMsg msg = source.getMsg();
        switch (msg.getMsgType()) {
            case STATUS_CODE_RESPONSE:
            case TO_DEVICE_RPC_RESPONSE_ACK:
                return Optional.of(convertStatusCodeResponse((StatusCodeResponse) msg));
            case GET_ATTRIBUTES_RESPONSE:
                return Optional.of(convertGetAttributesResponse((GetAttributesResponse) msg));
            case ATTRIBUTES_UPDATE_NOTIFICATION:
                return Optional.of(convertNotificationResponse(ctx, (AttributesUpdateNotification) msg));
            case TO_DEVICE_RPC_REQUEST:
                return Optional.of(convertToDeviceRpcRequest(ctx, (ToDeviceRpcRequestMsg) msg));
            case TO_SERVER_RPC_RESPONSE:
                return Optional.of(convertToServerRpcResponse(ctx, (ToServerRpcResponseMsg) msg));
            case RULE_ENGINE_ERROR:
                return Optional.of(convertToRuleEngineErrorResponse(ctx, (RuleEngineErrorMsg) msg));
            default:
                log.warn("[{}] Unsupported msg type: {}!", source.getSessionId(), msg.getMsgType());
                throw new AdaptorException(new IllegalArgumentException("Unsupported msg type: " + msg.getMsgType() + "!"));
        }
    }

    /**
     * Maps a rule engine error to a CoAP error response carrying the error text as JSON.
     *
     * <p>A plugin timeout maps to {@code GATEWAY_TIMEOUT}; any other error maps to
     * {@code BAD_REQUEST} when it was caused by a server RPC request and to
     * {@code INTERNAL_SERVER_ERROR} otherwise.
     */
    private Response convertToRuleEngineErrorResponse(CoapSessionCtx ctx, RuleEngineErrorMsg msg) {
        ResponseCode status = ResponseCode.INTERNAL_SERVER_ERROR;
        switch (msg.getError()) {
            case PLUGIN_TIMEOUT:
                status = ResponseCode.GATEWAY_TIMEOUT;
                break;
            default:
                if (msg.getInMsgType() == MsgType.TO_SERVER_RPC_REQUEST) {
                    status = ResponseCode.BAD_REQUEST;
                }
                break;
        }
        Response response = new Response(status);
        response.setPayload(JsonConverter.toErrorJson(msg.getErrorMsg()).toString());
        return response;
    }

    /** Wraps an attributes update as an observe notification. */
    private Response convertNotificationResponse(CoapSessionCtx ctx, AttributesUpdateNotification msg) {
        return getObserveNotification(ctx, JsonConverter.toJson(msg.getData(), false));
    }

    /** Wraps a server-to-device RPC request as an observe notification. */
    private Response convertToDeviceRpcRequest(CoapSessionCtx ctx, ToDeviceRpcRequestMsg msg) {
        return getObserveNotification(ctx, JsonConverter.toJson(msg, true));
    }

    /**
     * Creates a {@code CONTENT} response whose Observe option is set to the session's next
     * sequence number and whose payload is the given JSON.
     */
    private Response getObserveNotification(CoapSessionCtx ctx, JsonObject json) {
        Response response = new Response(ResponseCode.CONTENT);
        response.getOptions().setObserve(ctx.nextSeqNumber());
        response.setPayload(json.toString());
        return response;
    }

    /**
     * Parses the JSON payload into an attributes update request.
     *
     * @throws AdaptorException if the payload is empty or cannot be parsed
     */
    private UpdateAttributesRequest convertToUpdateAttributesRequest(SessionContext ctx, Request inbound) throws AdaptorException {
        String payload = validatePayload(ctx, inbound);
        try {
            return JsonConverter.convertToAttributes(new JsonParser().parse(payload));
        } catch (IllegalStateException | JsonSyntaxException ex) {
            throw new AdaptorException(ex);
        }
    }

    /**
     * Builds an attributes read request (request id {@code 0}) from the URI query.
     *
     * <p>When the request carries query elements, the {@code clientKeys} and {@code sharedKeys}
     * parameters select the attributes; when there is no query at all, the key-less request
     * constructor is used.
     */
    private FromDeviceMsg convertToGetAttributesRequest(SessionContext ctx, Request inbound) throws AdaptorException {
        List<String> queryElements = inbound.getOptions().getUriQuery();
        // Both conditions must hold: the list has to exist AND contain something to parse.
        if (queryElements != null && !queryElements.isEmpty()) {
            Set<String> clientKeys = toKeys(ctx, queryElements, "clientKeys");
            Set<String> sharedKeys = toKeys(ctx, queryElements, "sharedKeys");
            return new BasicGetAttributesRequest(0, clientKeys, sharedKeys);
        } else {
            return new BasicGetAttributesRequest(0);
        }
    }

    /**
     * Extracts the comma-separated values of the {@code attributeName=...} query element.
     *
     * <p>Only elements of the exact form {@code name=value} are considered; if the name occurs
     * more than once, the last occurrence wins.
     *
     * @return the set of keys, or {@code null} when the parameter is absent or empty
     */
    private Set<String> toKeys(SessionContext ctx, List<String> queryElements, String attributeName) throws AdaptorException {
        String keys = null;
        for (String queryElement : queryElements) {
            String[] queryItem = queryElement.split("=");
            if (queryItem.length == 2 && queryItem[0].equals(attributeName)) {
                keys = queryItem[1];
            }
        }
        if (!StringUtils.isEmpty(keys)) {
            return new HashSet<>(Arrays.asList(keys.split(",")));
        } else {
            // Callers pass this straight to BasicGetAttributesRequest, so null is kept as the "not specified" marker.
            return null;
        }
    }

    /**
     * Parses the JSON payload into a telemetry upload request.
     *
     * @throws AdaptorException if the payload is empty or cannot be parsed
     */
    private TelemetryUploadRequest convertToTelemetryUploadRequest(SessionContext ctx, Request inbound) throws AdaptorException {
        String payload = validatePayload(ctx, inbound);
        try {
            return JsonConverter.convertToTelemetry(new JsonParser().parse(payload));
        } catch (IllegalStateException | JsonSyntaxException ex) {
            throw new AdaptorException(ex);
        }
    }

    /**
     * Maps a status code response to a CoAP response: success with code 200 becomes
     * {@code VALID}, any other successful code becomes {@code CREATED}, and a failure is
     * delegated to {@link #convertError(Exception)}.
     */
    private Response convertStatusCodeResponse(StatusCodeResponse msg) {
        if (msg.isSuccess()) {
            Integer code = msg.getData().get();
            if (code == 200) {
                return new Response(ResponseCode.VALID);
            } else {
                return new Response(ResponseCode.CREATED);
            }
        } else {
            return convertError(msg.getError().get());
        }
    }

    /**
     * Returns the request payload as a string, rejecting missing or empty payloads.
     *
     * @throws AdaptorException if the payload is {@code null} or an empty string
     */
    private String validatePayload(SessionContext ctx, Request inbound) throws AdaptorException {
        String payload = inbound.getPayloadString();
        // NOTE(review): getPayloadString() appears to return "" rather than null for a request
        // without payload, so a null-only check would let empty payloads through - confirm
        // against the Californium version in use.
        if (StringUtils.isEmpty(payload)) {
            log.warn("[{}] Payload is empty!", ctx.getSessionId());
            throw new AdaptorException(new IllegalArgumentException("Payload is empty!"));
        }
        return payload;
    }

    /**
     * Converts a server RPC response to a {@code CONTENT} response with a JSON payload, or to an
     * error response when the message is not successful.
     */
    private Response convertToServerRpcResponse(SessionContext ctx, ToServerRpcResponseMsg msg) {
        if (msg.isSuccess()) {
            Response response = new Response(ResponseCode.CONTENT);
            JsonElement result = JsonConverter.toJson(msg);
            response.setPayload(result.toString());
            return response;
        } else {
            return convertError(new RuntimeException("Server RPC response is empty!"));
        }
    }

    /**
     * Converts an attributes read result to a CoAP response: {@code NOT_FOUND} when both the
     * client and shared attribute collections are empty, otherwise {@code CONTENT} with the
     * attributes as JSON. A failed result is delegated to {@link #convertError(Exception)}.
     */
    private Response convertGetAttributesResponse(GetAttributesResponse msg) {
        if (msg.isSuccess()) {
            AttributesKVMsg payload = msg.getData().get();
            if (payload.getClientAttributes().isEmpty() && payload.getSharedAttributes().isEmpty()) {
                return new Response(ResponseCode.NOT_FOUND);
            } else {
                Response response = new Response(ResponseCode.CONTENT);
                JsonObject result = JsonConverter.toJson(payload, false);
                response.setPayload(result.toString());
                return response;
            }
        } else {
            return convertError(msg.getError().get());
        }
    }

    /**
     * Logs the exception and maps it to a CoAP error response: {@code SERVICE_UNAVAILABLE} for a
     * {@link ProcessingTimeoutException}, {@code INTERNAL_SERVER_ERROR} for anything else.
     */
    private Response convertError(Exception exception) {
        log.warn("Converting exception: {}", exception.getMessage(), exception);
        if (exception instanceof ProcessingTimeoutException) {
            return new Response(ResponseCode.SERVICE_UNAVAILABLE);
        } else {
            return new Response(ResponseCode.INTERNAL_SERVER_ERROR);
        }
    }

}