azkaban-aplcache

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index 7702a73..7657588 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -39,9 +39,6 @@ public interface ExecutorLoader {
   Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows()
       throws ExecutorManagerException;
 
-  Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(int execId)
-      throws ExecutorManagerException;
-
   List<ExecutableFlow> fetchFlowHistory(int skip, int num)
       throws ExecutorManagerException;
 
diff --git a/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java b/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
index 4e3b643..93796a8 100644
--- a/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
@@ -23,20 +23,16 @@ import azkaban.utils.Pair;
 import java.io.IOException;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import org.apache.commons.dbutils.ResultSetHandler;
-import org.apache.log4j.Logger;
 
 @Singleton
 public class FetchActiveFlowDao {
 
-  private static final Logger logger = Logger.getLogger(FetchActiveFlowDao.class);
   private final DatabaseOperator dbOperator;
 
   @Inject
@@ -54,23 +50,6 @@ public class FetchActiveFlowDao {
     }
   }
 
-  Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(final int execId)
-      throws ExecutorManagerException {
-    try {
-      final List<Pair<ExecutionReference, ExecutableFlow>> flows =
-          this.dbOperator
-              .query(FetchActiveExecutableFlowByExecId.FETCH_ACTIVE_EXECUTABLE_FLOW_BY_EXECID,
-                  new FetchActiveExecutableFlowByExecId(), execId);
-      if (flows.isEmpty()) {
-        return null;
-      } else {
-        return flows.get(0);
-      }
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException("Error fetching active flows by exec id", e);
-    }
-  }
-
   private static class FetchActiveExecutableFlows implements
       ResultSetHandler<Map<Integer, Pair<ExecutionReference, ExecutableFlow>>> {
 
@@ -126,57 +105,4 @@ public class FetchActiveFlowDao {
     }
   }
 
-  private static class FetchActiveExecutableFlowByExecId implements
-      ResultSetHandler<List<Pair<ExecutionReference, ExecutableFlow>>> {
-
-    private static final String FETCH_ACTIVE_EXECUTABLE_FLOW_BY_EXECID =
-        "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, et.host host, "
-            + "et.port port, et.id executorId, et.active executorStatus"
-            + " FROM execution_flows ex"
-            + " INNER JOIN "
-            + " executors et ON ex.executor_id = et.id"
-            + " Where ex.exec_id = ? AND ex.status NOT IN ("
-            + Status.SUCCEEDED.getNumVal() + ", "
-            + Status.KILLED.getNumVal() + ", "
-            + Status.FAILED.getNumVal() + ")";
-
-    @Override
-    public List<Pair<ExecutionReference, ExecutableFlow>> handle(final ResultSet rs)
-        throws SQLException {
-      if (!rs.next()) {
-        return Collections.emptyList();
-      }
-
-      final List<Pair<ExecutionReference, ExecutableFlow>> execFlows =
-          new ArrayList<>();
-      do {
-        final int id = rs.getInt(1);
-        final int encodingType = rs.getInt(2);
-        final byte[] data = rs.getBytes(3);
-        final String host = rs.getString(4);
-        final int port = rs.getInt(5);
-        final int executorId = rs.getInt(6);
-        final boolean executorStatus = rs.getBoolean(7);
-
-        if (data == null) {
-          logger.error("Found a flow with empty data blob exec_id: " + id);
-        } else {
-          final EncodingType encType = EncodingType.fromInteger(encodingType);
-          try {
-            final ExecutableFlow exFlow =
-                ExecutableFlow.createExecutableFlowFromObject(
-                    GZIPUtils.transformBytesToObject(data, encType));
-
-            final Executor executor = new Executor(executorId, host, port, executorStatus);
-            final ExecutionReference ref = new ExecutionReference(id, executor);
-            execFlows.add(new Pair<>(ref, exFlow));
-          } catch (final IOException e) {
-            throw new SQLException("Error retrieving flow data " + id, e);
-          }
-        }
-      } while (rs.next());
-
-      return execFlows;
-    }
-  }
-}
\ No newline at end of file
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index debe44e..eca1744 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -102,13 +102,6 @@ public class JdbcExecutorLoader implements ExecutorLoader {
   }
 
   @Override
