diff --git a/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java b/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
index 938b5d4..c8de86d 100644
--- a/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
@@ -20,6 +20,7 @@ import azkaban.db.DatabaseOperator;
import azkaban.db.EncodingType;
import azkaban.utils.GZIPUtils;
import azkaban.utils.Pair;
+import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -53,7 +54,8 @@ public class FetchActiveFlowDao {
}
}
- private static class FetchActiveExecutableFlows implements
+ @VisibleForTesting
+ static class FetchActiveExecutableFlows implements
ResultSetHandler<Map<Integer, Pair<ExecutionReference, ExecutableFlow>>> {
// Select running and executor assigned flows
@@ -87,7 +89,9 @@ public class FetchActiveFlowDao {
final boolean executorStatus = rs.getBoolean(7);
if (data == null) {
- execFlows.put(id, null);
+ logger.warn("Execution id " + id + " has flow_data=null. To clean up, update status to "
+ + "FAILED manually, eg. "
+ + "SET status = " + Status.FAILED.getNumVal() + " WHERE id = " + id);
} else {
final EncodingType encType = EncodingType.fromInteger(encodingType);
try {
diff --git a/azkaban-common/src/test/java/azkaban/executor/FetchActiveFlowDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/FetchActiveFlowDaoTest.java
new file mode 100644
index 0000000..3093b97
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/FetchActiveFlowDaoTest.java
@@ -0,0 +1,75 @@
+package azkaban.executor;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
+
+import azkaban.db.EncodingType;
+import azkaban.executor.FetchActiveFlowDao.FetchActiveExecutableFlows;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import azkaban.utils.TestUtils;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class FetchActiveFlowDaoTest {
+
+ private ResultSet rs;
+
+ @Before
+ public void setUp() throws Exception {
+ this.rs = Mockito.mock(ResultSet.class);
+ }
+
+ @Test
+ public void handleResultMissingExecutor() throws Exception {
+ final FetchActiveExecutableFlows resultHandler = new FetchActiveExecutableFlows();
+ mockResultWithData();
+ final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> result = resultHandler
+ .handle(this.rs);
+ assertThat(result.containsKey(1)).isTrue();
+ assertThat(result.get(1).getFirst().getExecutor().isPresent()).isFalse();
+ }
+
+ @Test
+ public void handleResultNullData() throws Exception {
+ final FetchActiveExecutableFlows resultHandler = new FetchActiveExecutableFlows();
+ mockResultWithNullData();
+ final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> result = resultHandler
+ .handle(this.rs);
+ assertThat(result).isEmpty();
+ }
+
+ private void mockResultWithData() throws Exception {
+ final ExecutableFlow flow = TestUtils.createTestExecutableFlow("exectest1", "exec1");
+ final String json = JSONUtils.toJSON(flow.toObject());
+ final byte[] data = json.getBytes("UTF-8");
+ mockExecution(EncodingType.PLAIN.getNumVal(), data);
+ }
+
+ private void mockResultWithNullData() throws SQLException {
+ mockExecution(0, null);
+ }
+
+ private void mockExecution(final int encodingType, final byte[] flowData) throws SQLException {
+ when(this.rs.next()).thenReturn(true).thenReturn(false);
+ // execution id
+ when(this.rs.getInt(1)).thenReturn(1);
+ // encodingType
+ when(this.rs.getInt(2)).thenReturn(encodingType);
+ // data
+ when(this.rs.getBytes(3)).thenReturn(flowData);
+ // executor host
+ when(this.rs.getString(4)).thenReturn(null);
+ // executor port
+ when(this.rs.getInt(5)).thenReturn(0);
+ // executorId
+ when(this.rs.getInt(6)).thenReturn(1);
+ // executorStatus
+ when(this.rs.getBoolean(7)).thenReturn(false);
+ }
+
+}