diff --git a/azkaban-common/src/main/java/azkaban/executor/ActiveExecutingFlowsDao.java b/azkaban-common/src/main/java/azkaban/executor/ActiveExecutingFlowsDao.java
index 0fdc1fc..f341f7a 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ActiveExecutingFlowsDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ActiveExecutingFlowsDao.java
@@ -16,52 +16,44 @@
package azkaban.executor;
-import azkaban.database.AbstractJdbcLoader;
import azkaban.db.DatabaseOperator;
-import azkaban.metrics.CommonMetrics;
-import azkaban.utils.Props;
import java.sql.SQLException;
import javax.inject.Inject;
import javax.inject.Singleton;
-import org.apache.commons.dbutils.QueryRunner;
+// TODO jamiesjc: This class is deprecated because the active_executing_flows table is no longer
+// fetched, so the class should be removed in a future cleanup.
+@Deprecated
@Singleton
-public class ActiveExecutingFlowsDao extends AbstractJdbcLoader{
+public class ActiveExecutingFlowsDao {
private final DatabaseOperator dbOperator;
@Inject
- ActiveExecutingFlowsDao(final Props props, final CommonMetrics commonMetrics,
- final DatabaseOperator dbOperator) {
- super(props, commonMetrics);
+ ActiveExecutingFlowsDao(final DatabaseOperator dbOperator) {
this.dbOperator = dbOperator;
}
void addActiveExecutableReference(final ExecutionReference reference)
throws ExecutorManagerException {
- final String INSERT =
- "INSERT INTO active_executing_flows "
- + "(exec_id, update_time) values (?,?)";
- final QueryRunner runner = createQueryRunner();
-
+ final String INSERT = "INSERT INTO active_executing_flows "
+ + "(exec_id, update_time) values (?,?)";
try {
- runner.update(INSERT, reference.getExecId(), reference.getUpdateTime());
+ this.dbOperator.update(INSERT, reference.getExecId(), reference.getUpdateTime());
} catch (final SQLException e) {
throw new ExecutorManagerException(
"Error updating active flow reference " + reference.getExecId(), e);
}
}
- void removeActiveExecutableReference(final int execid)
+ void removeActiveExecutableReference(final int execId)
throws ExecutorManagerException {
final String DELETE = "DELETE FROM active_executing_flows WHERE exec_id=?";
-
- final QueryRunner runner = createQueryRunner();
try {
- runner.update(DELETE, execid);
+ this.dbOperator.update(DELETE, execId);
} catch (final SQLException e) {
throw new ExecutorManagerException(
- "Error deleting active flow reference " + execid, e);
+ "Error deleting active flow reference " + execId, e);
}
}
@@ -70,16 +62,12 @@ public class ActiveExecutingFlowsDao extends AbstractJdbcLoader{
final String DELETE =
"UPDATE active_executing_flows set update_time=? WHERE exec_id=?";
- final QueryRunner runner = createQueryRunner();
- int updateNum = 0;
try {
- updateNum = runner.update(DELETE, updateTime, execId);
+ // Should be 1.
+ return this.dbOperator.update(DELETE, updateTime, execId) > 0;
} catch (final SQLException e) {
throw new ExecutorManagerException(
"Error deleting active flow reference " + execId, e);
}
-
- // Should be 1.
- return updateNum > 0;
}
-}
\ No newline at end of file
+}
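
The ActiveExecutingFlowsDao hunk above shows the shape every DAO in this patch moves to: drop the AbstractJdbcLoader base class and the hand-built QueryRunner, inject azkaban.db.DatabaseOperator, and call its update(sql, params...) inside the existing try/catch. A minimal hedged sketch of that shape, using only calls visible in this diff; the WidgetDao class and its table are hypothetical and not part of the patch:

package azkaban.executor;

import azkaban.db.DatabaseOperator;
import java.sql.SQLException;
import javax.inject.Inject;
import javax.inject.Singleton;

@Singleton
public class WidgetDao {

  private final DatabaseOperator dbOperator;

  @Inject
  WidgetDao(final DatabaseOperator dbOperator) {
    // DatabaseOperator replaces the QueryRunner previously created by AbstractJdbcLoader.
    this.dbOperator = dbOperator;
  }

  void addWidget(final int id, final long updateTime) throws ExecutorManagerException {
    final String INSERT = "INSERT INTO widgets (id, update_time) values (?,?)";
    try {
      // update() issues the statement; failures surface as SQLException and are
      // rewrapped the same way the DAOs above do.
      this.dbOperator.update(INSERT, id, updateTime);
    } catch (final SQLException e) {
      throw new ExecutorManagerException("Error inserting widget " + id, e);
    }
  }
}
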
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionJobDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionJobDao.java
index 5c226c1..88fe492 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionJobDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionJobDao.java
@@ -16,9 +16,7 @@
package azkaban.executor;
-import azkaban.database.AbstractJdbcLoader;
import azkaban.db.DatabaseOperator;
-import azkaban.metrics.CommonMetrics;
import azkaban.utils.GZIPUtils;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
@@ -26,7 +24,6 @@ import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
import java.io.File;
import java.io.IOException;
-import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
@@ -35,32 +32,26 @@ import java.util.List;
import java.util.Map;
import javax.inject.Inject;
import javax.inject.Singleton;
-import org.apache.commons.dbutils.DbUtils;
-import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
@Singleton
-public class ExecutionJobDao extends AbstractJdbcLoader{
+public class ExecutionJobDao {
private static final Logger logger = Logger.getLogger(ExecutorDao.class);
private final DatabaseOperator dbOperator;
- private final EncodingType defaultEncodingType = EncodingType.GZIP;
@Inject
- ExecutionJobDao(final Props props, final CommonMetrics commonMetrics,
- final DatabaseOperator databaseOperator) {
- super(props, commonMetrics);
+ ExecutionJobDao(final DatabaseOperator databaseOperator) {
this.dbOperator = databaseOperator;
}
- void uploadExecutableNode(final ExecutableNode node, final Props inputProps)
+ public void uploadExecutableNode(final ExecutableNode node, final Props inputProps)
throws ExecutorManagerException {
- final String INSERT_EXECUTION_NODE =
- "INSERT INTO execution_jobs "
- + "(exec_id, project_id, version, flow_id, job_id, start_time, "
- + "end_time, status, input_params, attempt) VALUES (?,?,?,?,?,?,?,?,?,?)";
+ final String INSERT_EXECUTION_NODE = "INSERT INTO execution_jobs "
+ + "(exec_id, project_id, version, flow_id, job_id, start_time, "
+ + "end_time, status, input_params, attempt) VALUES (?,?,?,?,?,?,?,?,?,?)";
byte[] inputParam = null;
if (inputProps != null) {
@@ -75,10 +66,9 @@ public class ExecutionJobDao extends AbstractJdbcLoader{
final ExecutableFlow flow = node.getExecutableFlow();
final String flowId = node.getParentFlow().getFlowPath();
- System.out.println("Uploading flowId " + flowId);
- final QueryRunner runner = createQueryRunner();
+ logger.info("Uploading flowId " + flowId);
try {
- runner.update(INSERT_EXECUTION_NODE, flow.getExecutionId(),
+ this.dbOperator.update(INSERT_EXECUTION_NODE, flow.getExecutionId(),
flow.getProjectId(), flow.getVersion(), flowId, node.getId(),
node.getStartTime(), node.getEndTime(), node.getStatus().getNumVal(),
inputParam, node.getAttempt());
@@ -87,12 +77,10 @@ public class ExecutionJobDao extends AbstractJdbcLoader{
}
}
- void updateExecutableNode(final ExecutableNode node)
- throws ExecutorManagerException {
- final String UPSERT_EXECUTION_NODE =
- "UPDATE execution_jobs "
- + "SET start_time=?, end_time=?, status=?, output_params=? "
- + "WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?";
+ public void updateExecutableNode(final ExecutableNode node) throws ExecutorManagerException {
+ final String UPSERT_EXECUTION_NODE = "UPDATE execution_jobs "
+ + "SET start_time=?, end_time=?, status=?, output_params=? "
+ + "WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?";
byte[] outputParam = null;
final Props outputProps = node.getOutputProps();
@@ -105,62 +93,54 @@ public class ExecutionJobDao extends AbstractJdbcLoader{
throw new ExecutorManagerException("Error encoding input params");
}
}
-
- final QueryRunner runner = createQueryRunner();
try {
- runner.update(UPSERT_EXECUTION_NODE, node.getStartTime(), node
+ this.dbOperator.update(UPSERT_EXECUTION_NODE, node.getStartTime(), node
.getEndTime(), node.getStatus().getNumVal(), outputParam, node
.getExecutableFlow().getExecutionId(), node.getParentFlow()
.getFlowPath(), node.getId(), node.getAttempt());
} catch (final SQLException e) {
- throw new ExecutorManagerException("Error updating job " + node.getId(),
- e);
+ throw new ExecutorManagerException("Error updating job " + node.getId(), e);
}
}
- List<ExecutableJobInfo> fetchJobInfoAttempts(final int execId, final String jobId)
+ public List<ExecutableJobInfo> fetchJobInfoAttempts(final int execId, final String jobId)
throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
-
try {
- final List<ExecutableJobInfo> info =
- runner.query(
- FetchExecutableJobHandler.FETCH_EXECUTABLE_NODE_ATTEMPTS,
- new FetchExecutableJobHandler(), execId, jobId);
+ final List<ExecutableJobInfo> info = this.dbOperator.query(
+ FetchExecutableJobHandler.FETCH_EXECUTABLE_NODE_ATTEMPTS,
+ new FetchExecutableJobHandler(), execId, jobId);
if (info == null || info.isEmpty()) {
return null;
+ } else {
+ return info;
}
- return info;
} catch (final SQLException e) {
throw new ExecutorManagerException("Error querying job info " + jobId, e);
}
}
- ExecutableJobInfo fetchJobInfo(final int execId, final String jobId, final int attempts)
+ public ExecutableJobInfo fetchJobInfo(final int execId, final String jobId, final int attempts)
throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
-
try {
final List<ExecutableJobInfo> info =
- runner.query(FetchExecutableJobHandler.FETCH_EXECUTABLE_NODE,
+ this.dbOperator.query(FetchExecutableJobHandler.FETCH_EXECUTABLE_NODE,
new FetchExecutableJobHandler(), execId, jobId, attempts);
if (info == null || info.isEmpty()) {
return null;
+ } else {
+ return info.get(0);
}
- return info.get(0);
} catch (final SQLException e) {
throw new ExecutorManagerException("Error querying job info " + jobId, e);
}
}
- Props fetchExecutionJobInputProps(final int execId, final String jobId)
+ public Props fetchExecutionJobInputProps(final int execId, final String jobId)
throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
try {
- final Pair<Props, Props> props =
- runner.query(
- FetchExecutableJobPropsHandler.FETCH_INPUT_PARAM_EXECUTABLE_NODE,
- new FetchExecutableJobPropsHandler(), execId, jobId);
+ final Pair<Props, Props> props = this.dbOperator.query(
+ FetchExecutableJobPropsHandler.FETCH_INPUT_PARAM_EXECUTABLE_NODE,
+ new FetchExecutableJobPropsHandler(), execId, jobId);
return props.getFirst();
} catch (final SQLException e) {
throw new ExecutorManagerException("Error querying job params " + execId
@@ -168,15 +148,12 @@ public class ExecutionJobDao extends AbstractJdbcLoader{
}
}
- Props fetchExecutionJobOutputProps(final int execId, final String jobId)
+ public Props fetchExecutionJobOutputProps(final int execId, final String jobId)
throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
try {
- final Pair<Props, Props> props =
- runner
- .query(
- FetchExecutableJobPropsHandler.FETCH_OUTPUT_PARAM_EXECUTABLE_NODE,
- new FetchExecutableJobPropsHandler(), execId, jobId);
+ final Pair<Props, Props> props = this.dbOperator.query(
+ FetchExecutableJobPropsHandler.FETCH_OUTPUT_PARAM_EXECUTABLE_NODE,
+ new FetchExecutableJobPropsHandler(), execId, jobId);
return props.getFirst();
} catch (final SQLException e) {
throw new ExecutorManagerException("Error querying job params " + execId
@@ -184,56 +161,47 @@ public class ExecutionJobDao extends AbstractJdbcLoader{
}
}
- Pair<Props, Props> fetchExecutionJobProps(final int execId, final String jobId)
+ public Pair<Props, Props> fetchExecutionJobProps(final int execId, final String jobId)
throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
try {
- final Pair<Props, Props> props = runner .query(
- FetchExecutableJobPropsHandler.FETCH_INPUT_OUTPUT_PARAM_EXECUTABLE_NODE,
- new FetchExecutableJobPropsHandler(), execId, jobId);
- return props;
+ return this.dbOperator.query(
+ FetchExecutableJobPropsHandler.FETCH_INPUT_OUTPUT_PARAM_EXECUTABLE_NODE,
+ new FetchExecutableJobPropsHandler(), execId, jobId);
} catch (final SQLException e) {
throw new ExecutorManagerException("Error querying job params " + execId
+ " " + jobId, e);
}
}
- List<ExecutableJobInfo> fetchJobHistory(final int projectId, final String jobId,
- final int skip, final int size)
- throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
-
+ public List<ExecutableJobInfo> fetchJobHistory(final int projectId,
+ final String jobId,
+ final int skip,
+ final int size) throws ExecutorManagerException {
try {
final List<ExecutableJobInfo> info =
- runner.query(FetchExecutableJobHandler.FETCH_PROJECT_EXECUTABLE_NODE,
+ this.dbOperator.query(FetchExecutableJobHandler.FETCH_PROJECT_EXECUTABLE_NODE,
new FetchExecutableJobHandler(), projectId, jobId, skip, size);
if (info == null || info.isEmpty()) {
return null;
+ } else {
+ return info;
}
- return info;
} catch (final SQLException e) {
throw new ExecutorManagerException("Error querying job info " + jobId, e);
}
}
- List<Object> fetchAttachments(final int execId, final String jobId, final int attempt)
+ public List<Object> fetchAttachments(final int execId, final String jobId, final int attempt)
throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
-
try {
- final String attachments =
- runner
- .query(
- FetchExecutableJobAttachmentsHandler.FETCH_ATTACHMENTS_EXECUTABLE_NODE,
- new FetchExecutableJobAttachmentsHandler(), execId, jobId);
+ final String attachments = this.dbOperator.query(
+ FetchExecutableJobAttachmentsHandler.FETCH_ATTACHMENTS_EXECUTABLE_NODE,
+ new FetchExecutableJobAttachmentsHandler(), execId, jobId);
if (attachments == null) {
return null;
+ } else {
+ return (List<Object>) JSONUtils.parseJSONFromString(attachments);
}
-
- final List<Object> attachmentList =
- (List<Object>) JSONUtils.parseJSONFromString(attachments);
-
- return attachmentList;
} catch (final IOException e) {
throw new ExecutorManagerException(
"Error converting job attachments to JSON " + jobId, e);
@@ -243,49 +211,20 @@ public class ExecutionJobDao extends AbstractJdbcLoader{
}
}
- void uploadAttachmentFile(final ExecutableNode node, final File file)
- throws ExecutorManagerException {
- final Connection connection = getConnection();
- try {
- uploadAttachmentFile(connection, node, file, this.defaultEncodingType);
- connection.commit();
- } catch (final SQLException e) {
- throw new ExecutorManagerException("Error committing attachments ", e);
- } catch (final IOException e) {
- throw new ExecutorManagerException("Error uploading attachments ", e);
- } finally {
- DbUtils.closeQuietly(connection);
- }
- }
-
- private void uploadAttachmentFile(final Connection connection, final ExecutableNode node,
- final File file, final EncodingType encType) throws SQLException, IOException {
-
- final String jsonString = FileUtils.readFileToString(file);
- final byte[] attachments = GZIPUtils.gzipString(jsonString, "UTF-8");
-
+ public void uploadAttachmentFile(final ExecutableNode node, final File file)
+ throws ExecutorManagerException {
final String UPDATE_EXECUTION_NODE_ATTACHMENTS =
"UPDATE execution_jobs " + "SET attachments=? "
+ "WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?";
-
- final QueryRunner runner = new QueryRunner();
- runner.update(connection, UPDATE_EXECUTION_NODE_ATTACHMENTS, attachments,
- node.getExecutableFlow().getExecutionId(), node.getParentFlow()
- .getNestedId(), node.getId(), node.getAttempt());
- }
-
- /**
- * TODO kunkun-tang: Will be removed during the next refactor.
- */
- private Connection getConnection() throws ExecutorManagerException {
- Connection connection = null;
try {
- connection = super.getDBConnection(false);
- } catch (final Exception e) {
- DbUtils.closeQuietly(connection);
- throw new ExecutorManagerException("Error getting DB connection.", e);
+ final String jsonString = FileUtils.readFileToString(file);
+ final byte[] attachments = GZIPUtils.gzipString(jsonString, "UTF-8");
+ this.dbOperator.update(UPDATE_EXECUTION_NODE_ATTACHMENTS, attachments,
+ node.getExecutableFlow().getExecutionId(), node.getParentFlow()
+ .getNestedId(), node.getId(), node.getAttempt());
+ } catch (final IOException | SQLException e) {
+ throw new ExecutorManagerException("Error uploading attachments.", e);
}
- return connection;
}
private static class FetchExecutableJobHandler implements
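
For reads, the ExecutionJobDao hunks above replace runner.query(...) with dbOperator.query(sql, handler, params...), keeping the commons-dbutils ResultSetHandler classes unchanged. A hedged sketch of that call shape; the JobAttemptCounter and CountHandler classes below are hypothetical, only the query(...) signature is taken from this diff:

package azkaban.executor;

import azkaban.db.DatabaseOperator;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.commons.dbutils.ResultSetHandler;

public class JobAttemptCounter {

  private static final String COUNT_ATTEMPTS =
      "SELECT COUNT(*) FROM execution_jobs WHERE exec_id=? AND job_id=?";

  private final DatabaseOperator dbOperator;

  public JobAttemptCounter(final DatabaseOperator dbOperator) {
    this.dbOperator = dbOperator;
  }

  public int countAttempts(final int execId, final String jobId) throws ExecutorManagerException {
    try {
      // The handler maps the ResultSet to the return type, exactly as the
      // Fetch*Handler classes in ExecutionJobDao do.
      return this.dbOperator.query(COUNT_ATTEMPTS, new CountHandler(), execId, jobId);
    } catch (final SQLException e) {
      throw new ExecutorManagerException("Error counting attempts for " + jobId, e);
    }
  }

  private static class CountHandler implements ResultSetHandler<Integer> {

    @Override
    public Integer handle(final ResultSet rs) throws SQLException {
      // Empty result sets yield 0, mirroring the null/empty checks above.
      return rs.next() ? rs.getInt(1) : 0;
    }
  }
}
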
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionLogsDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionLogsDao.java
index da96fc9..0821814 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionLogsDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionLogsDao.java
@@ -16,80 +16,75 @@
package azkaban.executor;
-import azkaban.database.AbstractJdbcLoader;
+import azkaban.database.EncodingType;
import azkaban.db.DatabaseOperator;
-import azkaban.metrics.CommonMetrics;
+import azkaban.db.DatabaseTransOperator;
+import azkaban.db.SQLTransaction;
import azkaban.utils.FileIOUtils;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.GZIPUtils;
import azkaban.utils.Pair;
-import azkaban.utils.Props;
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
-import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import javax.inject.Inject;
import javax.inject.Singleton;
-import org.apache.commons.dbutils.DbUtils;
-import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
import org.apache.commons.io.IOUtils;
+import org.apache.log4j.Logger;
import org.joda.time.DateTime;
+
@Singleton
-public class ExecutionLogsDao extends AbstractJdbcLoader{
+public class ExecutionLogsDao {
+ private static final Logger logger = Logger.getLogger(ExecutionLogsDao.class);
private final DatabaseOperator dbOperator;
private final EncodingType defaultEncodingType = EncodingType.GZIP;
@Inject
- ExecutionLogsDao(final Props props, final CommonMetrics commonMetrics,
- final DatabaseOperator dbOperator) {
- super(props, commonMetrics);
+ ExecutionLogsDao(final DatabaseOperator dbOperator) {
this.dbOperator = dbOperator;
}
+ // TODO kunkun-tang: the interface's parameter is called endByte, but it is actually a length.
LogData fetchLogs(final int execId, final String name, final int attempt,
final int startByte,
final int length) throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
-
final FetchLogsHandler handler = new FetchLogsHandler(startByte, length + startByte);
try {
- final LogData result =
- runner.query(FetchLogsHandler.FETCH_LOGS, handler, execId, name,
- attempt, startByte, startByte + length);
- return result;
+ return this.dbOperator.query(FetchLogsHandler.FETCH_LOGS, handler,
+ execId, name, attempt, startByte, startByte + length);
} catch (final SQLException e) {
throw new ExecutorManagerException("Error fetching logs " + execId
+ " : " + name, e);
}
}
- void uploadLogFile(final int execId, final String name, final int attempt,
- final File... files)
- throws ExecutorManagerException {
- final Connection connection = getConnection();
+ public void uploadLogFile(final int execId, final String name, final int attempt,
+ final File... files) throws ExecutorManagerException {
+ final SQLTransaction<Integer> transaction = transOperator -> {
+ uploadLogFile(transOperator, execId, name, attempt, files, this.defaultEncodingType);
+ transOperator.getConnection().commit();
+ return 1;
+ };
try {
- uploadLogFile(connection, execId, name, attempt, files,
- getDefaultEncodingType());
- connection.commit();
- } catch (final SQLException | IOException e) {
- throw new ExecutorManagerException("Error committing log", e);
- } finally {
- DbUtils.closeQuietly(connection);
+ this.dbOperator.transaction(transaction);
+ } catch (final SQLException e) {
+ logger.error("uploadLogFile failed.", e);
+ throw new ExecutorManagerException("uploadLogFile failed.", e);
}
}
- void uploadLogFile(final Connection connection, final int execId, final String name,
- final int attempt, final File[] files, final EncodingType encType)
- throws ExecutorManagerException, IOException {
+ private void uploadLogFile(final DatabaseTransOperator transOperator, final int execId, final String name,
+ final int attempt, final File[] files, final EncodingType encType)
+ throws SQLException {
// 50K buffer... if logs are greater than this, we chunk.
// However, we better prevent large log files from being uploaded somehow
final byte[] buffer = new byte[50 * 1024];
@@ -107,7 +102,7 @@ public class ExecutionLogsDao extends AbstractJdbcLoader{
while (size >= 0) {
if (pos + size == buffer.length) {
// Flush here.
- uploadLogPart(connection, execId, name, attempt, startByte,
+ uploadLogPart(transOperator, execId, name, attempt, startByte,
startByte + buffer.length, encType, buffer, buffer.length);
pos = 0;
@@ -127,13 +122,15 @@ public class ExecutionLogsDao extends AbstractJdbcLoader{
// Final commit of buffer.
if (pos > 0) {
- uploadLogPart(connection, execId, name, attempt, startByte, startByte
+ uploadLogPart(transOperator, execId, name, attempt, startByte, startByte
+ pos, encType, buffer, pos);
}
} catch (final SQLException e) {
- throw new ExecutorManagerException("Error writing log part.", e);
+ logger.error("Error writing log part.", e);
+ throw new SQLException("Error writing log part", e);
} catch (final IOException e) {
- throw new ExecutorManagerException("Error chunking", e);
+ logger.error("Error chunking.", e);
+ throw new SQLException("Error chunking", e);
}
}
@@ -141,35 +138,25 @@ public class ExecutionLogsDao extends AbstractJdbcLoader{
throws ExecutorManagerException {
final String DELETE_BY_TIME =
"DELETE FROM execution_logs WHERE upload_time < ?";
-
- final QueryRunner runner = this.createQueryRunner();
- int updateNum = 0;
try {
- updateNum = runner.update(DELETE_BY_TIME, millis);
+ return this.dbOperator.update(DELETE_BY_TIME, millis);
} catch (final SQLException e) {
- e.printStackTrace();
+ logger.error("delete execution logs failed", e);
throw new ExecutorManagerException(
"Error deleting old execution_logs before " + millis, e);
}
-
- return updateNum;
}
- // TODO kunkun-tang: Will be removed in the future refactor.
- private EncodingType getDefaultEncodingType() {
- return this.defaultEncodingType;
- }
-
- private void uploadLogPart(final Connection connection, final int execId, final String name,
+ private void uploadLogPart(final DatabaseTransOperator transOperator, final int execId,
+ final String name,
final int attempt, final int startByte, final int endByte,
final EncodingType encType,
- final byte[] buffer, final int length) throws SQLException, IOException {
- final String INSERT_EXECUTION_LOGS =
- "INSERT INTO execution_logs "
- + "(exec_id, name, attempt, enc_type, start_byte, end_byte, "
- + "log, upload_time) VALUES (?,?,?,?,?,?,?,?)";
+ final byte[] buffer, final int length)
+ throws SQLException, IOException {
+ final String INSERT_EXECUTION_LOGS = "INSERT INTO execution_logs "
+ + "(exec_id, name, attempt, enc_type, start_byte, end_byte, "
+ + "log, upload_time) VALUES (?,?,?,?,?,?,?,?)";
- final QueryRunner runner = new QueryRunner();
byte[] buf = buffer;
if (encType == EncodingType.GZIP) {
buf = GZIPUtils.gzipBytes(buf, 0, length);
@@ -177,23 +164,11 @@ public class ExecutionLogsDao extends AbstractJdbcLoader{
buf = Arrays.copyOf(buffer, length);
}
- runner.update(connection, INSERT_EXECUTION_LOGS, execId, name, attempt,
+ transOperator.update(INSERT_EXECUTION_LOGS, execId, name, attempt,
encType.getNumVal(), startByte, startByte + length, buf, DateTime.now()
.getMillis());
}
- // TODO kunkun-tang: should be removed in future refactor.
- private Connection getConnection() throws ExecutorManagerException {
- Connection connection = null;
- try {
- connection = super.getDBConnection(false);
- } catch (final Exception e) {
- DbUtils.closeQuietly(connection);
- throw new ExecutorManagerException("Error getting DB connection.", e);
- }
- return connection;
- }
-
private static class FetchLogsHandler implements ResultSetHandler<LogData> {
private static final String FETCH_LOGS =
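
The uploadLogFile change above is the one place in this patch that needs more than a single statement, so it uses the new transaction API: the work becomes a SQLTransaction lambda over a DatabaseTransOperator, and DatabaseOperator.transaction(...) runs it. A hedged sketch of that pattern under the same assumptions as the diff (explicit commit inside the lambda, SQLException propagated); the AuditLogDao class and its table are hypothetical:

package azkaban.executor;

import azkaban.db.DatabaseOperator;
import azkaban.db.SQLTransaction;
import java.sql.SQLException;

public class AuditLogDao {

  private static final String INSERT_AUDIT =
      "INSERT INTO audit_log (exec_id, message) VALUES (?,?)";

  private final DatabaseOperator dbOperator;

  public AuditLogDao(final DatabaseOperator dbOperator) {
    this.dbOperator = dbOperator;
  }

  public void recordStartAndFinish(final int execId) throws ExecutorManagerException {
    // Both inserts run on the same underlying connection and commit together.
    final SQLTransaction<Integer> transaction = transOperator -> {
      transOperator.update(INSERT_AUDIT, execId, "started");
      transOperator.update(INSERT_AUDIT, execId, "finished");
      // Mirrors the explicit commit inside the lambda in ExecutionLogsDao above.
      transOperator.getConnection().commit();
      return 1;
    };
    try {
      this.dbOperator.transaction(transaction);
    } catch (final SQLException e) {
      throw new ExecutorManagerException("Error writing audit entries for " + execId, e);
    }
  }
}
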
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorEventsDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorEventsDao.java
index 6c7f44d..d4e581c 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorEventsDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorEventsDao.java
@@ -16,11 +16,8 @@
package azkaban.executor;
-import azkaban.database.AbstractJdbcLoader;
import azkaban.db.DatabaseOperator;
import azkaban.executor.ExecutorLogEvent.EventType;
-import azkaban.metrics.CommonMetrics;
-import azkaban.utils.Props;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
@@ -29,31 +26,25 @@ import java.util.Date;
import java.util.List;
import javax.inject.Inject;
import javax.inject.Singleton;
-import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
@Singleton
-public class ExecutorEventsDao extends AbstractJdbcLoader {
+public class ExecutorEventsDao {
private final DatabaseOperator dbOperator;
@Inject
- public ExecutorEventsDao(final Props props, final CommonMetrics commonMetrics,
- final DatabaseOperator dbOperator) {
- super(props, commonMetrics);
+ public ExecutorEventsDao(final DatabaseOperator dbOperator) {
this.dbOperator = dbOperator;
}
public void postExecutorEvent(final Executor executor, final EventType type, final String user,
final String message) throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
-
final String INSERT_PROJECT_EVENTS =
"INSERT INTO executor_events (executor_id, event_type, event_time, username, message) values (?,?,?,?,?)";
- final Date updateDate = new Date();
try {
- runner.update(INSERT_PROJECT_EVENTS, executor.getId(), type.getNumVal(),
- updateDate, user, message);
+ this.dbOperator.update(INSERT_PROJECT_EVENTS, executor.getId(), type.getNumVal(),
+ new Date(), user, message);
} catch (final SQLException e) {
throw new ExecutorManagerException("Failed to post executor event", e);
}
@@ -62,20 +53,13 @@ public class ExecutorEventsDao extends AbstractJdbcLoader {
public List<ExecutorLogEvent> getExecutorEvents(final Executor executor, final int num,
final int offset)
throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
-
- final ExecutorLogsResultHandler logHandler = new ExecutorLogsResultHandler();
- List<ExecutorLogEvent> events = null;
try {
- events =
- runner.query(ExecutorLogsResultHandler.SELECT_EXECUTOR_EVENTS_ORDER,
- logHandler, executor.getId(), num, offset);
+ return this.dbOperator.query(ExecutorLogsResultHandler.SELECT_EXECUTOR_EVENTS_ORDER,
+ new ExecutorLogsResultHandler(), executor.getId(), num, offset);
} catch (final SQLException e) {
throw new ExecutorManagerException(
"Failed to fetch events for executor id : " + executor.getId(), e);
}
-
- return events;
}
/**
diff --git a/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java b/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
index d1ba681..0b3f06c 100644
--- a/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
@@ -16,13 +16,11 @@
package azkaban.executor;
-import azkaban.database.AbstractJdbcLoader;
+import azkaban.database.EncodingType;
import azkaban.db.DatabaseOperator;
-import azkaban.metrics.CommonMetrics;
import azkaban.utils.GZIPUtils;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
-import azkaban.utils.Props;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -33,33 +31,25 @@ import java.util.List;
import java.util.Map;
import javax.inject.Inject;
import javax.inject.Singleton;
-import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
import org.apache.log4j.Logger;
@Singleton
-public class FetchActiveFlowDao extends AbstractJdbcLoader {
+public class FetchActiveFlowDao {
private static final Logger logger = Logger.getLogger(FetchActiveFlowDao.class);
private final DatabaseOperator dbOperator;
@Inject
- public FetchActiveFlowDao(final Props props, final CommonMetrics commonMetrics,
- final DatabaseOperator dbOperator) {
- super(props, commonMetrics);
+ public FetchActiveFlowDao(final DatabaseOperator dbOperator) {
this.dbOperator = dbOperator;
}
Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows()
throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
- final FetchActiveExecutableFlows flowHandler = new FetchActiveExecutableFlows();
-
try {
- final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> properties =
- runner.query(FetchActiveExecutableFlows.FETCH_ACTIVE_EXECUTABLE_FLOW,
- flowHandler);
- return properties;
+ return this.dbOperator.query(FetchActiveExecutableFlows.FETCH_ACTIVE_EXECUTABLE_FLOW,
+ new FetchActiveExecutableFlows());
} catch (final SQLException e) {
throw new ExecutorManagerException("Error fetching active flows", e);
}
@@ -67,14 +57,11 @@ public class FetchActiveFlowDao extends AbstractJdbcLoader {
Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(final int execId)
throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
- final FetchActiveExecutableFlowByExecId flowHandler = new FetchActiveExecutableFlowByExecId();
-
try {
final List<Pair<ExecutionReference, ExecutableFlow>> flows =
- runner.query(
- FetchActiveExecutableFlowByExecId.FETCH_ACTIVE_EXECUTABLE_FLOW_BY_EXECID,
- flowHandler, execId);
+ this.dbOperator
+ .query(FetchActiveExecutableFlowByExecId.FETCH_ACTIVE_EXECUTABLE_FLOW_BY_EXECID,
+ new FetchActiveExecutableFlowByExecId(), execId);
if (flows.isEmpty()) {
return null;
} else {
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 0818532..3d1da87 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -25,7 +25,6 @@ import azkaban.utils.Props;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.io.File;
-import java.io.IOException;
import java.sql.Connection;
import java.time.Duration;
import java.util.List;
@@ -263,14 +262,6 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
this.executionLogsDao.uploadLogFile(execId, name, attempt, files);
}
- private void uploadLogFile(final Connection connection, final int execId, final String name,
- final int attempt, final File[] files, final EncodingType encType)
- throws ExecutorManagerException, IOException {
- // 50K buffer... if logs are greater than this, we chunk.
- // However, we better prevent large log files from being uploaded somehow
- this.executionLogsDao.uploadLogFile(connection, execId, name, attempt, files, encType);
- }
-
@Override
public void uploadAttachmentFile(final ExecutableNode node, final File file)
throws ExecutorManagerException {
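
With the private Connection-based helper removed, JdbcExecutorLoader is left as a thin delegator over the injected DAOs, as the retained uploadLogFile and uploadAttachmentFile overrides show. A hedged sketch of that delegation shape; the class name is illustrative and only the forwarded call comes from this diff:

package azkaban.executor;

import java.io.File;
import javax.inject.Inject;

public class DelegatingExecutorLoader {

  private final ExecutionLogsDao executionLogsDao;

  @Inject
  public DelegatingExecutorLoader(final ExecutionLogsDao executionLogsDao) {
    this.executionLogsDao = executionLogsDao;
  }

  public void uploadLogFile(final int execId, final String name, final int attempt,
      final File... files) throws ExecutorManagerException {
    // All JDBC details (transactions, chunking, encoding) now live in the DAO.
    this.executionLogsDao.uploadLogFile(execId, name, attempt, files);
  }
}
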
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
index 2cc254e..badcb18 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
@@ -21,17 +21,22 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
import azkaban.db.DatabaseOperator;
import azkaban.test.Utils;
+import azkaban.test.executions.ExecutionsTestUtil;
import azkaban.utils.Pair;
+import azkaban.utils.Props;
import azkaban.utils.TestUtils;
+import java.io.File;
import java.sql.SQLException;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import org.joda.time.DateTimeUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
public class ExecutionFlowDaoTest {
@@ -43,6 +48,8 @@ public class ExecutionFlowDaoTest {
private ExecutionFlowDao executionFlowDao;
private ExecutorDao executorDao;
private AssignExecutorDao assignExecutor;
+ private FetchActiveFlowDao fetchActiveFlowDao;
+ private ExecutionJobDao executionJobDao;
@BeforeClass
public static void setUp() throws Exception {
@@ -62,8 +69,10 @@ public class ExecutionFlowDaoTest {
@Before
public void setup() {
this.executionFlowDao = new ExecutionFlowDao(dbOperator);
- this.executorDao= new ExecutorDao(dbOperator);
- this.assignExecutor= new AssignExecutorDao(dbOperator, this.executorDao);
+ this.executorDao = new ExecutorDao(dbOperator);
+ this.assignExecutor = new AssignExecutorDao(dbOperator, this.executorDao);
+ this.fetchActiveFlowDao = new FetchActiveFlowDao(dbOperator);
+ this.executionJobDao = new ExecutionJobDao(dbOperator);
}
@After
@@ -225,6 +234,111 @@ public class ExecutionFlowDaoTest {
.hasMessageContaining("non-existent execution");
}
+
+ @Test
+ public void testFetchActiveFlowsExecutorAssigned() throws Exception {
+
+ // Upload flow1, executor assigned
+ final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ this.executionFlowDao.uploadExecutableFlow(flow1);
+ final Executor executor = this.executorDao.addExecutor("test", 1);
+ this.assignExecutor.assignExecutor(executor.getId(), flow1.getExecutionId());
+
+ // Upload flow2, executor not assigned
+ final ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+ this.executionFlowDao.uploadExecutableFlow(flow2);
+
+ final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows1 =
+ this.fetchActiveFlowDao.fetchActiveFlows();
+
+ assertThat(activeFlows1.containsKey(flow1.getExecutionId())).isTrue();
+ assertThat(activeFlows1.containsKey(flow2.getExecutionId())).isFalse();
+ final ExecutableFlow flow1Result =
+ activeFlows1.get(flow1.getExecutionId()).getSecond();
+ assertTwoFlowSame(flow1Result, flow1);
+ }
+
+ @Test
+ public void testFetchActiveFlowsStatusChanged() throws Exception {
+ final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ this.executionFlowDao.uploadExecutableFlow(flow1);
+ final Executor executor = this.executorDao.addExecutor("test", 1);
+ this.assignExecutor.assignExecutor(executor.getId(), flow1.getExecutionId());
+
+ Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows1 =
+ this.fetchActiveFlowDao.fetchActiveFlows();
+
+ assertThat(activeFlows1.containsKey(flow1.getExecutionId())).isTrue();
+
+ // When flow status becomes SUCCEEDED/KILLED/FAILED, it should not be in active state
+ flow1.setStatus(Status.SUCCEEDED);
+ this.executionFlowDao.updateExecutableFlow(flow1);
+ activeFlows1 = this.fetchActiveFlowDao.fetchActiveFlows();
+ assertThat(activeFlows1.containsKey(flow1.getExecutionId())).isFalse();
+
+ flow1.setStatus(Status.KILLED);
+ this.executionFlowDao.updateExecutableFlow(flow1);
+ activeFlows1 = this.fetchActiveFlowDao.fetchActiveFlows();
+ assertThat(activeFlows1.containsKey(flow1.getExecutionId())).isFalse();
+
+ flow1.setStatus(Status.FAILED);
+ this.executionFlowDao.updateExecutableFlow(flow1);
+ activeFlows1 = this.fetchActiveFlowDao.fetchActiveFlows();
+ assertThat(activeFlows1.containsKey(flow1.getExecutionId())).isFalse();
+ }
+
+ @Test @Ignore
+ // TODO jamiesjc: The active_executing_flows table is already deprecated; the related test
+ // methods should be removed as well.
+ public void testFetchActiveFlowsReferenceChanged() throws Exception {
+ }
+
+ @Test @Ignore
+ // TODO jamiesjc: The active_executing_flows table is already deprecated; the related test
+ // methods should be removed as well.
+ public void testFetchActiveFlowByExecId() throws Exception {
+ }
+
+ @Test
+ public void testUploadAndFetchExecutableNode() throws Exception {
+
+ final ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+ flow.setExecutionId(10);
+
+ final File jobFile = ExecutionsTestUtil.getFlowFile("exectest1", "job10.job");
+ final Props props = new Props(null, jobFile);
+ props.put("test", "test2");
+ final ExecutableNode oldNode = flow.getExecutableNode("job10");
+ oldNode.setStartTime(System.currentTimeMillis());
+ this.executionJobDao.uploadExecutableNode(oldNode, props);
+
+ final ExecutableJobInfo info = this.executionJobDao.fetchJobInfo(10, "job10", 0);
+ assertThat(flow.getEndTime()).isEqualTo(info.getEndTime());
+ assertThat(flow.getProjectId()).isEqualTo(info.getProjectId());
+ assertThat(flow.getVersion()).isEqualTo(info.getVersion());
+ assertThat(flow.getFlowId()).isEqualTo(info.getFlowId());
+
+ assertThat(oldNode.getId()).isEqualTo(info.getJobId());
+ assertThat(oldNode.getStatus()).isEqualTo(info.getStatus());
+ assertThat(oldNode.getStartTime()).isEqualTo(info.getStartTime());
+
+ // Fetch props
+ final Props outputProps = new Props();
+ outputProps.put("hello", "output");
+ oldNode.setOutputProps(outputProps);
+ oldNode.setEndTime(System.currentTimeMillis());
+ this.executionJobDao.updateExecutableNode(oldNode);
+
+ final Props fInputProps = this.executionJobDao.fetchExecutionJobInputProps(10, "job10");
+ final Props fOutputProps = this.executionJobDao.fetchExecutionJobOutputProps(10, "job10");
+ final Pair<Props, Props> inOutProps = this.executionJobDao.fetchExecutionJobProps(10, "job10");
+
+ assertThat(fInputProps.get("test")).isEqualTo("test2");
+ assertThat(fOutputProps.get("hello")).isEqualTo("output");
+ assertThat(inOutProps.getFirst().get("test")).isEqualTo("test2");
+ assertThat(inOutProps.getSecond().get("hello")).isEqualTo("output");
+ }
+
private void assertTwoFlowSame(final ExecutableFlow flow1, final ExecutableFlow flow2) {
assertThat(flow1.getExecutionId()).isEqualTo(flow2.getExecutionId());
assertThat(flow1.getStatus()).isEqualTo(flow2.getStatus());
@@ -238,4 +352,5 @@ public class ExecutionFlowDaoTest {
.isEqualTo(flow2.getExecutionOptions().getFailureAction());
assertThat(new HashSet<>(flow1.getEndNodes())).isEqualTo(new HashSet<>(flow2.getEndNodes()));
}
+
}
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionLogsDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionLogsDaoTest.java
new file mode 100644
index 0000000..f22a597
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionLogsDaoTest.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import azkaban.db.DatabaseOperator;
+import azkaban.test.Utils;
+import azkaban.test.executions.ExecutionsTestUtil;
+import azkaban.utils.FileIOUtils.LogData;
+import java.io.File;
+import java.sql.SQLException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ExecutionLogsDaoTest {
+
+ private static final String LOG_TEST_DIR_NAME = "logtest";
+ private static DatabaseOperator dbOperator;
+ private ExecutionLogsDao executionLogsDao;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ dbOperator = Utils.initTestDB();
+ }
+
+ @AfterClass
+ public static void destroyDB() throws Exception {
+ try {
+ dbOperator.update("DROP ALL OBJECTS");
+ dbOperator.update("SHUTDOWN");
+ } catch (final SQLException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Before
+ public void setup() {
+ this.executionLogsDao = new ExecutionLogsDao(dbOperator);
+ }
+
+ @After
+ public void clearDB() {
+ try {
+ dbOperator.update("delete from execution_logs");
+ } catch (final SQLException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void testSmallUploadLog() throws ExecutorManagerException {
+ final File logDir = ExecutionsTestUtil.getFlowDir(LOG_TEST_DIR_NAME);
+ final File[] smalllog =
+ { new File(logDir, "log1.log"), new File(logDir, "log2.log"),
+ new File(logDir, "log3.log") };
+
+ this.executionLogsDao.uploadLogFile(1, "smallFiles", 0, smalllog);
+
+ final LogData data = this.executionLogsDao.fetchLogs(1, "smallFiles", 0, 0, 50000);
+ assertThat(data).isNotNull();
+ assertThat(data.getLength()).isEqualTo(53);
+ System.out.println(data.toString());
+
+ final LogData data2 = this.executionLogsDao.fetchLogs(1, "smallFiles", 0, 10, 20);
+ System.out.println(data2.toString());
+
+ assertThat(data2).isNotNull();
+ assertThat(data2.getLength()).isEqualTo(20);
+ }
+
+ @Test
+ public void testLargeUploadLog() throws ExecutorManagerException {
+ final File logDir = ExecutionsTestUtil.getFlowDir(LOG_TEST_DIR_NAME);
+
+ // Multiple of 255 for Henry the Eighth
+ final File[] largelog =
+ { new File(logDir, "largeLog1.log"), new File(logDir, "largeLog2.log"),
+ new File(logDir, "largeLog3.log") };
+
+ this.executionLogsDao.uploadLogFile(1, "largeFiles", 0, largelog);
+
+ final LogData logsResult = this.executionLogsDao.fetchLogs(1, "largeFiles", 0, 0, 64000);
+ assertThat(logsResult).isNotNull();
+ assertThat(logsResult.getLength()).isEqualTo(64000);
+
+ final LogData logsResult2 = this.executionLogsDao.fetchLogs(1, "largeFiles", 0, 1000, 64000);
+ assertThat(logsResult2).isNotNull();
+ assertThat(logsResult2.getLength()).isEqualTo(64000);
+
+ final LogData logsResult3 = this.executionLogsDao.fetchLogs(1, "largeFiles", 0, 150000, 250000);
+ assertThat(logsResult3).isNotNull();
+ assertThat(logsResult3.getLength()).isEqualTo(185493);
+ }
+}