azkaban-aplcache

Clean up updater thread - remove firing FLOW_FINISHED event

6/7/2017 8:40:32 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index a3f761f..27c9292 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -1343,7 +1343,6 @@ public class ExecutorManager extends EventHandler implements
                 ScheduleStatisticManager.invalidateCache(flow.getScheduleId(),
                     cacheDir);
               }
-              fireEventListeners(Event.create(flow, Type.FLOW_FINISHED, new EventData(flow)));
             }
 
             updaterStage =
@@ -1408,8 +1407,6 @@ public class ExecutorManager extends EventHandler implements
 
       updaterStage = "finalizing flow " + execId + " cleaning from memory";
       runningFlows.remove(execId);
-      fireEventListeners(Event.create(dsFlow, Type.FLOW_FINISHED, new EventData(dsFlow)));
-
     } catch (ExecutorManagerException e) {
       alertUser = false; // failed due to azkaban internal error, not to alert user
       logger.error(e);
diff --git a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
index 2a2f44a..44ad6ca 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
@@ -50,8 +50,6 @@ public class TriggerManager extends EventHandler implements
   private final ActionTypeLoader actionTypeLoader;
   private final TriggerLoader triggerLoader;
   private final LocalTriggerJMX jmxStats = new LocalTriggerJMX();
-  private final ExecutorManagerEventListener listener =
-      new ExecutorManagerEventListener();
   private long lastRunnerThreadCheckTime = -1;
   private long runnerThreadIdleTime = -1;
   private String scannerStage = "";
@@ -81,8 +79,6 @@ public class TriggerManager extends EventHandler implements
     Condition.setCheckerLoader(checkerTypeLoader);
     Trigger.setActionTypeLoader(actionTypeLoader);
 
-    executorManager.addListener(listener);
-
     logger.info("TriggerManager loaded.");
   }
 
@@ -255,12 +251,10 @@ public class TriggerManager extends EventHandler implements
   private class TriggerScannerThread extends Thread {
     private final long scannerInterval;
     private final BlockingQueue<Trigger> triggers;
-    private final Map<Integer, ExecutableFlow> justFinishedFlows;
     private boolean shutdown = false;
 
     public TriggerScannerThread(long scannerInterval) {
       triggers = new PriorityBlockingQueue<>(1, new TriggerComparator());
-      justFinishedFlows = new ConcurrentHashMap<>();
       this.setName("TriggerRunnerManager-Trigger-Scanner-Thread");
       this.scannerInterval = scannerInterval;
     }
@@ -271,12 +265,6 @@ public class TriggerManager extends EventHandler implements
       this.interrupt();
     }
 
-    public void addJustFinishedFlow(ExecutableFlow flow) {
-      synchronized (syncObj) {
-        justFinishedFlows.put(flow.getExecutionId(), flow);
-      }
-    }
-
     public void addTrigger(Trigger t) {
       synchronized (syncObj) {
         t.updateNextCheckTime();
@@ -301,7 +289,6 @@ public class TriggerManager extends EventHandler implements
 
             try {
               checkAllTriggers();
-              justFinishedFlows.clear();
             } catch (Exception e) {
               e.printStackTrace();
               logger.error(e.getMessage());
@@ -475,22 +462,4 @@ public class TriggerManager extends EventHandler implements
     }
 
   }
-
-  private class ExecutorManagerEventListener implements EventListener {
-    public ExecutorManagerEventListener() {
-    }
-
-    @Override
-    public void handleEvent(Event event) {
-      // this needs to be fixed for perf
-      synchronized (syncObj) {
-        ExecutableFlow flow = (ExecutableFlow) event.getRunner();
-        if (event.getType() == Type.FLOW_FINISHED) {
-          logger.info("Flow finish event received. " + flow.getExecutionId());
-          runnerThread.addJustFinishedFlow(flow);
-        }
-      }
-    }
-  }
-
 }