thingsboard-memoizeit
Changes
.travis.yml 1(+1 -0)
application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java 108(+60 -48)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java 42(+33 -9)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java 13(+9 -4)
application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java 10(+5 -5)
application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java 76(+65 -11)
application/src/main/java/org/thingsboard/server/service/executors/ClusterRpcCallbackExecutorService.java 32(+32 -0)
application/src/main/java/org/thingsboard/server/service/security/model/token/RawAccessJwtToken.java 8(+4 -4)
application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java 21(+14 -7)
application/src/main/java/org/thingsboard/server/service/session/DeviceSessionCacheService.java 4(+2 -2)
application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java 50(+21 -29)
application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java 19(+15 -4)
application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java 7(+7 -0)
common/data/src/main/java/org/thingsboard/server/common/data/objects/AttributesEntityView.java 3(+2 -1)
common/data/src/main/java/org/thingsboard/server/common/data/objects/TelemetryEntityView.java 3(+2 -1)
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java 18(+10 -8)
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java 2(+1 -1)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java 61(+45 -16)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverterConfig.java 31(+31 -0)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java 32(+21 -11)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java 6(+6 -0)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java 1(+0 -1)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java 2(+1 -1)
dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java 22(+21 -1)
docker/.env 2(+1 -1)
docker/.gitignore 1(+1 -0)
docker/docker-compose.postgres.volumes.yml 60(+60 -0)
docker/docker-compose.yml 31(+28 -3)
docker/docker-install-tb.sh 2(+0 -2)
docker/docker-start-services.sh 2(+0 -2)
docker/docker-upgrade-tb.sh 2(+0 -2)
docker/haproxy/config/haproxy.cfg 20(+14 -6)
docker/kafka.env 2(+1 -1)
docker/tb-node/conf/logback.xml 2(+1 -1)
docker/tb-node/conf/thingsboard.conf 2(+1 -1)
docker/tb-web-ui.env 3(+1 -2)
msa/black-box-tests/pom.xml 110(+110 -0)
msa/black-box-tests/README.md 23(+23 -0)
msa/black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java 206(+206 -0)
msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java 57(+57 -0)
msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java 400(+400 -0)
msa/black-box-tests/src/test/java/org/thingsboard/server/msa/DockerComposeExecutor.java 119(+119 -0)
msa/black-box-tests/src/test/java/org/thingsboard/server/msa/mapper/AttributesResponse.java 26(+26 -0)
msa/black-box-tests/src/test/java/org/thingsboard/server/msa/mapper/WsTelemetryResponse.java 40(+40 -0)
msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ThingsBoardDbInstaller.java 132(+132 -0)
msa/js-executor/server.js 18(+18 -0)
msa/pom.xml 3(+2 -1)
msa/transport/coap/docker/Dockerfile 5(+1 -4)
msa/transport/http/docker/Dockerfile 5(+1 -4)
msa/transport/mqtt/docker/Dockerfile 5(+1 -4)
msa/web-ui/build.gradle 2(+1 -1)
msa/web-ui/config/default.yml 1(+1 -0)
msa/web-ui/server.js 77(+43 -34)
Details
.travis.yml 1(+1 -0)
diff --git a/.travis.yml b/.travis.yml
index d3591cf..e8d8847 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -2,6 +2,7 @@ before_install:
- sudo rm -f /etc/mavenrc
- export M2_HOME=/usr/local/maven
- export MAVEN_OPTS="-Dmaven.repo.local=$HOME/.m2/repository -Xms1024m -Xmx3072m"
+ - export HTTP_LOG_CONTROLLER_ERROR_STACK_TRACE=false
jdk:
- oraclejdk8
language: java
diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index 091b504..a59e85c 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -61,6 +61,7 @@ import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
import org.thingsboard.server.service.component.ComponentDiscoveryService;
import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
+import org.thingsboard.server.service.executors.ClusterRpcCallbackExecutorService;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import org.thingsboard.server.service.executors.ExternalCallExecutorService;
import org.thingsboard.server.service.mail.MailExecutorService;
@@ -188,6 +189,10 @@ public class ActorSystemContext {
@Autowired
@Getter
+ private ClusterRpcCallbackExecutorService clusterRpcCallbackExecutor;
+
+ @Autowired
+ @Getter
private DbCallbackExecutorService dbCallbackExecutor;
@Autowired
diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
index f4373db..9f27641 100644
--- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
@@ -25,15 +25,21 @@ import akka.actor.Terminated;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Function;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.ruleChain.RuleChainManagerActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.actors.shared.rulechain.SystemRuleChainManager;
import org.thingsboard.server.actors.tenant.TenantActor;
+import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageDataIterable;
+import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
+import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
@@ -50,16 +56,15 @@ import java.util.Optional;
public class AppActor extends RuleChainManagerActor {
- private final LoggingAdapter logger = Logging.getLogger(getContext().system(), this);
-
- public static final TenantId SYSTEM_TENANT = new TenantId(ModelConstants.NULL_UUID);
+ private static final TenantId SYSTEM_TENANT = new TenantId(ModelConstants.NULL_UUID);
private final TenantService tenantService;
- private final Map<TenantId, ActorRef> tenantActors;
+ private final BiMap<TenantId, ActorRef> tenantActors;
+ private boolean ruleChainsInitialized;
private AppActor(ActorSystemContext systemContext) {
super(systemContext, new SystemRuleChainManager(systemContext));
this.tenantService = systemContext.getTenantService();
- this.tenantActors = new HashMap<>();
+ this.tenantActors = HashBiMap.create();
}
@Override
@@ -69,28 +74,20 @@ public class AppActor extends RuleChainManagerActor {
@Override
public void preStart() {
- logger.info("Starting main system actor.");
- try {
- initRuleChains();
-
- if (systemContext.isTenantComponentsInitEnabled()) {
- PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, ENTITY_PACK_LIMIT);
- for (Tenant tenant : tenantIterator) {
- logger.debug("[{}] Creating tenant actor", tenant.getId());
- getOrCreateTenantActor(tenant.getId());
- logger.debug("Tenant actor created.");
- }
- }
-
- logger.info("Main system actor started.");
- } catch (Exception e) {
- logger.error(e, "Unknown failure");
- }
}
@Override
protected boolean process(TbActorMsg msg) {
+ if (!ruleChainsInitialized) {
+ initRuleChainsAndTenantActors();
+ ruleChainsInitialized = true;
+ if (msg.getMsgType() != MsgType.APP_INIT_MSG) {
+ log.warn("Rule Chains initialized by unexpected message: {}", msg);
+ }
+ }
switch (msg.getMsgType()) {
+ case APP_INIT_MSG:
+ break;
case SEND_TO_CLUSTER_MSG:
onPossibleClusterMsg((SendToClusterMsg) msg);
break;
@@ -118,6 +115,24 @@ public class AppActor extends RuleChainManagerActor {
return true;
}
+ private void initRuleChainsAndTenantActors() {
+ log.info("Starting main system actor.");
+ try {
+ initRuleChains();
+ if (systemContext.isTenantComponentsInitEnabled()) {
+ PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, ENTITY_PACK_LIMIT);
+ for (Tenant tenant : tenantIterator) {
+ log.debug("[{}] Creating tenant actor", tenant.getId());
+ getOrCreateTenantActor(tenant.getId());
+ log.debug("Tenant actor created.");
+ }
+ }
+ log.info("Main system actor started.");
+ } catch (Exception e) {
+ log.warn("Unknown failure", e);
+ }
+ }
+
private void onPossibleClusterMsg(SendToClusterMsg msg) {
Optional<ServerAddress> address = systemContext.getRoutingService().resolveById(msg.getEntityId());
if (address.isPresent()) {
@@ -130,7 +145,8 @@ public class AppActor extends RuleChainManagerActor {
private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) {
if (SYSTEM_TENANT.equals(msg.getTenantId())) {
- //TODO: ashvayka handle this.
+// this may be a notification about system entities created.
+// log.warn("[{}] Invalid service to rule engine msg called. System messages are not supported yet: {}", SYSTEM_TENANT, msg);
} else {
getOrCreateTenantActor(msg.getTenantId()).tell(msg, self());
}
@@ -143,16 +159,26 @@ public class AppActor extends RuleChainManagerActor {
}
private void onComponentLifecycleMsg(ComponentLifecycleMsg msg) {
- ActorRef target;
+ ActorRef target = null;
if (SYSTEM_TENANT.equals(msg.getTenantId())) {
target = getEntityActorRef(msg.getEntityId());
} else {
- target = getOrCreateTenantActor(msg.getTenantId());
+ if (msg.getEntityId().getEntityType() == EntityType.TENANT
+ && msg.getEvent() == ComponentLifecycleEvent.DELETED) {
+ log.debug("[{}] Handling tenant deleted notification: {}", msg.getTenantId(), msg);
+ ActorRef tenantActor = tenantActors.remove(new TenantId(msg.getEntityId().getId()));
+ if (tenantActor != null) {
+ log.debug("[{}] Deleting tenant actor: {}", msg.getTenantId(), tenantActor);
+ context().stop(tenantActor);
+ }
+ } else {
+ target = getOrCreateTenantActor(msg.getTenantId());
+ }
}
if (target != null) {
target.tell(msg, ActorRef.noSender());
} else {
- logger.debug("Invalid component lifecycle msg: {}", msg);
+ log.debug("[{}] Invalid component lifecycle msg: {}", msg.getTenantId(), msg);
}
}
@@ -161,14 +187,24 @@ public class AppActor extends RuleChainManagerActor {
}
private ActorRef getOrCreateTenantActor(TenantId tenantId) {
- return tenantActors.computeIfAbsent(tenantId, k -> context().actorOf(Props.create(new TenantActor.ActorCreator(systemContext, tenantId))
- .withDispatcher(DefaultActorService.CORE_DISPATCHER_NAME), tenantId.toString()));
+ return tenantActors.computeIfAbsent(tenantId, k -> {
+ log.debug("[{}] Creating tenant actor.", tenantId);
+ ActorRef tenantActor = context().actorOf(Props.create(new TenantActor.ActorCreator(systemContext, tenantId))
+ .withDispatcher(DefaultActorService.CORE_DISPATCHER_NAME), tenantId.toString());
+ context().watch(tenantActor);
+ log.debug("[{}] Created tenant actor: {}.", tenantId, tenantActor);
+ return tenantActor;
+ });
}
- private void processTermination(Terminated message) {
+ @Override
+ protected void processTermination(Terminated message) {
ActorRef terminated = message.actor();
if (terminated instanceof LocalActorRef) {
- logger.debug("Removed actor: {}", terminated);
+ boolean removed = tenantActors.inverse().remove(terminated) != null;
+ if (removed) {
+ log.debug("[{}] Removed actor:", terminated);
+ }
} else {
throw new IllegalStateException("Remote actors are not supported!");
}
@@ -182,20 +218,17 @@ public class AppActor extends RuleChainManagerActor {
}
@Override
- public AppActor create() throws Exception {
+ public AppActor create() {
return new AppActor(context);
}
}
- private final SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.create("1 minute"), new Function<Throwable, Directive>() {
- @Override
- public Directive apply(Throwable t) {
- logger.error(t, "Unknown failure");
- if (t instanceof RuntimeException) {
- return SupervisorStrategy.restart();
- } else {
- return SupervisorStrategy.stop();
- }
+ private final SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.create("1 minute"), t -> {
+ log.warn("Unknown failure", t);
+ if (t instanceof RuntimeException) {
+ return SupervisorStrategy.restart();
+ } else {
+ return SupervisorStrategy.stop();
}
});
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppInitMsg.java b/application/src/main/java/org/thingsboard/server/actors/app/AppInitMsg.java
new file mode 100644
index 0000000..fd8c178
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/app/AppInitMsg.java
@@ -0,0 +1,27 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.app;
+
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.TbActorMsg;
+
+public class AppInitMsg implements TbActorMsg {
+
+ @Override
+ public MsgType getMsgType() {
+ return MsgType.APP_INIT_MSG;
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
index 7b412b1..f53a410 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
@@ -15,17 +15,13 @@
*/
package org.thingsboard.server.actors.device;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
import org.thingsboard.rule.engine.api.msg.DeviceNameOrTypeUpdateMsg;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.service.ContextAwareActor;
-import org.thingsboard.server.actors.service.ContextBasedCreator;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbActorMsg;
-import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg;
import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg;
import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
@@ -34,23 +30,21 @@ import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWra
public class DeviceActor extends ContextAwareActor {
- private final LoggingAdapter logger = Logging.getLogger(getContext().system(), this);
-
private final DeviceActorMessageProcessor processor;
- private DeviceActor(ActorSystemContext systemContext, TenantId tenantId, DeviceId deviceId) {
+ DeviceActor(ActorSystemContext systemContext, TenantId tenantId, DeviceId deviceId) {
super(systemContext);
- this.processor = new DeviceActorMessageProcessor(systemContext, logger, tenantId, deviceId);
+ this.processor = new DeviceActorMessageProcessor(systemContext, tenantId, deviceId);
}
@Override
public void preStart() {
- logger.debug("[{}][{}] Starting device actor.", processor.tenantId, processor.deviceId);
+ log.debug("[{}][{}] Starting device actor.", processor.tenantId, processor.deviceId);
try {
processor.initSessionTimeout(context());
- logger.debug("[{}][{}] Device actor started.", processor.tenantId, processor.deviceId);
+ log.debug("[{}][{}] Device actor started.", processor.tenantId, processor.deviceId);
} catch (Exception e) {
- logger.error(e, "[{}][{}] Unknown failure", processor.tenantId, processor.deviceId);
+ log.warn("[{}][{}] Unknown failure", processor.tenantId, processor.deviceId, e);
}
}
@@ -90,22 +84,4 @@ public class DeviceActor extends ContextAwareActor {
return true;
}
- public static class ActorCreator extends ContextBasedCreator<DeviceActor> {
- private static final long serialVersionUID = 1L;
-
- private final TenantId tenantId;
- private final DeviceId deviceId;
-
- public ActorCreator(ActorSystemContext context, TenantId tenantId, DeviceId deviceId) {
- super(context);
- this.tenantId = tenantId;
- this.deviceId = deviceId;
- }
-
- @Override
- public DeviceActor create() throws Exception {
- return new DeviceActor(context, tenantId, deviceId);
- }
- }
-
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorCreator.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorCreator.java
new file mode 100644
index 0000000..18aa926
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorCreator.java
@@ -0,0 +1,39 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.device;
+
+import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.actors.service.ContextBasedCreator;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.TenantId;
+
+public class DeviceActorCreator extends ContextBasedCreator<DeviceActor> {
+ private static final long serialVersionUID = 1L;
+
+ private final TenantId tenantId;
+ private final DeviceId deviceId;
+
+ public DeviceActorCreator(ActorSystemContext context, TenantId tenantId, DeviceId deviceId) {
+ super(context);
+ this.tenantId = tenantId;
+ this.deviceId = deviceId;
+ }
+
+ @Override
+ public DeviceActor create() {
+ return new DeviceActor(context, tenantId, deviceId);
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index a16bd78..833e5ea 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -24,6 +24,8 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
+import com.google.protobuf.InvalidProtocolBufferException;
+import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.RpcError;
import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
import org.thingsboard.rule.engine.api.msg.DeviceNameOrTypeUpdateMsg;
@@ -88,6 +90,7 @@ import java.util.stream.Collectors;
/**
* @author Andrew Shvayka
*/
+@Slf4j
class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
final TenantId tenantId;
@@ -106,8 +109,8 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
private String deviceType;
private TbMsgMetaData defaultMetaData;
- DeviceActorMessageProcessor(ActorSystemContext systemContext, LoggingAdapter logger, TenantId tenantId, DeviceId deviceId) {
- super(systemContext, logger);
+ DeviceActorMessageProcessor(ActorSystemContext systemContext, TenantId tenantId, DeviceId deviceId) {
+ super(systemContext);
this.tenantId = tenantId;
this.deviceId = deviceId;
this.sessions = new LinkedHashMap<>();
@@ -136,30 +139,30 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
long timeout = request.getExpirationTime() - System.currentTimeMillis();
if (timeout <= 0) {
- logger.debug("[{}][{}] Ignoring message due to exp time reached", deviceId, request.getId(), request.getExpirationTime());
+ log.debug("[{}][{}] Ignoring message due to exp time reached, {}", deviceId, request.getId(), request.getExpirationTime());
return;
}
boolean sent = rpcSubscriptions.size() > 0;
Set<UUID> syncSessionSet = new HashSet<>();
- rpcSubscriptions.entrySet().forEach(sub -> {
- sendToTransport(rpcRequest, sub.getKey(), sub.getValue().getNodeId());
- if (TransportProtos.SessionType.SYNC == sub.getValue().getType()) {
- syncSessionSet.add(sub.getKey());
+ rpcSubscriptions.forEach((key, value) -> {
+ sendToTransport(rpcRequest, key, value.getNodeId());
+ if (TransportProtos.SessionType.SYNC == value.getType()) {
+ syncSessionSet.add(key);
}
});
syncSessionSet.forEach(rpcSubscriptions::remove);
if (request.isOneway() && sent) {
- logger.debug("[{}] Rpc command response sent [{}]!", deviceId, request.getId());
+ log.debug("[{}] Rpc command response sent [{}]!", deviceId, request.getId());
systemContext.getDeviceRpcService().processResponseToServerSideRPCRequestFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), null, null));
} else {
registerPendingRpcRequest(context, msg, sent, rpcRequest, timeout);
}
if (sent) {
- logger.debug("[{}] RPC request {} is sent!", deviceId, request.getId());
+ log.debug("[{}] RPC request {} is sent!", deviceId, request.getId());
} else {
- logger.debug("[{}] RPC request {} is NOT sent!", deviceId, request.getId());
+ log.debug("[{}] RPC request {} is NOT sent!", deviceId, request.getId());
}
}
@@ -172,7 +175,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
void processServerSideRpcTimeout(ActorContext context, DeviceActorServerSideRpcTimeoutMsg msg) {
ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(msg.getId());
if (requestMd != null) {
- logger.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId());
+ log.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId());
systemContext.getDeviceRpcService().processResponseToServerSideRPCRequestFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION));
}
@@ -181,13 +184,13 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
private void sendPendingRequests(ActorContext context, UUID sessionId, SessionInfoProto sessionInfo) {
TransportProtos.SessionType sessionType = getSessionType(sessionId);
if (!toDeviceRpcPendingMap.isEmpty()) {
- logger.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, toDeviceRpcPendingMap.size(), sessionId);
+ log.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, toDeviceRpcPendingMap.size(), sessionId);
if (sessionType == TransportProtos.SessionType.SYNC) {
- logger.debug("[{}] Cleanup sync rpc session [{}]", deviceId, sessionId);
+ log.debug("[{}] Cleanup sync rpc session [{}]", deviceId, sessionId);
rpcSubscriptions.remove(sessionId);
}
} else {
- logger.debug("[{}] No pending RPC messages for new async session [{}]", deviceId, sessionId);
+ log.debug("[{}] No pending RPC messages for new async session [{}]", deviceId, sessionId);
}
Set<Integer> sentOneWayIds = new HashSet<>();
if (sessionType == TransportProtos.SessionType.ASYNC) {
@@ -335,7 +338,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
void processClientSideRpcTimeout(ActorContext context, DeviceActorClientSideRpcTimeoutMsg msg) {
ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(msg.getId());
if (data != null) {
- logger.debug("[{}] Client side RPC request [{}] timeout detected!", deviceId, msg.getId());
+ log.debug("[{}] Client side RPC request [{}] timeout detected!", deviceId, msg.getId());
sendToTransport(TransportProtos.ToServerRpcResponseMsg.newBuilder()
.setRequestId(msg.getId()).setError("timeout").build()
, data.getSessionId(), data.getNodeId());
@@ -346,9 +349,12 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
int requestId = msg.getMsg().getRequestId();
ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(requestId);
if (data != null) {
+ log.debug("[{}] Pushing reply to [{}][{}]!", deviceId, data.getNodeId(), data.getSessionId());
sendToTransport(TransportProtos.ToServerRpcResponseMsg.newBuilder()
.setRequestId(requestId).setPayload(msg.getMsg().getData()).build()
, data.getSessionId(), data.getNodeId());
+ } else {
+ log.debug("[{}][{}] Pending RPC request to server not found!", deviceId, requestId);
}
}
@@ -380,7 +386,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
hasNotificationData = true;
}
} else {
- logger.debug("[{}] No public server side attributes changed!", deviceId);
+ log.debug("[{}] No public server side attributes changed!", deviceId);
}
}
}
@@ -391,27 +397,27 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
});
}
} else {
- logger.debug("[{}] No registered attributes subscriptions to process!", deviceId);
+ log.debug("[{}] No registered attributes subscriptions to process!", deviceId);
}
}
private void processRpcResponses(ActorContext context, SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg responseMsg) {
UUID sessionId = getSessionId(sessionInfo);
- logger.debug("[{}] Processing rpc command response [{}]", deviceId, sessionId);
+ log.debug("[{}] Processing rpc command response [{}]", deviceId, sessionId);
ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
boolean success = requestMd != null;
if (success) {
systemContext.getDeviceRpcService().processResponseToServerSideRPCRequestFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
responseMsg.getPayload(), null));
} else {
- logger.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());
+ log.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());
}
}
private void processSubscriptionCommands(ActorContext context, SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg subscribeCmd) {
UUID sessionId = getSessionId(sessionInfo);
if (subscribeCmd.getUnsubscribe()) {
- logger.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId);
+ log.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId);
attributeSubscriptions.remove(sessionId);
} else {
SessionInfoMetaData sessionMD = sessions.get(sessionId);
@@ -419,7 +425,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
sessionMD = new SessionInfoMetaData(new SessionInfo(TransportProtos.SessionType.SYNC, sessionInfo.getNodeId()));
}
sessionMD.setSubscribedToAttributes(true);
- logger.debug("[{}] Registering attributes subscription for session [{}]", deviceId, sessionId);
+ log.debug("[{}] Registering attributes subscription for session [{}]", deviceId, sessionId);
attributeSubscriptions.put(sessionId, sessionMD.getSessionInfo());
dumpSessions();
}
@@ -432,7 +438,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
private void processSubscriptionCommands(ActorContext context, SessionInfoProto sessionInfo, SubscribeToRPCMsg subscribeCmd) {
UUID sessionId = getSessionId(sessionInfo);
if (subscribeCmd.getUnsubscribe()) {
- logger.debug("[{}] Canceling rpc subscription for session [{}]", deviceId, sessionId);
+ log.debug("[{}] Canceling rpc subscription for session [{}]", deviceId, sessionId);
rpcSubscriptions.remove(sessionId);
} else {
SessionInfoMetaData sessionMD = sessions.get(sessionId);
@@ -440,7 +446,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
sessionMD = new SessionInfoMetaData(new SessionInfo(TransportProtos.SessionType.SYNC, sessionInfo.getNodeId()));
}
sessionMD.setSubscribedToRPC(true);
- logger.debug("[{}] Registering rpc subscription for session [{}]", deviceId, sessionId);
+ log.debug("[{}] Registering rpc subscription for session [{}]", deviceId, sessionId);
rpcSubscriptions.put(sessionId, sessionMD.getSessionInfo());
sendPendingRequests(context, sessionId, sessionInfo);
dumpSessions();
@@ -451,10 +457,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
UUID sessionId = getSessionId(sessionInfo);
if (msg.getEvent() == SessionEvent.OPEN) {
if (sessions.containsKey(sessionId)) {
- logger.debug("[{}] Received duplicate session open event [{}]", deviceId, sessionId);
+ log.debug("[{}] Received duplicate session open event [{}]", deviceId, sessionId);
return;
}
- logger.debug("[{}] Processing new session [{}]", deviceId, sessionId);
+ log.debug("[{}] Processing new session [{}]", deviceId, sessionId);
if (sessions.size() >= systemContext.getMaxConcurrentSessionsPerDevice()) {
UUID sessionIdToRemove = sessions.keySet().stream().findFirst().orElse(null);
if (sessionIdToRemove != null) {
@@ -467,7 +473,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
}
dumpSessions();
} else if (msg.getEvent() == SessionEvent.CLOSED) {
- logger.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId);
+ log.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId);
sessions.remove(sessionId);
attributeSubscriptions.remove(sessionId);
rpcSubscriptions.remove(sessionId);
@@ -478,19 +484,19 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
}
}
- private void handleSessionActivity(ActorContext context, SessionInfoProto sessionInfo, TransportProtos.SubscriptionInfoProto subscriptionInfo) {
- UUID sessionId = getSessionId(sessionInfo);
- SessionInfoMetaData sessionMD = sessions.get(sessionId);
- if (sessionMD != null) {
- sessionMD.setLastActivityTime(subscriptionInfo.getLastActivityTime());
- sessionMD.setSubscribedToAttributes(subscriptionInfo.getAttributeSubscription());
- sessionMD.setSubscribedToRPC(subscriptionInfo.getRpcSubscription());
- if (subscriptionInfo.getAttributeSubscription()) {
- attributeSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
- }
- if (subscriptionInfo.getRpcSubscription()) {
- rpcSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
- }
+ private void handleSessionActivity(ActorContext context, SessionInfoProto sessionInfoProto, TransportProtos.SubscriptionInfoProto subscriptionInfo) {
+ UUID sessionId = getSessionId(sessionInfoProto);
+ SessionInfoMetaData sessionMD = sessions.computeIfAbsent(sessionId,
+ id -> new SessionInfoMetaData(new SessionInfo(TransportProtos.SessionType.ASYNC, sessionInfoProto.getNodeId()), 0L));
+
+ sessionMD.setLastActivityTime(subscriptionInfo.getLastActivityTime());
+ sessionMD.setSubscribedToAttributes(subscriptionInfo.getAttributeSubscription());
+ sessionMD.setSubscribedToRPC(subscriptionInfo.getRpcSubscription());
+ if (subscriptionInfo.getAttributeSubscription()) {
+ attributeSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
+ }
+ if (subscriptionInfo.getRpcSubscription()) {
+ rpcSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
}
dumpSessions();
}
@@ -623,10 +629,16 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
}
private void restoreSessions() {
- logger.debug("[{}] Restoring sessions from cache", deviceId);
- TransportProtos.DeviceSessionsCacheEntry sessionsDump = systemContext.getDeviceSessionCacheService().get(deviceId);
- if (sessionsDump.getSerializedSize() == 0) {
- logger.debug("[{}] No session information found", deviceId);
+ log.debug("[{}] Restoring sessions from cache", deviceId);
+ TransportProtos.DeviceSessionsCacheEntry sessionsDump = null;
+ try {
+ sessionsDump = TransportProtos.DeviceSessionsCacheEntry.parseFrom(systemContext.getDeviceSessionCacheService().get(deviceId));
+ } catch (InvalidProtocolBufferException e) {
+ log.warn("[{}] Failed to decode device sessions from cache", deviceId);
+ return;
+ }
+ if (sessionsDump.getSessionsCount() == 0) {
+ log.debug("[{}] No session information found", deviceId);
return;
}
for (TransportProtos.SessionSubscriptionInfoProto sessionSubscriptionInfoProto : sessionsDump.getSessionsList()) {
@@ -644,13 +656,13 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
rpcSubscriptions.put(sessionId, sessionInfo);
sessionMD.setSubscribedToRPC(true);
}
- logger.debug("[{}] Restored session: {}", deviceId, sessionMD);
+ log.debug("[{}] Restored session: {}", deviceId, sessionMD);
}
- logger.debug("[{}] Restored sessions: {}, rpc subscriptions: {}, attribute subscriptions: {}", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size());
+ log.debug("[{}] Restored sessions: {}, rpc subscriptions: {}, attribute subscriptions: {}", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size());
}
private void dumpSessions() {
- logger.debug("[{}] Dumping sessions: {}, rpc subscriptions: {}, attribute subscriptions: {} to cache", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size());
+ log.debug("[{}] Dumping sessions: {}, rpc subscriptions: {}, attribute subscriptions: {} to cache", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size());
List<TransportProtos.SessionSubscriptionInfoProto> sessionsList = new ArrayList<>(sessions.size());
sessions.forEach((uuid, sessionMD) -> {
if (sessionMD.getSessionInfo().getType() == TransportProtos.SessionType.SYNC) {
@@ -668,11 +680,11 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
sessionsList.add(TransportProtos.SessionSubscriptionInfoProto.newBuilder()
.setSessionInfo(sessionInfoProto)
.setSubscriptionInfo(subscriptionInfoProto).build());
- logger.debug("[{}] Dumping session: {}", deviceId, sessionMD);
+ log.debug("[{}] Dumping session: {}", deviceId, sessionMD);
});
systemContext.getDeviceSessionCacheService()
.put(deviceId, TransportProtos.DeviceSessionsCacheEntry.newBuilder()
- .addAllSessions(sessionsList).build());
+ .addAllSessions(sessionsList).build().toByteArray());
}
void initSessionTimeout(ActorContext context) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
index 1eba066..669ed74 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
@@ -17,10 +17,12 @@ package org.thingsboard.server.actors.rpc;
import akka.actor.ActorRef;
import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.service.ActorService;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.service.cluster.rpc.GrpcSession;
import org.thingsboard.server.service.cluster.rpc.GrpcSessionListener;
+import org.thingsboard.server.service.executors.ClusterRpcCallbackExecutorService;
/**
* @author Andrew Shvayka
@@ -28,19 +30,21 @@ import org.thingsboard.server.service.cluster.rpc.GrpcSessionListener;
@Slf4j
public class BasicRpcSessionListener implements GrpcSessionListener {
+ private final ClusterRpcCallbackExecutorService callbackExecutorService;
private final ActorService service;
private final ActorRef manager;
private final ActorRef self;
- BasicRpcSessionListener(ActorService service, ActorRef manager, ActorRef self) {
- this.service = service;
+ BasicRpcSessionListener(ActorSystemContext context, ActorRef manager, ActorRef self) {
+ this.service = context.getActorService();
+ this.callbackExecutorService = context.getClusterRpcCallbackExecutor();
this.manager = manager;
this.self = self;
}
@Override
public void onConnected(GrpcSession session) {
- log.info("{} session started -> {}", getType(session), session.getRemoteServer());
+ log.info("[{}][{}] session started", session.getRemoteServer(), getType(session));
if (!session.isClient()) {
manager.tell(new RpcSessionConnectedMsg(session.getRemoteServer(), session.getSessionId()), self);
}
@@ -48,21 +52,25 @@ public class BasicRpcSessionListener implements GrpcSessionListener {
@Override
public void onDisconnected(GrpcSession session) {
- log.info("{} session closed -> {}", getType(session), session.getRemoteServer());
+ log.info("[{}][{}] session closed", session.getRemoteServer(), getType(session));
manager.tell(new RpcSessionDisconnectedMsg(session.isClient(), session.getRemoteServer()), self);
}
@Override
public void onReceiveClusterGrpcMsg(GrpcSession session, ClusterAPIProtos.ClusterMessage clusterMessage) {
- log.trace("{} Service [{}] received session actor msg {}", getType(session),
- session.getRemoteServer(),
- clusterMessage);
- service.onReceivedMsg(session.getRemoteServer(), clusterMessage);
+ log.trace("Received session actor msg from [{}][{}]: {}", session.getRemoteServer(), getType(session), clusterMessage);
+ callbackExecutorService.execute(() -> {
+ try {
+ service.onReceivedMsg(session.getRemoteServer(), clusterMessage);
+ } catch (Exception e) {
+ log.debug("[{}][{}] Failed to process cluster message: {}", session.getRemoteServer(), getType(session), clusterMessage, e);
+ }
+ });
}
@Override
public void onError(GrpcSession session, Throwable t) {
- log.warn("{} session got error -> {}", getType(session), session.getRemoteServer(), t);
+ log.warn("[{}][{}] session got error -> {}", session.getRemoteServer(), getType(session), t);
manager.tell(new RpcSessionClosedMsg(session.isClient(), session.getRemoteServer()), self);
session.close();
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
index 31320ce..b2931f8 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
@@ -16,9 +16,12 @@
package org.thingsboard.server.actors.rpc;
import akka.actor.ActorRef;
+import akka.actor.OneForOneStrategy;
import akka.actor.Props;
+import akka.actor.SupervisorStrategy;
import akka.event.Logging;
import akka.event.LoggingAdapter;
+import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.service.ContextAwareActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
@@ -29,6 +32,7 @@ import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.cluster.ServerType;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.service.cluster.discovery.ServerInstance;
+import scala.concurrent.duration.Duration;
import java.util.*;
@@ -37,15 +41,11 @@ import java.util.*;
*/
public class RpcManagerActor extends ContextAwareActor {
- private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
-
private final Map<ServerAddress, SessionActorInfo> sessionActors;
-
private final Map<ServerAddress, Queue<ClusterAPIProtos.ClusterMessage>> pendingMsgs;
-
private final ServerAddress instance;
- RpcManagerActor(ActorSystemContext systemContext) {
+ private RpcManagerActor(ActorSystemContext systemContext) {
super(systemContext);
this.sessionActors = new HashMap<>();
this.pendingMsgs = new HashMap<>();
@@ -64,7 +64,7 @@ public class RpcManagerActor extends ContextAwareActor {
}
@Override
- public void onReceive(Object msg) throws Exception {
+ public void onReceive(Object msg) {
if (msg instanceof ClusterAPIProtos.ClusterMessage) {
onMsg((ClusterAPIProtos.ClusterMessage) msg);
} else if (msg instanceof RpcBroadcastMsg) {
@@ -116,7 +116,7 @@ public class RpcManagerActor extends ContextAwareActor {
queue.add(msg);
}
} else {
- logger.warning("Cluster msg doesn't have server address [{}]", msg);
+ log.warn("Cluster msg doesn't have server address [{}]", msg);
}
}
@@ -164,6 +164,7 @@ public class RpcManagerActor extends ContextAwareActor {
log.info("[{}] session closed. Should reconnect: {}", remoteAddress, reconnect);
SessionActorInfo sessionRef = sessionActors.get(remoteAddress);
if (sessionRef != null && context().sender() != null && context().sender().equals(sessionRef.actor)) {
+ context().stop(sessionRef.actor);
sessionActors.remove(remoteAddress);
pendingMsgs.remove(remoteAddress);
if (reconnect) {
@@ -173,9 +174,13 @@ public class RpcManagerActor extends ContextAwareActor {
}
private void onCreateSessionRequest(RpcSessionCreateRequestMsg msg) {
- ActorRef actorRef = createSessionActor(msg);
if (msg.getRemoteAddress() != null) {
- register(msg.getRemoteAddress(), msg.getMsgUid(), actorRef);
+ if (!sessionActors.containsKey(msg.getRemoteAddress())) {
+ ActorRef actorRef = createSessionActor(msg);
+ register(msg.getRemoteAddress(), msg.getMsgUid(), actorRef);
+ }
+ } else {
+ createSessionActor(msg);
}
}
@@ -194,7 +199,8 @@ public class RpcManagerActor extends ContextAwareActor {
private ActorRef createSessionActor(RpcSessionCreateRequestMsg msg) {
log.info("[{}] Creating session actor.", msg.getMsgUid());
ActorRef actor = context().actorOf(
- Props.create(new RpcSessionActor.ActorCreator(systemContext, msg.getMsgUid())).withDispatcher(DefaultActorService.RPC_DISPATCHER_NAME));
+ Props.create(new RpcSessionActor.ActorCreator(systemContext, msg.getMsgUid()))
+ .withDispatcher(DefaultActorService.RPC_DISPATCHER_NAME));
actor.tell(msg, context().self());
return actor;
}
@@ -207,8 +213,18 @@ public class RpcManagerActor extends ContextAwareActor {
}
@Override
- public RpcManagerActor create() throws Exception {
+ public RpcManagerActor create() {
return new RpcManagerActor(context);
}
}
+
+ @Override
+ public SupervisorStrategy supervisorStrategy() {
+ return strategy;
+ }
+
+ private final SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.create("1 minute"), t -> {
+ log.warn("Unknown failure", t);
+ return SupervisorStrategy.resume();
+ });
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java
index 86509ca..2ca5c3e 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java
@@ -15,12 +15,10 @@
*/
package org.thingsboard.server.actors.rpc;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
+import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.service.ContextAwareActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
@@ -38,15 +36,15 @@ import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CO
/**
* @author Andrew Shvayka
*/
+@Slf4j
public class RpcSessionActor extends ContextAwareActor {
- private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
private final UUID sessionId;
private GrpcSession session;
private GrpcSessionListener listener;
- public RpcSessionActor(ActorSystemContext systemContext, UUID sessionId) {
+ private RpcSessionActor(ActorSystemContext systemContext, UUID sessionId) {
super(systemContext);
this.sessionId = sessionId;
}
@@ -58,7 +56,7 @@ public class RpcSessionActor extends ContextAwareActor {
}
@Override
- public void onReceive(Object msg) throws Exception {
+ public void onReceive(Object msg) {
if (msg instanceof ClusterAPIProtos.ClusterMessage) {
tell((ClusterAPIProtos.ClusterMessage) msg);
} else if (msg instanceof RpcSessionCreateRequestMsg) {
@@ -67,19 +65,29 @@ public class RpcSessionActor extends ContextAwareActor {
}
private void tell(ClusterAPIProtos.ClusterMessage msg) {
- session.sendMsg(msg);
+ if (session != null) {
+ session.sendMsg(msg);
+ } else {
+ log.trace("Failed to send message due to missing session!");
+ }
}
@Override
public void postStop() {
- log.info("Closing session -> {}", session.getRemoteServer());
- session.close();
+ if (session != null) {
+ log.info("Closing session -> {}", session.getRemoteServer());
+ try {
+ session.close();
+ } catch (RuntimeException e) {
+ log.trace("Failed to close session!", e);
+ }
+ }
}
private void initSession(RpcSessionCreateRequestMsg msg) {
log.info("[{}] Initializing session", context().self());
ServerAddress remoteServer = msg.getRemoteAddress();
- listener = new BasicRpcSessionListener(systemContext.getActorService(), context().parent(), context().self());
+ listener = new BasicRpcSessionListener(systemContext, context().parent(), context().self());
if (msg.getRemoteAddress() == null) {
// Server session
session = new GrpcSession(listener);
@@ -113,7 +121,7 @@ public class RpcSessionActor extends ContextAwareActor {
}
@Override
- public RpcSessionActor create() throws Exception {
+ public RpcSessionActor create() {
return new RpcSessionActor(context, sessionId);
}
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
index dbad7c0..be39320 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
@@ -15,6 +15,7 @@
*/
package org.thingsboard.server.actors.ruleChain;
+import akka.actor.ActorInitializationException;
import akka.actor.OneForOneStrategy;
import akka.actor.SupervisorStrategy;
import org.thingsboard.server.actors.ActorSystemContext;
@@ -33,7 +34,7 @@ public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMe
private RuleChainActor(ActorSystemContext systemContext, TenantId tenantId, RuleChainId ruleChainId) {
super(systemContext, tenantId, ruleChainId);
setProcessor(new RuleChainActorMessageProcessor(tenantId, ruleChainId, systemContext,
- logger, context().parent(), context().self()));
+ context().parent(), context().self()));
}
@Override
@@ -79,7 +80,7 @@ public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMe
}
@Override
- public RuleChainActor create() throws Exception {
+ public RuleChainActor create() {
return new RuleChainActor(context, tenantId, ruleChainId);
}
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index 3da90d1..5c6c676 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -23,6 +23,7 @@ import com.datastax.driver.core.utils.UUIDs;
import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.device.DeviceActorToRuleEngineMsg;
import org.thingsboard.server.actors.service.DefaultActorService;
@@ -55,6 +56,7 @@ import java.util.stream.Collectors;
/**
* @author Andrew Shvayka
*/
+@Slf4j
public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleChainId> {
private static final long DEFAULT_CLUSTER_PARTITION = 0L;
@@ -67,24 +69,34 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
private RuleNodeId firstId;
private RuleNodeCtx firstNode;
private boolean started;
+ private String ruleChainName;
RuleChainActorMessageProcessor(TenantId tenantId, RuleChainId ruleChainId, ActorSystemContext systemContext
- , LoggingAdapter logger, ActorRef parent, ActorRef self) {
- super(systemContext, logger, tenantId, ruleChainId);
+ , ActorRef parent, ActorRef self) {
+ super(systemContext, tenantId, ruleChainId);
this.parent = parent;
this.self = self;
this.nodeActors = new HashMap<>();
this.nodeRoutes = new HashMap<>();
this.service = systemContext.getRuleChainService();
+ this.ruleChainName = ruleChainId.toString();
}
@Override
- public void start(ActorContext context) throws Exception {
+ public String getComponentName() {
+ return null;
+ }
+
+ @Override
+ public void start(ActorContext context) {
if (!started) {
RuleChain ruleChain = service.findRuleChainById(entityId);
+ ruleChainName = ruleChain.getName();
List<RuleNode> ruleNodeList = service.getRuleChainNodes(entityId);
+ log.trace("[{}][{}] Starting rule chain with {} nodes", tenantId, entityId, ruleNodeList.size());
// Creating and starting the actors;
for (RuleNode ruleNode : ruleNodeList) {
+ log.trace("[{}][{}] Creating rule node [{}]: {}", entityId, ruleNode.getId(), ruleNode.getName(), ruleNode);
ActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode);
nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
}
@@ -96,16 +108,19 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
}
@Override
- public void onUpdate(ActorContext context) throws Exception {
+ public void onUpdate(ActorContext context) {
RuleChain ruleChain = service.findRuleChainById(entityId);
+ ruleChainName = ruleChain.getName();
List<RuleNode> ruleNodeList = service.getRuleChainNodes(entityId);
-
+ log.trace("[{}][{}] Updating rule chain with {} nodes", tenantId, entityId, ruleNodeList.size());
for (RuleNode ruleNode : ruleNodeList) {
RuleNodeCtx existing = nodeActors.get(ruleNode.getId());
if (existing == null) {
+ log.trace("[{}][{}] Creating rule node [{}]: {}", entityId, ruleNode.getId(), ruleNode.getName(), ruleNode);
ActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode);
nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
} else {
+ log.trace("[{}][{}] Updating rule node [{}]: {}", entityId, ruleNode.getId(), ruleNode.getName(), ruleNode);
existing.setSelf(ruleNode);
existing.getSelfActor().tell(new ComponentLifecycleMsg(tenantId, existing.getSelf().getId(), ComponentLifecycleEvent.UPDATED), self);
}
@@ -114,6 +129,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
Set<RuleNodeId> existingNodes = ruleNodeList.stream().map(RuleNode::getId).collect(Collectors.toSet());
List<RuleNodeId> removedRules = nodeActors.keySet().stream().filter(node -> !existingNodes.contains(node)).collect(Collectors.toList());
removedRules.forEach(ruleNodeId -> {
+ log.trace("[{}][{}] Removing rule node [{}]", tenantId, entityId, ruleNodeId);
RuleNodeCtx removed = nodeActors.remove(ruleNodeId);
removed.getSelfActor().tell(new ComponentLifecycleMsg(tenantId, removed.getSelf().getId(), ComponentLifecycleEvent.DELETED), self);
});
@@ -122,7 +138,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
}
@Override
- public void stop(ActorContext context) throws Exception {
+ public void stop(ActorContext context) {
+ log.trace("[{}][{}] Stopping rule chain with {} nodes", tenantId, entityId, nodeActors.size());
nodeActors.values().stream().map(RuleNodeCtx::getSelfActor).forEach(context::stop);
nodeActors.clear();
nodeRoutes.clear();
@@ -131,7 +148,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
}
@Override
- public void onClusterEventMsg(ClusterEventMsg msg) throws Exception {
+ public void onClusterEventMsg(ClusterEventMsg msg) {
}
@@ -148,10 +165,12 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
// Populating the routes map;
for (RuleNode ruleNode : ruleNodeList) {
List<EntityRelation> relations = service.getRuleNodeRelations(ruleNode.getId());
+ log.trace("[{}][{}][{}] Processing rule node relations [{}]", tenantId, entityId, ruleNode.getId(), relations.size());
if (relations.size() == 0) {
nodeRoutes.put(ruleNode.getId(), Collections.emptyList());
} else {
for (EntityRelation relation : relations) {
+ log.trace("[{}][{}][{}] Processing rule node relation [{}]", tenantId, entityId, ruleNode.getId(), relation.getTo());
if (relation.getTo().getEntityType() == EntityType.RULE_NODE) {
RuleNodeCtx ruleNodeCtx = nodeActors.get(new RuleNodeId(relation.getTo().getId()));
if (ruleNodeCtx == null) {
@@ -165,13 +184,15 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
}
firstId = ruleChain.getFirstRuleNodeId();
- firstNode = nodeActors.get(ruleChain.getFirstRuleNodeId());
+ firstNode = nodeActors.get(firstId);
state = ComponentLifecycleState.ACTIVE;
}
void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg envelope) {
+ log.trace("[{}][{}] Processing message [{}]: {}", entityId, firstId, envelope.getTbMsg().getId(), envelope.getTbMsg());
checkActive();
if (firstNode != null) {
+ log.trace("[{}][{}] Pushing message to first rule node", entityId, firstId);
pushMsgToNode(firstNode, enrichWithRuleChainId(envelope.getTbMsg()), "");
}
}
@@ -216,7 +237,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
private void onRemoteTellNext(ServerAddress serverAddress, RuleNodeToRuleChainTellNextMsg envelope) {
TbMsg msg = envelope.getMsg();
- logger.debug("Forwarding [{}] msg to remote server [{}] due to changed originator id: [{}]", msg.getId(), serverAddress, msg.getOriginator());
+ log.debug("Forwarding [{}] msg to remote server [{}] due to changed originator id: [{}]", msg.getId(), serverAddress, msg.getOriginator());
envelope = new RemoteToRuleChainTellNextMsg(envelope, tenantId, entityId);
systemContext.getRpcService().tell(systemContext.getEncodingService().convertToProtoDataMessage(serverAddress, envelope));
}
@@ -230,17 +251,20 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
int relationsCount = relations.size();
EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
if (relationsCount == 0) {
+ log.trace("[{}][{}][{}] No outbound relations to process", tenantId, entityId, msg.getId());
if (ackId != null) {
// TODO: Ack this message in Kafka
// queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition());
}
} else if (relationsCount == 1) {
for (RuleNodeRelation relation : relations) {
+ log.trace("[{}][{}][{}] Pushing message to single target: [{}]", tenantId, entityId, msg.getId(), relation.getOut());
pushToTarget(msg, relation.getOut(), relation.getType());
}
} else {
for (RuleNodeRelation relation : relations) {
EntityId target = relation.getOut();
+ log.trace("[{}][{}][{}] Pushing message to multiple targets: [{}]", tenantId, entityId, msg.getId(), relation.getOut());
switch (target.getEntityType()) {
case RULE_NODE:
enqueueAndForwardMsgCopyToNode(msg, target, relation.getType());
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java
index 273a569..f5521a0 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java
@@ -32,7 +32,7 @@ public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessa
super(systemContext, tenantId, ruleNodeId);
this.ruleChainId = ruleChainId;
setProcessor(new RuleNodeActorMessageProcessor(tenantId, ruleChainId, ruleNodeId, systemContext,
- logger, context().parent(), context().self()));
+ context().parent(), context().self()));
}
@Override
@@ -60,7 +60,9 @@ public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessa
}
private void onRuleNodeToSelfMsg(RuleNodeToSelfMsg msg) {
- logger.debug("[{}] Going to process rule msg: {}", id, msg.getMsg());
+ if (log.isDebugEnabled()) {
+ log.debug("[{}][{}][{}] Going to process rule msg: {}", ruleChainId, id, processor.getComponentName(), msg.getMsg());
+ }
try {
processor.onRuleToSelfMsg(msg);
increaseMessagesProcessedCount();
@@ -70,7 +72,9 @@ public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessa
}
private void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) {
- logger.debug("[{}] Going to process rule msg: {}", id, msg.getMsg());
+ if (log.isDebugEnabled()) {
+ log.debug("[{}][{}][{}] Going to process rule msg: {}", ruleChainId, id, processor.getComponentName(), msg.getMsg());
+ }
try {
processor.onRuleChainToRuleNodeMsg(msg);
increaseMessagesProcessedCount();
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
index acb171d..a4bd1d0 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
@@ -44,8 +44,8 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
private TbContext defaultCtx;
RuleNodeActorMessageProcessor(TenantId tenantId, RuleChainId ruleChainId, RuleNodeId ruleNodeId, ActorSystemContext systemContext
- , LoggingAdapter logger, ActorRef parent, ActorRef self) {
- super(systemContext, logger, tenantId, ruleNodeId);
+ , ActorRef parent, ActorRef self) {
+ super(systemContext, tenantId, ruleNodeId);
this.parent = parent;
this.self = self;
this.service = systemContext.getRuleChainService();
@@ -75,7 +75,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
}
@Override
- public void stop(ActorContext context) throws Exception {
+ public void stop(ActorContext context) {
if (tbNode != null) {
tbNode.destroy();
}
@@ -83,7 +83,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
}
@Override
- public void onClusterEventMsg(ClusterEventMsg msg) throws Exception {
+ public void onClusterEventMsg(ClusterEventMsg msg) {
}
@@ -111,6 +111,11 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
}
}
+ @Override
+ public String getComponentName() {
+ return ruleNode.getName();
+ }
+
private TbNode initComponent(RuleNode ruleNode) throws Exception {
Class<?> componentClazz = Class.forName(ruleNode.getType());
TbNode tbNode = (TbNode) (componentClazz.newInstance());
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java b/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java
index ed59051..1f084e1 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java
@@ -18,6 +18,7 @@ package org.thingsboard.server.actors.service;
import akka.actor.ActorRef;
import akka.event.Logging;
import akka.event.LoggingAdapter;
+import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
import org.thingsboard.server.actors.stats.StatsPersistMsg;
@@ -32,8 +33,6 @@ import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
*/
public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgProcessor<T>> extends ContextAwareActor {
- protected final LoggingAdapter logger = Logging.getLogger(getContext().system(), this);
-
private long lastPersistedErrorTs = 0L;
protected final TenantId tenantId;
protected final T id;
@@ -54,13 +53,14 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
@Override
public void preStart() {
try {
+ log.debug("[{}][{}][{}] Starting processor.", tenantId, id, id.getEntityType());
processor.start(context());
logLifecycleEvent(ComponentLifecycleEvent.STARTED);
if (systemContext.isStatisticsEnabled()) {
scheduleStatsPersistTick();
}
} catch (Exception e) {
- logger.warning("[{}][{}] Failed to start {} processor: {}", tenantId, id, id.getEntityType(), e);
+ log.warn("[{}][{}] Failed to start {} processor: {}", tenantId, id, id.getEntityType(), e);
logAndPersist("OnStart", e, true);
logLifecycleEvent(ComponentLifecycleEvent.STARTED, e);
}
@@ -70,7 +70,7 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
try {
processor.scheduleStatsPersistTick(context(), systemContext.getStatisticsPersistFrequency());
} catch (Exception e) {
- logger.error("[{}][{}] Failed to schedule statistics store message. No statistics is going to be stored: {}", tenantId, id, e.getMessage());
+ log.error("[{}][{}] Failed to schedule statistics store message. No statistics is going to be stored: {}", tenantId, id, e.getMessage());
logAndPersist("onScheduleStatsPersistMsg", e);
}
}
@@ -78,16 +78,18 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
@Override
public void postStop() {
try {
+ log.debug("[{}][{}] Stopping processor.", tenantId, id, id.getEntityType());
processor.stop(context());
logLifecycleEvent(ComponentLifecycleEvent.STOPPED);
} catch (Exception e) {
- logger.warning("[{}][{}] Failed to stop {} processor: {}", tenantId, id, id.getEntityType(), e.getMessage());
+ log.warn("[{}][{}] Failed to stop {} processor: {}", tenantId, id, id.getEntityType(), e.getMessage());
logAndPersist("OnStop", e, true);
logLifecycleEvent(ComponentLifecycleEvent.STOPPED, e);
}
}
protected void onComponentLifecycleMsg(ComponentLifecycleMsg msg) {
+ log.debug("[{}][{}][{}] onComponentLifecycleMsg: [{}]", tenantId, id, id.getEntityType(), msg.getEvent());
try {
switch (msg.getEvent()) {
case CREATED:
@@ -148,9 +150,9 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
private void logAndPersist(String method, Exception e, boolean critical) {
errorsOccurred++;
if (critical) {
- logger.warning("[{}][{}] Failed to process {} msg: {}", id, tenantId, method, e);
+ log.warn("[{}][{}][{}] Failed to process {} msg: {}", id, tenantId, processor.getComponentName(), method, e);
} else {
- logger.debug("[{}][{}] Failed to process {} msg: {}", id, tenantId, method, e);
+ log.debug("[{}][{}][{}] Failed to process {} msg: {}", id, tenantId, processor.getComponentName(), method, e);
}
long ts = System.currentTimeMillis();
if (ts - lastPersistedErrorTs > getErrorPersistFrequency()) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java b/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
index 3624127..a53b88e 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
@@ -15,14 +15,17 @@
*/
package org.thingsboard.server.actors.service;
+import akka.actor.Terminated;
import akka.actor.UntypedActor;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.msg.TbActorMsg;
+
public abstract class ContextAwareActor extends UntypedActor {
- protected final LoggingAdapter logger = Logging.getLogger(getContext().system(), this);
+
+ protected final Logger log = LoggerFactory.getLogger(getClass());
public static final int ENTITY_PACK_LIMIT = 1024;
@@ -34,22 +37,27 @@ public abstract class ContextAwareActor extends UntypedActor {
}
@Override
- public void onReceive(Object msg) throws Exception {
- if (logger.isDebugEnabled()) {
- logger.debug("Processing msg: {}", msg);
+ public void onReceive(Object msg) {
+ if (log.isDebugEnabled()) {
+ log.debug("Processing msg: {}", msg);
}
if (msg instanceof TbActorMsg) {
try {
if (!process((TbActorMsg) msg)) {
- logger.warning("Unknown message: {}!", msg);
+ log.warn("Unknown message: {}!", msg);
}
} catch (Exception e) {
throw e;
}
+ } else if (msg instanceof Terminated) {
+ processTermination((Terminated) msg);
} else {
- logger.warning("Unknown message: {}!", msg);
+ log.warn("Unknown message: {}!", msg);
}
}
+ protected void processTermination(Terminated msg) {
+ }
+
protected abstract boolean process(TbActorMsg msg);
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
index 85b8943..69061ba 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
@@ -22,11 +22,14 @@ import akka.actor.Terminated;
import com.google.protobuf.ByteString;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import org.thingsboard.rule.engine.api.msg.DeviceCredentialsUpdateNotificationMsg;
import org.thingsboard.rule.engine.api.msg.DeviceNameOrTypeUpdateMsg;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.app.AppActor;
+import org.thingsboard.server.actors.app.AppInitMsg;
import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
import org.thingsboard.server.actors.rpc.RpcManagerActor;
import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg;
@@ -54,6 +57,12 @@ import scala.concurrent.duration.Duration;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CLUSTER_ACTOR_MESSAGE;
@Service
@@ -86,6 +95,8 @@ public class DefaultActorService implements ActorService {
private ActorRef rpcManagerActor;
+ private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
@PostConstruct
public void initActorSystem() {
log.info("Initializing Actor system. {}", actorContext.getRuleChainService());
@@ -106,6 +117,12 @@ public class DefaultActorService implements ActorService {
log.info("Actor system initialized.");
}
+ @EventListener(ApplicationReadyEvent.class)
+ public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
+ log.info("Received application ready event. Sending application init message to actor system");
+ appActor.tell(new AppInitMsg(), ActorRef.noSender());
+ }
+
@PreDestroy
public void stopActorSystem() {
Future<Terminated> status = system.terminate();
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java
index c809782..b707baf 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java
@@ -22,22 +22,22 @@ import akka.event.LoggingAdapter;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.AllArgsConstructor;
import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.duration.Duration;
import java.util.concurrent.TimeUnit;
+@Slf4j
public abstract class AbstractContextAwareMsgProcessor {
protected final ActorSystemContext systemContext;
- protected final LoggingAdapter logger;
protected final ObjectMapper mapper = new ObjectMapper();
- protected AbstractContextAwareMsgProcessor(ActorSystemContext systemContext, LoggingAdapter logger) {
+ protected AbstractContextAwareMsgProcessor(ActorSystemContext systemContext) {
super();
this.systemContext = systemContext;
- this.logger = logger;
}
private Scheduler getScheduler() {
@@ -53,7 +53,7 @@ public abstract class AbstractContextAwareMsgProcessor {
}
private void schedulePeriodicMsgWithDelay(Object msg, long delayInMs, long periodInMs, ActorRef target) {
- logger.debug("Scheduling periodic msg {} every {} ms with delay {} ms", msg, periodInMs, delayInMs);
+ log.debug("Scheduling periodic msg {} every {} ms with delay {} ms", msg, periodInMs, delayInMs);
getScheduler().schedule(Duration.create(delayInMs, TimeUnit.MILLISECONDS), Duration.create(periodInMs, TimeUnit.MILLISECONDS), target, msg, getSystemDispatcher(), null);
}
@@ -62,7 +62,7 @@ public abstract class AbstractContextAwareMsgProcessor {
}
private void scheduleMsgWithDelay(Object msg, long delayInMs, ActorRef target) {
- logger.debug("Scheduling msg {} with delay {} ms", msg, delayInMs);
+ log.debug("Scheduling msg {} with delay {} ms", msg, delayInMs);
getScheduler().scheduleOnce(Duration.create(delayInMs, TimeUnit.MILLISECONDS), target, msg, getSystemDispatcher(), null);
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
index c9dc307..46a76e6 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
@@ -19,6 +19,7 @@ import akka.actor.ActorContext;
import akka.event.LoggingAdapter;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
+import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.stats.StatsPersistTick;
import org.thingsboard.server.common.data.id.EntityId;
@@ -30,18 +31,21 @@ import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import javax.annotation.Nullable;
import java.util.function.Consumer;
+@Slf4j
public abstract class ComponentMsgProcessor<T extends EntityId> extends AbstractContextAwareMsgProcessor {
protected final TenantId tenantId;
protected final T entityId;
protected ComponentLifecycleState state;
- protected ComponentMsgProcessor(ActorSystemContext systemContext, LoggingAdapter logger, TenantId tenantId, T id) {
- super(systemContext, logger);
+ protected ComponentMsgProcessor(ActorSystemContext systemContext, TenantId tenantId, T id) {
+ super(systemContext);
this.tenantId = tenantId;
this.entityId = id;
}
+ public abstract String getComponentName();
+
public abstract void start(ActorContext context) throws Exception;
public abstract void stop(ActorContext context) throws Exception;
@@ -79,7 +83,7 @@ public abstract class ComponentMsgProcessor<T extends EntityId> extends Abstract
protected void checkActive() {
if (state != ComponentLifecycleState.ACTIVE) {
- logger.warning("Rule chain is not active. Current state [{}] for processor [{}] tenant [{}]", state, tenantId, entityId);
+ log.warn("Rule chain is not active. Current state [{}] for processor [{}] tenant [{}]", state, tenantId, entityId);
throw new IllegalStateException("Rule chain is not active! " + entityId + " - " + tenantId);
}
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/EntityActorsManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/EntityActorsManager.java
index 1b9e6a8..dd03d69 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/EntityActorsManager.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/EntityActorsManager.java
@@ -20,6 +20,8 @@ import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.japi.Creator;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.service.ContextAwareActor;
@@ -39,11 +41,11 @@ import java.util.Map;
public abstract class EntityActorsManager<T extends EntityId, A extends UntypedActor, M extends SearchTextBased<? extends UUIDBased>> {
protected final ActorSystemContext systemContext;
- protected final Map<T, ActorRef> actors;
+ protected final BiMap<T, ActorRef> actors;
public EntityActorsManager(ActorSystemContext systemContext) {
this.systemContext = systemContext;
- this.actors = new HashMap<>();
+ this.actors = HashBiMap.create();
}
protected abstract TenantId getTenantId();
@@ -65,7 +67,8 @@ public abstract class EntityActorsManager<T extends EntityId, A extends UntypedA
}
}
- public void visit(M entity, ActorRef actorRef) {}
+ public void visit(M entity, ActorRef actorRef) {
+ }
public ActorRef getOrCreateActor(ActorContext context, T entityId) {
return actors.computeIfAbsent(entityId, eId ->
diff --git a/application/src/main/java/org/thingsboard/server/actors/stats/StatsActor.java b/application/src/main/java/org/thingsboard/server/actors/stats/StatsActor.java
index 8623370..79aa6da 100644
--- a/application/src/main/java/org/thingsboard/server/actors/stats/StatsActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/stats/StatsActor.java
@@ -15,10 +15,9 @@
*/
package org.thingsboard.server.actors.stats;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.service.ContextAwareActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
@@ -27,9 +26,9 @@ import org.thingsboard.server.common.data.Event;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
+@Slf4j
public class StatsActor extends ContextAwareActor {
- private final LoggingAdapter logger = Logging.getLogger(getContext().system(), this);
private final ObjectMapper mapper = new ObjectMapper();
public StatsActor(ActorSystemContext context) {
@@ -43,13 +42,13 @@ public class StatsActor extends ContextAwareActor {
}
@Override
- public void onReceive(Object msg) throws Exception {
- logger.debug("Received message: {}", msg);
+ public void onReceive(Object msg) {
+ log.debug("Received message: {}", msg);
if (msg instanceof StatsPersistMsg) {
try {
onStatsPersistMsg((StatsPersistMsg) msg);
} catch (Exception e) {
- logger.warning("Failed to persist statistics: {}", msg, e);
+ log.warn("Failed to persist statistics: {}", msg, e);
}
}
}
@@ -75,7 +74,7 @@ public class StatsActor extends ContextAwareActor {
}
@Override
- public StatsActor create() throws Exception {
+ public StatsActor create() {
return new StatsActor(context);
}
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
index 721d828..0d693ee 100644
--- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
@@ -17,15 +17,19 @@ package org.thingsboard.server.actors.tenant;
import akka.actor.ActorInitializationException;
import akka.actor.ActorRef;
+import akka.actor.LocalActorRef;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
+import akka.actor.Terminated;
import akka.japi.Function;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext;
-import org.thingsboard.server.actors.device.DeviceActor;
+import org.thingsboard.server.actors.device.DeviceActorCreator;
import org.thingsboard.server.actors.device.DeviceActorToRuleEngineMsg;
import org.thingsboard.server.actors.ruleChain.RuleChainManagerActor;
-import org.thingsboard.server.actors.ruleChain.RuleChainToRuleChainMsg;
import org.thingsboard.server.actors.service.ContextBasedCreator;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.actors.shared.rulechain.TenantRuleChainManager;
@@ -33,6 +37,7 @@ import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.aware.DeviceAwareMsg;
@@ -47,15 +52,14 @@ import java.util.Map;
public class TenantActor extends RuleChainManagerActor {
private final TenantId tenantId;
- private final Map<DeviceId, ActorRef> deviceActors;
+ private final BiMap<DeviceId, ActorRef> deviceActors;
private TenantActor(ActorSystemContext systemContext, TenantId tenantId) {
super(systemContext, new TenantRuleChainManager(systemContext, tenantId));
this.tenantId = tenantId;
- this.deviceActors = new HashMap<>();
+ this.deviceActors = HashBiMap.create();
}
-
@Override
public SupervisorStrategy supervisorStrategy() {
return strategy;
@@ -63,16 +67,21 @@ public class TenantActor extends RuleChainManagerActor {
@Override
public void preStart() {
- logger.info("[{}] Starting tenant actor.", tenantId);
+ log.info("[{}] Starting tenant actor.", tenantId);
try {
initRuleChains();
- logger.info("[{}] Tenant actor started.", tenantId);
+ log.info("[{}] Tenant actor started.", tenantId);
} catch (Exception e) {
- logger.error(e, "[{}] Unknown failure", tenantId);
+ log.warn("[{}] Unknown failure", tenantId, e);
}
}
@Override
+ public void postStop() {
+ log.info("[{}] Stopping tenant actor.", tenantId);
+ }
+
+ @Override
protected boolean process(TbActorMsg msg) {
switch (msg.getMsgType()) {
case CLUSTER_EVENT_MSG:
@@ -105,22 +114,20 @@ public class TenantActor extends RuleChainManagerActor {
return true;
}
- @Override
- protected void broadcast(Object msg) {
- super.broadcast(msg);
-// deviceActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
- }
-
private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) {
- if (ruleChainManager.getRootChainActor()!=null)
- ruleChainManager.getRootChainActor().tell(msg, self());
- else logger.info("[{}] No Root Chain", msg);
+ if (ruleChainManager.getRootChainActor() != null) {
+ ruleChainManager.getRootChainActor().tell(msg, self());
+ } else {
+ log.info("[{}] No Root Chain: {}", tenantId, msg);
+ }
}
private void onDeviceActorToRuleEngineMsg(DeviceActorToRuleEngineMsg msg) {
- if (ruleChainManager.getRootChainActor()!=null)
- ruleChainManager.getRootChainActor().tell(msg, self());
- else logger.info("[{}] No Root Chain", msg);
+ if (ruleChainManager.getRootChainActor() != null) {
+ ruleChainManager.getRootChainActor().tell(msg, self());
+ } else {
+ log.info("[{}] No Root Chain: {}", tenantId, msg);
+ }
}
private void onRuleChainMsg(RuleChainAwareMsg msg) {
@@ -141,13 +148,35 @@ public class TenantActor extends RuleChainManagerActor {
}
target.tell(msg, ActorRef.noSender());
} else {
- logger.debug("Invalid component lifecycle msg: {}", msg);
+ log.debug("[{}] Invalid component lifecycle msg: {}", tenantId, msg);
}
}
private ActorRef getOrCreateDeviceActor(DeviceId deviceId) {
- return deviceActors.computeIfAbsent(deviceId, k -> context().actorOf(Props.create(new DeviceActor.ActorCreator(systemContext, tenantId, deviceId))
- .withDispatcher(DefaultActorService.CORE_DISPATCHER_NAME), deviceId.toString()));
+ return deviceActors.computeIfAbsent(deviceId, k -> {
+ log.debug("[{}][{}] Creating device actor.", tenantId, deviceId);
+ ActorRef deviceActor = context().actorOf(Props.create(new DeviceActorCreator(systemContext, tenantId, deviceId))
+ .withDispatcher(DefaultActorService.CORE_DISPATCHER_NAME)
+ , deviceId.toString());
+ context().watch(deviceActor);
+ log.debug("[{}][{}] Created device actor: {}.", tenantId, deviceId, deviceActor);
+ return deviceActor;
+ });
+ }
+
+ @Override
+ protected void processTermination(Terminated message) {
+ ActorRef terminated = message.actor();
+ if (terminated instanceof LocalActorRef) {
+ boolean removed = deviceActors.inverse().remove(terminated) != null;
+ if (removed) {
+ log.debug("[{}] Removed actor:", terminated);
+ } else {
+ log.warn("[{}] Removed actor was not found in the device map!");
+ }
+ } else {
+ throw new IllegalStateException("Remote actors are not supported!");
+ }
}
public static class ActorCreator extends ContextBasedCreator<TenantActor> {
@@ -161,7 +190,7 @@ public class TenantActor extends RuleChainManagerActor {
}
@Override
- public TenantActor create() throws Exception {
+ public TenantActor create() {
return new TenantActor(context, tenantId);
}
}
@@ -169,8 +198,8 @@ public class TenantActor extends RuleChainManagerActor {
private final SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.create("1 minute"), new Function<Throwable, SupervisorStrategy.Directive>() {
@Override
public SupervisorStrategy.Directive apply(Throwable t) {
- logger.error(t, "Unknown failure");
- if(t instanceof ActorInitializationException){
+ log.warn("[{}] Unknown failure", tenantId, t);
+ if (t instanceof ActorInitializationException) {
return SupervisorStrategy.stop();
} else {
return SupervisorStrategy.resume();
diff --git a/application/src/main/java/org/thingsboard/server/controller/BaseController.java b/application/src/main/java/org/thingsboard/server/controller/BaseController.java
index be1b6db..825929f 100644
--- a/application/src/main/java/org/thingsboard/server/controller/BaseController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/BaseController.java
@@ -19,9 +19,11 @@ import com.datastax.driver.core.utils.UUIDs;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.web.bind.annotation.ExceptionHandler;
@@ -152,6 +154,11 @@ public abstract class BaseController {
@Autowired
protected AttributesService attributesService;
+ @Value("${server.log_controller_error_stack_trace}")
+ @Getter
+ private boolean logControllerErrorStackTrace;
+
+
@ExceptionHandler(ThingsboardException.class)
public void handleThingsboardException(ThingsboardException ex, HttpServletResponse response) {
errorResponseHandler.handle(ex, response);
@@ -162,7 +169,7 @@ public abstract class BaseController {
}
private ThingsboardException handleException(Exception exception, boolean logException) {
- if (logException) {
+ if (logException && logControllerErrorStackTrace) {
log.error("Error [{}]", exception.getMessage(), exception);
}
diff --git a/application/src/main/java/org/thingsboard/server/controller/DashboardController.java b/application/src/main/java/org/thingsboard/server/controller/DashboardController.java
index 450b976..0bb6a66 100644
--- a/application/src/main/java/org/thingsboard/server/controller/DashboardController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/DashboardController.java
@@ -52,7 +52,6 @@ public class DashboardController extends BaseController {
public static final String DASHBOARD_ID = "dashboardId";
@Value("${dashboard.max_datapoints_limit}")
- @Getter
private long maxDatapointsLimit;
diff --git a/application/src/main/java/org/thingsboard/server/controller/EntityViewController.java b/application/src/main/java/org/thingsboard/server/controller/EntityViewController.java
index 9b7ffd2..2a9c073 100644
--- a/application/src/main/java/org/thingsboard/server/controller/EntityViewController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/EntityViewController.java
@@ -29,7 +29,6 @@ import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
-import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EntitySubtype;
@@ -39,7 +38,6 @@ import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.entityview.EntityViewSearchQuery;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.CustomerId;
-import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.TenantId;
@@ -47,7 +45,6 @@ import org.thingsboard.server.common.data.id.UUIDBased;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.page.TextPageData;
import org.thingsboard.server.common.data.page.TextPageLink;
-import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
import org.thingsboard.server.dao.exception.IncorrectParameterException;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.service.security.model.SecurityUser;
@@ -174,7 +171,7 @@ public class EntityViewController extends BaseController {
EntityView entityView = checkEntityViewId(entityViewId);
entityViewService.deleteEntityView(entityViewId);
logEntityAction(entityViewId, entityView, entityView.getCustomerId(),
- ActionType.DELETED,null, strEntityViewId);
+ ActionType.DELETED, null, strEntityViewId);
} catch (Exception e) {
logEntityAction(emptyId(EntityType.ENTITY_VIEW),
null,
@@ -185,10 +182,23 @@ public class EntityViewController extends BaseController {
}
@PreAuthorize("hasAuthority('TENANT_ADMIN')")
+ @RequestMapping(value = "/tenant/entityViews", params = {"entityViewName"}, method = RequestMethod.GET)
+ @ResponseBody
+ public EntityView getTenantEntityView(
+ @RequestParam String entityViewName) throws ThingsboardException {
+ try {
+ TenantId tenantId = getCurrentUser().getTenantId();
+ return checkNotNull(entityViewService.findEntityViewByTenantIdAndName(tenantId, entityViewName));
+ } catch (Exception e) {
+ throw handleException(e);
+ }
+ }
+
+ @PreAuthorize("hasAuthority('TENANT_ADMIN')")
@RequestMapping(value = "/customer/{customerId}/entityView/{entityViewId}", method = RequestMethod.POST)
@ResponseBody
public EntityView assignEntityViewToCustomer(@PathVariable(CUSTOMER_ID) String strCustomerId,
- @PathVariable(ENTITY_VIEW_ID) String strEntityViewId) throws ThingsboardException {
+ @PathVariable(ENTITY_VIEW_ID) String strEntityViewId) throws ThingsboardException {
checkParameter(CUSTOMER_ID, strCustomerId);
checkParameter(ENTITY_VIEW_ID, strEntityViewId);
try {
diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
index 6c37614..ef09a7a 100644
--- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
@@ -49,9 +49,11 @@ import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.AttributeKey;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
+import org.thingsboard.server.common.data.kv.BaseDeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
+import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
@@ -60,12 +62,10 @@ import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
-import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.service.security.AccessValidator;
import org.thingsboard.server.service.security.model.SecurityUser;
import org.thingsboard.server.service.telemetry.AttributeData;
-import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import org.thingsboard.server.service.telemetry.TsData;
import org.thingsboard.server.service.telemetry.exception.InvalidParametersException;
import org.thingsboard.server.service.telemetry.exception.UncheckedApiException;
@@ -250,6 +250,60 @@ public class TelemetryController extends BaseController {
}
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
+ @RequestMapping(value = "/{entityType}/{entityId}/timeseries/delete", method = RequestMethod.DELETE)
+ @ResponseBody
+ public DeferredResult<ResponseEntity> deleteEntityTimeseries(@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
+ @RequestParam(name = "keys") String keysStr,
+ @RequestParam(name = "deleteAllDataForKeys", defaultValue = "false") boolean deleteAllDataForKeys,
+ @RequestParam(name = "startTs", required = false) Long startTs,
+ @RequestParam(name = "endTs", required = false) Long endTs,
+ @RequestParam(name = "rewriteLatestIfDeleted", defaultValue = "false") boolean rewriteLatestIfDeleted) throws ThingsboardException {
+ EntityId entityId = EntityIdFactory.getByTypeAndId(entityType, entityIdStr);
+ return deleteTimeseries(entityId, keysStr, deleteAllDataForKeys, startTs, endTs, rewriteLatestIfDeleted);
+ }
+
+ private DeferredResult<ResponseEntity> deleteTimeseries(EntityId entityIdStr, String keysStr, boolean deleteAllDataForKeys,
+ Long startTs, Long endTs, boolean rewriteLatestIfDeleted) throws ThingsboardException {
+ List<String> keys = toKeysList(keysStr);
+ if (keys.isEmpty()) {
+ return getImmediateDeferredResult("Empty keys: " + keysStr, HttpStatus.BAD_REQUEST);
+ }
+ SecurityUser user = getCurrentUser();
+
+ long deleteFromTs;
+ long deleteToTs;
+ if (deleteAllDataForKeys) {
+ deleteFromTs = 0L;
+ deleteToTs = System.currentTimeMillis();
+ } else {
+ deleteFromTs = startTs;
+ deleteToTs = endTs;
+ }
+
+ return accessValidator.validateEntityAndCallback(user, entityIdStr, (result, entityId) -> {
+ List<DeleteTsKvQuery> deleteTsKvQueries = new ArrayList<>();
+ for (String key : keys) {
+ deleteTsKvQueries.add(new BaseDeleteTsKvQuery(key, deleteFromTs, deleteToTs, rewriteLatestIfDeleted));
+ }
+
+ ListenableFuture<List<Void>> future = tsService.remove(entityId, deleteTsKvQueries);
+ Futures.addCallback(future, new FutureCallback<List<Void>>() {
+ @Override
+ public void onSuccess(@Nullable List<Void> tmp) {
+ logTimeseriesDeleted(user, entityId, keys, null);
+ result.setResult(new ResponseEntity<>(HttpStatus.OK));
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ logTimeseriesDeleted(user, entityId, keys, t);
+ result.setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
+ }
+ }, executor);
+ });
+ }
+
+ @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/{deviceId}/{scope}", method = RequestMethod.DELETE)
@ResponseBody
public DeferredResult<ResponseEntity> deleteEntityAttributes(@PathVariable("deviceId") String deviceIdStr,
@@ -506,6 +560,15 @@ public class TelemetryController extends BaseController {
};
}
+ private void logTimeseriesDeleted(SecurityUser user, EntityId entityId, List<String> keys, Throwable e) {
+ try {
+ logEntityAction(user, (UUIDBased & EntityId) entityId, null, null, ActionType.TIMESERIES_DELETED, toException(e),
+ keys);
+ } catch (ThingsboardException te) {
+ log.warn("Failed to log timeseries delete", te);
+ }
+ }
+
private void logAttributesDeleted(SecurityUser user, EntityId entityId, String scope, List<String> keys, Throwable e) {
try {
logEntityAction(user, (UUIDBased & EntityId) entityId, null, null, ActionType.ATTRIBUTES_DELETED, toException(e),
diff --git a/application/src/main/java/org/thingsboard/server/controller/TenantController.java b/application/src/main/java/org/thingsboard/server/controller/TenantController.java
index 1a7c116..155f0a1 100644
--- a/application/src/main/java/org/thingsboard/server/controller/TenantController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/TenantController.java
@@ -32,6 +32,7 @@ import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.TextPageData;
import org.thingsboard.server.common.data.page.TextPageLink;
+import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.service.install.InstallScripts;
@@ -84,6 +85,8 @@ public class TenantController extends BaseController {
try {
TenantId tenantId = new TenantId(toUUID(strTenantId));
tenantService.deleteTenant(tenantId);
+
+ actorService.onEntityStateChange(tenantId, tenantId, ComponentLifecycleEvent.DELETED);
} catch (Exception e) {
throw handleException(e);
}
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
index bb92642..e0bdd12 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
@@ -21,6 +21,7 @@ import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
@@ -30,12 +31,14 @@ import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryForever;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Lazy;
+import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import org.thingsboard.server.actors.service.ActorService;
@@ -51,13 +54,15 @@ import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
+import static org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type.CHILD_REMOVED;
+
/**
* @author Andrew Shvayka
*/
@Service
@ConditionalOnProperty(prefix = "zk", value = "enabled", havingValue = "true", matchIfMissing = false)
@Slf4j
-public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheListener, ApplicationListener<ApplicationReadyEvent> {
+public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheListener {
@Value("${zk.url}")
private String zkUrl;
@@ -95,6 +100,8 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
private PathChildrenCache cache;
private String nodePath;
+ private volatile boolean stopped = false;
+
@PostConstruct
public void init() {
log.info("Initializing...");
@@ -115,6 +122,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
cache.start();
} catch (Exception e) {
log.error("Failed to connect to ZK: {}", e.getMessage(), e);
+ CloseableUtils.closeQuietly(cache);
CloseableUtils.closeQuietly(client);
throw new RuntimeException(e);
}
@@ -122,25 +130,50 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
@PreDestroy
public void destroy() {
+ stopped = true;
unpublishCurrentServer();
+ CloseableUtils.closeQuietly(cache);
CloseableUtils.closeQuietly(client);
log.info("Stopped discovery service");
}
@Override
- public void publishCurrentServer() {
+ public synchronized void publishCurrentServer() {
+ ServerInstance self = this.serverInstance.getSelf();
+ if (currentServerExists()) {
+ log.info("[{}:{}] ZK node for current instance already exists, NOT created new one: {}", self.getHost(), self.getPort(), nodePath);
+ } else {
+ try {
+ log.info("[{}:{}] Creating ZK node for current instance", self.getHost(), self.getPort());
+ nodePath = client.create()
+ .creatingParentsIfNeeded()
+ .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(zkNodesDir + "/", SerializationUtils.serialize(self.getServerAddress()));
+ log.info("[{}:{}] Created ZK node for current instance: {}", self.getHost(), self.getPort(), nodePath);
+ client.getConnectionStateListenable().addListener(checkReconnect(self));
+ } catch (Exception e) {
+ log.error("Failed to create ZK node", e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private boolean currentServerExists() {
+ if (nodePath == null) {
+ return false;
+ }
try {
ServerInstance self = this.serverInstance.getSelf();
- log.info("[{}:{}] Creating ZK node for current instance", self.getHost(), self.getPort());
- nodePath = client.create()
- .creatingParentsIfNeeded()
- .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(zkNodesDir + "/", SerializationUtils.serialize(self.getServerAddress()));
- log.info("[{}:{}] Created ZK node for current instance: {}", self.getHost(), self.getPort(), nodePath);
- client.getConnectionStateListenable().addListener(checkReconnect(self));
+ ServerAddress registeredServerAdress = null;
+ registeredServerAdress = SerializationUtils.deserialize(client.getData().forPath(nodePath));
+ if (self.getServerAddress() != null && self.getServerAddress().equals(registeredServerAdress)) {
+ return true;
+ }
+ } catch (KeeperException.NoNodeException e) {
+ log.info("ZK node does not exist: {}", nodePath);
} catch (Exception e) {
- log.error("Failed to create ZK node", e);
- throw new RuntimeException(e);
+ log.error("Couldn't check if ZK node exists", e);
}
+ return false;
}
private ConnectionStateListener checkReconnect(ServerInstance self) {
@@ -200,8 +233,17 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
.collect(Collectors.toList());
}
- @Override
+ @EventListener(ApplicationReadyEvent.class)
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
+ log.info("Received application ready event. Starting current ZK node.");
+ if (stopped) {
+ log.debug("Ignoring application ready event. Service is stopped.");
+ return;
+ }
+ if (client.getState() != CuratorFrameworkState.STARTED) {
+ log.debug("Ignoring application ready event, ZK client is not started, ZK client state [{}]", client.getState());
+ return;
+ }
publishCurrentServer();
getOtherServers().forEach(
server -> log.info("Found active server: [{}:{}]", server.getHost(), server.getPort())
@@ -210,6 +252,14 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
+ if (stopped) {
+ log.debug("Ignoring {}. Service is stopped.", pathChildrenCacheEvent);
+ return;
+ }
+ if (client.getState() != CuratorFrameworkState.STARTED) {
+ log.debug("Ignoring {}, ZK client is not started, ZK client state [{}]", pathChildrenCacheEvent, client.getState());
+ return;
+ }
ChildData data = pathChildrenCacheEvent.getData();
if (data == null) {
log.debug("Ignoring {} due to empty child data", pathChildrenCacheEvent);
@@ -218,6 +268,10 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
log.debug("Ignoring {} due to empty child's data", pathChildrenCacheEvent);
return;
} else if (nodePath != null && nodePath.equals(data.getPath())) {
+ if (pathChildrenCacheEvent.getType() == CHILD_REMOVED) {
+ log.info("ZK node for current instance is somehow deleted.");
+ publishCurrentServer();
+ }
log.debug("Ignoring event about current server {}", pathChildrenCacheEvent);
return;
}
diff --git a/application/src/main/java/org/thingsboard/server/service/executors/ClusterRpcCallbackExecutorService.java b/application/src/main/java/org/thingsboard/server/service/executors/ClusterRpcCallbackExecutorService.java
new file mode 100644
index 0000000..2def8b0
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/executors/ClusterRpcCallbackExecutorService.java
@@ -0,0 +1,32 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.executors;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+@Component
+public class ClusterRpcCallbackExecutorService extends AbstractListeningExecutor {
+
+ @Value("${actors.cluster.grpc_callback_thread_pool_size}")
+ private int grpcCallbackExecutorThreadPoolSize;
+
+ @Override
+ protected int getThreadPollSize() {
+ return grpcCallbackExecutorThreadPoolSize;
+ }
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java
index db31bda..edd7b35 100644
--- a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java
+++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java
@@ -71,7 +71,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
private int maxErrors;
private TbKafkaRequestTemplate<JsInvokeProtos.RemoteJsRequest, JsInvokeProtos.RemoteJsResponse> kafkaTemplate;
- protected Map<UUID, String> scriptIdToBodysMap = new ConcurrentHashMap<>();
+ private Map<UUID, String> scriptIdToBodysMap = new ConcurrentHashMap<>();
@PostConstruct
public void init() {
@@ -100,7 +100,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
responseBuilder.settings(kafkaSettings);
responseBuilder.topic(responseTopicPrefix + "." + nodeIdProvider.getNodeId());
responseBuilder.clientId("js-" + nodeIdProvider.getNodeId());
- responseBuilder.groupId("rule-engine-node");
+ responseBuilder.groupId("rule-engine-node-" + nodeIdProvider.getNodeId());
responseBuilder.autoCommit(true);
responseBuilder.autoCommitIntervalMs(autoCommitInterval);
responseBuilder.decoder(new RemoteJsResponseDecoder());
@@ -136,6 +136,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
.setCompileRequest(jsRequest)
.build();
+ log.trace("Post compile request for scriptId [{}]", scriptId);
ListenableFuture<JsInvokeProtos.RemoteJsResponse> future = kafkaTemplate.post(scriptId.toString(), jsRequestWrapper);
return Futures.transform(future, response -> {
JsInvokeProtos.JsCompileResponse compilationResult = response.getCompileResponse();
diff --git a/application/src/main/java/org/thingsboard/server/service/security/model/token/RawAccessJwtToken.java b/application/src/main/java/org/thingsboard/server/service/security/model/token/RawAccessJwtToken.java
index 544b6fd..68d9ada 100644
--- a/application/src/main/java/org/thingsboard/server/service/security/model/token/RawAccessJwtToken.java
+++ b/application/src/main/java/org/thingsboard/server/service/security/model/token/RawAccessJwtToken.java
@@ -22,6 +22,7 @@ import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.MalformedJwtException;
import io.jsonwebtoken.SignatureException;
import io.jsonwebtoken.UnsupportedJwtException;
+import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.security.authentication.BadCredentialsException;
@@ -29,12 +30,11 @@ import org.thingsboard.server.service.security.exception.JwtExpiredTokenExceptio
import java.io.Serializable;
+@Slf4j
public class RawAccessJwtToken implements JwtToken, Serializable {
private static final long serialVersionUID = -797397445703066079L;
- private static Logger logger = LoggerFactory.getLogger(RawAccessJwtToken.class);
-
private String token;
public RawAccessJwtToken(String token) {
@@ -52,10 +52,10 @@ public class RawAccessJwtToken implements JwtToken, Serializable {
try {
return Jwts.parser().setSigningKey(signingKey).parseClaimsJws(this.token);
} catch (UnsupportedJwtException | MalformedJwtException | IllegalArgumentException | SignatureException ex) {
- logger.error("Invalid JWT Token", ex);
+ log.error("Invalid JWT Token", ex);
throw new BadCredentialsException("Invalid JWT token: ", ex);
} catch (ExpiredJwtException expiredEx) {
- logger.info("JWT Token is expired", expiredEx);
+ log.info("JWT Token is expired", expiredEx);
throw new JwtExpiredTokenException(this, "JWT Token expired", expiredEx);
}
}
diff --git a/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java b/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java
index 6201dab..c78d7df 100644
--- a/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java
+++ b/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java
@@ -22,8 +22,8 @@ import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.gen.transport.TransportProtos.DeviceSessionsCacheEntry;
-import java.util.ArrayList;
import java.util.Collections;
+import java.util.UUID;
import static org.thingsboard.server.common.data.CacheConstants.SESSIONS_CACHE;
@@ -35,16 +35,23 @@ import static org.thingsboard.server.common.data.CacheConstants.SESSIONS_CACHE;
public class DefaultDeviceSessionCacheService implements DeviceSessionCacheService {
@Override
- @Cacheable(cacheNames = SESSIONS_CACHE, key = "#deviceId")
- public DeviceSessionsCacheEntry get(DeviceId deviceId) {
+ @Cacheable(cacheNames = SESSIONS_CACHE, key = "#deviceId.toString()")
+ public byte[] get(DeviceId deviceId) {
log.debug("[{}] Fetching session data from cache", deviceId);
- return DeviceSessionsCacheEntry.newBuilder().addAllSessions(Collections.emptyList()).build();
+ return DeviceSessionsCacheEntry.newBuilder().addAllSessions(Collections.emptyList()).build().toByteArray();
}
@Override
- @CachePut(cacheNames = SESSIONS_CACHE, key = "#deviceId")
- public DeviceSessionsCacheEntry put(DeviceId deviceId, DeviceSessionsCacheEntry sessions) {
- log.debug("[{}] Pushing session data from cache: {}", deviceId, sessions);
+ @CachePut(cacheNames = SESSIONS_CACHE, key = "#deviceId.toString()")
+ public byte[] put(DeviceId deviceId, byte[] sessions) {
+ log.debug("[{}] Pushing session data to cache: {}", deviceId, sessions);
return sessions;
}
+
+ public static void main (String[] args){
+ UUID uuid = UUID.fromString("d5db434e-9cd2-4903-8b3b-421b2d93664d");
+ System.out.println(uuid.getMostSignificantBits());
+ System.out.println(uuid.getLeastSignificantBits());
+ }
+
}
diff --git a/application/src/main/java/org/thingsboard/server/service/session/DeviceSessionCacheService.java b/application/src/main/java/org/thingsboard/server/service/session/DeviceSessionCacheService.java
index 0a2e6a5..a9a1702 100644
--- a/application/src/main/java/org/thingsboard/server/service/session/DeviceSessionCacheService.java
+++ b/application/src/main/java/org/thingsboard/server/service/session/DeviceSessionCacheService.java
@@ -23,8 +23,8 @@ import org.thingsboard.server.gen.transport.TransportProtos.DeviceSessionsCacheE
*/
public interface DeviceSessionCacheService {
- DeviceSessionsCacheEntry get(DeviceId deviceId);
+ byte[] get(DeviceId deviceId);
- DeviceSessionsCacheEntry put(DeviceId deviceId, DeviceSessionsCacheEntry sessions);
+ byte[] put(DeviceId deviceId, byte[] sessions);
}
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
index 7c77242..30299ee 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
@@ -35,23 +35,11 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.TenantId;
-import org.thingsboard.server.common.data.kv.AttributeKvEntry;
-import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
-import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
-import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
-import org.thingsboard.server.common.data.kv.BooleanDataEntry;
-import org.thingsboard.server.common.data.kv.DataType;
-import org.thingsboard.server.common.data.kv.DoubleDataEntry;
-import org.thingsboard.server.common.data.kv.KvEntry;
-import org.thingsboard.server.common.data.kv.LongDataEntry;
-import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
-import org.thingsboard.server.common.data.kv.StringDataEntry;
-import org.thingsboard.server.common.data.kv.TsKvEntry;
+import org.thingsboard.server.common.data.kv.*;
import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.entityview.EntityViewService;
-import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
@@ -68,7 +56,6 @@ import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -339,9 +326,9 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
Set<Subscription> subscriptions = e.getValue();
Optional<ServerAddress> newAddressOptional = routingService.resolveById(e.getKey());
if (newAddressOptional.isPresent()) {
- newAddressOptional.ifPresent(serverAddress -> checkSubsciptionsNewAddress(serverAddress, subscriptions));
+ newAddressOptional.ifPresent(serverAddress -> checkSubscriptionsNewAddress(serverAddress, subscriptions));
} else {
- checkSubsciptionsPrevAddress(subscriptions);
+ checkSubscriptionsPrevAddress(subscriptions);
}
if (subscriptions.size() == 0) {
log.trace("[{}] No more subscriptions for this device on current server.", e.getKey());
@@ -350,7 +337,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
}
}
- private void checkSubsciptionsNewAddress(ServerAddress newAddress, Set<Subscription> subscriptions) {
+ private void checkSubscriptionsNewAddress(ServerAddress newAddress, Set<Subscription> subscriptions) {
Iterator<Subscription> subscriptionIterator = subscriptions.iterator();
while (subscriptionIterator.hasNext()) {
Subscription s = subscriptionIterator.next();
@@ -368,7 +355,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
}
}
- private void checkSubsciptionsPrevAddress(Set<Subscription> subscriptions) {
+ private void checkSubscriptionsPrevAddress(Set<Subscription> subscriptions) {
for (Subscription s : subscriptions) {
if (s.isLocal() && s.getServer() != null) {
log.trace("[{}] Local subscription is no longer handled on remote server address [{}]", s.getWsSessionId(), s.getServer());
@@ -381,7 +368,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
private void addRemoteWsSubscription(ServerAddress address, String sessionId, Subscription subscription) {
EntityId entityId = subscription.getEntityId();
- log.trace("[{}] Registering remote subscription [{}] for device [{}] to [{}]", sessionId, subscription.getSubscriptionId(), entityId, address);
+ log.trace("[{}] Registering remote subscription [{}] for entity [{}] to [{}]", sessionId, subscription.getSubscriptionId(), entityId, address);
registerSubscription(sessionId, entityId, subscription);
if (subscription.getType() == TelemetryFeature.ATTRIBUTES) {
final Map<String, Long> keyStates = subscription.getKeyStates();
@@ -401,17 +388,22 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
long curTs = System.currentTimeMillis();
List<ReadTsKvQuery> queries = new ArrayList<>();
subscription.getKeyStates().entrySet().forEach(e -> {
- queries.add(new BaseReadTsKvQuery(e.getKey(), e.getValue() + 1L, curTs));
+ if (curTs > e.getValue()) {
+ queries.add(new BaseReadTsKvQuery(e.getKey(), e.getValue() + 1L, curTs, 0, 1000, Aggregation.NONE));
+ } else {
+ log.debug("[{}] Invalid subscription [{}], entityId [{}] curTs [{}]", sessionId, subscription, entityId, curTs);
+ }
});
-
- DonAsynchron.withCallback(tsService.findAll(entityId, queries),
- missedUpdates -> {
- if (missedUpdates != null && !missedUpdates.isEmpty()) {
- tellRemoteSubUpdate(address, sessionId, new SubscriptionUpdate(subscription.getSubscriptionId(), missedUpdates));
- }
- },
- e -> log.error("Failed to fetch missed updates.", e),
- tsCallBackExecutor);
+ if (!queries.isEmpty()) {
+ DonAsynchron.withCallback(tsService.findAll(entityId, queries),
+ missedUpdates -> {
+ if (missedUpdates != null && !missedUpdates.isEmpty()) {
+ tellRemoteSubUpdate(address, sessionId, new SubscriptionUpdate(subscription.getSubscriptionId(), missedUpdates));
+ }
+ },
+ e -> log.error("Failed to fetch missed updates.", e),
+ tsCallBackExecutor);
+ }
}
}
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
index 6c903cc..45f8433 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
@@ -29,6 +29,9 @@ import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.event.ContextRefreshedEvent;
+import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.service.ActorService;
@@ -127,7 +130,11 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
ruleEngineConsumer = ruleEngineConsumerBuilder.build();
ruleEngineConsumer.subscribe();
+ }
+ @EventListener(ApplicationReadyEvent.class)
+ public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
+ log.info("Received application ready event. Starting polling for events.");
LocalBucketBuilder builder = Bucket4j.builder();
builder.addLimit(Bandwidth.simple(pollRecordsPerSecond, Duration.ofSeconds(1)));
builder.addLimit(Bandwidth.simple(pollRecordsPerMinute, Duration.ofMinutes(1)));
@@ -149,6 +156,7 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
records.forEach(record -> {
try {
ToRuleEngineMsg toRuleEngineMsg = ruleEngineConsumer.decode(record);
+ log.trace("Forwarding message to rule engine {}", toRuleEngineMsg);
if (toRuleEngineMsg.hasToDeviceActorMsg()) {
forwardToDeviceActor(toRuleEngineMsg.getToDeviceActorMsg());
}
@@ -175,18 +183,21 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
@Override
public void process(String nodeId, DeviceActorToTransportMsg msg, Runnable onSuccess, Consumer<Throwable> onFailure) {
- notificationsProducer.send(notificationsTopic + "." + nodeId,
- new UUID(msg.getSessionIdMSB(), msg.getSessionIdLSB()).toString(),
- ToTransportMsg.newBuilder().setToDeviceSessionMsg(msg).build()
- , new QueueCallbackAdaptor(onSuccess, onFailure));
+ String topic = notificationsTopic + "." + nodeId;
+ UUID sessionId = new UUID(msg.getSessionIdMSB(), msg.getSessionIdLSB());
+ ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setToDeviceSessionMsg(msg).build();
+ log.trace("[{}][{}] Pushing session data to topic: {}", topic, sessionId, transportMsg);
+ notificationsProducer.send(topic, sessionId.toString(), transportMsg, new QueueCallbackAdaptor(onSuccess, onFailure));
}
private void forwardToDeviceActor(TransportToDeviceActorMsg toDeviceActorMsg) {
TransportToDeviceActorMsgWrapper wrapper = new TransportToDeviceActorMsgWrapper(toDeviceActorMsg);
Optional<ServerAddress> address = routingService.resolveById(wrapper.getDeviceId());
if (address.isPresent()) {
+ log.trace("[{}] Pushing message to remote server: {}", address.get(), toDeviceActorMsg);
rpcService.tell(encodingService.convertToProtoDataMessage(address.get(), wrapper));
} else {
+ log.trace("Pushing message to local server: {}", toDeviceActorMsg);
actorContext.getAppActor().tell(wrapper, ActorRef.noSender());
}
}
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
index 6d422f4..88033aa 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
@@ -19,6 +19,8 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
@@ -93,6 +95,11 @@ public class RemoteTransportApiService {
builder.executor(transportCallbackExecutor);
builder.handler(transportApiService);
transportApiTemplate = builder.build();
+ }
+
+ @EventListener(ApplicationReadyEvent.class)
+ public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
+ log.info("Received application ready event. Starting polling for events.");
transportApiTemplate.init();
}
diff --git a/application/src/main/resources/actor-system.conf b/application/src/main/resources/actor-system.conf
index 763319e..28673f6 100644
--- a/application/src/main/resources/actor-system.conf
+++ b/application/src/main/resources/actor-system.conf
@@ -19,7 +19,7 @@ akka {
# JVM shutdown, System.exit(-1), in case of a fatal error,
# such as OutOfMemoryError
jvm-exit-on-fatal-error = off
- loglevel = "DEBUG"
+ loglevel = "INFO"
loggers = ["akka.event.slf4j.Slf4jLogger"]
}
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 51ddaa9..f8f71e0 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -31,6 +31,7 @@ server:
key-store-type: "${SSL_KEY_STORE_TYPE:PKCS12}"
# Alias that identifies the key in the key store
key-alias: "${SSL_KEY_ALIAS:tomcat}"
+ log_controller_error_stack_trace: "${HTTP_LOG_CONTROLLER_ERROR_STACK_TRACE:true}"
# Zookeeper connection parameters. Used for service discovery.
zk:
@@ -63,7 +64,7 @@ cluster:
# Plugins configuration parameters
plugins:
- # Comma seperated package list used during classpath scanning for plugins
+ # Comma separated package list used during classpath scanning for plugins
scan_packages: "${PLUGINS_SCAN_PACKAGES:org.thingsboard.server.extensions,org.thingsboard.rule.engine}"
# Security parameters
@@ -83,6 +84,7 @@ dashboard:
max_datapoints_limit: "${DASHBOARD_MAX_DATAPOINTS_LIMIT:50000}"
database:
+ ts_max_intervals: "${DATABASE_TS_MAX_INTERVALS:700}" # Max number of DB queries generated by single API call to fetch telemetry records
entities:
type: "${DATABASE_ENTITIES_TYPE:sql}" # cassandra OR sql
ts:
@@ -105,7 +107,7 @@ cassandra:
metrics: "${CASSANDRA_DISABLE_METRICS:true}"
# NONE SNAPPY LZ4
compression: "${CASSANDRA_COMPRESSION:none}"
- # Specify cassandra claster initialization timeout (if no hosts available during startup)
+ # Specify cassandra cluster initialization timeout in milliseconds (if no hosts available during startup)
init_timeout_ms: "${CASSANDRA_CLUSTER_INIT_TIMEOUT_MS:300000}"
# Specify cassandra claster initialization retry interval (if no hosts available during startup)
init_retry_interval_ms: "${CASSANDRA_CLUSTER_INIT_RETRY_INTERVAL_MS:3000}"
@@ -151,6 +153,8 @@ sql:
# Actor system parameters
actors:
+ cluster:
+ grpc_callback_thread_pool_size: "${ACTORS_CLUSTER_GRPC_CALLBACK_THREAD_POOL_SIZE:10}"
tenant:
create_components_on_init: "${ACTORS_TENANT_CREATE_COMPONENTS_ON_INIT:true}"
session:
@@ -306,7 +310,7 @@ audit_log:
"user": "${AUDIT_LOG_MASK_USER:W}"
"rule_chain": "${AUDIT_LOG_MASK_RULE_CHAIN:W}"
"alarm": "${AUDIT_LOG_MASK_ALARM:W}"
- "entity_view": "${AUDIT_LOG_MASK_RULE_CHAIN:W}"
+ "entity_view": "${AUDIT_LOG_MASK_ENTITY_VIEW:W}"
sink:
# Type of external sink. possible options: none, elasticsearch
type: "${AUDIT_LOG_SINK_TYPE:none}"
@@ -320,7 +324,7 @@ audit_log:
date_format: "${AUDIT_LOG_SINK_DATE_FORMAT:YYYY.MM.DD}"
scheme_name: "${AUDIT_LOG_SINK_SCHEME_NAME:http}" # http or https
host: "${AUDIT_LOG_SINK_HOST:localhost}"
- port: "${AUDIT_LOG_SINK_POST:9200}"
+ port: "${AUDIT_LOG_SINK_PORT:9200}"
user_name: "${AUDIT_LOG_SINK_USER_NAME:}"
password: "${AUDIT_LOG_SINK_PASSWORD:}"
@@ -340,7 +344,7 @@ kafka:
requests_topic: "${TB_TRANSPORT_API_REQUEST_TOPIC:tb.transport.api.requests}"
responses_topic: "${TB_TRANSPORT_API_RESPONSE_TOPIC:tb.transport.api.responses}"
max_pending_requests: "${TB_TRANSPORT_MAX_PENDING_REQUESTS:10000}"
- request_timeout: "${TB_TRANSPORT_MAX_REQUEST_TIMEOUT:10000}"
+ max_requests_timeout: "${TB_TRANSPORT_MAX_REQUEST_TIMEOUT:10000}"
request_poll_interval: "${TB_TRANSPORT_REQUEST_POLL_INTERVAL_MS:25}"
request_auto_commit_interval: "${TB_TRANSPORT_REQUEST_AUTO_COMMIT_INTERVAL_MS:100}"
rule_engine:
@@ -367,7 +371,7 @@ js:
# JS Eval request topic
request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js.eval.requests}"
# JS Eval responses topic prefix that is combined with node id
- response_topic_prefix: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js.eval.responses}"
+ response_topic_prefix: "${REMOTE_JS_EVAL_RESPONSE_TOPIC:js.eval.responses}"
# JS Eval max pending requests
max_pending_requests: "${REMOTE_JS_MAX_PENDING_REQUESTS:10000}"
# JS Eval max request timeout
@@ -405,6 +409,9 @@ transport:
enabled: "${TB_TRANSPORT_RATE_LIMITS_ENABLED:false}"
tenant: "${TB_TRANSPORT_RATE_LIMITS_TENANT:1000:1,20000:60}"
device: "${TB_TRANSPORT_RATE_LIMITS_DEVICE:10:1,300:60}"
+ json:
+ # Cast String data types to Numeric if possible when processing Telemetry/Attributes JSON
+ type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:true}"
# Local HTTP transport parameters
http:
enabled: "${HTTP_ENABLED:true}"
diff --git a/application/src/test/resources/logback.xml b/application/src/test/resources/logback.xml
index b77027a..47dacce 100644
--- a/application/src/test/resources/logback.xml
+++ b/application/src/test/resources/logback.xml
@@ -9,7 +9,7 @@
<logger name="org.thingsboard.server" level="WARN"/>
<logger name="org.springframework" level="WARN"/>
- <logger name="org.springframework.boot.test" level="DEBUG"/>
+ <logger name="org.springframework.boot.test" level="WARN"/>
<logger name="org.apache.cassandra" level="WARN"/>
<logger name="org.cassandraunit" level="INFO"/>
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/audit/ActionType.java b/common/data/src/main/java/org/thingsboard/server/common/data/audit/ActionType.java
index c37d460..822387c 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/audit/ActionType.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/audit/ActionType.java
@@ -24,6 +24,7 @@ public enum ActionType {
UPDATED(false), // log entity
ATTRIBUTES_UPDATED(false), // log attributes/values
ATTRIBUTES_DELETED(false), // log attributes
+ TIMESERIES_DELETED(false), // log timeseries
RPC_CALL(false), // log method and params
CREDENTIALS_UPDATED(false), // log new credentials
ASSIGNED_TO_CUSTOMER(false), // log customer name
@@ -32,11 +33,11 @@ public enum ActionType {
SUSPENDED(false), // log string id
CREDENTIALS_READ(true), // log device id
ATTRIBUTES_READ(true), // log attributes
- RELATION_ADD_OR_UPDATE (false),
- RELATION_DELETED (false),
- RELATIONS_DELETED (false),
- ALARM_ACK (false),
- ALARM_CLEAR (false);
+ RELATION_ADD_OR_UPDATE(false),
+ RELATION_DELETED(false),
+ RELATIONS_DELETED(false),
+ ALARM_ACK(false),
+ ALARM_CLEAR(false);
private final boolean isRead;
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/objects/AttributesEntityView.java b/common/data/src/main/java/org/thingsboard/server/common/data/objects/AttributesEntityView.java
index 1c32579..d125f26 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/objects/AttributesEntityView.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/objects/AttributesEntityView.java
@@ -18,6 +18,7 @@ package org.thingsboard.server.common.data.objects;
import lombok.Data;
import lombok.NoArgsConstructor;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
@@ -26,7 +27,7 @@ import java.util.List;
*/
@Data
@NoArgsConstructor
-public class AttributesEntityView {
+public class AttributesEntityView implements Serializable {
private List<String> cs = new ArrayList<>();
private List<String> ss = new ArrayList<>();
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/objects/TelemetryEntityView.java b/common/data/src/main/java/org/thingsboard/server/common/data/objects/TelemetryEntityView.java
index c899c65..6b9f5dd 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/objects/TelemetryEntityView.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/objects/TelemetryEntityView.java
@@ -18,6 +18,7 @@ package org.thingsboard.server.common.data.objects;
import lombok.Data;
import lombok.NoArgsConstructor;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
@@ -26,7 +27,7 @@ import java.util.List;
*/
@Data
@NoArgsConstructor
-public class TelemetryEntityView {
+public class TelemetryEntityView implements Serializable {
private List<String> timeseries;
private AttributesEntityView attributes;
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
index 44a98d6..a63ef6b 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
@@ -28,6 +28,8 @@ public enum MsgType {
*/
CLUSTER_EVENT_MSG,
+ APP_INIT_MSG,
+
/**
* All messages, could be send to cluster
*/
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
index bd42f31..ee652f4 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
@@ -77,9 +77,10 @@ public class TBKafkaProducerTemplate<T> {
result.all().get();
} catch (Exception e) {
if ((e instanceof TopicExistsException) || (e.getCause() != null && e.getCause() instanceof TopicExistsException)) {
- log.trace("[{}] Topic already exists: ", defaultTopic);
+ log.trace("[{}] Topic already exists.", defaultTopic);
} else {
- log.trace("[{}] Failed to create topic: {}", defaultTopic, e.getMessage(), e);
+ log.info("[{}] Failed to create topic: {}", defaultTopic, e.getMessage(), e);
+ throw new RuntimeException(e);
}
}
//Maybe this should not be cached, but we don't plan to change size of partitions
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
index 77ad033..b18ccc2 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
@@ -23,6 +23,9 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
@@ -83,7 +86,13 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe
CreateTopicsResult result = admin.createTopic(new NewTopic(responseTemplate.getTopic(), 1, (short) 1));
result.all().get();
} catch (Exception e) {
- log.trace("Failed to create topic: {}", e.getMessage(), e);
+ if ((e instanceof TopicExistsException) || (e.getCause() != null && e.getCause() instanceof TopicExistsException)) {
+ log.trace("[{}] Topic already exists. ", responseTemplate.getTopic());
+ } else {
+ log.info("[{}] Failed to create topic: {}", responseTemplate.getTopic(), e.getMessage(), e);
+ throw new RuntimeException(e);
+ }
+
}
this.requestTemplate.init();
tickTs = System.currentTimeMillis();
@@ -92,7 +101,11 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe
long nextCleanupMs = 0L;
while (!stopped) {
ConsumerRecords<String, byte[]> responses = responseTemplate.poll(Duration.ofMillis(pollInterval));
+ if (responses.count() > 0) {
+ log.trace("Polling responses completed, consumer records count [{}]", responses.count());
+ }
responses.forEach(response -> {
+ log.trace("Received response to Kafka Template request: {}", response);
Header requestIdHeader = response.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER);
Response decodedResponse = null;
UUID requestId = null;
@@ -109,6 +122,7 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe
if (requestId == null) {
log.error("[{}] Missing requestId in header and body", response);
} else {
+ log.trace("[{}] Response received", requestId);
ResponseMetaData<Response> expectedResponse = pendingRequests.remove(requestId);
if (expectedResponse == null) {
log.trace("[{}] Invalid or stale request", requestId);
@@ -132,6 +146,7 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe
if (kv.getValue().expTime < tickTs) {
ResponseMetaData<Response> staleRequest = pendingRequests.remove(kv.getKey());
if (staleRequest != null) {
+ log.trace("[{}] Request timeout detected, expTime [{}], tickTs [{}]", kv.getKey(), staleRequest.expTime, tickTs);
staleRequest.future.setException(new TimeoutException());
}
}
@@ -158,9 +173,17 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe
headers.add(new RecordHeader(TbKafkaSettings.REQUEST_ID_HEADER, uuidToBytes(requestId)));
headers.add(new RecordHeader(TbKafkaSettings.RESPONSE_TOPIC_HEADER, stringToBytes(responseTemplate.getTopic())));
SettableFuture<Response> future = SettableFuture.create();
- pendingRequests.putIfAbsent(requestId, new ResponseMetaData<>(tickTs + maxRequestTimeout, future));
+ ResponseMetaData<Response> responseMetaData = new ResponseMetaData<>(tickTs + maxRequestTimeout, future);
+ pendingRequests.putIfAbsent(requestId, responseMetaData);
request = requestTemplate.enrich(request, responseTemplate.getTopic(), requestId);
- requestTemplate.send(key, request, headers, null);
+ log.trace("[{}] Sending request, key [{}], expTime [{}]", requestId, key, responseMetaData.expTime);
+ requestTemplate.send(key, request, headers, (metadata, exception) -> {
+ if (exception != null) {
+ log.trace("[{}] Failed to post the request", requestId, exception);
+ } else {
+ log.trace("[{}] Posted the request", requestId, metadata);
+ }
+ });
return future;
}
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java
index 4c23ac2..e10cd3c 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java
@@ -18,6 +18,7 @@ package org.thingsboard.server.kafka;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
@@ -127,6 +128,10 @@ public class TbKafkaResponseTemplate<Request, Response> extends AbstractTbKafkaT
log.warn("[{}] Failed to process the request: {}", requestId, request, e);
}
});
+ } catch (InterruptException ie) {
+ if (!stopped) {
+ log.warn("Fetching data from kafka was interrupted.", ie);
+ }
} catch (Throwable e) {
log.warn("Failed to obtain messages from queue.", e);
try {
diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index a178bdf..3986837 100644
--- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -141,7 +141,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
break;
case PINGREQ:
- if (checkConnected(ctx)) {
+ if (checkConnected(ctx, msg)) {
ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
transportService.reportActivity(sessionInfo);
if (gatewaySessionHandler != null) {
@@ -150,7 +150,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
break;
case DISCONNECT:
- if (checkConnected(ctx)) {
+ if (checkConnected(ctx, msg)) {
processDisconnect(ctx);
}
break;
@@ -161,12 +161,12 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) {
- if (!checkConnected(ctx)) {
+ if (!checkConnected(ctx, mqttMsg)) {
return;
}
String topicName = mqttMsg.variableHeader().topicName();
int msgId = mqttMsg.variableHeader().packetId();
- log.trace("[{}] Processing publish msg [{}][{}]!", sessionId, topicName, msgId);
+ log.trace("[{}][{}] Processing publish msg [{}][{}]!", sessionId, deviceSessionCtx.getDeviceId(), topicName, msgId);
if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) {
if (gatewaySessionHandler != null) {
@@ -248,7 +248,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) {
- if (!checkConnected(ctx)) {
+ if (!checkConnected(ctx, mqttMsg)) {
return;
}
log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
@@ -293,7 +293,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) {
- if (!checkConnected(ctx)) {
+ if (!checkConnected(ctx, mqttMsg)) {
return;
}
log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
@@ -336,6 +336,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
String userName = msg.payload().userName();
+ log.info("[{}] Processing connect msg for client with user name: {}!", sessionId, userName);
if (StringUtils.isEmpty(userName)) {
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));
ctx.close();
@@ -444,11 +445,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
return new MqttPubAckMessage(mqttFixedHeader, mqttMsgIdVariableHeader);
}
- private boolean checkConnected(ChannelHandlerContext ctx) {
+ private boolean checkConnected(ChannelHandlerContext ctx, MqttMessage msg) {
if (deviceSessionCtx.isConnected()) {
return true;
} else {
- log.info("[{}] Closing current session due to invalid msg order [{}][{}]", sessionId);
+ log.info("[{}] Closing current session due to invalid msg order: {}", sessionId, msg);
ctx.close();
return false;
}
@@ -496,6 +497,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
transportService.registerAsyncSession(sessionInfo, this);
checkGatewaySession();
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
+ log.info("[{}] Client connected!", sessionId);
}
}
diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java
index ac33ba6..37fb84b 100644
--- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java
+++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java
@@ -119,10 +119,10 @@ public class GatewaySessionHandler {
GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), mqttQoSMap);
if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) {
SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo();
+ transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx);
transportService.process(deviceSessionInfo, AbstractTransportService.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null);
transportService.process(deviceSessionInfo, TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), null);
transportService.process(deviceSessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), null);
- transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx);
}
future.set(devices.get(deviceName));
}
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
index 7498e8e..d52a896 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
@@ -22,6 +22,7 @@ import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import com.google.gson.JsonSyntaxException;
+import org.apache.commons.lang3.math.NumberUtils;
import org.springframework.util.StringUtils;
import org.thingsboard.server.common.data.kv.AttributeKey;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
@@ -58,6 +59,8 @@ public class JsonConverter {
private static final String CAN_T_PARSE_VALUE = "Can't parse value: ";
private static final String DEVICE_PROPERTY = "device";
+ private static boolean isTypeCastEnabled = true;
+
public static PostTelemetryMsg convertToTelemetryProto(JsonElement jsonObject) throws JsonSyntaxException {
long systemTs = System.currentTimeMillis();
PostTelemetryMsg.Builder builder = PostTelemetryMsg.newBuilder();
@@ -128,24 +131,22 @@ public class JsonConverter {
if (element.isJsonPrimitive()) {
JsonPrimitive value = element.getAsJsonPrimitive();
if (value.isString()) {
- result.add(KeyValueProto.newBuilder().setKey(valueEntry.getKey()).setType(KeyValueType.STRING_V)
- .setStringV(value.getAsString()).build());
+ if(isTypeCastEnabled && NumberUtils.isParsable(value.getAsString())) {
+ try {
+ result.add(buildNumericKeyValueProto(value, valueEntry.getKey()));
+ } catch (RuntimeException th) {
+ result.add(KeyValueProto.newBuilder().setKey(valueEntry.getKey()).setType(KeyValueType.STRING_V)
+ .setStringV(value.getAsString()).build());
+ }
+ } else {
+ result.add(KeyValueProto.newBuilder().setKey(valueEntry.getKey()).setType(KeyValueType.STRING_V)
+ .setStringV(value.getAsString()).build());
+ }
} else if (value.isBoolean()) {
result.add(KeyValueProto.newBuilder().setKey(valueEntry.getKey()).setType(KeyValueType.BOOLEAN_V)
.setBoolV(value.getAsBoolean()).build());
} else if (value.isNumber()) {
- if (value.getAsString().contains(".")) {
- result.add(KeyValueProto.newBuilder().setKey(valueEntry.getKey()).setType(KeyValueType.DOUBLE_V)
- .setDoubleV(value.getAsDouble()).build());
- } else {
- try {
- long longValue = Long.parseLong(value.getAsString());
- result.add(KeyValueProto.newBuilder().setKey(valueEntry.getKey()).setType(KeyValueType.LONG_V)
- .setLongV(longValue).build());
- } catch (NumberFormatException e) {
- throw new JsonSyntaxException("Big integer values are not supported!");
- }
- }
+ result.add(buildNumericKeyValueProto(value, valueEntry.getKey()));
} else {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + value);
}
@@ -156,6 +157,24 @@ public class JsonConverter {
return result;
}
+ private static KeyValueProto buildNumericKeyValueProto(JsonPrimitive value, String key) {
+ if (value.getAsString().contains(".")) {
+ return KeyValueProto.newBuilder()
+ .setKey(key)
+ .setType(KeyValueType.DOUBLE_V)
+ .setDoubleV(value.getAsDouble())
+ .build();
+ } else {
+ try {
+ long longValue = Long.parseLong(value.getAsString());
+ return KeyValueProto.newBuilder().setKey(key).setType(KeyValueType.LONG_V)
+ .setLongV(longValue).build();
+ } catch (NumberFormatException e) {
+ throw new JsonSyntaxException("Big integer values are not supported!");
+ }
+ }
+ }
+
public static TransportProtos.ToServerRpcRequestMsg convertToServerRpcRequest(JsonElement json, int requestId) throws JsonSyntaxException {
JsonObject object = json.getAsJsonObject();
return TransportProtos.ToServerRpcRequestMsg.newBuilder().setRequestId(requestId).setMethodName(object.get("method").getAsString()).setParams(GSON.toJson(object.get("params"))).build();
@@ -370,7 +389,15 @@ public class JsonConverter {
if (element.isJsonPrimitive()) {
JsonPrimitive value = element.getAsJsonPrimitive();
if (value.isString()) {
- result.add(new StringDataEntry(valueEntry.getKey(), value.getAsString()));
+ if(isTypeCastEnabled && NumberUtils.isParsable(value.getAsString())) {
+ try {
+ parseNumericValue(result, valueEntry, value);
+ } catch (RuntimeException th) {
+ result.add(new StringDataEntry(valueEntry.getKey(), value.getAsString()));
+ }
+ } else {
+ result.add(new StringDataEntry(valueEntry.getKey(), value.getAsString()));
+ }
} else if (value.isBoolean()) {
result.add(new BooleanDataEntry(valueEntry.getKey(), value.getAsBoolean()));
} else if (value.isNumber()) {
@@ -426,5 +453,7 @@ public class JsonConverter {
}
}
-
+ public static void setTypeCastEnabled(boolean enabled) {
+ isTypeCastEnabled = enabled;
+ }
}
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverterConfig.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverterConfig.java
new file mode 100644
index 0000000..ac3f4dc
--- /dev/null
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverterConfig.java
@@ -0,0 +1,31 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.transport.adaptor;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+@Slf4j
+public class JsonConverterConfig {
+
+ @Value("${transport.json.type_cast_enabled:true}")
+ public void setIsJsonTypeCastEnabled(boolean jsonTypeCastEnabled) {
+ JsonConverter.setTypeCastEnabled(jsonTypeCastEnabled);
+ log.info("JSON type cast enabled = {}", jsonTypeCastEnabled);
+ }
+}
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java
index fad1954..4af594c 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java
@@ -68,7 +68,7 @@ public abstract class AbstractTransportService implements TransportService {
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SessionEventMsg msg, TransportServiceCallback<Void> callback) {
- if (checkLimits(sessionInfo, callback)) {
+ if (checkLimits(sessionInfo, msg, callback)) {
reportActivityInternal(sessionInfo);
doProcess(sessionInfo, msg, callback);
}
@@ -76,7 +76,7 @@ public abstract class AbstractTransportService implements TransportService {
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
- if (checkLimits(sessionInfo, callback)) {
+ if (checkLimits(sessionInfo, msg, callback)) {
reportActivityInternal(sessionInfo);
doProcess(sessionInfo, msg, callback);
}
@@ -84,7 +84,7 @@ public abstract class AbstractTransportService implements TransportService {
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
- if (checkLimits(sessionInfo, callback)) {
+ if (checkLimits(sessionInfo, msg, callback)) {
reportActivityInternal(sessionInfo);
doProcess(sessionInfo, msg, callback);
}
@@ -92,7 +92,7 @@ public abstract class AbstractTransportService implements TransportService {
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
- if (checkLimits(sessionInfo, callback)) {
+ if (checkLimits(sessionInfo, msg, callback)) {
reportActivityInternal(sessionInfo);
doProcess(sessionInfo, msg, callback);
}
@@ -100,7 +100,7 @@ public abstract class AbstractTransportService implements TransportService {
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
- if (checkLimits(sessionInfo, callback)) {
+ if (checkLimits(sessionInfo, msg, callback)) {
SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
sessionMetaData.setSubscribedToAttributes(!msg.getUnsubscribe());
doProcess(sessionInfo, msg, callback);
@@ -109,7 +109,7 @@ public abstract class AbstractTransportService implements TransportService {
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
- if (checkLimits(sessionInfo, callback)) {
+ if (checkLimits(sessionInfo, msg, callback)) {
SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe());
doProcess(sessionInfo, msg, callback);
@@ -118,7 +118,7 @@ public abstract class AbstractTransportService implements TransportService {
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
- if (checkLimits(sessionInfo, callback)) {
+ if (checkLimits(sessionInfo, msg, callback)) {
reportActivityInternal(sessionInfo);
doProcess(sessionInfo, msg, callback);
}
@@ -126,7 +126,7 @@ public abstract class AbstractTransportService implements TransportService {
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
- if (checkLimits(sessionInfo, callback)) {
+ if (checkLimits(sessionInfo, msg, callback)) {
reportActivityInternal(sessionInfo);
doProcess(sessionInfo, msg, callback);
}
@@ -196,7 +196,10 @@ public abstract class AbstractTransportService implements TransportService {
}
@Override
- public boolean checkLimits(TransportProtos.SessionInfoProto sessionInfo, TransportServiceCallback<Void> callback) {
+ public boolean checkLimits(TransportProtos.SessionInfoProto sessionInfo, Object msg, TransportServiceCallback<Void> callback) {
+ if (log.isTraceEnabled()) {
+ log.trace("[{}] Processing msg: {}", toId(sessionInfo), msg);
+ }
if (!rateLimitEnabled) {
return true;
}
@@ -206,6 +209,9 @@ public abstract class AbstractTransportService implements TransportService {
if (callback != null) {
callback.onError(new TbRateLimitsException(EntityType.TENANT));
}
+ if (log.isTraceEnabled()) {
+ log.trace("[{}][{}] Tenant level rate limit detected: {}", toId(sessionInfo), tenantId, msg);
+ }
return false;
}
DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
@@ -214,8 +220,12 @@ public abstract class AbstractTransportService implements TransportService {
if (callback != null) {
callback.onError(new TbRateLimitsException(EntityType.DEVICE));
}
+ if (log.isTraceEnabled()) {
+ log.trace("[{}][{}] Device level rate limit detected: {}", toId(sessionInfo), deviceId, msg);
+ }
return false;
}
+
return true;
}
@@ -250,11 +260,11 @@ public abstract class AbstractTransportService implements TransportService {
}
}
- private UUID toId(TransportProtos.SessionInfoProto sessionInfo) {
+ protected UUID toId(TransportProtos.SessionInfoProto sessionInfo) {
return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
}
- String getRoutingKey(TransportProtos.SessionInfoProto sessionInfo) {
+ protected String getRoutingKey(TransportProtos.SessionInfoProto sessionInfo) {
return new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()).toString();
}
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
index 4b11bf5..5c123a6 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
@@ -197,6 +197,7 @@ public class RemoteTransportService extends AbstractTransportService {
@Override
public void process(ValidateDeviceTokenRequestMsg msg, TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback) {
+ log.trace("Processing msg: {}", msg);
AsyncCallbackTemplate.withCallback(transportApiTemplate.post(msg.getToken(),
TransportApiRequestMsg.newBuilder().setValidateTokenRequestMsg(msg).build()),
response -> callback.onSuccess(response.getValidateTokenResponseMsg()), callback::onError, transportCallbackExecutor);
@@ -204,6 +205,7 @@ public class RemoteTransportService extends AbstractTransportService {
@Override
public void process(ValidateDeviceX509CertRequestMsg msg, TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback) {
+ log.trace("Processing msg: {}", msg);
AsyncCallbackTemplate.withCallback(transportApiTemplate.post(msg.getHash(),
TransportApiRequestMsg.newBuilder().setValidateX509CertRequestMsg(msg).build()),
response -> callback.onSuccess(response.getValidateTokenResponseMsg()), callback::onError, transportCallbackExecutor);
@@ -211,6 +213,7 @@ public class RemoteTransportService extends AbstractTransportService {
@Override
public void process(GetOrCreateDeviceFromGatewayRequestMsg msg, TransportServiceCallback<GetOrCreateDeviceFromGatewayResponseMsg> callback) {
+ log.trace("Processing msg: {}", msg);
AsyncCallbackTemplate.withCallback(transportApiTemplate.post(msg.getDeviceName(),
TransportApiRequestMsg.newBuilder().setGetOrCreateDeviceRequestMsg(msg).build()),
response -> callback.onSuccess(response.getGetOrCreateDeviceResponseMsg()), callback::onError, transportCallbackExecutor);
@@ -218,6 +221,9 @@ public class RemoteTransportService extends AbstractTransportService {
@Override
public void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback<Void> callback) {
+ if (log.isTraceEnabled()) {
+ log.trace("[{}] Processing msg: {}", toId(sessionInfo), msg);
+ }
ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
.setSubscriptionInfo(msg).build()
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
index 93c0f41..5763c35 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
@@ -45,7 +45,6 @@ public abstract class DeviceAwareSessionContext implements SessionContext {
this.deviceId = new DeviceId(new UUID(deviceInfo.getDeviceIdMSB(), deviceInfo.getDeviceIdLSB()));
}
-
public boolean isConnected() {
return deviceInfo != null;
}
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java
index 8944e94..bb9cb4e 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java
@@ -43,7 +43,7 @@ public interface TransportService {
void process(TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg msg,
TransportServiceCallback<TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg> callback);
- boolean checkLimits(SessionInfoProto sessionInfo, TransportServiceCallback<Void> callback);
+ boolean checkLimits(SessionInfoProto sessionInfo, Object msg, TransportServiceCallback<Void> callback);
void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback);
diff --git a/dao/src/main/java/org/thingsboard/server/dao/entityview/EntityViewService.java b/dao/src/main/java/org/thingsboard/server/dao/entityview/EntityViewService.java
index da87b44..9c82866 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/entityview/EntityViewService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/entityview/EntityViewService.java
@@ -43,6 +43,8 @@ public interface EntityViewService {
EntityView findEntityViewById(EntityViewId entityViewId);
+ EntityView findEntityViewByTenantIdAndName(TenantId tenantId, String name);
+
TextPageData<EntityView> findEntityViewByTenantId(TenantId tenantId, TextPageLink pageLink);
TextPageData<EntityView> findEntityViewByTenantIdAndType(TenantId tenantId, TextPageLink pageLink, String type);
diff --git a/dao/src/main/java/org/thingsboard/server/dao/entityview/EntityViewServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/entityview/EntityViewServiceImpl.java
index 2d94cc2..9f8949d 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/entityview/EntityViewServiceImpl.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/entityview/EntityViewServiceImpl.java
@@ -29,8 +29,6 @@ import org.springframework.cache.annotation.Cacheable;
import org.springframework.cache.annotation.Caching;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.Customer;
-import org.thingsboard.server.common.data.DataConstants;
-import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EntitySubtype;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.EntityView;
@@ -40,12 +38,10 @@ import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.TenantId;
-import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.page.TextPageData;
import org.thingsboard.server.common.data.page.TextPageLink;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.EntitySearchDirection;
-import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.customer.CustomerDao;
import org.thingsboard.server.dao.entity.AbstractEntityService;
import org.thingsboard.server.dao.exception.DataValidationException;
@@ -56,15 +52,13 @@ import org.thingsboard.server.dao.tenant.TenantDao;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
-import java.util.concurrent.ExecutionException;
+import java.util.Optional;
import java.util.stream.Collectors;
import static org.thingsboard.server.common.data.CacheConstants.ENTITY_VIEW_CACHE;
-import static org.thingsboard.server.common.data.CacheConstants.RELATIONS_CACHE;
import static org.thingsboard.server.dao.model.ModelConstants.NULL_UUID;
import static org.thingsboard.server.dao.service.Validator.validateId;
import static org.thingsboard.server.dao.service.Validator.validatePageLink;
@@ -96,6 +90,7 @@ public class EntityViewServiceImpl extends AbstractEntityService implements Enti
@Caching(evict = {
@CacheEvict(cacheNames = ENTITY_VIEW_CACHE, key = "{#entityView.tenantId, #entityView.entityId}"),
+ @CacheEvict(cacheNames = ENTITY_VIEW_CACHE, key = "{#entityView.tenantId, #entityView.name}"),
@CacheEvict(cacheNames = ENTITY_VIEW_CACHE, key = "{#entityView.id}")})
@Override
public EntityView saveEntityView(EntityView entityView) {
@@ -137,6 +132,15 @@ public class EntityViewServiceImpl extends AbstractEntityService implements Enti
return entityViewDao.findById(entityViewId.getId());
}
+ @Cacheable(cacheNames = ENTITY_VIEW_CACHE, key = "{#tenantId, #name}")
+ @Override
+ public EntityView findEntityViewByTenantIdAndName(TenantId tenantId, String name) {
+ log.trace("Executing findEntityViewByTenantIdAndName [{}][{}]", tenantId, name);
+ validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
+ Optional<EntityView> entityViewOpt = entityViewDao.findEntityViewByTenantIdAndName(tenantId.getId(), name);
+ return entityViewOpt.orElse(null);
+ }
+
@Override
public TextPageData<EntityView> findEntityViewByTenantId(TenantId tenantId, TextPageLink pageLink) {
log.trace("Executing findEntityViewsByTenantId, tenantId [{}], pageLink [{}]", tenantId, pageLink);
@@ -255,6 +259,7 @@ public class EntityViewServiceImpl extends AbstractEntityService implements Enti
deleteEntityRelations(entityViewId);
EntityView entityView = entityViewDao.findById(entityViewId.getId());
cacheManager.getCache(ENTITY_VIEW_CACHE).evict(Arrays.asList(entityView.getTenantId(), entityView.getEntityId()));
+ cacheManager.getCache(ENTITY_VIEW_CACHE).evict(Arrays.asList(entityView.getTenantId(), entityView.getName()));
entityViewDao.removeById(entityViewId.getId());
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvCompositeKey.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvCompositeKey.java
index 3001e12..1c9f17a 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvCompositeKey.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvCompositeKey.java
@@ -20,14 +20,29 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import org.thingsboard.server.common.data.EntityType;
+import javax.persistence.Column;
+import javax.persistence.Embeddable;
+import javax.persistence.EnumType;
+import javax.persistence.Enumerated;
import java.io.Serializable;
+import static org.thingsboard.server.dao.model.ModelConstants.ATTRIBUTE_KEY_COLUMN;
+import static org.thingsboard.server.dao.model.ModelConstants.ATTRIBUTE_TYPE_COLUMN;
+import static org.thingsboard.server.dao.model.ModelConstants.ENTITY_ID_COLUMN;
+import static org.thingsboard.server.dao.model.ModelConstants.ENTITY_TYPE_COLUMN;
+
@Data
@AllArgsConstructor
@NoArgsConstructor
+@Embeddable
public class AttributeKvCompositeKey implements Serializable {
+ @Enumerated(EnumType.STRING)
+ @Column(name = ENTITY_TYPE_COLUMN)
private EntityType entityType;
+ @Column(name = ENTITY_ID_COLUMN)
private String entityId;
+ @Column(name = ATTRIBUTE_TYPE_COLUMN)
private String attributeType;
+ @Column(name = ATTRIBUTE_KEY_COLUMN)
private String attributeKey;
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvEntity.java
index 587a314..515c86c 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvEntity.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvEntity.java
@@ -27,6 +27,7 @@ import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.dao.model.ToData;
import javax.persistence.Column;
+import javax.persistence.EmbeddedId;
import javax.persistence.Entity;
import javax.persistence.EnumType;
import javax.persistence.Enumerated;
@@ -48,25 +49,10 @@ import static org.thingsboard.server.dao.model.ModelConstants.STRING_VALUE_COLUM
@Data
@Entity
@Table(name = "attribute_kv")
-@IdClass(AttributeKvCompositeKey.class)
public class AttributeKvEntity implements ToData<AttributeKvEntry>, Serializable {
- @Id
- @Enumerated(EnumType.STRING)
- @Column(name = ENTITY_TYPE_COLUMN)
- private EntityType entityType;
-
- @Id
- @Column(name = ENTITY_ID_COLUMN)
- private String entityId;
-
- @Id
- @Column(name = ATTRIBUTE_TYPE_COLUMN)
- private String attributeType;
-
- @Id
- @Column(name = ATTRIBUTE_KEY_COLUMN)
- private String attributeKey;
+ @EmbeddedId
+ private AttributeKvCompositeKey id;
@Column(name = BOOLEAN_VALUE_COLUMN)
private Boolean booleanValue;
@@ -87,13 +73,13 @@ public class AttributeKvEntity implements ToData<AttributeKvEntry>, Serializable
public AttributeKvEntry toData() {
KvEntry kvEntry = null;
if (strValue != null) {
- kvEntry = new StringDataEntry(attributeKey, strValue);
+ kvEntry = new StringDataEntry(id.getAttributeKey(), strValue);
} else if (booleanValue != null) {
- kvEntry = new BooleanDataEntry(attributeKey, booleanValue);
+ kvEntry = new BooleanDataEntry(id.getAttributeKey(), booleanValue);
} else if (doubleValue != null) {
- kvEntry = new DoubleDataEntry(attributeKey, doubleValue);
+ kvEntry = new DoubleDataEntry(id.getAttributeKey(), doubleValue);
} else if (longValue != null) {
- kvEntry = new LongDataEntry(attributeKey, longValue);
+ kvEntry = new LongDataEntry(id.getAttributeKey(), longValue);
}
return new BaseAttributeKvEntry(kvEntry, lastUpdateTs);
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvRepository.java
index c76cefe..d716886 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvRepository.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvRepository.java
@@ -15,7 +15,9 @@
*/
package org.thingsboard.server.dao.sql.attributes;
+import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.CrudRepository;
+import org.springframework.data.repository.query.Param;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.dao.model.sql.AttributeKvCompositeKey;
import org.thingsboard.server.dao.model.sql.AttributeKvEntity;
@@ -26,8 +28,11 @@ import java.util.List;
@SqlDao
public interface AttributeKvRepository extends CrudRepository<AttributeKvEntity, AttributeKvCompositeKey> {
- List<AttributeKvEntity> findAllByEntityTypeAndEntityIdAndAttributeType(EntityType entityType,
- String entityId,
- String attributeType);
+ @Query("SELECT a FROM AttributeKvEntity a WHERE a.id.entityType = :entityType " +
+ "AND a.id.entityId = :entityId " +
+ "AND a.id.attributeType = :attributeType")
+ List<AttributeKvEntity> findAllByEntityTypeAndEntityIdAndAttributeType(@Param("entityType") EntityType entityType,
+ @Param("entityId") String entityId,
+ @Param("attributeType") String attributeType);
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java
index 0dabf4c..4ac0c0c 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java
@@ -79,10 +79,7 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl
@Override
public ListenableFuture<Void> save(EntityId entityId, String attributeType, AttributeKvEntry attribute) {
AttributeKvEntity entity = new AttributeKvEntity();
- entity.setEntityType(entityId.getEntityType());
- entity.setEntityId(fromTimeUUID(entityId.getId()));
- entity.setAttributeType(attributeType);
- entity.setAttributeKey(attribute.getKey());
+ entity.setId(new AttributeKvCompositeKey(entityId.getEntityType(), fromTimeUUID(entityId.getId()), attributeType, attribute.getKey()));
entity.setLastUpdateTs(attribute.getLastUpdateTs());
entity.setStrValue(attribute.getStrValue().orElse(null));
entity.setDoubleValue(attribute.getDoubleValue().orElse(null));
@@ -100,10 +97,7 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl
.stream()
.map(key -> {
AttributeKvEntity entityToDelete = new AttributeKvEntity();
- entityToDelete.setEntityType(entityId.getEntityType());
- entityToDelete.setEntityId(fromTimeUUID(entityId.getId()));
- entityToDelete.setAttributeType(attributeType);
- entityToDelete.setAttributeKey(key);
+ entityToDelete.setId(new AttributeKvCompositeKey(entityId.getEntityType(), fromTimeUUID(entityId.getId()), attributeType, key));
return entityToDelete;
}).collect(Collectors.toList());
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
index 5bd9175..04227a3 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
@@ -17,6 +17,7 @@ package org.thingsboard.server.dao.sql.timeseries;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@@ -31,6 +32,7 @@ import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.UUIDConverter;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.Aggregation;
+import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
@@ -41,9 +43,9 @@ import org.thingsboard.server.dao.model.sql.TsKvEntity;
import org.thingsboard.server.dao.model.sql.TsKvLatestCompositeKey;
import org.thingsboard.server.dao.model.sql.TsKvLatestEntity;
import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService;
+import org.thingsboard.server.dao.timeseries.SimpleListenableFuture;
import org.thingsboard.server.dao.timeseries.TimeseriesDao;
import org.thingsboard.server.dao.timeseries.TsInsertExecutorType;
-import org.thingsboard.server.dao.util.SqlDao;
import org.thingsboard.server.dao.util.SqlTsDao;
import javax.annotation.Nullable;
@@ -53,6 +55,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
@@ -64,6 +67,8 @@ import static org.thingsboard.server.common.data.UUIDConverter.fromTimeUUID;
@SqlTsDao
public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService implements TimeseriesDao {
+ private static final String DESC_ORDER = "DESC";
+
@Value("${sql.ts_inserts_executor_type}")
private String insertExecutorType;
@@ -326,14 +331,72 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
@Override
public ListenableFuture<Void> removeLatest(EntityId entityId, DeleteTsKvQuery query) {
- TsKvLatestEntity latestEntity = new TsKvLatestEntity();
- latestEntity.setEntityType(entityId.getEntityType());
- latestEntity.setEntityId(fromTimeUUID(entityId.getId()));
- latestEntity.setKey(query.getKey());
- return service.submit(() -> {
- tsKvLatestRepository.delete(latestEntity);
- return null;
+ ListenableFuture<TsKvEntry> latestFuture = findLatest(entityId, query.getKey());
+
+ ListenableFuture<Boolean> booleanFuture = Futures.transform(latestFuture, tsKvEntry -> {
+ long ts = tsKvEntry.getTs();
+ return ts > query.getStartTs() && ts <= query.getEndTs();
+ }, service);
+
+ ListenableFuture<Void> removedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
+ if (isRemove) {
+ TsKvLatestEntity latestEntity = new TsKvLatestEntity();
+ latestEntity.setEntityType(entityId.getEntityType());
+ latestEntity.setEntityId(fromTimeUUID(entityId.getId()));
+ latestEntity.setKey(query.getKey());
+ return service.submit(() -> {
+ tsKvLatestRepository.delete(latestEntity);
+ return null;
+ });
+ }
+ return Futures.immediateFuture(null);
+ }, service);
+
+ final SimpleListenableFuture<Void> resultFuture = new SimpleListenableFuture<>();
+ Futures.addCallback(removedLatestFuture, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(@Nullable Void result) {
+ if (query.getRewriteLatestIfDeleted()) {
+ ListenableFuture<Void> savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
+ if (isRemove) {
+ return getNewLatestEntryFuture(entityId, query);
+ }
+ return Futures.immediateFuture(null);
+ }, service);
+
+ try {
+ resultFuture.set(savedLatestFuture.get());
+ } catch (InterruptedException | ExecutionException e) {
+ log.warn("Could not get latest saved value for [{}], {}", entityId, query.getKey(), e);
+ }
+ } else {
+ resultFuture.set(null);
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ log.warn("[{}] Failed to process remove of the latest value", entityId, t);
+ }
});
+ return resultFuture;
+ }
+
+ private ListenableFuture<Void> getNewLatestEntryFuture(EntityId entityId, DeleteTsKvQuery query) {
+ long startTs = 0;
+ long endTs = query.getStartTs() - 1;
+ ReadTsKvQuery findNewLatestQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, endTs - startTs, 1,
+ Aggregation.NONE, DESC_ORDER);
+ ListenableFuture<List<TsKvEntry>> future = findAllAsync(entityId, findNewLatestQuery);
+
+ return Futures.transformAsync(future, entryList -> {
+ if (entryList.size() == 1) {
+ return saveLatest(entityId, entryList.get(0));
+ } else {
+ log.trace("Could not find new latest value for [{}], key - {}", entityId, query.getKey());
+ }
+ return Futures.immediateFuture(null);
+ }, service);
}
@Override
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java
index 4c743e5..296d173 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java
@@ -47,7 +47,7 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
@Modifying
@Query("DELETE FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +
"AND tskv.entityType = :entityType AND tskv.key = :entityKey " +
- "AND tskv.ts > :startTs AND tskv.ts < :endTs")
+ "AND tskv.ts > :startTs AND tskv.ts <= :endTs")
void delete(@Param("entityId") String entityId,
@Param("entityType") EntityType entityType,
@Param("entityKey") String key,
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
index 4658cbe..1006499 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
@@ -20,11 +20,13 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityViewId;
+import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
@@ -47,8 +49,11 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
@Slf4j
public class BaseTimeseriesService implements TimeseriesService {
- public static final int INSERTS_PER_ENTRY = 3;
- public static final int DELETES_PER_ENTRY = INSERTS_PER_ENTRY;
+ private static final int INSERTS_PER_ENTRY = 3;
+ private static final int DELETES_PER_ENTRY = INSERTS_PER_ENTRY;
+
+ @Value("${database.ts_max_intervals}")
+ private long maxTsIntervals;
@Autowired
private TimeseriesDao timeseriesDao;
@@ -59,7 +64,7 @@ public class BaseTimeseriesService implements TimeseriesService {
@Override
public ListenableFuture<List<TsKvEntry>> findAll(EntityId entityId, List<ReadTsKvQuery> queries) {
validate(entityId);
- queries.forEach(BaseTimeseriesService::validate);
+ queries.forEach(this::validate);
if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) {
EntityView entityView = entityViewService.findEntityViewById((EntityViewId) entityId);
List<ReadTsKvQuery> filteredQueries =
@@ -189,7 +194,7 @@ public class BaseTimeseriesService implements TimeseriesService {
Validator.validateEntityId(entityId, "Incorrect entityId " + entityId);
}
- private static void validate(ReadTsKvQuery query) {
+ private void validate(ReadTsKvQuery query) {
if (query == null) {
throw new IncorrectParameterException("ReadTsKvQuery can't be null");
} else if (isBlank(query.getKey())) {
@@ -197,6 +202,14 @@ public class BaseTimeseriesService implements TimeseriesService {
} else if (query.getAggregation() == null) {
throw new IncorrectParameterException("Incorrect ReadTsKvQuery. Aggregation can't be empty");
}
+ if(!Aggregation.NONE.equals(query.getAggregation())) {
+ long step = Math.max(query.getInterval(), 1000);
+ long intervalCounts = (query.getEndTs() - query.getStartTs()) / step;
+ if (intervalCounts > maxTsIntervals || intervalCounts < 0) {
+ throw new IncorrectParameterException("Incorrect TsKvQuery. Number of intervals is to high - " + intervalCounts + ". " +
+ "Please increase 'interval' parameter for your query or reduce the time range of the query.");
+ }
+ }
}
private static void validate(DeleteTsKvQuery query) {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
index 709bfd5..fdc69f9 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
@@ -48,7 +48,6 @@ import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.dao.nosql.CassandraAbstractAsyncDao;
-import org.thingsboard.server.dao.util.NoSqlDao;
import org.thingsboard.server.dao.util.NoSqlTsDao;
import javax.annotation.Nullable;
@@ -62,6 +61,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
@@ -434,14 +434,14 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
public ListenableFuture<Void> removeLatest(EntityId entityId, DeleteTsKvQuery query) {
ListenableFuture<TsKvEntry> latestEntryFuture = findLatest(entityId, query.getKey());
- ListenableFuture<Boolean> booleanFuture = Futures.transformAsync(latestEntryFuture, latestEntry -> {
+ ListenableFuture<Boolean> booleanFuture = Futures.transform(latestEntryFuture, latestEntry -> {
long ts = latestEntry.getTs();
- if (ts >= query.getStartTs() && ts <= query.getEndTs()) {
- return Futures.immediateFuture(true);
+ if (ts > query.getStartTs() && ts <= query.getEndTs()) {
+ return true;
} else {
log.trace("Won't be deleted latest value for [{}], key - {}", entityId, query.getKey());
}
- return Futures.immediateFuture(false);
+ return false;
}, readResultsProcessingExecutor);
ListenableFuture<Void> removedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
@@ -451,18 +451,34 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
return Futures.immediateFuture(null);
}, readResultsProcessingExecutor);
- if (query.getRewriteLatestIfDeleted()) {
- ListenableFuture<Void> savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
- if (isRemove) {
- return getNewLatestEntryFuture(entityId, query);
+ final SimpleListenableFuture<Void> resultFuture = new SimpleListenableFuture<>();
+ Futures.addCallback(removedLatestFuture, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(@Nullable Void result) {
+ if (query.getRewriteLatestIfDeleted()) {
+ ListenableFuture<Void> savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
+ if (isRemove) {
+ return getNewLatestEntryFuture(entityId, query);
+ }
+ return Futures.immediateFuture(null);
+ }, readResultsProcessingExecutor);
+
+ try {
+ resultFuture.set(savedLatestFuture.get());
+ } catch (InterruptedException | ExecutionException e) {
+ log.warn("Could not get latest saved value for [{}], {}", entityId, query.getKey(), e);
+ }
+ } else {
+ resultFuture.set(null);
}
- return Futures.immediateFuture(null);
- }, readResultsProcessingExecutor);
+ }
- return Futures.transformAsync(Futures.allAsList(Arrays.asList(savedLatestFuture, removedLatestFuture)),
- list -> Futures.immediateFuture(null), readResultsProcessingExecutor);
- }
- return removedLatestFuture;
+ @Override
+ public void onFailure(Throwable t) {
+ log.warn("[{}] Failed to process remove of the latest value", entityId, t);
+ }
+ });
+ return resultFuture;
}
private ListenableFuture<Void> getNewLatestEntryFuture(EntityId entityId, DeleteTsKvQuery query) {
diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java
index 88f4d84..81de40a 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java
@@ -152,7 +152,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
}
@Test
- public void testDeleteDeviceTsData() throws Exception {
+ public void testDeleteDeviceTsDataWithoutOverwritingLatest() throws Exception {
DeviceId deviceId = new DeviceId(UUIDs.timeBased());
saveEntries(deviceId, 10000);
@@ -172,6 +172,26 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
}
@Test
+ public void testDeleteDeviceTsDataWithOverwritingLatest() throws Exception {
+ DeviceId deviceId = new DeviceId(UUIDs.timeBased());
+
+ saveEntries(deviceId, 10000);
+ saveEntries(deviceId, 20000);
+ saveEntries(deviceId, 30000);
+ saveEntries(deviceId, 40000);
+
+ tsService.remove(deviceId, Collections.singletonList(
+ new BaseDeleteTsKvQuery(STRING_KEY, 25000, 45000, true))).get();
+
+ List<TsKvEntry> list = tsService.findAll(deviceId, Collections.singletonList(
+ new BaseReadTsKvQuery(STRING_KEY, 5000, 45000, 10000, 10, Aggregation.NONE))).get();
+ Assert.assertEquals(2, list.size());
+
+ List<TsKvEntry> latest = tsService.findLatest(deviceId, Collections.singletonList(STRING_KEY)).get();
+ Assert.assertEquals(20000, latest.get(0).getTs());
+ }
+
+ @Test
public void testFindDeviceTsData() throws Exception {
DeviceId deviceId = new DeviceId(UUIDs.timeBased());
List<TsKvEntry> entries = new ArrayList<>();
diff --git a/dao/src/test/resources/application-test.properties b/dao/src/test/resources/application-test.properties
index a61c285..3bd0f7d 100644
--- a/dao/src/test/resources/application-test.properties
+++ b/dao/src/test/resources/application-test.properties
@@ -35,4 +35,4 @@ redis.connection.port=6379
redis.connection.db=0
redis.connection.password=
-rule.queue.type=memory
+database.ts_max_intervals=700
\ No newline at end of file
docker/.env 2(+1 -1)
diff --git a/docker/.env b/docker/.env
index c03845d..ef0bf40 100644
--- a/docker/.env
+++ b/docker/.env
@@ -15,4 +15,4 @@ TB_VERSION=latest
DATABASE=postgres
-KAFKA_TOPICS="js.eval.requests:100:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600,tb.transport.api.requests:30:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600,tb.rule-engine:30:1"
+LOAD_BALANCER_NAME=haproxy-certbot
docker/.gitignore 1(+1 -0)
diff --git a/docker/.gitignore b/docker/.gitignore
index bc09e84..eee422d 100644
--- a/docker/.gitignore
+++ b/docker/.gitignore
@@ -4,4 +4,5 @@ tb-node/log/**
tb-node/db/**
tb-node/postgres/**
tb-node/cassandra/**
+tb-transports/*/log
!.env
docker/docker-compose.postgres.volumes.yml 60(+60 -0)
diff --git a/docker/docker-compose.postgres.volumes.yml b/docker/docker-compose.postgres.volumes.yml
new file mode 100644
index 0000000..a1dbaa5
--- /dev/null
+++ b/docker/docker-compose.postgres.volumes.yml
@@ -0,0 +1,60 @@
+#
+# Copyright © 2016-2018 The Thingsboard Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+version: '2.2'
+
+services:
+ postgres:
+ volumes:
+ - postgres-db-volume:/var/lib/postgresql/data
+ tb1:
+ volumes:
+ - tb-log-volume:/var/log/thingsboard
+ tb2:
+ volumes:
+ - tb-log-volume:/var/log/thingsboard
+ tb-coap-transport:
+ volumes:
+ - tb-coap-transport-log-volume:/var/log/tb-coap-transport
+ tb-http-transport1:
+ volumes:
+ - tb-http-transport-log-volume:/var/log/tb-http-transport
+ tb-http-transport2:
+ volumes:
+ - tb-http-transport-log-volume:/var/log/tb-http-transport
+ tb-mqtt-transport1:
+ volumes:
+ - tb-mqtt-transport-log-volume:/var/log/tb-mqtt-transport
+ tb-mqtt-transport2:
+ volumes:
+ - tb-mqtt-transport-log-volume:/var/log/tb-mqtt-transport
+
+volumes:
+ postgres-db-volume:
+ external: true
+ name: ${POSTGRES_DATA_VOLUME}
+ tb-log-volume:
+ external: true
+ name: ${TB_LOG_VOLUME}
+ tb-coap-transport-log-volume:
+ external: true
+ name: ${TB_COAP_TRANSPORT_LOG_VOLUME}
+ tb-http-transport-log-volume:
+ external: true
+ name: ${TB_HTTP_TRANSPORT_LOG_VOLUME}
+ tb-mqtt-transport-log-volume:
+ external: true
+ name: ${TB_MQTT_TRANSPORT_LOG_VOLUME}
docker/docker-compose.yml 31(+28 -3)
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
index 1741db3..267d560 100644
--- a/docker/docker-compose.yml
+++ b/docker/docker-compose.yml
@@ -64,6 +64,7 @@ services:
depends_on:
- kafka
- redis
+ - tb-js-executor
tb2:
restart: always
image: "${DOCKER_REPO}/${TB_NODE_DOCKER_NAME}:${TB_VERSION}"
@@ -84,13 +85,19 @@ services:
depends_on:
- kafka
- redis
+ - tb-js-executor
tb-mqtt-transport1:
restart: always
image: "${DOCKER_REPO}/${MQTT_TRANSPORT_DOCKER_NAME}:${TB_VERSION}"
ports:
- "1883"
+ environment:
+ TB_HOST: tb-mqtt-transport1
env_file:
- tb-mqtt-transport.env
+ volumes:
+ - ./tb-transports/mqtt/conf:/config
+ - ./tb-transports/mqtt/log:/var/log/tb-mqtt-transport
depends_on:
- kafka
tb-mqtt-transport2:
@@ -98,8 +105,13 @@ services:
image: "${DOCKER_REPO}/${MQTT_TRANSPORT_DOCKER_NAME}:${TB_VERSION}"
ports:
- "1883"
+ environment:
+ TB_HOST: tb-mqtt-transport2
env_file:
- tb-mqtt-transport.env
+ volumes:
+ - ./tb-transports/mqtt/conf:/config
+ - ./tb-transports/mqtt/log:/var/log/tb-mqtt-transport
depends_on:
- kafka
tb-http-transport1:
@@ -107,8 +119,13 @@ services:
image: "${DOCKER_REPO}/${HTTP_TRANSPORT_DOCKER_NAME}:${TB_VERSION}"
ports:
- "8081"
+ environment:
+ TB_HOST: tb-http-transport1
env_file:
- tb-http-transport.env
+ volumes:
+ - ./tb-transports/http/conf:/config
+ - ./tb-transports/http/log:/var/log/tb-http-transport
depends_on:
- kafka
tb-http-transport2:
@@ -116,8 +133,13 @@ services:
image: "${DOCKER_REPO}/${HTTP_TRANSPORT_DOCKER_NAME}:${TB_VERSION}"
ports:
- "8081"
+ environment:
+ TB_HOST: tb-http-transport2
env_file:
- tb-http-transport.env
+ volumes:
+ - ./tb-transports/http/conf:/config
+ - ./tb-transports/http/log:/var/log/tb-http-transport
depends_on:
- kafka
tb-coap-transport:
@@ -125,8 +147,13 @@ services:
image: "${DOCKER_REPO}/${COAP_TRANSPORT_DOCKER_NAME}:${TB_VERSION}"
ports:
- "5683:5683/udp"
+ environment:
+ TB_HOST: tb-coap-transport
env_file:
- tb-coap-transport.env
+ volumes:
+ - ./tb-transports/coap/conf:/config
+ - ./tb-transports/coap/log:/var/log/tb-coap-transport
depends_on:
- kafka
tb-web-ui1:
@@ -145,7 +172,7 @@ services:
- tb-web-ui.env
haproxy:
restart: always
- container_name: haproxy-certbot
+ container_name: "${LOAD_BALANCER_NAME}"
image: xalauc/haproxy-certbot:1.7.9
volumes:
- ./haproxy/config:/config
@@ -153,7 +180,6 @@ services:
- ./haproxy/certs.d:/usr/local/etc/haproxy/certs.d
ports:
- "80:80"
- - "8080"
- "443:443"
- "1883:1883"
- "9999:9999"
@@ -163,7 +189,6 @@ services:
HTTP_PORT: 80
HTTPS_PORT: 443
MQTT_PORT: 1883
- TB_API_PORT: 8080
FORCE_HTTPS_REDIRECT: "false"
links:
- tb1
docker/docker-install-tb.sh 2(+0 -2)
diff --git a/docker/docker-install-tb.sh b/docker/docker-install-tb.sh
index 9032e2f..3816229 100755
--- a/docker/docker-install-tb.sh
+++ b/docker/docker-install-tb.sh
@@ -15,8 +15,6 @@
# limitations under the License.
#
-./check-dirs.sh
-
while [[ $# -gt 0 ]]
do
key="$1"
docker/docker-start-services.sh 2(+0 -2)
diff --git a/docker/docker-start-services.sh b/docker/docker-start-services.sh
index 8437695..452a823 100755
--- a/docker/docker-start-services.sh
+++ b/docker/docker-start-services.sh
@@ -15,8 +15,6 @@
# limitations under the License.
#
-./check-dirs.sh
-
set -e
source compose-utils.sh
docker/docker-upgrade-tb.sh 2(+0 -2)
diff --git a/docker/docker-upgrade-tb.sh b/docker/docker-upgrade-tb.sh
index d83b1b7..6f460f3 100755
--- a/docker/docker-upgrade-tb.sh
+++ b/docker/docker-upgrade-tb.sh
@@ -15,8 +15,6 @@
# limitations under the License.
#
-./check-dirs.sh
-
for i in "$@"
do
case $i in
docker/haproxy/config/haproxy.cfg 20(+14 -6)
diff --git a/docker/haproxy/config/haproxy.cfg b/docker/haproxy/config/haproxy.cfg
index 8ba0ce5..50566c9 100644
--- a/docker/haproxy/config/haproxy.cfg
+++ b/docker/haproxy/config/haproxy.cfg
@@ -56,13 +56,20 @@ frontend http-in
reqadd X-Forwarded-Proto:\ http
+ acl acl_static path_beg /static/ /index.html
+ acl acl_static path /
+ acl acl_static_rulenode path_beg /static/rulenode/
+
acl transport_http_acl path_beg /api/v1/
acl letsencrypt_http_acl path_beg /.well-known/acme-challenge/
+
redirect scheme https if !letsencrypt_http_acl !transport_http_acl { env(FORCE_HTTPS_REDIRECT) -m str true }
+
use_backend letsencrypt_http if letsencrypt_http_acl
use_backend tb-http-backend if transport_http_acl
+ use_backend tb-web-backend if acl_static !acl_static_rulenode
- default_backend tb-web-backend
+ default_backend tb-api-backend
frontend https_in
bind *:${HTTPS_PORT} ssl crt /usr/local/etc/haproxy/default.pem crt /usr/local/etc/haproxy/certs.d ciphers ECDHE-RSA-AES256-SHA:RC4-SHA:RC4:HIGH:!MD5:!aNULL:!EDH:!AESGCM
@@ -72,14 +79,15 @@ frontend https_in
reqadd X-Forwarded-Proto:\ https
acl transport_http_acl path_beg /api/v1/
- use_backend tb-http-backend if transport_http_acl
- default_backend tb-web-backend
+ acl acl_static path_beg /static/ /index.html
+ acl acl_static path /
+ acl acl_static_rulenode path_beg /static/rulenode/
-frontend http-api-in
- bind *:${TB_API_PORT}
+ use_backend tb-http-backend if transport_http_acl
+ use_backend tb-web-backend if acl_static !acl_static_rulenode
- default_backend tb-api-backend
+ default_backend tb-api-backend
backend letsencrypt_http
server letsencrypt_http_srv 127.0.0.1:8080
docker/kafka.env 2(+1 -1)
diff --git a/docker/kafka.env b/docker/kafka.env
index 69fbdf6..87dad07 100644
--- a/docker/kafka.env
+++ b/docker/kafka.env
@@ -4,7 +4,7 @@ KAFKA_LISTENERS=INSIDE://:9093,OUTSIDE://:9092
KAFKA_ADVERTISED_LISTENERS=INSIDE://:9093,OUTSIDE://kafka:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE
-KAFKA_CREATE_TOPICS=${KAFKA_TOPICS}
+KAFKA_CREATE_TOPICS=js.eval.requests:100:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600,tb.transport.api.requests:30:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600,tb.rule-engine:30:1
KAFKA_AUTO_CREATE_TOPICS_ENABLE=false
KAFKA_LOG_RETENTION_BYTES=1073741824
KAFKA_LOG_SEGMENT_BYTES=268435456
docker/tb-node/conf/logback.xml 2(+1 -1)
diff --git a/docker/tb-node/conf/logback.xml b/docker/tb-node/conf/logback.xml
index 1c69f53..6ec2d0b 100644
--- a/docker/tb-node/conf/logback.xml
+++ b/docker/tb-node/conf/logback.xml
@@ -24,7 +24,7 @@
<file>/var/log/thingsboard/${TB_HOST}/thingsboard.log</file>
<rollingPolicy
class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
- <fileNamePattern>/var/log/thingsboard/thingsboard.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
+ <fileNamePattern>/var/log/thingsboard/${TB_HOST}/thingsboard.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<maxFileSize>100MB</maxFileSize>
<maxHistory>30</maxHistory>
<totalSizeCap>3GB</totalSizeCap>
docker/tb-node/conf/thingsboard.conf 2(+1 -1)
diff --git a/docker/tb-node/conf/thingsboard.conf b/docker/tb-node/conf/thingsboard.conf
index aa430b4..392b330 100644
--- a/docker/tb-node/conf/thingsboard.conf
+++ b/docker/tb-node/conf/thingsboard.conf
@@ -15,7 +15,7 @@
#
export JAVA_OPTS="$JAVA_OPTS -Dplatform=deb -Dinstall.data_dir=/usr/share/thingsboard/data"
-export JAVA_OPTS="$JAVA_OPTS -Xloggc:/var/log/thingsboard/${TB_HOST}/gc.log -XX:+IgnoreUnrecognizedVMOptions -XX:+HeapDumpOnOutOfMemoryError -XX:+PrintGCDetails -XX:+PrintGCDateStamps"
+export JAVA_OPTS="$JAVA_OPTS -Xloggc:/var/log/thingsboard/${TB_HOST}/gc.log -XX:+IgnoreUnrecognizedVMOptions -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/var/log/thingsboard/${TB_HOST}/heapdump.bin -XX:+PrintGCDetails -XX:+PrintGCDateStamps"
export JAVA_OPTS="$JAVA_OPTS -XX:+PrintHeapAtGC -XX:+PrintTenuringDistribution -XX:+PrintGCApplicationStoppedTime -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10"
export JAVA_OPTS="$JAVA_OPTS -XX:GCLogFileSize=10M -XX:-UseBiasedLocking -XX:+UseTLAB -XX:+ResizeTLAB -XX:+PerfDisableSharedMem -XX:+UseCondCardMark"
export JAVA_OPTS="$JAVA_OPTS -XX:CMSWaitDuration=10000 -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+CMSParallelInitialMarkEnabled"
docker/tb-web-ui.env 3(+1 -2)
diff --git a/docker/tb-web-ui.env b/docker/tb-web-ui.env
index dbf8120..3916337 100644
--- a/docker/tb-web-ui.env
+++ b/docker/tb-web-ui.env
@@ -1,8 +1,7 @@
HTTP_BIND_ADDRESS=0.0.0.0
HTTP_BIND_PORT=8080
-TB_HOST=haproxy
-TB_PORT=8080
+TB_ENABLE_PROXY=false
LOGGER_LEVEL=info
LOG_FOLDER=logs
LOGGER_FILENAME=tb-web-ui-%DATE%.log
msa/black-box-tests/pom.xml 110(+110 -0)
diff --git a/msa/black-box-tests/pom.xml b/msa/black-box-tests/pom.xml
new file mode 100644
index 0000000..9f5ac09
--- /dev/null
+++ b/msa/black-box-tests/pom.xml
@@ -0,0 +1,110 @@
+<!--
+
+ Copyright © 2016-2018 The Thingsboard Authors
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.thingsboard</groupId>
+ <version>2.2.0-SNAPSHOT</version>
+ <artifactId>msa</artifactId>
+ </parent>
+ <groupId>org.thingsboard.msa</groupId>
+ <artifactId>black-box-tests</artifactId>
+
+ <name>ThingsBoard Black Box Tests</name>
+ <url>https://thingsboard.io</url>
+ <description>Project for ThingsBoard black box testing with using Docker</description>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <main.dir>${basedir}/../..</main.dir>
+ <blackBoxTests.skip>true</blackBoxTests.skip>
+ <testcontainers.version>1.9.1</testcontainers.version>
+ <zeroturnaround.version>1.10</zeroturnaround.version>
+ <java-websocket.version>1.3.9</java-websocket.version>
+ <httpclient.version>4.5.6</httpclient.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <version>${testcontainers.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.zeroturnaround</groupId>
+ <artifactId>zt-exec</artifactId>
+ <version>${zeroturnaround.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.java-websocket</groupId>
+ <artifactId>Java-WebSocket</artifactId>
+ <version>${java-websocket.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>${httpclient.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.takari.junit</groupId>
+ <artifactId>takari-cpsuite</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.thingsboard</groupId>
+ <artifactId>netty-mqtt</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.thingsboard</groupId>
+ <artifactId>tools</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <includes>
+ <include>**/*TestSuite.java</include>
+ </includes>
+ <skipTests>${blackBoxTests.skip}</skipTests>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
msa/black-box-tests/README.md 23(+23 -0)
diff --git a/msa/black-box-tests/README.md b/msa/black-box-tests/README.md
new file mode 100644
index 0000000..044fe00
--- /dev/null
+++ b/msa/black-box-tests/README.md
@@ -0,0 +1,23 @@
+
+## Black box tests execution
+To run the black box tests with using Docker, the local Docker images of Thingsboard's microservices should be built. <br />
+- Build the local Docker images in the directory with the Thingsboard's main [pom.xml](./../../pom.xml):
+
+ mvn clean install -Ddockerfile.skip=false
+- Verify that the new local images were built:
+
+ docker image ls
+As result, in REPOSITORY column, next images should be present:
+
+ thingsboard/tb-coap-transport
+ thingsboard/tb-http-transport
+ thingsboard/tb-mqtt-transport
+ thingsboard/tb-node
+ thingsboard/tb-web-ui
+ thingsboard/tb-js-executor
+
+- Run the black box tests in the [msa/black-box-tests](../black-box-tests) directory:
+
+ mvn clean install -DblackBoxTests.skip=false
+
+
diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java
new file mode 100644
index 0000000..4c3ac83
--- /dev/null
+++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java
@@ -0,0 +1,206 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.msa;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.conn.ssl.TrustStrategy;
+import org.apache.http.conn.ssl.X509HostnameVerifier;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.SSLContexts;
+import org.junit.*;
+import org.junit.rules.TestRule;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
+import org.thingsboard.client.tools.RestClient;
+import org.thingsboard.server.common.data.Device;
+import org.thingsboard.server.common.data.EntityType;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.msa.mapper.WsTelemetryResponse;
+
+import javax.net.ssl.*;
+import java.net.URI;
+import java.security.cert.X509Certificate;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+@Slf4j
+public abstract class AbstractContainerTest {
+ protected static final String HTTPS_URL = "https://localhost";
+ protected static final String WSS_URL = "wss://localhost";
+ protected static RestClient restClient;
+ protected ObjectMapper mapper = new ObjectMapper();
+
+ @BeforeClass
+ public static void before() throws Exception {
+ restClient = new RestClient(HTTPS_URL);
+ restClient.getRestTemplate().setRequestFactory(getRequestFactoryForSelfSignedCert());
+ }
+
+ @Rule
+ public TestRule watcher = new TestWatcher() {
+ protected void starting(Description description) {
+ log.info("=================================================");
+ log.info("STARTING TEST: {}" , description.getMethodName());
+ log.info("=================================================");
+ }
+
+ /**
+ * Invoked when a test succeeds
+ */
+ protected void succeeded(Description description) {
+ log.info("=================================================");
+ log.info("SUCCEEDED TEST: {}" , description.getMethodName());
+ log.info("=================================================");
+ }
+
+ /**
+ * Invoked when a test fails
+ */
+ protected void failed(Throwable e, Description description) {
+ log.info("=================================================");
+ log.info("FAILED TEST: {}" , description.getMethodName(), e);
+ log.info("=================================================");
+ }
+ };
+
+ protected Device createDevice(String name) {
+ return restClient.createDevice(name + RandomStringUtils.randomAlphanumeric(7), "DEFAULT");
+ }
+
+ protected WsClient subscribeToWebSocket(DeviceId deviceId, String scope, CmdsType property) throws Exception {
+ WsClient wsClient = new WsClient(new URI(WSS_URL + "/api/ws/plugins/telemetry?token=" + restClient.getToken()));
+ SSLContextBuilder builder = SSLContexts.custom();
+ builder.loadTrustMaterial(null, (TrustStrategy) (chain, authType) -> true);
+ wsClient.setSocket(builder.build().getSocketFactory().createSocket());
+ wsClient.connectBlocking();
+
+ JsonObject cmdsObject = new JsonObject();
+ cmdsObject.addProperty("entityType", EntityType.DEVICE.name());
+ cmdsObject.addProperty("entityId", deviceId.toString());
+ cmdsObject.addProperty("scope", scope);
+ cmdsObject.addProperty("cmdId", new Random().nextInt(100));
+
+ JsonArray cmd = new JsonArray();
+ cmd.add(cmdsObject);
+ JsonObject wsRequest = new JsonObject();
+ wsRequest.add(property.toString(), cmd);
+ wsClient.send(wsRequest.toString());
+ wsClient.waitForFirstReply();
+ return wsClient;
+ }
+
+ protected Map<String, Long> getExpectedLatestValues(long ts) {
+ return ImmutableMap.<String, Long>builder()
+ .put("booleanKey", ts)
+ .put("stringKey", ts)
+ .put("doubleKey", ts)
+ .put("longKey", ts)
+ .build();
+ }
+
+ protected boolean verify(WsTelemetryResponse wsTelemetryResponse, String key, Long expectedTs, String expectedValue) {
+ List<Object> list = wsTelemetryResponse.getDataValuesByKey(key);
+ return expectedTs.equals(list.get(0)) && expectedValue.equals(list.get(1));
+ }
+
+ protected boolean verify(WsTelemetryResponse wsTelemetryResponse, String key, String expectedValue) {
+ List<Object> list = wsTelemetryResponse.getDataValuesByKey(key);
+ return expectedValue.equals(list.get(1));
+ }
+
+ protected JsonObject createPayload(long ts) {
+ JsonObject values = createPayload();
+ JsonObject payload = new JsonObject();
+ payload.addProperty("ts", ts);
+ payload.add("values", values);
+ return payload;
+ }
+
+ protected JsonObject createPayload() {
+ JsonObject values = new JsonObject();
+ values.addProperty("stringKey", "value1");
+ values.addProperty("booleanKey", true);
+ values.addProperty("doubleKey", 42.0);
+ values.addProperty("longKey", 73L);
+
+ return values;
+ }
+
+ protected enum CmdsType {
+ TS_SUB_CMDS("tsSubCmds"),
+ HISTORY_CMDS("historyCmds"),
+ ATTR_SUB_CMDS("attrSubCmds");
+
+ private final String text;
+
+ CmdsType(final String text) {
+ this.text = text;
+ }
+
+ @Override
+ public String toString() {
+ return text;
+ }
+ }
+
+ private static HttpComponentsClientHttpRequestFactory getRequestFactoryForSelfSignedCert() throws Exception {
+ SSLContextBuilder builder = SSLContexts.custom();
+ builder.loadTrustMaterial(null, (TrustStrategy) (chain, authType) -> true);
+ SSLContext sslContext = builder.build();
+ SSLConnectionSocketFactory sslSelfSigned = new SSLConnectionSocketFactory(sslContext, new X509HostnameVerifier() {
+ @Override
+ public void verify(String host, SSLSocket ssl) {
+ }
+
+ @Override
+ public void verify(String host, X509Certificate cert) {
+ }
+
+ @Override
+ public void verify(String host, String[] cns, String[] subjectAlts) {
+ }
+
+ @Override
+ public boolean verify(String s, SSLSession sslSession) {
+ return true;
+ }
+ });
+
+ Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder
+ .<ConnectionSocketFactory>create()
+ .register("https", sslSelfSigned)
+ .build();
+
+ PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
+ CloseableHttpClient httpClient = HttpClients.custom().setConnectionManager(cm).build();
+ return new HttpComponentsClientHttpRequestFactory(httpClient);
+ }
+
+}
diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java
new file mode 100644
index 0000000..a6e89de
--- /dev/null
+++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java
@@ -0,0 +1,57 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.msa.connectivity;
+
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Test;
+import org.springframework.http.ResponseEntity;
+import org.thingsboard.server.common.data.Device;
+import org.thingsboard.server.common.data.security.DeviceCredentials;
+import org.thingsboard.server.msa.AbstractContainerTest;
+import org.thingsboard.server.msa.WsClient;
+import org.thingsboard.server.msa.mapper.WsTelemetryResponse;
+
+public class HttpClientTest extends AbstractContainerTest {
+
+ @Test
+ public void telemetryUpload() throws Exception {
+ restClient.login("tenant@thingsboard.org", "tenant");
+
+ Device device = createDevice("http_");
+ DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId());
+
+ WsClient wsClient = subscribeToWebSocket(device.getId(), "LATEST_TELEMETRY", CmdsType.TS_SUB_CMDS);
+ ResponseEntity deviceTelemetryResponse = restClient.getRestTemplate()
+ .postForEntity(HTTPS_URL + "/api/v1/{credentialsId}/telemetry",
+ mapper.readTree(createPayload().toString()),
+ ResponseEntity.class,
+ deviceCredentials.getCredentialsId());
+ Assert.assertTrue(deviceTelemetryResponse.getStatusCode().is2xxSuccessful());
+ WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage();
+ wsClient.closeBlocking();
+
+ Assert.assertEquals(Sets.newHashSet("booleanKey", "stringKey", "doubleKey", "longKey"),
+ actualLatestTelemetry.getLatestValues().keySet());
+
+ Assert.assertTrue(verify(actualLatestTelemetry, "booleanKey", Boolean.TRUE.toString()));
+ Assert.assertTrue(verify(actualLatestTelemetry, "stringKey", "value1"));
+ Assert.assertTrue(verify(actualLatestTelemetry, "doubleKey", Double.toString(42.0)));
+ Assert.assertTrue(verify(actualLatestTelemetry, "longKey", Long.toString(73)));
+
+ restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId());
+ }
+}
diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java
new file mode 100644
index 0000000..9918963
--- /dev/null
+++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java
@@ -0,0 +1,400 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.msa.connectivity;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.gson.JsonObject;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.junit.*;
+import org.junit.rules.TestRule;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.springframework.core.ParameterizedTypeReference;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.ResponseEntity;
+import org.thingsboard.mqtt.MqttClient;
+import org.thingsboard.mqtt.MqttClientConfig;
+import org.thingsboard.mqtt.MqttHandler;
+import org.thingsboard.server.common.data.Device;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.page.TextPageData;
+import org.thingsboard.server.common.data.rule.NodeConnectionInfo;
+import org.thingsboard.server.common.data.rule.RuleChain;
+import org.thingsboard.server.common.data.rule.RuleChainMetaData;
+import org.thingsboard.server.common.data.rule.RuleNode;
+import org.thingsboard.server.common.data.security.DeviceCredentials;
+import org.thingsboard.server.msa.AbstractContainerTest;
+import org.thingsboard.server.msa.WsClient;
+import org.thingsboard.server.msa.mapper.AttributesResponse;
+import org.thingsboard.server.msa.mapper.WsTelemetryResponse;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.*;
+
+@Slf4j
+public class MqttClientTest extends AbstractContainerTest {
+
+ @Test
+ public void telemetryUpload() throws Exception {
+ restClient.login("tenant@thingsboard.org", "tenant");
+ Device device = createDevice("mqtt_");
+ DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId());
+
+ WsClient wsClient = subscribeToWebSocket(device.getId(), "LATEST_TELEMETRY", CmdsType.TS_SUB_CMDS);
+ MqttClient mqttClient = getMqttClient(deviceCredentials, null);
+ mqttClient.publish("v1/devices/me/telemetry", Unpooled.wrappedBuffer(createPayload().toString().getBytes()));
+ WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage();
+ log.info("Received telemetry: {}", actualLatestTelemetry);
+ wsClient.closeBlocking();
+
+ Assert.assertEquals(4, actualLatestTelemetry.getData().size());
+ Assert.assertEquals(Sets.newHashSet("booleanKey", "stringKey", "doubleKey", "longKey"),
+ actualLatestTelemetry.getLatestValues().keySet());
+
+ Assert.assertTrue(verify(actualLatestTelemetry, "booleanKey", Boolean.TRUE.toString()));
+ Assert.assertTrue(verify(actualLatestTelemetry, "stringKey", "value1"));
+ Assert.assertTrue(verify(actualLatestTelemetry, "doubleKey", Double.toString(42.0)));
+ Assert.assertTrue(verify(actualLatestTelemetry, "longKey", Long.toString(73)));
+
+ restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId());
+ }
+
+ @Test
+ public void telemetryUploadWithTs() throws Exception {
+ long ts = 1451649600512L;
+
+ restClient.login("tenant@thingsboard.org", "tenant");
+ Device device = createDevice("mqtt_");
+ DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId());
+
+ WsClient wsClient = subscribeToWebSocket(device.getId(), "LATEST_TELEMETRY", CmdsType.TS_SUB_CMDS);
+ MqttClient mqttClient = getMqttClient(deviceCredentials, null);
+ mqttClient.publish("v1/devices/me/telemetry", Unpooled.wrappedBuffer(createPayload(ts).toString().getBytes()));
+ WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage();
+ log.info("Received telemetry: {}", actualLatestTelemetry);
+ wsClient.closeBlocking();
+
+ Assert.assertEquals(4, actualLatestTelemetry.getData().size());
+ Assert.assertEquals(getExpectedLatestValues(ts), actualLatestTelemetry.getLatestValues());
+
+ Assert.assertTrue(verify(actualLatestTelemetry, "booleanKey", ts, Boolean.TRUE.toString()));
+ Assert.assertTrue(verify(actualLatestTelemetry, "stringKey", ts, "value1"));
+ Assert.assertTrue(verify(actualLatestTelemetry, "doubleKey", ts, Double.toString(42.0)));
+ Assert.assertTrue(verify(actualLatestTelemetry, "longKey", ts, Long.toString(73)));
+
+ restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId());
+ }
+
+ @Test
+ public void publishAttributeUpdateToServer() throws Exception {
+ restClient.login("tenant@thingsboard.org", "tenant");
+ Device device = createDevice("mqtt_");
+ DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId());
+
+ WsClient wsClient = subscribeToWebSocket(device.getId(), "CLIENT_SCOPE", CmdsType.ATTR_SUB_CMDS);
+ MqttMessageListener listener = new MqttMessageListener();
+ MqttClient mqttClient = getMqttClient(deviceCredentials, listener);
+ JsonObject clientAttributes = new JsonObject();
+ clientAttributes.addProperty("attr1", "value1");
+ clientAttributes.addProperty("attr2", true);
+ clientAttributes.addProperty("attr3", 42.0);
+ clientAttributes.addProperty("attr4", 73);
+ mqttClient.publish("v1/devices/me/attributes", Unpooled.wrappedBuffer(clientAttributes.toString().getBytes()));
+ WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage();
+ log.info("Received telemetry: {}", actualLatestTelemetry);
+ wsClient.closeBlocking();
+
+ Assert.assertEquals(4, actualLatestTelemetry.getData().size());
+ Assert.assertEquals(Sets.newHashSet("attr1", "attr2", "attr3", "attr4"),
+ actualLatestTelemetry.getLatestValues().keySet());
+
+ Assert.assertTrue(verify(actualLatestTelemetry, "attr1", "value1"));
+ Assert.assertTrue(verify(actualLatestTelemetry, "attr2", Boolean.TRUE.toString()));
+ Assert.assertTrue(verify(actualLatestTelemetry, "attr3", Double.toString(42.0)));
+ Assert.assertTrue(verify(actualLatestTelemetry, "attr4", Long.toString(73)));
+
+ restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId());
+ }
+
+ @Test
+ public void requestAttributeValuesFromServer() throws Exception {
+ restClient.login("tenant@thingsboard.org", "tenant");
+ Device device = createDevice("mqtt_");
+ DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId());
+
+ MqttMessageListener listener = new MqttMessageListener();
+ MqttClient mqttClient = getMqttClient(deviceCredentials, listener);
+
+ // Add a new client attribute
+ JsonObject clientAttributes = new JsonObject();
+ String clientAttributeValue = RandomStringUtils.randomAlphanumeric(8);
+ clientAttributes.addProperty("clientAttr", clientAttributeValue);
+ mqttClient.publish("v1/devices/me/attributes", Unpooled.wrappedBuffer(clientAttributes.toString().getBytes()));
+
+ // Add a new shared attribute
+ JsonObject sharedAttributes = new JsonObject();
+ String sharedAttributeValue = RandomStringUtils.randomAlphanumeric(8);
+ sharedAttributes.addProperty("sharedAttr", sharedAttributeValue);
+ ResponseEntity sharedAttributesResponse = restClient.getRestTemplate()
+ .postForEntity(HTTPS_URL + "/api/plugins/telemetry/DEVICE/{deviceId}/SHARED_SCOPE",
+ mapper.readTree(sharedAttributes.toString()), ResponseEntity.class,
+ device.getId());
+ Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful());
+
+ // Subscribe to attributes response
+ mqttClient.on("v1/devices/me/attributes/response/+", listener, MqttQoS.AT_LEAST_ONCE);
+ // Request attributes
+ JsonObject request = new JsonObject();
+ request.addProperty("clientKeys", "clientAttr");
+ request.addProperty("sharedKeys", "sharedAttr");
+ mqttClient.publish("v1/devices/me/attributes/request/" + new Random().nextInt(100), Unpooled.wrappedBuffer(request.toString().getBytes()));
+ MqttEvent event = listener.getEvents().poll(10, TimeUnit.SECONDS);
+ AttributesResponse attributes = mapper.readValue(Objects.requireNonNull(event).getMessage(), AttributesResponse.class);
+ log.info("Received telemetry: {}", attributes);
+
+ Assert.assertEquals(1, attributes.getClient().size());
+ Assert.assertEquals(clientAttributeValue, attributes.getClient().get("clientAttr"));
+
+ Assert.assertEquals(1, attributes.getShared().size());
+ Assert.assertEquals(sharedAttributeValue, attributes.getShared().get("sharedAttr"));
+
+ restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId());
+ }
+
+ @Test
+ public void subscribeToAttributeUpdatesFromServer() throws Exception {
+ restClient.login("tenant@thingsboard.org", "tenant");
+ Device device = createDevice("mqtt_");
+ DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId());
+
+ MqttMessageListener listener = new MqttMessageListener();
+ MqttClient mqttClient = getMqttClient(deviceCredentials, listener);
+ mqttClient.on("v1/devices/me/attributes", listener, MqttQoS.AT_LEAST_ONCE);
+
+ String sharedAttributeName = "sharedAttr";
+
+ // Add a new shared attribute
+ JsonObject sharedAttributes = new JsonObject();
+ String sharedAttributeValue = RandomStringUtils.randomAlphanumeric(8);
+ sharedAttributes.addProperty(sharedAttributeName, sharedAttributeValue);
+ ResponseEntity sharedAttributesResponse = restClient.getRestTemplate()
+ .postForEntity(HTTPS_URL + "/api/plugins/telemetry/DEVICE/{deviceId}/SHARED_SCOPE",
+ mapper.readTree(sharedAttributes.toString()), ResponseEntity.class,
+ device.getId());
+ Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful());
+
+ MqttEvent event = listener.getEvents().poll(10, TimeUnit.SECONDS);
+ Assert.assertEquals(sharedAttributeValue,
+ mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get(sharedAttributeName).asText());
+
+ // Update the shared attribute value
+ JsonObject updatedSharedAttributes = new JsonObject();
+ String updatedSharedAttributeValue = RandomStringUtils.randomAlphanumeric(8);
+ updatedSharedAttributes.addProperty(sharedAttributeName, updatedSharedAttributeValue);
+ ResponseEntity updatedSharedAttributesResponse = restClient.getRestTemplate()
+ .postForEntity(HTTPS_URL + "/api/plugins/telemetry/DEVICE/{deviceId}/SHARED_SCOPE",
+ mapper.readTree(updatedSharedAttributes.toString()), ResponseEntity.class,
+ device.getId());
+ Assert.assertTrue(updatedSharedAttributesResponse.getStatusCode().is2xxSuccessful());
+
+ event = listener.getEvents().poll(10, TimeUnit.SECONDS);
+ Assert.assertEquals(updatedSharedAttributeValue,
+ mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get(sharedAttributeName).asText());
+
+ restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId());
+ }
+
+ @Test
+ public void serverSideRpc() throws Exception {
+ restClient.login("tenant@thingsboard.org", "tenant");
+ Device device = createDevice("mqtt_");
+ DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId());
+
+ MqttMessageListener listener = new MqttMessageListener();
+ MqttClient mqttClient = getMqttClient(deviceCredentials, listener);
+ mqttClient.on("v1/devices/me/rpc/request/+", listener, MqttQoS.AT_LEAST_ONCE);
+
+ // Send an RPC from the server
+ JsonObject serverRpcPayload = new JsonObject();
+ serverRpcPayload.addProperty("method", "getValue");
+ serverRpcPayload.addProperty("params", true);
+ ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+ ListenableFuture<ResponseEntity> future = service.submit(() -> {
+ try {
+ return restClient.getRestTemplate()
+ .postForEntity(HTTPS_URL + "/api/plugins/rpc/twoway/{deviceId}",
+ mapper.readTree(serverRpcPayload.toString()), String.class,
+ device.getId());
+ } catch (IOException e) {
+ return ResponseEntity.badRequest().build();
+ }
+ });
+
+ // Wait for RPC call from the server and send the response
+ MqttEvent requestFromServer = listener.getEvents().poll(10, TimeUnit.SECONDS);
+
+ Assert.assertEquals("{\"method\":\"getValue\",\"params\":true}", Objects.requireNonNull(requestFromServer).getMessage());
+
+ Integer requestId = Integer.valueOf(Objects.requireNonNull(requestFromServer).getTopic().substring("v1/devices/me/rpc/request/".length()));
+ JsonObject clientResponse = new JsonObject();
+ clientResponse.addProperty("response", "someResponse");
+ // Send a response to the server's RPC request
+ mqttClient.publish("v1/devices/me/rpc/response/" + requestId, Unpooled.wrappedBuffer(clientResponse.toString().getBytes()));
+
+ ResponseEntity serverResponse = future.get(5, TimeUnit.SECONDS);
+ Assert.assertTrue(serverResponse.getStatusCode().is2xxSuccessful());
+ Assert.assertEquals(clientResponse.toString(), serverResponse.getBody());
+
+ restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId());
+ }
+
+ @Test
+ public void clientSideRpc() throws Exception {
+ restClient.login("tenant@thingsboard.org", "tenant");
+ Device device = createDevice("mqtt_");
+ DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId());
+
+ MqttMessageListener listener = new MqttMessageListener();
+ MqttClient mqttClient = getMqttClient(deviceCredentials, listener);
+ mqttClient.on("v1/devices/me/rpc/request/+", listener, MqttQoS.AT_LEAST_ONCE);
+
+ // Get the default rule chain id to make it root again after test finished
+ RuleChainId defaultRuleChainId = getDefaultRuleChainId();
+
+ // Create a new root rule chain
+ RuleChainId ruleChainId = createRootRuleChainForRpcResponse();
+
+ TimeUnit.SECONDS.sleep(3);
+ // Send the request to the server
+ JsonObject clientRequest = new JsonObject();
+ clientRequest.addProperty("method", "getResponse");
+ clientRequest.addProperty("params", true);
+ Integer requestId = 42;
+ mqttClient.publish("v1/devices/me/rpc/request/" + requestId, Unpooled.wrappedBuffer(clientRequest.toString().getBytes()));
+
+ // Check the response from the server
+ TimeUnit.SECONDS.sleep(1);
+ MqttEvent responseFromServer = listener.getEvents().poll(1, TimeUnit.SECONDS);
+ Integer responseId = Integer.valueOf(Objects.requireNonNull(responseFromServer).getTopic().substring("v1/devices/me/rpc/response/".length()));
+ Assert.assertEquals(requestId, responseId);
+ Assert.assertEquals("requestReceived", mapper.readTree(responseFromServer.getMessage()).get("response").asText());
+
+ // Make the default rule chain a root again
+ ResponseEntity<RuleChain> rootRuleChainResponse = restClient.getRestTemplate()
+ .postForEntity(HTTPS_URL + "/api/ruleChain/{ruleChainId}/root",
+ null,
+ RuleChain.class,
+ defaultRuleChainId);
+ Assert.assertTrue(rootRuleChainResponse.getStatusCode().is2xxSuccessful());
+
+ // Delete the created rule chain
+ restClient.getRestTemplate().delete(HTTPS_URL + "/api/ruleChain/{ruleChainId}", ruleChainId);
+ restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId());
+ }
+
+ private RuleChainId createRootRuleChainForRpcResponse() throws Exception {
+ RuleChain newRuleChain = new RuleChain();
+ newRuleChain.setName("testRuleChain");
+ ResponseEntity<RuleChain> ruleChainResponse = restClient.getRestTemplate()
+ .postForEntity(HTTPS_URL + "/api/ruleChain",
+ newRuleChain,
+ RuleChain.class);
+ Assert.assertTrue(ruleChainResponse.getStatusCode().is2xxSuccessful());
+ RuleChain ruleChain = ruleChainResponse.getBody();
+
+ JsonNode configuration = mapper.readTree(this.getClass().getClassLoader().getResourceAsStream("RpcResponseRuleChainMetadata.json"));
+ RuleChainMetaData ruleChainMetaData = new RuleChainMetaData();
+ ruleChainMetaData.setRuleChainId(ruleChain.getId());
+ ruleChainMetaData.setFirstNodeIndex(configuration.get("firstNodeIndex").asInt());
+ ruleChainMetaData.setNodes(Arrays.asList(mapper.treeToValue(configuration.get("nodes"), RuleNode[].class)));
+ ruleChainMetaData.setConnections(Arrays.asList(mapper.treeToValue(configuration.get("connections"), NodeConnectionInfo[].class)));
+
+ ResponseEntity<RuleChainMetaData> ruleChainMetadataResponse = restClient.getRestTemplate()
+ .postForEntity(HTTPS_URL + "/api/ruleChain/metadata",
+ ruleChainMetaData,
+ RuleChainMetaData.class);
+ Assert.assertTrue(ruleChainMetadataResponse.getStatusCode().is2xxSuccessful());
+
+ // Set a new rule chain as root
+ ResponseEntity<RuleChain> rootRuleChainResponse = restClient.getRestTemplate()
+ .postForEntity(HTTPS_URL + "/api/ruleChain/{ruleChainId}/root",
+ null,
+ RuleChain.class,
+ ruleChain.getId());
+ Assert.assertTrue(rootRuleChainResponse.getStatusCode().is2xxSuccessful());
+
+ return ruleChain.getId();
+ }
+
+ private RuleChainId getDefaultRuleChainId() {
+ ResponseEntity<TextPageData<RuleChain>> ruleChains = restClient.getRestTemplate().exchange(
+ HTTPS_URL + "/api/ruleChains?limit=40&textSearch=",
+ HttpMethod.GET,
+ null,
+ new ParameterizedTypeReference<TextPageData<RuleChain>>() {
+ });
+
+ Optional<RuleChain> defaultRuleChain = ruleChains.getBody().getData()
+ .stream()
+ .filter(RuleChain::isRoot)
+ .findFirst();
+ if (!defaultRuleChain.isPresent()) {
+ Assert.fail("Root rule chain wasn't found");
+ }
+ return defaultRuleChain.get().getId();
+ }
+
+ private MqttClient getMqttClient(DeviceCredentials deviceCredentials, MqttMessageListener listener) throws InterruptedException, ExecutionException {
+ MqttClientConfig clientConfig = new MqttClientConfig();
+ clientConfig.setClientId("MQTT client from test");
+ clientConfig.setUsername(deviceCredentials.getCredentialsId());
+ MqttClient mqttClient = MqttClient.create(clientConfig, listener);
+ mqttClient.connect("localhost", 1883).get();
+ return mqttClient;
+ }
+
+ @Data
+ private class MqttMessageListener implements MqttHandler {
+ private final BlockingQueue<MqttEvent> events;
+
+ private MqttMessageListener() {
+ events = new ArrayBlockingQueue<>(100);
+ }
+
+ @Override
+ public void onMessage(String topic, ByteBuf message) {
+ log.info("MQTT message [{}], topic [{}]", message.toString(StandardCharsets.UTF_8), topic);
+ events.add(new MqttEvent(topic, message.toString(StandardCharsets.UTF_8)));
+ }
+ }
+
+ @Data
+ private class MqttEvent {
+ private final String topic;
+ private final String message;
+ }
+}
diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java
new file mode 100644
index 0000000..21d095f
--- /dev/null
+++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java
@@ -0,0 +1,58 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.msa;
+
+import org.junit.ClassRule;
+import org.junit.extensions.cpsuite.ClasspathSuite;
+import org.junit.rules.ExternalResource;
+import org.junit.runner.RunWith;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.Base58;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@RunWith(ClasspathSuite.class)
+@ClasspathSuite.ClassnameFilters({"org.thingsboard.server.msa.*Test"})
+public class ContainerTestSuite {
+
+ private static DockerComposeContainer testContainer;
+
+ @ClassRule
+ public static ThingsBoardDbInstaller installTb = new ThingsBoardDbInstaller();
+
+ @ClassRule
+ public static DockerComposeContainer getTestContainer() {
+ if (testContainer == null) {
+ testContainer = new DockerComposeContainer(
+ new File("./../../docker/docker-compose.yml"),
+ new File("./../../docker/docker-compose.postgres.yml"),
+ new File("./../../docker/docker-compose.postgres.volumes.yml"))
+ .withPull(false)
+ .withLocalCompose(true)
+ .withTailChildContainers(true)
+ .withEnv(installTb.getEnv())
+ .withEnv("LOAD_BALANCER_NAME", "")
+ .withExposedService("haproxy", 80, Wait.forHttp("/swagger-ui.html").withStartupTimeout(Duration.ofSeconds(120)));
+ }
+ return testContainer;
+ }
+}
diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/DockerComposeExecutor.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/DockerComposeExecutor.java
new file mode 100644
index 0000000..25d2e6e
--- /dev/null
+++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/DockerComposeExecutor.java
@@ -0,0 +1,119 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.thingsboard.server.msa;
+
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Maps;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.SystemUtils;
+import org.testcontainers.containers.ContainerLaunchException;
+import org.testcontainers.utility.CommandLine;
+import org.zeroturnaround.exec.InvalidExitValueException;
+import org.zeroturnaround.exec.ProcessExecutor;
+import org.zeroturnaround.exec.stream.slf4j.Slf4jStream;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Stream;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.stream.Collectors.joining;
+
+@Slf4j
+public class DockerComposeExecutor {
+
+ String ENV_PROJECT_NAME = "COMPOSE_PROJECT_NAME";
+ String ENV_COMPOSE_FILE = "COMPOSE_FILE";
+
+ private static final String COMPOSE_EXECUTABLE = SystemUtils.IS_OS_WINDOWS ? "docker-compose.exe" : "docker-compose";
+ private static final String DOCKER_EXECUTABLE = SystemUtils.IS_OS_WINDOWS ? "docker.exe" : "docker";
+
+ private final List<File> composeFiles;
+ private final String identifier;
+ private String cmd = "";
+ private Map<String, String> env = new HashMap<>();
+
+ public DockerComposeExecutor(List<File> composeFiles, String identifier) {
+ validateFileList(composeFiles);
+ this.composeFiles = composeFiles;
+ this.identifier = identifier;
+ }
+
+ public DockerComposeExecutor withCommand(String cmd) {
+ this.cmd = cmd;
+ return this;
+ }
+
+ public DockerComposeExecutor withEnv(Map<String, String> env) {
+ this.env = env;
+ return this;
+ }
+
+ public void invokeCompose() {
+ // bail out early
+ if (!CommandLine.executableExists(COMPOSE_EXECUTABLE)) {
+ throw new ContainerLaunchException("Local Docker Compose not found. Is " + COMPOSE_EXECUTABLE + " on the PATH?");
+ }
+ final Map<String, String> environment = Maps.newHashMap(env);
+ environment.put(ENV_PROJECT_NAME, identifier);
+ final Stream<String> absoluteDockerComposeFilePaths = composeFiles.stream().map(File::getAbsolutePath).map(Objects::toString);
+ final String composeFileEnvVariableValue = absoluteDockerComposeFilePaths.collect(joining(File.pathSeparator + ""));
+ log.debug("Set env COMPOSE_FILE={}", composeFileEnvVariableValue);
+ final File pwd = composeFiles.get(0).getAbsoluteFile().getParentFile().getAbsoluteFile();
+ environment.put(ENV_COMPOSE_FILE, composeFileEnvVariableValue);
+ log.info("Local Docker Compose is running command: {}", cmd);
+ final List<String> command = Splitter.onPattern(" ").omitEmptyStrings().splitToList(COMPOSE_EXECUTABLE + " " + cmd);
+ try {
+ new ProcessExecutor().command(command).redirectOutput(Slf4jStream.of(log).asInfo()).redirectError(Slf4jStream.of(log).asError()).environment(environment).directory(pwd).exitValueNormal().executeNoTimeout();
+ log.info("Docker Compose has finished running");
+ } catch (InvalidExitValueException e) {
+ throw new ContainerLaunchException("Local Docker Compose exited abnormally with code " + e.getExitValue() + " whilst running command: " + cmd);
+ } catch (Exception e) {
+ throw new ContainerLaunchException("Error running local Docker Compose command: " + cmd, e);
+ }
+ }
+
+ public void invokeDocker() {
+ // bail out early
+ if (!CommandLine.executableExists(DOCKER_EXECUTABLE)) {
+ throw new ContainerLaunchException("Local Docker not found. Is " + DOCKER_EXECUTABLE + " on the PATH?");
+ }
+ final File pwd = composeFiles.get(0).getAbsoluteFile().getParentFile().getAbsoluteFile();
+ log.info("Local Docker is running command: {}", cmd);
+ final List<String> command = Splitter.onPattern(" ").omitEmptyStrings().splitToList(DOCKER_EXECUTABLE + " " + cmd);
+ try {
+ new ProcessExecutor().command(command).redirectOutput(Slf4jStream.of(log).asInfo()).redirectError(Slf4jStream.of(log).asError()).directory(pwd).exitValueNormal().executeNoTimeout();
+ log.info("Docker has finished running");
+ } catch (InvalidExitValueException e) {
+ throw new ContainerLaunchException("Local Docker exited abnormally with code " + e.getExitValue() + " whilst running command: " + cmd);
+ } catch (Exception e) {
+ throw new ContainerLaunchException("Error running local Docker command: " + cmd, e);
+ }
+ }
+
+ void validateFileList(List<File> composeFiles) {
+ checkNotNull(composeFiles);
+ checkArgument(!composeFiles.isEmpty(), "No docker compose file have been provided");
+ }
+
+
+}
diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/mapper/AttributesResponse.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/mapper/AttributesResponse.java
new file mode 100644
index 0000000..f9774ee
--- /dev/null
+++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/mapper/AttributesResponse.java
@@ -0,0 +1,26 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.msa.mapper;
+
+import lombok.Data;
+
+import java.util.Map;
+
+@Data
+public class AttributesResponse {
+ private Map<String, Object> client;
+ private Map<String, Object> shared;
+}
diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/mapper/WsTelemetryResponse.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/mapper/WsTelemetryResponse.java
new file mode 100644
index 0000000..b22244f
--- /dev/null
+++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/mapper/WsTelemetryResponse.java
@@ -0,0 +1,40 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.msa.mapper;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+@Data
+public class WsTelemetryResponse implements Serializable {
+ private int subscriptionId;
+ private int errorCode;
+ private String errorMsg;
+ private Map<String, List<List<Object>>> data;
+ private Map<String, Object> latestValues;
+
+ public List<Object> getDataValuesByKey(String key) {
+ return data.entrySet().stream()
+ .filter(e -> e.getKey().equals(key))
+ .flatMap(e -> e.getValue().stream().flatMap(Collection::stream))
+ .collect(Collectors.toList());
+ }
+}
diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ThingsBoardDbInstaller.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ThingsBoardDbInstaller.java
new file mode 100644
index 0000000..d0422b0
--- /dev/null
+++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ThingsBoardDbInstaller.java
@@ -0,0 +1,132 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.msa;
+
+import org.junit.rules.ExternalResource;
+import org.testcontainers.utility.Base58;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ThingsBoardDbInstaller extends ExternalResource {
+
+ private final static String POSTGRES_DATA_VOLUME = "tb-postgres-test-data-volume";
+ private final static String TB_LOG_VOLUME = "tb-log-test-volume";
+ private final static String TB_COAP_TRANSPORT_LOG_VOLUME = "tb-coap-transport-log-test-volume";
+ private final static String TB_HTTP_TRANSPORT_LOG_VOLUME = "tb-http-transport-log-test-volume";
+ private final static String TB_MQTT_TRANSPORT_LOG_VOLUME = "tb-mqtt-transport-log-test-volume";
+
+ private final DockerComposeExecutor dockerCompose;
+
+ private final String postgresDataVolume;
+ private final String tbLogVolume;
+ private final String tbCoapTransportLogVolume;
+ private final String tbHttpTransportLogVolume;
+ private final String tbMqttTransportLogVolume;
+ private final Map<String, String> env;
+
+ public ThingsBoardDbInstaller() {
+ List<File> composeFiles = Arrays.asList(new File("./../../docker/docker-compose.yml"),
+ new File("./../../docker/docker-compose.postgres.yml"),
+ new File("./../../docker/docker-compose.postgres.volumes.yml"));
+
+ String identifier = Base58.randomString(6).toLowerCase();
+ String project = identifier + Base58.randomString(6).toLowerCase();
+
+ postgresDataVolume = project + "_" + POSTGRES_DATA_VOLUME;
+ tbLogVolume = project + "_" + TB_LOG_VOLUME;
+ tbCoapTransportLogVolume = project + "_" + TB_COAP_TRANSPORT_LOG_VOLUME;
+ tbHttpTransportLogVolume = project + "_" + TB_HTTP_TRANSPORT_LOG_VOLUME;
+ tbMqttTransportLogVolume = project + "_" + TB_MQTT_TRANSPORT_LOG_VOLUME;
+
+ dockerCompose = new DockerComposeExecutor(composeFiles, project);
+
+ env = new HashMap<>();
+ env.put("POSTGRES_DATA_VOLUME", postgresDataVolume);
+ env.put("TB_LOG_VOLUME", tbLogVolume);
+ env.put("TB_COAP_TRANSPORT_LOG_VOLUME", tbCoapTransportLogVolume);
+ env.put("TB_HTTP_TRANSPORT_LOG_VOLUME", tbHttpTransportLogVolume);
+ env.put("TB_MQTT_TRANSPORT_LOG_VOLUME", tbMqttTransportLogVolume);
+ dockerCompose.withEnv(env);
+ }
+
+ public Map<String, String> getEnv() {
+ return env;
+ }
+
+ @Override
+ protected void before() throws Throwable {
+ try {
+
+ dockerCompose.withCommand("volume create " + postgresDataVolume);
+ dockerCompose.invokeDocker();
+
+ dockerCompose.withCommand("volume create " + tbLogVolume);
+ dockerCompose.invokeDocker();
+
+ dockerCompose.withCommand("volume create " + tbCoapTransportLogVolume);
+ dockerCompose.invokeDocker();
+
+ dockerCompose.withCommand("volume create " + tbHttpTransportLogVolume);
+ dockerCompose.invokeDocker();
+
+ dockerCompose.withCommand("volume create " + tbMqttTransportLogVolume);
+ dockerCompose.invokeDocker();
+
+ dockerCompose.withCommand("up -d redis postgres");
+ dockerCompose.invokeCompose();
+
+ dockerCompose.withCommand("run --no-deps --rm -e INSTALL_TB=true -e LOAD_DEMO=true tb1");
+ dockerCompose.invokeCompose();
+
+ } finally {
+ try {
+ dockerCompose.withCommand("down -v");
+ dockerCompose.invokeCompose();
+ } catch (Exception e) {}
+ }
+ }
+
+ @Override
+ protected void after() {
+ copyLogs(tbLogVolume, "./target/tb-logs/");
+ copyLogs(tbCoapTransportLogVolume, "./target/tb-coap-transport-logs/");
+ copyLogs(tbHttpTransportLogVolume, "./target/tb-http-transport-logs/");
+ copyLogs(tbMqttTransportLogVolume, "./target/tb-mqtt-transport-logs/");
+
+ dockerCompose.withCommand("volume rm -f " + postgresDataVolume + " " + tbLogVolume +
+ " " + tbCoapTransportLogVolume + " " + tbHttpTransportLogVolume + " " + tbMqttTransportLogVolume);
+ dockerCompose.invokeDocker();
+ }
+
+ private void copyLogs(String volumeName, String targetDir) {
+ File tbLogsDir = new File(targetDir);
+ tbLogsDir.mkdirs();
+
+ dockerCompose.withCommand("run -d --rm --name tb-logs-container -v " + volumeName + ":/root alpine tail -f /dev/null");
+ dockerCompose.invokeDocker();
+
+ dockerCompose.withCommand("cp tb-logs-container:/root/. "+tbLogsDir.getAbsolutePath());
+ dockerCompose.invokeDocker();
+
+ dockerCompose.withCommand("rm -f tb-logs-container");
+ dockerCompose.invokeDocker();
+ }
+
+}
diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java
new file mode 100644
index 0000000..fa3a63a
--- /dev/null
+++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java
@@ -0,0 +1,92 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.msa;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.java_websocket.client.WebSocketClient;
+import org.java_websocket.handshake.ServerHandshake;
+import org.thingsboard.server.msa.mapper.WsTelemetryResponse;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class WsClient extends WebSocketClient {
+ private static final ObjectMapper mapper = new ObjectMapper();
+ private WsTelemetryResponse message;
+
+ private volatile boolean firstReplyReceived;
+ private CountDownLatch firstReply = new CountDownLatch(1);
+ private CountDownLatch latch = new CountDownLatch(1);
+
+ WsClient(URI serverUri) {
+ super(serverUri);
+ }
+
+ @Override
+ public void onOpen(ServerHandshake serverHandshake) {
+ }
+
+ @Override
+ public void onMessage(String message) {
+ if (!firstReplyReceived) {
+ firstReplyReceived = true;
+ firstReply.countDown();
+ } else {
+ try {
+ WsTelemetryResponse response = mapper.readValue(message, WsTelemetryResponse.class);
+ if (!response.getData().isEmpty()) {
+ this.message = response;
+ latch.countDown();
+ }
+ } catch (IOException e) {
+ log.error("ws message can't be read");
+ }
+ }
+ }
+
+ @Override
+ public void onClose(int code, String reason, boolean remote) {
+ log.info("ws is closed, due to [{}]", reason);
+ }
+
+ @Override
+ public void onError(Exception ex) {
+ ex.printStackTrace();
+ }
+
+ public WsTelemetryResponse getLastMessage() {
+ try {
+ latch.await(10, TimeUnit.SECONDS);
+ return this.message;
+ } catch (InterruptedException e) {
+ log.error("Timeout, ws message wasn't received");
+ }
+ return null;
+ }
+
+ void waitForFirstReply() {
+ try {
+ firstReply.await(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ log.error("Timeout, ws message wasn't received");
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/msa/black-box-tests/src/test/resources/RpcResponseRuleChainMetadata.json b/msa/black-box-tests/src/test/resources/RpcResponseRuleChainMetadata.json
new file mode 100644
index 0000000..09178ef
--- /dev/null
+++ b/msa/black-box-tests/src/test/resources/RpcResponseRuleChainMetadata.json
@@ -0,0 +1,59 @@
+{
+ "firstNodeIndex": 0,
+ "nodes": [
+ {
+ "additionalInfo": {
+ "layoutX": 325,
+ "layoutY": 150
+ },
+ "type": "org.thingsboard.rule.engine.filter.TbMsgTypeSwitchNode",
+ "name": "msgTypeSwitch",
+ "debugMode": true,
+ "configuration": {
+ "version": 0
+ }
+ },
+ {
+ "additionalInfo": {
+ "layoutX": 60,
+ "layoutY": 300
+ },
+ "type": "org.thingsboard.rule.engine.transform.TbTransformMsgNode",
+ "name": "formResponse",
+ "debugMode": true,
+ "configuration": {
+ "jsScript": "if (msg.method == \"getResponse\") {\n return {msg: {\"response\": \"requestReceived\"}, metadata: metadata, msgType: msgType};\n}\n\nreturn {msg: msg, metadata: metadata, msgType: msgType};"
+ }
+ },
+ {
+ "additionalInfo": {
+ "layoutX": 450,
+ "layoutY": 300
+ },
+ "type": "org.thingsboard.rule.engine.rpc.TbSendRPCReplyNode",
+ "name": "rpcReply",
+ "debugMode": true,
+ "configuration": {
+ "requestIdMetaDataAttribute": "requestId"
+ }
+ }
+ ],
+ "connections": [
+ {
+ "fromIndex": 0,
+ "toIndex": 1,
+ "type": "RPC Request from Device"
+ },
+ {
+ "fromIndex": 1,
+ "toIndex": 2,
+ "type": "Success"
+ },
+ {
+ "fromIndex": 1,
+ "toIndex": 2,
+ "type": "Failure"
+ }
+ ],
+ "ruleChainConnections": null
+}
\ No newline at end of file
msa/js-executor/server.js 18(+18 -0)
diff --git a/msa/js-executor/server.js b/msa/js-executor/server.js
index 03fac2e..17f70cb 100644
--- a/msa/js-executor/server.js
+++ b/msa/js-executor/server.js
@@ -45,6 +45,24 @@ var kafkaClient;
kafkaRequestTopic
);
+ consumer.on('error', (err) => {
+ logger.error('Unexpected kafka consumer error: %s', err.message);
+ logger.error(err.stack);
+ });
+
+ consumer.on('offsetOutOfRange', (err) => {
+ logger.error('Offset out of range error: %s', err.message);
+ logger.error(err.stack);
+ });
+
+ consumer.on('rebalancing', () => {
+ logger.info('Rebalancing event received.');
+ })
+
+ consumer.on('rebalanced', () => {
+ logger.info('Rebalanced event received.');
+ });
+
var producer = new Producer(kafkaClient);
producer.on('error', (err) => {
logger.error('Unexpected kafka producer error: %s', err.message);
msa/pom.xml 3(+2 -1)
diff --git a/msa/pom.xml b/msa/pom.xml
index d6d6c8d..5a9d9ab 100644
--- a/msa/pom.xml
+++ b/msa/pom.xml
@@ -16,7 +16,7 @@
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard</groupId>
@@ -41,6 +41,7 @@
<module>web-ui</module>
<module>tb-node</module>
<module>transport</module>
+ <module>black-box-tests</module>
</modules>
<build>
msa/transport/coap/docker/Dockerfile 5(+1 -4)
diff --git a/msa/transport/coap/docker/Dockerfile b/msa/transport/coap/docker/Dockerfile
index 9240b2a..69180df 100644
--- a/msa/transport/coap/docker/Dockerfile
+++ b/msa/transport/coap/docker/Dockerfile
@@ -16,7 +16,7 @@
FROM openjdk:8-jdk
-COPY logback.xml ${pkg.name}.conf start-tb-coap-transport.sh ${pkg.name}.deb /tmp/
+COPY start-tb-coap-transport.sh ${pkg.name}.deb /tmp/
RUN chmod a+x /tmp/*.sh \
&& mv /tmp/start-tb-coap-transport.sh /usr/bin
@@ -25,7 +25,4 @@ RUN dpkg -i /tmp/${pkg.name}.deb
RUN update-rc.d ${pkg.name} disable
-RUN mv /tmp/logback.xml ${pkg.installFolder}/conf \
- && mv /tmp/${pkg.name}.conf ${pkg.installFolder}/conf
-
CMD ["start-tb-coap-transport.sh"]
diff --git a/msa/transport/coap/docker/start-tb-coap-transport.sh b/msa/transport/coap/docker/start-tb-coap-transport.sh
index 43d4602..d233dbc 100755
--- a/msa/transport/coap/docker/start-tb-coap-transport.sh
+++ b/msa/transport/coap/docker/start-tb-coap-transport.sh
@@ -15,15 +15,17 @@
# limitations under the License.
#
-CONF_FOLDER="${pkg.installFolder}/conf"
+CONF_FOLDER="/config"
jarfile=${pkg.installFolder}/bin/${pkg.name}.jar
configfile=${pkg.name}.conf
source "${CONF_FOLDER}/${configfile}"
+export LOADER_PATH=/config,${LOADER_PATH}
+
echo "Starting '${project.name}' ..."
exec java -cp ${jarfile} $JAVA_OPTS -Dloader.main=org.thingsboard.server.coap.ThingsboardCoapTransportApplication \
-Dspring.jpa.hibernate.ddl-auto=none \
- -Dlogging.config=${CONF_FOLDER}/logback.xml \
+ -Dlogging.config=/config/logback.xml \
org.springframework.boot.loader.PropertiesLauncher
msa/transport/http/docker/Dockerfile 5(+1 -4)
diff --git a/msa/transport/http/docker/Dockerfile b/msa/transport/http/docker/Dockerfile
index 6c83b9c..2ed6318 100644
--- a/msa/transport/http/docker/Dockerfile
+++ b/msa/transport/http/docker/Dockerfile
@@ -16,7 +16,7 @@
FROM openjdk:8-jdk
-COPY logback.xml ${pkg.name}.conf start-tb-http-transport.sh ${pkg.name}.deb /tmp/
+COPY start-tb-http-transport.sh ${pkg.name}.deb /tmp/
RUN chmod a+x /tmp/*.sh \
&& mv /tmp/start-tb-http-transport.sh /usr/bin
@@ -25,7 +25,4 @@ RUN dpkg -i /tmp/${pkg.name}.deb
RUN update-rc.d ${pkg.name} disable
-RUN mv /tmp/logback.xml ${pkg.installFolder}/conf \
- && mv /tmp/${pkg.name}.conf ${pkg.installFolder}/conf
-
CMD ["start-tb-http-transport.sh"]
diff --git a/msa/transport/http/docker/start-tb-http-transport.sh b/msa/transport/http/docker/start-tb-http-transport.sh
index 667988f..e00a66c 100755
--- a/msa/transport/http/docker/start-tb-http-transport.sh
+++ b/msa/transport/http/docker/start-tb-http-transport.sh
@@ -15,15 +15,17 @@
# limitations under the License.
#
-CONF_FOLDER="${pkg.installFolder}/conf"
+CONF_FOLDER="/config"
jarfile=${pkg.installFolder}/bin/${pkg.name}.jar
configfile=${pkg.name}.conf
source "${CONF_FOLDER}/${configfile}"
+export LOADER_PATH=/config,${LOADER_PATH}
+
echo "Starting '${project.name}' ..."
exec java -cp ${jarfile} $JAVA_OPTS -Dloader.main=org.thingsboard.server.http.ThingsboardHttpTransportApplication \
-Dspring.jpa.hibernate.ddl-auto=none \
- -Dlogging.config=${CONF_FOLDER}/logback.xml \
+ -Dlogging.config=/config/logback.xml \
org.springframework.boot.loader.PropertiesLauncher
msa/transport/mqtt/docker/Dockerfile 5(+1 -4)
diff --git a/msa/transport/mqtt/docker/Dockerfile b/msa/transport/mqtt/docker/Dockerfile
index f636e2f..4fb707a 100644
--- a/msa/transport/mqtt/docker/Dockerfile
+++ b/msa/transport/mqtt/docker/Dockerfile
@@ -16,7 +16,7 @@
FROM openjdk:8-jdk
-COPY logback.xml ${pkg.name}.conf start-tb-mqtt-transport.sh ${pkg.name}.deb /tmp/
+COPY start-tb-mqtt-transport.sh ${pkg.name}.deb /tmp/
RUN chmod a+x /tmp/*.sh \
&& mv /tmp/start-tb-mqtt-transport.sh /usr/bin
@@ -25,7 +25,4 @@ RUN dpkg -i /tmp/${pkg.name}.deb
RUN update-rc.d ${pkg.name} disable
-RUN mv /tmp/logback.xml ${pkg.installFolder}/conf \
- && mv /tmp/${pkg.name}.conf ${pkg.installFolder}/conf
-
CMD ["start-tb-mqtt-transport.sh"]
diff --git a/msa/transport/mqtt/docker/start-tb-mqtt-transport.sh b/msa/transport/mqtt/docker/start-tb-mqtt-transport.sh
index 8fe06d2..dd184fe 100755
--- a/msa/transport/mqtt/docker/start-tb-mqtt-transport.sh
+++ b/msa/transport/mqtt/docker/start-tb-mqtt-transport.sh
@@ -15,15 +15,17 @@
# limitations under the License.
#
-CONF_FOLDER="${pkg.installFolder}/conf"
+CONF_FOLDER="/config"
jarfile=${pkg.installFolder}/bin/${pkg.name}.jar
configfile=${pkg.name}.conf
source "${CONF_FOLDER}/${configfile}"
+export LOADER_PATH=/config,${LOADER_PATH}
+
echo "Starting '${project.name}' ..."
exec java -cp ${jarfile} $JAVA_OPTS -Dloader.main=org.thingsboard.server.mqtt.ThingsboardMqttTransportApplication \
-Dspring.jpa.hibernate.ddl-auto=none \
- -Dlogging.config=${CONF_FOLDER}/logback.xml \
+ -Dlogging.config=/config/logback.xml \
org.springframework.boot.loader.PropertiesLauncher
msa/web-ui/build.gradle 2(+1 -1)
diff --git a/msa/web-ui/build.gradle b/msa/web-ui/build.gradle
index 7372c0a..e298541 100644
--- a/msa/web-ui/build.gradle
+++ b/msa/web-ui/build.gradle
@@ -66,7 +66,7 @@ ospackage {
}
// Copy web files
- from("target/package/linux/web") {
+ from("target/web") {
into "web"
}
diff --git a/msa/web-ui/config/custom-environment-variables.yml b/msa/web-ui/config/custom-environment-variables.yml
index 9472a50..de357c9 100644
--- a/msa/web-ui/config/custom-environment-variables.yml
+++ b/msa/web-ui/config/custom-environment-variables.yml
@@ -20,6 +20,7 @@ server:
# Server bind port
port: "HTTP_BIND_PORT"
thingsboard:
+ enableProxy: "TB_ENABLE_PROXY"
# ThingsBoard node host
host: "TB_HOST"
# ThingsBoard node port
msa/web-ui/config/default.yml 1(+1 -0)
diff --git a/msa/web-ui/config/default.yml b/msa/web-ui/config/default.yml
index cf27a14..c7f3ba9 100644
--- a/msa/web-ui/config/default.yml
+++ b/msa/web-ui/config/default.yml
@@ -20,6 +20,7 @@ server:
# Server bind port
port: "8090"
thingsboard:
+ enableProxy: "false"
# ThingsBoard node host
host: "localhost"
# ThingsBoard node port
msa/web-ui/server.js 77(+43 -34)
diff --git a/msa/web-ui/server.js b/msa/web-ui/server.js
index 44e175a..31d35a2 100644
--- a/msa/web-ui/server.js
+++ b/msa/web-ui/server.js
@@ -31,14 +31,19 @@ var server;
const bindAddress = config.get('server.address');
const bindPort = config.get('server.port');
+ const thingsboardEnableProxy = config.get('thingsboard.enableProxy');
+
const thingsboardHost = config.get('thingsboard.host');
const thingsboardPort = config.get('thingsboard.port');
logger.info('Bind address: %s', bindAddress);
logger.info('Bind port: %s', bindPort);
+ logger.info('ThingsBoard Enable Proxy: %s', thingsboardEnableProxy);
logger.info('ThingsBoard host: %s', thingsboardHost);
logger.info('ThingsBoard port: %s', thingsboardPort);
+ const useApiProxy = thingsboardEnableProxy === "true";
+
var webDir = path.join(__dirname, 'web');
if (typeof process.env.WEB_FOLDER === 'string') {
@@ -49,47 +54,51 @@ var server;
const app = express();
server = http.createServer(app);
- const apiProxy = httpProxy.createProxyServer({
- target: {
- host: thingsboardHost,
- port: thingsboardPort
- }
- });
-
- apiProxy.on('error', function (err, req, res) {
- logger.warn('API proxy error: %s', err.message);
- res.writeHead(500);
- if (err.code && err.code === 'ECONNREFUSED') {
- res.end('Unable to connect to ThingsBoard server.');
- } else {
- res.end('Thingsboard server connection error: ' + err.code ? err.code : '');
- }
- });
-
- const root = path.join(webDir, 'public');
-
- const staticDir = path.join(root, 'static');
+ if (useApiProxy) {
+ const apiProxy = httpProxy.createProxyServer({
+ target: {
+ host: thingsboardHost,
+ port: thingsboardPort
+ }
+ });
+
+ apiProxy.on('error', function (err, req, res) {
+ logger.warn('API proxy error: %s', err.message);
+ res.writeHead(500);
+ if (err.code && err.code === 'ECONNREFUSED') {
+ res.end('Unable to connect to ThingsBoard server.');
+ } else {
+ res.end('Thingsboard server connection error: ' + err.code ? err.code : '');
+ }
+ });
+ }
- app.all('/api/*', (req, res) => {
- logger.debug(req.method + ' ' + req.originalUrl);
- apiProxy.web(req, res);
- });
+ if (useApiProxy) {
+ app.all('/api/*', (req, res) => {
+ logger.debug(req.method + ' ' + req.originalUrl);
+ apiProxy.web(req, res);
+ });
- app.all('/static/rulenode/*', (req, res) => {
- apiProxy.web(req, res);
- });
+ app.all('/static/rulenode/*', (req, res) => {
+ apiProxy.web(req, res);
+ });
+ }
app.use(historyApiFallback());
- app.use('/static', express.static(staticDir));
+ const root = path.join(webDir, 'public');
- app.get('*', (req, res) => {
- apiProxy.web(req, res);
- });
+ app.use(express.static(root));
- server.on('upgrade', (req, socket, head) => {
- apiProxy.ws(req, socket, head);
- });
+ if (useApiProxy) {
+ app.get('*', (req, res) => {
+ apiProxy.web(req, res);
+ });
+
+ server.on('upgrade', (req, socket, head) => {
+ apiProxy.ws(req, socket, head);
+ });
+ }
server.listen(bindPort, bindAddress, (error) => {
if (error) {
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbRestApiCallNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbRestApiCallNode.java
index f7f2d2d..c7c33fd 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbRestApiCallNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbRestApiCallNode.java
@@ -118,16 +118,16 @@ public class TbRestApiCallNode implements TbNode {
}
private TbMsg processResponse(TbContext ctx, TbMsg origMsg, ResponseEntity<String> response) {
- TbMsgMetaData metaData = new TbMsgMetaData();
+ TbMsgMetaData metaData = origMsg.getMetaData();
metaData.putValue(STATUS, response.getStatusCode().name());
metaData.putValue(STATUS_CODE, response.getStatusCode().value()+"");
metaData.putValue(STATUS_REASON, response.getStatusCode().getReasonPhrase());
- response.getHeaders().toSingleValueMap().forEach((k,v) -> metaData.putValue(k,v) );
+ response.getHeaders().toSingleValueMap().forEach(metaData::putValue);
return ctx.transformMsg(origMsg, origMsg.getType(), origMsg.getOriginator(), metaData, response.getBody());
}
private TbMsg processFailureResponse(TbContext ctx, TbMsg origMsg, ResponseEntity<String> response) {
- TbMsgMetaData metaData = origMsg.getMetaData().copy();
+ TbMsgMetaData metaData = origMsg.getMetaData();
metaData.putValue(STATUS, response.getStatusCode().name());
metaData.putValue(STATUS_CODE, response.getStatusCode().value()+"");
metaData.putValue(STATUS_REASON, response.getStatusCode().getReasonPhrase());
@@ -136,7 +136,7 @@ public class TbRestApiCallNode implements TbNode {
}
private TbMsg processException(TbContext ctx, TbMsg origMsg, Throwable e) {
- TbMsgMetaData metaData = origMsg.getMetaData().copy();
+ TbMsgMetaData metaData = origMsg.getMetaData();
metaData.putValue(ERROR, e.getClass() + ": " + e.getMessage());
if (e instanceof HttpClientErrorException) {
HttpClientErrorException httpClientErrorException = (HttpClientErrorException)e;
diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml
index f96bcbe..95e7b75 100644
--- a/transport/coap/src/main/resources/tb-coap-transport.yml
+++ b/transport/coap/src/main/resources/tb-coap-transport.yml
@@ -30,6 +30,9 @@ transport:
enabled: "${TB_TRANSPORT_RATE_LIMITS_ENABLED:false}"
tenant: "${TB_TRANSPORT_RATE_LIMITS_TENANT:1000:1,20000:60}"
device: "${TB_TRANSPORT_RATE_LIMITS_DEVICE:10:1,300:60}"
+ json:
+ # Cast String data types to Numeric if possible when processing Telemetry/Attributes JSON
+ type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:true}"
kafka:
enabled: true
diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml
index 6d593ed..bb1d3c4 100644
--- a/transport/http/src/main/resources/tb-http-transport.yml
+++ b/transport/http/src/main/resources/tb-http-transport.yml
@@ -31,6 +31,9 @@ transport:
enabled: "${TB_TRANSPORT_RATE_LIMITS_ENABLED:false}"
tenant: "${TB_TRANSPORT_RATE_LIMITS_TENANT:1000:1,20000:60}"
device: "${TB_TRANSPORT_RATE_LIMITS_DEVICE:10:1,300:60}"
+ json:
+ # Cast String data types to Numeric if possible when processing Telemetry/Attributes JSON
+ type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:true}"
kafka:
enabled: true
diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml
index e7f8942..719530c 100644
--- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml
+++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml
@@ -50,6 +50,9 @@ transport:
enabled: "${TB_TRANSPORT_RATE_LIMITS_ENABLED:false}"
tenant: "${TB_TRANSPORT_RATE_LIMITS_TENANT:1000:1,20000:60}"
device: "${TB_TRANSPORT_RATE_LIMITS_DEVICE:10:1,300:60}"
+ json:
+ # Cast String data types to Numeric if possible when processing Telemetry/Attributes JSON
+ type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:true}"
kafka:
enabled: true