/**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.service;
import akka.actor.ActorRef;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
import org.thingsboard.server.actors.stats.StatsPersistMsg;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
/**
 * Base actor for a single component identified by an {@link EntityId} and owned by a tenant.
 *
 * <p>The actor delegates the actual work to a {@link ComponentMsgProcessor} that subclasses
 * install through {@link #setProcessor}. Around that delegation it keeps two counters
 * (messages processed, errors occurred), forwards them to the statistics actor on request,
 * persists lifecycle events, and persists errors no more often than
 * {@link #getErrorPersistFrequency()} allows.
 *
 * @param <T> type of the component identifier
 * @param <P> type of the processor that handles the component's messages
 * @author Andrew Shvayka
 */
public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgProcessor<T>> extends ContextAwareActor {

    protected final LoggingAdapter logger = Logging.getLogger(getContext().system(), this);

    /** Value of {@link System#currentTimeMillis()} at the moment the last error was persisted. */
    private long lastPersistedErrorTs = 0L;

    protected final TenantId tenantId;
    protected final T id;

    /** Processor all work is delegated to; must be set via {@link #setProcessor} before the actor starts. */
    protected P processor;

    /** Messages counted since the last statistics tick. */
    private long messagesProcessed;

    /** Errors counted since the last statistics tick. */
    private long errorsOccurred;

    /**
     * @param systemContext shared actor system context, passed to the superclass
     * @param tenantId      tenant that owns the component
     * @param id            identifier of the component
     */
    public ComponentActor(ActorSystemContext systemContext, TenantId tenantId, T id) {
        super(systemContext);
        this.tenantId = tenantId;
        this.id = id;
    }

    /**
     * Installs the processor this actor delegates to.
     *
     * @param processor processor instance
     */
    protected void setProcessor(P processor) {
        this.processor = processor;
    }

    /**
     * Starts the processor, records a {@code STARTED} lifecycle event and, when statistics are
     * enabled in the system context, schedules the statistics persist tick. Any failure is
     * logged, counted as an error and recorded together with the {@code STARTED} event.
     */
    @Override
    public void preStart() {
        try {
            processor.start();
            logLifecycleEvent(ComponentLifecycleEvent.STARTED);
            if (systemContext.isStatisticsEnabled()) {
                scheduleStatsPersistTick();
            }
        } catch (Exception e) {
            logger.warning("[{}][{}] Failed to start {} processor: {}", tenantId, id, id.getEntityType(), e);
            logAndPersist("OnStart", e, true);
            logLifecycleEvent(ComponentLifecycleEvent.STARTED, e);
        }
    }

    /**
     * Asks the processor to schedule the periodic statistics tick using the frequency configured
     * in the system context. A failure is logged and counted but not rethrown.
     */
    private void scheduleStatsPersistTick() {
        try {
            processor.scheduleStatsPersistTick(context(), systemContext.getStatisticsPersistFrequency());
        } catch (Exception e) {
            logger.error("[{}][{}] Failed to schedule statistics store message. No statistics is going to be stored: {}", tenantId, id, e.getMessage());
            logAndPersist("onScheduleStatsPersistMsg", e);
        }
    }

    /**
     * Stops the processor and records a {@code STOPPED} lifecycle event. Any failure is logged,
     * counted as an error and recorded together with the {@code STOPPED} event.
     */
    @Override
    public void postStop() {
        try {
            processor.stop();
            logLifecycleEvent(ComponentLifecycleEvent.STOPPED);
        } catch (Exception e) {
            logger.warning("[{}][{}] Failed to stop {} processor: {}", tenantId, id, id.getEntityType(), e.getMessage());
            logAndPersist("OnStop", e, true);
            logLifecycleEvent(ComponentLifecycleEvent.STOPPED, e);
        }
    }

    /**
     * Dispatches a lifecycle message to the matching processor callback and records the event.
     * If the callback throws, the error is logged and counted, and the event is recorded
     * together with the exception.
     *
     * @param msg lifecycle message carrying the event to apply
     */
    protected void onComponentLifecycleMsg(ComponentLifecycleMsg msg) {
        try {
            switch (msg.getEvent()) {
                case CREATED:
                    processor.onCreated(context());
                    break;
                case UPDATED:
                    processor.onUpdate(context());
                    break;
                case ACTIVATED:
                    processor.onActivate(context());
                    break;
                case SUSPENDED:
                    processor.onSuspend(context());
                    break;
                case DELETED:
                    processor.onStop(context());
                    break;
                default:
                    // No processor callback for other events; the event is still recorded below.
                    break;
            }
            logLifecycleEvent(msg.getEvent());
        } catch (Exception e) {
            logAndPersist("onLifecycleMsg", e, true);
            logLifecycleEvent(msg.getEvent(), e);
        }
    }

    /**
     * Forwards a cluster event to the processor; a failure is logged and counted, not rethrown.
     *
     * @param msg cluster event message
     */
    protected void onClusterEventMsg(ClusterEventMsg msg) {
        try {
            processor.onClusterEventMsg(msg);
        } catch (Exception e) {
            logAndPersist("onClusterEventMsg", e);
        }
    }

    /**
     * Sends the current counters to the statistics actor and then resets them to zero.
     *
     * @param entityId entity the statistics are reported for
     */
    protected void onStatsPersistTick(EntityId entityId) {
        try {
            systemContext.getStatsActor().tell(new StatsPersistMsg(messagesProcessed, errorsOccurred, tenantId, entityId), ActorRef.noSender());
            resetStatsCounters();
        } catch (Exception e) {
            logAndPersist("onStatsPersistTick", e);
        }
    }

    /** Resets both statistics counters to zero. */
    private void resetStatsCounters() {
        messagesProcessed = 0;
        errorsOccurred = 0;
    }

    /** Increments the processed-messages counter by one. */
    protected void increaseMessagesProcessedCount() {
        messagesProcessed++;
    }

    /**
     * Counts, logs (at debug level) and possibly persists a non-critical error.
     *
     * @param method name of the operation that failed, used in the log line and the persisted error
     * @param e      the failure
     */
    protected void logAndPersist(String method, Exception e) {
        logAndPersist(method, e, false);
    }

    /**
     * Counts and logs an error, then persists it unless another error was persisted less than
     * {@link #getErrorPersistFrequency()} milliseconds ago.
     *
     * @param method   name of the operation that failed
     * @param e        the failure
     * @param critical {@code true} to log at warning level, {@code false} to log at debug level
     */
    private void logAndPersist(String method, Exception e, boolean critical) {
        errorsOccurred++;
        // Prefix order is [tenantId][id], matching every other log statement in this class.
        if (critical) {
            logger.warning("[{}][{}] Failed to process {} msg: {}", tenantId, id, method, e);
        } else {
            logger.debug("[{}][{}] Failed to process {} msg: {}", tenantId, id, method, e);
        }
        long ts = System.currentTimeMillis();
        // Rate-limit persistence: errors inside the window are still counted and logged above.
        if (ts - lastPersistedErrorTs > getErrorPersistFrequency()) {
            systemContext.persistError(tenantId, id, method, e);
            lastPersistedErrorTs = ts;
        }
    }

    /**
     * Records a lifecycle event that completed without an error.
     *
     * @param event lifecycle event to record
     */
    protected void logLifecycleEvent(ComponentLifecycleEvent event) {
        logLifecycleEvent(event, null);
    }

    /**
     * Records a lifecycle event through the system context.
     *
     * @param event lifecycle event to record
     * @param e     failure associated with the event, or {@code null} when there was none
     */
    protected void logLifecycleEvent(ComponentLifecycleEvent event, Exception e) {
        systemContext.persistLifecycleEvent(tenantId, id, event, e);
    }

    /**
     * @return minimum interval between two persisted errors; it is compared against a
     *         difference of {@link System#currentTimeMillis()} values, i.e. milliseconds
     */
    protected abstract long getErrorPersistFrequency();
}