azkaban-aplcache

StorageManager to clean up project artifacts based on config

8/24/2017 7:56:40 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/Constants.java b/azkaban-common/src/main/java/azkaban/Constants.java
index 54234cd..82c62b2 100644
--- a/azkaban-common/src/main/java/azkaban/Constants.java
+++ b/azkaban-common/src/main/java/azkaban/Constants.java
@@ -103,6 +103,23 @@ public class Constants {
     public static final String AZKABAN_KERBEROS_PRINCIPAL = "azkaban.kerberos.principal";
     public static final String AZKABAN_KEYTAB_PATH = "azkaban.keytab.path";
     public static final String PROJECT_TEMP_DIR = "project.temp.dir";
+
+    /*
+     * The max number of artifacts retained per project.
+     * Accepted Values:
+     * - 0 : Save all artifacts. No clean up is done on storage.
+     * - 1, 2, 3, ... (any positive integer 'n') : Maintain the 'n' latest versions in storage
+     *
+     * Note: Having an unacceptable value results in an exception and the service would REFUSE
+     * to start.
+     *
+     * Example:
+     * a) azkaban.storage.artifact.max.retention=0
+     *    implies save all artifacts (no cleanup)
+     * b) azkaban.storage.artifact.max.retention=3
+     *    implies retain only the latest 3 versions in storage.
+     **/
+    public static final String AZKABAN_STORAGE_ARTIFACT_MAX_RETENTION = "azkaban.storage.artifact.max.retention";
   }
 
   public static class FlowProperties {
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
index d8d12a6..51ffa11 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
@@ -189,6 +189,9 @@ class AzkabanProjectLoader {
     this.projectLoader.cleanOlderProjectVersion(project.getId(),
         project.getVersion() - this.projectVersionRetention);
 
+    // Clean up storage
+    this.storageManager.cleanupProjectArtifacts(project.getId());
+
     return reports;
   }
 
diff --git a/azkaban-common/src/main/java/azkaban/storage/StorageCleaner.java b/azkaban-common/src/main/java/azkaban/storage/StorageCleaner.java
new file mode 100644
index 0000000..969e15a
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/storage/StorageCleaner.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.storage;
+
+import static azkaban.Constants.ConfigurationKeys.AZKABAN_STORAGE_ARTIFACT_MAX_RETENTION;
+import static com.google.common.base.Preconditions.checkArgument;
+
+import azkaban.db.DatabaseOperator;
+import azkaban.spi.Storage;
+import azkaban.utils.Props;
+import com.google.common.annotations.VisibleForTesting;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.apache.log4j.Logger;
+
+@Singleton
+public class StorageCleaner {
+
+  // Delete records of all older versions
+  static final String SQL_DELETE_RESOURCE_ID = "DELETE FROM project_versions WHERE resource_id=?";
+
+  /**
+   * The query must sort the versions in reverse order for the cleanup operation to work correctly!
+   * TODO spyne: Refactor database storage cleanup to use this
+   *
+   * When using DatabaseStorage, resourceId is always NULL. Hence, those rows will currently
+   * never be cleaned up.
+   */
+  static final String SQL_FETCH_PVR = "SELECT resource_id FROM project_versions WHERE project_id=? AND resource_id IS NOT NULL ORDER BY version DESC";
+
+  private static final Logger log = Logger.getLogger(StorageCleaner.class);
+  private final DatabaseOperator databaseOperator;
+  private final int maxArtifactsPerProject;
+  private final Storage storage;
+
+  @Inject
+  public StorageCleaner(final Props props, final Storage storage,
+      final DatabaseOperator databaseOperator) {
+    this.storage = storage;
+    this.databaseOperator = databaseOperator;
+
+    this.maxArtifactsPerProject = props.getInt(AZKABAN_STORAGE_ARTIFACT_MAX_RETENTION, 0);
+    checkArgument(this.maxArtifactsPerProject >= 0,
+        String.format("Invalid value for %s : %d", AZKABAN_STORAGE_ARTIFACT_MAX_RETENTION,
+            this.maxArtifactsPerProject));
+
+    if (isCleanupPermitted()) {
+      log.info(String.format("%s Config: Max %d artifact(s) retained per project",
+          AZKABAN_STORAGE_ARTIFACT_MAX_RETENTION, this.maxArtifactsPerProject));
+    } else {
+      log.warn("Project cleanup disabled. All artifacts will be stored.");
+    }
+  }
+
+  @VisibleForTesting
+  boolean isCleanupPermitted() {
+    return this.maxArtifactsPerProject > 0;
+  }
+
+  /**
+   * Remove all but last N artifacts as configured by AZKABAN_STORAGE_ARTIFACT_MAX_RETENTION
+   *
+   * Since multiple versions can share the same filename, the approach is to collect all
+   * filenames, exclude the latest ones from that set, and delete the remainder via the
+   * respective storage implementation.
+   *
+   * From the storage perspective, cleanup just needs the {@link Storage#delete(String)} API to
+   * work.
+   *
+   * Failure cases:
+   * - If the storage cleanup fails, the cleanup will be attempted again on the next upload
+   * - If the storage cleanup succeeds and the DB cleanup fails, the DB will be cleaned up in the
+   * next attempt.
+   *
+   * @param projectId project ID
+   */
+  public void cleanupProjectArtifacts(final int projectId) {
+    if (!isCleanupPermitted()) {
+      return;
+    }
+    final Set<String> allResourceIds = findResourceIdsToDelete(projectId);
+    if (allResourceIds.size() == 0) {
+      return;
+    }
+
+    log.warn(String.format("Deleting project artifacts [id: %d]: %s", projectId, allResourceIds));
+    allResourceIds.forEach(this::delete);
+  }
+
+  private Set<String> findResourceIdsToDelete(final int projectId) {
+    final List<String> resourceIdOrderedList = fetchResourceIdOrderedList(projectId);
+    if (resourceIdOrderedList.size() <= this.maxArtifactsPerProject) {
+      return Collections.emptySet();
+    }
+
+    final Set<String> allResourceIds = new HashSet<>(resourceIdOrderedList);
+    final Set<String> doNotDeleteSet = new HashSet<>(
+        resourceIdOrderedList.subList(0, this.maxArtifactsPerProject));
+    allResourceIds.removeAll(doNotDeleteSet);
+    return allResourceIds;
+  }
+
+  /**
+   * Main Delete Utility.
+   *
+   * Delete the storage first. Then remove metadata from DB. Warning! This order cannot be reversed
+   * since if the metadata is lost, there is no reference of the storage blob.
+   *
+   * @param resourceId the storage key to be deleted.
+   * @return true if deletion was successful. false otherwise
+   */
+  private boolean delete(final String resourceId) {
+    final boolean isDeleted = this.storage.delete(resourceId) && removeDbEntry(resourceId);
+    if (!isDeleted) {
+      log.info("Failed to delete resourceId: " + resourceId);
+    }
+    return isDeleted;
+  }
+
+  private boolean removeDbEntry(final String resourceId) {
+    try {
+      final int nAffectedRows = this.databaseOperator.update(SQL_DELETE_RESOURCE_ID, resourceId);
+      return nAffectedRows > 0;
+    } catch (final SQLException e) {
+      log.error("Error while deleting DB metadata resource ID: " + resourceId, e);
+    }
+    return false;
+  }
+
+  private List<String> fetchResourceIdOrderedList(final int projectId) {
+    try {
+      return this.databaseOperator.query(SQL_FETCH_PVR,
+          rs -> {
+            final List<String> results = new ArrayList<>();
+            while (rs.next()) {
+              results.add(rs.getString(1));
+            }
+            return results;
+          }, projectId);
+    } catch (final SQLException e) {
+      log.error("Error performing cleanup of Project: " + projectId, e);
+    }
+    return Collections.emptyList();
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/storage/StorageManager.java b/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
index 95cf142..1373649 100644
--- a/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
+++ b/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
@@ -30,13 +30,14 @@ import azkaban.spi.StorageMetadata;
 import azkaban.user.User;
 import azkaban.utils.Md5Hasher;
 import azkaban.utils.Props;
-import com.google.inject.Inject;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
+import javax.inject.Inject;
+import javax.inject.Singleton;
 import org.apache.commons.io.IOUtils;
 import org.apache.log4j.Logger;
 
@@ -45,20 +46,24 @@ import org.apache.log4j.Logger;
  * StorageManager manages and coordinates all interactions with the Storage layer. This also
  * includes bookkeeping like updating DB with the new versionm, etc
  */
+@Singleton
 public class StorageManager {
 
   private static final Logger log = Logger.getLogger(StorageManager.class);
 
+  private final StorageCleaner storageCleaner;
   private final Storage storage;
   private final ProjectLoader projectLoader;
   private final File tempDir;
 
   @Inject
   public StorageManager(final Props props, final Storage storage,
-      final ProjectLoader projectLoader) {
+      final ProjectLoader projectLoader,
+      final StorageCleaner storageCleaner) {
     this.tempDir = new File(props.getString("project.temp.dir", "temp"));
-    this.storage = requireNonNull(storage);
-    this.projectLoader = requireNonNull(projectLoader);
+    this.storage = requireNonNull(storage, "storage is null");
+    this.projectLoader = requireNonNull(projectLoader, "projectLoader is null");
+    this.storageCleaner = requireNonNull(storageCleaner, "storageCleaner is null");
 
     prepareTempDir();
   }
@@ -116,6 +121,18 @@ public class StorageManager {
     }
   }
 
+  /**
+   * Clean up project artifacts based on project ID.
+   * See {@link StorageCleaner#cleanupProjectArtifacts(int)}
+   */
+  public void cleanupProjectArtifacts(final int projectId) {
+    try {
+      this.storageCleaner.cleanupProjectArtifacts(projectId);
+    } catch (final Exception e) {
+      log.error("Error occurred during cleanup. Ignoring and continuing...", e);
+    }
+  }
+
   private byte[] computeHash(final File localFile) {
     final byte[] md5;
     try {
diff --git a/azkaban-common/src/test/java/azkaban/storage/StorageCleanerTest.java b/azkaban-common/src/test/java/azkaban/storage/StorageCleanerTest.java
new file mode 100644
index 0000000..b2a1289
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/storage/StorageCleanerTest.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.storage;
+
+import static azkaban.Constants.ConfigurationKeys.AZKABAN_STORAGE_ARTIFACT_MAX_RETENTION;
+import static azkaban.storage.StorageCleaner.SQL_DELETE_RESOURCE_ID;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.anyVararg;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import azkaban.db.DatabaseOperator;
+import azkaban.spi.Storage;
+import azkaban.utils.Props;
+import java.util.Arrays;
+import org.junit.Before;
+import org.junit.Test;
+
+public class StorageCleanerTest {
+
+  public static final int TEST_PROJECT_ID = 14;
+
+  private Storage storage;
+  private DatabaseOperator databaseOperator;
+
+  @Before
+  public void setUp() throws Exception {
+    this.databaseOperator = mock(DatabaseOperator.class);
+    this.storage = mock(Storage.class);
+
+    when(this.databaseOperator.query(
+        eq(StorageCleaner.SQL_FETCH_PVR), anyObject(), eq(TEST_PROJECT_ID)))
+        .thenReturn(Arrays.asList("14/14-9.zip", "14/14-8.zip", "14/14-7.zip"));
+
+    when(this.storage.delete("14/14-8.zip")).thenReturn(true);
+    when(this.storage.delete("14/14-7.zip")).thenReturn(false);
+    when(this.databaseOperator.update(any(), anyVararg())).thenReturn(1);
+  }
+
+  /**
+   * test default behavior. By default no artifacts should be cleaned up.
+   */
+  @Test
+  public void testNoCleanupCase1() throws Exception {
+    final StorageCleaner storageCleaner = new StorageCleaner(new Props(), this.storage,
+        this.databaseOperator);
+
+    assertFalse(storageCleaner.isCleanupPermitted());
+  }
+
+  @Test
+  public void testNoCleanupCase2() throws Exception {
+    final Props props = new Props();
+    props.put(AZKABAN_STORAGE_ARTIFACT_MAX_RETENTION, 10);
+    final StorageCleaner storageCleaner = new StorageCleaner(props, this.storage,
+        this.databaseOperator);
+
+    assertTrue(storageCleaner.isCleanupPermitted());
+    storageCleaner.cleanupProjectArtifacts(TEST_PROJECT_ID);
+
+    verify(this.storage, never()).delete(anyString());
+  }
+
+  @Test
+  public void testCleanup() throws Exception {
+    final Props props = new Props();
+    props.put(AZKABAN_STORAGE_ARTIFACT_MAX_RETENTION, 1);
+    final StorageCleaner storageCleaner = new StorageCleaner(props, this.storage,
+        this.databaseOperator);
+
+    assertTrue(storageCleaner.isCleanupPermitted());
+    storageCleaner.cleanupProjectArtifacts(TEST_PROJECT_ID);
+
+    verify(this.storage, never()).delete("14/14-9.zip");
+    verify(this.storage, times(1)).delete("14/14-8.zip");
+    verify(this.storage, times(1)).delete("14/14-7.zip");
+
+    verify(this.databaseOperator, never()).update(SQL_DELETE_RESOURCE_ID, "14/14-9.zip");
+    verify(this.databaseOperator, times(1)).update(SQL_DELETE_RESOURCE_ID, "14/14-8.zip");
+    verify(this.databaseOperator, never()).update(SQL_DELETE_RESOURCE_ID, "14/14-7.zip");
+  }
+}