Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index 7702a73..7657588 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -39,9 +39,6 @@ public interface ExecutorLoader {
Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows()
throws ExecutorManagerException;
- Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(int execId)
- throws ExecutorManagerException;
-
List<ExecutableFlow> fetchFlowHistory(int skip, int num)
throws ExecutorManagerException;
diff --git a/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java b/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
index 4e3b643..93796a8 100644
--- a/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
@@ -23,20 +23,16 @@ import azkaban.utils.Pair;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.dbutils.ResultSetHandler;
-import org.apache.log4j.Logger;
@Singleton
public class FetchActiveFlowDao {
- private static final Logger logger = Logger.getLogger(FetchActiveFlowDao.class);
private final DatabaseOperator dbOperator;
@Inject
@@ -54,23 +50,6 @@ public class FetchActiveFlowDao {
}
}
- Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(final int execId)
- throws ExecutorManagerException {
- try {
- final List<Pair<ExecutionReference, ExecutableFlow>> flows =
- this.dbOperator
- .query(FetchActiveExecutableFlowByExecId.FETCH_ACTIVE_EXECUTABLE_FLOW_BY_EXECID,
- new FetchActiveExecutableFlowByExecId(), execId);
- if (flows.isEmpty()) {
- return null;
- } else {
- return flows.get(0);
- }
- } catch (final SQLException e) {
- throw new ExecutorManagerException("Error fetching active flows by exec id", e);
- }
- }
-
private static class FetchActiveExecutableFlows implements
ResultSetHandler<Map<Integer, Pair<ExecutionReference, ExecutableFlow>>> {
@@ -126,57 +105,4 @@ public class FetchActiveFlowDao {
}
}
- private static class FetchActiveExecutableFlowByExecId implements
- ResultSetHandler<List<Pair<ExecutionReference, ExecutableFlow>>> {
-
- private static final String FETCH_ACTIVE_EXECUTABLE_FLOW_BY_EXECID =
- "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, et.host host, "
- + "et.port port, et.id executorId, et.active executorStatus"
- + " FROM execution_flows ex"
- + " INNER JOIN "
- + " executors et ON ex.executor_id = et.id"
- + " Where ex.exec_id = ? AND ex.status NOT IN ("
- + Status.SUCCEEDED.getNumVal() + ", "
- + Status.KILLED.getNumVal() + ", "
- + Status.FAILED.getNumVal() + ")";
-
- @Override
- public List<Pair<ExecutionReference, ExecutableFlow>> handle(final ResultSet rs)
- throws SQLException {
- if (!rs.next()) {
- return Collections.emptyList();
- }
-
- final List<Pair<ExecutionReference, ExecutableFlow>> execFlows =
- new ArrayList<>();
- do {
- final int id = rs.getInt(1);
- final int encodingType = rs.getInt(2);
- final byte[] data = rs.getBytes(3);
- final String host = rs.getString(4);
- final int port = rs.getInt(5);
- final int executorId = rs.getInt(6);
- final boolean executorStatus = rs.getBoolean(7);
-
- if (data == null) {
- logger.error("Found a flow with empty data blob exec_id: " + id);
- } else {
- final EncodingType encType = EncodingType.fromInteger(encodingType);
- try {
- final ExecutableFlow exFlow =
- ExecutableFlow.createExecutableFlowFromObject(
- GZIPUtils.transformBytesToObject(data, encType));
-
- final Executor executor = new Executor(executorId, host, port, executorStatus);
- final ExecutionReference ref = new ExecutionReference(id, executor);
- execFlows.add(new Pair<>(ref, exFlow));
- } catch (final IOException e) {
- throw new SQLException("Error retrieving flow data " + id, e);
- }
- }
- } while (rs.next());
-
- return execFlows;
- }
- }
-}
\ No newline at end of file
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index debe44e..eca1744 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -102,13 +102,6 @@ public class JdbcExecutorLoader implements ExecutorLoader {
}
@Override
- public Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(final int execId)
- throws ExecutorManagerException {
-
- return this.fetchActiveFlowDao.fetchActiveFlowByExecId(execId);
- }
-
- @Override
public int fetchNumExecutableFlows() throws ExecutorManagerException {
return this.numExecutionsDao.fetchNumExecutableFlows();
}
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
index d6e3ca0..e833bd6 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
@@ -39,21 +39,19 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
public class ExecutionFlowDaoTest {
private static final Duration RECENTLY_FINISHED_LIFETIME = Duration.ofMinutes(1);
private static final Duration FLOW_FINISHED_TIME = Duration.ofMinutes(2);
-
+ private static final Props props = new Props();
private static DatabaseOperator dbOperator;
private ExecutionFlowDao executionFlowDao;
private ExecutorDao executorDao;
private AssignExecutorDao assignExecutor;
private FetchActiveFlowDao fetchActiveFlowDao;
private ExecutionJobDao executionJobDao;
- private static final Props props = new Props();
private ProjectLoader loader;
@BeforeClass
@@ -97,9 +95,9 @@ public class ExecutionFlowDaoTest {
}
private void createTestProject() {
- String projectName = "exectest1";
- String projectDescription = "This is my new project";
- User user = new User("testUser1");
+ final String projectName = "exectest1";
+ final String projectDescription = "This is my new project";
+ final User user = new User("testUser1");
this.loader.createNewProject(projectName, projectDescription, user);
}
@@ -156,11 +154,12 @@ public class ExecutionFlowDaoTest {
createTestProject();
final ExecutableFlow flow = createTestFlow();
this.executionFlowDao.uploadExecutableFlow(flow);
- final List<ExecutableFlow> flowList1 = this.executionFlowDao.fetchFlowHistory("exectest1", "", "",0, -1, -1, 0, 16);
+ final List<ExecutableFlow> flowList1 = this.executionFlowDao
+ .fetchFlowHistory("exectest1", "", "", 0, -1, -1, 0, 16);
assertThat(flowList1.size()).isEqualTo(1);
final ExecutableFlow fetchFlow =
- this.executionFlowDao.fetchExecutableFlow(flow.getExecutionId());
+ this.executionFlowDao.fetchExecutableFlow(flow.getExecutionId());
assertTwoFlowSame(flowList1.get(0), fetchFlow);
}
@@ -317,20 +316,6 @@ public class ExecutionFlowDaoTest {
}
@Test
- @Ignore
- // TODO jamiesjc: Active_execution_flow table is already deprecated. we should remove related
- // test methods as well.
- public void testFetchActiveFlowsReferenceChanged() throws Exception {
- }
-
- @Test
- @Ignore
- // TODO jamiesjc: Active_execution_flow table is already deprecated. we should remove related
- // test methods as well.
- public void testFetchActiveFlowByExecId() throws Exception {
- }
-
- @Test
public void testUploadAndFetchExecutableNode() throws Exception {
final ExecutableFlow flow = TestUtils.createTestExecutableFlow("exectest1", "exec1");
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index 6b4db57..0bf798d 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -76,12 +76,6 @@ public class MockExecutorLoader implements ExecutorLoader {
}
@Override
- public Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(final int execId)
- throws ExecutorManagerException {
- return this.activeFlows.get(execId);
- }
-
- @Override
public List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
final int skip, final int num) throws ExecutorManagerException {
return null;
@@ -99,10 +93,6 @@ public class MockExecutorLoader implements ExecutorLoader {
this.refs.remove(execId);
}
- public boolean hasActiveExecutableReference(final int execId) {
- return this.refs.containsKey(execId);
- }
-
@Override
public void uploadLogFile(final int execId, final String name, final int attempt,
final File... files)
@@ -171,10 +161,6 @@ public class MockExecutorLoader implements ExecutorLoader {
return 0;
}
- public int getFlowUpdateCount() {
- return this.flowUpdateCount;
- }
-
public Integer getNodeUpdateCount(final String jobId) {
return this.jobUpdateCount.get(jobId);
}