azkaban-aplcache
Changes
src/java/azkaban/executor/JdbcExecutorLoader.java 488(+358 -130)
src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm 1(+1 -0)
Details
src/java/azkaban/executor/JdbcExecutorLoader.java 488(+358 -130)
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index 37de61a..70b73da 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -47,8 +47,10 @@ import azkaban.utils.Pair;
import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
-public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLoader {
- private static final Logger logger = Logger.getLogger(JdbcExecutorLoader.class);
+public class JdbcExecutorLoader extends AbstractJdbcLoader
+ implements ExecutorLoader {
+ private static final Logger logger =
+ Logger.getLogger(JdbcExecutorLoader.class);
private EncodingType defaultEncodingType = EncodingType.GZIP;
@@ -65,11 +67,13 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
}
@Override
- public synchronized void uploadExecutableFlow(ExecutableFlow flow) throws ExecutorManagerException {
+ public synchronized void uploadExecutableFlow(ExecutableFlow flow)
+ throws ExecutorManagerException {
Connection connection = getConnection();
try {
uploadExecutableFlow(connection, flow, defaultEncodingType);
- } catch (IOException e) {
+ }
+ catch (IOException e) {
throw new ExecutorManagerException("Error uploading flow", e);
}
finally {
@@ -77,17 +81,32 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
}
}
- private synchronized void uploadExecutableFlow(Connection connection, ExecutableFlow flow, EncodingType encType) throws ExecutorManagerException, IOException {
- final String INSERT_EXECUTABLE_FLOW = "INSERT INTO execution_flows (project_id, flow_id, version, status, submit_time, submit_user, update_time) values (?,?,?,?,?,?,?)";
+ private synchronized void uploadExecutableFlow(Connection connection,
+ ExecutableFlow flow, EncodingType encType)
+ throws ExecutorManagerException, IOException {
+ final String INSERT_EXECUTABLE_FLOW =
+ "INSERT INTO execution_flows " +
+ "(project_id, flow_id, version, status, submit_time, submit_user, update_time) " +
+ "values (?,?,?,?,?,?,?)";
QueryRunner runner = new QueryRunner();
long submitTime = System.currentTimeMillis();
long id;
try {
flow.setStatus(Status.PREPARING);
- runner.update(connection, INSERT_EXECUTABLE_FLOW, flow.getProjectId(), flow.getFlowId(), flow.getVersion(), Status.PREPARING.getNumVal(), submitTime, flow.getSubmitUser(), submitTime);
+ runner.update(
+ connection,
+ INSERT_EXECUTABLE_FLOW,
+ flow.getProjectId(),
+ flow.getFlowId(),
+ flow.getVersion(),
+ Status.PREPARING.getNumVal(),
+ submitTime,
+ flow.getSubmitUser(),
+ submitTime);
connection.commit();
- id = runner.query(connection, LastInsertID.LAST_INSERT_ID, new LastInsertID());
+ id = runner.query(
+ connection, LastInsertID.LAST_INSERT_ID, new LastInsertID());
if (id == -1l) {
throw new ExecutorManagerException("Execution id is not properly created.");
@@ -96,13 +115,15 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
flow.setExecutionId((int)id);
updateExecutableFlow(connection, flow, encType);
- } catch (SQLException e) {
+ }
+ catch (SQLException e) {
throw new ExecutorManagerException("Error creating execution.", e);
}
}
@Override
- public void updateExecutableFlow(ExecutableFlow flow) throws ExecutorManagerException {
+ public void updateExecutableFlow(ExecutableFlow flow)
+ throws ExecutorManagerException {
Connection connection = this.getConnection();
try {
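Note: the hunk above leans on MySQL's LAST_INSERT_ID(), which is scoped to the connection that ran the INSERT — hence the private overload taking an explicit Connection. A minimal sketch of the pattern, assuming a MySQL-backed connection with auto-commit off and commons-dbutils on the classpath (table and column names are illustrative):

    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import org.apache.commons.dbutils.QueryRunner;
    import org.apache.commons.dbutils.ResultSetHandler;

    public class InsertIdSketch {
      // Insert a row, then read back its auto-generated id on the SAME connection.
      static long insertAndFetchId(Connection conn, String submitUser)
          throws SQLException {
        QueryRunner runner = new QueryRunner();
        runner.update(conn,
            "INSERT INTO execution_flows (submit_user, submit_time) VALUES (?,?)",
            submitUser, System.currentTimeMillis());
        conn.commit();
        // LAST_INSERT_ID() is per-connection: routing this query through a
        // pooled DataSource could hit a different connection and return 0.
        return runner.query(conn, "SELECT LAST_INSERT_ID()",
            new ResultSetHandler<Long>() {
              @Override
              public Long handle(ResultSet rs) throws SQLException {
                return rs.next() ? rs.getLong(1) : -1L;
              }
            });
      }
    }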
@@ -113,8 +134,13 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
}
}
- private void updateExecutableFlow(Connection connection, ExecutableFlow flow, EncodingType encType) throws ExecutorManagerException {
- final String UPDATE_EXECUTABLE_FLOW_DATA = "UPDATE execution_flows SET status=?,update_time=?,start_time=?,end_time=?,enc_type=?,flow_data=? WHERE exec_id=?";
+ private void updateExecutableFlow(
+ Connection connection, ExecutableFlow flow, EncodingType encType)
+ throws ExecutorManagerException {
+ final String UPDATE_EXECUTABLE_FLOW_DATA =
+ "UPDATE execution_flows " +
+ "SET status=?,update_time=?,start_time=?,end_time=?,enc_type=?,flow_data=? " +
+ "WHERE exec_id=?";
QueryRunner runner = new QueryRunner();
String json = JSONUtils.toJSON(flow.toObject());
@@ -132,36 +158,53 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
}
try {
- runner.update(connection, UPDATE_EXECUTABLE_FLOW_DATA, flow.getStatus().getNumVal(), flow.getUpdateTime(), flow.getStartTime(), flow.getEndTime(), encType.getNumVal(), data, flow.getExecutionId());
+ runner.update(
+ connection,
+ UPDATE_EXECUTABLE_FLOW_DATA,
+ flow.getStatus().getNumVal(),
+ flow.getUpdateTime(),
+ flow.getStartTime(),
+ flow.getEndTime(),
+ encType.getNumVal(),
+ data,
+ flow.getExecutionId());
connection.commit();
}
- catch(SQLException e) {
+ catch (SQLException e) {
throw new ExecutorManagerException("Error updating flow.", e);
}
}
@Override
- public ExecutableFlow fetchExecutableFlow(int id) throws ExecutorManagerException {
+ public ExecutableFlow fetchExecutableFlow(int id)
+ throws ExecutorManagerException {
QueryRunner runner = createQueryRunner();
FetchExecutableFlows flowHandler = new FetchExecutableFlows();
try {
- List<ExecutableFlow> properties = runner.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW, flowHandler, id);
+ List<ExecutableFlow> properties = runner.query(
+ FetchExecutableFlows.FETCH_EXECUTABLE_FLOW, flowHandler, id);
return properties.get(0);
- } catch (SQLException e) {
+ }
+ catch (SQLException e) {
throw new ExecutorManagerException("Error fetching flow id " + id, e);
}
}
@Override
- public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows() throws ExecutorManagerException {
+ public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows()
+ throws ExecutorManagerException {
QueryRunner runner = createQueryRunner();
FetchActiveExecutableFlows flowHandler = new FetchActiveExecutableFlows();
try {
- Map<Integer, Pair<ExecutionReference, ExecutableFlow>> properties = runner.query(FetchActiveExecutableFlows.FETCH_ACTIVE_EXECUTABLE_FLOW, flowHandler);
+ Map<Integer, Pair<ExecutionReference, ExecutableFlow>> properties =
+ runner.query(
+ FetchActiveExecutableFlows.FETCH_ACTIVE_EXECUTABLE_FLOW,
+ flowHandler);
return properties;
- } catch (SQLException e) {
+ }
+ catch (SQLException e) {
throw new ExecutorManagerException("Error fetching active flows", e);
}
}
@@ -174,7 +217,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
try {
int count = runner.query(IntHandler.NUM_EXECUTIONS, intHandler);
return count;
- } catch (SQLException e) {
+ }
+ catch (SQLException e) {
throw new ExecutorManagerException("Error fetching num executions", e);
}
}
@@ -189,7 +233,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
int count = runner.query(
IntHandler.NUM_FLOW_EXECUTIONS, intHandler, projectId, flowId);
return count;
- } catch (SQLException e) {
+ }
+ catch (SQLException e) {
throw new ExecutorManagerException("Error fetching num executions", e);
}
}
@@ -204,7 +249,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
int count = runner.query(
IntHandler.NUM_JOB_EXECUTIONS, intHandler, projectId, jobId);
return count;
- } catch (SQLException e) {
+ }
+ catch (SQLException e) {
throw new ExecutorManagerException("Error fetching num executions", e);
}
}
@@ -224,7 +270,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
skip,
num);
return properties;
- } catch (SQLException e) {
+ }
+ catch (SQLException e) {
throw new ExecutorManagerException("Error fetching active flows", e);
}
}
@@ -246,7 +293,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
skip,
num);
return properties;
- } catch (SQLException e) {
+ }
+ catch (SQLException e) {
throw new ExecutorManagerException("Error fetching active flows", e);
}
}
@@ -265,7 +313,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
skip,
num);
return properties;
- } catch (SQLException e) {
+ }
+ catch (SQLException e) {
throw new ExecutorManagerException("Error fetching active flows", e);
}
}
@@ -371,39 +420,56 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
}
@Override
- public void addActiveExecutableReference(ExecutionReference reference) throws ExecutorManagerException {
- final String INSERT = "INSERT INTO active_executing_flows (exec_id, host, port, update_time) values (?,?,?,?)";
+ public void addActiveExecutableReference(ExecutionReference reference)
+ throws ExecutorManagerException {
+ final String INSERT =
+ "INSERT INTO active_executing_flows " +
+ "(exec_id, host, port, update_time) values (?,?,?,?)";
QueryRunner runner = createQueryRunner();
try {
- runner.update(INSERT, reference.getExecId(), reference.getHost(), reference.getPort(), reference.getUpdateTime());
- } catch (SQLException e) {
- throw new ExecutorManagerException("Error updating active flow reference " + reference.getExecId(), e);
+ runner.update(
+ INSERT,
+ reference.getExecId(),
+ reference.getHost(),
+ reference.getPort(),
+ reference.getUpdateTime());
+ }
+ catch (SQLException e) {
+ throw new ExecutorManagerException(
+ "Error updating active flow reference " + reference.getExecId(), e);
}
}
@Override
- public void removeActiveExecutableReference(int execid) throws ExecutorManagerException {
+ public void removeActiveExecutableReference(int execid)
+ throws ExecutorManagerException {
final String DELETE = "DELETE FROM active_executing_flows WHERE exec_id=?";
QueryRunner runner = createQueryRunner();
try {
runner.update(DELETE, execid);
- } catch (SQLException e) {
- throw new ExecutorManagerException("Error deleting active flow reference " + execid, e);
+ }
+ catch (SQLException e) {
+ throw new ExecutorManagerException(
+ "Error deleting active flow reference " + execid, e);
}
}
@Override
- public boolean updateExecutableReference(int execId, long updateTime) throws ExecutorManagerException {
- final String DELETE = "UPDATE active_executing_flows set update_time=? WHERE exec_id=?";
+ public boolean updateExecutableReference(int execId, long updateTime)
+ throws ExecutorManagerException {
+ final String DELETE =
+ "UPDATE active_executing_flows set update_time=? WHERE exec_id=?";
QueryRunner runner = createQueryRunner();
int updateNum = 0;
try {
updateNum = runner.update(DELETE, updateTime, execId);
- } catch (SQLException e) {
- throw new ExecutorManagerException("Error deleting active flow reference " + execId, e);
+ }
+ catch (SQLException e) {
+ throw new ExecutorManagerException(
+ "Error deleting active flow reference " + execId, e);
}
// Should be 1.
@@ -411,15 +477,21 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
}
@Override
- public void uploadExecutableNode(ExecutableNode node, Props inputProps) throws ExecutorManagerException {
- final String INSERT_EXECUTION_NODE = "INSERT INTO execution_jobs (exec_id, project_id, version, flow_id, job_id, start_time, end_time, status, input_params, attempt) VALUES (?,?,?,?,?,?,?,?,?,?)";
+ public void uploadExecutableNode(ExecutableNode node, Props inputProps)
+ throws ExecutorManagerException {
+ final String INSERT_EXECUTION_NODE =
+ "INSERT INTO execution_jobs " +
+ "(exec_id, project_id, version, flow_id, job_id, start_time, " +
+ "end_time, status, input_params, attempt) VALUES (?,?,?,?,?,?,?,?,?,?)";
byte[] inputParam = null;
if (inputProps != null) {
try {
- String jsonString = JSONUtils.toJSON(PropsUtils.toHierarchicalMap(inputProps));
+ String jsonString =
+ JSONUtils.toJSON(PropsUtils.toHierarchicalMap(inputProps));
inputParam = GZIPUtils.gzipString(jsonString, "UTF-8");
- } catch (IOException e) {
+ }
+ catch (IOException e) {
throw new ExecutorManagerException("Error encoding input params");
}
}
@@ -427,7 +499,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
ExecutableFlow flow = node.getExecutableFlow();
String flowId = flow.getFlowId();
- // if the main flow is not the parent, then we'll create a composite key for flowID
+ // If the main flow is not the parent, then we'll create a composite key
+ // for flowID.
if (flow != node.getParentFlow()) {
flowId = node.getParentFlow().getNestedId();
}
@@ -445,24 +518,30 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
node.getEndTime(),
node.getStatus().getNumVal(),
inputParam,
- node.getAttempt()
- );
+ node.getAttempt());
} catch (SQLException e) {
- throw new ExecutorManagerException("Error writing job " + node.getId(), e);
+ throw new ExecutorManagerException(
+ "Error writing job " + node.getId(), e);
}
}
@Override
- public void updateExecutableNode(ExecutableNode node) throws ExecutorManagerException {
- final String UPSERT_EXECUTION_NODE = "UPDATE execution_jobs SET start_time=?, end_time=?, status=?, output_params=? WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?";
+ public void updateExecutableNode(ExecutableNode node)
+ throws ExecutorManagerException {
+ final String UPSERT_EXECUTION_NODE =
+ "UPDATE execution_jobs " +
+ "SET start_time=?, end_time=?, status=?, output_params=? " +
+ "WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?";
byte[] outputParam = null;
Props outputProps = node.getOutputProps();
if (outputProps != null) {
try {
- String jsonString = JSONUtils.toJSON(PropsUtils.toHierarchicalMap(outputProps));
+ String jsonString =
+ JSONUtils.toJSON(PropsUtils.toHierarchicalMap(outputProps));
outputParam = GZIPUtils.gzipString(jsonString, "UTF-8");
- } catch (IOException e) {
+ }
+ catch (IOException e) {
throw new ExecutorManagerException("Error encoding input params");
}
}
@@ -480,113 +559,163 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
node.getId(),
node.getAttempt());
} catch (SQLException e) {
- throw new ExecutorManagerException("Error updating job " + node.getId(), e);
+ throw new ExecutorManagerException(
+ "Error updating job " + node.getId(), e);
}
}
@Override
- public List<ExecutableJobInfo> fetchJobInfoAttempts(int execId, String jobId) throws ExecutorManagerException {
+ public List<ExecutableJobInfo> fetchJobInfoAttempts(int execId, String jobId)
+ throws ExecutorManagerException {
QueryRunner runner = createQueryRunner();
try {
- List<ExecutableJobInfo> info = runner.query(FetchExecutableJobHandler.FETCH_EXECUTABLE_NODE_ATTEMPTS, new FetchExecutableJobHandler(), execId, jobId);
+ List<ExecutableJobInfo> info = runner.query(
+ FetchExecutableJobHandler.FETCH_EXECUTABLE_NODE_ATTEMPTS,
+ new FetchExecutableJobHandler(),
+ execId,
+ jobId);
if (info == null || info.isEmpty()) {
return null;
}
-
return info;
- } catch (SQLException e) {
+ }
+ catch (SQLException e) {
throw new ExecutorManagerException("Error querying job info " + jobId, e);
}
}
@Override
- public ExecutableJobInfo fetchJobInfo(int execId, String jobId, int attempts) throws ExecutorManagerException {
+ public ExecutableJobInfo fetchJobInfo(int execId, String jobId, int attempts)
+ throws ExecutorManagerException {
QueryRunner runner = createQueryRunner();
try {
- List<ExecutableJobInfo> info = runner.query(FetchExecutableJobHandler.FETCH_EXECUTABLE_NODE, new FetchExecutableJobHandler(), execId, jobId);
+ List<ExecutableJobInfo> info = runner.query(
+ FetchExecutableJobHandler.FETCH_EXECUTABLE_NODE,
+ new FetchExecutableJobHandler(),
+ execId,
+ jobId);
if (info == null || info.isEmpty()) {
return null;
}
-
return info.get(0);
- } catch (SQLException e) {
+ }
+ catch (SQLException e) {
throw new ExecutorManagerException("Error querying job info " + jobId, e);
}
}
@Override
- public Props fetchExecutionJobInputProps(int execId, String jobId) throws ExecutorManagerException {
+ public Props fetchExecutionJobInputProps(int execId, String jobId)
+ throws ExecutorManagerException {
QueryRunner runner = createQueryRunner();
try {
- Pair<Props, Props> props = runner.query(FetchExecutableJobPropsHandler.FETCH_INPUT_PARAM_EXECUTABLE_NODE, new FetchExecutableJobPropsHandler(), execId, jobId);
+ Pair<Props, Props> props = runner.query(
+ FetchExecutableJobPropsHandler.FETCH_INPUT_PARAM_EXECUTABLE_NODE,
+ new FetchExecutableJobPropsHandler(),
+ execId,
+ jobId);
return props.getFirst();
}
catch (SQLException e) {
- throw new ExecutorManagerException("Error querying job params " + execId + " " + jobId, e);
+ throw new ExecutorManagerException(
+ "Error querying job params " + execId + " " + jobId, e);
}
}
@Override
- public Props fetchExecutionJobOutputProps(int execId, String jobId) throws ExecutorManagerException {
+ public Props fetchExecutionJobOutputProps(int execId, String jobId)
+ throws ExecutorManagerException {
QueryRunner runner = createQueryRunner();
try {
- Pair<Props, Props> props = runner.query(FetchExecutableJobPropsHandler.FETCH_OUTPUT_PARAM_EXECUTABLE_NODE, new FetchExecutableJobPropsHandler(), execId, jobId);
+ Pair<Props, Props> props = runner.query(
+ FetchExecutableJobPropsHandler.FETCH_OUTPUT_PARAM_EXECUTABLE_NODE,
+ new FetchExecutableJobPropsHandler(),
+ execId,
+ jobId);
return props.getFirst();
}
catch (SQLException e) {
- throw new ExecutorManagerException("Error querying job params " + execId + " " + jobId, e);
+ throw new ExecutorManagerException(
+ "Error querying job params " + execId + " " + jobId, e);
}
}
@Override
- public Pair<Props, Props> fetchExecutionJobProps(int execId, String jobId) throws ExecutorManagerException {
+ public Pair<Props, Props> fetchExecutionJobProps(int execId, String jobId)
+ throws ExecutorManagerException {
QueryRunner runner = createQueryRunner();
try {
- Pair<Props, Props> props = runner.query(FetchExecutableJobPropsHandler.FETCH_INPUT_OUTPUT_PARAM_EXECUTABLE_NODE, new FetchExecutableJobPropsHandler(), execId, jobId);
+ Pair<Props, Props> props = runner.query(
+ FetchExecutableJobPropsHandler.FETCH_INPUT_OUTPUT_PARAM_EXECUTABLE_NODE,
+ new FetchExecutableJobPropsHandler(),
+ execId,
+ jobId);
return props;
}
catch (SQLException e) {
- throw new ExecutorManagerException("Error querying job params " + execId + " " + jobId, e);
+ throw new ExecutorManagerException(
+ "Error querying job params " + execId + " " + jobId, e);
}
}
@Override
- public List<ExecutableJobInfo> fetchJobHistory(int projectId, String jobId, int skip, int size) throws ExecutorManagerException {
+ public List<ExecutableJobInfo> fetchJobHistory(
+ int projectId, String jobId, int skip, int size)
+ throws ExecutorManagerException {
QueryRunner runner = createQueryRunner();
try {
- List<ExecutableJobInfo> info = runner.query(FetchExecutableJobHandler.FETCH_PROJECT_EXECUTABLE_NODE, new FetchExecutableJobHandler(), projectId, jobId, skip, size);
+ List<ExecutableJobInfo> info = runner.query(
+ FetchExecutableJobHandler.FETCH_PROJECT_EXECUTABLE_NODE,
+ new FetchExecutableJobHandler(),
+ projectId,
+ jobId,
+ skip,
+ size);
if (info == null || info.isEmpty()) {
return null;
}
-
return info;
- } catch (SQLException e) {
+ }
+ catch (SQLException e) {
throw new ExecutorManagerException("Error querying job info " + jobId, e);
}
}
@Override
- public LogData fetchLogs(int execId, String name, int attempt, int startByte, int length) throws ExecutorManagerException {
+ public LogData fetchLogs(
+ int execId, String name, int attempt, int startByte, int length)
+ throws ExecutorManagerException {
QueryRunner runner = createQueryRunner();
FetchLogsHandler handler = new FetchLogsHandler(startByte, length + startByte);
-
try {
- LogData result = runner.query(FetchLogsHandler.FETCH_LOGS, handler, execId, name, attempt, startByte, startByte + length);
+ LogData result = runner.query(
+ FetchLogsHandler.FETCH_LOGS,
+ handler,
+ execId,
+ name,
+ attempt,
+ startByte,
+ startByte + length);
return result;
- } catch (SQLException e) {
- throw new ExecutorManagerException("Error fetching logs " + execId + " : " + name, e);
+ }
+ catch (SQLException e) {
+ throw new ExecutorManagerException(
+ "Error fetching logs " + execId + " : " + name, e);
}
}
@Override
- public void uploadLogFile(int execId, String name, int attempt, File ... files) throws ExecutorManagerException {
+ public void uploadLogFile(
+ int execId, String name, int attempt, File ... files)
+ throws ExecutorManagerException {
Connection connection = getConnection();
try {
- uploadLogFile(connection, execId, name, attempt, files, defaultEncodingType);
+ uploadLogFile(
+ connection, execId, name, attempt, files, defaultEncodingType);
connection.commit();
}
catch (SQLException e) {
@@ -600,7 +729,13 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
}
}
- private void uploadLogFile(Connection connection, int execId, String name, int attempt, File[] files, EncodingType encType) throws ExecutorManagerException, IOException {
+ private void uploadLogFile(
+ Connection connection,
+ int execId,
+ String name,
+ int attempt,
+ File[] files,
+ EncodingType encType) throws ExecutorManagerException, IOException {
// 50K buffer... if logs are greater than this, we chunk.
// However, we better prevent large log files from being uploaded somehow
byte[] buffer = new byte[50*1024];
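Note: a simplified, single-stream sketch of the chunking loop this comment introduces — fill a fixed 50K buffer and flush one execution_logs row per full buffer, plus a final partial flush. The PartSink interface is a hypothetical stand-in for the uploadLogPart calls in the hunks below:

    import java.io.BufferedInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    public class LogChunker {
      static final int CHUNK_SIZE = 50 * 1024; // matches the 50K buffer above

      interface PartSink { // hypothetical stand-in for uploadLogPart(...)
        void flush(byte[] buf, int length, int startByte) throws IOException;
      }

      static void upload(InputStream in, PartSink sink) throws IOException {
        byte[] buffer = new byte[CHUNK_SIZE];
        BufferedInputStream stream = new BufferedInputStream(in);
        int pos = 0;
        int startByte = 0;
        int size = stream.read(buffer, pos, buffer.length - pos);
        while (size >= 0) {
          pos += size;
          if (pos == buffer.length) {
            sink.flush(buffer, buffer.length, startByte); // buffer full: flush a chunk
            startByte += buffer.length;
            pos = 0;
          }
          size = stream.read(buffer, pos, buffer.length - pos);
        }
        if (pos > 0) {
          sink.flush(buffer, pos, startByte); // final partial chunk
        }
      }
    }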
@@ -617,7 +752,16 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
while (size >= 0) {
if (pos + size == buffer.length) {
// Flush here.
- uploadLogPart(connection, execId, name, attempt, startByte, startByte + buffer.length, encType, buffer, buffer.length);
+ uploadLogPart(
+ connection,
+ execId,
+ name,
+ attempt,
+ startByte,
+ startByte + buffer.length,
+ encType,
+ buffer,
+ buffer.length);
pos = 0;
length = buffer.length;
@@ -634,7 +778,16 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
// Final commit of buffer.
if (pos > 0) {
- uploadLogPart(connection, execId, name, attempt, startByte, startByte + pos, encType, buffer, pos);
+ uploadLogPart(
+ connection,
+ execId,
+ name,
+ attempt,
+ startByte,
+ startByte + pos,
+ encType,
+ buffer,
+ pos);
}
}
catch (SQLException e) {
@@ -646,13 +799,24 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
finally {
IOUtils.closeQuietly(bufferedStream);
}
-
}
- private void uploadLogPart(Connection connection, int execId, String name, int attempt, int startByte, int endByte, EncodingType encType, byte[] buffer, int length) throws SQLException, IOException {
- final String INSERT_EXECUTION_LOGS = "INSERT INTO execution_logs (exec_id, name, attempt, enc_type, start_byte, end_byte, log, upload_time) VALUES (?,?,?,?,?,?,?,?)";
- QueryRunner runner = new QueryRunner();
+ private void uploadLogPart(
+ Connection connection,
+ int execId,
+ String name,
+ int attempt,
+ int startByte,
+ int endByte,
+ EncodingType encType,
+ byte[] buffer,
+ int length) throws SQLException, IOException {
+ final String INSERT_EXECUTION_LOGS =
+ "INSERT INTO execution_logs " +
+ "(exec_id, name, attempt, enc_type, start_byte, end_byte, " +
+ "log, upload_time) VALUES (?,?,?,?,?,?,?,?)";
+ QueryRunner runner = new QueryRunner();
byte[] buf = buffer;
if (encType == EncodingType.GZIP) {
buf = GZIPUtils.gzipBytes(buf, 0, length);
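Note: GZIPUtils is Azkaban's own helper; what gzipBytes(buf, 0, length) amounts to can be sketched with the JDK alone (an illustration, not the project's implementation):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.zip.GZIPOutputStream;

    public class GzipSketch {
      static byte[] gzipBytes(byte[] data, int offset, int length) throws IOException {
        ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
        GZIPOutputStream gzipStream = new GZIPOutputStream(byteStream);
        gzipStream.write(data, offset, length); // compress only the filled region
        gzipStream.close();                     // flushes the gzip trailer
        return byteStream.toByteArray();
      }
    }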
@@ -661,38 +825,49 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
buf = Arrays.copyOf(buffer, length);
}
- runner.update(connection, INSERT_EXECUTION_LOGS, execId, name, attempt, encType.getNumVal(), startByte, startByte + length, buf, DateTime.now().getMillis());
+ runner.update(
+ connection,
+ INSERT_EXECUTION_LOGS,
+ execId,
+ name,
+ attempt,
+ encType.getNumVal(),
+ startByte,
+ startByte + length,
+ buf,
+ DateTime.now().getMillis());
}
private Connection getConnection() throws ExecutorManagerException {
Connection connection = null;
try {
connection = super.getDBConnection(false);
- } catch (Exception e) {
+ }
+ catch (Exception e) {
DbUtils.closeQuietly(connection);
throw new ExecutorManagerException("Error getting DB connection.", e);
}
-
return connection;
}
private static class LastInsertID implements ResultSetHandler<Long> {
private static String LAST_INSERT_ID = "SELECT LAST_INSERT_ID()";
-
@Override
public Long handle(ResultSet rs) throws SQLException {
if (!rs.next()) {
return -1l;
}
-
long id = rs.getLong(1);
return id;
}
-
}
private static class FetchLogsHandler implements ResultSetHandler<LogData> {
- private static String FETCH_LOGS = "SELECT exec_id, name, attempt, enc_type, start_byte, end_byte, log FROM execution_logs WHERE exec_id=? AND name=? AND attempt=? AND end_byte > ? AND start_byte <= ? ORDER BY start_byte";
+ private static String FETCH_LOGS =
+ "SELECT exec_id, name, attempt, enc_type, start_byte, end_byte, log " +
+ "FROM execution_logs " +
+ "WHERE exec_id=? AND name=? AND attempt=? AND end_byte > ? " +
+ "AND start_byte <= ? ORDER BY start_byte";
private int startByte;
private int endByte;
@@ -721,8 +896,12 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
byte[] data = rs.getBytes(7);
- int offset = this.startByte > startByte ? this.startByte - startByte : 0;
- int length = this.endByte < endByte ? this.endByte - startByte - offset: endByte - startByte - offset;
+ int offset = this.startByte > startByte
+ ? this.startByte - startByte
+ : 0;
+ int length = this.endByte < endByte
+ ? this.endByte - startByte - offset
+ : endByte - startByte - offset;
try {
byte[] buffer = data;
if (encType == EncodingType.GZIP) {
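Note: the two reformatted ternaries above compute the intersection of the requested byte range [reqStart, reqEnd) with a stored chunk [chunkStart, chunkEnd). Pulled out with illustrative names:

    static int[] overlap(int reqStart, int reqEnd, int chunkStart, int chunkEnd) {
      // Skip any chunk bytes that precede the requested range.
      int offset = reqStart > chunkStart ? reqStart - chunkStart : 0;
      // Stop at whichever ends first, the request or the chunk.
      int end = reqEnd < chunkEnd ? reqEnd : chunkEnd;
      int length = end - chunkStart - offset;
      return new int[] {offset, length}; // copy chunk bytes [offset, offset + length)
    }

For example, requesting bytes [100, 200) from a chunk covering [0, 150) gives offset 100 and length 50.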
@@ -730,22 +909,39 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
}
byteStream.write(buffer, offset, length);
- } catch (IOException e) {
+ }
+ catch (IOException e) {
throw new SQLException(e);
}
} while (rs.next());
byte[] buffer = byteStream.toByteArray();
- Pair<Integer,Integer> result = FileIOUtils.getUtf8Range(buffer, 0, buffer.length);
+ Pair<Integer,Integer> result = FileIOUtils.getUtf8Range(
+ buffer, 0, buffer.length);
- return new LogData(startByte + result.getFirst(), result.getSecond(), new String(buffer, result.getFirst(), result.getSecond()));
+ return new LogData(
+ startByte + result.getFirst(),
+ result.getSecond(),
+ new String(buffer, result.getFirst(), result.getSecond()));
}
}
- private static class FetchExecutableJobHandler implements ResultSetHandler<List<ExecutableJobInfo>> {
- private static String FETCH_EXECUTABLE_NODE = "SELECT exec_id, project_id, version, flow_id, job_id, start_time, end_time, status, attempt FROM execution_jobs WHERE exec_id=? AND job_id=? AND attempt_id=?";
- private static String FETCH_EXECUTABLE_NODE_ATTEMPTS = "SELECT exec_id, project_id, version, flow_id, job_id, start_time, end_time, status, attempt FROM execution_jobs WHERE exec_id=? AND job_id=?";
- private static String FETCH_PROJECT_EXECUTABLE_NODE = "SELECT exec_id, project_id, version, flow_id, job_id, start_time, end_time, status, attempt FROM execution_jobs WHERE project_id=? AND job_id=? ORDER BY exec_id DESC LIMIT ?, ? ";
+ private static class FetchExecutableJobHandler
+ implements ResultSetHandler<List<ExecutableJobInfo>> {
+ private static String FETCH_EXECUTABLE_NODE =
+ "SELECT exec_id, project_id, version, flow_id, job_id, " +
+ "start_time, end_time, status, attempt " +
+ "FROM execution_jobs WHERE exec_id=? " +
+ "AND job_id=? AND attempt_id=?";
+ private static String FETCH_EXECUTABLE_NODE_ATTEMPTS =
+ "SELECT exec_id, project_id, version, flow_id, job_id, " +
+ "start_time, end_time, status, attempt FROM execution_jobs " +
+ "WHERE exec_id=? AND job_id=?";
+ private static String FETCH_PROJECT_EXECUTABLE_NODE =
+ "SELECT exec_id, project_id, version, flow_id, job_id, " +
+ "start_time, end_time, status, attempt FROM execution_jobs " +
+ "WHERE project_id=? AND job_id=? " +
+ "ORDER BY exec_id DESC LIMIT ?, ? ";
@Override
public List<ExecutableJobInfo> handle(ResultSet rs) throws SQLException {
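Note: the FileIOUtils.getUtf8Range call in the hunk above trims the assembled buffer so the returned log text does not begin or end partway through a multi-byte UTF-8 character — something a byte-offset chunk boundary can easily do. A from-scratch sketch of that kind of boundary trimming (illustrative; not Azkaban's FileIOUtils code):

    public class Utf8Range {
      static boolean isContinuation(byte b) {
        return (b & 0xC0) == 0x80; // 10xxxxxx continuation byte
      }

      // Returns {start, length} of the largest window in buf that starts and
      // ends on UTF-8 character boundaries.
      static int[] utf8Range(byte[] buf, int start, int length) {
        int s = start;
        int end = start + length;
        while (s < end && isContinuation(buf[s])) {
          s++; // drop a partial character at the front
        }
        int e = end;
        int i = e - 1;
        while (i >= s && isContinuation(buf[i])) {
          i--; // walk back to the lead byte of the last character
        }
        if (i >= s) {
          byte lead = buf[i];
          int needed = (lead & 0x80) == 0 ? 1
              : (lead & 0xE0) == 0xC0 ? 2
              : (lead & 0xF0) == 0xE0 ? 3 : 4;
          if (e - i < needed) {
            e = i; // drop the incomplete trailing sequence
          }
        }
        return new int[] {s, e - s};
      }
    }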
@@ -765,7 +961,16 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
Status status = Status.fromInteger(rs.getInt(8));
int attempt = rs.getInt(9);
- ExecutableJobInfo info = new ExecutableJobInfo(execId, projectId, version, flowId, jobId, startTime, endTime, status, attempt);
+ ExecutableJobInfo info = new ExecutableJobInfo(
+ execId,
+ projectId,
+ version,
+ flowId,
+ jobId,
+ startTime,
+ endTime,
+ status,
+ attempt);
execNodes.add(info);
} while (rs.next());
@@ -773,10 +978,15 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
}
}
- private static class FetchExecutableJobPropsHandler implements ResultSetHandler<Pair<Props, Props>> {
- private static String FETCH_OUTPUT_PARAM_EXECUTABLE_NODE = "SELECT output_params FROM execution_jobs WHERE exec_id=? AND job_id=?";
- private static String FETCH_INPUT_PARAM_EXECUTABLE_NODE = "SELECT input_params FROM execution_jobs WHERE exec_id=? AND job_id=?";
- private static String FETCH_INPUT_OUTPUT_PARAM_EXECUTABLE_NODE = "SELECT input_params, output_params FROM execution_jobs WHERE exec_id=? AND job_id=?";
+ private static class FetchExecutableJobPropsHandler
+ implements ResultSetHandler<Pair<Props, Props>> {
+ private static String FETCH_OUTPUT_PARAM_EXECUTABLE_NODE =
+ "SELECT output_params FROM execution_jobs WHERE exec_id=? AND job_id=?";
+ private static String FETCH_INPUT_PARAM_EXECUTABLE_NODE =
+ "SELECT input_params FROM execution_jobs WHERE exec_id=? AND job_id=?";
+ private static String FETCH_INPUT_OUTPUT_PARAM_EXECUTABLE_NODE =
+ "SELECT input_params, output_params " +
+ "FROM execution_jobs WHERE exec_id=? AND job_id=?";
@SuppressWarnings("unchecked")
@Override
@@ -801,9 +1011,10 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
if (output != null) {
String jsonOutputString = GZIPUtils.unGzipString(output, "UTF-8");
outputProps = PropsUtils.fromHierarchicalMap(
- (Map<String, Object>)JSONUtils.parseJSONFromString(jsonOutputString));
+ (Map<String, Object>) JSONUtils.parseJSONFromString(jsonOutputString));
}
- } catch (IOException e) {
+ }
+ catch (IOException e) {
throw new SQLException("Error decoding param data", e);
}
@@ -819,26 +1030,34 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
props = PropsUtils.fromHierarchicalMap(
(Map<String, Object>)JSONUtils.parseJSONFromString(jsonProps));
}
- } catch (IOException e) {
+ }
+ catch (IOException e) {
throw new SQLException("Error decoding param data", e);
}
return new Pair<Props,Props>(props, null);
}
}
-
}
- private static class FetchActiveExecutableFlows implements ResultSetHandler<Map<Integer, Pair<ExecutionReference,ExecutableFlow>>> {
- private static String FETCH_ACTIVE_EXECUTABLE_FLOW = "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, ax.host host, ax.port port, ax.update_time axUpdateTime FROM execution_flows ex INNER JOIN active_executing_flows ax ON ex.exec_id = ax.exec_id";
+ private static class FetchActiveExecutableFlows
+ implements ResultSetHandler<Map<Integer, Pair<ExecutionReference,ExecutableFlow>>> {
+ private static String FETCH_ACTIVE_EXECUTABLE_FLOW =
+ "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data " +
+ "flow_data, ax.host host, ax.port port, ax.update_time " +
+ "axUpdateTime " +
+ "FROM execution_flows ex " +
+ "INNER JOIN active_executing_flows ax ON ex.exec_id = ax.exec_id";
@Override
- public Map<Integer, Pair<ExecutionReference,ExecutableFlow>> handle(ResultSet rs) throws SQLException {
+ public Map<Integer, Pair<ExecutionReference,ExecutableFlow>> handle(ResultSet rs)
+ throws SQLException {
if (!rs.next()) {
return Collections.<Integer, Pair<ExecutionReference,ExecutableFlow>>emptyMap();
}
- Map<Integer, Pair<ExecutionReference,ExecutableFlow>> execFlows = new HashMap<Integer, Pair<ExecutionReference,ExecutableFlow>>();
+ Map<Integer, Pair<ExecutionReference,ExecutableFlow>> execFlows =
+ new HashMap<Integer, Pair<ExecutionReference,ExecutableFlow>>();
do {
int id = rs.getInt(1);
int encodingType = rs.getInt(2);
@@ -854,7 +1073,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
EncodingType encType = EncodingType.fromInteger(encodingType);
Object flowObj;
try {
- // Convoluted way to inflate strings. Should find common package or helper function.
+ // Convoluted way to inflate strings. Should find common package or
+ // helper function.
if (encType == EncodingType.GZIP) {
// Decompress the sucker.
String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
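Note: the helper this "convoluted way to inflate strings" comment asks for could look like the following — pick the decode path from the row's encoding type. Sketched with the JDK; the EncodingType enum stands in for the one used in the diff, the rest is illustrative:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.zip.GZIPInputStream;

    public class InflateSketch {
      enum EncodingType { PLAIN, GZIP } // stand-in for azkaban's EncodingType

      static String inflateToString(byte[] data, EncodingType encType)
          throws IOException {
        if (encType != EncodingType.GZIP) {
          return new String(data, "UTF-8");
        }
        GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[8192];
        int n;
        while ((n = in.read(buf)) != -1) {
          out.write(buf, 0, n);
        }
        in.close();
        return out.toString("UTF-8");
      }
    }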
@@ -865,12 +1085,15 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
flowObj = JSONUtils.parseJSONFromString(jsonString);
}
- ExecutableFlow exFlow = ExecutableFlow.createExecutableFlowFromObject(flowObj);
+ ExecutableFlow exFlow =
+ ExecutableFlow.createExecutableFlowFromObject(flowObj);
ExecutionReference ref = new ExecutionReference(id, host, port);
ref.setUpdateTime(updateTime);
- execFlows.put(id, new Pair<ExecutionReference, ExecutableFlow>(ref, exFlow));
- } catch (IOException e) {
+ execFlows.put(
+ id, new Pair<ExecutionReference, ExecutableFlow>(ref, exFlow));
+ }
+ catch (IOException e) {
throw new SQLException("Error retrieving flow data " + id, e);
}
}
@@ -878,7 +1101,6 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
return execFlows;
}
-
}
private static class FetchExecutableFlows
@@ -935,7 +1157,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
ExecutableFlow exFlow =
ExecutableFlow.createExecutableFlowFromObject(flowObj);
execFlows.add(exFlow);
- } catch (IOException e) {
+ }
+ catch (IOException e) {
throw new SQLException("Error retrieving flow data " + id, e);
}
}
@@ -943,35 +1166,40 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
return execFlows;
}
-
}
private static class IntHandler implements ResultSetHandler<Integer> {
- private static String NUM_EXECUTIONS = "SELECT COUNT(1) FROM execution_flows";
- private static String NUM_FLOW_EXECUTIONS = "SELECT COUNT(1) FROM execution_flows WHERE project_id=? AND flow_id=?";
- private static String NUM_JOB_EXECUTIONS = "SELECT COUNT(1) FROM execution_jobs WHERE project_id=? AND job_id=?";
+ private static String NUM_EXECUTIONS =
+ "SELECT COUNT(1) FROM execution_flows";
+ private static String NUM_FLOW_EXECUTIONS =
+ "SELECT COUNT(1) FROM execution_flows WHERE project_id=? AND flow_id=?";
+ private static String NUM_JOB_EXECUTIONS =
+ "SELECT COUNT(1) FROM execution_jobs WHERE project_id=? AND job_id=?";
@Override
public Integer handle(ResultSet rs) throws SQLException {
if (!rs.next()) {
return 0;
}
-
return rs.getInt(1);
}
}
@Override
- public int removeExecutionLogsByTime(long millis) throws ExecutorManagerException {
- final String DELETE_BY_TIME = "DELETE FROM execution_logs WHERE upload_time < ?";
+ public int removeExecutionLogsByTime(long millis)
+ throws ExecutorManagerException {
+ final String DELETE_BY_TIME =
+ "DELETE FROM execution_logs WHERE upload_time < ?";
QueryRunner runner = createQueryRunner();
int updateNum = 0;
try {
updateNum = runner.update(DELETE_BY_TIME, millis);
- } catch (SQLException e) {
+ }
+ catch (SQLException e) {
e.printStackTrace();
- throw new ExecutorManagerException("Error deleting old execution_logs before " + millis, e);
+ throw new ExecutorManagerException(
+ "Error deleting old execution_logs before " + millis, e);
}
return updateNum;
src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm 1(+1 -0)
diff --git a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
index 76919a1..7dcee5e 100644
--- a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
@@ -28,6 +28,7 @@
<script type="text/javascript" src="${context}/js/flowstats.js"></script>
<script type="text/javascript" src="${context}/js/flowstats-no-data.js"></script>
+ <script type="text/javascript" src="${context}/js/azkaban/view/flow-execution-list.js"></script>
<script type="text/javascript" src="${context}/js/azkaban/view/exflow.js"></script>
<script type="text/javascript">
var contextURL = "${context}";