// transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttSessionCtx.java
/**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.session;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttMessage;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.id.SessionId;
import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg;
import org.thingsboard.server.common.msg.session.SessionCtrlMsg;
import org.thingsboard.server.common.msg.session.SessionType;
import org.thingsboard.server.common.msg.session.ex.SessionException;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Session context used by the MQTT transport.
 *
 * <p>Holds the Netty {@link ChannelHandlerContext} of the connection, converts
 * messages coming from the session actor into {@link MqttMessage}s through the
 * configured {@link MqttTransportAdaptor} and writes them to the channel. It also
 * hands out message ids for outgoing messages.
 *
 * @author Andrew Shvayka
 */
@Slf4j
public class MqttSessionCtx extends DeviceAwareSessionContext {

    /**
     * Largest value handed out by {@link #nextMsgId()}. An MQTT packet identifier
     * is a non-zero 16-bit value, so valid ids are 1..65535.
     */
    private static final int MAX_MSG_ID = 0xFFFF;

    private final MqttTransportAdaptor adaptor;
    private final MqttSessionId sessionId;
    // Assigned via setChannel(); stays null until then.
    private ChannelHandlerContext channel;
    // Written by the two setAllow/setDisallow methods below; not read inside this class.
    private volatile boolean allowAttributeResponses;
    // Last message id handed out; 0 means "none yet".
    private final AtomicInteger msgIdSeq = new AtomicInteger(0);

    /**
     * Creates a context with a freshly generated {@link MqttSessionId}.
     *
     * @param processor   passed through to the parent context
     * @param authService passed through to the parent context
     * @param adaptor     converts session actor messages into MQTT messages
     */
    public MqttSessionCtx(SessionMsgProcessor processor, DeviceAuthService authService, MqttTransportAdaptor adaptor) {
        super(processor, authService);
        this.adaptor = adaptor;
        this.sessionId = new MqttSessionId();
    }

    /**
     * @return always {@link SessionType#ASYNC}
     */
    @Override
    public SessionType getSessionType() {
        return SessionType.ASYNC;
    }

    /**
     * Converts the given message with the adaptor and, if the adaptor produced an
     * MQTT message, writes it to the channel.
     *
     * @param msg message from the session actor
     * @throws SessionException if the adaptor fails to convert the message
     */
    @Override
    public void onMsg(SessionActorToAdaptorMsg msg) throws SessionException {
        try {
            adaptor.convertToAdaptorMsg(this, msg).ifPresent(this::pushToNetwork);
        } catch (AdaptorException e) {
            //TODO: close channel with disconnect;
            logAndWrap(e);
        }
    }

    /**
     * Logs the conversion failure (message and stack trace) at WARN level and
     * rethrows it wrapped in a {@link SessionException}.
     *
     * @param e the conversion failure
     * @throws SessionException always, with {@code e} as the cause
     */
    private void logAndWrap(AdaptorException e) throws SessionException {
        log.warn("Failed to convert msg: {}", e.getMessage(), e);
        throw new SessionException(e);
    }

    /**
     * Writes and flushes the message to the channel.
     *
     * <p>{@link #setChannel(ChannelHandlerContext)} must have been called before;
     * otherwise {@code channel} is still {@code null} and this call fails with a
     * {@link NullPointerException}.
     *
     * @param msg the MQTT message to send
     */
    private void pushToNetwork(MqttMessage msg) {
        channel.writeAndFlush(msg);
    }

    /**
     * Control messages are currently ignored by this context.
     */
    @Override
    public void onMsg(SessionCtrlMsg msg) throws SessionException {
    }

    /**
     * Session errors are currently ignored by this context.
     */
    @Override
    public void onError(SessionException e) {
    }

    /**
     * @return always {@code false}; this context does not track a closed state
     */
    @Override
    public boolean isClosed() {
        return false;
    }

    /**
     * @return always {@code 0}
     */
    @Override
    public long getTimeout() {
        return 0;
    }

    /**
     * @return the session id generated when this context was constructed
     */
    @Override
    public SessionId getSessionId() {
        return sessionId;
    }

    /**
     * Sets the channel that outgoing messages are written to.
     *
     * @param channel the Netty channel handler context of the connection
     */
    public void setChannel(ChannelHandlerContext channel) {
        this.channel = channel;
    }

    /**
     * Sets the {@code allowAttributeResponses} flag to {@code true}.
     */
    public void setAllowAttributeResponses() {
        allowAttributeResponses = true;
    }

    /**
     * Sets the {@code allowAttributeResponses} flag to {@code false}.
     */
    public void setDisallowAttributeResponses() {
        allowAttributeResponses = false;
    }

    /**
     * Returns the next message id of this session.
     *
     * <p>Ids start at 1 and wrap back to 1 after 65535, so the returned value is
     * always within 1..65535 and never 0 or negative. This assumes callers use the
     * value as the MQTT packet identifier, which must be a non-zero 16-bit number.
     * The update is a single atomic operation on the underlying counter.
     *
     * @return the next id in the range 1..65535
     */
    public int nextMsgId() {
        return msgIdSeq.updateAndGet(current -> current >= MAX_MSG_ID ? 1 : current + 1);
    }
}