Details
diff --git a/azkaban-common/src/main/java/azkaban/Constants.java b/azkaban-common/src/main/java/azkaban/Constants.java
index 1780e1b..54234cd 100644
--- a/azkaban-common/src/main/java/azkaban/Constants.java
+++ b/azkaban-common/src/main/java/azkaban/Constants.java
@@ -102,6 +102,7 @@ public class Constants {
public static final String AZKABAN_STORAGE_HDFS_ROOT_URI = "azkaban.storage.hdfs.root.uri";
public static final String AZKABAN_KERBEROS_PRINCIPAL = "azkaban.kerberos.principal";
public static final String AZKABAN_KEYTAB_PATH = "azkaban.keytab.path";
+ public static final String PROJECT_TEMP_DIR = "project.temp.dir";
}
public static class FlowProperties {
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
new file mode 100644
index 0000000..d8d12a6
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
@@ -0,0 +1,212 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.project;
+
+import static java.util.Objects.requireNonNull;
+
+import azkaban.Constants.ConfigurationKeys;
+import azkaban.flow.Flow;
+import azkaban.project.ProjectLogEvent.EventType;
+import azkaban.project.validator.ValidationReport;
+import azkaban.project.validator.ValidationStatus;
+import azkaban.project.validator.ValidatorConfigs;
+import azkaban.project.validator.ValidatorManager;
+import azkaban.project.validator.XmlValidatorManager;
+import azkaban.storage.StorageManager;
+import azkaban.user.User;
+import azkaban.utils.Props;
+import azkaban.utils.Utils;
+import com.google.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.zip.ZipFile;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class handles the downloading and uploading of projects.
+ */
+class AzkabanProjectLoader {
+
+ private static final Logger log = LoggerFactory.getLogger(AzkabanProjectLoader.class);
+
+ private final Props props;
+
+ private final ProjectLoader projectLoader;
+ private final StorageManager storageManager;
+ private final File tempDir;
+ private final int projectVersionRetention;
+
+ @Inject
+ AzkabanProjectLoader(final Props props, final ProjectLoader projectLoader,
+ final StorageManager storageManager) {
+ this.props = requireNonNull(props, "Props is null");
+ this.projectLoader = requireNonNull(projectLoader, "project Loader is null");
+ this.storageManager = requireNonNull(storageManager, "Storage Manager is null");
+
+ this.tempDir = new File(props.getString(ConfigurationKeys.PROJECT_TEMP_DIR, "temp"));
+ if (!this.tempDir.exists()) {
+ log.info("Creating temp dir: " + this.tempDir.getAbsolutePath());
+ this.tempDir.mkdirs();
+ } else {
+ log.info("Using temp dir: " + this.tempDir.getAbsolutePath());
+ }
+ this.projectVersionRetention = props.getInt("project.version.retention", 3);
+ log.info("Project version retention is set to " + this.projectVersionRetention);
+ }
+
+ public Map<String, ValidationReport> uploadProject(final Project project,
+ final File archive, final String fileType, final User uploader, final Props additionalProps)
+ throws ProjectManagerException {
+ log.info("Uploading files to " + project.getName());
+
+ // Unzip.
+ File file = null;
+ try {
+ if (fileType == null) {
+ throw new ProjectManagerException("Unknown file type for "
+ + archive.getName());
+ } else if ("zip".equals(fileType)) {
+ file = unzipFile(archive);
+ } else {
+ throw new ProjectManagerException("Unsupported archive type for file "
+ + archive.getName());
+ }
+ } catch (final IOException e) {
+ throw new ProjectManagerException("Error unzipping file.", e);
+ }
+
+ // Since props is an instance variable of ProjectManager, and each
+ // invocation to the uploadProject manager needs to pass a different
+ // value for the PROJECT_ARCHIVE_FILE_PATH key, it is necessary to
+ // create a new instance of Props to make sure these different values
+ // are isolated from each other.
+ final Props prop = new Props(this.props);
+ prop.putAll(additionalProps);
+ prop.put(ValidatorConfigs.PROJECT_ARCHIVE_FILE_PATH,
+ archive.getAbsolutePath());
+ // Basically, we want to make sure that for different invocations to the
+ // uploadProject method,
+ // the validators are using different values for the
+ // PROJECT_ARCHIVE_FILE_PATH configuration key.
+ // In addition, we want to reload the validator objects for each upload, so
+ // that we can change the validator configuration files without having to
+ // restart Azkaban web server. If the XmlValidatorManager is an instance
+ // variable, 2 consecutive invocations to the uploadProject
+ // method might cause the second one to overwrite the
+ // PROJECT_ARCHIVE_FILE_PATH configuration parameter
+ // of the first, thus causing a wrong archive file path to be passed to the
+ // validators. Creating a separate XmlValidatorManager object for each
+ // upload will prevent this issue without having to add
+ // synchronization between uploads. Since we're already reloading the XML
+ // config file and creating validator objects for each upload, this does
+ // not add too much additional overhead.
+ final ValidatorManager validatorManager = new XmlValidatorManager(prop);
+ log.info("Validating project " + archive.getName()
+ + " using the registered validators "
+ + validatorManager.getValidatorsInfo().toString());
+ final Map<String, ValidationReport> reports = validatorManager.validate(project, file);
+ ValidationStatus status = ValidationStatus.PASS;
+ for (final Entry<String, ValidationReport> report : reports.entrySet()) {
+ if (report.getValue().getStatus().compareTo(status) > 0) {
+ status = report.getValue().getStatus();
+ }
+ }
+ if (status == ValidationStatus.ERROR) {
+ log.error("Error found in upload to " + project.getName()
+ + ". Cleaning up.");
+
+ try {
+ FileUtils.deleteDirectory(file);
+ } catch (final IOException e) {
+ file.deleteOnExit();
+ e.printStackTrace();
+ }
+
+ return reports;
+ }
+
+ final DirectoryFlowLoader loader =
+ (DirectoryFlowLoader) validatorManager.getDefaultValidator();
+ final Map<String, Props> jobProps = loader.getJobProps();
+ final List<Props> propProps = loader.getProps();
+
+ synchronized (project) {
+ final int newVersion = this.projectLoader.getLatestProjectVersion(project) + 1;
+ final Map<String, Flow> flows = loader.getFlowMap();
+ for (final Flow flow : flows.values()) {
+ flow.setProjectId(project.getId());
+ flow.setVersion(newVersion);
+ }
+
+ this.storageManager.uploadProject(project, newVersion, archive, uploader);
+
+ log.info("Uploading flow to db " + archive.getName());
+ this.projectLoader.uploadFlows(project, newVersion, flows.values());
+ log.info("Changing project versions " + archive.getName());
+ this.projectLoader.changeProjectVersion(project, newVersion,
+ uploader.getUserId());
+ project.setFlows(flows);
+ log.info("Uploading Job properties");
+ this.projectLoader.uploadProjectProperties(project, new ArrayList<>(
+ jobProps.values()));
+ log.info("Uploading Props properties");
+ this.projectLoader.uploadProjectProperties(project, propProps);
+ }
+
+ log.info("Uploaded project files. Cleaning up temp files.");
+ this.projectLoader.postEvent(project, EventType.UPLOADED, uploader.getUserId(),
+ "Uploaded project files zip " + archive.getName());
+ try {
+ FileUtils.deleteDirectory(file);
+ } catch (final IOException e) {
+ file.deleteOnExit();
+ e.printStackTrace();
+ }
+
+ log.info("Cleaning up old install files older than "
+ + (project.getVersion() - this.projectVersionRetention));
+ this.projectLoader.cleanOlderProjectVersion(project.getId(),
+ project.getVersion() - this.projectVersionRetention);
+
+ return reports;
+ }
+
+ private File unzipFile(final File archiveFile) throws IOException {
+ final ZipFile zipfile = new ZipFile(archiveFile);
+ final File unzipped = Utils.createTempDir(this.tempDir);
+ Utils.unzip(zipfile, unzipped);
+ zipfile.close();
+
+ return unzipped;
+ }
+
+ public ProjectFileHandler getProjectFile(final Project project, int version)
+ throws ProjectManagerException {
+ if (version == -1) {
+ version = this.projectLoader.getLatestProjectVersion(project);
+ }
+ return this.storageManager.getProjectFile(project.getId(), version);
+ }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
index f051e85..8a63a5d 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
@@ -21,9 +21,7 @@ import static java.util.Objects.requireNonNull;
import azkaban.flow.Flow;
import azkaban.project.ProjectLogEvent.EventType;
import azkaban.project.validator.ValidationReport;
-import azkaban.project.validator.ValidationStatus;
import azkaban.project.validator.ValidatorConfigs;
-import azkaban.project.validator.ValidatorManager;
import azkaban.project.validator.XmlValidatorManager;
import azkaban.storage.StorageManager;
import azkaban.user.Permission;
@@ -31,31 +29,24 @@ import azkaban.user.Permission.Type;
import azkaban.user.User;
import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
-import azkaban.utils.Utils;
import com.google.inject.Inject;
import java.io.File;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
-import java.util.zip.ZipFile;
-import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
public class ProjectManager {
private static final Logger logger = Logger.getLogger(ProjectManager.class);
+ private final AzkabanProjectLoader azkabanProjectLoader;
private final ProjectLoader projectLoader;
- private final StorageManager storageManager;
private final Props props;
- private final File tempDir;
- private final int projectVersionRetention;
private final boolean creatorDefaultPermissions;
private final ConcurrentHashMap<Integer, Project> projectsById =
new ConcurrentHashMap<>();
@@ -63,25 +54,17 @@ public class ProjectManager {
new ConcurrentHashMap<>();
@Inject
- public ProjectManager(final ProjectLoader loader, final StorageManager storageManager,
+ public ProjectManager(final AzkabanProjectLoader azkabanProjectLoader,
+ final ProjectLoader loader,
+ final StorageManager storageManager,
final Props props) {
this.projectLoader = requireNonNull(loader);
- this.storageManager = requireNonNull(storageManager);
this.props = requireNonNull(props);
-
- this.tempDir = new File(this.props.getString("project.temp.dir", "temp"));
- this.projectVersionRetention =
- (props.getInt("project.version.retention", 3));
- logger.info("Project version retention is set to "
- + this.projectVersionRetention);
+ this.azkabanProjectLoader = requireNonNull(azkabanProjectLoader);
this.creatorDefaultPermissions =
props.getBoolean("creator.default.proxy", true);
- if (!this.tempDir.exists()) {
- this.tempDir.mkdirs();
- }
-
// The prop passed to XmlValidatorManager is used to initialize all the
// validators
// Each validator will take certain key/value pairs from the prop to
@@ -457,130 +440,16 @@ public class ProjectManager {
* project name and version
* @throws ProjectManagerException
*/
- public ProjectFileHandler getProjectFileHandler(final Project project, int version)
+ public ProjectFileHandler getProjectFileHandler(final Project project, final int version)
throws ProjectManagerException {
-
- if (version == -1) {
- version = this.projectLoader.getLatestProjectVersion(project);
- }
- return this.storageManager.getProjectFile(project.getId(), version);
+ return azkabanProjectLoader.getProjectFile(project, version);
}
public Map<String, ValidationReport> uploadProject(final Project project,
final File archive, final String fileType, final User uploader, final Props additionalProps)
throws ProjectManagerException {
- logger.info("Uploading files to " + project.getName());
-
- // Unzip.
- File file = null;
- try {
- if (fileType == null) {
- throw new ProjectManagerException("Unknown file type for "
- + archive.getName());
- } else if ("zip".equals(fileType)) {
- file = unzipFile(archive);
- } else {
- throw new ProjectManagerException("Unsupported archive type for file "
- + archive.getName());
- }
- } catch (final IOException e) {
- throw new ProjectManagerException("Error unzipping file.", e);
- }
-
- // Since props is an instance variable of ProjectManager, and each
- // invocation to the uploadProject manager needs to pass a different
- // value for the PROJECT_ARCHIVE_FILE_PATH key, it is necessary to
- // create a new instance of Props to make sure these different values
- // are isolated from each other.
- final Props prop = new Props(this.props);
- prop.putAll(additionalProps);
- prop.put(ValidatorConfigs.PROJECT_ARCHIVE_FILE_PATH,
- archive.getAbsolutePath());
- // Basically, we want to make sure that for different invocations to the
- // uploadProject method,
- // the validators are using different values for the
- // PROJECT_ARCHIVE_FILE_PATH configuration key.
- // In addition, we want to reload the validator objects for each upload, so
- // that we can change the validator configuration files without having to
- // restart Azkaban web server. If the XmlValidatorManager is an instance
- // variable, 2 consecutive invocations to the uploadProject
- // method might cause the second one to overwrite the
- // PROJECT_ARCHIVE_FILE_PATH configuration parameter
- // of the first, thus causing a wrong archive file path to be passed to the
- // validators. Creating a separate XmlValidatorManager object for each
- // upload will prevent this issue without having to add
- // synchronization between uploads. Since we're already reloading the XML
- // config file and creating validator objects for each upload, this does
- // not add too much additional overhead.
- final ValidatorManager validatorManager = new XmlValidatorManager(prop);
- logger.info("Validating project " + archive.getName()
- + " using the registered validators "
- + validatorManager.getValidatorsInfo().toString());
- final Map<String, ValidationReport> reports = validatorManager.validate(project, file);
- ValidationStatus status = ValidationStatus.PASS;
- for (final Entry<String, ValidationReport> report : reports.entrySet()) {
- if (report.getValue().getStatus().compareTo(status) > 0) {
- status = report.getValue().getStatus();
- }
- }
- if (status == ValidationStatus.ERROR) {
- logger.error("Error found in upload to " + project.getName()
- + ". Cleaning up.");
-
- try {
- FileUtils.deleteDirectory(file);
- } catch (final IOException e) {
- file.deleteOnExit();
- e.printStackTrace();
- }
-
- return reports;
- }
-
- final DirectoryFlowLoader loader =
- (DirectoryFlowLoader) validatorManager.getDefaultValidator();
- final Map<String, Props> jobProps = loader.getJobProps();
- final List<Props> propProps = loader.getProps();
-
- synchronized (project) {
- final int newVersion = this.projectLoader.getLatestProjectVersion(project) + 1;
- final Map<String, Flow> flows = loader.getFlowMap();
- for (final Flow flow : flows.values()) {
- flow.setProjectId(project.getId());
- flow.setVersion(newVersion);
- }
-
- this.storageManager.uploadProject(project, newVersion, archive, uploader);
-
- logger.info("Uploading flow to db " + archive.getName());
- this.projectLoader.uploadFlows(project, newVersion, flows.values());
- logger.info("Changing project versions " + archive.getName());
- this.projectLoader.changeProjectVersion(project, newVersion,
- uploader.getUserId());
- project.setFlows(flows);
- logger.info("Uploading Job properties");
- this.projectLoader.uploadProjectProperties(project, new ArrayList<>(
- jobProps.values()));
- logger.info("Uploading Props properties");
- this.projectLoader.uploadProjectProperties(project, propProps);
- }
-
- logger.info("Uploaded project files. Cleaning up temp files.");
- this.projectLoader.postEvent(project, EventType.UPLOADED, uploader.getUserId(),
- "Uploaded project files zip " + archive.getName());
- try {
- FileUtils.deleteDirectory(file);
- } catch (final IOException e) {
- file.deleteOnExit();
- e.printStackTrace();
- }
-
- logger.info("Cleaning up old install files older than "
- + (project.getVersion() - this.projectVersionRetention));
- this.projectLoader.cleanOlderProjectVersion(project.getId(),
- project.getVersion() - this.projectVersionRetention);
-
- return reports;
+ return azkabanProjectLoader
+ .uploadProject(project, archive, fileType, uploader, additionalProps);
}
public void updateFlow(final Project project, final Flow flow)
@@ -588,14 +457,6 @@ public class ProjectManager {
this.projectLoader.updateFlow(project, flow.getVersion(), flow);
}
- private File unzipFile(final File archiveFile) throws IOException {
- final ZipFile zipfile = new ZipFile(archiveFile);
- final File unzipped = Utils.createTempDir(this.tempDir);
- Utils.unzip(zipfile, unzipped);
- zipfile.close();
-
- return unzipped;
- }
public void postProjectEvent(final Project project, final EventType type, final String user,
final String message) {
diff --git a/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java
new file mode 100644
index 0000000..5f98984
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.project;
+
+import static azkaban.Constants.ConfigurationKeys.PROJECT_TEMP_DIR;
+import static java.util.Objects.requireNonNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import azkaban.storage.StorageManager;
+import azkaban.user.User;
+import azkaban.utils.Props;
+import java.io.File;
+import java.net.URL;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class AzkabanProjectLoaderTest {
+
+ @Rule
+ public final TemporaryFolder TEMP_DIR = new TemporaryFolder();
+
+ private final int ID = 107;
+ private final int VERSION = 10;
+ private final Project project = new Project(this.ID, "project1");
+
+ private AzkabanProjectLoader azkabanProjectLoader;
+ private StorageManager storageManager;
+ private ProjectLoader projectLoader;
+
+ @Before
+ public void setUp() throws Exception {
+ final Props props = new Props();
+ props.put(PROJECT_TEMP_DIR, this.TEMP_DIR.getRoot().getAbsolutePath());
+
+ this.storageManager = mock(StorageManager.class);
+ this.projectLoader = mock(ProjectLoader.class);
+
+ this.azkabanProjectLoader = new AzkabanProjectLoader(props, this.projectLoader,
+ this.storageManager);
+ }
+
+ @Test
+ public void uploadProject() throws Exception {
+ when(this.projectLoader.getLatestProjectVersion(this.project)).thenReturn(this.VERSION);
+
+ final URL resource = requireNonNull(
+ getClass().getClassLoader().getResource("sample_flow_01.zip"));
+ final File projectZipFile = new File(resource.getPath());
+ final User uploader = new User("test_user");
+
+ this.azkabanProjectLoader.uploadProject(this.project, projectZipFile, "zip", uploader, null);
+
+ verify(this.storageManager)
+ .uploadProject(this.project, this.VERSION + 1, projectZipFile, uploader);
+ }
+
+ @Test
+ public void getProjectFile() throws Exception {
+ when(this.projectLoader.getLatestProjectVersion(this.project)).thenReturn(this.VERSION);
+
+ // Run the code
+ this.azkabanProjectLoader.getProjectFile(this.project, -1);
+
+ verify(this.projectLoader).getLatestProjectVersion(this.project);
+ verify(this.storageManager).getProjectFile(this.ID, this.VERSION);
+ }
+
+}