Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
index 6483e22..bc9e9df 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
@@ -52,6 +52,7 @@ public class ExecutionFlowDao {
+ "values (?,?,?,?,?,?,?)";
final long submitTime = System.currentTimeMillis();
flow.setStatus(Status.PREPARING);
+ flow.setSubmitTime(submitTime);
/**
* Why we need a transaction to get last insert ID?
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index db0e807..5c0e990 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -42,6 +42,9 @@ public interface ExecutorLoader {
Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchUnfinishedFlows()
throws ExecutorManagerException;
+ Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchUnfinishedFlowsMetadata()
+ throws ExecutorManagerException;
+
Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(int execId)
throws ExecutorManagerException;
diff --git a/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java b/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
index 628c369..a137f3e 100644
--- a/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
@@ -18,6 +18,8 @@ package azkaban.executor;
import azkaban.db.DatabaseOperator;
import azkaban.db.EncodingType;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
import azkaban.utils.GZIPUtils;
import azkaban.utils.Pair;
import com.google.common.annotations.VisibleForTesting;
@@ -46,13 +48,9 @@ public class FetchActiveFlowDao {
private static Pair<ExecutionReference, ExecutableFlow> getExecutableFlowHelper(
final ResultSet rs) throws SQLException {
- final int id = rs.getInt(1);
- final int encodingType = rs.getInt(2);
- final byte[] data = rs.getBytes(3);
- final String host = rs.getString(4);
- final int port = rs.getInt(5);
- final int executorId = rs.getInt(6);
- final boolean executorStatus = rs.getBoolean(7);
+ final int id = rs.getInt("exec_id");
+ final int encodingType = rs.getInt("enc_type");
+ final byte[] data = rs.getBytes("flow_data");
if (data == null) {
logger.warn("Execution id " + id + " has flow_data = null. To clean up, update status to "
@@ -60,27 +58,51 @@ public class FetchActiveFlowDao {
+ "SET status = " + Status.FAILED.getNumVal() + " WHERE id = " + id);
} else {
final EncodingType encType = EncodingType.fromInteger(encodingType);
+ final ExecutableFlow exFlow;
try {
- final ExecutableFlow exFlow =
- ExecutableFlow.createExecutableFlowFromObject(
- GZIPUtils.transformBytesToObject(data, encType));
- final Executor executor;
- if (host == null) {
- logger.warn("Executor id " + executorId + " (on execution " +
- id + ") wasn't found");
- executor = null;
- } else {
- executor = new Executor(executorId, host, port, executorStatus);
- }
- final ExecutionReference ref = new ExecutionReference(id, executor);
- return new Pair<>(ref, exFlow);
+ exFlow = ExecutableFlow.createExecutableFlowFromObject(
+ GZIPUtils.transformBytesToObject(data, encType));
} catch (final IOException e) {
throw new SQLException("Error retrieving flow data " + id, e);
}
+ return getPairWithExecutorInfo(rs, exFlow);
}
return null;
}
+ private static Pair<ExecutionReference, ExecutableFlow> getPairWithExecutorInfo(final ResultSet rs,
+ final ExecutableFlow exFlow) throws SQLException {
+ final int executorId = rs.getInt("executorId");
+ final String host = rs.getString("host");
+ final int port = rs.getInt("port");
+ final Executor executor;
+ if (host == null) {
+ logger.warn("Executor id " + executorId + " (on execution " +
+ exFlow.getExecutionId() + ") wasn't found");
+ executor = null;
+ } else {
+ final boolean executorStatus = rs.getBoolean("executorStatus");
+ executor = new Executor(executorId, host, port, executorStatus);
+ }
+ final ExecutionReference ref = new ExecutionReference(exFlow.getExecutionId(), executor);
+ return new Pair<>(ref, exFlow);
+ }
+
+ private static Pair<ExecutionReference, ExecutableFlow> getExecutableFlowMetadataHelper(
+ final ResultSet rs) throws SQLException {
+ final Flow flow = new Flow(rs.getString("flow_id"));
+ final Project project = new Project(rs.getInt("project_id"), null);
+ project.setVersion(rs.getInt("version"));
+ final ExecutableFlow exFlow = new ExecutableFlow(project, flow);
+ exFlow.setExecutionId(rs.getInt("exec_id"));
+ exFlow.setStatus(Status.fromInteger(rs.getInt("status")));
+ exFlow.setSubmitTime(rs.getLong("submit_time"));
+ exFlow.setStartTime(rs.getLong("start_time"));
+ exFlow.setEndTime(rs.getLong("end_time"));
+ exFlow.setSubmitUser(rs.getString("submit_user"));
+ return getPairWithExecutorInfo(rs, exFlow);
+ }
+
/**
* Fetch flows that are not in finished status, including both dispatched and non-dispatched
* flows.
@@ -99,6 +121,22 @@ public class FetchActiveFlowDao {
}
/**
+ * Fetch unfinished flows similar to {@link #fetchUnfinishedFlows}, excluding flow data.
+ *
+ * @return unfinished flows map
+ * @throws ExecutorManagerException the executor manager exception
+ */
+ public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchUnfinishedFlowsMetadata()
+ throws ExecutorManagerException {
+ try {
+ return this.dbOperator.query(FetchUnfinishedFlowsMetadata.FETCH_UNFINISHED_FLOWS_METADATA,
+ new FetchUnfinishedFlowsMetadata());
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error fetching unfinished flows metadata", e);
+ }
+ }
+
+ /**
* Fetch flows that are dispatched and not yet finished.
*
* @return active flows map
@@ -177,7 +215,45 @@ public class FetchActiveFlowDao {
do {
final Pair<ExecutionReference, ExecutableFlow> exFlow = getExecutableFlowHelper(rs);
if (exFlow != null) {
- execFlows.put(rs.getInt(1), exFlow);
+ execFlows.put(rs.getInt("exec_id"), exFlow);
+ }
+ } while (rs.next());
+
+ return execFlows;
+ }
+ }
+
+ @VisibleForTesting
+ static class FetchUnfinishedFlowsMetadata implements
+ ResultSetHandler<Map<Integer, Pair<ExecutionReference, ExecutableFlow>>> {
+
+ // Select flows that are not in finished status
+ private static final String FETCH_UNFINISHED_FLOWS_METADATA =
+ "SELECT ex.exec_id exec_id, ex.project_id project_id, ex.version version, "
+ + "ex.flow_id flow_id, et.host host, et.port port, ex.executor_id executorId, "
+ + "ex.status status, ex.submit_time submit_time, ex.start_time start_time, "
+ + "ex.end_time end_time, ex.submit_user submit_user, et.active executorStatus"
+ + " FROM execution_flows ex"
+ + " LEFT JOIN "
+ + " executors et ON ex.executor_id = et.id"
+ + " Where ex.status NOT IN ("
+ + Status.SUCCEEDED.getNumVal() + ", "
+ + Status.KILLED.getNumVal() + ", "
+ + Status.FAILED.getNumVal() + ")";
+
+ @Override
+ public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> handle(
+ final ResultSet rs) throws SQLException {
+ if (!rs.next()) {
+ return Collections.emptyMap();
+ }
+
+ final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> execFlows =
+ new HashMap<>();
+ do {
+ final Pair<ExecutionReference, ExecutableFlow> exFlow = getExecutableFlowMetadataHelper(rs);
+ if (exFlow != null) {
+ execFlows.put(rs.getInt("exec_id"), exFlow);
}
} while (rs.next());
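
Note on the new metadata path: fetchUnfinishedFlowsMetadata builds each ExecutableFlow from the execution_flows columns alone, so the returned flows carry ids, version, status, times and submit user but no nodes or execution options. A minimal consumer sketch under that assumption — the class and method names are illustrative, not part of the change — mirroring how AzkabanProjectLoader uses the result further down in this diff:

import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutionReference;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
import azkaban.utils.Pair;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper: collect the project versions that still have unfinished
// executions, using only the lightweight metadata (no flow_data deserialization).
final class UnfinishedVersionsSketch {

  static List<Integer> versionsWithUnfinishedExecutions(final ExecutorLoader executorLoader,
      final int projectId) throws ExecutorManagerException {
    final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> unfinished =
        executorLoader.fetchUnfinishedFlowsMetadata();
    return unfinished.values().stream()
        .map(Pair::getSecond)
        .filter(flow -> flow.getProjectId() == projectId)
        .map(ExecutableFlow::getVersion)
        .distinct()
        .collect(Collectors.toList());
  }
}
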
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 302b889..da40fb7 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -107,6 +107,12 @@ public class JdbcExecutorLoader implements ExecutorLoader {
}
@Override
+ public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchUnfinishedFlowsMetadata()
+ throws ExecutorManagerException {
+ return this.fetchActiveFlowDao.fetchUnfinishedFlowsMetadata();
+ }
+
+ @Override
public Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(final int execId)
throws ExecutorManagerException {
return this.fetchActiveFlowDao.fetchActiveFlowByExecId(execId);
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
index 1ccf1eb..71c1168 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
@@ -21,6 +21,10 @@ import static java.util.Objects.requireNonNull;
import azkaban.Constants;
import azkaban.Constants.ConfigurationKeys;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutionReference;
+import azkaban.executor.ExecutorLoader;
+import azkaban.executor.ExecutorManagerException;
import azkaban.flow.Flow;
import azkaban.project.FlowLoaderUtils.DirFilter;
import azkaban.project.FlowLoaderUtils.SuffixFilter;
@@ -32,13 +36,16 @@ import azkaban.project.validator.ValidatorManager;
import azkaban.project.validator.XmlValidatorManager;
import azkaban.storage.StorageManager;
import azkaban.user.User;
+import azkaban.utils.Pair;
import azkaban.utils.Props;
import azkaban.utils.Utils;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.stream.Collectors;
import java.util.zip.ZipFile;
import javax.inject.Inject;
import org.slf4j.Logger;
@@ -59,16 +66,19 @@ class AzkabanProjectLoader {
private final FlowLoaderFactory flowLoaderFactory;
private final File tempDir;
private final int projectVersionRetention;
+ private final ExecutorLoader executorLoader;
@Inject
AzkabanProjectLoader(final Props props, final ProjectLoader projectLoader,
- final StorageManager storageManager, final FlowLoaderFactory flowLoaderFactory) {
+ final StorageManager storageManager, final FlowLoaderFactory flowLoaderFactory,
+ final ExecutorLoader executorLoader) {
this.props = requireNonNull(props, "Props is null");
this.projectLoader = requireNonNull(projectLoader, "project Loader is null");
this.storageManager = requireNonNull(storageManager, "Storage Manager is null");
this.flowLoaderFactory = requireNonNull(flowLoaderFactory, "Flow Loader Factory is null");
this.tempDir = new File(props.getString(ConfigurationKeys.PROJECT_TEMP_DIR, "temp"));
+ this.executorLoader = executorLoader;
if (!this.tempDir.exists()) {
log.info("Creating temp dir: " + this.tempDir.getAbsolutePath());
this.tempDir.mkdirs();
@@ -81,7 +91,7 @@ class AzkabanProjectLoader {
public Map<String, ValidationReport> uploadProject(final Project project,
final File archive, final String fileType, final User uploader, final Props additionalProps)
- throws ProjectManagerException {
+ throws ProjectManagerException, ExecutorManagerException {
log.info("Uploading files to " + project.getName());
final Map<String, ValidationReport> reports;
@@ -236,11 +246,18 @@ class AzkabanProjectLoader {
}
private void cleanUpProjectOldInstallations(final Project project)
- throws ProjectManagerException{
+ throws ProjectManagerException, ExecutorManagerException {
log.info("Cleaning up old install files older than "
+ (project.getVersion() - this.projectVersionRetention));
+ final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> unfinishedFlows = this.executorLoader
+ .fetchUnfinishedFlowsMetadata();
+ final List<Integer> versionsWithUnfinishedExecutions = unfinishedFlows.values()
+ .stream().map(pair -> pair.getSecond())
+ .filter(exflow -> exflow.getProjectId() == project.getId())
+ .map(exflow -> exflow.getVersion())
+ .collect(Collectors.toList());
this.projectLoader.cleanOlderProjectVersion(project.getId(),
- project.getVersion() - this.projectVersionRetention);
+ project.getVersion() - this.projectVersionRetention, versionsWithUnfinishedExecutions);
// Clean up storage
this.storageManager.cleanupProjectArtifacts(project.getId());
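
For intuition, a small sketch of the retention arithmetic used in cleanUpProjectOldInstallations above, with illustrative numbers (the retention value and version numbers are assumptions, not taken from this diff):

import java.util.Arrays;
import java.util.List;

// Illustrative numbers only; real values come from project config and the executor DB.
public class RetentionArithmeticSketch {

  public static void main(final String[] args) {
    final int currentVersion = 10;           // project.getVersion()
    final int projectVersionRetention = 3;   // configured retention of old versions
    final List<Integer> versionsWithUnfinishedExecutions = Arrays.asList(6);

    // Same threshold as cleanUpProjectOldInstallations: versions strictly below
    // currentVersion - retention are candidates for deletion, except the excluded ones.
    final int cleanBelow = currentVersion - projectVersionRetention; // 7
    System.out.println("Would clean versions < " + cleanBelow
        + ", keeping " + versionsWithUnfinishedExecutions
        + " because they still have unfinished executions.");
  }
}
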
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
index f229b51..677bfa6 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
@@ -54,6 +54,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.io.IOUtils;
@@ -959,12 +960,22 @@ public class JdbcProjectImpl implements ProjectLoader {
}
@Override
- public void cleanOlderProjectVersion(final int projectId, final int version)
- throws ProjectManagerException {
- final String DELETE_FLOW = "DELETE FROM project_flows WHERE project_id=? AND version<?";
- final String DELETE_PROPERTIES = "DELETE FROM project_properties WHERE project_id=? AND version<?";
- final String DELETE_PROJECT_FILES = "DELETE FROM project_files WHERE project_id=? AND version<?";
- final String UPDATE_PROJECT_VERSIONS = "UPDATE project_versions SET num_chunks=0 WHERE project_id=? AND version<?";
+ public void cleanOlderProjectVersion(final int projectId, final int version,
+ final List<Integer> excludedVersions) throws ProjectManagerException {
+
+ // Would use a parameter of type Array from transOperator.getConnection().createArrayOf(),
+ // but H2 doesn't support the Array type, so the filter is formatted manually.
+ final String EXCLUDED_VERSIONS_FILTER = excludedVersions.stream()
+ .map(excluded -> " AND version != " + excluded).collect(Collectors.joining());
+ final String VERSION_FILTER = " AND version < ?" + EXCLUDED_VERSIONS_FILTER;
+
+ final String DELETE_FLOW = "DELETE FROM project_flows WHERE project_id=?" + VERSION_FILTER;
+ final String DELETE_PROPERTIES =
+ "DELETE FROM project_properties WHERE project_id=?" + VERSION_FILTER;
+ final String DELETE_PROJECT_FILES =
+ "DELETE FROM project_files WHERE project_id=?" + VERSION_FILTER;
+ final String UPDATE_PROJECT_VERSIONS =
+ "UPDATE project_versions SET num_chunks=0 WHERE project_id=?" + VERSION_FILTER;
// Todo jamiesjc: delete flow files
final SQLTransaction<Integer> cleanOlderProjectTransaction = transOperator -> {
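
Because H2 rules out an SQL Array parameter, the exclusion filter above is appended as plain inequality predicates. A minimal sketch of the string it produces, with hypothetical version numbers:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ExcludedVersionsFilterSketch {

  public static void main(final String[] args) {
    // Hypothetical versions still referenced by unfinished executions.
    final List<Integer> excludedVersions = Arrays.asList(5, 7);

    // Same construction as in cleanOlderProjectVersion above: one extra
    // inequality predicate per excluded version, appended to the version bound.
    final String excludedVersionsFilter = excludedVersions.stream()
        .map(excluded -> " AND version != " + excluded)
        .collect(Collectors.joining());
    final String versionFilter = " AND version < ?" + excludedVersionsFilter;

    // Prints:
    // DELETE FROM project_files WHERE project_id=? AND version < ? AND version != 5 AND version != 7
    System.out.println("DELETE FROM project_files WHERE project_id=?" + versionFilter);
  }
}
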
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
index 41b5ebd..38c46a2 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
@@ -186,9 +186,10 @@ public interface ProjectLoader {
throws ProjectManagerException;
/**
-   * Cleans all project versions less tha
+   * Cleans all project versions less than the provided version, except the versions listed in
+   * excludedVersions.
*/
- void cleanOlderProjectVersion(int projectId, int version)
+ void cleanOlderProjectVersion(int projectId, int version, final List<Integer> excludedVersions)
throws ProjectManagerException;
void updateProjectProperty(Project project, Props props)
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
index 2ab5cc0..0b059ae 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
@@ -19,6 +19,7 @@ package azkaban.project;
import static java.util.Objects.requireNonNull;
import azkaban.Constants;
+import azkaban.executor.ExecutorManagerException;
import azkaban.flow.Flow;
import azkaban.project.ProjectLogEvent.EventType;
import azkaban.project.validator.ValidationReport;
@@ -35,6 +36,7 @@ import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -309,7 +311,7 @@ public class ProjectManager {
public synchronized Project purgeProject(final Project project, final User deleter)
throws ProjectManagerException {
this.projectLoader.cleanOlderProjectVersion(project.getId(),
- project.getVersion() + 1);
+ project.getVersion() + 1, Collections.emptyList());
this.projectLoader
.postEvent(project, EventType.PURGE, deleter.getUserId(), String
.format("Purged versions before %d", project.getVersion() + 1));
@@ -501,7 +503,7 @@ public class ProjectManager {
public Map<String, ValidationReport> uploadProject(final Project project,
final File archive, final String fileType, final User uploader, final Props additionalProps)
- throws ProjectManagerException {
+ throws ProjectManagerException, ExecutorManagerException {
return this.azkabanProjectLoader
.uploadProject(project, archive, fileType, uploader, additionalProps);
}
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
index ee2692f..cff5bfd 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
@@ -337,6 +337,24 @@ public class ExecutionFlowDaoTest {
}
@Test
+ public void testFetchUnfinishedFlowsMetadata() throws Exception {
+ final List<ExecutableFlow> flows = createExecutions();
+ final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> unfinishedFlows =
+ this.fetchActiveFlowDao.fetchUnfinishedFlowsMetadata();
+ assertFound(unfinishedFlows, flows.get(0), true);
+ assertFound(unfinishedFlows, flows.get(1), false);
+ assertFound(unfinishedFlows, flows.get(2), true);
+ assertNotFound(unfinishedFlows, flows.get(3), "Returned an execution with a finished status");
+ assertFound(unfinishedFlows, flows.get(4), false);
+ assertTwoFlowSame(unfinishedFlows.get(flows.get(0).getExecutionId()).getSecond(), flows.get(0),
+ false);
+ assertTwoFlowSame(unfinishedFlows.get(flows.get(1).getExecutionId()).getSecond(), flows.get(1),
+ false);
+ assertTwoFlowSame(unfinishedFlows.get(flows.get(2).getExecutionId()).getSecond(), flows.get(2),
+ false);
+ }
+
+ @Test
public void testFetchActiveFlowByExecId() throws Exception {
final List<ExecutableFlow> flows = createExecutions();
assertTwoFlowSame(
@@ -356,15 +374,27 @@ public class ExecutionFlowDaoTest {
private List<ExecutableFlow> createExecutions() throws Exception {
final Executor executor = this.executorDao.addExecutor("test", 1);
+
final ExecutableFlow flow1 = createExecutionAndAssign(Status.PREPARING, executor);
+
// flow2 is not assigned
final ExecutableFlow flow2 = createExecution(Status.PREPARING);
+
final ExecutableFlow flow3 = createExecutionAndAssign(Status.RUNNING, executor);
+ flow3.setStartTime(System.currentTimeMillis() + 1);
+ this.executionFlowDao.updateExecutableFlow(flow3);
+
final ExecutableFlow flow4 = createExecutionAndAssign(Status.SUCCEEDED, executor);
+ flow4.setStartTime(System.currentTimeMillis() - 2);
+ flow4.setEndTime(System.currentTimeMillis() - 1);
+ this.executionFlowDao.updateExecutableFlow(flow4);
final Executor executor2 = this.executorDao.addExecutor("test2", 2);
// flow5 is assigned to an executor that is then removed
final ExecutableFlow flow5 = createExecutionAndAssign(Status.RUNNING, executor2);
+ flow5.setStartTime(System.currentTimeMillis() + 1);
+ this.executionFlowDao.updateExecutableFlow(flow5);
+
this.executorDao.removeExecutor(executor2.getHost(), executor2.getPort());
return ImmutableList.of(flow1, flow2, flow3, flow4, flow5);
}
@@ -482,6 +512,11 @@ public class ExecutionFlowDaoTest {
}
private void assertTwoFlowSame(final ExecutableFlow flow1, final ExecutableFlow flow2) {
+ assertTwoFlowSame(flow1, flow2, true);
+ }
+
+ private void assertTwoFlowSame(final ExecutableFlow flow1, final ExecutableFlow flow2,
+ final boolean compareFlowData) {
assertThat(flow1.getExecutionId()).isEqualTo(flow2.getExecutionId());
assertThat(flow1.getStatus()).isEqualTo(flow2.getStatus());
assertThat(flow1.getEndTime()).isEqualTo(flow2.getEndTime());
@@ -490,9 +525,12 @@ public class ExecutionFlowDaoTest {
assertThat(flow1.getFlowId()).isEqualTo(flow2.getFlowId());
assertThat(flow1.getProjectId()).isEqualTo(flow2.getProjectId());
assertThat(flow1.getVersion()).isEqualTo(flow2.getVersion());
- assertThat(flow1.getExecutionOptions().getFailureAction())
- .isEqualTo(flow2.getExecutionOptions().getFailureAction());
- assertThat(new HashSet<>(flow1.getEndNodes())).isEqualTo(new HashSet<>(flow2.getEndNodes()));
+ assertThat(flow1.getSubmitUser()).isEqualTo(flow2.getSubmitUser());
+ if (compareFlowData) {
+ assertThat(flow1.getExecutionOptions().getFailureAction())
+ .isEqualTo(flow2.getExecutionOptions().getFailureAction());
+ assertThat(new HashSet<>(flow1.getEndNodes())).isEqualTo(new HashSet<>(flow2.getEndNodes()));
+ }
}
/**
diff --git a/azkaban-common/src/test/java/azkaban/executor/FetchActiveFlowDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/FetchActiveFlowDaoTest.java
index 1307a8f..84a9aac 100644
--- a/azkaban-common/src/test/java/azkaban/executor/FetchActiveFlowDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/FetchActiveFlowDaoTest.java
@@ -60,19 +60,19 @@ public class FetchActiveFlowDaoTest {
private void mockExecution(final int encodingType, final byte[] flowData) throws SQLException {
when(this.rs.next()).thenReturn(true).thenReturn(false);
// execution id
- when(this.rs.getInt(1)).thenReturn(1);
+ when(this.rs.getInt("exec_id")).thenReturn(1);
// encodingType
- when(this.rs.getInt(2)).thenReturn(encodingType);
+ when(this.rs.getInt("enc_type")).thenReturn(encodingType);
// data
- when(this.rs.getBytes(3)).thenReturn(flowData);
+ when(this.rs.getBytes("flow_data")).thenReturn(flowData);
// executor host
- when(this.rs.getString(4)).thenReturn(null);
+ when(this.rs.getString("host")).thenReturn(null);
// executor port
- when(this.rs.getInt(5)).thenReturn(0);
+ when(this.rs.getInt("port")).thenReturn(0);
// executorId
- when(this.rs.getInt(6)).thenReturn(1);
+ when(this.rs.getInt("executorId")).thenReturn(1);
// executorStatus
- when(this.rs.getBoolean(7)).thenReturn(false);
+ when(this.rs.getBoolean("executorStatus")).thenReturn(false);
}
}
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index b22a385..6ee4050 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -17,6 +17,8 @@
package azkaban.executor;
import azkaban.executor.ExecutorLogEvent.EventType;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.Pair;
import azkaban.utils.Props;
@@ -27,7 +29,9 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
@@ -78,7 +82,33 @@ public class MockExecutorLoader implements ExecutorLoader {
@Override
public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchUnfinishedFlows()
throws ExecutorManagerException {
- return new ConcurrentHashMap<>();
+ return this.activeFlows;
+ }
+
+ @Override
+ public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchUnfinishedFlowsMetadata()
+ throws ExecutorManagerException {
+ return this.activeFlows.entrySet().stream()
+ .collect(Collectors.toMap(Entry::getKey, e -> {
+ final ExecutableFlow metadata = getExecutableFlowMetadata(e.getValue().getSecond());
+ return new Pair<>(e.getValue().getFirst(), metadata);
+ }));
+ }
+
+ private ExecutableFlow getExecutableFlowMetadata(
+ ExecutableFlow fullExFlow) {
+ final Flow flow = new Flow(fullExFlow.getId());
+ final Project project = new Project(fullExFlow.getProjectId(), null);
+ project.setVersion(fullExFlow.getVersion());
+ flow.setVersion(fullExFlow.getVersion());
+ final ExecutableFlow metadata = new ExecutableFlow(project, flow);
+ metadata.setExecutionId(fullExFlow.getExecutionId());
+ metadata.setStatus(fullExFlow.getStatus());
+ metadata.setSubmitTime(fullExFlow.getSubmitTime());
+ metadata.setStartTime(fullExFlow.getStartTime());
+ metadata.setEndTime(fullExFlow.getEndTime());
+ metadata.setSubmitUser(fullExFlow.getSubmitUser());
+ return metadata;
}
@Override
diff --git a/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java
index fe36a47..c836779 100644
--- a/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java
@@ -26,14 +26,21 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutorLoader;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.flow.Flow;
import azkaban.project.validator.ValidationReport;
import azkaban.project.validator.ValidationStatus;
import azkaban.storage.StorageManager;
import azkaban.test.executions.ExecutionsTestUtil;
import azkaban.user.User;
+import azkaban.utils.Pair;
import azkaban.utils.Props;
+import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.net.URL;
+import java.util.Arrays;
import java.util.Map;
import org.junit.Before;
import org.junit.Rule;
@@ -57,6 +64,7 @@ public class AzkabanProjectLoaderTest {
private AzkabanProjectLoader azkabanProjectLoader;
private StorageManager storageManager;
private ProjectLoader projectLoader;
+ private ExecutorLoader executorLoader;
@Before
public void setUp() throws Exception {
@@ -65,13 +73,14 @@ public class AzkabanProjectLoaderTest {
this.storageManager = mock(StorageManager.class);
this.projectLoader = mock(ProjectLoader.class);
+ this.executorLoader = mock(ExecutorLoader.class);
this.azkabanProjectLoader = new AzkabanProjectLoader(props, this.projectLoader,
- this.storageManager, new FlowLoaderFactory(props));
+ this.storageManager, new FlowLoaderFactory(props), this.executorLoader);
}
@Test
- public void uploadProject() throws Exception {
+ public void uploadProject() throws ExecutorManagerException {
when(this.projectLoader.getLatestProjectVersion(this.project)).thenReturn(this.VERSION);
final URL resource = requireNonNull(
@@ -79,11 +88,20 @@ public class AzkabanProjectLoaderTest {
final File projectZipFile = new File(resource.getPath());
final User uploader = new User("test_user");
+ // Test that versions with running executions are excluded from the cleanOlderProjectVersion call
+ final ExecutableFlow runningFlow = new ExecutableFlow(this.project, new Flow("x"));
+ runningFlow.setVersion(this.VERSION);
+ when(this.executorLoader.fetchUnfinishedFlowsMetadata())
+ .thenReturn(ImmutableMap.of(-1, new Pair<>(null, runningFlow)));
+
+ this.project.setVersion(this.VERSION);
checkValidationReport(this.azkabanProjectLoader
.uploadProject(this.project, projectZipFile, "zip", uploader, null));
verify(this.storageManager)
.uploadProject(this.project, this.VERSION + 1, projectZipFile, uploader);
+ verify(this.projectLoader).cleanOlderProjectVersion(this.project.getId(), this.VERSION - 3,
+ Arrays.asList(this.VERSION));
}
@Test
diff --git a/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java b/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
index 7f73e57..4892047 100644
--- a/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
@@ -365,12 +365,31 @@ public class JdbcProjectImplTest {
Assert.assertEquals(fileHandler.getNumChunks(), 1);
assertNumChunks(project, newVersion, 1);
- this.loader.cleanOlderProjectVersion(project.getId(), newVersion + 1);
+ this.loader.cleanOlderProjectVersion(project.getId(), newVersion + 1, Collections.emptyList());
assertNumChunks(project, newVersion, 0);
assertGetUploadedFileOfCleanedVersion(project.getId(), newVersion);
}
+ @Test
+ public void cleanOlderProjectVersionExcludedVersion() {
+ createThreeProjects();
+ final Project project = this.loader.fetchProjectByName("mytestProject");
+ final File testFile = new File(getClass().getClassLoader().getResource(SAMPLE_FILE).getFile());
+ final int newVersion = this.loader.getLatestProjectVersion(project) + 1;
+ this.loader.uploadProjectFile(project.getId(), newVersion, testFile, "uploadUser1");
+ final int newVersion2 = this.loader.getLatestProjectVersion(project) + 1;
+ this.loader.uploadProjectFile(project.getId(), newVersion2, testFile, "uploadUser1");
+ this.loader.cleanOlderProjectVersion(project.getId(), newVersion2 + 1,
+ Arrays.asList(newVersion, newVersion2));
+ assertNumChunks(project, newVersion, 1);
+ assertNumChunks(project, newVersion2, 1);
+ this.loader.cleanOlderProjectVersion(project.getId(), newVersion2 + 1,
+ Arrays.asList(newVersion));
+ assertNumChunks(project, newVersion, 1);
+ assertNumChunks(project, newVersion2, 0);
+ }
+
private void assertNumChunks(final Project project, final int version, final int expectedChunks) {
final ProjectFileHandler fileHandler = this.loader
.fetchProjectMetaData(project.getId(), version);
diff --git a/azkaban-common/src/test/java/azkaban/project/ProjectManagerTest.java b/azkaban-common/src/test/java/azkaban/project/ProjectManagerTest.java
index 44a9e7a..94353ff 100644
--- a/azkaban-common/src/test/java/azkaban/project/ProjectManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/ProjectManagerTest.java
@@ -19,6 +19,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import azkaban.executor.ExecutorLoader;
import azkaban.storage.StorageManager;
import azkaban.user.User;
import azkaban.utils.Props;
@@ -35,14 +36,16 @@ public class ProjectManagerTest {
private ProjectLoader projectLoader;
private StorageManager storageManager;
private Props props;
+ private ExecutorLoader executorLoader;
@Before
public void setUp() throws Exception {
this.props = new Props();
this.storageManager = mock(StorageManager.class);
this.projectLoader = mock(ProjectLoader.class);
+ this.executorLoader = mock(ExecutorLoader.class);
this.azkabanProjectLoader = new AzkabanProjectLoader(this.props, this.projectLoader,
- this.storageManager, mock(FlowLoaderFactory.class));
+ this.storageManager, mock(FlowLoaderFactory.class), executorLoader);
this.manager = new ProjectManager(this.azkabanProjectLoader, this.projectLoader,
this.storageManager, this.props);
}
diff --git a/azkaban-web-server/src/restli/java/azkaban/restli/ProjectManagerResource.java b/azkaban-web-server/src/restli/java/azkaban/restli/ProjectManagerResource.java
index 5f418e1..cd4088a 100644
--- a/azkaban-web-server/src/restli/java/azkaban/restli/ProjectManagerResource.java
+++ b/azkaban-web-server/src/restli/java/azkaban/restli/ProjectManagerResource.java
@@ -16,6 +16,7 @@
package azkaban.restli;
import azkaban.Constants.ConfigurationKeys;
+import azkaban.executor.ExecutorManagerException;
import azkaban.flowtrigger.quartz.FlowTriggerScheduler;
import azkaban.project.Project;
import azkaban.project.ProjectManager;
@@ -58,7 +59,7 @@ public class ProjectManagerResource extends ResourceContextHolder {
@ActionParam("projectName") final String projectName,
@ActionParam("packageUrl") final String packageUrl)
throws ProjectManagerException, RestLiServiceException, UserManagerException,
- ServletException, IOException, SchedulerException {
+ ServletException, IOException, SchedulerException, ExecutorManagerException {
logger.info("Deploy called. {projectName: " + projectName + ", packageUrl:" + packageUrl + "}");
final String ip = ResourceUtils.getRealClientIpAddr(this.getContext());
@@ -144,7 +145,7 @@ public class ProjectManagerResource extends ResourceContextHolder {
logger.info("Deploy: project " + projectName + " version is " + project.getVersion()
+ ", reference is " + System.identityHashCode(project));
return Integer.toString(project.getVersion());
- } catch (final ProjectManagerException | SchedulerException e) {
+ } catch (final ProjectManagerException | SchedulerException | ExecutorManagerException e) {
final String errorMsg = "Upload of project " + project + " from " + archiveFile + " failed";
logger.error(errorMsg, e);
throw e;