/*
* Copyright 2012 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package azkaban.project;
import com.google.common.io.Files;
import com.google.inject.Inject;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
import azkaban.database.AbstractJdbcLoader;
import azkaban.flow.Flow;
import azkaban.project.ProjectLogEvent.EventType;
import azkaban.user.Permission;
import azkaban.user.User;
import azkaban.utils.GZIPUtils;
import azkaban.utils.JSONUtils;
import azkaban.utils.Md5Hasher;
import azkaban.utils.Pair;
import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
import azkaban.utils.Triple;
public class JdbcProjectLoader extends AbstractJdbcLoader implements
ProjectLoader {
private static final Logger logger = Logger
.getLogger(JdbcProjectLoader.class);
private static final int CHUCK_SIZE = 1024 * 1024 * 10;
private File tempDir;
private EncodingType defaultEncodingType = EncodingType.GZIP;
@Inject
public JdbcProjectLoader(Props props) {
super(props);
tempDir = new File(props.getString("project.temp.dir", "temp"));
if (!tempDir.exists()) {
tempDir.mkdirs();
}
}
@Override
public List<Project> fetchAllActiveProjects() throws ProjectManagerException {
Connection connection = getConnection();
List<Project> projects = null;
try {
projects = fetchAllActiveProjects(connection);
} finally {
DbUtils.closeQuietly(connection);
}
return projects;
}
private List<Project> fetchAllActiveProjects(Connection connection)
throws ProjectManagerException {
QueryRunner runner = new QueryRunner();
ProjectResultHandler handler = new ProjectResultHandler();
List<Project> projects = null;
try {
projects =
runner.query(connection,
ProjectResultHandler.SELECT_ALL_ACTIVE_PROJECTS, handler);
for (Project project : projects) {
List<Triple<String, Boolean, Permission>> permissions =
fetchPermissionsForProject(connection, project);
for (Triple<String, Boolean, Permission> entry : permissions) {
if (entry.getSecond()) {
project.setGroupPermission(entry.getFirst(), entry.getThird());
} else {
project.setUserPermission(entry.getFirst(), entry.getThird());
}
}
}
} catch (SQLException e) {
throw new ProjectManagerException("Error retrieving all projects", e);
} finally {
DbUtils.closeQuietly(connection);
}
return projects;
}
@Override
public Project fetchProjectById(int id) throws ProjectManagerException {
Connection connection = getConnection();
Project project = null;
try {
project = fetchProjectById(connection, id);
} finally {
DbUtils.closeQuietly(connection);
}
return project;
}
private Project fetchProjectById(Connection connection, int id)
throws ProjectManagerException {
QueryRunner runner = new QueryRunner();
// Fetch the project
Project project = null;
ProjectResultHandler handler = new ProjectResultHandler();
try {
List<Project> projects =
runner.query(connection, ProjectResultHandler.SELECT_PROJECT_BY_ID,
handler, id);
if (projects.isEmpty()) {
throw new ProjectManagerException("No project with id " + id
+ " exists in db.");
}
project = projects.get(0);
} catch (SQLException e) {
logger.error(ProjectResultHandler.SELECT_PROJECT_BY_ID + " failed.");
throw new ProjectManagerException(
"Query for existing project failed. Project " + id, e);
}
// Fetch the user permissions
List<Triple<String, Boolean, Permission>> permissions =
fetchPermissionsForProject(connection, project);
for (Triple<String, Boolean, Permission> perm : permissions) {
if (perm.getThird().toFlags() != 0) {
if (perm.getSecond()) {
project.setGroupPermission(perm.getFirst(), perm.getThird());
} else {
project.setUserPermission(perm.getFirst(), perm.getThird());
}
}
}
return project;
}
@Override
public Project fetchProjectByName(String name)
throws ProjectManagerException {
Connection connection = getConnection();
Project project = null;
try {
project = fetchProjectByName(connection, name);
} finally {
DbUtils.closeQuietly(connection);
}
return project;
}
private Project fetchProjectByName(Connection connection, String name)
throws ProjectManagerException {
QueryRunner runner = new QueryRunner();
// Fetch the project
Project project;
ProjectResultHandler handler = new ProjectResultHandler();
// select active project from db first, if not exist, select inactive one.
// At most one active project with the same name exists in db.
try {
List<Project> projects =
runner.query(connection,
ProjectResultHandler.SELECT_ACTIVE_PROJECT_BY_NAME, handler, name);
if (projects.isEmpty()) {
projects =
runner.query(connection,
ProjectResultHandler.SELECT_PROJECT_BY_NAME, handler, name);
if (projects.isEmpty()) {
throw new ProjectManagerException(
"No project with name " + name + " exists in db.");
}
}
project = projects.get(0);
} catch (SQLException e) {
logger.error(ProjectResultHandler.SELECT_PROJECT_BY_NAME
+ " failed.");
throw new ProjectManagerException(
"Query for existing project failed. Project " + name, e);
}
// Fetch the user permissions
List<Triple<String, Boolean, Permission>> permissions =
fetchPermissionsForProject(connection, project);
for (Triple<String, Boolean, Permission> perm : permissions) {
if (perm.getThird().toFlags() != 0) {
if (perm.getSecond()) {
project
.setGroupPermission(perm.getFirst(), perm.getThird());
} else {
project.setUserPermission(perm.getFirst(), perm.getThird());
}
}
}
return project;
}
private List<Triple<String, Boolean, Permission>> fetchPermissionsForProject(
Connection connection, Project project) throws ProjectManagerException {
ProjectPermissionsResultHandler permHander =
new ProjectPermissionsResultHandler();
QueryRunner runner = new QueryRunner();
List<Triple<String, Boolean, Permission>> permissions = null;
try {
permissions =
runner.query(connection,
ProjectPermissionsResultHandler.SELECT_PROJECT_PERMISSION,
permHander, project.getId());
} catch (SQLException e) {
throw new ProjectManagerException("Query for permissions for "
+ project.getName() + " failed.", e);
}
return permissions;
}
/**
* Creates a Project in the db.
*
* It will throw an exception if it finds an active project of the same name,
* or the SQL fails
*/
@Override
public Project createNewProject(String name, String description, User creator)
throws ProjectManagerException {
Connection connection = getConnection();
Project project;
try {
// No need to commit, since createNewProject should commit.
project = createNewProject(connection, name, description, creator);
} finally {
DbUtils.closeQuietly(connection);
}
return project;
}
private synchronized Project createNewProject(Connection connection,
String name, String description, User creator)
throws ProjectManagerException {
QueryRunner runner = new QueryRunner();
ProjectResultHandler handler = new ProjectResultHandler();
// See if it exists first.
try {
List<Project> project =
runner
.query(connection,
ProjectResultHandler.SELECT_ACTIVE_PROJECT_BY_NAME, handler,
name);
if (!project.isEmpty()) {
throw new ProjectManagerException("Active project with name " + name
+ " already exists in db.");
}
} catch (SQLException e) {
logger.error(e);
throw new ProjectManagerException(
"Checking for existing project failed. " + name, e);
}
final String INSERT_PROJECT =
"INSERT INTO projects ( name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob) values (?,?,?,?,?,?,?,?,?)";
// Insert project
try {
long time = System.currentTimeMillis();
int i =
runner.update(connection, INSERT_PROJECT, name, true, time, time,
null, creator.getUserId(), description,
defaultEncodingType.getNumVal(), null);
if (i == 0) {
throw new ProjectManagerException("No projects have been inserted.");
}
connection.commit();
} catch (SQLException e) {
logger.error(INSERT_PROJECT + " failed.");
try {
connection.rollback();
} catch (SQLException e1) {
e1.printStackTrace();
}
throw new ProjectManagerException(
"Insert project for existing project failed. " + name, e);
}
// Do another query to grab and return the project.
Project project = null;
try {
List<Project> projects =
runner
.query(connection,
ProjectResultHandler.SELECT_ACTIVE_PROJECT_BY_NAME, handler,
name);
if (projects.isEmpty()) {
throw new ProjectManagerException("No active project with name " + name
+ " exists in db.");
} else if (projects.size() > 1) {
throw new ProjectManagerException("More than one active project "
+ name);
}
project = projects.get(0);
} catch (SQLException e) {
logger.error(e);
throw new ProjectManagerException(
"Checking for existing project failed. " + name, e);
}
return project;
}
@Override
public void uploadProjectFile(int projectId, int version, File localFile, String uploader)
throws ProjectManagerException {
long startMs = System.currentTimeMillis();
logger.info(String.format("Uploading Project ID: %d file: %s [%d bytes]",
projectId, localFile.getName(), localFile.length()));
Connection connection = getConnection();
try {
/* Update DB with new project info */
addProjectToProjectVersions(connection, projectId, version, localFile, uploader, null);
uploadProjectFile(connection, projectId, version, localFile);
connection.commit();
long duration = (System.currentTimeMillis() - startMs) / 1000;
logger.info(String.format("Uploaded Project ID: %d file: %s [%d bytes] in %d sec",
projectId, localFile.getName(), localFile.length(), duration));
} catch (SQLException e) {
logger.error(e);
throw new ProjectManagerException("Error getting DB connection.", e);
} finally {
DbUtils.closeQuietly(connection);
}
}
private void uploadProjectFile(Connection connection, int projectId, int version, File localFile)
throws ProjectManagerException {
/* Step 1: Upload File in chunks to DB */
int chunks = uploadFileInChunks(connection, projectId, version, localFile);
/* Step 2: Update number of chunks in DB */
updateChunksInProjectVersions(connection, projectId, version, chunks);
}
@Override
public void addProjectVersion(
int projectId,
int version,
File localFile,
String uploader,
String uri) throws ProjectManagerException {
try (Connection connection = getConnection()) {
addProjectToProjectVersions(connection, projectId, version, localFile, uploader, uri);
connection.commit();
} catch (SQLException e) {
logger.error(e);
throw new ProjectManagerException(String.format("Add ProjectVersion failed. project id: %d version: %d",
projectId, version), e);
}
}
/**
* Insert a new version record to TABLE project_versions before uploading files.
*
* The reason for this operation:
* When error chunking happens in remote mysql server, incomplete file data remains
* in DB, and an SQL exception is thrown. If we don't have this operation before uploading file,
* the SQL exception prevents AZ from creating the new version record in Table project_versions.
* However, the Table project_files still reserve the incomplete files, which causes troubles
* when uploading a new file: Since the version in TABLE project_versions is still old, mysql will stop
* inserting new files to db.
*
* Why this operation is safe:
* When AZ uploads a new zip file, it always fetches the latest version proj_v from TABLE project_version,
* proj_v+1 will be used as the new version for the uploading files.
*
* Assume error chunking happens on day 1. proj_v is created for this bad file (old file version + 1).
* When we upload a new project zip in day2, new file in day 2 will use the new version (proj_v + 1).
* When file uploading completes, AZ will clean all old chunks in DB afterward.
*/
private void addProjectToProjectVersions(Connection connection,
int projectId,
int version,
File localFile,
String uploader,
String uri) throws ProjectManagerException {
final long updateTime = System.currentTimeMillis();
QueryRunner runner = new QueryRunner();
logger.info("Creating message digest for upload " + localFile.getName());
byte[] md5;
try {
md5 = Md5Hasher.md5Hash(localFile);
} catch (IOException e) {
throw new ProjectManagerException("Error getting md5 hash.", e);
}
logger.info("Md5 hash created");
final String INSERT_PROJECT_VERSION = "INSERT INTO project_versions "
+ "(project_id, version, upload_time, uploader, file_type, file_name, md5, num_chunks, uri) values "
+ "(?,?,?,?,?,?,?,?,?)";
try {
/*
* As we don't know the num_chunks before uploading the file, we initialize it to 0,
* and will update it after uploading completes.
*/
runner.update(connection,
INSERT_PROJECT_VERSION,
projectId,
version,
updateTime,
uploader,
Files.getFileExtension(localFile.getName()),
localFile.getName(),
md5,
0,
uri);
} catch (SQLException e) {
String msg = String.format("Error initializing project id: %d version: %d ", projectId, version);
logger.error(msg, e);
throw new ProjectManagerException(msg, e);
}
}
private int uploadFileInChunks(Connection connection, int projectId, int version, File localFile)
throws ProjectManagerException {
QueryRunner runner = new QueryRunner();
// Really... I doubt we'll get a > 2gig file. So int casting it is!
byte[] buffer = new byte[CHUCK_SIZE];
final String INSERT_PROJECT_FILES =
"INSERT INTO project_files (project_id, version, chunk, size, file) values (?,?,?,?,?)";
BufferedInputStream bufferedStream = null;
int chunk = 0;
try {
bufferedStream = new BufferedInputStream(new FileInputStream(localFile));
int size = bufferedStream.read(buffer);
while (size >= 0) {
logger.info("Read bytes for " + localFile.getName() + " size:" + size);
byte[] buf = buffer;
if (size < buffer.length) {
buf = Arrays.copyOfRange(buffer, 0, size);
}
try {
logger.info("Running update for " + localFile.getName() + " chunk " + chunk);
runner.update(connection, INSERT_PROJECT_FILES, projectId, version, chunk, size, buf);
/**
* We enforce az committing to db when uploading every single chunk,
* in order to reduce the transaction duration and conserve sql server resources.
*
* If the files to be uploaded is very large and we don't commit every single chunk,
* the remote mysql server will run into memory troubles.
*/
connection.commit();
logger.info("Finished update for " + localFile.getName() + " chunk " + chunk);
} catch (SQLException e) {
throw new ProjectManagerException("Error Chunking during uploading files to db...");
}
++chunk;
size = bufferedStream.read(buffer);
}
} catch (IOException e) {
throw new ProjectManagerException(String.format(
"Error chunking file. projectId: %d, version: %d, file:%s[%d bytes], chunk: %d",
projectId, version, localFile.getName(), localFile.length(), chunk));
} finally {
IOUtils.closeQuietly(bufferedStream);
}
return chunk;
}
/**
* we update num_chunks's actual number to db here.
*/
private void updateChunksInProjectVersions(Connection connection, int projectId, int version, int chunk)
throws ProjectManagerException {
final String UPDATE_PROJECT_NUM_CHUNKS =
"UPDATE project_versions SET num_chunks=? WHERE project_id=? AND version=?";
QueryRunner runner = new QueryRunner();
try {
runner.update(connection, UPDATE_PROJECT_NUM_CHUNKS, chunk, projectId, version);
connection.commit();
} catch (SQLException e) {
throw new ProjectManagerException("Error updating project " + projectId + " : chunk_num " + chunk, e);
}
}
@Override
public ProjectFileHandler getUploadedFile(int projectId, int version)
throws ProjectManagerException {
logger.info("Retrieving to " + projectId + " version:" + version);
Connection connection = getConnection();
ProjectFileHandler handler = null;
try {
handler = getUploadedFile(connection, projectId, version);
} finally {
DbUtils.closeQuietly(connection);
}
return handler;
}
@Override
public ProjectFileHandler fetchProjectMetaData(int projectId, int version) {
ProjectVersionResultHandler pfHandler = new ProjectVersionResultHandler();
try (Connection connection = getConnection()) {
List<ProjectFileHandler> projectFiles = new QueryRunner().query(connection,
ProjectVersionResultHandler.SELECT_PROJECT_VERSION, pfHandler, projectId, version);
if (projectFiles == null || projectFiles.isEmpty()) {
return null;
}
return projectFiles.get(0);
} catch (SQLException e) {
logger.error(e);
throw new ProjectManagerException("Query for uploaded file for project id " + projectId + " failed.", e);
}
}
private ProjectFileHandler getUploadedFile(Connection connection,
int projectId, int version) throws ProjectManagerException {
ProjectFileHandler projHandler = fetchProjectMetaData(projectId, version);
if (projHandler == null) {
return null;
}
int numChunks = projHandler.getNumChunks();
BufferedOutputStream bStream = null;
File file;
try {
try {
file = File.createTempFile(projHandler.getFileName(), String.valueOf(version), tempDir);
bStream = new BufferedOutputStream(new FileOutputStream(file));
} catch (IOException e) {
throw new ProjectManagerException(
"Error creating temp file for stream.");
}
QueryRunner runner = new QueryRunner();
int collect = 5;
int fromChunk = 0;
int toChunk = collect;
do {
ProjectFileChunkResultHandler chunkHandler =
new ProjectFileChunkResultHandler();
List<byte[]> data = null;
try {
data =
runner.query(connection,
ProjectFileChunkResultHandler.SELECT_PROJECT_CHUNKS_FILE,
chunkHandler, projectId, version, fromChunk, toChunk);
} catch (SQLException e) {
logger.error(e);
throw new ProjectManagerException("Query for uploaded file for "
+ projectId + " failed.", e);
}
try {
for (byte[] d : data) {
bStream.write(d);
}
} catch (IOException e) {
throw new ProjectManagerException("Error writing file", e);
}
// Add all the bytes to the stream.
fromChunk += collect;
toChunk += collect;
} while (fromChunk <= numChunks);
} finally {
IOUtils.closeQuietly(bStream);
}
// Check md5.
byte[] md5 = null;
try {
md5 = Md5Hasher.md5Hash(file);
} catch (IOException e) {
throw new ProjectManagerException("Error getting md5 hash.", e);
}
if (Arrays.equals(projHandler.getMd5Hash(), md5)) {
logger.info("Md5 Hash is valid");
} else {
throw new ProjectManagerException("Md5 Hash failed on retrieval of file");
}
projHandler.setLocalFile(file);
return projHandler;
}
@Override
public void changeProjectVersion(Project project, int version, String user)
throws ProjectManagerException {
long timestamp = System.currentTimeMillis();
QueryRunner runner = createQueryRunner();
try {
final String UPDATE_PROJECT_VERSION =
"UPDATE projects SET version=?,modified_time=?,last_modified_by=? WHERE id=?";
runner.update(UPDATE_PROJECT_VERSION, version, timestamp, user,
project.getId());
project.setVersion(version);
project.setLastModifiedTimestamp(timestamp);
project.setLastModifiedUser(user);
} catch (SQLException e) {
logger.error(e);
throw new ProjectManagerException(
"Error updating switching project version " + project.getName(), e);
}
}
@Override
public void updatePermission(Project project, String name, Permission perm,
boolean isGroup) throws ProjectManagerException {
QueryRunner runner = createQueryRunner();
if (this.allowsOnDuplicateKey()) {
long updateTime = System.currentTimeMillis();
final String INSERT_PROJECT_PERMISSION =
"INSERT INTO project_permissions (project_id, modified_time, name, permissions, isGroup) values (?,?,?,?,?)"
+ "ON DUPLICATE KEY UPDATE modified_time = VALUES(modified_time), permissions = VALUES(permissions)";
try {
runner.update(INSERT_PROJECT_PERMISSION, project.getId(), updateTime,
name, perm.toFlags(), isGroup);
} catch (SQLException e) {
logger.error(e);
throw new ProjectManagerException("Error updating project "
+ project.getName() + " permissions for " + name, e);
}
} else {
long updateTime = System.currentTimeMillis();
final String MERGE_PROJECT_PERMISSION =
"MERGE INTO project_permissions (project_id, modified_time, name, permissions, isGroup) KEY (project_id, name) values (?,?,?,?,?)";
try {
runner.update(MERGE_PROJECT_PERMISSION, project.getId(), updateTime,
name, perm.toFlags(), isGroup);
} catch (SQLException e) {
logger.error(e);
throw new ProjectManagerException("Error updating project "
+ project.getName() + " permissions for " + name, e);
}
}
if (isGroup) {
project.setGroupPermission(name, perm);
} else {
project.setUserPermission(name, perm);
}
}
@Override
public void updateProjectSettings(Project project)
throws ProjectManagerException {
Connection connection = getConnection();
try {
updateProjectSettings(connection, project, defaultEncodingType);
connection.commit();
} catch (SQLException e) {
throw new ProjectManagerException("Error updating project settings", e);
} finally {
DbUtils.closeQuietly(connection);
}
}
private void updateProjectSettings(Connection connection, Project project,
EncodingType encType) throws ProjectManagerException {
QueryRunner runner = new QueryRunner();
final String UPDATE_PROJECT_SETTINGS =
"UPDATE projects SET enc_type=?, settings_blob=? WHERE id=?";
String json = JSONUtils.toJSON(project.toObject());
byte[] data = null;
try {
byte[] stringData = json.getBytes("UTF-8");
data = stringData;
if (encType == EncodingType.GZIP) {
data = GZIPUtils.gzipBytes(stringData);
}
logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length
+ " Gzip:" + data.length);
} catch (IOException e) {
throw new ProjectManagerException("Failed to encode. ", e);
}
try {
runner.update(connection, UPDATE_PROJECT_SETTINGS, encType.getNumVal(),
data, project.getId());
connection.commit();
} catch (SQLException e) {
throw new ProjectManagerException("Error updating project "
+ project.getName() + " version " + project.getVersion(), e);
}
}
@Override
public void removePermission(Project project, String name, boolean isGroup)
throws ProjectManagerException {
QueryRunner runner = createQueryRunner();
final String DELETE_PROJECT_PERMISSION =
"DELETE FROM project_permissions WHERE project_id=? AND name=? AND isGroup=?";
try {
runner.update(DELETE_PROJECT_PERMISSION, project.getId(), name, isGroup);
} catch (SQLException e) {
logger.error(e);
throw new ProjectManagerException("Error deleting project "
+ project.getName() + " permissions for " + name, e);
}
if (isGroup) {
project.removeGroupPermission(name);
} else {
project.removeUserPermission(name);
}
}
@Override
public List<Triple<String, Boolean, Permission>> getProjectPermissions(
int projectId) throws ProjectManagerException {
ProjectPermissionsResultHandler permHander =
new ProjectPermissionsResultHandler();
QueryRunner runner = createQueryRunner();
List<Triple<String, Boolean, Permission>> permissions = null;
try {
permissions =
runner.query(
ProjectPermissionsResultHandler.SELECT_PROJECT_PERMISSION,
permHander, projectId);
} catch (SQLException e) {
throw new ProjectManagerException("Query for permissions for "
+ projectId + " failed.", e);
}
return permissions;
}
@Override
public void removeProject(Project project, String user)
throws ProjectManagerException {
QueryRunner runner = createQueryRunner();
long updateTime = System.currentTimeMillis();
final String UPDATE_INACTIVE_PROJECT =
"UPDATE projects SET active=false,modified_time=?,last_modified_by=? WHERE id=?";
try {
runner.update(UPDATE_INACTIVE_PROJECT, updateTime, user, project.getId());
} catch (SQLException e) {
logger.error(e);
throw new ProjectManagerException("Error marking project "
+ project.getName() + " as inactive", e);
}
}
@Override
public boolean postEvent(Project project, EventType type, String user,
String message) {
QueryRunner runner = createQueryRunner();
final String INSERT_PROJECT_EVENTS =
"INSERT INTO project_events (project_id, event_type, event_time, username, message) values (?,?,?,?,?)";
long updateTime = System.currentTimeMillis();
try {
runner.update(INSERT_PROJECT_EVENTS, project.getId(), type.getNumVal(),
updateTime, user, message);
} catch (SQLException e) {
e.printStackTrace();
return false;
}
return true;
}
/**
* Get all the logs for a given project
*
* @param project
* @return
* @throws ProjectManagerException
*/
@Override
public List<ProjectLogEvent> getProjectEvents(Project project, int num,
int skip) throws ProjectManagerException {
QueryRunner runner = createQueryRunner();
ProjectLogsResultHandler logHandler = new ProjectLogsResultHandler();
List<ProjectLogEvent> events = null;
try {
events =
runner.query(ProjectLogsResultHandler.SELECT_PROJECT_EVENTS_ORDER,
logHandler, project.getId(), num, skip);
} catch (SQLException e) {
logger.error(e);
}
return events;
}
@Override
public void updateDescription(Project project, String description, String user)
throws ProjectManagerException {
QueryRunner runner = createQueryRunner();
final String UPDATE_PROJECT_DESCRIPTION =
"UPDATE projects SET description=?,modified_time=?,last_modified_by=? WHERE id=?";
long updateTime = System.currentTimeMillis();
try {
runner.update(UPDATE_PROJECT_DESCRIPTION, description, updateTime, user,
project.getId());
project.setDescription(description);
project.setLastModifiedTimestamp(updateTime);
project.setLastModifiedUser(user);
} catch (SQLException e) {
logger.error(e);
throw new ProjectManagerException("Error marking project "
+ project.getName() + " as inactive", e);
}
}
@Override
public int getLatestProjectVersion(Project project)
throws ProjectManagerException {
QueryRunner runner = createQueryRunner();
IntHander handler = new IntHander();
try {
return runner.query(IntHander.SELECT_LATEST_VERSION, handler,
project.getId());
} catch (SQLException e) {
logger.error(e);
throw new ProjectManagerException("Error marking project "
+ project.getName() + " as inactive", e);
}
}
@Override
public void uploadFlows(Project project, int version, Collection<Flow> flows)
throws ProjectManagerException {
// We do one at a time instead of batch... because well, the batch could be
// large.
logger.info("Uploading flows");
Connection connection = getConnection();
try {
for (Flow flow : flows) {
uploadFlow(connection, project, version, flow, defaultEncodingType);
}
connection.commit();
} catch (IOException e) {
throw new ProjectManagerException("Flow Upload failed.", e);
} catch (SQLException e) {
throw new ProjectManagerException("Flow Upload failed.", e);
} finally {
DbUtils.closeQuietly(connection);
}
}
@Override
public void uploadFlow(Project project, int version, Flow flow)
throws ProjectManagerException {
logger.info("Uploading flows");
Connection connection = getConnection();
try {
uploadFlow(connection, project, version, flow, defaultEncodingType);
connection.commit();
} catch (IOException e) {
throw new ProjectManagerException("Flow Upload failed.", e);
} catch (SQLException e) {
throw new ProjectManagerException("Flow Upload failed commit.", e);
} finally {
DbUtils.closeQuietly(connection);
}
}
@Override
public void updateFlow(Project project, int version, Flow flow)
throws ProjectManagerException {
logger.info("Uploading flows");
Connection connection = getConnection();
try {
QueryRunner runner = new QueryRunner();
String json = JSONUtils.toJSON(flow.toObject());
byte[] stringData = json.getBytes("UTF-8");
byte[] data = stringData;
if (defaultEncodingType == EncodingType.GZIP) {
data = GZIPUtils.gzipBytes(stringData);
}
logger.info("Flow upload " + flow.getId() + " is byte size "
+ data.length);
final String UPDATE_FLOW =
"UPDATE project_flows SET encoding_type=?,json=? WHERE project_id=? AND version=? AND flow_id=?";
try {
runner.update(connection, UPDATE_FLOW, defaultEncodingType.getNumVal(),
data, project.getId(), version, flow.getId());
} catch (SQLException e) {
e.printStackTrace();
throw new ProjectManagerException("Error inserting flow "
+ flow.getId(), e);
}
connection.commit();
} catch (IOException e) {
throw new ProjectManagerException("Flow Upload failed.", e);
} catch (SQLException e) {
throw new ProjectManagerException("Flow Upload failed commit.", e);
} finally {
DbUtils.closeQuietly(connection);
}
}
public EncodingType getDefaultEncodingType() {
return defaultEncodingType;
}
public void setDefaultEncodingType(EncodingType defaultEncodingType) {
this.defaultEncodingType = defaultEncodingType;
}
private void uploadFlow(Connection connection, Project project, int version,
Flow flow, EncodingType encType) throws ProjectManagerException,
IOException {
QueryRunner runner = new QueryRunner();
String json = JSONUtils.toJSON(flow.toObject());
byte[] stringData = json.getBytes("UTF-8");
byte[] data = stringData;
if (encType == EncodingType.GZIP) {
data = GZIPUtils.gzipBytes(stringData);
}
logger.info("Flow upload " + flow.getId() + " is byte size " + data.length);
final String INSERT_FLOW =
"INSERT INTO project_flows (project_id, version, flow_id, modified_time, encoding_type, json) values (?,?,?,?,?,?)";
try {
runner.update(connection, INSERT_FLOW, project.getId(), version,
flow.getId(), System.currentTimeMillis(), encType.getNumVal(), data);
} catch (SQLException e) {
throw new ProjectManagerException("Error inserting flow " + flow.getId(),
e);
}
}
@Override
public Flow fetchFlow(Project project, String flowId)
throws ProjectManagerException {
QueryRunner runner = createQueryRunner();
ProjectFlowsResultHandler handler = new ProjectFlowsResultHandler();
try {
List<Flow> flows =
runner.query(ProjectFlowsResultHandler.SELECT_PROJECT_FLOW, handler,
project.getId(), project.getVersion(), flowId);
if (flows.isEmpty()) {
return null;
} else {
return flows.get(0);
}
} catch (SQLException e) {
throw new ProjectManagerException("Error fetching flow " + flowId, e);
}
}
@Override
public List<Flow> fetchAllProjectFlows(Project project)
throws ProjectManagerException {
QueryRunner runner = createQueryRunner();
ProjectFlowsResultHandler handler = new ProjectFlowsResultHandler();
List<Flow> flows = null;
try {
flows =
runner.query(ProjectFlowsResultHandler.SELECT_ALL_PROJECT_FLOWS,
handler, project.getId(), project.getVersion());
} catch (SQLException e) {
throw new ProjectManagerException("Error fetching flows from project "
+ project.getName() + " version " + project.getVersion(), e);
}
return flows;
}
@Override
public void uploadProjectProperties(Project project, List<Props> properties)
throws ProjectManagerException {
Connection connection = getConnection();
try {
for (Props props : properties) {
uploadProjectProperty(connection, project, props.getSource(), props);
}
connection.commit();
} catch (SQLException e) {
throw new ProjectManagerException(
"Error uploading project property files", e);
} catch (IOException e) {
throw new ProjectManagerException(
"Error uploading project property files", e);
} finally {
DbUtils.closeQuietly(connection);
}
}
@Override
public void uploadProjectProperty(Project project, Props props)
throws ProjectManagerException {
Connection connection = getConnection();
try {
uploadProjectProperty(connection, project, props.getSource(), props);
connection.commit();
} catch (SQLException e) {
throw new ProjectManagerException(
"Error uploading project property files", e);
} catch (IOException e) {
throw new ProjectManagerException(
"Error uploading project property file", e);
} finally {
DbUtils.closeQuietly(connection);
}
}
@Override
public void updateProjectProperty(Project project, Props props)
throws ProjectManagerException {
Connection connection = getConnection();
try {
updateProjectProperty(connection, project, props.getSource(), props);
connection.commit();
} catch (SQLException e) {
throw new ProjectManagerException(
"Error uploading project property files", e);
} catch (IOException e) {
throw new ProjectManagerException(
"Error uploading project property file", e);
} finally {
DbUtils.closeQuietly(connection);
}
}
private void updateProjectProperty(Connection connection, Project project,
String name, Props props) throws ProjectManagerException, IOException {
QueryRunner runner = new QueryRunner();
final String UPDATE_PROPERTIES =
"UPDATE project_properties SET property=? WHERE project_id=? AND version=? AND name=?";
String propertyJSON = PropsUtils.toJSONString(props, true);
byte[] data = propertyJSON.getBytes("UTF-8");
if (defaultEncodingType == EncodingType.GZIP) {
data = GZIPUtils.gzipBytes(data);
}
try {
runner.update(connection, UPDATE_PROPERTIES, data, project.getId(),
project.getVersion(), name);
connection.commit();
} catch (SQLException e) {
throw new ProjectManagerException("Error updating property "
+ project.getName() + " version " + project.getVersion(), e);
}
}
private void uploadProjectProperty(Connection connection, Project project,
String name, Props props) throws ProjectManagerException, IOException {
QueryRunner runner = new QueryRunner();
final String INSERT_PROPERTIES =
"INSERT INTO project_properties (project_id, version, name, modified_time, encoding_type, property) values (?,?,?,?,?,?)";
String propertyJSON = PropsUtils.toJSONString(props, true);
byte[] data = propertyJSON.getBytes("UTF-8");
if (defaultEncodingType == EncodingType.GZIP) {
data = GZIPUtils.gzipBytes(data);
}
try {
runner.update(connection, INSERT_PROPERTIES, project.getId(),
project.getVersion(), name, System.currentTimeMillis(),
defaultEncodingType.getNumVal(), data);
connection.commit();
} catch (SQLException e) {
throw new ProjectManagerException("Error uploading project properties "
+ name + " into " + project.getName() + " version "
+ project.getVersion(), e);
}
}
@Override
public Props fetchProjectProperty(int projectId, int projectVer,
String propsName) throws ProjectManagerException {
QueryRunner runner = createQueryRunner();
ProjectPropertiesResultsHandler handler =
new ProjectPropertiesResultsHandler();
try {
List<Pair<String, Props>> properties =
runner.query(ProjectPropertiesResultsHandler.SELECT_PROJECT_PROPERTY,
handler, projectId, projectVer, propsName);
if (properties == null || properties.isEmpty()) {
return null;
}
return properties.get(0).getSecond();
} catch (SQLException e) {
logger.error("Error fetching property " + propsName
+ " Project " + projectId + " version " + projectVer, e);
throw new ProjectManagerException("Error fetching property " + propsName,
e);
}
}
@Override
public Props fetchProjectProperty(Project project, String propsName)
throws ProjectManagerException {
// TODO: 11/23/16 call the other overloaded method fetchProjectProperty internally.
QueryRunner runner = createQueryRunner();
ProjectPropertiesResultsHandler handler =
new ProjectPropertiesResultsHandler();
try {
List<Pair<String, Props>> properties =
runner.query(ProjectPropertiesResultsHandler.SELECT_PROJECT_PROPERTY,
handler, project.getId(), project.getVersion(), propsName);
if (properties == null || properties.isEmpty()) {
logger.warn("Project " + project.getId() + " version " + project.getVersion()
+ " property " + propsName + " is empty.");
return null;
}
return properties.get(0).getSecond();
} catch (SQLException e) {
logger.error("Error fetching property " + propsName
+ "Project " + project.getId() + " version " + project.getVersion(), e);
throw new ProjectManagerException("Error fetching property " + propsName
+ "Project " + project.getId() + " version " + project.getVersion(), e);
}
}
@Override
public void cleanOlderProjectVersion(int projectId, int version)
throws ProjectManagerException {
Connection connection = getConnection();
try {
cleanOlderProjectVersionFlows(connection, projectId, version);
cleanOlderProjectVersionProperties(connection, projectId, version);
cleanOlderProjectFiles(connection, projectId, version);
cleanOlderProjectVersion(connection, projectId, version);
} finally {
DbUtils.closeQuietly(connection);
}
}
private void cleanOlderProjectVersionFlows(Connection connection,
int projectId, int version) throws ProjectManagerException {
final String DELETE_FLOW =
"DELETE FROM project_flows WHERE project_id=? AND version<?";
QueryRunner runner = new QueryRunner();
try {
runner.update(connection, DELETE_FLOW, projectId, version);
connection.commit();
} catch (SQLException e) {
throw new ProjectManagerException("Error deleting project version flows "
+ projectId + ":" + version, e);
}
}
private void cleanOlderProjectVersionProperties(Connection connection,
int projectId, int version) throws ProjectManagerException {
final String DELETE_PROPERTIES =
"DELETE FROM project_properties WHERE project_id=? AND version<?";
QueryRunner runner = new QueryRunner();
try {
runner.update(connection, DELETE_PROPERTIES, projectId, version);
connection.commit();
} catch (SQLException e) {
throw new ProjectManagerException(
"Error deleting project version properties " + projectId + ":"
+ version, e);
}
}
private void cleanOlderProjectFiles(Connection connection, int projectId,
int version) throws ProjectManagerException {
final String DELETE_PROJECT_FILES =
"DELETE FROM project_files WHERE project_id=? AND version<?";
QueryRunner runner = new QueryRunner();
try {
runner.update(connection, DELETE_PROJECT_FILES, projectId, version);
connection.commit();
} catch (SQLException e) {
throw new ProjectManagerException("Error deleting project version files "
+ projectId + ":" + version, e);
}
}
private void cleanOlderProjectVersion(Connection connection, int projectId,
int version) throws ProjectManagerException {
final String UPDATE_PROJECT_VERSIONS =
"UPDATE project_versions SET num_chunks=0 WHERE project_id=? AND version<?";
QueryRunner runner = new QueryRunner();
try {
runner.update(connection, UPDATE_PROJECT_VERSIONS, projectId, version);
connection.commit();
} catch (SQLException e) {
throw new ProjectManagerException(
"Error updating project version chunksize " + projectId + ":"
+ version, e);
}
}
@Override
public Map<String, Props> fetchProjectProperties(int projectId, int version)
throws ProjectManagerException {
QueryRunner runner = createQueryRunner();
ProjectPropertiesResultsHandler handler =
new ProjectPropertiesResultsHandler();
try {
List<Pair<String, Props>> properties =
runner.query(
ProjectPropertiesResultsHandler.SELECT_PROJECT_PROPERTIES,
handler, projectId, version);
if (properties == null || properties.isEmpty()) {
return null;
}
HashMap<String, Props> props = new HashMap<String, Props>();
for (Pair<String, Props> pair : properties) {
props.put(pair.getFirst(), pair.getSecond());
}
return props;
} catch (SQLException e) {
logger.error("Error fetching properties, project id" + projectId + " version " + version, e);
throw new ProjectManagerException("Error fetching properties", e);
}
}
private static class ProjectResultHandler implements
ResultSetHandler<List<Project>> {
private static String SELECT_PROJECT_BY_NAME =
"SELECT id, name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob FROM projects WHERE name=?";
private static String SELECT_PROJECT_BY_ID =
"SELECT id, name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob FROM projects WHERE id=?";
private static String SELECT_ALL_ACTIVE_PROJECTS =
"SELECT id, name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob FROM projects WHERE active=true";
private static String SELECT_ACTIVE_PROJECT_BY_NAME =
"SELECT id, name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob FROM projects WHERE name=? AND active=true";
@Override
public List<Project> handle(ResultSet rs) throws SQLException {
if (!rs.next()) {
return Collections.<Project> emptyList();
}
ArrayList<Project> projects = new ArrayList<Project>();
do {
int id = rs.getInt(1);
String name = rs.getString(2);
boolean active = rs.getBoolean(3);
long modifiedTime = rs.getLong(4);
long createTime = rs.getLong(5);
int version = rs.getInt(6);
String lastModifiedBy = rs.getString(7);
String description = rs.getString(8);
int encodingType = rs.getInt(9);
byte[] data = rs.getBytes(10);
Project project;
if (data != null) {
EncodingType encType = EncodingType.fromInteger(encodingType);
Object blobObj;
try {
// Convoluted way to inflate strings. Should find common package or
// helper function.
if (encType == EncodingType.GZIP) {
// Decompress the sucker.
String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
blobObj = JSONUtils.parseJSONFromString(jsonString);
} else {
String jsonString = new String(data, "UTF-8");
blobObj = JSONUtils.parseJSONFromString(jsonString);
}
project = Project.projectFromObject(blobObj);
} catch (IOException e) {
throw new SQLException("Failed to get project.", e);
}
} else {
project = new Project(id, name);
}
// update the fields as they may have changed
project.setActive(active);
project.setLastModifiedTimestamp(modifiedTime);
project.setCreateTimestamp(createTime);
project.setVersion(version);
project.setLastModifiedUser(lastModifiedBy);
project.setDescription(description);
projects.add(project);
} while (rs.next());
return projects;
}
}
private static class ProjectPermissionsResultHandler implements
ResultSetHandler<List<Triple<String, Boolean, Permission>>> {
private static String SELECT_PROJECT_PERMISSION =
"SELECT project_id, modified_time, name, permissions, isGroup FROM project_permissions WHERE project_id=?";
@Override
public List<Triple<String, Boolean, Permission>> handle(ResultSet rs)
throws SQLException {
if (!rs.next()) {
return Collections.<Triple<String, Boolean, Permission>> emptyList();
}
ArrayList<Triple<String, Boolean, Permission>> permissions =
new ArrayList<Triple<String, Boolean, Permission>>();
do {
String username = rs.getString(3);
int permissionFlag = rs.getInt(4);
boolean val = rs.getBoolean(5);
Permission perm = new Permission(permissionFlag);
permissions.add(new Triple<String, Boolean, Permission>(username, val,
perm));
} while (rs.next());
return permissions;
}
}
private static class ProjectFlowsResultHandler implements
ResultSetHandler<List<Flow>> {
private static String SELECT_PROJECT_FLOW =
"SELECT project_id, version, flow_id, modified_time, encoding_type, json FROM project_flows WHERE project_id=? AND version=? AND flow_id=?";
private static String SELECT_ALL_PROJECT_FLOWS =
"SELECT project_id, version, flow_id, modified_time, encoding_type, json FROM project_flows WHERE project_id=? AND version=?";
@Override
public List<Flow> handle(ResultSet rs) throws SQLException {
if (!rs.next()) {
return Collections.<Flow> emptyList();
}
ArrayList<Flow> flows = new ArrayList<Flow>();
do {
String flowId = rs.getString(3);
int encodingType = rs.getInt(5);
byte[] dataBytes = rs.getBytes(6);
if (dataBytes == null) {
continue;
}
EncodingType encType = EncodingType.fromInteger(encodingType);
Object flowObj = null;
try {
// Convoluted way to inflate strings. Should find common package or
// helper function.
if (encType == EncodingType.GZIP) {
// Decompress the sucker.
String jsonString = GZIPUtils.unGzipString(dataBytes, "UTF-8");
flowObj = JSONUtils.parseJSONFromString(jsonString);
} else {
String jsonString = new String(dataBytes, "UTF-8");
flowObj = JSONUtils.parseJSONFromString(jsonString);
}
Flow flow = Flow.flowFromObject(flowObj);
flows.add(flow);
} catch (IOException e) {
throw new SQLException("Error retrieving flow data " + flowId, e);
}
} while (rs.next());
return flows;
}
}
private static class ProjectPropertiesResultsHandler implements
ResultSetHandler<List<Pair<String, Props>>> {
private static String SELECT_PROJECT_PROPERTY =
"SELECT project_id, version, name, modified_time, encoding_type, property FROM project_properties WHERE project_id=? AND version=? AND name=?";
private static String SELECT_PROJECT_PROPERTIES =
"SELECT project_id, version, name, modified_time, encoding_type, property FROM project_properties WHERE project_id=? AND version=?";
@Override
public List<Pair<String, Props>> handle(ResultSet rs) throws SQLException {
if (!rs.next()) {
return Collections.<Pair<String, Props>> emptyList();
}
List<Pair<String, Props>> properties =
new ArrayList<Pair<String, Props>>();
do {
String name = rs.getString(3);
int eventType = rs.getInt(5);
byte[] dataBytes = rs.getBytes(6);
EncodingType encType = EncodingType.fromInteger(eventType);
String propertyString = null;
try {
if (encType == EncodingType.GZIP) {
// Decompress the sucker.
propertyString = GZIPUtils.unGzipString(dataBytes, "UTF-8");
} else {
propertyString = new String(dataBytes, "UTF-8");
}
Props props = PropsUtils.fromJSONString(propertyString);
props.setSource(name);
properties.add(new Pair<String, Props>(name, props));
} catch (IOException e) {
throw new SQLException(e);
}
} while (rs.next());
return properties;
}
}
private static class ProjectLogsResultHandler implements
ResultSetHandler<List<ProjectLogEvent>> {
private static String SELECT_PROJECT_EVENTS_ORDER =
"SELECT project_id, event_type, event_time, username, message FROM project_events WHERE project_id=? ORDER BY event_time DESC LIMIT ? OFFSET ?";
@Override
public List<ProjectLogEvent> handle(ResultSet rs) throws SQLException {
if (!rs.next()) {
return Collections.<ProjectLogEvent> emptyList();
}
ArrayList<ProjectLogEvent> events = new ArrayList<ProjectLogEvent>();
do {
int projectId = rs.getInt(1);
int eventType = rs.getInt(2);
long eventTime = rs.getLong(3);
String username = rs.getString(4);
String message = rs.getString(5);
ProjectLogEvent event =
new ProjectLogEvent(projectId, EventType.fromInteger(eventType),
eventTime, username, message);
events.add(event);
} while (rs.next());
return events;
}
}
private static class ProjectFileChunkResultHandler implements
ResultSetHandler<List<byte[]>> {
private static String SELECT_PROJECT_CHUNKS_FILE =
"SELECT project_id, version, chunk, size, file FROM project_files WHERE project_id=? AND version=? AND chunk >= ? AND chunk < ? ORDER BY chunk ASC";
@Override
public List<byte[]> handle(ResultSet rs) throws SQLException {
if (!rs.next()) {
return Collections.<byte[]> emptyList();
}
ArrayList<byte[]> data = new ArrayList<byte[]>();
do {
byte[] bytes = rs.getBytes(5);
data.add(bytes);
} while (rs.next());
return data;
}
}
private static class ProjectVersionResultHandler implements ResultSetHandler<List<ProjectFileHandler>> {
private static String SELECT_PROJECT_VERSION =
"SELECT project_id, version, upload_time, uploader, file_type, file_name, md5, num_chunks, uri "
+ "FROM project_versions WHERE project_id=? AND version=?";
@Override
public List<ProjectFileHandler> handle(ResultSet rs) throws SQLException {
if (!rs.next()) {
return null;
}
List<ProjectFileHandler> handlers = new ArrayList<ProjectFileHandler>();
do {
int projectId = rs.getInt(1);
int version = rs.getInt(2);
long uploadTime = rs.getLong(3);
String uploader = rs.getString(4);
String fileType = rs.getString(5);
String fileName = rs.getString(6);
byte[] md5 = rs.getBytes(7);
int numChunks = rs.getInt(8);
String uri = rs.getString(9);
ProjectFileHandler handler =
new ProjectFileHandler(projectId, version, uploadTime, uploader, fileType, fileName, numChunks, md5, uri);
handlers.add(handler);
} while (rs.next());
return handlers;
}
}
private static class IntHander implements ResultSetHandler<Integer> {
private static String SELECT_LATEST_VERSION =
"SELECT MAX(version) FROM project_versions WHERE project_id=?";
@Override
public Integer handle(ResultSet rs) throws SQLException {
if (!rs.next()) {
return 0;
}
return rs.getInt(1);
}
}
private Connection getConnection() throws ProjectManagerException {
Connection connection = null;
try {
connection = super.getDBConnection(false);
} catch (Exception e) {
DbUtils.closeQuietly(connection);
throw new ProjectManagerException("Error getting DB connection.", e);
}
return connection;
}
}