azkaban-aplcache

Creating AzkabanProjectLoader class to handle project upload/download

7/11/2017 8:44:37 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/Constants.java b/azkaban-common/src/main/java/azkaban/Constants.java
index 1780e1b..54234cd 100644
--- a/azkaban-common/src/main/java/azkaban/Constants.java
+++ b/azkaban-common/src/main/java/azkaban/Constants.java
@@ -102,6 +102,7 @@ public class Constants {
     public static final String AZKABAN_STORAGE_HDFS_ROOT_URI = "azkaban.storage.hdfs.root.uri";
     public static final String AZKABAN_KERBEROS_PRINCIPAL = "azkaban.kerberos.principal";
     public static final String AZKABAN_KEYTAB_PATH = "azkaban.keytab.path";
+    public static final String PROJECT_TEMP_DIR = "project.temp.dir";
   }
 
   public static class FlowProperties {
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
new file mode 100644
index 0000000..d8d12a6
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
@@ -0,0 +1,212 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.project;
+
+import static java.util.Objects.requireNonNull;
+
+import azkaban.Constants.ConfigurationKeys;
+import azkaban.flow.Flow;
+import azkaban.project.ProjectLogEvent.EventType;
+import azkaban.project.validator.ValidationReport;
+import azkaban.project.validator.ValidationStatus;
+import azkaban.project.validator.ValidatorConfigs;
+import azkaban.project.validator.ValidatorManager;
+import azkaban.project.validator.XmlValidatorManager;
+import azkaban.storage.StorageManager;
+import azkaban.user.User;
+import azkaban.utils.Props;
+import azkaban.utils.Utils;
+import com.google.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.zip.ZipFile;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class handles the downloading and uploading of projects.
+ */
+class AzkabanProjectLoader {
+
+  private static final Logger log = LoggerFactory.getLogger(AzkabanProjectLoader.class);
+
+  private final Props props;
+
+  private final ProjectLoader projectLoader;
+  private final StorageManager storageManager;
+  private final File tempDir;
+  private final int projectVersionRetention;
+
+  @Inject
+  AzkabanProjectLoader(final Props props, final ProjectLoader projectLoader,
+      final StorageManager storageManager) {
+    this.props = requireNonNull(props, "Props is null");
+    this.projectLoader = requireNonNull(projectLoader, "project Loader is null");
+    this.storageManager = requireNonNull(storageManager, "Storage Manager is null");
+
+    this.tempDir = new File(props.getString(ConfigurationKeys.PROJECT_TEMP_DIR, "temp"));
+    if (!this.tempDir.exists()) {
+      log.info("Creating temp dir: " + this.tempDir.getAbsolutePath());
+      this.tempDir.mkdirs();
+    } else {
+      log.info("Using temp dir: " + this.tempDir.getAbsolutePath());
+    }
+    this.projectVersionRetention = props.getInt("project.version.retention", 3);
+    log.info("Project version retention is set to " + this.projectVersionRetention);
+  }
+
+  public Map<String, ValidationReport> uploadProject(final Project project,
+      final File archive, final String fileType, final User uploader, final Props additionalProps)
+      throws ProjectManagerException {
+    log.info("Uploading files to " + project.getName());
+
+    // Unzip.
+    File file = null;
+    try {
+      if (fileType == null) {
+        throw new ProjectManagerException("Unknown file type for "
+            + archive.getName());
+      } else if ("zip".equals(fileType)) {
+        file = unzipFile(archive);
+      } else {
+        throw new ProjectManagerException("Unsupported archive type for file "
+            + archive.getName());
+      }
+    } catch (final IOException e) {
+      throw new ProjectManagerException("Error unzipping file.", e);
+    }
+
+    // Since props is an instance variable of ProjectManager, and each
+    // invocation to the uploadProject manager needs to pass a different
+    // value for the PROJECT_ARCHIVE_FILE_PATH key, it is necessary to
+    // create a new instance of Props to make sure these different values
+    // are isolated from each other.
+    final Props prop = new Props(this.props);
+    prop.putAll(additionalProps);
+    prop.put(ValidatorConfigs.PROJECT_ARCHIVE_FILE_PATH,
+        archive.getAbsolutePath());
+    // Basically, we want to make sure that for different invocations to the
+    // uploadProject method,
+    // the validators are using different values for the
+    // PROJECT_ARCHIVE_FILE_PATH configuration key.
+    // In addition, we want to reload the validator objects for each upload, so
+    // that we can change the validator configuration files without having to
+    // restart Azkaban web server. If the XmlValidatorManager is an instance
+    // variable, 2 consecutive invocations to the uploadProject
+    // method might cause the second one to overwrite the
+    // PROJECT_ARCHIVE_FILE_PATH configuration parameter
+    // of the first, thus causing a wrong archive file path to be passed to the
+    // validators. Creating a separate XmlValidatorManager object for each
+    // upload will prevent this issue without having to add
+    // synchronization between uploads. Since we're already reloading the XML
+    // config file and creating validator objects for each upload, this does
+    // not add too much additional overhead.
+    final ValidatorManager validatorManager = new XmlValidatorManager(prop);
+    log.info("Validating project " + archive.getName()
+        + " using the registered validators "
+        + validatorManager.getValidatorsInfo().toString());
+    final Map<String, ValidationReport> reports = validatorManager.validate(project, file);
+    ValidationStatus status = ValidationStatus.PASS;
+    for (final Entry<String, ValidationReport> report : reports.entrySet()) {
+      if (report.getValue().getStatus().compareTo(status) > 0) {
+        status = report.getValue().getStatus();
+      }
+    }
+    if (status == ValidationStatus.ERROR) {
+      log.error("Error found in upload to " + project.getName()
+          + ". Cleaning up.");
+
+      try {
+        FileUtils.deleteDirectory(file);
+      } catch (final IOException e) {
+        file.deleteOnExit();
+        e.printStackTrace();
+      }
+
+      return reports;
+    }
+
+    final DirectoryFlowLoader loader =
+        (DirectoryFlowLoader) validatorManager.getDefaultValidator();
+    final Map<String, Props> jobProps = loader.getJobProps();
+    final List<Props> propProps = loader.getProps();
+
+    synchronized (project) {
+      final int newVersion = this.projectLoader.getLatestProjectVersion(project) + 1;
+      final Map<String, Flow> flows = loader.getFlowMap();
+      for (final Flow flow : flows.values()) {
+        flow.setProjectId(project.getId());
+        flow.setVersion(newVersion);
+      }
+
+      this.storageManager.uploadProject(project, newVersion, archive, uploader);
+
+      log.info("Uploading flow to db " + archive.getName());
+      this.projectLoader.uploadFlows(project, newVersion, flows.values());
+      log.info("Changing project versions " + archive.getName());
+      this.projectLoader.changeProjectVersion(project, newVersion,
+          uploader.getUserId());
+      project.setFlows(flows);
+      log.info("Uploading Job properties");
+      this.projectLoader.uploadProjectProperties(project, new ArrayList<>(
+          jobProps.values()));
+      log.info("Uploading Props properties");
+      this.projectLoader.uploadProjectProperties(project, propProps);
+    }
+
+    log.info("Uploaded project files. Cleaning up temp files.");
+    this.projectLoader.postEvent(project, EventType.UPLOADED, uploader.getUserId(),
+        "Uploaded project files zip " + archive.getName());
+    try {
+      FileUtils.deleteDirectory(file);
+    } catch (final IOException e) {
+      file.deleteOnExit();
+      e.printStackTrace();
+    }
+
+    log.info("Cleaning up old install files older than "
+        + (project.getVersion() - this.projectVersionRetention));
+    this.projectLoader.cleanOlderProjectVersion(project.getId(),
+        project.getVersion() - this.projectVersionRetention);
+
+    return reports;
+  }
+
+  private File unzipFile(final File archiveFile) throws IOException {
+    final ZipFile zipfile = new ZipFile(archiveFile);
+    final File unzipped = Utils.createTempDir(this.tempDir);
+    Utils.unzip(zipfile, unzipped);
+    zipfile.close();
+
+    return unzipped;
+  }
+
+  public ProjectFileHandler getProjectFile(final Project project, int version)
+      throws ProjectManagerException {
+    if (version == -1) {
+      version = this.projectLoader.getLatestProjectVersion(project);
+    }
+    return this.storageManager.getProjectFile(project.getId(), version);
+  }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
index f051e85..8a63a5d 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
@@ -21,9 +21,7 @@ import static java.util.Objects.requireNonNull;
 import azkaban.flow.Flow;
 import azkaban.project.ProjectLogEvent.EventType;
 import azkaban.project.validator.ValidationReport;
-import azkaban.project.validator.ValidationStatus;
 import azkaban.project.validator.ValidatorConfigs;
-import azkaban.project.validator.ValidatorManager;
 import azkaban.project.validator.XmlValidatorManager;
 import azkaban.storage.StorageManager;
 import azkaban.user.Permission;
@@ -31,31 +29,24 @@ import azkaban.user.Permission.Type;
 import azkaban.user.User;
 import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
-import azkaban.utils.Utils;
 import com.google.inject.Inject;
 import java.io.File;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
-import java.util.zip.ZipFile;
-import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
 
 
 public class ProjectManager {
 
   private static final Logger logger = Logger.getLogger(ProjectManager.class);
+  private final AzkabanProjectLoader azkabanProjectLoader;
   private final ProjectLoader projectLoader;
-  private final StorageManager storageManager;
   private final Props props;
-  private final File tempDir;
-  private final int projectVersionRetention;
   private final boolean creatorDefaultPermissions;
   private final ConcurrentHashMap<Integer, Project> projectsById =
       new ConcurrentHashMap<>();
@@ -63,25 +54,17 @@ public class ProjectManager {
       new ConcurrentHashMap<>();
 
   @Inject
-  public ProjectManager(final ProjectLoader loader, final StorageManager storageManager,
+  public ProjectManager(final AzkabanProjectLoader azkabanProjectLoader,
+      final ProjectLoader loader,
+      final StorageManager storageManager,
       final Props props) {
     this.projectLoader = requireNonNull(loader);
-    this.storageManager = requireNonNull(storageManager);
     this.props = requireNonNull(props);
-
-    this.tempDir = new File(this.props.getString("project.temp.dir", "temp"));
-    this.projectVersionRetention =
-        (props.getInt("project.version.retention", 3));
-    logger.info("Project version retention is set to "
-        + this.projectVersionRetention);
+    this.azkabanProjectLoader = requireNonNull(azkabanProjectLoader);
 
     this.creatorDefaultPermissions =
         props.getBoolean("creator.default.proxy", true);
 
-    if (!this.tempDir.exists()) {
-      this.tempDir.mkdirs();
-    }
-
     // The prop passed to XmlValidatorManager is used to initialize all the
     // validators
     // Each validator will take certain key/value pairs from the prop to
@@ -457,130 +440,16 @@ public class ProjectManager {
    *         project name and version
    * @throws ProjectManagerException
    */
-  public ProjectFileHandler getProjectFileHandler(final Project project, int version)
+  public ProjectFileHandler getProjectFileHandler(final Project project, final int version)
       throws ProjectManagerException {
-
-    if (version == -1) {
-      version = this.projectLoader.getLatestProjectVersion(project);
-    }
-    return this.storageManager.getProjectFile(project.getId(), version);
+    return azkabanProjectLoader.getProjectFile(project, version);
   }
 
   public Map<String, ValidationReport> uploadProject(final Project project,
       final File archive, final String fileType, final User uploader, final Props additionalProps)
       throws ProjectManagerException {
-    logger.info("Uploading files to " + project.getName());
-
-    // Unzip.
-    File file = null;
-    try {
-      if (fileType == null) {
-        throw new ProjectManagerException("Unknown file type for "
-            + archive.getName());
-      } else if ("zip".equals(fileType)) {
-        file = unzipFile(archive);
-      } else {
-        throw new ProjectManagerException("Unsupported archive type for file "
-            + archive.getName());
-      }
-    } catch (final IOException e) {
-      throw new ProjectManagerException("Error unzipping file.", e);
-    }
-
-    // Since props is an instance variable of ProjectManager, and each
-    // invocation to the uploadProject manager needs to pass a different
-    // value for the PROJECT_ARCHIVE_FILE_PATH key, it is necessary to
-    // create a new instance of Props to make sure these different values
-    // are isolated from each other.
-    final Props prop = new Props(this.props);
-    prop.putAll(additionalProps);
-    prop.put(ValidatorConfigs.PROJECT_ARCHIVE_FILE_PATH,
-        archive.getAbsolutePath());
-    // Basically, we want to make sure that for different invocations to the
-    // uploadProject method,
-    // the validators are using different values for the
-    // PROJECT_ARCHIVE_FILE_PATH configuration key.
-    // In addition, we want to reload the validator objects for each upload, so
-    // that we can change the validator configuration files without having to
-    // restart Azkaban web server. If the XmlValidatorManager is an instance
-    // variable, 2 consecutive invocations to the uploadProject
-    // method might cause the second one to overwrite the
-    // PROJECT_ARCHIVE_FILE_PATH configuration parameter
-    // of the first, thus causing a wrong archive file path to be passed to the
-    // validators. Creating a separate XmlValidatorManager object for each
-    // upload will prevent this issue without having to add
-    // synchronization between uploads. Since we're already reloading the XML
-    // config file and creating validator objects for each upload, this does
-    // not add too much additional overhead.
-    final ValidatorManager validatorManager = new XmlValidatorManager(prop);
-    logger.info("Validating project " + archive.getName()
-        + " using the registered validators "
-        + validatorManager.getValidatorsInfo().toString());
-    final Map<String, ValidationReport> reports = validatorManager.validate(project, file);
-    ValidationStatus status = ValidationStatus.PASS;
-    for (final Entry<String, ValidationReport> report : reports.entrySet()) {
-      if (report.getValue().getStatus().compareTo(status) > 0) {
-        status = report.getValue().getStatus();
-      }
-    }
-    if (status == ValidationStatus.ERROR) {
-      logger.error("Error found in upload to " + project.getName()
-          + ". Cleaning up.");
-
-      try {
-        FileUtils.deleteDirectory(file);
-      } catch (final IOException e) {
-        file.deleteOnExit();
-        e.printStackTrace();
-      }
-
-      return reports;
-    }
-
-    final DirectoryFlowLoader loader =
-        (DirectoryFlowLoader) validatorManager.getDefaultValidator();
-    final Map<String, Props> jobProps = loader.getJobProps();
-    final List<Props> propProps = loader.getProps();
-
-    synchronized (project) {
-      final int newVersion = this.projectLoader.getLatestProjectVersion(project) + 1;
-      final Map<String, Flow> flows = loader.getFlowMap();
-      for (final Flow flow : flows.values()) {
-        flow.setProjectId(project.getId());
-        flow.setVersion(newVersion);
-      }
-
-      this.storageManager.uploadProject(project, newVersion, archive, uploader);
-
-      logger.info("Uploading flow to db " + archive.getName());
-      this.projectLoader.uploadFlows(project, newVersion, flows.values());
-      logger.info("Changing project versions " + archive.getName());
-      this.projectLoader.changeProjectVersion(project, newVersion,
-          uploader.getUserId());
-      project.setFlows(flows);
-      logger.info("Uploading Job properties");
-      this.projectLoader.uploadProjectProperties(project, new ArrayList<>(
-          jobProps.values()));
-      logger.info("Uploading Props properties");
-      this.projectLoader.uploadProjectProperties(project, propProps);
-    }
-
-    logger.info("Uploaded project files. Cleaning up temp files.");
-    this.projectLoader.postEvent(project, EventType.UPLOADED, uploader.getUserId(),
-        "Uploaded project files zip " + archive.getName());
-    try {
-      FileUtils.deleteDirectory(file);
-    } catch (final IOException e) {
-      file.deleteOnExit();
-      e.printStackTrace();
-    }
-
-    logger.info("Cleaning up old install files older than "
-        + (project.getVersion() - this.projectVersionRetention));
-    this.projectLoader.cleanOlderProjectVersion(project.getId(),
-        project.getVersion() - this.projectVersionRetention);
-
-    return reports;
+    return azkabanProjectLoader
+        .uploadProject(project, archive, fileType, uploader, additionalProps);
   }
 
   public void updateFlow(final Project project, final Flow flow)
@@ -588,14 +457,6 @@ public class ProjectManager {
     this.projectLoader.updateFlow(project, flow.getVersion(), flow);
   }
 
-  private File unzipFile(final File archiveFile) throws IOException {
-    final ZipFile zipfile = new ZipFile(archiveFile);
-    final File unzipped = Utils.createTempDir(this.tempDir);
-    Utils.unzip(zipfile, unzipped);
-    zipfile.close();
-
-    return unzipped;
-  }
 
   public void postProjectEvent(final Project project, final EventType type, final String user,
       final String message) {
diff --git a/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java
new file mode 100644
index 0000000..5f98984
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.project;
+
+import static azkaban.Constants.ConfigurationKeys.PROJECT_TEMP_DIR;
+import static java.util.Objects.requireNonNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import azkaban.storage.StorageManager;
+import azkaban.user.User;
+import azkaban.utils.Props;
+import java.io.File;
+import java.net.URL;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class AzkabanProjectLoaderTest {
+
+  @Rule
+  public final TemporaryFolder TEMP_DIR = new TemporaryFolder();
+
+  private final int ID = 107;
+  private final int VERSION = 10;
+  private final Project project = new Project(this.ID, "project1");
+
+  private AzkabanProjectLoader azkabanProjectLoader;
+  private StorageManager storageManager;
+  private ProjectLoader projectLoader;
+
+  @Before
+  public void setUp() throws Exception {
+    final Props props = new Props();
+    props.put(PROJECT_TEMP_DIR, this.TEMP_DIR.getRoot().getAbsolutePath());
+
+    this.storageManager = mock(StorageManager.class);
+    this.projectLoader = mock(ProjectLoader.class);
+
+    this.azkabanProjectLoader = new AzkabanProjectLoader(props, this.projectLoader,
+        this.storageManager);
+  }
+
+  @Test
+  public void uploadProject() throws Exception {
+    when(this.projectLoader.getLatestProjectVersion(this.project)).thenReturn(this.VERSION);
+
+    final URL resource = requireNonNull(
+        getClass().getClassLoader().getResource("sample_flow_01.zip"));
+    final File projectZipFile = new File(resource.getPath());
+    final User uploader = new User("test_user");
+
+    this.azkabanProjectLoader.uploadProject(this.project, projectZipFile, "zip", uploader, null);
+
+    verify(this.storageManager)
+        .uploadProject(this.project, this.VERSION + 1, projectZipFile, uploader);
+  }
+
+  @Test
+  public void getProjectFile() throws Exception {
+    when(this.projectLoader.getLatestProjectVersion(this.project)).thenReturn(this.VERSION);
+
+    // Run the code
+    this.azkabanProjectLoader.getProjectFile(this.project, -1);
+
+    verify(this.projectLoader).getLatestProjectVersion(this.project);
+    verify(this.storageManager).getProjectFile(this.ID, this.VERSION);
+  }
+
+}