TenantActor.java
Home
/
application /
src /
main /
java /
org /
thingsboard /
server /
actors /
tenant /
TenantActor.java
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.tenant;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.device.DeviceActor;
import org.thingsboard.server.actors.plugin.PluginTerminationMsg;
import org.thingsboard.server.actors.ruleChain.RuleChainManagerActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.actors.shared.plugin.TenantPluginManager;
import org.thingsboard.server.actors.shared.rulechain.TenantRuleChainManager;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
import org.thingsboard.server.extensions.api.plugins.msg.ToPluginActorMsg;
import java.util.HashMap;
import java.util.Map;
public class TenantActor extends RuleChainManagerActor {
private final TenantId tenantId;
private final Map<DeviceId, ActorRef> deviceActors;
private TenantActor(ActorSystemContext systemContext, TenantId tenantId) {
super(systemContext, new TenantRuleChainManager(systemContext, tenantId), new TenantPluginManager(systemContext, tenantId));
this.tenantId = tenantId;
this.deviceActors = new HashMap<>();
}
@Override
public void preStart() {
logger.info("[{}] Starting tenant actor.", tenantId);
try {
initRuleChains();
logger.info("[{}] Tenant actor started.", tenantId);
} catch (Exception e) {
logger.error(e, "[{}] Unknown failure", tenantId);
}
}
@Override
protected boolean process(TbActorMsg msg) {
switch (msg.getMsgType()) {
case COMPONENT_LIFE_CYCLE_MSG:
onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
break;
case SERVICE_TO_RULE_ENGINE_MSG:
onServiceToRuleEngineMsg((ServiceToRuleEngineMsg) msg);
break;
default:
return false;
}
return true;
}
private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) {
ruleChainManager.getRootChainActor().tell(msg, self());
}
// @Override
// public void onReceive(Object msg) throws Exception {
// logger.debug("[{}] Received message: {}", tenantId, msg);
// if (msg instanceof ToDeviceActorMsg) {
// onToDeviceActorMsg((ToDeviceActorMsg) msg);
// } else if (msg instanceof ToPluginActorMsg) {
// onToPluginMsg((ToPluginActorMsg) msg);
// } else if (msg instanceof ToDeviceActorNotificationMsg) {
// onToDeviceActorMsg((ToDeviceActorNotificationMsg) msg);
// } else if (msg instanceof ClusterEventMsg) {
// broadcast(msg);
// } else if (msg instanceof ComponentLifecycleMsg) {
// onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
// } else if (msg instanceof PluginTerminationMsg) {
// onPluginTerminated((PluginTerminationMsg) msg);
// } else {
// logger.warning("[{}] Unknown message: {}!", tenantId, msg);
// }
// }
private void broadcast(Object msg) {
pluginManager.broadcast(msg);
deviceActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
}
private void onToDeviceActorMsg(ToDeviceActorMsg msg) {
getOrCreateDeviceActor(msg.getDeviceId()).tell(msg, ActorRef.noSender());
}
private void onToDeviceActorMsg(ToDeviceActorNotificationMsg msg) {
getOrCreateDeviceActor(msg.getDeviceId()).tell(msg, ActorRef.noSender());
}
private void onToPluginMsg(ToPluginActorMsg msg) {
if (msg.getPluginTenantId().equals(tenantId)) {
ActorRef pluginActor = pluginManager.getOrCreateActor(this.context(), msg.getPluginId());
pluginActor.tell(msg, ActorRef.noSender());
} else {
context().parent().tell(msg, ActorRef.noSender());
}
}
private void onComponentLifecycleMsg(ComponentLifecycleMsg msg) {
ActorRef target = getEntityActorRef(msg.getEntityId());
if (target != null) {
target.tell(msg, ActorRef.noSender());
} else {
logger.debug("Invalid component lifecycle msg: {}", msg);
}
}
private void onPluginTerminated(PluginTerminationMsg msg) {
pluginManager.remove(msg.getId());
}
private ActorRef getOrCreateDeviceActor(DeviceId deviceId) {
return deviceActors.computeIfAbsent(deviceId, k -> context().actorOf(Props.create(new DeviceActor.ActorCreator(systemContext, tenantId, deviceId))
.withDispatcher(DefaultActorService.CORE_DISPATCHER_NAME), deviceId.toString()));
}
public static class ActorCreator extends ContextBasedCreator<TenantActor> {
private static final long serialVersionUID = 1L;
private final TenantId tenantId;
public ActorCreator(ActorSystemContext context, TenantId tenantId) {
super(context);
this.tenantId = tenantId;
}
@Override
public TenantActor create() throws Exception {
return new TenantActor(context, tenantId);
}
}
}