azkaban-aplcache

Revert running flows cache (#1121) * Revert "Fix race condition

5/27/2017 12:34:19 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 92feba9..4549ef0 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -94,12 +94,14 @@ public class ExecutorManager extends EventHandler implements
 
   private CleanerThread cleanerThread;
 
+  private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows =
+      new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
   private ConcurrentHashMap<Integer, ExecutableFlow> recentlyFinished =
-      new ConcurrentHashMap<>();
+      new ConcurrentHashMap<Integer, ExecutableFlow>();
 
   QueuedExecutions queuedFlows;
 
-  final private Set<Executor> activeExecutors = new HashSet<>();
+  final private Set<Executor> activeExecutors = new HashSet<Executor>();
   private QueueProcessorThread queueProcessor;
   private volatile Pair<ExecutionReference, ExecutableFlow> runningCandidate = null;
 
@@ -128,6 +130,7 @@ public class ExecutorManager extends EventHandler implements
     this.azkProps = azkProps;
     this.executorLoader = loader;
     this.setupExecutors();
+    this.loadRunningFlows();
 
     queuedFlows = new QueuedExecutions(azkProps.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
     this.loadQueuedFlows();
@@ -417,23 +420,23 @@ public class ExecutorManager extends EventHandler implements
   @Override
   public Set<String> getAllActiveExecutorServerHosts() {
     // Includes non primary server/hosts
-    HashSet<String> ports = new HashSet<>();
+    HashSet<String> ports = new HashSet<String>();
     for (Executor executor : activeExecutors) {
       ports.add(executor.getHost() + ":" + executor.getPort());
     }
-    try {
-      // include executor which were initially active and still has flows running
-      for (Pair<ExecutionReference, ExecutableFlow> running :
-          executorLoader.fetchActiveFlows().values()) {
-        ExecutionReference ref = running.getFirst();
-        ports.add(ref.getHost() + ":" + ref.getPort());
-      }
-    } catch(ExecutorManagerException e) {
-      logger.error(e);
+    // include executor which were initially active and still has flows running
+    for (Pair<ExecutionReference, ExecutableFlow> running : runningFlows
+      .values()) {
+      ExecutionReference ref = running.getFirst();
+      ports.add(ref.getHost() + ":" + ref.getPort());
     }
     return ports;
   }
 
+  private void loadRunningFlows() throws ExecutorManagerException {
+    runningFlows.putAll(executorLoader.fetchActiveFlows());
+  }
+
   /*
    * load queued flows i.e with active_execution_reference and not assigned to
    * any executor
@@ -466,12 +469,8 @@ public class ExecutorManager extends EventHandler implements
     if (runningCandidate != null) {
       executionIds.addAll(getRunningFlowsHelper(projectId, flowId, Lists.newArrayList(runningCandidate)));
     }
-    try {
-      executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
-          executorLoader.fetchActiveFlows().values()));
-    } catch(ExecutorManagerException e) {
-      logger.error(e);
-    }
+    executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
+      runningFlows.values()));
     Collections.sort(executionIds);
     return executionIds;
   }
@@ -498,13 +497,10 @@ public class ExecutorManager extends EventHandler implements
   @Override
   public List<Pair<ExecutableFlow, Executor>> getActiveFlowsWithExecutor()
     throws IOException {
-    List<Pair<ExecutableFlow, Executor>> flows = new ArrayList<>();
+    List<Pair<ExecutableFlow, Executor>> flows =
+      new ArrayList<Pair<ExecutableFlow, Executor>>();
     getActiveFlowsWithExecutorHelper(flows, queuedFlows.getAllEntries());
-    try {
-      getActiveFlowsWithExecutorHelper(flows, executorLoader.fetchActiveFlows().values());
-    } catch(ExecutorManagerException e) {
-      logger.error(e);
-    }
+    getActiveFlowsWithExecutorHelper(flows, runningFlows.values());
     return flows;
   }
 
@@ -531,12 +527,9 @@ public class ExecutorManager extends EventHandler implements
     isRunning =
       isRunning
         || isFlowRunningHelper(projectId, flowId, queuedFlows.getAllEntries());
-    try {
-      isRunning = isRunning || isFlowRunningHelper(projectId, flowId,
-          executorLoader.fetchActiveFlows().values());
-    } catch(ExecutorManagerException e) {
-      logger.error(e);
-    }
+    isRunning =
+      isRunning
+        || isFlowRunningHelper(projectId, flowId, runningFlows.values());
     return isRunning;
   }
 
@@ -572,13 +565,9 @@ public class ExecutorManager extends EventHandler implements
    */
   @Override
   public List<ExecutableFlow> getRunningFlows() {
-    ArrayList<ExecutableFlow> flows = new ArrayList<>();
+    ArrayList<ExecutableFlow> flows = new ArrayList<ExecutableFlow>();
     getActiveFlowHelper(flows, queuedFlows.getAllEntries());
-    try {
-      getActiveFlowHelper(flows, executorLoader.fetchActiveFlows().values());
-    } catch(ExecutorManagerException e) {
-      logger.error(e);
-    }
+    getActiveFlowHelper(flows, runningFlows.values());
     return flows;
   }
 
@@ -601,13 +590,9 @@ public class ExecutorManager extends EventHandler implements
    * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows()
    */
   public String getRunningFlowIds() {
-    List<Integer> allIds = new ArrayList<>();
+    List<Integer> allIds = new ArrayList<Integer>();
     getRunningFlowsIdsHelper(allIds, queuedFlows.getAllEntries());
-    try {
-      getRunningFlowsIdsHelper(allIds, executorLoader.fetchActiveFlows().values());
-    } catch(ExecutorManagerException e) {
-      logger.error(e);
-    }
+    getRunningFlowsIdsHelper(allIds, runningFlows.values());
     Collections.sort(allIds);
     return allIds.toString();
   }
@@ -702,7 +687,7 @@ public class ExecutorManager extends EventHandler implements
   public LogData getExecutableFlowLog(ExecutableFlow exFlow, int offset,
       int length) throws ExecutorManagerException {
     Pair<ExecutionReference, ExecutableFlow> pair =
-        executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
+        runningFlows.get(exFlow.getExecutionId());
     if (pair != null) {
       Pair<String, String> typeParam = new Pair<String, String>("type", "flow");
       Pair<String, String> offsetParam =
@@ -727,7 +712,7 @@ public class ExecutorManager extends EventHandler implements
   public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId,
       int offset, int length, int attempt) throws ExecutorManagerException {
     Pair<ExecutionReference, ExecutableFlow> pair =
-        executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
+        runningFlows.get(exFlow.getExecutionId());
     if (pair != null) {
       Pair<String, String> typeParam = new Pair<String, String>("type", "job");
       Pair<String, String> jobIdParam =
@@ -756,7 +741,7 @@ public class ExecutorManager extends EventHandler implements
   public List<Object> getExecutionJobStats(ExecutableFlow exFlow, String jobId,
       int attempt) throws ExecutorManagerException {
     Pair<ExecutionReference, ExecutableFlow> pair =
-        executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
+        runningFlows.get(exFlow.getExecutionId());
     if (pair == null) {
       return executorLoader.fetchAttachments(exFlow.getExecutionId(), jobId,
           attempt);
@@ -781,9 +766,9 @@ public class ExecutorManager extends EventHandler implements
   public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow,
       String jobId, int offset, int length, int attempt)
       throws ExecutorManagerException {
-    Pair<ExecutionReference, ExecutableFlow> activeFlow =
-        executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
-    if (activeFlow != null) {
+    Pair<ExecutionReference, ExecutableFlow> pair =
+        runningFlows.get(exFlow.getExecutionId());
+    if (pair != null) {
 
       Pair<String, String> typeParam = new Pair<String, String>("type", "job");
       Pair<String, String> jobIdParam =
@@ -797,7 +782,7 @@ public class ExecutorManager extends EventHandler implements
 
       @SuppressWarnings("unchecked")
       Map<String, Object> result =
-          callExecutorServer(activeFlow.getFirst(), ConnectorParams.METADATA_ACTION,
+          callExecutorServer(pair.getFirst(), ConnectorParams.METADATA_ACTION,
               typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
       return JobMetaData.createJobMetaDataFromObject(result);
     } else {
@@ -816,10 +801,10 @@ public class ExecutorManager extends EventHandler implements
   public void cancelFlow(ExecutableFlow exFlow, String userId)
     throws ExecutorManagerException {
     synchronized (exFlow) {
-      Pair<ExecutionReference, ExecutableFlow> activeFlow =
-          executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
-      if(activeFlow != null) {
-        callExecutorServer(activeFlow.getFirst(), ConnectorParams.CANCEL_ACTION,
+      if (runningFlows.containsKey(exFlow.getExecutionId())) {
+        Pair<ExecutionReference, ExecutableFlow> pair =
+          runningFlows.get(exFlow.getExecutionId());
+        callExecutorServer(pair.getFirst(), ConnectorParams.CANCEL_ACTION,
           userId);
       } else if (queuedFlows.hasExecution(exFlow.getExecutionId())) {
         queuedFlows.dequeue(exFlow.getExecutionId());
@@ -836,14 +821,14 @@ public class ExecutorManager extends EventHandler implements
   public void resumeFlow(ExecutableFlow exFlow, String userId)
       throws ExecutorManagerException {
     synchronized (exFlow) {
-      Pair<ExecutionReference, ExecutableFlow> activeFlow =
-          executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
-      if (activeFlow == null) {
+      Pair<ExecutionReference, ExecutableFlow> pair =
+          runningFlows.get(exFlow.getExecutionId());
+      if (pair == null) {
         throw new ExecutorManagerException("Execution "
             + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
             + " isn't running.");
       }
-      callExecutorServer(activeFlow.getFirst(), ConnectorParams.RESUME_ACTION, userId);
+      callExecutorServer(pair.getFirst(), ConnectorParams.RESUME_ACTION, userId);
     }
   }
 
@@ -851,14 +836,14 @@ public class ExecutorManager extends EventHandler implements
   public void pauseFlow(ExecutableFlow exFlow, String userId)
       throws ExecutorManagerException {
     synchronized (exFlow) {
-      Pair<ExecutionReference, ExecutableFlow> activeFlow =
-          executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
-      if (activeFlow == null) {
+      Pair<ExecutionReference, ExecutableFlow> pair =
+          runningFlows.get(exFlow.getExecutionId());
+      if (pair == null) {
         throw new ExecutorManagerException("Execution "
             + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
             + " isn't running.");
       }
-      callExecutorServer(activeFlow.getFirst(), ConnectorParams.PAUSE_ACTION, userId);
+      callExecutorServer(pair.getFirst(), ConnectorParams.PAUSE_ACTION, userId);
     }
   }
 
@@ -916,7 +901,7 @@ public class ExecutorManager extends EventHandler implements
       throws ExecutorManagerException {
     synchronized (exFlow) {
       Pair<ExecutionReference, ExecutableFlow> pair =
-          executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
+          runningFlows.get(exFlow.getExecutionId());
       if (pair == null) {
         throw new ExecutorManagerException("Execution "
             + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
@@ -1157,7 +1142,6 @@ public class ExecutorManager extends EventHandler implements
       paramList = new ArrayList<Pair<String, String>>();
     }
 
-    // TODO: refactor using Guice, inject ExecutorApiClient in ExecutorManager
     ExecutorApiClient apiclient = ExecutorApiClient.getInstance();
     @SuppressWarnings("unchecked")
     URI uri =
@@ -1256,8 +1240,6 @@ public class ExecutorManager extends EventHandler implements
               new ArrayList<ExecutableFlow>();
           ArrayList<ExecutableFlow> finalizeFlows =
               new ArrayList<ExecutableFlow>();
-          Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows =
-              executorLoader.fetchActiveFlows();
 
           if (exFlowMap.size() > 0) {
             for (Map.Entry<Executor, List<ExecutableFlow>> entry : exFlowMap
@@ -1293,22 +1275,15 @@ public class ExecutorManager extends EventHandler implements
               } catch (IOException e) {
                 logger.error(e);
                 for (ExecutableFlow flow : entry.getValue()) {
-                  Pair<ExecutionReference, ExecutableFlow> activeFlow =
-                      activeFlows.get(flow.getExecutionId());
+                  Pair<ExecutionReference, ExecutableFlow> pair =
+                      runningFlows.get(flow.getExecutionId());
 
                   updaterStage =
                       "Failed to get update. Doing some clean up for flow "
-                          + flow.getExecutionId();
-
-                  // The failure retry logic below won't work after removing the runningFlow
-                  // cache. numErrors and nextCheckTime are not stored in DB. So whenever we
-                  // fetch active flows from DB, numErrors will be initialized to default 0
-                  // and nexCheckTime will be -1. numErrors will never reach threshold and
-                  // flows will never be finalized in below case.
-                  // todo: jamiesjc will remove updaterThread and add separate clean up code
-                  // to handle errors.
-                  if (activeFlow != null) {
-                    ExecutionReference ref = activeFlow.getFirst();
+                          + pair.getSecond().getExecutionId();
+
+                  if (pair != null) {
+                    ExecutionReference ref = pair.getFirst();
                     int numErrors = ref.getNumErrors();
                     if (ref.getNumErrors() < this.numErrors) {
                       ref.setNextCheckTime(System.currentTimeMillis()
@@ -1318,7 +1293,7 @@ public class ExecutorManager extends EventHandler implements
                       logger.error("Evicting flow " + flow.getExecutionId()
                           + ". The executor is unresponsive.");
                       // TODO should send out an unresponsive email here.
-                      finalizeFlows.add(activeFlow.getSecond());
+                      finalizeFlows.add(pair.getSecond());
                     }
                   }
                 }
@@ -1340,9 +1315,13 @@ public class ExecutorManager extends EventHandler implements
                       finalizeFlows.add(flow);
                     }
                   } catch (ExecutorManagerException e) {
-                    // Currently just ignore the update error. Will remove UpdaterThread and
-                    // add separate clean up code to handle errors.
-                    logger.error("Update execution failed. Ignored. ", e);
+                    ExecutableFlow flow = e.getExecutableFlow();
+                    logger.error(e);
+
+                    if (flow != null) {
+                      logger.error("Finalizing flow " + flow.getExecutionId());
+                      finalizeFlows.add(flow);
+                    }
                   }
                 }
               }
@@ -1375,7 +1354,7 @@ public class ExecutorManager extends EventHandler implements
 
           synchronized (this) {
             try {
-              if (activeFlows.size() > 0) {
+              if (runningFlows.size() > 0) {
                 this.wait(waitTimeMs);
               } else {
                 this.wait(waitTimeIdleMs);
@@ -1423,6 +1402,7 @@ public class ExecutorManager extends EventHandler implements
       executorLoader.removeActiveExecutableReference(execId);
 
       updaterStage = "finalizing flow " + execId + " cleaning from memory";
+      runningFlows.remove(execId);
       fireEventListeners(Event.create(dsFlow, Type.FLOW_FINISHED, new EventData(dsFlow)));
       recentlyFinished.put(execId, dsFlow);
 
@@ -1550,15 +1530,15 @@ public class ExecutorManager extends EventHandler implements
           "Response is malformed. Need exec id to update.");
     }
 
-    Pair<ExecutionReference, ExecutableFlow> activeFlow =
-        executorLoader.fetchActiveFlowByExecId(execId);
-    if (activeFlow == null) {
+    Pair<ExecutionReference, ExecutableFlow> refPair =
+        this.runningFlows.get(execId);
+    if (refPair == null) {
       throw new ExecutorManagerException(
           "No running flow found with the execution id. Removing " + execId);
     }
 
-    ExecutionReference ref = activeFlow.getFirst();
-    ExecutableFlow flow = activeFlow.getSecond();
+    ExecutionReference ref = refPair.getFirst();
+    ExecutableFlow flow = refPair.getSecond();
     if (updateData.containsKey("error")) {
       // The flow should be finished here.
       throw new ExecutorManagerException((String) updateData.get("error"), flow);
@@ -1630,31 +1610,27 @@ public class ExecutorManager extends EventHandler implements
   /* Group Executable flow by Executors to reduce number of REST calls */
   private Map<Executor, List<ExecutableFlow>> getFlowToExecutorMap() {
     HashMap<Executor, List<ExecutableFlow>> exFlowMap =
-      new HashMap<>();
+      new HashMap<Executor, List<ExecutableFlow>>();
 
-    try {
-      for (Pair<ExecutionReference, ExecutableFlow> runningFlow :
-          executorLoader.fetchActiveFlows().values()) {
-        ExecutionReference ref = runningFlow.getFirst();
-        ExecutableFlow flow = runningFlow.getSecond();
-        Executor executor = ref.getExecutor();
-
-        // We can set the next check time to prevent the checking of certain
-        // flows.
-        if (ref.getNextCheckTime() >= System.currentTimeMillis()) {
-          continue;
-        }
+    for (Pair<ExecutionReference, ExecutableFlow> runningFlow : runningFlows
+      .values()) {
+      ExecutionReference ref = runningFlow.getFirst();
+      ExecutableFlow flow = runningFlow.getSecond();
+      Executor executor = ref.getExecutor();
 
-        List<ExecutableFlow> flows = exFlowMap.get(executor);
-        if (flows == null) {
-          flows = new ArrayList<>();
-          exFlowMap.put(executor, flows);
-        }
+      // We can set the next check time to prevent the checking of certain
+      // flows.
+      if (ref.getNextCheckTime() >= System.currentTimeMillis()) {
+        continue;
+      }
 
-        flows.add(flow);
+      List<ExecutableFlow> flows = exFlowMap.get(executor);
+      if (flows == null) {
+        flows = new ArrayList<ExecutableFlow>();
+        exFlowMap.put(executor, flows);
       }
-    } catch(ExecutorManagerException e) {
-      logger.error(e);
+
+      flows.add(flow);
     }
 
     return exFlowMap;
@@ -1756,6 +1732,10 @@ public class ExecutorManager extends EventHandler implements
     }
     reference.setExecutor(choosenExecutor);
 
+    // move from flow to running flows
+    runningFlows.put(exflow.getExecutionId(),
+      new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
+
     logger.info(String.format(
       "Successfully dispatched exec %d with error count %d",
       exflow.getExecutionId(), reference.getNumErrors()));
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index abfdeee..7f3727d 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -31,6 +31,7 @@ import java.util.Set;
 
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.Ignore;
 
 import azkaban.user.User;
 import azkaban.utils.Pair;
@@ -278,7 +279,7 @@ public class ExecutorManagerTest {
     verify(loader).addActiveExecutableReference(any());
   }
 
-  @Test
+  @Ignore @Test
   public void testFetchAllActiveFlows() throws ExecutorManagerException, IOException {
     testSetUpForRunningFlows();
     List<ExecutableFlow> flows = manager.getRunningFlows();
@@ -287,7 +288,7 @@ public class ExecutorManagerTest {
     }
   }
 
-  @Test
+  @Ignore @Test
   public void testFetchActiveFlowByProject() throws ExecutorManagerException, IOException {
     testSetUpForRunningFlows();
     List<Integer> executions = manager.getRunningFlows(flow1.getProjectId(), flow1.getFlowId());
@@ -295,7 +296,7 @@ public class ExecutorManagerTest {
     Assert.assertTrue(manager.isFlowRunning(flow1.getProjectId(), flow1.getFlowId()));
   }
 
-  @Test
+  @Ignore @Test
   public void testFetchActiveFlowWithExecutor() throws ExecutorManagerException, IOException {
     testSetUpForRunningFlows();
     List<Pair<ExecutableFlow, Executor>> activeFlowsWithExecutor =
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
index de9d88b..2e6bdb0 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
@@ -268,13 +268,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
         continue;
       }
 
-      // In the previous design, web server sends update requests to executor periodically and checks
-      // updateTime to decide whether to update the flow cache on web server side and remove active flows
-      // from DB. After removing runningFlows cache from web server, flow info will be fetched from DB
-      // directly, so both web server and executor will have the same flow updateTime here.
-      // todo jamiesjc: will investigate whether the current update mechanism is still needed. Plan to
-      // remove UpdaterThread in executorManager and deprecate handleAjaxUpdateRequest()
-      if (flow.getUpdateTime() >= updateTime) {
+      if (flow.getUpdateTime() > updateTime) {
         updateList.add(flow.toUpdateObject(updateTime));
       }
     }