Details
diff --git a/azkaban-common/src/main/java/azkaban/Constants.java b/azkaban-common/src/main/java/azkaban/Constants.java
index 54234cd..82c62b2 100644
--- a/azkaban-common/src/main/java/azkaban/Constants.java
+++ b/azkaban-common/src/main/java/azkaban/Constants.java
@@ -103,6 +103,23 @@ public class Constants {
public static final String AZKABAN_KERBEROS_PRINCIPAL = "azkaban.kerberos.principal";
public static final String AZKABAN_KEYTAB_PATH = "azkaban.keytab.path";
public static final String PROJECT_TEMP_DIR = "project.temp.dir";
+
+ /*
+ * The max number of artifacts retained per project.
+ * Accepted Values:
+ * - 0 : Save all artifacts (default). No clean up is done on storage.
+ * - 1, 2, 3, ... (any positive integer 'n') : Maintain the 'n' latest versions in storage
+ *
+ * Note: Having an unacceptable value results in an exception and the service would REFUSE
+ * to start.
+ *
+ * Example:
+ * a) azkaban.storage.artifact.max.retention=0
+ * implies save all artifacts
+ * b) azkaban.storage.artifact.max.retention=3
+ * implies the latest 3 versions are kept in storage.
+ **/
+ public static final String AZKABAN_STORAGE_ARTIFACT_MAX_RETENTION = "azkaban.storage.artifact.max.retention";
}
public static class FlowProperties {
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
index d8d12a6..51ffa11 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
@@ -189,6 +189,9 @@ class AzkabanProjectLoader {
this.projectLoader.cleanOlderProjectVersion(project.getId(),
project.getVersion() - this.projectVersionRetention);
+ // Clean up storage
+ this.storageManager.cleanupProjectArtifacts(project.getId());
+
return reports;
}
diff --git a/azkaban-common/src/main/java/azkaban/storage/StorageCleaner.java b/azkaban-common/src/main/java/azkaban/storage/StorageCleaner.java
new file mode 100644
index 0000000..969e15a
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/storage/StorageCleaner.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.storage;
+
+import static azkaban.Constants.ConfigurationKeys.AZKABAN_STORAGE_ARTIFACT_MAX_RETENTION;
+import static com.google.common.base.Preconditions.checkArgument;
+
+import azkaban.db.DatabaseOperator;
+import azkaban.spi.Storage;
+import azkaban.utils.Props;
+import com.google.common.annotations.VisibleForTesting;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.apache.log4j.Logger;
+
+/**
+ * Removes old project artifacts from the configured {@link Storage} together with the
+ * project_versions rows that reference them.
+ *
+ * The number of versions kept per project is read from AZKABAN_STORAGE_ARTIFACT_MAX_RETENTION.
+ * A value of 0 (the default) turns cleanup off entirely.
+ */
+@Singleton
+public class StorageCleaner {
+
+  // Removes every project_versions row that references the given resource ID.
+  static final String SQL_DELETE_RESOURCE_ID = "DELETE FROM project_versions WHERE resource_id=?";
+
+  /**
+   * Lists the non-null resource IDs of a project, newest version first. The cleanup keeps the
+   * head of this list and deletes the rest, so the descending order must not be changed.
+   * TODO spyne: Refactor database storage cleanup to use this
+   *
+   * When using DatabaseStorage, resourceId is always NULL. Hence, those rows are currently never
+   * cleaned up.
+   */
+  static final String SQL_FETCH_PVR = "SELECT resource_id FROM project_versions WHERE project_id=? AND resource_id IS NOT NULL ORDER BY version DESC";
+
+  private static final Logger log = Logger.getLogger(StorageCleaner.class);
+  private final DatabaseOperator databaseOperator;
+  private final int maxArtifactsPerProject;
+  private final Storage storage;
+
+  /**
+   * Reads the retention setting and logs whether cleanup is active.
+   *
+   * @param props configuration; AZKABAN_STORAGE_ARTIFACT_MAX_RETENTION defaults to 0
+   * @param storage storage backend whose artifacts are deleted
+   * @param databaseOperator used to list and delete project_versions rows
+   * @throws IllegalArgumentException if the configured retention is negative
+   */
+  @Inject
+  public StorageCleaner(final Props props, final Storage storage,
+      final DatabaseOperator databaseOperator) {
+    final int configuredRetention = props.getInt(AZKABAN_STORAGE_ARTIFACT_MAX_RETENTION, 0);
+    checkArgument(configuredRetention >= 0,
+        String.format("Invalid value for %s : %d", AZKABAN_STORAGE_ARTIFACT_MAX_RETENTION,
+            configuredRetention));
+
+    this.maxArtifactsPerProject = configuredRetention;
+    this.databaseOperator = databaseOperator;
+    this.storage = storage;
+
+    if (!isCleanupPermitted()) {
+      log.warn("Project cleanup disabled. All artifacts will be stored.");
+    } else {
+      log.info(String.format("%s Config: Max %d artifact(s) retained per project",
+          AZKABAN_STORAGE_ARTIFACT_MAX_RETENTION, this.maxArtifactsPerProject));
+    }
+  }
+
+  /**
+   * @return true when a positive retention count is configured, i.e. cleanup is enabled
+   */
+  @VisibleForTesting
+  boolean isCleanupPermitted() {
+    return this.maxArtifactsPerProject > 0;
+  }
+
+  /**
+   * Deletes every artifact of the project that is not referenced by one of the newest N versions,
+   * where N is the value of AZKABAN_STORAGE_ARTIFACT_MAX_RETENTION.
+   *
+   * Several versions may point at the same resource ID, so the resource IDs of the newest N
+   * versions are collected first and excluded from the set of all resource IDs; only the
+   * remainder is deleted. The storage backend only has to support {@link Storage#delete(String)}.
+   *
+   * Failures are logged and not propagated:
+   * - a failed storage deletion leaves the DB row in place, so the resource is picked up again
+   *   by the next cleanup run
+   * - a failed DB deletion after a successful storage deletion also leaves the row in place for
+   *   the next run
+   *
+   * @param projectId project ID
+   */
+  public void cleanupProjectArtifacts(final int projectId) {
+    if (!isCleanupPermitted()) {
+      return;
+    }
+
+    final Set<String> staleResourceIds = findResourceIdsToDelete(projectId);
+    if (staleResourceIds.isEmpty()) {
+      return;
+    }
+
+    log.warn(String.format("Deleting project artifacts [id: %d]: %s", projectId, staleResourceIds));
+    for (final String staleResourceId : staleResourceIds) {
+      delete(staleResourceId);
+    }
+  }
+
+  /**
+   * Computes the resource IDs that are no longer needed by the retained versions.
+   *
+   * @param projectId project ID
+   * @return resource IDs to delete; empty when the project has no more versions than the limit
+   */
+  private Set<String> findResourceIdsToDelete(final int projectId) {
+    final List<String> newestFirst = fetchResourceIdOrderedList(projectId);
+    final int retainCount = this.maxArtifactsPerProject;
+    if (newestFirst.size() <= retainCount) {
+      return Collections.emptySet();
+    }
+
+    // Resource IDs still referenced by the newest versions must survive.
+    final Set<String> retained = new HashSet<>(newestFirst.subList(0, retainCount));
+    final Set<String> deletable = new HashSet<>(newestFirst);
+    deletable.removeAll(retained);
+    return deletable;
+  }
+
+  /**
+   * Deletes one artifact: the storage blob first, then its DB metadata.
+   *
+   * Warning! The order must not be reversed: once the metadata is gone, nothing references the
+   * storage blob any more and it could never be found for deletion.
+   *
+   * @param resourceId the storage key to be deleted.
+   * @return true if both the blob and the metadata were deleted. false otherwise
+   */
+  private boolean delete(final String resourceId) {
+    // Short-circuit keeps the DB row whenever the storage deletion did not succeed.
+    if (this.storage.delete(resourceId) && removeDbEntry(resourceId)) {
+      return true;
+    }
+    log.info("Failed to delete resourceId: " + resourceId);
+    return false;
+  }
+
+  /**
+   * Deletes the project_versions rows of a resource.
+   *
+   * @param resourceId the storage key whose rows are removed
+   * @return true if at least one row was deleted; false if none was, or on a SQL error
+   */
+  private boolean removeDbEntry(final String resourceId) {
+    try {
+      return this.databaseOperator.update(SQL_DELETE_RESOURCE_ID, resourceId) > 0;
+    } catch (final SQLException e) {
+      log.error("Error while deleting DB metadata resource ID: " + resourceId, e);
+      return false;
+    }
+  }
+
+  /**
+   * Loads the resource IDs of a project in the order produced by {@link #SQL_FETCH_PVR}.
+   *
+   * @param projectId project ID
+   * @return resource IDs, newest version first; an empty list if the query fails
+   */
+  private List<String> fetchResourceIdOrderedList(final int projectId) {
+    try {
+      return this.databaseOperator.query(SQL_FETCH_PVR,
+          resultSet -> {
+            final List<String> resourceIds = new ArrayList<>();
+            while (resultSet.next()) {
+              resourceIds.add(resultSet.getString(1));
+            }
+            return resourceIds;
+          }, projectId);
+    } catch (final SQLException e) {
+      log.error("Error performing cleanup of Project: " + projectId, e);
+      return Collections.emptyList();
+    }
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/storage/StorageManager.java b/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
index 95cf142..1373649 100644
--- a/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
+++ b/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
@@ -30,13 +30,14 @@ import azkaban.spi.StorageMetadata;
import azkaban.user.User;
import azkaban.utils.Md5Hasher;
import azkaban.utils.Props;
-import com.google.inject.Inject;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
+import javax.inject.Inject;
+import javax.inject.Singleton;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
@@ -45,20 +46,24 @@ import org.apache.log4j.Logger;
* StorageManager manages and coordinates all interactions with the Storage layer. This also
* includes bookkeeping like updating DB with the new versionm, etc
*/
+@Singleton
public class StorageManager {
private static final Logger log = Logger.getLogger(StorageManager.class);
+ private final StorageCleaner storageCleaner;
private final Storage storage;
private final ProjectLoader projectLoader;
private final File tempDir;
+ /**
+ * Creates the manager and prepares its temp directory.
+ *
+ * The temp directory is taken from the "project.temp.dir" property (default "temp"). All
+ * collaborators are required; a null one is rejected with a message naming the parameter.
+ */
@Inject
public StorageManager(final Props props, final Storage storage,
- final ProjectLoader projectLoader) {
+ final ProjectLoader projectLoader,
+ final StorageCleaner storageCleaner) {
this.tempDir = new File(props.getString("project.temp.dir", "temp"));
- this.storage = requireNonNull(storage);
- this.projectLoader = requireNonNull(projectLoader);
+ this.storage = requireNonNull(storage, "storage is null");
+ this.projectLoader = requireNonNull(projectLoader, "projectLoader is null");
+ this.storageCleaner = requireNonNull(storageCleaner, "storageCleaner is null");
prepareTempDir();
}
@@ -116,6 +121,18 @@ public class StorageManager {
}
}
+  /**
+   * Cleans up the stored artifacts of a project by delegating to
+   * {@link StorageCleaner#cleanupProjectArtifacts(int)}.
+   *
+   * Cleanup is best-effort: any exception thrown by the cleaner is logged and swallowed, so a
+   * cleanup failure never propagates to the caller.
+   *
+   * @param projectId ID of the project whose artifacts should be cleaned up
+   */
+  public void cleanupProjectArtifacts(final int projectId) {
+    try {
+      this.storageCleaner.cleanupProjectArtifacts(projectId);
+    } catch (final Exception e) {
+      // Deliberately broad catch: a cleanup problem must not fail the calling operation.
+      log.error("Error occurred during cleanup. Ignoring and continuing...", e);
+    }
+  }
+
private byte[] computeHash(final File localFile) {
final byte[] md5;
try {
diff --git a/azkaban-common/src/test/java/azkaban/storage/StorageCleanerTest.java b/azkaban-common/src/test/java/azkaban/storage/StorageCleanerTest.java
new file mode 100644
index 0000000..b2a1289
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/storage/StorageCleanerTest.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.storage;
+
+import static azkaban.Constants.ConfigurationKeys.AZKABAN_STORAGE_ARTIFACT_MAX_RETENTION;
+import static azkaban.storage.StorageCleaner.SQL_DELETE_RESOURCE_ID;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.anyVararg;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import azkaban.db.DatabaseOperator;
+import azkaban.spi.Storage;
+import azkaban.utils.Props;
+import java.util.Arrays;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link StorageCleaner} using mocked storage and database collaborators.
+ */
+public class StorageCleanerTest {
+
+  public static final int TEST_PROJECT_ID = 14;
+
+  // Resource IDs returned by the stubbed version query, newest version first.
+  private static final String NEWEST_ARTIFACT = "14/14-9.zip";
+  private static final String MIDDLE_ARTIFACT = "14/14-8.zip";
+  private static final String OLDEST_ARTIFACT = "14/14-7.zip";
+
+  private Storage storage;
+  private DatabaseOperator databaseOperator;
+
+  @Before
+  public void setUp() throws Exception {
+    this.storage = mock(Storage.class);
+    this.databaseOperator = mock(DatabaseOperator.class);
+
+    // Storage deletion succeeds for the middle artifact and fails for the oldest one.
+    when(this.storage.delete(MIDDLE_ARTIFACT)).thenReturn(true);
+    when(this.storage.delete(OLDEST_ARTIFACT)).thenReturn(false);
+
+    // The project has three stored versions; every DB update reports one affected row.
+    when(this.databaseOperator.query(
+        eq(StorageCleaner.SQL_FETCH_PVR), anyObject(), eq(TEST_PROJECT_ID)))
+        .thenReturn(Arrays.asList(NEWEST_ARTIFACT, MIDDLE_ARTIFACT, OLDEST_ARTIFACT));
+    when(this.databaseOperator.update(any(), anyVararg())).thenReturn(1);
+  }
+
+  /** Builds a cleaner wired to the mocks with the given configuration. */
+  private StorageCleaner newCleaner(final Props props) {
+    return new StorageCleaner(props, this.storage, this.databaseOperator);
+  }
+
+  /** Builds a configuration that retains the given number of artifacts per project. */
+  private static Props retentionProps(final int maxArtifacts) {
+    final Props props = new Props();
+    props.put(AZKABAN_STORAGE_ARTIFACT_MAX_RETENTION, maxArtifacts);
+    return props;
+  }
+
+  /**
+   * Without any configuration the cleaner must report that cleanup is not permitted.
+   */
+  @Test
+  public void testNoCleanupCase1() throws Exception {
+    final StorageCleaner cleaner = newCleaner(new Props());
+
+    assertFalse(cleaner.isCleanupPermitted());
+  }
+
+  /**
+   * Cleanup is enabled, but the retention limit (10) exceeds the three stored versions, so
+   * nothing may be deleted from storage.
+   */
+  @Test
+  public void testNoCleanupCase2() throws Exception {
+    final StorageCleaner cleaner = newCleaner(retentionProps(10));
+
+    assertTrue(cleaner.isCleanupPermitted());
+    cleaner.cleanupProjectArtifacts(TEST_PROJECT_ID);
+
+    verify(this.storage, never()).delete(anyString());
+  }
+
+  /**
+   * With a retention limit of 1 only the newest artifact survives. The DB row is removed only
+   * for the artifact whose storage deletion succeeded.
+   */
+  @Test
+  public void testCleanup() throws Exception {
+    final StorageCleaner cleaner = newCleaner(retentionProps(1));
+
+    assertTrue(cleaner.isCleanupPermitted());
+    cleaner.cleanupProjectArtifacts(TEST_PROJECT_ID);
+
+    // Newest artifact: retained, so neither storage nor DB is touched.
+    verify(this.storage, never()).delete(NEWEST_ARTIFACT);
+    verify(this.databaseOperator, never()).update(SQL_DELETE_RESOURCE_ID, NEWEST_ARTIFACT);
+
+    // Middle artifact: storage deletion succeeds, so the DB row is removed too.
+    verify(this.storage, times(1)).delete(MIDDLE_ARTIFACT);
+    verify(this.databaseOperator, times(1)).update(SQL_DELETE_RESOURCE_ID, MIDDLE_ARTIFACT);
+
+    // Oldest artifact: storage deletion fails, so the DB row must be left alone.
+    verify(this.storage, times(1)).delete(OLDEST_ARTIFACT);
+    verify(this.databaseOperator, never()).update(SQL_DELETE_RESOURCE_ID, OLDEST_ARTIFACT);
+  }
+}