/*
* Copyright 2012 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/package azkaban.execapp.event;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.ExecutableNode;
import azkaban.executor.Status;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
publicabstractclassFlowWatcher{
privatefinalint execId;
privatefinal Map<String, BlockingStatus> map =
new ConcurrentHashMap<>();
private Logger logger;
private ExecutableFlow flow;
privateboolean cancelWatch = false;
publicFlowWatcher(finalint execId){
this.execId = execId;
}
publicvoidsetFlow(final ExecutableFlow flow){
this.flow = flow;
}
protected Logger getLogger(){
returnthis.logger;
}
publicvoidsetLogger(final Logger logger){
this.logger = logger;
}
/**
* Called to fire events to the JobRunner listeners
*/protectedsynchronizedvoidhandleJobStatusChange(final String jobId, final Status status){
final BlockingStatus block = this.map.get(jobId);
if (block != null) {
block.changeStatus(status);
}
}
publicintgetExecId(){
returnthis.execId;
}
publicsynchronized BlockingStatus getBlockingStatus(final String jobId){
if (this.cancelWatch) {
returnnull;
}
final ExecutableNode node = this.flow.getExecutableNodePath(jobId);
if (node == null) {
returnnull;
}
BlockingStatus blockingStatus = this.map.get(jobId);
if (blockingStatus == null) {
blockingStatus = new BlockingStatus(this.execId, jobId, node.getStatus());
this.map.put(jobId, blockingStatus);
}
return blockingStatus;
}
public Status peekStatus(final String jobId){
if (Status.isStatusFinished(this.flow.getStatus())) {
returnnull;
}
final ExecutableNode node = this.flow.getExecutableNodePath(jobId);
if (node != null) {
ExecutableFlowBase parentFlow = node.getParentFlow();
while (parentFlow != null) {
Status parentStatus = parentFlow.getStatus();
if (parentStatus == Status.SKIPPED || parentStatus == Status.DISABLED) {
return Status.SKIPPED;
}
parentFlow = parentFlow.getParentFlow();
}
return node.getStatus();
}
returnnull;
}
publicsynchronizedvoidunblockAllWatches(){
this.logger.info("Unblock all watches on " + this.execId);
this.cancelWatch = true;
for (final BlockingStatus status : this.map.values()) {
this.logger.info("Unblocking " + status.getJobId());
status.changeStatus(Status.SKIPPED);
status.unblock();
}
this.logger.info("Successfully unblocked all watches on " + this.execId);
}
publicbooleanisWatchCancelled(){
returnthis.cancelWatch;
}
publicabstractvoidstopWatcher();
}