Details
diff --git a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
index 83aaf7c..11424f6 100644
--- a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
+++ b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
@@ -49,7 +49,7 @@ public class AzkabanCommonModule extends AbstractModule {
public AzkabanCommonModule(Props props) {
this.props = props;
- this.storageImplementation = props.getString(Constants.ConfigurationKeys.AZKABAN_STORAGE_TYPE, LOCAL.name());
+ this.storageImplementation = props.getString(Constants.ConfigurationKeys.AZKABAN_STORAGE_TYPE, DATABASE.name());
}
@Override
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
index 6f55a9c..806f7b0 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
@@ -16,6 +16,7 @@
package azkaban.project;
+import com.google.common.io.Files;
import com.google.inject.Inject;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
@@ -350,20 +351,23 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
}
@Override
- public void uploadProjectFile(Project project, int version, String filetype,
- String filename, File localFile, String uploader)
+ public void uploadProjectFile(int projectId, int version, File localFile, String uploader)
throws ProjectManagerException {
long startMs = System.currentTimeMillis();
- logger.info("Uploading to " + project.getName() + " version:" + version
- + " file:" + filename);
+ logger.info(String.format("Uploading Project ID: %d file: %s [%d bytes]",
+ projectId, localFile.getName(), localFile.length()));
Connection connection = getConnection();
try {
- uploadProjectFile(connection, project, version, filetype, filename,
- localFile, uploader);
+ /* Update DB with new project info */
+ addProjectToProjectVersions(connection, projectId, version, localFile, uploader);
+
+ uploadProjectFile(connection, projectId, version, localFile);
+
connection.commit();
- logger.info("project " + project.getName() + " commiting upload " + localFile.getName()
- + " took " + ((System.currentTimeMillis() - startMs) / 1000) + " seconds.");
+ long duration = (System.currentTimeMillis() - startMs) / 1000;
+ logger.info(String.format("Uploaded Project ID: %d file: %s [%d bytes] in %d sec",
+ projectId, localFile.getName(), localFile.length(), duration));
} catch (SQLException e) {
logger.error(e);
throw new ProjectManagerException("Error getting DB connection.", e);
@@ -372,14 +376,43 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
}
}
- private void uploadProjectFile(Connection connection, Project project,
- int version, String filetype, String filename, File localFile,
+ private void uploadProjectFile(Connection connection, int projectId, int version, File localFile)
+ throws ProjectManagerException {
+ /* Step 1: Upload File in chunks to DB */
+ int chunks = uploadFileInChunks(connection, projectId, version, localFile);
+
+ /* Step 2: Update number of chunks in DB */
+ updateChunksInProjectVersions(connection, projectId, version, chunks);
+ }
+
+ /**
+ * Insert a new version record to TABLE project_versions before uploading files.
+ *
+ * The reason for this operation:
+ * When error chunking happens in remote mysql server, incomplete file data remains
+ * in DB, and an SQL exception is thrown. If we don't have this operation before uploading file,
+ * the SQL exception prevents AZ from creating the new version record in Table project_versions.
+ * However, the Table project_files still reserve the incomplete files, which causes troubles
+ * when uploading a new file: Since the version in TABLE project_versions is still old, mysql will stop
+ * inserting new files to db.
+ *
+ * Why this operation is safe:
+ * When AZ uploads a new zip file, it always fetches the latest version proj_v from TABLE project_version,
+ * proj_v+1 will be used as the new version for the uploading files.
+ *
+ * Assume error chunking happens on day 1. proj_v is created for this bad file (old file version + 1).
+ * When we upload a new project zip in day2, new file in day 2 will use the new version (proj_v + 1).
+ * When file uploading completes, AZ will clean all old chunks in DB afterward.
+ */
+ private void addProjectToProjectVersions(Connection connection,
+ int projectId,
+ int version,
+ File localFile,
String uploader) throws ProjectManagerException {
+ final long updateTime = System.currentTimeMillis();
QueryRunner runner = new QueryRunner();
- long updateTime = System.currentTimeMillis();
-
logger.info("Creating message digest for upload " + localFile.getName());
- byte[] md5 = null;
+ byte[] md5;
try {
md5 = Md5Hasher.md5Hash(localFile);
} catch (IOException e) {
@@ -388,41 +421,35 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
logger.info("Md5 hash created");
- /**
- * Insert a new version record to TABLE project_versions before uploading files.
- *
- * The reason for this operation:
- * When error chunking happens in remote mysql server, incomplete file data remains
- * in DB, and an SQL exception is thrown. If we don't have this operation before uploading file,
- * the SQL exception prevents AZ from creating the new version record in Table project_versions.
- * However, the Table project_files still reserve the incomplete files, which causes troubles
- * when uploading a new file: Since the version in TABLE project_versions is still old, mysql will stop
- * inserting new files to db.
- *
- * Why this operation is safe:
- * When AZ uploads a new zip file, it always fetches the latest version proj_v from TABLE project_version,
- * proj_v+1 will be used as the new version for the uploading files.
- *
- * Assume error chunking happens on day 1. proj_v is created for this bad file (old file version + 1).
- * When we upload a new project zip in day2, new file in day 2 will use the new version (proj_v + 1).
- * When file uploading completes, AZ will clean all old chunks in DB afterward.
- */
- final String INSERT_PROJECT_VERSION =
- "INSERT INTO project_versions (project_id, version, upload_time, uploader, file_type, file_name, md5, num_chunks) values (?,?,?,?,?,?,?,?)";
+ final String INSERT_PROJECT_VERSION = "INSERT INTO project_versions "
+ + "(project_id, version, upload_time, uploader, file_type, file_name, md5, num_chunks) values "
+ + "(?,?,?,?,?,?,?,?)";
try {
-
/**
* As we don't know the num_chunks before uploading the file, we initialize it to 0,
* and will update it after uploading completes.
*/
- runner.update(connection, INSERT_PROJECT_VERSION, project.getId(),
- version, updateTime, uploader, filetype, filename, md5, 0);
+ runner.update(connection,
+ INSERT_PROJECT_VERSION,
+ projectId,
+ version,
+ updateTime,
+ uploader,
+ Files.getFileExtension(localFile.getName()),
+ localFile.getName(),
+ md5,
+ 0);
} catch (SQLException e) {
- logger.error(e);
- throw new ProjectManagerException("Error initializing project version "
- + project.getName(), e);
+ String msg = String.format("Error initializing project id: %d version: %d ", projectId, version);
+ logger.error(msg, e);
+ throw new ProjectManagerException(msg, e);
}
+ }
+
+ private int uploadFileInChunks(Connection connection, int projectId, int version, File localFile)
+ throws ProjectManagerException {
+ QueryRunner runner = new QueryRunner();
// Really... I doubt we'll get a > 2gig file. So int casting it is!
byte[] buffer = new byte[CHUCK_SIZE];
@@ -435,15 +462,14 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
bufferedStream = new BufferedInputStream(new FileInputStream(localFile));
int size = bufferedStream.read(buffer);
while (size >= 0) {
- logger.info("Read bytes for " + filename + " size:" + size);
+ logger.info("Read bytes for " + localFile.getName() + " size:" + size);
byte[] buf = buffer;
if (size < buffer.length) {
buf = Arrays.copyOfRange(buffer, 0, size);
}
try {
- logger.info("Running update for " + filename + " chunk " + chunk);
- runner.update(connection, INSERT_PROJECT_FILES, project.getId(),
- version, chunk, size, buf);
+ logger.info("Running update for " + localFile.getName() + " chunk " + chunk);
+ runner.update(connection, INSERT_PROJECT_FILES, projectId, version, chunk, size, buf);
/**
* We enforce az committing to db when uploading every single chunk,
@@ -453,7 +479,7 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
* the remote mysql server will run into memory troubles.
*/
connection.commit();
- logger.info("Finished update for " + filename + " chunk " + chunk);
+ logger.info("Finished update for " + localFile.getName() + " chunk " + chunk);
} catch (SQLException e) {
throw new ProjectManagerException("Error Chunking during uploading files to db...");
}
@@ -462,27 +488,34 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
size = bufferedStream.read(buffer);
}
} catch (IOException e) {
- throw new ProjectManagerException("Error chunking file " + filename);
+ throw new ProjectManagerException(String.format(
+ "Error chunking file. projectId: %d, version: %d, file:%s[%d bytes], chunk: %d",
+ projectId, version, localFile.getName(), localFile.length(), chunk));
} finally {
IOUtils.closeQuietly(bufferedStream);
}
+ return chunk;
+ }
+
+ /**
+ * we update num_chunks's actual number to db here.
+ */
+ private void updateChunksInProjectVersions(Connection connection, int projectId, int version, int chunk)
+ throws ProjectManagerException {
- /**
- * we update num_chunks's actual number to db here.
- */
final String UPDATE_PROJECT_NUM_CHUNKS =
"UPDATE project_versions SET num_chunks=? WHERE project_id=? AND version=?";
+ QueryRunner runner = new QueryRunner();
try {
- runner.update(connection, UPDATE_PROJECT_NUM_CHUNKS, chunk, project.getId(), version);
+ runner.update(connection, UPDATE_PROJECT_NUM_CHUNKS, chunk, projectId, version);
connection.commit();
} catch (SQLException e) {
- throw new ProjectManagerException(
- "Error updating project " + project.getId() + " : chunk_num "
- + chunk, e);
+ throw new ProjectManagerException("Error updating project " + projectId + " : chunk_num " + chunk, e);
}
}
+
@Override
public ProjectFileHandler getUploadedFile(Project project, int version)
throws ProjectManagerException {
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
index aa886d6..c7af71c 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
@@ -36,7 +36,7 @@ public interface ProjectLoader {
* @return
* @throws ProjectManagerException
*/
- public List<Project> fetchAllActiveProjects() throws ProjectManagerException;
+ List<Project> fetchAllActiveProjects() throws ProjectManagerException;
/**
* Loads whole project, including permissions, by the project id.
@@ -45,7 +45,7 @@ public interface ProjectLoader {
* @return
* @throws ProjectManagerException
*/
- public Project fetchProjectById(int id) throws ProjectManagerException;
+ Project fetchProjectById(int id) throws ProjectManagerException;
/**
* Loads whole project, including permissions, by the project name.
@@ -53,7 +53,7 @@ public interface ProjectLoader {
* @return
* @throws ProjectManagerException
*/
- public Project fetchProjectByName(String name) throws ProjectManagerException;
+ Project fetchProjectByName(String name) throws ProjectManagerException;
/**
* Should create an empty project with the given name and user and adds it to
@@ -69,7 +69,7 @@ public interface ProjectLoader {
* @throws ProjectManagerException if an active project of the same name
* exists.
*/
- public Project createNewProject(String name, String description, User creator)
+ Project createNewProject(String name, String description, User creator)
throws ProjectManagerException;
/**
@@ -78,7 +78,7 @@ public interface ProjectLoader {
* @param project
* @throws ProjectManagerException
*/
- public void removeProject(Project project, String user)
+ void removeProject(Project project, String user)
throws ProjectManagerException;
/**
@@ -92,10 +92,10 @@ public interface ProjectLoader {
* @param isGroup
* @throws ProjectManagerException
*/
- public void updatePermission(Project project, String name, Permission perm,
+ void updatePermission(Project project, String name, Permission perm,
boolean isGroup) throws ProjectManagerException;
- public void removePermission(Project project, String name, boolean isGroup)
+ void removePermission(Project project, String name, boolean isGroup)
throws ProjectManagerException;
/**
@@ -105,7 +105,7 @@ public interface ProjectLoader {
* @param description
* @throws ProjectManagerException
*/
- public void updateDescription(Project project, String description, String user)
+ void updateDescription(Project project, String description, String user)
throws ProjectManagerException;
/**
@@ -116,7 +116,7 @@ public interface ProjectLoader {
* @param type
* @param message return true if the posting was success.
*/
- public boolean postEvent(Project project, EventType type, String user,
+ boolean postEvent(Project project, EventType type, String user,
String message);
/**
@@ -125,14 +125,13 @@ public interface ProjectLoader {
* @param project
* @return
*/
- public List<ProjectLogEvent> getProjectEvents(Project project, int num,
+ List<ProjectLogEvent> getProjectEvents(Project project, int num,
int skip) throws ProjectManagerException;
/**
* Will upload the files and return the version number of the file uploaded.
*/
- public void uploadProjectFile(Project project, int version, String filetype,
- String filename, File localFile, String user)
+ void uploadProjectFile(int projectId, int version, File localFile, String user)
throws ProjectManagerException;
/**
@@ -140,7 +139,7 @@ public interface ProjectLoader {
*
* @return
*/
- public ProjectFileHandler getUploadedFile(Project project, int version)
+ ProjectFileHandler getUploadedFile(Project project, int version)
throws ProjectManagerException;
/**
@@ -148,7 +147,7 @@ public interface ProjectLoader {
*
* @return
*/
- public ProjectFileHandler getUploadedFile(int projectId, int version)
+ ProjectFileHandler getUploadedFile(int projectId, int version)
throws ProjectManagerException;
/**
@@ -158,10 +157,10 @@ public interface ProjectLoader {
* @param version
* @throws ProjectManagerException
*/
- public void changeProjectVersion(Project project, int version, String user)
+ void changeProjectVersion(Project project, int version, String user)
throws ProjectManagerException;
- public void updateFlow(Project project, int version, Flow flow)
+ void updateFlow(Project project, int version, Flow flow)
throws ProjectManagerException;
/**
@@ -172,7 +171,7 @@ public interface ProjectLoader {
* @param flows
* @throws ProjectManagerException
*/
- public void uploadFlows(Project project, int version, Collection<Flow> flows)
+ void uploadFlows(Project project, int version, Collection<Flow> flows)
throws ProjectManagerException;
/**
@@ -183,7 +182,7 @@ public interface ProjectLoader {
* @param flow
* @throws ProjectManagerException
*/
- public void uploadFlow(Project project, int version, Flow flow)
+ void uploadFlow(Project project, int version, Flow flow)
throws ProjectManagerException;
/**
@@ -194,7 +193,7 @@ public interface ProjectLoader {
* @param flowId
* @throws ProjectManagerException
*/
- public Flow fetchFlow(Project project, String flowId)
+ Flow fetchFlow(Project project, String flowId)
throws ProjectManagerException;
/**
@@ -205,13 +204,13 @@ public interface ProjectLoader {
* @param flowId
* @throws ProjectManagerException
*/
- public List<Flow> fetchAllProjectFlows(Project project)
+ List<Flow> fetchAllProjectFlows(Project project)
throws ProjectManagerException;
/**
* Gets the latest upload version.
*/
- public int getLatestProjectVersion(Project project)
+ int getLatestProjectVersion(Project project)
throws ProjectManagerException;
/**
@@ -222,7 +221,7 @@ public interface ProjectLoader {
* @param properties
* @throws ProjectManagerException
*/
- public void uploadProjectProperty(Project project, Props props)
+ void uploadProjectProperty(Project project, Props props)
throws ProjectManagerException;
/**
@@ -233,7 +232,7 @@ public interface ProjectLoader {
* @param properties
* @throws ProjectManagerException
*/
- public void uploadProjectProperties(Project project, List<Props> properties)
+ void uploadProjectProperties(Project project, List<Props> properties)
throws ProjectManagerException;
/**
@@ -244,7 +243,7 @@ public interface ProjectLoader {
* @return
* @throws ProjectManagerException
*/
- public Props fetchProjectProperty(Project project, String propsName)
+ Props fetchProjectProperty(Project project, String propsName)
throws ProjectManagerException;
/**
@@ -254,7 +253,7 @@ public interface ProjectLoader {
* @return
* @throws ProjectManagerException
*/
- public Map<String, Props> fetchProjectProperties(int projectId, int version)
+ Map<String, Props> fetchProjectProperties(int projectId, int version)
throws ProjectManagerException;
/**
@@ -264,10 +263,10 @@ public interface ProjectLoader {
* @param version
* @throws ProjectManagerException
*/
- public void cleanOlderProjectVersion(int projectId, int version)
+ void cleanOlderProjectVersion(int projectId, int version)
throws ProjectManagerException;
- public void updateProjectProperty(Project project, Props props)
+ void updateProjectProperty(Project project, Props props)
throws ProjectManagerException;
Props fetchProjectProperty(int projectId, int projectVer, String propsName)
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
index 6ea28c3..6e77d78 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
@@ -16,6 +16,8 @@
package azkaban.project;
+import azkaban.storage.StorageManager;
+import com.google.inject.Inject;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -23,7 +25,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import java.util.zip.ZipFile;
@@ -45,17 +46,24 @@ import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
import azkaban.utils.Utils;
+import static java.util.Objects.*;
+
+
public class ProjectManager {
private static final Logger logger = Logger.getLogger(ProjectManager.class);
private final ProjectLoader projectLoader;
+ private final StorageManager storageManager;
private final Props props;
private final File tempDir;
private final int projectVersionRetention;
private final boolean creatorDefaultPermissions;
- public ProjectManager(ProjectLoader loader, Props props) {
- this.projectLoader = loader;
- this.props = props;
+ @Inject
+ public ProjectManager(ProjectLoader loader, StorageManager storageManager, Props props) {
+ this.projectLoader = requireNonNull(loader);
+ this.storageManager = requireNonNull(storageManager);
+ this.props = requireNonNull(props);
+
this.tempDir = new File(this.props.getString("project.temp.dir", "temp"));
this.projectVersionRetention =
(props.getInt("project.version.retention", 3));
@@ -492,9 +500,8 @@ public class ProjectManager {
flow.setVersion(newVersion);
}
- logger.info("Uploading file to db " + archive.getName());
- projectLoader.uploadProjectFile(project, newVersion, fileType,
- archive.getName(), archive, uploader.getUserId());
+ storageManager.uploadProject(project, newVersion, archive, uploader);
+
logger.info("Uploading flow to db " + archive.getName());
projectLoader.uploadFlows(project, newVersion, flows.values());
logger.info("Changing project versions " + archive.getName());
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectManagerException.java b/azkaban-common/src/main/java/azkaban/project/ProjectManagerException.java
index 6ad88bf..fe1b50a 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectManagerException.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectManagerException.java
@@ -16,7 +16,10 @@
package azkaban.project;
-public class ProjectManagerException extends Exception {
+import azkaban.spi.AzkabanException;
+
+
+public class ProjectManagerException extends AzkabanException {
private static final long serialVersionUID = 1L;
public ProjectManagerException(String message) {
diff --git a/azkaban-common/src/main/java/azkaban/storage/DatabaseStorage.java b/azkaban-common/src/main/java/azkaban/storage/DatabaseStorage.java
index 583fa78..3149d9c 100644
--- a/azkaban-common/src/main/java/azkaban/storage/DatabaseStorage.java
+++ b/azkaban-common/src/main/java/azkaban/storage/DatabaseStorage.java
@@ -17,9 +17,10 @@
package azkaban.storage;
-import azkaban.project.JdbcProjectLoader;
+import azkaban.project.ProjectLoader;
import azkaban.spi.Storage;
import azkaban.spi.StorageMetadata;
+import java.io.File;
import java.io.InputStream;
import java.net.URI;
import javax.inject.Inject;
@@ -33,18 +34,26 @@ import javax.inject.Inject;
*/
public class DatabaseStorage implements Storage {
+ private final ProjectLoader projectLoader;
+
@Inject
- public DatabaseStorage(JdbcProjectLoader jdbcProjectLoader) {
+ public DatabaseStorage(ProjectLoader projectLoader) {
+ this.projectLoader = projectLoader;
}
@Override
public InputStream get(URI key) {
- return null;
+ throw new UnsupportedOperationException();
}
@Override
- public URI put(StorageMetadata metadata, InputStream is) {
+ public URI put(StorageMetadata metadata, File localFile) {
+ projectLoader.uploadProjectFile(
+ metadata.getProjectId(),
+ metadata.getVersion(),
+ localFile, metadata.getUploader());
+
return null;
}
diff --git a/azkaban-common/src/main/java/azkaban/storage/LocalStorage.java b/azkaban-common/src/main/java/azkaban/storage/LocalStorage.java
index a71f131..a1a709e 100644
--- a/azkaban-common/src/main/java/azkaban/storage/LocalStorage.java
+++ b/azkaban-common/src/main/java/azkaban/storage/LocalStorage.java
@@ -21,6 +21,7 @@ import azkaban.spi.Storage;
import azkaban.spi.StorageException;
import azkaban.spi.StorageMetadata;
import azkaban.utils.FileIOUtils;
+import com.google.common.io.Files;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
@@ -56,14 +57,15 @@ public class LocalStorage implements Storage {
}
@Override
- public URI put(StorageMetadata metadata, InputStream is) {
+ public URI put(StorageMetadata metadata, File localFile) {
- final File projectDir = new File(baseDirectory, metadata.getProjectId());
+ final File projectDir = new File(baseDirectory, String.valueOf(metadata.getProjectId()));
if (projectDir.mkdir()) {
log.info("Created project dir: " + projectDir.getAbsolutePath());
}
- final File targetFile = new File(projectDir, metadata.getVersion() + "." + metadata.getExtension());
+ final File targetFile = new File(projectDir,
+ metadata.getVersion() + "." + Files.getFileExtension(localFile.getName()));
if (targetFile.exists()) {
throw new StorageException(String.format(
@@ -71,7 +73,7 @@ public class LocalStorage implements Storage {
targetFile, metadata));
}
try {
- FileUtils.copyInputStreamToFile(is, targetFile);
+ FileUtils.copyFile(localFile, targetFile);
} catch (IOException e) {
log.error("LocalStorage error in put(): Metadata: " + metadata);
throw new StorageException(e);
diff --git a/azkaban-common/src/main/java/azkaban/storage/StorageManager.java b/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
index 6d0f714..d09e5f5 100644
--- a/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
+++ b/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
@@ -18,9 +18,12 @@
package azkaban.storage;
import azkaban.project.Project;
+import azkaban.project.ProjectLoader;
+import azkaban.project.ProjectManagerException;
import azkaban.spi.Storage;
import azkaban.spi.StorageException;
import azkaban.spi.StorageMetadata;
+import azkaban.user.User;
import com.google.inject.Inject;
import java.io.File;
import java.io.FileInputStream;
@@ -47,41 +50,25 @@ public class StorageManager {
/**
* API to a project file into Azkaban Storage
*
- * @param project project ID
- * @param fileExtension extension of the file
- * @param filename name of the file
+ * TODO clean up interface
+ *
+ * @param project project
+ * @param version The new version to be uploaded
* @param localFile local file
* @param uploader the user who uploaded
*/
public void uploadProject(
Project project,
- String fileExtension,
- String filename,
+ int version,
File localFile,
- String uploader) {
+ User uploader) {
final StorageMetadata metadata = new StorageMetadata(
- String.valueOf(project.getId()),
- String.valueOf(getLatestVersion(project)),
- fileExtension
+ project.getId(),
+ version,
+ uploader.getUserId()
);
- log.info(String.format(
- "Uploading project. Uploader: %s, Metadata:%s, filename: %s[%d bytes]",
- uploader, metadata, filename, localFile.length()
- ));
- try {
- uploadProject(metadata, new FileInputStream(localFile));
- } catch (FileNotFoundException e) {
- throw new StorageException(e);
- }
- }
-
- private int getLatestVersion(Project project) {
- // TODO Implement
- return -1;
- }
-
- public void uploadProject(StorageMetadata metadata, InputStream is) {
- // TODO Implement
- URI key = storage.put(metadata, is);
+ log.info(String.format("Adding archive to storage. Meta:%s File: %s[%d bytes]",
+ metadata, localFile.getName(), localFile.length()));
+ storage.put(metadata, localFile);
}
}
diff --git a/azkaban-common/src/test/java/azkaban/project/JdbcProjectLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/JdbcProjectLoaderTest.java
index 67efed9..fb0543b 100644
--- a/azkaban-common/src/test/java/azkaban/project/JdbcProjectLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/JdbcProjectLoaderTest.java
@@ -537,10 +537,9 @@ public class JdbcProjectLoaderTest {
Assert.assertEquals("Project description", projectDescription,
project.getDescription());
- File testDir = new File("unit/project/testjob/testjob.zip");
+ File testFile = new File("unit/project/testjob/testjob.zip");
- loader.uploadProjectFile(project, 1, "zip", "testjob.zip", testDir,
- user.getUserId());
+ loader.uploadProjectFile(project.getId(), 1, testFile, user.getUserId());
ProjectFileHandler handler = loader.getUploadedFile(project, 1);
Assert.assertEquals(handler.getProjectId(), project.getId());
diff --git a/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java b/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
index 6d13146..1e1d37d 100644
--- a/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
+++ b/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
@@ -116,8 +116,7 @@ public class MockProjectLoader implements ProjectLoader {
}
@Override
- public void uploadProjectFile(Project project, int version, String filetype,
- String filename, File localFile, String user)
+ public void uploadProjectFile(int projectId, int version, File localFile, String user)
throws ProjectManagerException {
// TODO Auto-generated method stub
diff --git a/azkaban-common/src/test/java/azkaban/project/ProjectManagerTest.java b/azkaban-common/src/test/java/azkaban/project/ProjectManagerTest.java
index ca731d3..6a1674b 100644
--- a/azkaban-common/src/test/java/azkaban/project/ProjectManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/ProjectManagerTest.java
@@ -1,5 +1,6 @@
package azkaban.project;
+import azkaban.storage.StorageManager;
import azkaban.user.User;
import azkaban.utils.Props;
import java.io.File;
@@ -15,6 +16,7 @@ import org.mockito.stubbing.Answer;
public class ProjectManagerTest {
private ProjectManager manager;
private ProjectLoader loader;
+ private StorageManager storageManager;
private User user;
private static final String PROJECT_NAME = "myTest";
private static final String PROJECT_NAME_2 = "myTest_2";
@@ -30,7 +32,8 @@ public class ProjectManagerTest {
public void setUp() throws Exception {
Props props = new Props();
loader = mock(ProjectLoader.class);
- manager = new ProjectManager(loader, props);
+ storageManager = mock(StorageManager.class);
+ manager = new ProjectManager(loader, storageManager, props);
user = new User(TEST_USER);
Project project1 = new Project(PROJECT_ID, PROJECT_NAME);
project1.setDescription(PROJECT_DESCRIPTION);
@@ -116,18 +119,19 @@ public class ProjectManagerTest {
public void testUploadProject() throws Exception {
System.out.println("TestUploadProject");
Project project = manager.createProject(PROJECT_NAME, PROJECT_DESCRIPTION, user);
- File testDir = new File(this.getClass().getClassLoader().getResource("project/testjob/testjob.zip").getFile());
- System.out.println("Uploading zip file: " + testDir.getAbsolutePath());
+ File testFile = new File(this.getClass().getClassLoader().getResource("project/testjob/testjob.zip").getFile());
+ System.out.println("Uploading zip file: " + testFile.getAbsolutePath());
Props props = new Props();
- manager.uploadProject(project, testDir, FILE_TYPE, user, props);
- verify(loader).uploadProjectFile(project, PROJECT_VERSION + 1, FILE_TYPE, testDir.getName(),
- testDir, user.getUserId());
+ manager.uploadProject(project, testFile, FILE_TYPE, user, props);
+
+ verify(storageManager).uploadProject(project, PROJECT_VERSION + 1, testFile, user);
+
verify(loader).uploadFlows(eq(project), eq(PROJECT_VERSION + 1), anyCollection());
verify(loader).changeProjectVersion(project, PROJECT_VERSION + 1, user.getUserId());
//uploadProjectProperties should be called twice, one for jobProps, the other for propProps
verify(loader, times(2)).uploadProjectProperties(eq(project), anyList());
verify(loader).postEvent(project, ProjectLogEvent.EventType.UPLOADED, user.getUserId(),
- "Uploaded project files zip " + testDir.getName());
+ "Uploaded project files zip " + testFile.getName());
verify(loader).cleanOlderProjectVersion(project.getId(), PROJECT_VERSION + 1 - PROJECT_VERSION_RETENTIION);
}
diff --git a/azkaban-common/src/test/java/azkaban/storage/DatabaseStorageTest.java b/azkaban-common/src/test/java/azkaban/storage/DatabaseStorageTest.java
new file mode 100644
index 0000000..5e61869
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/storage/DatabaseStorageTest.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.storage;
+
+import azkaban.project.ProjectLoader;
+import azkaban.spi.StorageMetadata;
+import java.io.File;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+
+public class DatabaseStorageTest {
+ private final ProjectLoader projectLoader = mock(ProjectLoader.class);
+ private final DatabaseStorage databaseStorage = new DatabaseStorage(projectLoader);
+
+ @Test
+ public void testPut() throws Exception {
+ final File file = mock(File.class);
+ int projectId = 1234;
+ int version = 1;
+ String uploader = "testuser";
+ final StorageMetadata metadata = new StorageMetadata(projectId, version, uploader);
+ databaseStorage.put(metadata, file);
+ verify(projectLoader).uploadProjectFile(projectId, version, file, uploader);
+ }
+}
diff --git a/azkaban-common/src/test/java/azkaban/storage/LocalStorageTest.java b/azkaban-common/src/test/java/azkaban/storage/LocalStorageTest.java
index f4ff2b4..6f4d029 100644
--- a/azkaban-common/src/test/java/azkaban/storage/LocalStorageTest.java
+++ b/azkaban-common/src/test/java/azkaban/storage/LocalStorageTest.java
@@ -19,7 +19,6 @@ package azkaban.storage;
import azkaban.spi.StorageMetadata;
import java.io.File;
-import java.io.FileInputStream;
import java.io.InputStream;
import java.net.URI;
import org.apache.commons.io.FileUtils;
@@ -56,18 +55,16 @@ public class LocalStorageTest {
ClassLoader classLoader = getClass().getClassLoader();
File testFile = new File(classLoader.getResource(SAMPLE_FILE).getFile());
- URI key;
- try (InputStream is = new FileInputStream(testFile)) {
- // test put
- key = localStorage.put(new StorageMetadata("testProjectId", "1", "zip"), is);
- }
+ final StorageMetadata metadata = new StorageMetadata(1, 1, "testuser");
+ final URI key = localStorage.put(metadata, testFile);
assertNotNull(key);
log.info("Key URI: " + key);
File expectedTargetFile = new File(BASE_DIRECTORY, new StringBuilder()
- .append("testProjectId")
+ .append(metadata.getProjectId())
.append(File.separator)
- .append("1.zip")
+ .append(metadata.getVersion())
+ .append(".zip")
.toString()
);
assertTrue(expectedTargetFile.exists());
diff --git a/azkaban-spi/src/main/java/azkaban/spi/Storage.java b/azkaban-spi/src/main/java/azkaban/spi/Storage.java
index b8f4d94..24497d4 100644
--- a/azkaban-spi/src/main/java/azkaban/spi/Storage.java
+++ b/azkaban-spi/src/main/java/azkaban/spi/Storage.java
@@ -17,6 +17,7 @@
package azkaban.spi;
+import java.io.File;
import java.io.InputStream;
import java.net.URI;
@@ -45,11 +46,11 @@ public interface Storage {
* Put an object and return a key.
*
* @param metadata Metadata related to the input stream
- * @param is The input stream from which the value is read to the store. The value is read completely
+ * @param localFile Read data from a local file
*
* @return the URI of the data
*/
- URI put(StorageMetadata metadata, InputStream is);
+ URI put(StorageMetadata metadata, File localFile);
/**
* Delete an object from Storage.
diff --git a/azkaban-spi/src/main/java/azkaban/spi/StorageMetadata.java b/azkaban-spi/src/main/java/azkaban/spi/StorageMetadata.java
index 9ae84c9..b562344 100644
--- a/azkaban-spi/src/main/java/azkaban/spi/StorageMetadata.java
+++ b/azkaban-spi/src/main/java/azkaban/spi/StorageMetadata.java
@@ -23,14 +23,14 @@ import static java.util.Objects.*;
public class StorageMetadata {
- private final String projectId;
- private final String version;
- private final String extension;
+ private final int projectId;
+ private final int version;
+ private final String uploader;
- public StorageMetadata(String projectId, String version, String extension) {
- this.projectId = requireNonNull(projectId);
- this.version = requireNonNull(version);
- this.extension = requireNonNull(extension);
+ public StorageMetadata(int projectId, int version, String uploader) {
+ this.projectId = projectId;
+ this.version = version;
+ this.uploader = requireNonNull(uploader);
}
@Override
@@ -38,16 +38,16 @@ public class StorageMetadata {
return "StorageMetadata{" + "projectId='" + projectId + '\'' + ", version='" + version + '\'' + '}';
}
- public String getProjectId() {
+ public int getProjectId() {
return projectId;
}
- public String getVersion() {
+ public int getVersion() {
return version;
}
- public String getExtension() {
- return extension;
+ public String getUploader() {
+ return uploader;
}
@Override
@@ -61,11 +61,11 @@ public class StorageMetadata {
StorageMetadata that = (StorageMetadata) o;
return Objects.equals(projectId, that.projectId) &&
Objects.equals(version, that.version) &&
- Objects.equals(extension, that.extension);
+ Objects.equals(uploader, that.uploader);
}
@Override
public int hashCode() {
- return Objects.hash(projectId, version, extension);
+ return Objects.hash(projectId, version, uploader);
}
}
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
index 8a9e4fb..3b650e6 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -16,8 +16,10 @@
package azkaban.webapp;
+import azkaban.AzkabanCommonModule;
import com.codahale.metrics.MetricRegistry;
+import com.google.inject.Guice;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -65,7 +67,6 @@ import azkaban.jmx.JmxExecutorManager;
import azkaban.jmx.JmxJettyServer;
import azkaban.jmx.JmxTriggerManager;
import azkaban.metrics.MetricsUtility;
-import azkaban.project.JdbcProjectLoader;
import azkaban.project.ProjectManager;
import azkaban.scheduler.ScheduleLoader;
import azkaban.scheduler.ScheduleManager;
@@ -108,6 +109,10 @@ import azkaban.metrics.MetricsManager;
import com.linkedin.restli.server.RestliServlet;
+import static azkaban.ServiceProvider.*;
+import static java.util.Objects.*;
+
+
/**
* The Azkaban Jetty server class
*
@@ -134,7 +139,6 @@ public class AzkabanWebServer extends AzkabanServer {
private static final Logger logger = Logger.getLogger(AzkabanWebServer.class);
- public static final String AZKABAN_HOME = "AZKABAN_HOME";
public static final String DEFAULT_CONF_PATH = "conf";
public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
public static final String AZKABAN_PRIVATE_PROPERTIES_FILE =
@@ -159,23 +163,22 @@ public class AzkabanWebServer extends AzkabanServer {
//queuedThreadPool is mainly used to monitor jetty threadpool.
private QueuedThreadPool queuedThreadPool;
- private UserManager userManager;
- private ProjectManager projectManager;
- // private ExecutorManagerAdapter executorManager;
- private ExecutorManager executorManager;
- private ScheduleManager scheduleManager;
- private TriggerManager triggerManager;
- private Map<String, Alerter> alerters;
+ private final UserManager userManager;
+ private final ProjectManager projectManager;
+ private final ExecutorManager executorManager;
+ private final ScheduleManager scheduleManager;
+ private final TriggerManager triggerManager;
+ private final Map<String, Alerter> alerters;
private final ClassLoader baseClassLoader;
- private Props props;
- private SessionCache sessionCache;
- private File tempDir;
- private Map<String, TriggerPlugin> triggerPlugins;
+ private final Props props;
+ private final SessionCache sessionCache;
+ private final File tempDir;
+ private final List<ObjectName> registeredMBeans = new ArrayList<>();
+ private Map<String, TriggerPlugin> triggerPlugins;
private MBeanServer mbeanServer;
- private ArrayList<ObjectName> registeredMBeans = new ArrayList<ObjectName>();
public static AzkabanWebServer getInstance() {
return app;
@@ -193,18 +196,22 @@ public class AzkabanWebServer extends AzkabanServer {
* Constructor
*/
public AzkabanWebServer(Server server, Props props) throws Exception {
- this.props = props;
+ this.props = requireNonNull(props);
this.server = server;
- velocityEngine =
- configureVelocityEngine(props
- .getBoolean(VELOCITY_DEV_MODE_PARAM, false));
+
+ /* Initialize Guice Injector */
+ // TODO move this to a common static context.
+ SERVICE_PROVIDER.setInjector(Guice.createInjector(new AzkabanCommonModule(props)));
+
+ velocityEngine = configureVelocityEngine(props.getBoolean(VELOCITY_DEV_MODE_PARAM, false));
sessionCache = new SessionCache(props);
userManager = loadUserManager(props);
alerters = loadAlerters(props);
executorManager = loadExecutorManager(props);
- projectManager = loadProjectManager(props);
+ // TODO remove hack. Move injection to constructor
+ projectManager = SERVICE_PROVIDER.getInstance(ProjectManager.class);
triggerManager = loadTriggerManager(props);
loadBuiltinCheckersAndActions();
@@ -267,17 +274,14 @@ public class AzkabanWebServer extends AzkabanServer {
private UserManager loadUserManager(Props props) {
Class<?> userManagerClass = props.getClass(USER_MANAGER_CLASS_PARAM, null);
- logger.info("Loading user manager class " + userManagerClass.getName());
- UserManager manager = null;
- if (userManagerClass != null
- && userManagerClass.getConstructors().length > 0) {
+ UserManager manager;
+ if (userManagerClass != null && userManagerClass.getConstructors().length > 0) {
+ logger.info("Loading user manager class " + userManagerClass.getName());
try {
- Constructor<?> userManagerConstructor =
- userManagerClass.getConstructor(Props.class);
+ Constructor<?> userManagerConstructor = userManagerClass.getConstructor(Props.class);
manager = (UserManager) userManagerConstructor.newInstance(props);
} catch (Exception e) {
- logger.error("Could not instantiate UserManager "
- + userManagerClass.getName());
+ logger.error("Could not instantiate UserManager " + userManagerClass.getName());
throw new RuntimeException(e);
}
} else {
@@ -286,13 +290,6 @@ public class AzkabanWebServer extends AzkabanServer {
return manager;
}
- private ProjectManager loadProjectManager(Props props) {
- logger.info("Loading JDBC for project management");
- JdbcProjectLoader loader = new JdbcProjectLoader(props);
- ProjectManager manager = new ProjectManager(loader, props);
- return manager;
- }
-
private ExecutorManager loadExecutorManager(Props props) throws Exception {
JdbcExecutorLoader loader = new JdbcExecutorLoader(props);
ExecutorManager execManager = new ExecutorManager(props, loader, alerters);