diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index a3f761f..27c9292 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -1343,7 +1343,6 @@ public class ExecutorManager extends EventHandler implements
ScheduleStatisticManager.invalidateCache(flow.getScheduleId(),
cacheDir);
}
- fireEventListeners(Event.create(flow, Type.FLOW_FINISHED, new EventData(flow)));
}
updaterStage =
@@ -1408,8 +1407,6 @@ public class ExecutorManager extends EventHandler implements
updaterStage = "finalizing flow " + execId + " cleaning from memory";
runningFlows.remove(execId);
- fireEventListeners(Event.create(dsFlow, Type.FLOW_FINISHED, new EventData(dsFlow)));
-
} catch (ExecutorManagerException e) {
alertUser = false; // failed due to azkaban internal error, not to alert user
logger.error(e);
diff --git a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
index 2a2f44a..44ad6ca 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
@@ -50,8 +50,6 @@ public class TriggerManager extends EventHandler implements
private final ActionTypeLoader actionTypeLoader;
private final TriggerLoader triggerLoader;
private final LocalTriggerJMX jmxStats = new LocalTriggerJMX();
- private final ExecutorManagerEventListener listener =
- new ExecutorManagerEventListener();
private long lastRunnerThreadCheckTime = -1;
private long runnerThreadIdleTime = -1;
private String scannerStage = "";
@@ -81,8 +79,6 @@ public class TriggerManager extends EventHandler implements
Condition.setCheckerLoader(checkerTypeLoader);
Trigger.setActionTypeLoader(actionTypeLoader);
- executorManager.addListener(listener);
-
logger.info("TriggerManager loaded.");
}
@@ -255,12 +251,10 @@ public class TriggerManager extends EventHandler implements
private class TriggerScannerThread extends Thread {
private final long scannerInterval;
private final BlockingQueue<Trigger> triggers;
- private final Map<Integer, ExecutableFlow> justFinishedFlows;
private boolean shutdown = false;
public TriggerScannerThread(long scannerInterval) {
triggers = new PriorityBlockingQueue<>(1, new TriggerComparator());
- justFinishedFlows = new ConcurrentHashMap<>();
this.setName("TriggerRunnerManager-Trigger-Scanner-Thread");
this.scannerInterval = scannerInterval;
}
@@ -271,12 +265,6 @@ public class TriggerManager extends EventHandler implements
this.interrupt();
}
- public void addJustFinishedFlow(ExecutableFlow flow) {
- synchronized (syncObj) {
- justFinishedFlows.put(flow.getExecutionId(), flow);
- }
- }
-
public void addTrigger(Trigger t) {
synchronized (syncObj) {
t.updateNextCheckTime();
@@ -301,7 +289,6 @@ public class TriggerManager extends EventHandler implements
try {
checkAllTriggers();
- justFinishedFlows.clear();
} catch (Exception e) {
e.printStackTrace();
logger.error(e.getMessage());
@@ -475,22 +462,4 @@ public class TriggerManager extends EventHandler implements
}
}
-
- private class ExecutorManagerEventListener implements EventListener {
- public ExecutorManagerEventListener() {
- }
-
- @Override
- public void handleEvent(Event event) {
- // this needs to be fixed for perf
- synchronized (syncObj) {
- ExecutableFlow flow = (ExecutableFlow) event.getRunner();
- if (event.getType() == Type.FLOW_FINISHED) {
- logger.info("Flow finish event received. " + flow.getExecutionId());
- runnerThread.addJustFinishedFlow(flow);
- }
- }
- }
- }
-
}