azkaban-aplcache

Finalize running flows without matching executor (#1833) A

7/16/2018 11:57:47 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java
index 2a26c1a..9f942e6 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java
@@ -16,6 +16,9 @@
 
 package azkaban.executor;
 
+import java.util.Optional;
+import javax.annotation.Nullable;
+
 public class ExecutionReference {
 
   private final int execId;
@@ -30,11 +33,7 @@ public class ExecutionReference {
     this.execId = execId;
   }
 
-  public ExecutionReference(final int execId, final Executor executor) {
-    if (executor == null) {
-      throw new IllegalArgumentException(String.format(
-          "Executor cannot be null for exec id: %d ExecutionReference", execId));
-    }
+  public ExecutionReference(final int execId, @Nullable final Executor executor) {
     this.execId = execId;
     this.executor = executor;
   }
@@ -59,14 +58,6 @@ public class ExecutionReference {
     return this.execId;
   }
 
-  public String getHost() {
-    return this.executor.getHost();
-  }
-
-  public int getPort() {
-    return this.executor.getPort();
-  }
-
   public int getNumErrors() {
     return this.numErrors;
   }
@@ -75,11 +66,11 @@ public class ExecutionReference {
     this.numErrors = numErrors;
   }
 
-  public Executor getExecutor() {
-    return this.executor;
+  public Optional<Executor> getExecutor() {
+    return Optional.ofNullable(this.executor);
   }
 
-  public void setExecutor(final Executor executor) {
+  public void setExecutor(final @Nullable Executor executor) {
     this.executor = executor;
   }
-}
\ No newline at end of file
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorApiGateway.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiGateway.java
index b3ad9c7..b536555 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorApiGateway.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiGateway.java
@@ -46,14 +46,16 @@ public class ExecutorApiGateway {
 
   Map<String, Object> callWithReference(final ExecutionReference ref, final String action,
       final Pair<String, String>... params) throws ExecutorManagerException {
-    return callWithExecutionId(ref.getHost(), ref.getPort(), action, ref.getExecId(),
+    final Executor executor = ref.getExecutor().get();
+    return callWithExecutionId(executor.getHost(), executor.getPort(), action, ref.getExecId(),
         null, params);
   }
 
   Map<String, Object> callWithReferenceByUser(final ExecutionReference ref,
       final String action, final String user, final Pair<String, String>... params)
       throws ExecutorManagerException {
-    return callWithExecutionId(ref.getHost(), ref.getPort(), action,
+    final Executor executor = ref.getExecutor().get();
+    return callWithExecutionId(executor.getHost(), executor.getPort(), action,
         ref.getExecId(), user, params);
   }
 
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 5ec1d96..8f2976f 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -54,6 +54,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
@@ -396,10 +397,6 @@ public class ExecutorManager extends EventHandler implements
     return this.lastThreadCheckTime;
   }
 
-  public long getLastCleanerThreadCheckTime() {
-    return this.lastCleanerThreadCheckTime;
-  }
-
   @Override
   public Collection<Executor> getAllActiveExecutors() {
     return Collections.unmodifiableCollection(this.activeExecutors);
@@ -441,7 +438,10 @@ public class ExecutorManager extends EventHandler implements
     for (final Pair<ExecutionReference, ExecutableFlow> running : this.runningFlows
         .values()) {
       final ExecutionReference ref = running.getFirst();
-      ports.add(ref.getHost() + ":" + ref.getPort());
+      if (ref.getExecutor().isPresent()) {
+        final Executor executor = ref.getExecutor().get();
+        ports.add(executor.getHost() + ":" + executor.getPort());
+      }
     }
     return ports;
   }
@@ -508,9 +508,9 @@ public class ExecutorManager extends EventHandler implements
    * @see azkaban.executor.ExecutorManagerAdapter#getActiveFlowsWithExecutor()
    */
   @Override
-  public List<Pair<ExecutableFlow, Executor>> getActiveFlowsWithExecutor()
+  public List<Pair<ExecutableFlow, Optional<Executor>>> getActiveFlowsWithExecutor()
       throws IOException {
-    final List<Pair<ExecutableFlow, Executor>> flows =
+    final List<Pair<ExecutableFlow, Optional<Executor>>> flows =
         new ArrayList<>();
     getActiveFlowsWithExecutorHelper(flows, this.queuedFlows.getAllEntries());
     getActiveFlowsWithExecutorHelper(flows, this.runningFlows.values());
@@ -519,7 +519,7 @@ public class ExecutorManager extends EventHandler implements
 
   /* Helper method for getActiveFlowsWithExecutor */
   private void getActiveFlowsWithExecutorHelper(
-      final List<Pair<ExecutableFlow, Executor>> flows,
+      final List<Pair<ExecutableFlow, Optional<Executor>>> flows,
       final Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
     for (final Pair<ExecutionReference, ExecutableFlow> ref : collection) {
       flows.add(new Pair<>(ref.getSecond(), ref
@@ -1445,15 +1445,15 @@ public class ExecutorManager extends EventHandler implements
   }
 
   /* Group Executable flow by Executors to reduce number of REST calls */
-  private Map<Executor, List<ExecutableFlow>> getFlowToExecutorMap() {
-    final HashMap<Executor, List<ExecutableFlow>> exFlowMap =
+  private Map<Optional<Executor>, List<ExecutableFlow>> getFlowToExecutorMap() {
+    final HashMap<Optional<Executor>, List<ExecutableFlow>> exFlowMap =
         new HashMap<>();
 
     for (final Pair<ExecutionReference, ExecutableFlow> runningFlow : this.runningFlows
         .values()) {
       final ExecutionReference ref = runningFlow.getFirst();
       final ExecutableFlow flow = runningFlow.getSecond();
-      final Executor executor = ref.getExecutor();
+      final Optional<Executor> executor = ref.getExecutor();
 
       // We can set the next check time to prevent the checking of certain
       // flows.
@@ -1553,20 +1553,27 @@ public class ExecutorManager extends EventHandler implements
           ExecutorManager.this.lastThreadCheckTime = System.currentTimeMillis();
           ExecutorManager.this.updaterStage = "Starting update all flows.";
 
-          final Map<Executor, List<ExecutableFlow>> exFlowMap =
+          final Map<Optional<Executor>, List<ExecutableFlow>> exFlowMap =
               getFlowToExecutorMap();
-          final ArrayList<ExecutableFlow> finishedFlows =
-              new ArrayList<>();
           final ArrayList<ExecutableFlow> finalizeFlows =
               new ArrayList<>();
 
           if (exFlowMap.size() > 0) {
-            for (final Map.Entry<Executor, List<ExecutableFlow>> entry : exFlowMap
+            for (final Map.Entry<Optional<Executor>, List<ExecutableFlow>> entry : exFlowMap
                 .entrySet()) {
               final List<Long> updateTimesList = new ArrayList<>();
               final List<Integer> executionIdsList = new ArrayList<>();
 
-              final Executor executor = entry.getKey();
+              final Optional<Executor> executorOption = entry.getKey();
+              if (!executorOption.isPresent()) {
+                for (final ExecutableFlow flow : entry.getValue()) {
+                  logger.warn("Finalizing execution " + flow.getExecutionId()
+                      + ". Executor id of this execution doesn't exist");
+                  finalizeFlows.add(flow);
+                }
+                continue;
+              }
+              final Executor executor = executorOption.get();
 
               ExecutorManager.this.updaterStage =
                   "Starting update flows on " + executor.getHost() + ":"
@@ -1609,7 +1616,7 @@ public class ExecutorManager extends EventHandler implements
                           + this.errorThreshold);
                       ref.setNumErrors(++numErrors);
                     } else {
-                      logger.error("Evicting flow " + flow.getExecutionId()
+                      logger.warn("Evicting execution " + flow.getExecutionId()
                           + ". The executor is unresponsive.");
                       // TODO should send out an unresponsive email here.
                       finalizeFlows.add(pair.getSecond());
@@ -1630,7 +1637,6 @@ public class ExecutorManager extends EventHandler implements
                     ExecutorManager.this.updaterStage = "Updated flow " + flow.getExecutionId();
 
                     if (isFinished(flow)) {
-                      finishedFlows.add(flow);
                       finalizeFlows.add(flow);
                     }
                   } catch (final ExecutorManagerException e) {
@@ -1638,7 +1644,7 @@ public class ExecutorManager extends EventHandler implements
                     logger.error(e);
 
                     if (flow != null) {
-                      logger.error("Finalizing flow " + flow.getExecutionId());
+                      logger.warn("Finalizing execution " + flow.getExecutionId());
                       finalizeFlows.add(flow);
                     }
                   }
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
index c939d6f..2c2cd7a 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
@@ -25,6 +25,7 @@ import java.lang.Thread.State;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 public interface ExecutorManagerAdapter {
@@ -44,7 +45,7 @@ public interface ExecutorManagerAdapter {
    * Note, returns empty list if there isn't any running or queued flows
    * </pre>
    */
-  public List<Pair<ExecutableFlow, Executor>> getActiveFlowsWithExecutor()
+  public List<Pair<ExecutableFlow, Optional<Executor>>> getActiveFlowsWithExecutor()
       throws IOException;
 
   public List<ExecutableFlow> getRecentlyFinishedFlows();
diff --git a/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java b/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
index 93796a8..938b5d4 100644
--- a/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
@@ -16,8 +16,8 @@
 
 package azkaban.executor;
 
-import azkaban.db.EncodingType;
 import azkaban.db.DatabaseOperator;
+import azkaban.db.EncodingType;
 import azkaban.utils.GZIPUtils;
 import azkaban.utils.Pair;
 import java.io.IOException;
@@ -29,10 +29,13 @@ import java.util.Map;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.log4j.Logger;
 
 @Singleton
 public class FetchActiveFlowDao {
 
+  private static final Logger logger = Logger.getLogger(FetchActiveFlowDao.class);
+
   private final DatabaseOperator dbOperator;
 
   @Inject
@@ -56,9 +59,9 @@ public class FetchActiveFlowDao {
     // Select running and executor assigned flows
     private static final String FETCH_ACTIVE_EXECUTABLE_FLOW =
         "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, et.host host, "
-            + "et.port port, et.id executorId, et.active executorStatus"
+            + "et.port port, ex.executor_id executorId, et.active executorStatus"
             + " FROM execution_flows ex"
-            + " INNER JOIN "
+            + " LEFT JOIN "
             + " executors et ON ex.executor_id = et.id"
             + " Where ex.status NOT IN ("
             + Status.SUCCEEDED.getNumVal() + ", "
@@ -91,8 +94,13 @@ public class FetchActiveFlowDao {
             final ExecutableFlow exFlow =
                 ExecutableFlow.createExecutableFlowFromObject(
                     GZIPUtils.transformBytesToObject(data, encType));
-
-            final Executor executor = new Executor(executorId, host, port, executorStatus);
+            final Executor executor;
+            if (host == null) {
+              logger.warn("Executor id " + executorId + " (on execution " + id + ") wasn't found");
+              executor = null;
+            } else {
+              executor = new Executor(executorId, host, port, executorStatus);
+            }
             final ExecutionReference ref = new ExecutionReference(id, executor);
             execFlows.put(id, new Pair<>(ref, exFlow));
           } catch (final IOException e) {
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
index e833bd6..31ff91c 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
@@ -280,7 +280,11 @@ public class ExecutionFlowDaoTest {
         this.fetchActiveFlowDao.fetchActiveFlows();
 
     assertThat(activeFlows1.containsKey(flow1.getExecutionId())).isTrue();
-    assertThat(activeFlows1.containsKey(flow2.getExecutionId())).isFalse();
+    assertThat(activeFlows1.get(flow1.getExecutionId()).getFirst().getExecutor().isPresent())
+        .isTrue();
+    assertThat(activeFlows1.containsKey(flow2.getExecutionId())).isTrue();
+    assertThat(activeFlows1.get(flow2.getExecutionId()).getFirst().getExecutor().isPresent())
+        .isFalse();
     final ExecutableFlow flow1Result =
         activeFlows1.get(flow1.getExecutionId()).getSecond();
     assertTwoFlowSame(flow1Result, flow1);
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 5ac23b4..67467fa 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -41,6 +41,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.junit.After;
@@ -357,12 +358,12 @@ public class ExecutorManagerTest {
   @Test
   public void testFetchActiveFlowWithExecutor() throws Exception {
     testSetUpForRunningFlows();
-    final List<Pair<ExecutableFlow, Executor>> activeFlowsWithExecutor =
+    final List<Pair<ExecutableFlow, Optional<Executor>>> activeFlowsWithExecutor =
         this.manager.getActiveFlowsWithExecutor();
     Assert.assertTrue(activeFlowsWithExecutor.contains(new Pair<>(this.flow1,
-        this.manager.fetchExecutor(this.flow1.getExecutionId()))));
+        Optional.ofNullable(this.manager.fetchExecutor(this.flow1.getExecutionId())))));
     Assert.assertTrue(activeFlowsWithExecutor.contains(new Pair<>(this.flow2,
-        this.manager.fetchExecutor(this.flow2.getExecutionId()))));
+        Optional.ofNullable(this.manager.fetchExecutor(this.flow2.getExecutionId())))));
   }
 
   @Test
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index 6551df9..61a8325 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -57,6 +57,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import javax.servlet.ServletConfig;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
@@ -357,7 +358,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
         newPage(req, resp, session,
             "azkaban/webapp/servlet/velocity/executionspage.vm");
 
-    final List<Pair<ExecutableFlow, Executor>> runningFlows =
+    final List<Pair<ExecutableFlow, Optional<Executor>>> runningFlows =
         this.executorManager.getActiveFlowsWithExecutor();
     page.add("runningFlows", runningFlows.isEmpty() ? null : runningFlows);
 
diff --git a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/executionspage.vm b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/executionspage.vm
index 385fc3b..2fbfefb 100644
--- a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/executionspage.vm
+++ b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/executionspage.vm
@@ -93,8 +93,8 @@
                 <a href="${context}/executor?execid=${flow.getFirst().executionId}">${flow.getFirst().executionId}</a>
               </td>
               <td>
-                #if (${flow.getSecond()})
-                  ${flow.getSecond().getId()}
+                #if (${flow.getSecond().isPresent()})
+                  ${flow.getSecond().get().getId()}
                 #else
                   -
                 #end