azkaban-aplcache

Fix NPE in FetchActiveFlowDao (#1862) Ignore active executions

7/21/2018 9:34:31 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java b/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
index 938b5d4..c8de86d 100644
--- a/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
@@ -20,6 +20,7 @@ import azkaban.db.DatabaseOperator;
 import azkaban.db.EncodingType;
 import azkaban.utils.GZIPUtils;
 import azkaban.utils.Pair;
+import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -53,7 +54,8 @@ public class FetchActiveFlowDao {
     }
   }
 
-  private static class FetchActiveExecutableFlows implements
+  @VisibleForTesting
+  static class FetchActiveExecutableFlows implements
       ResultSetHandler<Map<Integer, Pair<ExecutionReference, ExecutableFlow>>> {
 
     // Select running and executor assigned flows
@@ -87,7 +89,9 @@ public class FetchActiveFlowDao {
         final boolean executorStatus = rs.getBoolean(7);
 
         if (data == null) {
-          execFlows.put(id, null);
+          logger.warn("Execution id " + id + " has flow_data=null. To clean up, update status to "
+              + "FAILED manually, eg. "
+              + "SET status = " + Status.FAILED.getNumVal() + " WHERE id = " + id);
         } else {
           final EncodingType encType = EncodingType.fromInteger(encodingType);
           try {
diff --git a/azkaban-common/src/test/java/azkaban/executor/FetchActiveFlowDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/FetchActiveFlowDaoTest.java
new file mode 100644
index 0000000..3093b97
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/FetchActiveFlowDaoTest.java
@@ -0,0 +1,75 @@
+package azkaban.executor;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
+
+import azkaban.db.EncodingType;
+import azkaban.executor.FetchActiveFlowDao.FetchActiveExecutableFlows;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import azkaban.utils.TestUtils;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class FetchActiveFlowDaoTest {
+
+  private ResultSet rs;
+
+  @Before
+  public void setUp() throws Exception {
+    this.rs = Mockito.mock(ResultSet.class);
+  }
+
+  @Test
+  public void handleResultMissingExecutor() throws Exception {
+    final FetchActiveExecutableFlows resultHandler = new FetchActiveExecutableFlows();
+    mockResultWithData();
+    final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> result = resultHandler
+        .handle(this.rs);
+    assertThat(result.containsKey(1)).isTrue();
+    assertThat(result.get(1).getFirst().getExecutor().isPresent()).isFalse();
+  }
+
+  @Test
+  public void handleResultNullData() throws Exception {
+    final FetchActiveExecutableFlows resultHandler = new FetchActiveExecutableFlows();
+    mockResultWithNullData();
+    final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> result = resultHandler
+        .handle(this.rs);
+    assertThat(result).isEmpty();
+  }
+
+  private void mockResultWithData() throws Exception {
+    final ExecutableFlow flow = TestUtils.createTestExecutableFlow("exectest1", "exec1");
+    final String json = JSONUtils.toJSON(flow.toObject());
+    final byte[] data = json.getBytes("UTF-8");
+    mockExecution(EncodingType.PLAIN.getNumVal(), data);
+  }
+
+  private void mockResultWithNullData() throws SQLException {
+    mockExecution(0, null);
+  }
+
+  private void mockExecution(final int encodingType, final byte[] flowData) throws SQLException {
+    when(this.rs.next()).thenReturn(true).thenReturn(false);
+    // execution id
+    when(this.rs.getInt(1)).thenReturn(1);
+    // encodingType
+    when(this.rs.getInt(2)).thenReturn(encodingType);
+    // data
+    when(this.rs.getBytes(3)).thenReturn(flowData);
+    // executor host
+    when(this.rs.getString(4)).thenReturn(null);
+    // executor port
+    when(this.rs.getInt(5)).thenReturn(0);
+    // executorId
+    when(this.rs.getInt(6)).thenReturn(1);
+    // executorStatus
+    when(this.rs.getBoolean(7)).thenReturn(false);
+  }
+
+}