azkaban-aplcache

Don't delete (clean) project versions with running executions

1/11/2019 3:57:21 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
index 6483e22..bc9e9df 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
@@ -52,6 +52,7 @@ public class ExecutionFlowDao {
         + "values (?,?,?,?,?,?,?)";
     final long submitTime = System.currentTimeMillis();
     flow.setStatus(Status.PREPARING);
+    flow.setSubmitTime(submitTime);
 
     /**
      * Why we need a transaction to get last insert ID?
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index db0e807..5c0e990 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -42,6 +42,9 @@ public interface ExecutorLoader {
   Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchUnfinishedFlows()
       throws ExecutorManagerException;
 
+  Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchUnfinishedFlowsMetadata()
+      throws ExecutorManagerException;
+
   Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(int execId)
       throws ExecutorManagerException;
 
diff --git a/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java b/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
index 628c369..a137f3e 100644
--- a/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
@@ -18,6 +18,8 @@ package azkaban.executor;
 
 import azkaban.db.DatabaseOperator;
 import azkaban.db.EncodingType;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
 import azkaban.utils.GZIPUtils;
 import azkaban.utils.Pair;
 import com.google.common.annotations.VisibleForTesting;
@@ -46,13 +48,9 @@ public class FetchActiveFlowDao {
 
   private static Pair<ExecutionReference, ExecutableFlow> getExecutableFlowHelper(
       final ResultSet rs) throws SQLException {
-    final int id = rs.getInt(1);
-    final int encodingType = rs.getInt(2);
-    final byte[] data = rs.getBytes(3);
-    final String host = rs.getString(4);
-    final int port = rs.getInt(5);
-    final int executorId = rs.getInt(6);
-    final boolean executorStatus = rs.getBoolean(7);
+    final int id = rs.getInt("exec_id");
+    final int encodingType = rs.getInt("enc_type");
+    final byte[] data = rs.getBytes("flow_data");
 
     if (data == null) {
       logger.warn("Execution id " + id + " has flow_data = null. To clean up, update status to "
@@ -60,27 +58,51 @@ public class FetchActiveFlowDao {
           + "SET status = " + Status.FAILED.getNumVal() + " WHERE id = " + id);
     } else {
       final EncodingType encType = EncodingType.fromInteger(encodingType);
+      final ExecutableFlow exFlow;
       try {
-        final ExecutableFlow exFlow =
-            ExecutableFlow.createExecutableFlowFromObject(
-                GZIPUtils.transformBytesToObject(data, encType));
-        final Executor executor;
-        if (host == null) {
-          logger.warn("Executor id " + executorId + " (on execution " +
-              id + ") wasn't found");
-          executor = null;
-        } else {
-          executor = new Executor(executorId, host, port, executorStatus);
-        }
-        final ExecutionReference ref = new ExecutionReference(id, executor);
-        return new Pair<>(ref, exFlow);
+        exFlow = ExecutableFlow.createExecutableFlowFromObject(
+            GZIPUtils.transformBytesToObject(data, encType));
       } catch (final IOException e) {
         throw new SQLException("Error retrieving flow data " + id, e);
       }
+      return getPairWithExecutorInfo(rs, exFlow);
     }
     return null;
   }
 
+  private static Pair<ExecutionReference, ExecutableFlow> getPairWithExecutorInfo(final ResultSet rs,
+      final ExecutableFlow exFlow) throws SQLException {
+    final int executorId = rs.getInt("executorId");
+    final String host = rs.getString("host");
+    final int port = rs.getInt("port");
+    final Executor executor;
+    if (host == null) {
+      logger.warn("Executor id " + executorId + " (on execution " +
+          exFlow.getExecutionId() + ") wasn't found");
+      executor = null;
+    } else {
+      final boolean executorStatus = rs.getBoolean("executorStatus");
+      executor = new Executor(executorId, host, port, executorStatus);
+    }
+    final ExecutionReference ref = new ExecutionReference(exFlow.getExecutionId(), executor);
+    return new Pair<>(ref, exFlow);
+  }
+
+  private static Pair<ExecutionReference, ExecutableFlow> getExecutableFlowMetadataHelper(
+      final ResultSet rs) throws SQLException {
+    final Flow flow = new Flow(rs.getString("flow_id"));
+    final Project project = new Project(rs.getInt("project_id"), null);
+    project.setVersion(rs.getInt("version"));
+    final ExecutableFlow exFlow = new ExecutableFlow(project, flow);
+    exFlow.setExecutionId(rs.getInt("exec_id"));
+    exFlow.setStatus(Status.fromInteger(rs.getInt("status")));
+    exFlow.setSubmitTime(rs.getLong("submit_time"));
+    exFlow.setStartTime(rs.getLong("start_time"));
+    exFlow.setEndTime(rs.getLong("end_time"));
+    exFlow.setSubmitUser(rs.getString("submit_user"));
+    return getPairWithExecutorInfo(rs, exFlow);
+  }
+
   /**
    * Fetch flows that are not in finished status, including both dispatched and non-dispatched
    * flows.
@@ -99,6 +121,22 @@ public class FetchActiveFlowDao {
   }
 
   /**
+   * Fetch unfinished flows similar to {@link #fetchUnfinishedFlows}, excluding flow data.
+   *
+   * @return unfinished flows map
+   * @throws ExecutorManagerException the executor manager exception
+   */
+  public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchUnfinishedFlowsMetadata()
+      throws ExecutorManagerException {
+    try {
+      return this.dbOperator.query(FetchUnfinishedFlowsMetadata.FETCH_UNFINISHED_FLOWS_METADATA,
+          new FetchUnfinishedFlowsMetadata());
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Error fetching unfinished flows metadata", e);
+    }
+  }
+
+  /**
    * Fetch flows that are dispatched and not yet finished.
    *
    * @return active flows map
@@ -177,7 +215,45 @@ public class FetchActiveFlowDao {
       do {
         final Pair<ExecutionReference, ExecutableFlow> exFlow = getExecutableFlowHelper(rs);
         if (exFlow != null) {
-          execFlows.put(rs.getInt(1), exFlow);
+          execFlows.put(rs.getInt("exec_id"), exFlow);
+        }
+      } while (rs.next());
+
+      return execFlows;
+    }
+  }
+
+  @VisibleForTesting
+  static class FetchUnfinishedFlowsMetadata implements
+      ResultSetHandler<Map<Integer, Pair<ExecutionReference, ExecutableFlow>>> {
+
+    // Select flows that are not in finished status
+    private static final String FETCH_UNFINISHED_FLOWS_METADATA =
+        "SELECT ex.exec_id exec_id, ex.project_id project_id, ex.version version, "
+            + "ex.flow_id flow_id, et.host host, et.port port, ex.executor_id executorId, "
+            + "ex.status status, ex.submit_time submit_time, ex.start_time start_time, "
+            + "ex.end_time end_time, ex.submit_user submit_user, et.active executorStatus"
+            + " FROM execution_flows ex"
+            + " LEFT JOIN "
+            + " executors et ON ex.executor_id = et.id"
+            + " Where ex.status NOT IN ("
+            + Status.SUCCEEDED.getNumVal() + ", "
+            + Status.KILLED.getNumVal() + ", "
+            + Status.FAILED.getNumVal() + ")";
+
+    @Override
+    public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> handle(
+        final ResultSet rs) throws SQLException {
+      if (!rs.next()) {
+        return Collections.emptyMap();
+      }
+
+      final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> execFlows =
+          new HashMap<>();
+      do {
+        final Pair<ExecutionReference, ExecutableFlow> exFlow = getExecutableFlowMetadataHelper(rs);
+        if (exFlow != null) {
+          execFlows.put(rs.getInt("exec_id"), exFlow);
         }
       } while (rs.next());
 
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 302b889..da40fb7 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -107,6 +107,12 @@ public class JdbcExecutorLoader implements ExecutorLoader {
   }
 
   @Override
+  public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchUnfinishedFlowsMetadata()
+      throws ExecutorManagerException {
+    return this.fetchActiveFlowDao.fetchUnfinishedFlowsMetadata();
+  }
+
+  @Override
   public Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(final int execId)
       throws ExecutorManagerException {
     return this.fetchActiveFlowDao.fetchActiveFlowByExecId(execId);
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
index 1ccf1eb..71c1168 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
@@ -21,6 +21,10 @@ import static java.util.Objects.requireNonNull;
 
 import azkaban.Constants;
 import azkaban.Constants.ConfigurationKeys;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutionReference;
+import azkaban.executor.ExecutorLoader;
+import azkaban.executor.ExecutorManagerException;
 import azkaban.flow.Flow;
 import azkaban.project.FlowLoaderUtils.DirFilter;
 import azkaban.project.FlowLoaderUtils.SuffixFilter;
@@ -32,13 +36,16 @@ import azkaban.project.validator.ValidatorManager;
 import azkaban.project.validator.XmlValidatorManager;
 import azkaban.storage.StorageManager;
 import azkaban.user.User;
+import azkaban.utils.Pair;
 import azkaban.utils.Props;
 import azkaban.utils.Utils;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.stream.Collectors;
 import java.util.zip.ZipFile;
 import javax.inject.Inject;
 import org.slf4j.Logger;
@@ -59,16 +66,19 @@ class AzkabanProjectLoader {
   private final FlowLoaderFactory flowLoaderFactory;
   private final File tempDir;
   private final int projectVersionRetention;
+  private final ExecutorLoader executorLoader;
 
   @Inject
   AzkabanProjectLoader(final Props props, final ProjectLoader projectLoader,
-      final StorageManager storageManager, final FlowLoaderFactory flowLoaderFactory) {
+      final StorageManager storageManager, final FlowLoaderFactory flowLoaderFactory,
+      final ExecutorLoader executorLoader) {
     this.props = requireNonNull(props, "Props is null");
     this.projectLoader = requireNonNull(projectLoader, "project Loader is null");
     this.storageManager = requireNonNull(storageManager, "Storage Manager is null");
     this.flowLoaderFactory = requireNonNull(flowLoaderFactory, "Flow Loader Factory is null");
 
     this.tempDir = new File(props.getString(ConfigurationKeys.PROJECT_TEMP_DIR, "temp"));
+    this.executorLoader = executorLoader;
     if (!this.tempDir.exists()) {
       log.info("Creating temp dir: " + this.tempDir.getAbsolutePath());
       this.tempDir.mkdirs();
@@ -81,7 +91,7 @@ class AzkabanProjectLoader {
 
   public Map<String, ValidationReport> uploadProject(final Project project,
       final File archive, final String fileType, final User uploader, final Props additionalProps)
-      throws ProjectManagerException {
+      throws ProjectManagerException, ExecutorManagerException {
     log.info("Uploading files to " + project.getName());
     final Map<String, ValidationReport> reports;
 
@@ -236,11 +246,18 @@ class AzkabanProjectLoader {
   }
 
   private void cleanUpProjectOldInstallations(final Project project)
-      throws ProjectManagerException{
+      throws ProjectManagerException, ExecutorManagerException {
     log.info("Cleaning up old install files older than "
         + (project.getVersion() - this.projectVersionRetention));
+    final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> unfinishedFlows = this.executorLoader
+        .fetchUnfinishedFlowsMetadata();
+    final List<Integer> versionsWithUnfinishedExecutions = unfinishedFlows.values()
+        .stream().map(pair -> pair.getSecond())
+        .filter(exflow -> exflow.getProjectId() == project.getId())
+        .map(exflow -> exflow.getVersion())
+        .collect(Collectors.toList());
     this.projectLoader.cleanOlderProjectVersion(project.getId(),
-        project.getVersion() - this.projectVersionRetention);
+        project.getVersion() - this.projectVersionRetention, versionsWithUnfinishedExecutions);
 
     // Clean up storage
     this.storageManager.cleanupProjectArtifacts(project.getId());
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
index f229b51..677bfa6 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
@@ -54,6 +54,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import org.apache.commons.io.IOUtils;
@@ -959,12 +960,22 @@ public class JdbcProjectImpl implements ProjectLoader {
   }
 
   @Override
-  public void cleanOlderProjectVersion(final int projectId, final int version)
-      throws ProjectManagerException {
-    final String DELETE_FLOW = "DELETE FROM project_flows WHERE project_id=? AND version<?";
-    final String DELETE_PROPERTIES = "DELETE FROM project_properties WHERE project_id=? AND version<?";
-    final String DELETE_PROJECT_FILES = "DELETE FROM project_files WHERE project_id=? AND version<?";
-    final String UPDATE_PROJECT_VERSIONS = "UPDATE project_versions SET num_chunks=0 WHERE project_id=? AND version<?";
+  public void cleanOlderProjectVersion(final int projectId, final int version,
+      final List<Integer> excludedVersions) throws ProjectManagerException {
+
+    // Would use param of type Array from transOperator.getConnection().createArrayOf() but
+    // h2 doesn't support the Array type, so format the filter manually.
+    final String EXCLUDED_VERSIONS_FILTER = excludedVersions.stream()
+        .map(excluded -> " AND version != " + excluded).collect(Collectors.joining());
+    final String VERSION_FILTER = " AND version < ?" + EXCLUDED_VERSIONS_FILTER;
+
+    final String DELETE_FLOW = "DELETE FROM project_flows WHERE project_id=?" + VERSION_FILTER;
+    final String DELETE_PROPERTIES =
+        "DELETE FROM project_properties WHERE project_id=?" + VERSION_FILTER;
+    final String DELETE_PROJECT_FILES =
+        "DELETE FROM project_files WHERE project_id=?" + VERSION_FILTER;
+    final String UPDATE_PROJECT_VERSIONS =
+        "UPDATE project_versions SET num_chunks=0 WHERE project_id=?" + VERSION_FILTER;
     // Todo jamiesjc: delete flow files
 
     final SQLTransaction<Integer> cleanOlderProjectTransaction = transOperator -> {
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
index 41b5ebd..38c46a2 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
@@ -186,9 +186,10 @@ public interface ProjectLoader {
       throws ProjectManagerException;
 
   /**
-   * Cleans all project versions less tha
+   * Cleans all project versions less than the provided version, except the versions to exclude
+   * given as argument
    */
-  void cleanOlderProjectVersion(int projectId, int version)
+  void cleanOlderProjectVersion(int projectId, int version, final List<Integer> excludedVersions)
       throws ProjectManagerException;
 
   void updateProjectProperty(Project project, Props props)
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
index 2ab5cc0..0b059ae 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
@@ -19,6 +19,7 @@ package azkaban.project;
 import static java.util.Objects.requireNonNull;
 
 import azkaban.Constants;
+import azkaban.executor.ExecutorManagerException;
 import azkaban.flow.Flow;
 import azkaban.project.ProjectLogEvent.EventType;
 import azkaban.project.validator.ValidationReport;
@@ -35,6 +36,7 @@ import com.google.common.io.Files;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -309,7 +311,7 @@ public class ProjectManager {
   public synchronized Project purgeProject(final Project project, final User deleter)
       throws ProjectManagerException {
     this.projectLoader.cleanOlderProjectVersion(project.getId(),
-        project.getVersion() + 1);
+        project.getVersion() + 1, Collections.emptyList());
     this.projectLoader
         .postEvent(project, EventType.PURGE, deleter.getUserId(), String
             .format("Purged versions before %d", project.getVersion() + 1));
@@ -501,7 +503,7 @@ public class ProjectManager {
 
   public Map<String, ValidationReport> uploadProject(final Project project,
       final File archive, final String fileType, final User uploader, final Props additionalProps)
-      throws ProjectManagerException {
+      throws ProjectManagerException, ExecutorManagerException {
     return this.azkabanProjectLoader
         .uploadProject(project, archive, fileType, uploader, additionalProps);
   }
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
index ee2692f..cff5bfd 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
@@ -337,6 +337,24 @@ public class ExecutionFlowDaoTest {
   }
 
   @Test
+  public void testFetchUnfinishedFlowsMetadata() throws Exception {
+    final List<ExecutableFlow> flows = createExecutions();
+    final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> unfinishedFlows =
+        this.fetchActiveFlowDao.fetchUnfinishedFlowsMetadata();
+    assertFound(unfinishedFlows, flows.get(0), true);
+    assertFound(unfinishedFlows, flows.get(1), false);
+    assertFound(unfinishedFlows, flows.get(2), true);
+    assertNotFound(unfinishedFlows, flows.get(3), "Returned an execution with a finished status");
+    assertFound(unfinishedFlows, flows.get(4), false);
+    assertTwoFlowSame(unfinishedFlows.get(flows.get(0).getExecutionId()).getSecond(), flows.get(0),
+        false);
+    assertTwoFlowSame(unfinishedFlows.get(flows.get(1).getExecutionId()).getSecond(), flows.get(1),
+        false);
+    assertTwoFlowSame(unfinishedFlows.get(flows.get(2).getExecutionId()).getSecond(), flows.get(2),
+        false);
+  }
+
+  @Test
   public void testFetchActiveFlowByExecId() throws Exception {
     final List<ExecutableFlow> flows = createExecutions();
     assertTwoFlowSame(
@@ -356,15 +374,27 @@ public class ExecutionFlowDaoTest {
 
   private List<ExecutableFlow> createExecutions() throws Exception {
     final Executor executor = this.executorDao.addExecutor("test", 1);
+
     final ExecutableFlow flow1 = createExecutionAndAssign(Status.PREPARING, executor);
+
     // flow2 is not assigned
     final ExecutableFlow flow2 = createExecution(Status.PREPARING);
+
     final ExecutableFlow flow3 = createExecutionAndAssign(Status.RUNNING, executor);
+    flow3.setStartTime(System.currentTimeMillis() + 1);
+    this.executionFlowDao.updateExecutableFlow(flow3);
+
     final ExecutableFlow flow4 = createExecutionAndAssign(Status.SUCCEEDED, executor);
+    flow4.setStartTime(System.currentTimeMillis() - 2);
+    flow4.setEndTime(System.currentTimeMillis() - 1);
+    this.executionFlowDao.updateExecutableFlow(flow4);
 
     final Executor executor2 = this.executorDao.addExecutor("test2", 2);
     // flow5 is assigned to an executor that is then removed
     final ExecutableFlow flow5 = createExecutionAndAssign(Status.RUNNING, executor2);
+    flow5.setStartTime(System.currentTimeMillis() + 1);
+    this.executionFlowDao.updateExecutableFlow(flow5);
+
     this.executorDao.removeExecutor(executor2.getHost(), executor2.getPort());
     return ImmutableList.of(flow1, flow2, flow3, flow4, flow5);
   }
@@ -482,6 +512,11 @@ public class ExecutionFlowDaoTest {
   }
 
   private void assertTwoFlowSame(final ExecutableFlow flow1, final ExecutableFlow flow2) {
+    assertTwoFlowSame(flow1, flow2, true);
+  }
+
+  private void assertTwoFlowSame(final ExecutableFlow flow1, final ExecutableFlow flow2,
+      final boolean compareFlowData) {
     assertThat(flow1.getExecutionId()).isEqualTo(flow2.getExecutionId());
     assertThat(flow1.getStatus()).isEqualTo(flow2.getStatus());
     assertThat(flow1.getEndTime()).isEqualTo(flow2.getEndTime());
@@ -490,9 +525,12 @@ public class ExecutionFlowDaoTest {
     assertThat(flow1.getFlowId()).isEqualTo(flow2.getFlowId());
     assertThat(flow1.getProjectId()).isEqualTo(flow2.getProjectId());
     assertThat(flow1.getVersion()).isEqualTo(flow2.getVersion());
-    assertThat(flow1.getExecutionOptions().getFailureAction())
-        .isEqualTo(flow2.getExecutionOptions().getFailureAction());
-    assertThat(new HashSet<>(flow1.getEndNodes())).isEqualTo(new HashSet<>(flow2.getEndNodes()));
+    assertThat(flow1.getSubmitUser()).isEqualTo(flow2.getSubmitUser());
+    if (compareFlowData) {
+      assertThat(flow1.getExecutionOptions().getFailureAction())
+          .isEqualTo(flow2.getExecutionOptions().getFailureAction());
+      assertThat(new HashSet<>(flow1.getEndNodes())).isEqualTo(new HashSet<>(flow2.getEndNodes()));
+    }
   }
 
   /**
diff --git a/azkaban-common/src/test/java/azkaban/executor/FetchActiveFlowDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/FetchActiveFlowDaoTest.java
index 1307a8f..84a9aac 100644
--- a/azkaban-common/src/test/java/azkaban/executor/FetchActiveFlowDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/FetchActiveFlowDaoTest.java
@@ -60,19 +60,19 @@ public class FetchActiveFlowDaoTest {
   private void mockExecution(final int encodingType, final byte[] flowData) throws SQLException {
     when(this.rs.next()).thenReturn(true).thenReturn(false);
     // execution id
-    when(this.rs.getInt(1)).thenReturn(1);
+    when(this.rs.getInt("exec_id")).thenReturn(1);
     // encodingType
-    when(this.rs.getInt(2)).thenReturn(encodingType);
+    when(this.rs.getInt("enc_type")).thenReturn(encodingType);
     // data
-    when(this.rs.getBytes(3)).thenReturn(flowData);
+    when(this.rs.getBytes("flow_data")).thenReturn(flowData);
     // executor host
-    when(this.rs.getString(4)).thenReturn(null);
+    when(this.rs.getString("host")).thenReturn(null);
     // executor port
-    when(this.rs.getInt(5)).thenReturn(0);
+    when(this.rs.getInt("port")).thenReturn(0);
     // executorId
-    when(this.rs.getInt(6)).thenReturn(1);
+    when(this.rs.getInt("executorId")).thenReturn(1);
     // executorStatus
-    when(this.rs.getBoolean(7)).thenReturn(false);
+    when(this.rs.getBoolean("executorStatus")).thenReturn(false);
   }
 
 }
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index b22a385..6ee4050 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -17,6 +17,8 @@
 package azkaban.executor;
 
 import azkaban.executor.ExecutorLogEvent.EventType;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
@@ -27,7 +29,9 @@ import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
 
@@ -78,7 +82,33 @@ public class MockExecutorLoader implements ExecutorLoader {
   @Override
   public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchUnfinishedFlows()
       throws ExecutorManagerException {
-    return new ConcurrentHashMap<>();
+    return this.activeFlows;
+  }
+
+  @Override
+  public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchUnfinishedFlowsMetadata()
+      throws ExecutorManagerException {
+    return this.activeFlows.entrySet().stream()
+        .collect(Collectors.toMap(Entry::getKey, e -> {
+          final ExecutableFlow metadata = getExecutableFlowMetadata(e.getValue().getSecond());
+          return new Pair<>(e.getValue().getFirst(), metadata);
+        }));
+  }
+
+  private ExecutableFlow getExecutableFlowMetadata(
+      ExecutableFlow fullExFlow) {
+    final Flow flow = new Flow(fullExFlow.getId());
+    final Project project = new Project(fullExFlow.getProjectId(), null);
+    project.setVersion(fullExFlow.getVersion());
+    flow.setVersion(fullExFlow.getVersion());
+    final ExecutableFlow metadata = new ExecutableFlow(project, flow);
+    metadata.setExecutionId(fullExFlow.getExecutionId());
+    metadata.setStatus(fullExFlow.getStatus());
+    metadata.setSubmitTime(fullExFlow.getSubmitTime());
+    metadata.setStartTime(fullExFlow.getStartTime());
+    metadata.setEndTime(fullExFlow.getEndTime());
+    metadata.setSubmitUser(fullExFlow.getSubmitUser());
+    return metadata;
   }
 
   @Override
diff --git a/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java
index fe36a47..c836779 100644
--- a/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java
@@ -26,14 +26,21 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutorLoader;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.flow.Flow;
 import azkaban.project.validator.ValidationReport;
 import azkaban.project.validator.ValidationStatus;
 import azkaban.storage.StorageManager;
 import azkaban.test.executions.ExecutionsTestUtil;
 import azkaban.user.User;
+import azkaban.utils.Pair;
 import azkaban.utils.Props;
+import com.google.common.collect.ImmutableMap;
 import java.io.File;
 import java.net.URL;
+import java.util.Arrays;
 import java.util.Map;
 import org.junit.Before;
 import org.junit.Rule;
@@ -57,6 +64,7 @@ public class AzkabanProjectLoaderTest {
   private AzkabanProjectLoader azkabanProjectLoader;
   private StorageManager storageManager;
   private ProjectLoader projectLoader;
+  private ExecutorLoader executorLoader;
 
   @Before
   public void setUp() throws Exception {
@@ -65,13 +73,14 @@ public class AzkabanProjectLoaderTest {
 
     this.storageManager = mock(StorageManager.class);
     this.projectLoader = mock(ProjectLoader.class);
+    this.executorLoader = mock(ExecutorLoader.class);
 
     this.azkabanProjectLoader = new AzkabanProjectLoader(props, this.projectLoader,
-        this.storageManager, new FlowLoaderFactory(props));
+        this.storageManager, new FlowLoaderFactory(props), this.executorLoader);
   }
 
   @Test
-  public void uploadProject() throws Exception {
+  public void uploadProject() throws ExecutorManagerException {
     when(this.projectLoader.getLatestProjectVersion(this.project)).thenReturn(this.VERSION);
 
     final URL resource = requireNonNull(
@@ -79,11 +88,20 @@ public class AzkabanProjectLoaderTest {
     final File projectZipFile = new File(resource.getPath());
     final User uploader = new User("test_user");
 
+    // to test excluding running versions in args of cleanOlderProjectVersion
+    final ExecutableFlow runningFlow = new ExecutableFlow(this.project, new Flow("x"));
+    runningFlow.setVersion(this.VERSION);
+    when(this.executorLoader.fetchUnfinishedFlowsMetadata())
+        .thenReturn(ImmutableMap.of(-1, new Pair<>(null, runningFlow)));
+
+    this.project.setVersion(this.VERSION);
     checkValidationReport(this.azkabanProjectLoader
         .uploadProject(this.project, projectZipFile, "zip", uploader, null));
 
     verify(this.storageManager)
         .uploadProject(this.project, this.VERSION + 1, projectZipFile, uploader);
+    verify(this.projectLoader).cleanOlderProjectVersion(this.project.getId(), this.VERSION - 3,
+        Arrays.asList(this.VERSION));
   }
 
   @Test
diff --git a/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java b/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
index 7f73e57..4892047 100644
--- a/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
@@ -365,12 +365,31 @@ public class JdbcProjectImplTest {
     Assert.assertEquals(fileHandler.getNumChunks(), 1);
     assertNumChunks(project, newVersion, 1);
 
-    this.loader.cleanOlderProjectVersion(project.getId(), newVersion + 1);
+    this.loader.cleanOlderProjectVersion(project.getId(), newVersion + 1, Collections.emptyList());
 
     assertNumChunks(project, newVersion, 0);
     assertGetUploadedFileOfCleanedVersion(project.getId(), newVersion);
   }
 
+  @Test
+  public void cleanOlderProjectVersionExcludedVersion() {
+    createThreeProjects();
+    final Project project = this.loader.fetchProjectByName("mytestProject");
+    final File testFile = new File(getClass().getClassLoader().getResource(SAMPLE_FILE).getFile());
+    final int newVersion = this.loader.getLatestProjectVersion(project) + 1;
+    this.loader.uploadProjectFile(project.getId(), newVersion, testFile, "uploadUser1");
+    final int newVersion2 = this.loader.getLatestProjectVersion(project) + 1;
+    this.loader.uploadProjectFile(project.getId(), newVersion2, testFile, "uploadUser1");
+    this.loader.cleanOlderProjectVersion(project.getId(), newVersion2 + 1,
+        Arrays.asList(newVersion, newVersion2));
+    assertNumChunks(project, newVersion, 1);
+    assertNumChunks(project, newVersion2, 1);
+    this.loader.cleanOlderProjectVersion(project.getId(), newVersion2 + 1,
+        Arrays.asList(newVersion));
+    assertNumChunks(project, newVersion, 1);
+    assertNumChunks(project, newVersion2, 0);
+  }
+
   private void assertNumChunks(final Project project, final int version, final int expectedChunks) {
     final ProjectFileHandler fileHandler = this.loader
         .fetchProjectMetaData(project.getId(), version);
diff --git a/azkaban-common/src/test/java/azkaban/project/ProjectManagerTest.java b/azkaban-common/src/test/java/azkaban/project/ProjectManagerTest.java
index 44a9e7a..94353ff 100644
--- a/azkaban-common/src/test/java/azkaban/project/ProjectManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/ProjectManagerTest.java
@@ -19,6 +19,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import azkaban.executor.ExecutorLoader;
 import azkaban.storage.StorageManager;
 import azkaban.user.User;
 import azkaban.utils.Props;
@@ -35,14 +36,16 @@ public class ProjectManagerTest {
   private ProjectLoader projectLoader;
   private StorageManager storageManager;
   private Props props;
+  private ExecutorLoader executorLoader;
 
   @Before
   public void setUp() throws Exception {
     this.props = new Props();
     this.storageManager = mock(StorageManager.class);
     this.projectLoader = mock(ProjectLoader.class);
+    this.executorLoader = mock(ExecutorLoader.class);
     this.azkabanProjectLoader = new AzkabanProjectLoader(this.props, this.projectLoader,
-        this.storageManager, mock(FlowLoaderFactory.class));
+        this.storageManager, mock(FlowLoaderFactory.class), executorLoader);
     this.manager = new ProjectManager(this.azkabanProjectLoader, this.projectLoader,
         this.storageManager, this.props);
   }
diff --git a/azkaban-web-server/src/restli/java/azkaban/restli/ProjectManagerResource.java b/azkaban-web-server/src/restli/java/azkaban/restli/ProjectManagerResource.java
index 5f418e1..cd4088a 100644
--- a/azkaban-web-server/src/restli/java/azkaban/restli/ProjectManagerResource.java
+++ b/azkaban-web-server/src/restli/java/azkaban/restli/ProjectManagerResource.java
@@ -16,6 +16,7 @@
 package azkaban.restli;
 
 import azkaban.Constants.ConfigurationKeys;
+import azkaban.executor.ExecutorManagerException;
 import azkaban.flowtrigger.quartz.FlowTriggerScheduler;
 import azkaban.project.Project;
 import azkaban.project.ProjectManager;
@@ -58,7 +59,7 @@ public class ProjectManagerResource extends ResourceContextHolder {
       @ActionParam("projectName") final String projectName,
       @ActionParam("packageUrl") final String packageUrl)
       throws ProjectManagerException, RestLiServiceException, UserManagerException,
-      ServletException, IOException, SchedulerException {
+      ServletException, IOException, SchedulerException, ExecutorManagerException {
     logger.info("Deploy called. {projectName: " + projectName + ", packageUrl:" + packageUrl + "}");
 
     final String ip = ResourceUtils.getRealClientIpAddr(this.getContext());
@@ -144,7 +145,7 @@ public class ProjectManagerResource extends ResourceContextHolder {
       logger.info("Deploy: project " + projectName + " version is " + project.getVersion()
           + ", reference is " + System.identityHashCode(project));
       return Integer.toString(project.getVersion());
-    } catch (final ProjectManagerException | SchedulerException e) {
+    } catch (final ProjectManagerException | SchedulerException | ExecutorManagerException e) {
       final String errorMsg = "Upload of project " + project + " from " + archiveFile + " failed";
       logger.error(errorMsg, e);
       throw e;