diff --git a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
index 0538e6e..af3c7c1 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
@@ -18,11 +18,7 @@ package azkaban.trigger;
import static java.util.Objects.requireNonNull;
-import azkaban.event.Event;
-import azkaban.event.Event.Type;
import azkaban.event.EventHandler;
-import azkaban.event.EventListener;
-import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutorManager;
import azkaban.utils.Props;
import com.google.inject.Inject;
@@ -46,6 +42,7 @@ public class TriggerManager extends EventHandler implements
private static final Logger logger = Logger.getLogger(TriggerManager.class);
private static final Map<Integer, Trigger> triggerIdMap =
new ConcurrentHashMap<>();
+
private final TriggerScannerThread runnerThread;
private final Object syncObj = new Object();
private final CheckerTypeLoader checkerTypeLoader;
@@ -57,29 +54,29 @@ public class TriggerManager extends EventHandler implements
private String scannerStage = "";
@Inject
- public TriggerManager(Props props, TriggerLoader triggerLoader,
- ExecutorManager executorManager) throws TriggerManagerException {
+ public TriggerManager(final Props props, final TriggerLoader triggerLoader,
+ final ExecutorManager executorManager) throws TriggerManagerException {
requireNonNull(props);
requireNonNull(executorManager);
this.triggerLoader = requireNonNull(triggerLoader);
- long scannerInterval =
+ final long scannerInterval =
props.getLong("trigger.scan.interval", DEFAULT_SCANNER_INTERVAL_MS);
- runnerThread = new TriggerScannerThread(scannerInterval);
+ this.runnerThread = new TriggerScannerThread(scannerInterval);
- checkerTypeLoader = new CheckerTypeLoader();
- actionTypeLoader = new ActionTypeLoader();
+ this.checkerTypeLoader = new CheckerTypeLoader();
+ this.actionTypeLoader = new ActionTypeLoader();
try {
- checkerTypeLoader.init(props);
- actionTypeLoader.init(props);
- } catch (Exception e) {
+ this.checkerTypeLoader.init(props);
+ this.actionTypeLoader.init(props);
+ } catch (final Exception e) {
throw new TriggerManagerException(e);
}
- Condition.setCheckerLoader(checkerTypeLoader);
- Trigger.setActionTypeLoader(actionTypeLoader);
+ Condition.setCheckerLoader(this.checkerTypeLoader);
+ Trigger.setActionTypeLoader(this.actionTypeLoader);
logger.info("TriggerManager loaded.");
}
@@ -89,68 +86,68 @@ public class TriggerManager extends EventHandler implements
try {
// expect loader to return valid triggers
- List<Trigger> triggers = triggerLoader.loadTriggers();
- for (Trigger t : triggers) {
- runnerThread.addTrigger(t);
+ final List<Trigger> triggers = this.triggerLoader.loadTriggers();
+ for (final Trigger t : triggers) {
+ this.runnerThread.addTrigger(t);
triggerIdMap.put(t.getTriggerId(), t);
}
- } catch (Exception e) {
+ } catch (final Exception e) {
logger.error(e);
throw new TriggerManagerException(e);
}
- runnerThread.start();
+ this.runnerThread.start();
}
protected CheckerTypeLoader getCheckerLoader() {
- return checkerTypeLoader;
+ return this.checkerTypeLoader;
}
protected ActionTypeLoader getActionLoader() {
- return actionTypeLoader;
+ return this.actionTypeLoader;
}
- public void insertTrigger(Trigger t) throws TriggerManagerException {
+ public void insertTrigger(final Trigger t) throws TriggerManagerException {
logger.info("Inserting trigger " + t + " in TriggerManager");
- synchronized (syncObj) {
+ synchronized (this.syncObj) {
try {
- triggerLoader.addTrigger(t);
- } catch (TriggerLoaderException e) {
+ this.triggerLoader.addTrigger(t);
+ } catch (final TriggerLoaderException e) {
throw new TriggerManagerException(e);
}
- runnerThread.addTrigger(t);
+ this.runnerThread.addTrigger(t);
triggerIdMap.put(t.getTriggerId(), t);
}
}
- public void removeTrigger(int id) throws TriggerManagerException {
+ public void removeTrigger(final int id) throws TriggerManagerException {
logger.info("Removing trigger with id: " + id + " from TriggerManager");
- synchronized (syncObj) {
- Trigger t = triggerIdMap.get(id);
+ synchronized (this.syncObj) {
+ final Trigger t = triggerIdMap.get(id);
if (t != null) {
removeTrigger(triggerIdMap.get(id));
}
}
}
- public void updateTrigger(Trigger t) throws TriggerManagerException {
+ public void updateTrigger(final Trigger t) throws TriggerManagerException {
logger.info("Updating trigger " + t + " in TriggerManager");
- synchronized (syncObj) {
- runnerThread.deleteTrigger(triggerIdMap.get(t.getTriggerId()));
- runnerThread.addTrigger(t);
+ synchronized (this.syncObj) {
+ this.runnerThread.deleteTrigger(triggerIdMap.get(t.getTriggerId()));
+ this.runnerThread.addTrigger(t);
triggerIdMap.put(t.getTriggerId(), t);
}
}
- public void removeTrigger(Trigger t) throws TriggerManagerException {
+ public void removeTrigger(final Trigger t) throws TriggerManagerException {
logger.info("Removing trigger " + t + " from TriggerManager");
- synchronized (syncObj) {
- runnerThread.deleteTrigger(t);
+ synchronized (this.syncObj) {
+ this.runnerThread.deleteTrigger(t);
triggerIdMap.remove(t.getTriggerId());
try {
t.stopCheckers();
- triggerLoader.removeTrigger(t);
- } catch (TriggerLoaderException e) {
+ this.triggerLoader.removeTrigger(t);
+ } catch (final TriggerLoaderException e) {
throw new TriggerManagerException(e);
}
}
@@ -161,24 +158,24 @@ public class TriggerManager extends EventHandler implements
}
public Map<String, Class<? extends ConditionChecker>> getSupportedCheckers() {
- return checkerTypeLoader.getSupportedCheckers();
+ return this.checkerTypeLoader.getSupportedCheckers();
}
- public Trigger getTrigger(int triggerId) {
- synchronized (syncObj) {
+ public Trigger getTrigger(final int triggerId) {
+ synchronized (this.syncObj) {
return triggerIdMap.get(triggerId);
}
}
- public void expireTrigger(int triggerId) {
- Trigger t = getTrigger(triggerId);
+ public void expireTrigger(final int triggerId) {
+ final Trigger t = getTrigger(triggerId);
t.setStatus(TriggerStatus.EXPIRED);
}
@Override
- public List<Trigger> getTriggers(String triggerSource) {
- List<Trigger> triggers = new ArrayList<>();
- for (Trigger t : triggerIdMap.values()) {
+ public List<Trigger> getTriggers(final String triggerSource) {
+ final List<Trigger> triggers = new ArrayList<>();
+ for (final Trigger t : triggerIdMap.values()) {
if (t.getSource().equals(triggerSource)) {
triggers.add(t);
}
@@ -187,10 +184,10 @@ public class TriggerManager extends EventHandler implements
}
@Override
- public List<Trigger> getTriggerUpdates(String triggerSource,
- long lastUpdateTime) throws TriggerManagerException {
- List<Trigger> triggers = new ArrayList<>();
- for (Trigger t : triggerIdMap.values()) {
+ public List<Trigger> getTriggerUpdates(final String triggerSource,
+ final long lastUpdateTime) throws TriggerManagerException {
+ final List<Trigger> triggers = new ArrayList<>();
+ for (final Trigger t : triggerIdMap.values()) {
if (t.getSource().equals(triggerSource)
&& t.getLastModifyTime() > lastUpdateTime) {
triggers.add(t);
@@ -200,10 +197,10 @@ public class TriggerManager extends EventHandler implements
}
@Override
- public List<Trigger> getAllTriggerUpdates(long lastUpdateTime)
+ public List<Trigger> getAllTriggerUpdates(final long lastUpdateTime)
throws TriggerManagerException {
- List<Trigger> triggers = new ArrayList<>();
- for (Trigger t : triggerIdMap.values()) {
+ final List<Trigger> triggers = new ArrayList<>();
+ for (final Trigger t : triggerIdMap.values()) {
if (t.getLastModifyTime() > lastUpdateTime) {
triggers.add(t);
}
@@ -212,25 +209,25 @@ public class TriggerManager extends EventHandler implements
}
@Override
- public void insertTrigger(Trigger t, String user)
+ public void insertTrigger(final Trigger t, final String user)
throws TriggerManagerException {
insertTrigger(t);
}
@Override
- public void removeTrigger(int id, String user) throws TriggerManagerException {
+ public void removeTrigger(final int id, final String user) throws TriggerManagerException {
removeTrigger(id);
}
@Override
- public void updateTrigger(Trigger t, String user)
+ public void updateTrigger(final Trigger t, final String user)
throws TriggerManagerException {
updateTrigger(t);
}
@Override
public void shutdown() {
- runnerThread.shutdown();
+ this.runnerThread.shutdown();
}
@Override
@@ -239,15 +236,15 @@ public class TriggerManager extends EventHandler implements
}
@Override
- public void registerCheckerType(String name,
- Class<? extends ConditionChecker> checker) {
- checkerTypeLoader.registerCheckerType(name, checker);
+ public void registerCheckerType(final String name,
+ final Class<? extends ConditionChecker> checker) {
+ this.checkerTypeLoader.registerCheckerType(name, checker);
}
@Override
- public void registerActionType(String name,
- Class<? extends TriggerAction> action) {
- actionTypeLoader.registerActionType(name, action);
+ public void registerActionType(final String name,
+ final Class<? extends TriggerAction> action) {
+ this.actionTypeLoader.registerActionType(name, action);
}
private class TriggerScannerThread extends Thread {
@@ -255,63 +252,63 @@ public class TriggerManager extends EventHandler implements
private final BlockingQueue<Trigger> triggers;
private boolean shutdown = false;
- public TriggerScannerThread(long scannerInterval) {
- triggers = new PriorityBlockingQueue<>(1, new TriggerComparator());
+ public TriggerScannerThread(final long scannerInterval) {
+ this.triggers = new PriorityBlockingQueue<>(1, new TriggerComparator());
this.setName("TriggerRunnerManager-Trigger-Scanner-Thread");
this.scannerInterval = scannerInterval;
}
public void shutdown() {
logger.error("Shutting down trigger manager thread " + this.getName());
- shutdown = true;
+ this.shutdown = true;
this.interrupt();
}
- public void addTrigger(Trigger t) {
- synchronized (syncObj) {
+ public void addTrigger(final Trigger t) {
+ synchronized (TriggerManager.this.syncObj) {
t.updateNextCheckTime();
- triggers.add(t);
+ this.triggers.add(t);
}
}
- public void deleteTrigger(Trigger t) {
- triggers.remove(t);
+ public void deleteTrigger(final Trigger t) {
+ this.triggers.remove(t);
}
@Override
public void run() {
- while (!shutdown) {
- synchronized (syncObj) {
+ while (!this.shutdown) {
+ synchronized (TriggerManager.this.syncObj) {
try {
- lastRunnerThreadCheckTime = System.currentTimeMillis();
+ TriggerManager.this.lastRunnerThreadCheckTime = System.currentTimeMillis();
- scannerStage =
+ TriggerManager.this.scannerStage =
"Ready to start a new scan cycle at "
- + lastRunnerThreadCheckTime;
+ + TriggerManager.this.lastRunnerThreadCheckTime;
try {
checkAllTriggers();
- } catch (Exception e) {
+ } catch (final Exception e) {
e.printStackTrace();
logger.error(e.getMessage());
- } catch (Throwable t) {
+ } catch (final Throwable t) {
t.printStackTrace();
logger.error(t.getMessage());
}
- scannerStage = "Done flipping all triggers.";
+ TriggerManager.this.scannerStage = "Done flipping all triggers.";
- runnerThreadIdleTime =
- scannerInterval
- - (System.currentTimeMillis() - lastRunnerThreadCheckTime);
+ TriggerManager.this.runnerThreadIdleTime =
+ this.scannerInterval
+ - (System.currentTimeMillis() - TriggerManager.this.lastRunnerThreadCheckTime);
- if (runnerThreadIdleTime < 0) {
+ if (TriggerManager.this.runnerThreadIdleTime < 0) {
logger.error("Trigger manager thread " + this.getName()
+ " is too busy!");
} else {
- syncObj.wait(runnerThreadIdleTime);
+ TriggerManager.this.syncObj.wait(TriggerManager.this.runnerThreadIdleTime);
}
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
logger.info("Interrupted. Probably to shut down.");
}
}
@@ -320,9 +317,9 @@ public class TriggerManager extends EventHandler implements
private void checkAllTriggers() throws TriggerManagerException {
// sweep through the rest of them
- for (Trigger t : triggers) {
+ for (final Trigger t : this.triggers) {
try {
- scannerStage = "Checking for trigger " + t.getTriggerId();
+ TriggerManager.this.scannerStage = "Checking for trigger " + t.getTriggerId();
if (t.getStatus().equals(TriggerStatus.READY)) {
@@ -343,22 +340,22 @@ public class TriggerManager extends EventHandler implements
} else {
t.updateNextCheckTime();
}
- } catch (Throwable th) {
+ } catch (final Throwable th) {
//skip this trigger, moving on to the next one
logger.error("Failed to process trigger with id : " + t, th);
}
}
}
- private void onTriggerTrigger(Trigger t) throws TriggerManagerException {
- List<TriggerAction> actions = t.getTriggerActions();
- for (TriggerAction action : actions) {
+ private void onTriggerTrigger(final Trigger t) throws TriggerManagerException {
+ final List<TriggerAction> actions = t.getTriggerActions();
+ for (final TriggerAction action : actions) {
try {
logger.info("Doing trigger actions " + action.getDescription() + " for " + t);
action.doAction();
- } catch (Exception e) {
+ } catch (final Exception e) {
logger.error("Failed to do action " + action.getDescription() + " for " + t, e);
- } catch (Throwable th) {
+ } catch (final Throwable th) {
logger.error("Failed to do action " + action.getDescription() + " for " + t, th);
}
}
@@ -369,38 +366,38 @@ public class TriggerManager extends EventHandler implements
t.setStatus(TriggerStatus.EXPIRED);
}
try {
- triggerLoader.updateTrigger(t);
- } catch (TriggerLoaderException e) {
+ TriggerManager.this.triggerLoader.updateTrigger(t);
+ } catch (final TriggerLoaderException e) {
throw new TriggerManagerException(e);
}
}
- private void onTriggerPause(Trigger t) throws TriggerManagerException {
- List<TriggerAction> expireActions = t.getExpireActions();
- for (TriggerAction action : expireActions) {
+ private void onTriggerPause(final Trigger t) throws TriggerManagerException {
+ final List<TriggerAction> expireActions = t.getExpireActions();
+ for (final TriggerAction action : expireActions) {
try {
logger.info("Doing expire actions for "+ action.getDescription() + " for " + t);
action.doAction();
- } catch (Exception e) {
+ } catch (final Exception e) {
logger.error("Failed to do expire action " + action.getDescription() + " for " + t, e);
- } catch (Throwable th) {
+ } catch (final Throwable th) {
logger.error("Failed to do expire action " + action.getDescription() + " for " + t, th);
}
}
logger.info("Pausing Trigger " + t.getDescription());
t.setStatus(TriggerStatus.PAUSED);
try {
- triggerLoader.updateTrigger(t);
- } catch (TriggerLoaderException e) {
+ TriggerManager.this.triggerLoader.updateTrigger(t);
+ } catch (final TriggerLoaderException e) {
throw new TriggerManagerException(e);
}
}
private class TriggerComparator implements Comparator<Trigger> {
@Override
- public int compare(Trigger arg0, Trigger arg1) {
- long first = arg1.getNextCheckTime();
- long second = arg0.getNextCheckTime();
+ public int compare(final Trigger arg0, final Trigger arg1) {
+ final long first = arg1.getNextCheckTime();
+ final long second = arg0.getNextCheckTime();
if (first == second) {
return 0;
@@ -416,12 +413,12 @@ public class TriggerManager extends EventHandler implements
@Override
public long getLastRunnerThreadCheckTime() {
- return lastRunnerThreadCheckTime;
+ return TriggerManager.this.lastRunnerThreadCheckTime;
}
@Override
public boolean isRunnerThreadActive() {
- return runnerThread.isAlive();
+ return TriggerManager.this.runnerThread.isAlive();
}
@Override
@@ -436,8 +433,8 @@ public class TriggerManager extends EventHandler implements
@Override
public String getTriggerSources() {
- Set<String> sources = new HashSet<>();
- for (Trigger t : triggerIdMap.values()) {
+ final Set<String> sources = new HashSet<>();
+ for (final Trigger t : triggerIdMap.values()) {
sources.add(t.getSource());
}
return sources.toString();
@@ -450,7 +447,7 @@ public class TriggerManager extends EventHandler implements
@Override
public long getScannerIdleTime() {
- return runnerThreadIdleTime;
+ return TriggerManager.this.runnerThreadIdleTime;
}
@Override
@@ -460,7 +457,7 @@ public class TriggerManager extends EventHandler implements
@Override
public String getScannerThreadStage() {
- return scannerStage;
+ return TriggerManager.this.scannerStage;
}
}