azkaban-developers
Changes
azkaban-common/build.gradle 2(+1 -1)
Details
azkaban-common/build.gradle 2(+1 -1)
diff --git a/azkaban-common/build.gradle b/azkaban-common/build.gradle
index 8199c70..0063e49 100644
--- a/azkaban-common/build.gradle
+++ b/azkaban-common/build.gradle
@@ -67,7 +67,7 @@ dependencies {
testRuntime deps.h2
testCompile(project(':test').sourceSets.test.output)
-
+ testCompile project(':azkaban-db').sourceSets.test.output
}
tasks.withType(JavaCompile) {
diff --git a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
index abdbfb2..f528556 100644
--- a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
+++ b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
@@ -29,7 +29,7 @@ import azkaban.executor.AlerterHolder;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManager;
import azkaban.executor.JdbcExecutorLoader;
-import azkaban.project.JdbcProjectLoader;
+import azkaban.project.JdbcProjectImpl;
import azkaban.project.ProjectLoader;
import azkaban.spi.AzkabanException;
import azkaban.spi.Storage;
@@ -77,12 +77,12 @@ public class AzkabanCommonModule extends AbstractModule {
@Override
protected void configure() {
bind(ExecutorLoader.class).to(JdbcExecutorLoader.class).in(Scopes.SINGLETON);
- bind(ProjectLoader.class).to(JdbcProjectLoader.class).in(Scopes.SINGLETON);
bind(Props.class).toInstance(this.config.getProps());
bind(Storage.class).to(resolveStorageClassType()).in(Scopes.SINGLETON);
bind(HdfsAuth.class).in(Scopes.SINGLETON);
bind(DatabaseOperator.class).to(DatabaseOperatorImpl.class).in(Scopes.SINGLETON);
bind(TriggerLoader.class).to(JdbcTriggerImpl.class).in(Scopes.SINGLETON);
+ bind(ProjectLoader.class).to(JdbcProjectImpl.class).in(Scopes.SINGLETON);
bind(DataSource.class).to(AzkabanDataSource.class);
bind(ExecutorManager.class).in(Scopes.SINGLETON);
bind(AlerterHolder.class).in(Scopes.SINGLETON);
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java
new file mode 100644
index 0000000..16b9bf0
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java
@@ -0,0 +1,322 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.project;
+
+import azkaban.database.EncodingType;
+import azkaban.flow.Flow;
+import azkaban.user.Permission;
+import azkaban.utils.GZIPUtils;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+import azkaban.utils.PropsUtils;
+import azkaban.utils.Triple;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.dbutils.ResultSetHandler;
+
+
+/**
+ * This is a JDBC Handler collection place for all project handler classes.
+ */
+class JdbcProjectHandlerSet {
+
+ public static class ProjectResultHandler implements ResultSetHandler<List<Project>> {
+ public static String SELECT_PROJECT_BY_NAME =
+ "SELECT id, name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob FROM projects WHERE name=?";
+
+ public static String SELECT_PROJECT_BY_ID =
+ "SELECT id, name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob FROM projects WHERE id=?";
+
+ public static String SELECT_ALL_ACTIVE_PROJECTS =
+ "SELECT id, name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob FROM projects WHERE active=true";
+
+ public static String SELECT_ACTIVE_PROJECT_BY_NAME =
+ "SELECT id, name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob FROM projects WHERE name=? AND active=true";
+
+ @Override
+ public List<Project> handle(final ResultSet rs) throws SQLException {
+ if (!rs.next()) {
+ return Collections.emptyList();
+ }
+
+ final ArrayList<Project> projects = new ArrayList<>();
+ do {
+ final int id = rs.getInt(1);
+ final String name = rs.getString(2);
+ final boolean active = rs.getBoolean(3);
+ final long modifiedTime = rs.getLong(4);
+ final long createTime = rs.getLong(5);
+ final int version = rs.getInt(6);
+ final String lastModifiedBy = rs.getString(7);
+ final String description = rs.getString(8);
+ final int encodingType = rs.getInt(9);
+ final byte[] data = rs.getBytes(10);
+
+ final Project project;
+ if (data != null) {
+ final EncodingType encType = EncodingType.fromInteger(encodingType);
+ final Object blobObj;
+ try {
+ // Convoluted way to inflate strings. Should find common package or
+ // helper function.
+ if (encType == EncodingType.GZIP) {
+ // Decompress the sucker.
+ final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+ blobObj = JSONUtils.parseJSONFromString(jsonString);
+ } else {
+ final String jsonString = new String(data, "UTF-8");
+ blobObj = JSONUtils.parseJSONFromString(jsonString);
+ }
+ project = Project.projectFromObject(blobObj);
+ } catch (final IOException e) {
+ throw new SQLException("Failed to get project.", e);
+ }
+ } else {
+ project = new Project(id, name);
+ }
+
+ // update the fields as they may have changed
+
+ project.setActive(active);
+ project.setLastModifiedTimestamp(modifiedTime);
+ project.setCreateTimestamp(createTime);
+ project.setVersion(version);
+ project.setLastModifiedUser(lastModifiedBy);
+ project.setDescription(description);
+
+ projects.add(project);
+ } while (rs.next());
+
+ return projects;
+ }
+ }
+
+ public static class ProjectPermissionsResultHandler implements ResultSetHandler<List<Triple<String, Boolean, Permission>>> {
+ public static String SELECT_PROJECT_PERMISSION =
+ "SELECT project_id, modified_time, name, permissions, isGroup FROM project_permissions WHERE project_id=?";
+
+ @Override
+ public List<Triple<String, Boolean, Permission>> handle(final ResultSet rs) throws SQLException {
+ if (!rs.next()) {
+ return Collections.emptyList();
+ }
+
+ final List<Triple<String, Boolean, Permission>> permissions = new ArrayList<>();
+ do {
+ final String username = rs.getString(3);
+ final int permissionFlag = rs.getInt(4);
+ final boolean val = rs.getBoolean(5);
+
+ final Permission perm = new Permission(permissionFlag);
+ permissions.add(new Triple<>(username, val, perm));
+ } while (rs.next());
+
+ return permissions;
+ }
+ }
+
+ public static class ProjectFlowsResultHandler implements ResultSetHandler<List<Flow>> {
+ public static String SELECT_PROJECT_FLOW =
+ "SELECT project_id, version, flow_id, modified_time, encoding_type, json FROM project_flows WHERE project_id=? AND version=? AND flow_id=?";
+
+ public static String SELECT_ALL_PROJECT_FLOWS =
+ "SELECT project_id, version, flow_id, modified_time, encoding_type, json FROM project_flows WHERE project_id=? AND version=?";
+
+ @Override
+ public List<Flow> handle(final ResultSet rs) throws SQLException {
+ if (!rs.next()) {
+ return Collections.emptyList();
+ }
+
+ final ArrayList<Flow> flows = new ArrayList<>();
+ do {
+ final String flowId = rs.getString(3);
+ final int encodingType = rs.getInt(5);
+ final byte[] dataBytes = rs.getBytes(6);
+
+ if (dataBytes == null) {
+ continue;
+ }
+
+ final EncodingType encType = EncodingType.fromInteger(encodingType);
+
+ Object flowObj = null;
+ try {
+ // Convoluted way to inflate strings. Should find common package or
+ // helper function.
+ if (encType == EncodingType.GZIP) {
+ // Decompress the sucker.
+ final String jsonString = GZIPUtils.unGzipString(dataBytes, "UTF-8");
+ flowObj = JSONUtils.parseJSONFromString(jsonString);
+ } else {
+ final String jsonString = new String(dataBytes, "UTF-8");
+ flowObj = JSONUtils.parseJSONFromString(jsonString);
+ }
+
+ final Flow flow = Flow.flowFromObject(flowObj);
+ flows.add(flow);
+ } catch (final IOException e) {
+ throw new SQLException("Error retrieving flow data " + flowId, e);
+ }
+ } while (rs.next());
+
+ return flows;
+ }
+ }
+
+ public static class ProjectPropertiesResultsHandler implements ResultSetHandler<List<Pair<String, Props>>> {
+ public static String SELECT_PROJECT_PROPERTY =
+ "SELECT project_id, version, name, modified_time, encoding_type, property FROM project_properties WHERE project_id=? AND version=? AND name=?";
+
+ public static String SELECT_PROJECT_PROPERTIES =
+ "SELECT project_id, version, name, modified_time, encoding_type, property FROM project_properties WHERE project_id=? AND version=?";
+
+ @Override
+ public List<Pair<String, Props>> handle(final ResultSet rs) throws SQLException {
+ if (!rs.next()) {
+ return Collections.emptyList();
+ }
+
+ final List<Pair<String, Props>> properties = new ArrayList<>();
+ do {
+ final String name = rs.getString(3);
+ final int eventType = rs.getInt(5);
+ final byte[] dataBytes = rs.getBytes(6);
+
+ final EncodingType encType = EncodingType.fromInteger(eventType);
+ String propertyString = null;
+
+ try {
+ if (encType == EncodingType.GZIP) {
+ // Decompress the sucker.
+ propertyString = GZIPUtils.unGzipString(dataBytes, "UTF-8");
+ } else {
+ propertyString = new String(dataBytes, "UTF-8");
+ }
+
+ final Props props = PropsUtils.fromJSONString(propertyString);
+ props.setSource(name);
+ properties.add(new Pair<>(name, props));
+ } catch (final IOException e) {
+ throw new SQLException(e);
+ }
+ } while (rs.next());
+
+ return properties;
+ }
+ }
+
+ public static class ProjectLogsResultHandler implements ResultSetHandler<List<ProjectLogEvent>> {
+ public static String SELECT_PROJECT_EVENTS_ORDER =
+ "SELECT project_id, event_type, event_time, username, message FROM project_events WHERE project_id=? ORDER BY event_time DESC LIMIT ? OFFSET ?";
+
+ @Override
+ public List<ProjectLogEvent> handle(final ResultSet rs) throws SQLException {
+ if (!rs.next()) {
+ return Collections.emptyList();
+ }
+
+ final ArrayList<ProjectLogEvent> events = new ArrayList<>();
+ do {
+ final int projectId = rs.getInt(1);
+ final int eventType = rs.getInt(2);
+ final long eventTime = rs.getLong(3);
+ final String username = rs.getString(4);
+ final String message = rs.getString(5);
+
+ final ProjectLogEvent event =
+ new ProjectLogEvent(projectId, ProjectLogEvent.EventType.fromInteger(eventType), eventTime, username,
+ message);
+ events.add(event);
+ } while (rs.next());
+
+ return events;
+ }
+ }
+
+ public static class ProjectFileChunkResultHandler implements ResultSetHandler<List<byte[]>> {
+ public static String SELECT_PROJECT_CHUNKS_FILE =
+ "SELECT project_id, version, chunk, size, file FROM project_files WHERE project_id=? AND version=? AND chunk >= ? AND chunk < ? ORDER BY chunk ASC";
+
+ @Override
+ public List<byte[]> handle(final ResultSet rs) throws SQLException {
+ if (!rs.next()) {
+ return Collections.emptyList();
+ }
+
+ final ArrayList<byte[]> data = new ArrayList<>();
+ do {
+ final byte[] bytes = rs.getBytes(5);
+
+ data.add(bytes);
+ } while (rs.next());
+
+ return data;
+ }
+ }
+
+ public static class ProjectVersionResultHandler implements ResultSetHandler<List<ProjectFileHandler>> {
+ public static String SELECT_PROJECT_VERSION =
+ "SELECT project_id, version, upload_time, uploader, file_type, file_name, md5, num_chunks, resource_id "
+ + "FROM project_versions WHERE project_id=? AND version=?";
+
+ @Override
+ public List<ProjectFileHandler> handle(final ResultSet rs) throws SQLException {
+ if (!rs.next()) {
+ return null;
+ }
+
+ final List<ProjectFileHandler> handlers = new ArrayList<>();
+ do {
+ final int projectId = rs.getInt(1);
+ final int version = rs.getInt(2);
+ final long uploadTime = rs.getLong(3);
+ final String uploader = rs.getString(4);
+ final String fileType = rs.getString(5);
+ final String fileName = rs.getString(6);
+ final byte[] md5 = rs.getBytes(7);
+ final int numChunks = rs.getInt(8);
+ final String resourceId = rs.getString(9);
+
+ final ProjectFileHandler handler =
+ new ProjectFileHandler(projectId, version, uploadTime, uploader, fileType, fileName, numChunks, md5,
+ resourceId);
+
+ handlers.add(handler);
+ } while (rs.next());
+
+ return handlers;
+ }
+ }
+
+ public static class IntHandler implements ResultSetHandler<Integer> {
+ public static String SELECT_LATEST_VERSION = "SELECT MAX(version) FROM project_versions WHERE project_id=?";
+
+ @Override
+ public Integer handle(final ResultSet rs) throws SQLException {
+ if (!rs.next()) {
+ return 0;
+ }
+
+ return rs.getInt(1);
+ }
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
new file mode 100644
index 0000000..b212a92
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
@@ -0,0 +1,903 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.project;
+
+import static azkaban.project.JdbcProjectHandlerSet.IntHandler;
+import static azkaban.project.JdbcProjectHandlerSet.ProjectFileChunkResultHandler;
+import static azkaban.project.JdbcProjectHandlerSet.ProjectFlowsResultHandler;
+import static azkaban.project.JdbcProjectHandlerSet.ProjectLogsResultHandler;
+import static azkaban.project.JdbcProjectHandlerSet.ProjectPermissionsResultHandler;
+import static azkaban.project.JdbcProjectHandlerSet.ProjectPropertiesResultsHandler;
+import static azkaban.project.JdbcProjectHandlerSet.ProjectResultHandler;
+import static azkaban.project.JdbcProjectHandlerSet.ProjectVersionResultHandler;
+
+import azkaban.database.EncodingType;
+import azkaban.db.DatabaseOperator;
+import azkaban.db.DatabaseTransOperator;
+import azkaban.db.SQLTransaction;
+import azkaban.flow.Flow;
+import azkaban.project.ProjectLogEvent.EventType;
+import azkaban.user.Permission;
+import azkaban.user.User;
+import azkaban.utils.GZIPUtils;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Md5Hasher;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+import azkaban.utils.PropsUtils;
+import azkaban.utils.Triple;
+import com.google.common.io.Files;
+import com.google.inject.Inject;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.IOUtils;
+import org.apache.log4j.Logger;
+
+
+/**
+ * This class implements ProjectLoader using new azkaban-db code to allow DB failover.
+ * TODO kunkun-tang: This class is too long. In future, we should split {@link ProjectLoader} interface
+ * and have multiple short class implementations.
+ */
+public class JdbcProjectImpl implements ProjectLoader {
+ private static final Logger logger = Logger.getLogger(JdbcProjectImpl.class);
+
+ private static final int CHUCK_SIZE = 1024 * 1024 * 10;
+ private final DatabaseOperator dbOperator;
+ private final File tempDir;
+ private final EncodingType defaultEncodingType = EncodingType.GZIP;
+
+ @Inject
+ public JdbcProjectImpl(final Props props, final DatabaseOperator databaseOperator) {
+
+ this.dbOperator = databaseOperator;
+ this.tempDir = new File(props.getString("project.temp.dir", "temp"));
+ if (!this.tempDir.exists()) {
+ if (this.tempDir.mkdirs()) {
+ logger.info("project temporary folder is being constructed.");
+ } else {
+ logger.info("project temporary folder already existed.");
+ }
+ }
+ }
+
+ @Override
+ public List<Project> fetchAllActiveProjects() throws ProjectManagerException {
+
+ final ProjectResultHandler handler = new ProjectResultHandler();
+ List<Project> projects = null;
+
+ try {
+ projects = this.dbOperator.query(ProjectResultHandler.SELECT_ALL_ACTIVE_PROJECTS, handler);
+ projects.forEach(project -> {
+ for (final Triple<String, Boolean, Permission> perm : fetchPermissionsForProject(project)) {
+ setProjectPermission(project, perm);
+ }
+ });
+ } catch (final SQLException ex) {
+ logger.error(ProjectResultHandler.SELECT_PROJECT_BY_ID + " failed.", ex);
+ throw new ProjectManagerException("Error retrieving all projects", ex);
+ }
+ return projects;
+ }
+
+ private void setProjectPermission(final Project project, final Triple<String, Boolean, Permission> perm) {
+ if (perm.getSecond()) {
+ project.setGroupPermission(perm.getFirst(), perm.getThird());
+ } else {
+ project.setUserPermission(perm.getFirst(), perm.getThird());
+ }
+ }
+
+ @Override
+ public Project fetchProjectById(final int id) throws ProjectManagerException {
+
+ Project project = null;
+ final ProjectResultHandler handler = new ProjectResultHandler();
+
+ try {
+ final List<Project> projects = this.dbOperator
+ .query(ProjectResultHandler.SELECT_PROJECT_BY_ID, handler, id);
+ if (projects.isEmpty()) {
+ throw new ProjectManagerException("No project with id " + id + " exists in db.");
+ }
+ project = projects.get(0);
+
+ // Fetch the user permissions
+ for (final Triple<String, Boolean, Permission> perm : fetchPermissionsForProject(project)) {
+ // TODO kunkun-tang: understand why we need to check permission not equal to 0 here.
+ if (perm.getThird().toFlags() != 0) {
+ setProjectPermission(project, perm);
+ }
+ }
+ } catch (final SQLException ex) {
+ logger.error(ProjectResultHandler.SELECT_PROJECT_BY_ID + " failed.", ex);
+ throw new ProjectManagerException("Query for existing project failed. Project " + id, ex);
+ }
+
+ return project;
+ }
+
+ @Override
+ public Project fetchProjectByName(final String name) throws ProjectManagerException {
+ Project project = null;
+ final ProjectResultHandler handler = new ProjectResultHandler();
+
+ // select active project from db first, if not exist, select inactive one.
+ // At most one active project with the same name exists in db.
+ try {
+ List<Project> projects = this.dbOperator
+ .query(ProjectResultHandler.SELECT_ACTIVE_PROJECT_BY_NAME, handler, name);
+ if (projects.isEmpty()) {
+ projects = this.dbOperator.query(ProjectResultHandler.SELECT_PROJECT_BY_NAME, handler, name);
+ if (projects.isEmpty()) {
+ throw new ProjectManagerException("No project with name " + name + " exists in db.");
+ }
+ }
+ project = projects.get(0);
+ for (final Triple<String, Boolean, Permission> perm : fetchPermissionsForProject(project)) {
+ if (perm.getThird().toFlags() != 0) {
+ setProjectPermission(project, perm);
+ }
+ }
+ } catch (final SQLException ex) {
+ logger.error(ProjectResultHandler.SELECT_ACTIVE_PROJECT_BY_NAME + " failed.", ex);
+ throw new ProjectManagerException(ProjectResultHandler.SELECT_ACTIVE_PROJECT_BY_NAME + " failed.", ex);
+ }
+ return project;
+ }
+
+ private List<Triple<String, Boolean, Permission>> fetchPermissionsForProject(final Project project)
+ throws ProjectManagerException {
+ final ProjectPermissionsResultHandler permHander = new ProjectPermissionsResultHandler();
+
+ List<Triple<String, Boolean, Permission>> permissions = null;
+ try {
+ permissions =
+ this.dbOperator
+ .query(ProjectPermissionsResultHandler.SELECT_PROJECT_PERMISSION, permHander, project.getId());
+ } catch (final SQLException ex) {
+ logger.error(ProjectPermissionsResultHandler.SELECT_PROJECT_PERMISSION + " failed.", ex);
+ throw new ProjectManagerException("Query for permissions for " + project.getName() + " failed.", ex);
+ }
+ return permissions;
+ }
+
+ /**
+ * Creates a Project in the db.
+ *
+ * It will throw an exception if it finds an active project of the same name,
+ * or the SQL fails
+ */
+ @Override
+ public synchronized Project createNewProject(final String name, final String description, final User creator)
+ throws ProjectManagerException {
+ final ProjectResultHandler handler = new ProjectResultHandler();
+
+ // Check if the same project name exists.
+ try {
+ final List<Project> projects = this.dbOperator
+ .query(ProjectResultHandler.SELECT_ACTIVE_PROJECT_BY_NAME, handler, name);
+ if (!projects.isEmpty()) {
+ throw new ProjectManagerException("Active project with name " + name + " already exists in db.");
+ }
+ } catch (final SQLException ex) {
+ logger.error(ex);
+ throw new ProjectManagerException("Checking for existing project failed. " + name, ex);
+ }
+
+ final String INSERT_PROJECT =
+ "INSERT INTO projects ( name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob) values (?,?,?,?,?,?,?,?,?)";
+ final SQLTransaction<Integer> insertProject = transOperator -> {
+ final long time = System.currentTimeMillis();
+ return transOperator.update(INSERT_PROJECT, name, true, time, time, null, creator.getUserId(), description,
+ this.defaultEncodingType.getNumVal(), null);
+ };
+
+ // Insert project
+ try {
+ final int numRowsInserted = this.dbOperator.transaction(insertProject);
+ if (numRowsInserted == 0) {
+ throw new ProjectManagerException("No projects have been inserted.");
+ }
+ } catch (final SQLException ex) {
+ logger.error(INSERT_PROJECT + " failed.", ex);
+ throw new ProjectManagerException("Insert project" + name + " for existing project failed. ", ex);
+ }
+ return fetchProjectByName(name);
+ }
+
+ @Override
+ public void uploadProjectFile(final int projectId, final int version, final File localFile, final String uploader)
+ throws ProjectManagerException {
+ final long startMs = System.currentTimeMillis();
+ logger.info(String.format("Uploading Project ID: %d file: %s [%d bytes]", projectId, localFile.getName(),
+ localFile.length()));
+
+ /*
+ * The below transaction uses one connection to do all operations. Ideally, we should commit
+ * after the transaction completes. However, uploadFile needs to commit every time when we
+ * upload any single chunk.
+ *
+ * Todo kunkun-tang: fix the transaction issue.
+ */
+ final SQLTransaction<Integer> uploadProjectFileTransaction = transOperator -> {
+
+ /* Step 1: Update DB with new project info */
+ addProjectToProjectVersions(transOperator, projectId, version, localFile, uploader, computeHash(localFile), null);
+ transOperator.getConnection().commit();
+
+ /* Step 2: Upload File in chunks to DB */
+ final int chunks = uploadFileInChunks(transOperator, projectId, version, localFile);
+
+ /* Step 3: Update number of chunks in DB */
+ updateChunksInProjectVersions(transOperator, projectId, version, chunks);
+ return 1;
+ };
+
+ try {
+ this.dbOperator.transaction(uploadProjectFileTransaction);
+ } catch (final SQLException e) {
+ logger.error("upload project files failed.", e);
+ throw new ProjectManagerException("upload project files failed.", e);
+ }
+
+ final long duration = (System.currentTimeMillis() - startMs) / 1000;
+ logger.info(String.format("Uploaded Project ID: %d file: %s [%d bytes] in %d sec", projectId, localFile.getName(),
+ localFile.length(), duration));
+ }
+
+
+ private byte[] computeHash(final File localFile) {
+ logger.info("Creating message digest for upload " + localFile.getName());
+ final byte[] md5;
+ try {
+ md5 = Md5Hasher.md5Hash(localFile);
+ } catch (final IOException e) {
+ throw new ProjectManagerException("Error getting md5 hash.", e);
+ }
+
+ logger.info("Md5 hash created");
+ return md5;
+ }
+
+ @Override
+ public void addProjectVersion(
+ final int projectId,
+ final int version,
+ final File localFile,
+ final String uploader,
+ final byte[] md5,
+ final String resourceId) throws ProjectManagerException {
+
+ // when one transaction completes, it automatically commits.
+ final SQLTransaction<Integer> transaction = transOperator -> {
+ addProjectToProjectVersions(transOperator, projectId, version, localFile, uploader, md5, resourceId);
+ return 1;
+ };
+ try {
+ this.dbOperator.transaction(transaction);
+ } catch (final SQLException e) {
+ logger.error("addProjectVersion failed.", e);
+ throw new ProjectManagerException("addProjectVersion failed.", e);
+ }
+ }
+
+ /**
+ * Insert a new version record to TABLE project_versions before uploading files.
+ *
+ * The reason for this operation:
+ * When error chunking happens in remote mysql server, incomplete file data remains
+ * in DB, and an SQL exception is thrown. If we don't have this operation before uploading file,
+ * the SQL exception prevents AZ from creating the new version record in Table project_versions.
+ * However, the Table project_files still reserve the incomplete files, which causes troubles
+ * when uploading a new file: Since the version in TABLE project_versions is still old, mysql will stop
+ * inserting new files to db.
+ *
+ * Why this operation is safe:
+ * When AZ uploads a new zip file, it always fetches the latest version proj_v from TABLE project_version,
+ * proj_v+1 will be used as the new version for the uploading files.
+ *
+ * Assume error chunking happens on day 1. proj_v is created for this bad file (old file version + 1).
+ * When we upload a new project zip in day2, new file in day 2 will use the new version (proj_v + 1).
+ * When file uploading completes, AZ will clean all old chunks in DB afterward.
+ */
+ private void addProjectToProjectVersions(
+ final DatabaseTransOperator transOperator,
+ final int projectId,
+ final int version,
+ final File localFile,
+ final String uploader,
+ final byte[] md5,
+ final String resourceId) throws ProjectManagerException {
+ final long updateTime = System.currentTimeMillis();
+ final String INSERT_PROJECT_VERSION = "INSERT INTO project_versions "
+ + "(project_id, version, upload_time, uploader, file_type, file_name, md5, num_chunks, resource_id) values "
+ + "(?,?,?,?,?,?,?,?,?)";
+
+ try {
+ /*
+ * As we don't know the num_chunks before uploading the file, we initialize it to 0,
+ * and will update it after uploading completes.
+ */
+ transOperator.update(INSERT_PROJECT_VERSION, projectId, version, updateTime, uploader,
+ Files.getFileExtension(localFile.getName()), localFile.getName(), md5, 0, resourceId);
+ } catch (final SQLException e) {
+ final String msg = String.format("Error initializing project id: %d version: %d ", projectId, version);
+ logger.error(msg, e);
+ throw new ProjectManagerException(msg, e);
+ }
+ }
+
+ private int uploadFileInChunks(final DatabaseTransOperator transOperator, final int projectId, final int version, final File localFile)
+ throws ProjectManagerException {
+
+ // Really... I doubt we'll get a > 2gig file. So int casting it is!
+ final byte[] buffer = new byte[CHUCK_SIZE];
+ final String INSERT_PROJECT_FILES =
+ "INSERT INTO project_files (project_id, version, chunk, size, file) values (?,?,?,?,?)";
+
+ BufferedInputStream bufferedStream = null;
+ int chunk = 0;
+ try {
+ bufferedStream = new BufferedInputStream(new FileInputStream(localFile));
+ int size = bufferedStream.read(buffer);
+ while (size >= 0) {
+ logger.info("Read bytes for " + localFile.getName() + " size:" + size);
+ byte[] buf = buffer;
+ if (size < buffer.length) {
+ buf = Arrays.copyOfRange(buffer, 0, size);
+ }
+ try {
+ logger.info("Running update for " + localFile.getName() + " chunk " + chunk);
+ transOperator.update(INSERT_PROJECT_FILES, projectId, version, chunk, size, buf);
+
+ /*
+ * We enforce az committing to db when uploading every single chunk,
+ * in order to reduce the transaction duration and conserve sql server resources.
+ *
+ * If the files to be uploaded is very large and we don't commit every single chunk,
+ * the remote mysql server will run into memory troubles.
+ */
+ transOperator.getConnection().commit();
+ logger.info("Finished update for " + localFile.getName() + " chunk " + chunk);
+ } catch (final SQLException e) {
+ throw new ProjectManagerException("Error Chunking during uploading files to db...");
+ }
+ ++chunk;
+ size = bufferedStream.read(buffer);
+ }
+ } catch (final IOException e) {
+ throw new ProjectManagerException(
+ String.format("Error chunking file. projectId: %d, version: %d, file:%s[%d bytes], chunk: %d", projectId,
+ version, localFile.getName(), localFile.length(), chunk));
+ } finally {
+ IOUtils.closeQuietly(bufferedStream);
+ }
+ return chunk;
+ }
+
+ /**
+ * we update num_chunks's actual number to db here.
+ */
+ private void updateChunksInProjectVersions(final DatabaseTransOperator transOperator, final int projectId, final int version, final int chunk)
+ throws ProjectManagerException {
+
+ final String UPDATE_PROJECT_NUM_CHUNKS =
+ "UPDATE project_versions SET num_chunks=? WHERE project_id=? AND version=?";
+ try {
+ transOperator.update(UPDATE_PROJECT_NUM_CHUNKS, chunk, projectId, version);
+ transOperator.getConnection().commit();
+ } catch (final SQLException e) {
+ logger.error("Error updating project " + projectId + " : chunk_num " + chunk, e);
+ throw new ProjectManagerException("Error updating project " + projectId + " : chunk_num " + chunk, e);
+ }
+ }
+
+ @Override
+ public ProjectFileHandler fetchProjectMetaData(final int projectId, final int version) {
+ final ProjectVersionResultHandler pfHandler = new ProjectVersionResultHandler();
+ try {
+ final List<ProjectFileHandler> projectFiles =
+ this.dbOperator
+ .query(ProjectVersionResultHandler.SELECT_PROJECT_VERSION, pfHandler, projectId, version);
+ if (projectFiles == null || projectFiles.isEmpty()) {
+ return null;
+ }
+ return projectFiles.get(0);
+ } catch (final SQLException ex) {
+ logger.error("Query for uploaded file for project id " + projectId + " failed.", ex);
+ throw new ProjectManagerException("Query for uploaded file for project id " + projectId + " failed.", ex);
+ }
+ }
+
+ @Override
+ public ProjectFileHandler getUploadedFile(final int projectId, final int version) throws ProjectManagerException {
+ final ProjectFileHandler projHandler = fetchProjectMetaData(projectId, version);
+ if (projHandler == null) {
+ return null;
+ }
+ final int numChunks = projHandler.getNumChunks();
+ BufferedOutputStream bStream = null;
+ File file;
+ try {
+ try {
+ file = File.createTempFile(projHandler.getFileName(), String.valueOf(version), this.tempDir);
+ bStream = new BufferedOutputStream(new FileOutputStream(file));
+ } catch (final IOException e) {
+ throw new ProjectManagerException("Error creating temp file for stream.");
+ }
+
+ final int collect = 5;
+ int fromChunk = 0;
+ int toChunk = collect;
+ do {
+ final ProjectFileChunkResultHandler chunkHandler = new ProjectFileChunkResultHandler();
+ List<byte[]> data = null;
+ try {
+ data = this.dbOperator
+ .query(ProjectFileChunkResultHandler.SELECT_PROJECT_CHUNKS_FILE, chunkHandler, projectId,
+ version, fromChunk, toChunk);
+ } catch (final SQLException e) {
+ logger.error(e);
+ throw new ProjectManagerException("Query for uploaded file for " + projectId + " failed.", e);
+ }
+
+ try {
+ for (final byte[] d : data) {
+ bStream.write(d);
+ }
+ } catch (final IOException e) {
+ throw new ProjectManagerException("Error writing file", e);
+ }
+
+ // Add all the bytes to the stream.
+ fromChunk += collect;
+ toChunk += collect;
+ } while (fromChunk <= numChunks);
+ } finally {
+ IOUtils.closeQuietly(bStream);
+ }
+
+ // Check md5.
+ byte[] md5 = null;
+ try {
+ md5 = Md5Hasher.md5Hash(file);
+ } catch (final IOException e) {
+ throw new ProjectManagerException("Error getting md5 hash.", e);
+ }
+
+ if (Arrays.equals(projHandler.getMd5Hash(), md5)) {
+ logger.info("Md5 Hash is valid");
+ } else {
+ throw new ProjectManagerException("Md5 Hash failed on retrieval of file");
+ }
+
+ projHandler.setLocalFile(file);
+ return projHandler;
+ }
+
+ @Override
+ public void changeProjectVersion(final Project project, final int version, final String user) throws ProjectManagerException {
+ final long timestamp = System.currentTimeMillis();
+ try {
+ final String UPDATE_PROJECT_VERSION =
+ "UPDATE projects SET version=?,modified_time=?,last_modified_by=? WHERE id=?";
+
+ this.dbOperator.update(UPDATE_PROJECT_VERSION, version, timestamp, user, project.getId());
+ project.setVersion(version);
+ project.setLastModifiedTimestamp(timestamp);
+ project.setLastModifiedUser(user);
+ } catch (final SQLException e) {
+ logger.error("Error updating switching project version " + project.getName(), e);
+ throw new ProjectManagerException("Error updating switching project version " + project.getName(), e);
+ }
+ }
+
+ @Override
+ public void updatePermission(final Project project, final String name, final Permission perm, final boolean isGroup)
+ throws ProjectManagerException {
+
+ final long updateTime = System.currentTimeMillis();
+ try {
+ if (this.dbOperator.getDataSource().allowsOnDuplicateKey()) {
+ final String INSERT_PROJECT_PERMISSION =
+ "INSERT INTO project_permissions (project_id, modified_time, name, permissions, isGroup) values (?,?,?,?,?)"
+ + "ON DUPLICATE KEY UPDATE modified_time = VALUES(modified_time), permissions = VALUES(permissions)";
+ this.dbOperator.update(INSERT_PROJECT_PERMISSION, project.getId(), updateTime, name, perm.toFlags(), isGroup);
+ } else {
+ final String MERGE_PROJECT_PERMISSION =
+ "MERGE INTO project_permissions (project_id, modified_time, name, permissions, isGroup) KEY (project_id, name) values (?,?,?,?,?)";
+ this.dbOperator.update(MERGE_PROJECT_PERMISSION, project.getId(), updateTime, name, perm.toFlags(), isGroup);
+ }
+ } catch (final SQLException ex) {
+ logger.error("Error updating project permission", ex);
+ throw new ProjectManagerException("Error updating project " + project.getName() + " permissions for " + name, ex);
+ }
+
+ if (isGroup) {
+ project.setGroupPermission(name, perm);
+ } else {
+ project.setUserPermission(name, perm);
+ }
+ }
+
+ @Override
+ public void updateProjectSettings(final Project project) throws ProjectManagerException {
+ updateProjectSettings(project, this.defaultEncodingType);
+ }
+
+ private byte[] convertJsonToBytes(final EncodingType type, final String json) throws IOException {
+ byte[] data = json.getBytes("UTF-8");
+ if (type == EncodingType.GZIP) {
+ data = GZIPUtils.gzipBytes(data);
+ }
+ return data;
+ }
+
+ private void updateProjectSettings(final Project project, final EncodingType encType) throws ProjectManagerException {
+ final String UPDATE_PROJECT_SETTINGS = "UPDATE projects SET enc_type=?, settings_blob=? WHERE id=?";
+
+ final String json = JSONUtils.toJSON(project.toObject());
+ byte[] data = null;
+ try {
+ data = convertJsonToBytes(encType, json);
+ logger.debug("NumChars: " + json.length() + " Gzip:" + data.length);
+ } catch (final IOException e) {
+ throw new ProjectManagerException("Failed to encode. ", e);
+ }
+
+ try {
+ this.dbOperator.update(UPDATE_PROJECT_SETTINGS, encType.getNumVal(), data, project.getId());
+ } catch (final SQLException e) {
+ logger.error("update Project Settings failed.", e);
+ throw new ProjectManagerException(
+ "Error updating project " + project.getName() + " version " + project.getVersion(), e);
+ }
+ }
+
+ @Override
+ public void removePermission(final Project project, final String name, final boolean isGroup) throws ProjectManagerException {
+ final String DELETE_PROJECT_PERMISSION =
+ "DELETE FROM project_permissions WHERE project_id=? AND name=? AND isGroup=?";
+ try {
+ this.dbOperator.update(DELETE_PROJECT_PERMISSION, project.getId(), name, isGroup);
+ } catch (final SQLException e) {
+ logger.error("remove Permission failed.", e);
+ throw new ProjectManagerException("Error deleting project " + project.getName() + " permissions for " + name, e);
+ }
+
+ if (isGroup) {
+ project.removeGroupPermission(name);
+ } else {
+ project.removeUserPermission(name);
+ }
+ }
+
+ @Override
+ public List<Triple<String, Boolean, Permission>> getProjectPermissions(final Project project) throws ProjectManagerException {
+ return fetchPermissionsForProject(project);
+ }
+
+ /**
+ * Todo kunkun-tang: the below implementation doesn't remove a project, but inactivate a project.
+ * We should rewrite the code to follow the literal meanings.
+ */
+ @Override
+ public void removeProject(final Project project, final String user) throws ProjectManagerException {
+
+ final long updateTime = System.currentTimeMillis();
+ final String UPDATE_INACTIVE_PROJECT =
+ "UPDATE projects SET active=false,modified_time=?,last_modified_by=? WHERE id=?";
+ try {
+ this.dbOperator.update(UPDATE_INACTIVE_PROJECT, updateTime, user, project.getId());
+ } catch (final SQLException e) {
+ logger.error("error remove project " + project.getName(), e);
+ throw new ProjectManagerException("Error remove project " + project.getName(), e);
+ }
+ }
+
+ @Override
+ public boolean postEvent(final Project project, final EventType type, final String user, final String message) {
+ final String INSERT_PROJECT_EVENTS =
+ "INSERT INTO project_events (project_id, event_type, event_time, username, message) values (?,?,?,?,?)";
+ final long updateTime = System.currentTimeMillis();
+ try {
+ this.dbOperator
+ .update(INSERT_PROJECT_EVENTS, project.getId(), type.getNumVal(), updateTime, user, message);
+ } catch (final SQLException e) {
+ logger.error("post event failed,", e);
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public List<ProjectLogEvent> getProjectEvents(final Project project, final int num, final int skip) throws ProjectManagerException {
+ final ProjectLogsResultHandler logHandler = new ProjectLogsResultHandler();
+ List<ProjectLogEvent> events = null;
+ try {
+ events = this.dbOperator
+ .query(ProjectLogsResultHandler.SELECT_PROJECT_EVENTS_ORDER, logHandler, project.getId(), num,
+ skip);
+ } catch (final SQLException e) {
+ logger.error("Error getProjectEvents, project " + project.getName(), e);
+ throw new ProjectManagerException("Error getProjectEvents, project " + project.getName(), e);
+ }
+
+ return events;
+ }
+
+ @Override
+ public void updateDescription(final Project project, final String description, final String user) throws ProjectManagerException {
+ final String UPDATE_PROJECT_DESCRIPTION =
+ "UPDATE projects SET description=?,modified_time=?,last_modified_by=? WHERE id=?";
+ final long updateTime = System.currentTimeMillis();
+ try {
+ this.dbOperator.update(UPDATE_PROJECT_DESCRIPTION, description, updateTime, user, project.getId());
+ project.setDescription(description);
+ project.setLastModifiedTimestamp(updateTime);
+ project.setLastModifiedUser(user);
+ } catch (final SQLException e) {
+ logger.error(e);
+ throw new ProjectManagerException("Error update Description, project " + project.getName(), e);
+ }
+ }
+
+ @Override
+ public int getLatestProjectVersion(final Project project) throws ProjectManagerException {
+ final IntHandler handler = new IntHandler();
+ try {
+ return this.dbOperator.query(IntHandler.SELECT_LATEST_VERSION, handler, project.getId());
+ } catch (final SQLException e) {
+ logger.error(e);
+ throw new ProjectManagerException("Error marking project " + project.getName() + " as inactive", e);
+ }
+ }
+
+ @Override
+ public void uploadFlows(final Project project, final int version, final Collection<Flow> flows) throws ProjectManagerException {
+ // We do one at a time instead of batch... because well, the batch could be
+ // large.
+ logger.info("Uploading flows");
+ try {
+ for (final Flow flow : flows) {
+ uploadFlow(project, version, flow, this.defaultEncodingType);
+ }
+ } catch (final IOException e) {
+ throw new ProjectManagerException("Flow Upload failed.", e);
+ }
+ }
+
+ @Override
+ public void uploadFlow(final Project project, final int version, final Flow flow) throws ProjectManagerException {
+ logger.info("Uploading flow " + flow.getId());
+ try {
+ uploadFlow(project, version, flow, this.defaultEncodingType);
+ } catch (final IOException e) {
+ throw new ProjectManagerException("Flow Upload failed.", e);
+ }
+ }
+
+ @Override
+ public void updateFlow(final Project project, final int version, final Flow flow) throws ProjectManagerException {
+ logger.info("Uploading flow " + flow.getId());
+ try {
+ final String json = JSONUtils.toJSON(flow.toObject());
+ final byte[] data = convertJsonToBytes(this.defaultEncodingType, json);
+ logger.info("Flow upload " + flow.getId() + " is byte size " + data.length);
+ final String UPDATE_FLOW =
+ "UPDATE project_flows SET encoding_type=?,json=? WHERE project_id=? AND version=? AND flow_id=?";
+ try {
+ this.dbOperator
+ .update(UPDATE_FLOW, this.defaultEncodingType.getNumVal(), data, project.getId(), version, flow.getId());
+ } catch (final SQLException e) {
+ logger.error("Error inserting flow", e);
+ throw new ProjectManagerException("Error inserting flow " + flow.getId(), e);
+ }
+ } catch (final IOException e) {
+ throw new ProjectManagerException("Flow Upload failed.", e);
+ }
+ }
+
+ private void uploadFlow(final Project project, final int version, final Flow flow, final EncodingType encType)
+ throws ProjectManagerException, IOException {
+ final String json = JSONUtils.toJSON(flow.toObject());
+ final byte[] data = convertJsonToBytes(encType, json);
+
+ logger.info("Flow upload " + flow.getId() + " is byte size " + data.length);
+ final String INSERT_FLOW =
+ "INSERT INTO project_flows (project_id, version, flow_id, modified_time, encoding_type, json) values (?,?,?,?,?,?)";
+ try {
+ this.dbOperator.update(INSERT_FLOW, project.getId(), version, flow.getId(), System.currentTimeMillis(),
+ encType.getNumVal(), data);
+ } catch (final SQLException e) {
+ logger.error("Error inserting flow", e);
+ throw new ProjectManagerException("Error inserting flow " + flow.getId(), e);
+ }
+ }
+
+ @Override
+ public Flow fetchFlow(final Project project, final String flowId) throws ProjectManagerException {
+ throw new UnsupportedOperationException("this method has not been instantiated.");
+ }
+
+ @Override
+ public List<Flow> fetchAllProjectFlows(final Project project) throws ProjectManagerException {
+ final ProjectFlowsResultHandler handler = new ProjectFlowsResultHandler();
+ List<Flow> flows = null;
+ try {
+ flows = this.dbOperator
+ .query(ProjectFlowsResultHandler.SELECT_ALL_PROJECT_FLOWS, handler, project.getId(),
+ project.getVersion());
+ } catch (final SQLException e) {
+ throw new ProjectManagerException(
+ "Error fetching flows from project " + project.getName() + " version " + project.getVersion(), e);
+ }
+ return flows;
+ }
+
+ @Override
+ public void uploadProjectProperties(final Project project, final List<Props> properties) throws ProjectManagerException {
+ for (final Props props : properties) {
+ try {
+ uploadProjectProperty(project, props.getSource(), props);
+ } catch (final IOException e) {
+ throw new ProjectManagerException("Error uploading project property file", e);
+ }
+ }
+ }
+
+ @Override
+ public void uploadProjectProperty(final Project project, final Props props) throws ProjectManagerException {
+ try {
+ uploadProjectProperty(project, props.getSource(), props);
+ } catch (final IOException e) {
+ throw new ProjectManagerException("Error uploading project property file", e);
+ }
+ }
+
+ @Override
+ public void updateProjectProperty(final Project project, final Props props) throws ProjectManagerException {
+ try {
+ updateProjectProperty(project, props.getSource(), props);
+ } catch (final IOException e) {
+ throw new ProjectManagerException("Error uploading project property file", e);
+ }
+ }
+
+ private void updateProjectProperty(final Project project, final String name, final Props props)
+ throws ProjectManagerException, IOException {
+ final String UPDATE_PROPERTIES =
+ "UPDATE project_properties SET property=? WHERE project_id=? AND version=? AND name=?";
+
+ final byte[] propsData = getBytes(props);
+ try {
+ this.dbOperator
+ .update(UPDATE_PROPERTIES, propsData, project.getId(), project.getVersion(), name);
+ } catch (final SQLException e) {
+ throw new ProjectManagerException(
+ "Error updating property " + project.getName() + " version " + project.getVersion(), e);
+ }
+ }
+
+ private void uploadProjectProperty(final Project project, final String name, final Props props)
+ throws ProjectManagerException, IOException {
+ final String INSERT_PROPERTIES =
+ "INSERT INTO project_properties (project_id, version, name, modified_time, encoding_type, property) values (?,?,?,?,?,?)";
+
+ final byte[] propsData = getBytes(props);
+ try {
+ this.dbOperator.update(INSERT_PROPERTIES, project.getId(), project.getVersion(), name, System.currentTimeMillis(),
+ this.defaultEncodingType.getNumVal(), propsData);
+ } catch (final SQLException e) {
+ throw new ProjectManagerException(
+ "Error uploading project properties " + name + " into " + project.getName() + " version "
+ + project.getVersion(), e);
+ }
+ }
+
+ private byte[] getBytes(final Props props) throws IOException {
+ final String propertyJSON = PropsUtils.toJSONString(props, true);
+ byte[] data = propertyJSON.getBytes("UTF-8");
+ if (this.defaultEncodingType == EncodingType.GZIP) {
+ data = GZIPUtils.gzipBytes(data);
+ }
+ return data;
+ }
+
+ @Override
+ public Props fetchProjectProperty(final int projectId, final int projectVer, final String propsName) throws ProjectManagerException {
+
+ final ProjectPropertiesResultsHandler handler = new ProjectPropertiesResultsHandler();
+ try {
+ final List<Pair<String, Props>> properties =
+ this.dbOperator
+ .query(ProjectPropertiesResultsHandler.SELECT_PROJECT_PROPERTY, handler, projectId, projectVer,
+ propsName);
+
+ if (properties == null || properties.isEmpty()) {
+ logger.warn("Project " + projectId + " version " + projectVer + " property " + propsName + " is empty.");
+ return null;
+ }
+
+ return properties.get(0).getSecond();
+ } catch (final SQLException e) {
+ logger.error("Error fetching property " + propsName + " Project " + projectId + " version " + projectVer, e);
+ throw new ProjectManagerException("Error fetching property " + propsName, e);
+ }
+ }
+
+ @Override
+ public Props fetchProjectProperty(final Project project, final String propsName) throws ProjectManagerException {
+ return fetchProjectProperty(project.getId(), project.getVersion(), propsName);
+ }
+
+ @Override
+ public Map<String, Props> fetchProjectProperties(final int projectId, final int version) throws ProjectManagerException {
+
+ try {
+ final List<Pair<String, Props>> properties = this.dbOperator.query(ProjectPropertiesResultsHandler.SELECT_PROJECT_PROPERTIES,
+ new ProjectPropertiesResultsHandler(), projectId, version);
+ if (properties == null || properties.isEmpty()) {
+ return null;
+ }
+ final HashMap<String, Props> props = new HashMap<>();
+ for (final Pair<String, Props> pair : properties) {
+ props.put(pair.getFirst(), pair.getSecond());
+ }
+ return props;
+ } catch (final SQLException e) {
+ logger.error("Error fetching properties, project id" + projectId + " version " + version, e);
+ throw new ProjectManagerException("Error fetching properties", e);
+ }
+ }
+
+ @Override
+ public void cleanOlderProjectVersion(final int projectId, final int version) throws ProjectManagerException {
+ final String DELETE_FLOW = "DELETE FROM project_flows WHERE project_id=? AND version<?";
+ final String DELETE_PROPERTIES = "DELETE FROM project_properties WHERE project_id=? AND version<?";
+ final String DELETE_PROJECT_FILES = "DELETE FROM project_files WHERE project_id=? AND version<?";
+ final String UPDATE_PROJECT_VERSIONS = "UPDATE project_versions SET num_chunks=0 WHERE project_id=? AND version<?";
+
+ final SQLTransaction<Integer> cleanOlderProjectTransaction = transOperator -> {
+ transOperator.update(DELETE_FLOW, projectId, version);
+ transOperator.update(DELETE_PROPERTIES, projectId, version);
+ transOperator.update(DELETE_PROJECT_FILES, projectId, version);
+ return transOperator.update(UPDATE_PROJECT_VERSIONS, projectId, version);
+ };
+
+ try {
+ final int res = this.dbOperator.transaction(cleanOlderProjectTransaction);
+ if (res == 0) {
+ logger.info("clean older project given project id " + projectId + " doesn't take effect.");
+ }
+ } catch (final SQLException e) {
+ logger.error("clean older project transaction failed", e);
+ throw new ProjectManagerException("clean older project transaction failed", e);
+ }
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
index a54095a..90ca0e7 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
@@ -30,7 +30,6 @@ import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
import azkaban.utils.Triple;
import com.google.common.io.Files;
-import com.google.inject.Inject;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
@@ -64,7 +63,7 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
private EncodingType defaultEncodingType = EncodingType.GZIP;
- @Inject
+ // TODO kunkun-tang: This class is going to be removed soon, as we are migrating to JdbcProjectImpl.
public JdbcProjectLoader(final Props props, final CommonMetrics commonMetrics) {
super(props, commonMetrics);
this.tempDir = new File(props.getString("project.temp.dir", "temp"));
@@ -785,7 +784,7 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
@Override
public List<Triple<String, Boolean, Permission>> getProjectPermissions(
- final int projectId) throws ProjectManagerException {
+ final Project project) throws ProjectManagerException {
final ProjectPermissionsResultHandler permHander =
new ProjectPermissionsResultHandler();
final QueryRunner runner = createQueryRunner();
@@ -794,10 +793,10 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
permissions =
runner.query(
ProjectPermissionsResultHandler.SELECT_PROJECT_PERMISSION,
- permHander, projectId);
+ permHander, project.getId());
} catch (final SQLException e) {
throw new ProjectManagerException("Query for permissions for "
- + projectId + " failed.", e);
+ + project.getId() + " failed.", e);
}
return permissions;
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
index 84d320e..9245e06 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
@@ -200,7 +200,7 @@ public interface ProjectLoader {
Props fetchProjectProperty(int projectId, int projectVer, String propsName)
throws ProjectManagerException;
- List<Triple<String, Boolean, Permission>> getProjectPermissions(int projectId)
+ List<Triple<String, Boolean, Permission>> getProjectPermissions(Project project)
throws ProjectManagerException;
void updateProjectSettings(Project project) throws ProjectManagerException;
diff --git a/azkaban-common/src/main/java/azkaban/trigger/JdbcTriggerImpl.java b/azkaban-common/src/main/java/azkaban/trigger/JdbcTriggerImpl.java
index 65261b8..1d25215 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/JdbcTriggerImpl.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/JdbcTriggerImpl.java
@@ -115,6 +115,7 @@ public class JdbcTriggerImpl implements TriggerLoader {
final SQLTransaction<Long> insertAndGetLastID = transOperator -> {
transOperator.update(ADD_TRIGGER, DateTime.now().getMillis());
+ // This commit must be called in order to unlock trigger table and have last insert ID.
transOperator.getConnection().commit();
return transOperator.getLastInsertId();
};
diff --git a/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java b/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
new file mode 100644
index 0000000..031ddee
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
@@ -0,0 +1,367 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.project;
+
+import azkaban.database.AzkabanDatabaseSetup;
+import azkaban.db.AzDBTestUtility;
+import azkaban.db.AzkabanDataSource;
+import azkaban.db.DatabaseOperator;
+import azkaban.db.DatabaseOperatorImpl;
+import azkaban.flow.Flow;
+import azkaban.user.Permission;
+import azkaban.user.User;
+import azkaban.utils.Md5Hasher;
+import azkaban.utils.Props;
+import azkaban.utils.Triple;
+import java.io.File;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.dbutils.QueryRunner;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class JdbcProjectImplTest {
+
+ private static final String SAMPLE_FILE = "sample_flow_01.zip";
+ private static final Props props = new Props();
+ private static DatabaseOperator dbOperator;
+ private ProjectLoader loader;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ final AzkabanDataSource dataSource = new AzDBTestUtility.EmbeddedH2BasicDataSource();
+ dbOperator = new DatabaseOperatorImpl(new QueryRunner(dataSource));
+
+ final String sqlScriptsDir = new File("../azkaban-db/src/main/sql/").getCanonicalPath();
+ props.put("database.sql.scripts.dir", sqlScriptsDir);
+
+ // TODO kunkun-tang: Need to refactor AzkabanDatabaseSetup to accept datasource in azakaban-db
+ final azkaban.database.AzkabanDataSource dataSourceForSetupDB =
+ new azkaban.database.AzkabanConnectionPoolTest.EmbeddedH2BasicDataSource();
+ final AzkabanDatabaseSetup setup = new AzkabanDatabaseSetup(dataSourceForSetupDB, props);
+ setup.loadTableInfo();
+ setup.updateDatabase(true, false);
+ }
+
+ @AfterClass
+ public static void destroyDB() {
+ try {
+ dbOperator.update("DROP ALL OBJECTS");
+ } catch (final SQLException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Before
+ public void setup() {
+ this.loader = new JdbcProjectImpl(props, dbOperator);
+ }
+
+ private void createThreeProjects() {
+ final String projectName = "mytestProject";
+ final String projectDescription = "This is my new project";
+ final User user = new User("testUser1");
+ this.loader.createNewProject(projectName, projectDescription, user);
+ final String projectName2 = "mytestProject2";
+ final String projectDescription2 = "This is my new project2";
+ this.loader.createNewProject(projectName2, projectDescription2, user);
+ final String projectName3 = "mytestProject3";
+ final String projectDescription3 = "This is my new project3";
+ final User user2 = new User("testUser2");
+ this.loader.createNewProject(projectName3, projectDescription3, user2);
+ }
+
+ @Test
+ public void testCreateProject() throws Exception {
+ final String projectName = "mytestProject";
+ final String projectDescription = "This is my new project";
+ final User user = new User("testUser1");
+ final Project project = this.loader.createNewProject(projectName, projectDescription, user);
+ Assert.assertEquals(project.getName(), projectName);
+ Assert.assertEquals(project.getDescription(), projectDescription);
+ Assert.assertEquals(project.getLastModifiedUser(), "testUser1");
+ }
+
+ @Test
+ public void testFetchAllActiveProjects() throws Exception {
+ createThreeProjects();
+ final List<Project> projectList = this.loader.fetchAllActiveProjects();
+ Assert.assertEquals(projectList.size(), 3);
+ }
+
+ @Test
+ public void testFetchProjectByName() throws Exception {
+ createThreeProjects();
+ final Project project = this.loader.fetchProjectByName("mytestProject");
+ Assert.assertEquals(project.getName(), "mytestProject");
+ Assert.assertEquals(project.getDescription(), "This is my new project");
+ Assert.assertEquals(project.getLastModifiedUser(), "testUser1");
+ }
+
+ @Test
+ public void testFetchProjectById() throws Exception {
+ createThreeProjects();
+ final Project project1 = this.loader.fetchProjectByName("mytestProject");
+ final Project project2 = this.loader.fetchProjectById(project1.getId());
+ Assert.assertEquals(project1.getName(), project2.getName());
+ Assert.assertEquals(project1.getDescription(), project2.getDescription());
+ Assert.assertEquals(project1.getLastModifiedUser(), project2.getLastModifiedUser());
+ }
+
+ @Test
+ public void testUploadProjectFile() throws Exception {
+ createThreeProjects();
+ final Project project = this.loader.fetchProjectByName("mytestProject");
+ final File testFile = new File(getClass().getClassLoader().getResource(SAMPLE_FILE).getFile());
+ final int newVersion = this.loader.getLatestProjectVersion(project) + 1;
+ this.loader.uploadProjectFile(project.getId(), newVersion, testFile, "uploadUser1");
+
+ final ProjectFileHandler fileHandler = this.loader.getUploadedFile(project.getId(), newVersion);
+ Assert.assertEquals(fileHandler.getFileName(), SAMPLE_FILE);
+ Assert.assertEquals(fileHandler.getUploader(), "uploadUser1");
+ }
+
+ @Test(expected = ProjectManagerException.class)
+ public void testDuplicateUploadProjectFile() throws Exception {
+ createThreeProjects();
+ final Project project = this.loader.fetchProjectByName("mytestProject");
+ final File testFile = new File(getClass().getClassLoader().getResource(SAMPLE_FILE).getFile());
+ final int newVersion = this.loader.getLatestProjectVersion(project) + 1;
+ this.loader.uploadProjectFile(project.getId(), newVersion, testFile, "uploadUser1");
+ this.loader.uploadProjectFile(project.getId(), newVersion, testFile, "uploadUser1");
+ }
+
+ private byte[] computeHash(final File localFile) {
+ final byte[] md5;
+ try {
+ md5 = Md5Hasher.md5Hash(localFile);
+ } catch (final IOException e) {
+ throw new ProjectManagerException("Error getting md5 hash.", e);
+ }
+ return md5;
+ }
+
+ @Test
+ public void testAddProjectVersion() throws Exception {
+ createThreeProjects();
+ final Project project = this.loader.fetchProjectByName("mytestProject");
+ final File testFile = new File(getClass().getClassLoader().getResource(SAMPLE_FILE).getFile());
+ final int newVersion = this.loader.getLatestProjectVersion(project) + 1;
+ this.loader.addProjectVersion(project.getId(), newVersion, testFile, "uploadUser1", computeHash(testFile), "resourceId1");
+ final int currVersion = this.loader.getLatestProjectVersion(project);
+ Assert.assertEquals(currVersion, newVersion);
+ }
+
+ @Test
+ public void testFetchProjectMetaData() throws Exception {
+ createThreeProjects();
+ final Project project = this.loader.fetchProjectByName("mytestProject");
+ final File testFile = new File(getClass().getClassLoader().getResource(SAMPLE_FILE).getFile());
+ final int newVersion = this.loader.getLatestProjectVersion(project) + 1;
+ this.loader.uploadProjectFile(project.getId(), newVersion, testFile, "uploadUser1");
+ final ProjectFileHandler pfh = this.loader.fetchProjectMetaData(project.getId(), newVersion);
+ Assert.assertEquals(pfh.getVersion(), newVersion);
+ }
+
+ @Test
+ public void testChangeProjectVersion() throws Exception {
+ createThreeProjects();
+ final Project project = this.loader.fetchProjectByName("mytestProject");
+ final int newVersion = this.loader.getLatestProjectVersion(project) + 7;
+ this.loader.changeProjectVersion(project, newVersion, "uploadUser1");
+ final Project sameProject= this.loader.fetchProjectById(project.getId());
+ Assert.assertEquals(sameProject.getVersion(), newVersion);
+ }
+
+ @Test
+ public void testUpdatePermission() throws Exception {
+ createThreeProjects();
+ final Project project = this.loader.fetchProjectByName("mytestProject");
+ this.loader.updatePermission(project, project.getLastModifiedUser(), new Permission(Permission.Type.ADMIN), false);
+
+ final List<Triple<String, Boolean, Permission>> permissionsTriple = this.loader.getProjectPermissions(project);
+ Assert.assertEquals(permissionsTriple.size(), 1);
+ Assert.assertEquals(permissionsTriple.get(0).getFirst(), "testUser1");
+ Assert.assertEquals(permissionsTriple.get(0).getThird().toString(), "ADMIN");
+ }
+
+ @Test
+ public void testUpdateProjectSettings() throws Exception {
+ createThreeProjects();
+ final Project project = this.loader.fetchProjectByName("mytestProject");
+ Assert.assertEquals(project.getProxyUsers().size(), 0);
+ project.addProxyUser("ProxyUser");
+ this.loader.updateProjectSettings(project);
+ final Project sameProject = this.loader.fetchProjectByName("mytestProject");
+ Assert.assertEquals(sameProject.getProxyUsers().size(), 1);
+ }
+
+ @Test
+ public void testRemovePermission() throws Exception {
+ createThreeProjects();
+ final Project project = this.loader.fetchProjectByName("mytestProject");
+ this.loader.updatePermission(project, project.getLastModifiedUser(), new Permission(Permission.Type.ADMIN), false);
+ this.loader.removePermission(project, project.getLastModifiedUser(), false);
+ final List<Triple<String, Boolean, Permission>> permissionsTriple = this.loader.getProjectPermissions(project);
+ Assert.assertEquals(permissionsTriple.size(), 0);
+ }
+
+ @Test
+ public void testRemoveProject() throws Exception {
+ createThreeProjects();
+ final Project project = this.loader.fetchProjectByName("mytestProject");
+ Assert.assertEquals(project.isActive(), true);
+ this.loader.removeProject(project, "testUser1");
+ final Project removedProject = this.loader.fetchProjectByName("mytestProject");
+ Assert.assertEquals(removedProject.isActive(), false);
+ }
+
+ @Test
+ public void testPostAndGetEvent() throws Exception {
+ createThreeProjects();
+ final Project project = this.loader.fetchProjectByName("mytestProject");
+ this.loader
+ .postEvent(project, ProjectLogEvent.EventType.CREATED, "testUser1", "create a message bla");
+ final List<ProjectLogEvent> events = this.loader.getProjectEvents(project, 5, 0);
+ Assert.assertEquals(events.size(), 1);
+ Assert.assertEquals(events.get(0).getMessage(), "create a message bla");
+ }
+
+ @Test
+ public void testUpdateDescription() throws Exception {
+ createThreeProjects();
+ final Project project = this.loader.fetchProjectByName("mytestProject");
+ this.loader.updateDescription(project, "new Description bla", "testUser1");
+ final Project sameProject = this.loader.fetchProjectByName("mytestProject");
+ Assert.assertEquals(sameProject.getDescription(), "new Description bla");
+ }
+
+ @Test
+ public void testUploadAndFetchFlow() throws Exception {
+ final Flow flow1 = new Flow("flow1");
+ final Flow flow2 = new Flow("flow2");
+ final List<Flow> flowList = Arrays.asList(flow1, flow2);
+
+ createThreeProjects();
+ final Project project = this.loader.fetchProjectByName("mytestProject");
+ this.loader.uploadFlows(project, project.getVersion(), flowList);
+
+ final List<Flow> flowList2 = this.loader.fetchAllProjectFlows(project);
+ Assert.assertEquals(flowList2.size(), 2);
+ }
+
+ @Test
+ public void testUpdateFlow() throws Exception {
+ final Flow flow1 = new Flow("flow1");
+ final List<Flow> flowList = Collections.singletonList(flow1);
+
+ flow1.setLayedOut(false);
+ createThreeProjects();
+ final Project project = this.loader.fetchProjectByName("mytestProject");
+ this.loader.uploadFlows(project, project.getVersion(), flowList);
+
+ flow1.setLayedOut(true);
+ this.loader.updateFlow(project, project.getVersion(), flow1);
+ final List<Flow> flowList2 = this.loader.fetchAllProjectFlows(project);
+ Assert.assertEquals(flowList2.get(0).isLayedOut(), true);
+ }
+
+ @Test
+ public void testUploadOrUpdateProjectProperty() throws Exception {
+ final Props props = new Props();
+ props.setSource("source1");
+ props.put("key1", "value1");
+ props.put("key2", "value2");
+
+ createThreeProjects();
+ final Project project = this.loader.fetchProjectByName("mytestProject");
+ this.loader.uploadProjectProperty(project, props);
+
+ final Props sameProps = this.loader.fetchProjectProperty(project, props.getSource());
+ Assert.assertEquals(sameProps.get("key1"), "value1");
+ Assert.assertEquals(sameProps.get("key2"), "value2");
+
+ props.put("key2", "value9");
+ this.loader.updateProjectProperty(project, props);
+
+ final Props sameProps2 = this.loader.fetchProjectProperty(project, props.getSource());
+ Assert.assertEquals(sameProps2.get("key2"), "value9");
+ }
+
+ @Test
+ public void testFetchProjectProperties() throws Exception {
+ final Props props1 = new Props();
+ props1.setSource("source1");
+ props1.put("key1", "value1");
+ props1.put("key2", "value2");
+
+ final Props props2 = new Props();
+ props2.setSource("source2");
+ props2.put("keykey", "valuevalue1");
+ props2.put("keyaaa", "valueaaa");
+ final List<Props> list = Arrays.asList(props1, props2);
+
+ createThreeProjects();
+ final Project project = this.loader.fetchProjectByName("mytestProject");
+ this.loader.uploadProjectProperties(project, list);
+
+ final Map<String, Props> propsMap = this.loader.fetchProjectProperties(project.getId(), project.getVersion());
+ Assert.assertEquals(propsMap.get("source1").get("key2"), "value2");
+ Assert.assertEquals(propsMap.get("source2").get("keyaaa"), "valueaaa");
+ }
+
+ @Test
+ public void cleanOlderProjectVersion() throws Exception {
+ createThreeProjects();
+ final Project project = this.loader.fetchProjectByName("mytestProject");
+ final File testFile = new File(getClass().getClassLoader().getResource(SAMPLE_FILE).getFile());
+ final int newVersion = this.loader.getLatestProjectVersion(project) + 1;
+ this.loader.uploadProjectFile(project.getId(), newVersion, testFile, "uploadUser1");
+
+ final ProjectFileHandler fileHandler = this.loader.getUploadedFile(project.getId(), newVersion);
+ Assert.assertEquals(fileHandler.getNumChunks(), 1);
+
+ this.loader.cleanOlderProjectVersion(project.getId(), newVersion+1);
+
+ final ProjectFileHandler fileHandler2 = this.loader
+ .fetchProjectMetaData(project.getId(), newVersion);
+ Assert.assertEquals(fileHandler2.getNumChunks(), 0);
+ }
+
+ @After
+ public void clearDB() {
+ try {
+ dbOperator.update("DELETE FROM projects");
+ dbOperator.update("DELETE FROM project_versions");
+ dbOperator.update("DELETE FROM project_properties");
+ dbOperator.update("DELETE FROM project_permissions");
+ dbOperator.update("DELETE FROM project_flows");
+ dbOperator.update("DELETE FROM project_files");
+ dbOperator.update("DELETE FROM project_events");
+ } catch (final SQLException e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java b/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
index c05b596..f41a505 100644
--- a/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
+++ b/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
@@ -231,7 +231,7 @@ public class MockProjectLoader implements ProjectLoader {
@Override
public List<Triple<String, Boolean, Permission>> getProjectPermissions(
- final int projectId) throws ProjectManagerException {
+ final Project project) throws ProjectManagerException {
// TODO Auto-generated method stub
return null;
}
diff --git a/azkaban-common/src/test/java/azkaban/ServiceProviderTest.java b/azkaban-common/src/test/java/azkaban/ServiceProviderTest.java
index a44d31b..ac3d6bd 100644
--- a/azkaban-common/src/test/java/azkaban/ServiceProviderTest.java
+++ b/azkaban-common/src/test/java/azkaban/ServiceProviderTest.java
@@ -21,7 +21,7 @@ import static azkaban.ServiceProvider.SERVICE_PROVIDER;
import static org.junit.Assert.assertNotNull;
import azkaban.db.DatabaseOperator;
-import azkaban.project.JdbcProjectLoader;
+import azkaban.project.JdbcProjectImpl;
import azkaban.spi.Storage;
import azkaban.storage.DatabaseStorage;
import azkaban.storage.LocalStorage;
@@ -58,7 +58,7 @@ public class ServiceProviderTest {
SERVICE_PROVIDER.unsetInjector();
SERVICE_PROVIDER.setInjector(injector);
- assertNotNull(SERVICE_PROVIDER.getInstance(JdbcProjectLoader.class));
+ assertNotNull(SERVICE_PROVIDER.getInstance(JdbcProjectImpl.class));
assertNotNull(SERVICE_PROVIDER.getInstance(StorageManager.class));
assertNotNull(SERVICE_PROVIDER.getInstance(DatabaseStorage.class));
assertNotNull(SERVICE_PROVIDER.getInstance(LocalStorage.class));
diff --git a/azkaban-db/src/main/java/azkaban/db/DatabaseOperator.java b/azkaban-db/src/main/java/azkaban/db/DatabaseOperator.java
index 5af5532..c001ac6 100644
--- a/azkaban-db/src/main/java/azkaban/db/DatabaseOperator.java
+++ b/azkaban-db/src/main/java/azkaban/db/DatabaseOperator.java
@@ -59,4 +59,9 @@ public interface DatabaseOperator {
* @return The number of rows updated.
*/
int update(String updateClause, Object... params) throws SQLException;
+
+ /**
+ * @return datasource wrapped in the database operator.
+ */
+ AzkabanDataSource getDataSource();
}
diff --git a/azkaban-db/src/main/java/azkaban/db/DatabaseOperatorImpl.java b/azkaban-db/src/main/java/azkaban/db/DatabaseOperatorImpl.java
index 68abc9f..556a534 100644
--- a/azkaban-db/src/main/java/azkaban/db/DatabaseOperatorImpl.java
+++ b/azkaban-db/src/main/java/azkaban/db/DatabaseOperatorImpl.java
@@ -15,7 +15,7 @@
*/
package azkaban.db;
-import static java.util.Objects.*;
+import static java.util.Objects.requireNonNull;
import com.google.inject.Inject;
import java.sql.Connection;
@@ -101,4 +101,9 @@ public class DatabaseOperatorImpl implements DatabaseOperator {
throw ex;
}
}
+
+ @Override
+ public AzkabanDataSource getDataSource() {
+ return (AzkabanDataSource) this.queryRunner.getDataSource();
+ }
}
diff --git a/azkaban-db/src/test/java/azkaban/db/AzDBTestUtility.java b/azkaban-db/src/test/java/azkaban/db/AzDBTestUtility.java
index d070730..c6bc817 100644
--- a/azkaban-db/src/test/java/azkaban/db/AzDBTestUtility.java
+++ b/azkaban-db/src/test/java/azkaban/db/AzDBTestUtility.java
@@ -16,11 +16,11 @@
*/
package azkaban.db;
-class AzDBTestUtility {
+public class AzDBTestUtility {
public static class EmbeddedH2BasicDataSource extends AzkabanDataSource {
- EmbeddedH2BasicDataSource() {
+ public EmbeddedH2BasicDataSource() {
super();
final String url = "jdbc:h2:mem:test";
setDriverClassName("org.h2.Driver");