-  public Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(final int execId)
-      throws ExecutorManagerException {
-
-    return this.fetchActiveFlowDao.fetchActiveFlowByExecId(execId);
-  }
-
-  @Override
   public int fetchNumExecutableFlows() throws ExecutorManagerException {
     return this.numExecutionsDao.fetchNumExecutableFlows();
   }
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
index d6e3ca0..e833bd6 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
@@ -39,21 +39,19 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
 public class ExecutionFlowDaoTest {
 
   private static final Duration RECENTLY_FINISHED_LIFETIME = Duration.ofMinutes(1);
   private static final Duration FLOW_FINISHED_TIME = Duration.ofMinutes(2);
-
+  private static final Props props = new Props();
   private static DatabaseOperator dbOperator;
   private ExecutionFlowDao executionFlowDao;
   private ExecutorDao executorDao;
   private AssignExecutorDao assignExecutor;
   private FetchActiveFlowDao fetchActiveFlowDao;
   private ExecutionJobDao executionJobDao;
-  private static final Props props = new Props();
   private ProjectLoader loader;
 
   @BeforeClass
@@ -97,9 +95,9 @@ public class ExecutionFlowDaoTest {
   }
 
   private void createTestProject() {
-    String projectName = "exectest1";
-    String projectDescription = "This is my new project";
-    User user = new User("testUser1");
+    final String projectName = "exectest1";
+    final String projectDescription = "This is my new project";
+    final User user = new User("testUser1");
     this.loader.createNewProject(projectName, projectDescription, user);
   }
 
@@ -156,11 +154,12 @@ public class ExecutionFlowDaoTest {
     createTestProject();
     final ExecutableFlow flow = createTestFlow();
     this.executionFlowDao.uploadExecutableFlow(flow);
-    final List<ExecutableFlow> flowList1 = this.executionFlowDao.fetchFlowHistory("exectest1", "", "",0, -1, -1, 0, 16);
+    final List<ExecutableFlow> flowList1 = this.executionFlowDao
+        .fetchFlowHistory("exectest1", "", "", 0, -1, -1, 0, 16);
     assertThat(flowList1.size()).isEqualTo(1);
 
     final ExecutableFlow fetchFlow =
-            this.executionFlowDao.fetchExecutableFlow(flow.getExecutionId());
+        this.executionFlowDao.fetchExecutableFlow(flow.getExecutionId());
     assertTwoFlowSame(flowList1.get(0), fetchFlow);
   }
 
@@ -317,20 +316,6 @@ public class ExecutionFlowDaoTest {
   }
 
   @Test
-  @Ignore
-  // TODO jamiesjc: Active_execution_flow table is already deprecated. we should remove related
-  // test methods as well.
-  public void testFetchActiveFlowsReferenceChanged() throws Exception {
-  }
-
-  @Test
-  @Ignore
-  // TODO jamiesjc: Active_execution_flow table is already deprecated. we should remove related
-  // test methods as well.
-  public void testFetchActiveFlowByExecId() throws Exception {
-  }
-
-  @Test
   public void testUploadAndFetchExecutableNode() throws Exception {
 
     final ExecutableFlow flow = TestUtils.createTestExecutableFlow("exectest1", "exec1");
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index 6b4db57..0bf798d 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -76,12 +76,6 @@ public class MockExecutorLoader implements ExecutorLoader {
   }
 
   @Override
-  public Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(final int execId)
-      throws ExecutorManagerException {
-    return this.activeFlows.get(execId);
-  }
-
-  @Override
   public List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
       final int skip, final int num) throws ExecutorManagerException {
     return null;
@@ -99,10 +93,6 @@ public class MockExecutorLoader implements ExecutorLoader {
     this.refs.remove(execId);
   }
 
-  public boolean hasActiveExecutableReference(final int execId) {
-    return this.refs.containsKey(execId);
-  }
-
   @Override
   public void uploadLogFile(final int execId, final String name, final int attempt,
       final File... files)
@@ -171,10 +161,6 @@ public class MockExecutorLoader implements ExecutorLoader {
     return 0;
   }
 
-  public int getFlowUpdateCount() {
-    return this.flowUpdateCount;
-  }
-
   public Integer getNodeUpdateCount(final String jobId) {
     return this.jobUpdateCount.get(jobId);
   }