JmxJobMBeanManager.java
Home
/
azkaban-exec-server /
src /
main /
java /
azkaban /
execapp /
jmx /
JmxJobMBeanManager.java
package azkaban.execapp.jmx;
import azkaban.event.Event;
import azkaban.event.EventData;
import azkaban.event.EventListener;
import azkaban.execapp.JobRunner;
import azkaban.executor.ExecutableNode;
import azkaban.executor.Status;
import azkaban.spi.EventType;
import azkaban.utils.Props;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.Logger;
public class JmxJobMBeanManager implements JmxJobMXBean, EventListener {
private static final Logger logger = Logger
.getLogger(JmxJobMBeanManager.class);
private static final JmxJobMBeanManager INSTANCE = new JmxJobMBeanManager();
private final AtomicInteger runningJobCount = new AtomicInteger(0);
private final AtomicInteger totalExecutedJobCount = new AtomicInteger(0);
private final AtomicInteger totalFailedJobCount = new AtomicInteger(0);
private final AtomicInteger totalSucceededJobCount = new AtomicInteger(0);
private final Map<String, AtomicInteger> jobTypeFailureMap =
new HashMap<>();
private final Map<String, AtomicInteger> jobTypeSucceededMap =
new HashMap<>();
private boolean initialized;
private JmxJobMBeanManager() {
}
public static JmxJobMBeanManager getInstance() {
return INSTANCE;
}
public void initialize(final Props props) {
logger.info("Initializing " + getClass().getName());
this.initialized = true;
}
@Override
public int getNumRunningJobs() {
return this.runningJobCount.get();
}
@Override
public int getTotalNumExecutedJobs() {
return this.totalExecutedJobCount.get();
}
@Override
public int getTotalFailedJobs() {
return this.totalFailedJobCount.get();
}
@Override
public int getTotalSucceededJobs() {
return this.totalSucceededJobCount.get();
}
@Override
public Map<String, Integer> getTotalSucceededJobsByJobType() {
return convertMapValueToInteger(this.jobTypeSucceededMap);
}
@Override
public Map<String, Integer> getTotalFailedJobsByJobType() {
return convertMapValueToInteger(this.jobTypeFailureMap);
}
private Map<String, Integer> convertMapValueToInteger(
final Map<String, AtomicInteger> map) {
final Map<String, Integer> result = new HashMap<>(map.size());
for (final Map.Entry<String, AtomicInteger> entry : map.entrySet()) {
result.put(entry.getKey(), entry.getValue().intValue());
}
return result;
}
@Override
public void handleEvent(final Event event) {
if (!this.initialized) {
throw new RuntimeException("JmxJobMBeanManager has not been initialized");
}
if (event.getRunner() instanceof JobRunner) {
final JobRunner jobRunner = (JobRunner) event.getRunner();
final EventData eventData = event.getData();
final ExecutableNode node = jobRunner.getNode();
if (logger.isDebugEnabled()) {
logger.debug("*** got " + event.getType() + " " + node.getId() + " "
+ event.getRunner().getClass().getName() + " status: "
+ eventData.getStatus());
}
if (event.getType() == EventType.JOB_STARTED) {
this.runningJobCount.incrementAndGet();
} else if (event.getType() == EventType.JOB_FINISHED) {
this.totalExecutedJobCount.incrementAndGet();
if (this.runningJobCount.intValue() > 0) {
this.runningJobCount.decrementAndGet();
} else {
logger.warn("runningJobCount not messed up, it is already zero "
+ "and we are trying to decrement on job event "
+ EventType.JOB_FINISHED);
}
if (eventData.getStatus() == Status.FAILED) {
this.totalFailedJobCount.incrementAndGet();
} else if (eventData.getStatus() == Status.SUCCEEDED) {
this.totalSucceededJobCount.incrementAndGet();
}
handleJobFinishedCount(eventData.getStatus(), node.getType());
}
} else {
logger.warn("((((((((( Got a different runner: "
+ event.getRunner().getClass().getName());
}
}
private void handleJobFinishedCount(final Status status, final String jobType) {
switch (status) {
case FAILED:
handleJobFinishedByType(this.jobTypeFailureMap, jobType);
break;
case SUCCEEDED:
handleJobFinishedByType(this.jobTypeSucceededMap, jobType);
break;
default:
}
}
private void handleJobFinishedByType(final Map<String, AtomicInteger> jobTypeMap,
final String jobType) {
synchronized (jobTypeMap) {
AtomicInteger count = jobTypeMap.get(jobType);
if (count == null) {
count = new AtomicInteger();
}
count.incrementAndGet();
jobTypeMap.put(jobType, count);
}
}
}