diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 0e572a7..e2d6c6b 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -16,7 +16,16 @@
package azkaban.executor;
+import azkaban.database.AbstractJdbcLoader;
+import azkaban.executor.ExecutorLogEvent.EventType;
import azkaban.metrics.CommonMetrics;
+import azkaban.utils.FileIOUtils;
+import azkaban.utils.FileIOUtils.LogData;
+import azkaban.utils.GZIPUtils;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+import azkaban.utils.PropsUtils;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.io.BufferedInputStream;
@@ -28,6 +37,7 @@ import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -35,8 +45,6 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.time.Duration;
-
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
@@ -45,16 +53,6 @@ import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
-import azkaban.database.AbstractJdbcLoader;
-import azkaban.executor.ExecutorLogEvent.EventType;
-import azkaban.utils.FileIOUtils;
-import azkaban.utils.FileIOUtils.LogData;
-import azkaban.utils.GZIPUtils;
-import azkaban.utils.JSONUtils;
-import azkaban.utils.Pair;
-import azkaban.utils.Props;
-import azkaban.utils.PropsUtils;
-
@Singleton
public class JdbcExecutorLoader extends AbstractJdbcLoader implements
ExecutorLoader {
@@ -64,42 +62,42 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
private EncodingType defaultEncodingType = EncodingType.GZIP;
@Inject
- public JdbcExecutorLoader(Props props, CommonMetrics commonMetrics) {
+ public JdbcExecutorLoader(final Props props, final CommonMetrics commonMetrics) {
super(props, commonMetrics);
}
public EncodingType getDefaultEncodingType() {
- return defaultEncodingType;
+ return this.defaultEncodingType;
}
- public void setDefaultEncodingType(EncodingType defaultEncodingType) {
+ public void setDefaultEncodingType(final EncodingType defaultEncodingType) {
this.defaultEncodingType = defaultEncodingType;
}
@Override
- public synchronized void uploadExecutableFlow(ExecutableFlow flow)
+ public synchronized void uploadExecutableFlow(final ExecutableFlow flow)
throws ExecutorManagerException {
- Connection connection = getConnection();
+ final Connection connection = getConnection();
try {
- uploadExecutableFlow(connection, flow, defaultEncodingType);
- } catch (IOException e) {
+ uploadExecutableFlow(connection, flow, this.defaultEncodingType);
+ } catch (final IOException e) {
throw new ExecutorManagerException("Error uploading flow", e);
} finally {
DbUtils.closeQuietly(connection);
}
}
- private synchronized void uploadExecutableFlow(Connection connection,
- ExecutableFlow flow, EncodingType encType)
+ private synchronized void uploadExecutableFlow(final Connection connection,
+ final ExecutableFlow flow, final EncodingType encType)
throws ExecutorManagerException, IOException {
final String INSERT_EXECUTABLE_FLOW =
"INSERT INTO execution_flows "
+ "(project_id, flow_id, version, status, submit_time, submit_user, update_time) "
+ "values (?,?,?,?,?,?,?)";
- QueryRunner runner = new QueryRunner();
- long submitTime = System.currentTimeMillis();
+ final QueryRunner runner = new QueryRunner();
+ final long submitTime = System.currentTimeMillis();
- long id;
+ final long id;
try {
flow.setStatus(Status.PREPARING);
runner.update(connection, INSERT_EXECUTABLE_FLOW, flow.getProjectId(),
@@ -118,41 +116,41 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
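+ // Two-step write: the INSERT above creates the row so the database assigns an
+ // auto-increment exec_id (read back via LAST_INSERT_ID()); the serialized
+ // flow data is then stored by updateExecutableFlow below.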
flow.setExecutionId((int) id);
updateExecutableFlow(connection, flow, encType);
- } catch (SQLException e) {
+ } catch (final SQLException e) {
throw new ExecutorManagerException("Error creating execution.", e);
}
}
@Override
- public void updateExecutableFlow(ExecutableFlow flow)
+ public void updateExecutableFlow(final ExecutableFlow flow)
throws ExecutorManagerException {
- Connection connection = this.getConnection();
+ final Connection connection = this.getConnection();
try {
- updateExecutableFlow(connection, flow, defaultEncodingType);
+ updateExecutableFlow(connection, flow, this.defaultEncodingType);
} finally {
DbUtils.closeQuietly(connection);
}
}
- private void updateExecutableFlow(Connection connection, ExecutableFlow flow,
- EncodingType encType) throws ExecutorManagerException {
+ private void updateExecutableFlow(final Connection connection, final ExecutableFlow flow,
+ final EncodingType encType) throws ExecutorManagerException {
final String UPDATE_EXECUTABLE_FLOW_DATA =
"UPDATE execution_flows "
+ "SET status=?,update_time=?,start_time=?,end_time=?,enc_type=?,flow_data=? "
+ "WHERE exec_id=?";
- QueryRunner runner = new QueryRunner();
+ final QueryRunner runner = new QueryRunner();
- String json = JSONUtils.toJSON(flow.toObject());
+ final String json = JSONUtils.toJSON(flow.toObject());
byte[] data = null;
try {
- byte[] stringData = json.getBytes("UTF-8");
+ final byte[] stringData = json.getBytes("UTF-8");
data = stringData;
if (encType == EncodingType.GZIP) {
data = GZIPUtils.gzipBytes(stringData);
}
- } catch (IOException e) {
- throw new ExecutorManagerException("Error encoding the execution flow.");
+ } catch (final IOException e) {
+ throw new ExecutorManagerException("Error encoding the execution flow.", e);
}
@@ -161,19 +159,19 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
.getNumVal(), flow.getUpdateTime(), flow.getStartTime(), flow
.getEndTime(), encType.getNumVal(), data, flow.getExecutionId());
connection.commit();
- } catch (SQLException e) {
+ } catch (final SQLException e) {
throw new ExecutorManagerException("Error updating flow.", e);
}
}
@Override
- public ExecutableFlow fetchExecutableFlow(int id)
+ public ExecutableFlow fetchExecutableFlow(final int id)
throws ExecutorManagerException {
- QueryRunner runner = createQueryRunner();
- FetchExecutableFlows flowHandler = new FetchExecutableFlows();
+ final QueryRunner runner = createQueryRunner();
+ final FetchExecutableFlows flowHandler = new FetchExecutableFlows();
try {
- List<ExecutableFlow> properties =
+ final List<ExecutableFlow> properties =
runner.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW, flowHandler,
id);
if (properties.isEmpty()) {
@@ -181,7 +179,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
} else {
return properties.get(0);
}
- } catch (SQLException e) {
+ } catch (final SQLException e) {
throw new ExecutorManagerException("Error fetching flow id " + id, e);
}
}
@@ -194,15 +192,15 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
@Override
public List<Pair<ExecutionReference, ExecutableFlow>> fetchQueuedFlows()
throws ExecutorManagerException {
- QueryRunner runner = createQueryRunner();
- FetchQueuedExecutableFlows flowHandler = new FetchQueuedExecutableFlows();
+ final QueryRunner runner = createQueryRunner();
+ final FetchQueuedExecutableFlows flowHandler = new FetchQueuedExecutableFlows();
try {
- List<Pair<ExecutionReference, ExecutableFlow>> flows =
+ final List<Pair<ExecutionReference, ExecutableFlow>> flows =
runner.query(FetchQueuedExecutableFlows.FETCH_QUEUED_EXECUTABLE_FLOW,
flowHandler);
return flows;
- } catch (SQLException e) {
- throw new ExecutorManagerException("Error fetching active flows", e);
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error fetching queued flows", e);
}
}
@@ -211,19 +209,19 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
- * maxAge indicates how long finished flows are shown in Recently Finished flow page.
+ * maxAge indicates how long finished flows are shown on the Recently Finished page.
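+ * For example, a maxAge of Duration.ofHours(1) returns flows that finished
+ * (succeeded, killed, or failed) within the last hour.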
*/
@Override
- public List<ExecutableFlow> fetchRecentlyFinishedFlows(Duration maxAge)
+ public List<ExecutableFlow> fetchRecentlyFinishedFlows(final Duration maxAge)
throws ExecutorManagerException {
- QueryRunner runner = createQueryRunner();
- FetchRecentlyFinishedFlows flowHandler = new FetchRecentlyFinishedFlows();
+ final QueryRunner runner = createQueryRunner();
+ final FetchRecentlyFinishedFlows flowHandler = new FetchRecentlyFinishedFlows();
try {
- List<ExecutableFlow> flows =
+ final List<ExecutableFlow> flows =
runner.query(FetchRecentlyFinishedFlows.FETCH_RECENTLY_FINISHED_FLOW,
flowHandler, System.currentTimeMillis() - maxAge.toMillis(),
Status.SUCCEEDED.getNumVal(), Status.KILLED.getNumVal(),
Status.FAILED.getNumVal());
return flows;
- } catch (SQLException e) {
+ } catch (final SQLException e) {
throw new ExecutorManagerException("Error fetching recently finished flows", e);
}
}
@@ -231,27 +229,27 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
@Override
public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows()
throws ExecutorManagerException {
- QueryRunner runner = createQueryRunner();
- FetchActiveExecutableFlows flowHandler = new FetchActiveExecutableFlows();
+ final QueryRunner runner = createQueryRunner();
+ final FetchActiveExecutableFlows flowHandler = new FetchActiveExecutableFlows();
try {
- Map<Integer, Pair<ExecutionReference, ExecutableFlow>> properties =
+ final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> properties =
runner.query(FetchActiveExecutableFlows.FETCH_ACTIVE_EXECUTABLE_FLOW,
flowHandler);
return properties;
- } catch (SQLException e) {
+ } catch (final SQLException e) {
throw new ExecutorManagerException("Error fetching active flows", e);
}
}
@Override
- public Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(int execId)
+ public Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(final int execId)
throws ExecutorManagerException {
- QueryRunner runner = createQueryRunner();
- FetchActiveExecutableFlowByExecId flowHandler = new FetchActiveExecutableFlowByExecId();
+ final QueryRunner runner = createQueryRunner();
+ final FetchActiveExecutableFlowByExecId flowHandler = new FetchActiveExecutableFlowByExecId();
try {
- List<Pair<ExecutionReference, ExecutableFlow>> flows =
+ final List<Pair<ExecutionReference, ExecutableFlow>> flows =
runner.query(FetchActiveExecutableFlowByExecId.FETCH_ACTIVE_EXECUTABLE_FLOW_BY_EXECID,
flowHandler, execId);
- if(flows.isEmpty()) {
+ if (flows.isEmpty()) {
@@ -260,111 +258,111 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
else {
return flows.get(0);
}
- } catch (SQLException e) {
+ } catch (final SQLException e) {
throw new ExecutorManagerException("Error fetching active flows by exec id", e);
}
}
@Override
public int fetchNumExecutableFlows() throws ExecutorManagerException {
- QueryRunner runner = createQueryRunner();
+ final QueryRunner runner = createQueryRunner();
- IntHandler intHandler = new IntHandler();
+ final IntHandler intHandler = new IntHandler();
try {
- int count = runner.query(IntHandler.NUM_EXECUTIONS, intHandler);
+ final int count = runner.query(IntHandler.NUM_EXECUTIONS, intHandler);
return count;
- } catch (SQLException e) {
+ } catch (final SQLException e) {
throw new ExecutorManagerException("Error fetching num executions", e);
}
}
@Override
- public int fetchNumExecutableFlows(int projectId, String flowId)
+ public int fetchNumExecutableFlows(final int projectId, final String flowId)
throws ExecutorManagerException {
- QueryRunner runner = createQueryRunner();
+ final QueryRunner runner = createQueryRunner();
- IntHandler intHandler = new IntHandler();
+ final IntHandler intHandler = new IntHandler();
try {
- int count =
+ final int count =
runner.query(IntHandler.NUM_FLOW_EXECUTIONS, intHandler, projectId,
flowId);
return count;
- } catch (SQLException e) {
+ } catch (final SQLException e) {
throw new ExecutorManagerException("Error fetching num executions", e);
}
}
@Override
- public int fetchNumExecutableNodes(int projectId, String jobId)
+ public int fetchNumExecutableNodes(final int projectId, final String jobId)
throws ExecutorManagerException {
- QueryRunner runner = createQueryRunner();
+ final QueryRunner runner = createQueryRunner();
- IntHandler intHandler = new IntHandler();
+ final IntHandler intHandler = new IntHandler();
try {
- int count =
+ final int count =
runner.query(IntHandler.NUM_JOB_EXECUTIONS, intHandler, projectId,
jobId);
return count;
- } catch (SQLException e) {
+ } catch (final SQLException e) {
throw new ExecutorManagerException("Error fetching num executions", e);
}
}
@Override
- public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId,
- int skip, int num) throws ExecutorManagerException {
- QueryRunner runner = createQueryRunner();
- FetchExecutableFlows flowHandler = new FetchExecutableFlows();
+ public List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
+ final int skip, final int num) throws ExecutorManagerException {
+ final QueryRunner runner = createQueryRunner();
+ final FetchExecutableFlows flowHandler = new FetchExecutableFlows();
try {
- List<ExecutableFlow> properties =
+ final List<ExecutableFlow> properties =
runner.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_HISTORY,
flowHandler, projectId, flowId, skip, num);
return properties;
- } catch (SQLException e) {
- throw new ExecutorManagerException("Error fetching active flows", e);
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error fetching flow history", e);
}
}
@Override
- public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId,
- int skip, int num, Status status) throws ExecutorManagerException {
- QueryRunner runner = createQueryRunner();
- FetchExecutableFlows flowHandler = new FetchExecutableFlows();
+ public List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
+ final int skip, final int num, final Status status) throws ExecutorManagerException {
+ final QueryRunner runner = createQueryRunner();
+ final FetchExecutableFlows flowHandler = new FetchExecutableFlows();
try {
- List<ExecutableFlow> properties =
+ final List<ExecutableFlow> properties =
runner.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_BY_STATUS,
flowHandler, projectId, flowId, status.getNumVal(), skip, num);
return properties;
- } catch (SQLException e) {
- throw new ExecutorManagerException("Error fetching active flows", e);
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error fetching flow history", e);
}
}
@Override
- public List<ExecutableFlow> fetchFlowHistory(int skip, int num)
+ public List<ExecutableFlow> fetchFlowHistory(final int skip, final int num)
throws ExecutorManagerException {
- QueryRunner runner = createQueryRunner();
+ final QueryRunner runner = createQueryRunner();
- FetchExecutableFlows flowHandler = new FetchExecutableFlows();
+ final FetchExecutableFlows flowHandler = new FetchExecutableFlows();
try {
- List<ExecutableFlow> properties =
+ final List<ExecutableFlow> properties =
runner.query(FetchExecutableFlows.FETCH_ALL_EXECUTABLE_FLOW_HISTORY,
flowHandler, skip, num);
return properties;
- } catch (SQLException e) {
- throw new ExecutorManagerException("Error fetching active flows", e);
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error fetching flow history", e);
}
}
@Override
- public List<ExecutableFlow> fetchFlowHistory(String projContain,
- String flowContains, String userNameContains, int status, long startTime,
- long endTime, int skip, int num) throws ExecutorManagerException {
+ public List<ExecutableFlow> fetchFlowHistory(final String projContain,
+ final String flowContains, final String userNameContains, final int status, final long startTime,
+ final long endTime, final int skip, final int num) throws ExecutorManagerException {
String query = FetchExecutableFlows.FETCH_BASE_EXECUTABLE_FLOW_QUERY;
- ArrayList<Object> params = new ArrayList<Object>();
+ final ArrayList<Object> params = new ArrayList<>();
boolean first = true;
if (projContain != null && !projContain.isEmpty()) {
@@ -435,59 +433,59 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
params.add(num);
}
- QueryRunner runner = createQueryRunner();
- FetchExecutableFlows flowHandler = new FetchExecutableFlows();
+ final QueryRunner runner = createQueryRunner();
+ final FetchExecutableFlows flowHandler = new FetchExecutableFlows();
try {
- List<ExecutableFlow> properties =
+ final List<ExecutableFlow> properties =
runner.query(query, flowHandler, params.toArray());
return properties;
- } catch (SQLException e) {
- throw new ExecutorManagerException("Error fetching active flows", e);
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error fetching flow history", e);
}
}
@Override
- public void addActiveExecutableReference(ExecutionReference reference)
+ public void addActiveExecutableReference(final ExecutionReference reference)
throws ExecutorManagerException {
final String INSERT =
"INSERT INTO active_executing_flows "
+ "(exec_id, update_time) values (?,?)";
- QueryRunner runner = createQueryRunner();
+ final QueryRunner runner = createQueryRunner();
try {
runner.update(INSERT, reference.getExecId(), reference.getUpdateTime());
- } catch (SQLException e) {
+ } catch (final SQLException e) {
throw new ExecutorManagerException(
"Error updating active flow reference " + reference.getExecId(), e);
}
}
@Override
- public void removeActiveExecutableReference(int execid)
+ public void removeActiveExecutableReference(final int execid)
throws ExecutorManagerException {
final String DELETE = "DELETE FROM active_executing_flows WHERE exec_id=?";
- QueryRunner runner = createQueryRunner();
+ final QueryRunner runner = createQueryRunner();
try {
runner.update(DELETE, execid);
- } catch (SQLException e) {
+ } catch (final SQLException e) {
throw new ExecutorManagerException(
"Error deleting active flow reference " + execid, e);
}
}
@Override
- public boolean updateExecutableReference(int execId, long updateTime)
+ public boolean updateExecutableReference(final int execId, final long updateTime)
throws ExecutorManagerException {
- final String DELETE =
- "UPDATE active_executing_flows set update_time=? WHERE exec_id=?";
+ final String UPDATE =
+ "UPDATE active_executing_flows SET update_time=? WHERE exec_id=?";
- QueryRunner runner = createQueryRunner();
+ final QueryRunner runner = createQueryRunner();
int updateNum = 0;
try {
- updateNum = runner.update(DELETE, updateTime, execId);
+ updateNum = runner.update(UPDATE, updateTime, execId);
- } catch (SQLException e) {
- throw new ExecutorManagerException(
- "Error deleting active flow reference " + execId, e);
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException(
+ "Error updating active flow reference " + execId, e);
}
@@ -497,7 +495,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
}
@Override
- public void uploadExecutableNode(ExecutableNode node, Props inputProps)
+ public void uploadExecutableNode(final ExecutableNode node, final Props inputProps)
throws ExecutorManagerException {
final String INSERT_EXECUTION_NODE =
"INSERT INTO execution_jobs "
@@ -507,30 +505,30 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
byte[] inputParam = null;
if (inputProps != null) {
try {
- String jsonString =
+ final String jsonString =
JSONUtils.toJSON(PropsUtils.toHierarchicalMap(inputProps));
inputParam = GZIPUtils.gzipString(jsonString, "UTF-8");
- } catch (IOException e) {
- throw new ExecutorManagerException("Error encoding input params");
+ } catch (final IOException e) {
+ throw new ExecutorManagerException("Error encoding input params", e);
}
}
- ExecutableFlow flow = node.getExecutableFlow();
- String flowId = node.getParentFlow().getFlowPath();
+ final ExecutableFlow flow = node.getExecutableFlow();
+ final String flowId = node.getParentFlow().getFlowPath();
- System.out.println("Uploading flowId " + flowId);
+ logger.info("Uploading flowId " + flowId);
- QueryRunner runner = createQueryRunner();
+ final QueryRunner runner = createQueryRunner();
try {
runner.update(INSERT_EXECUTION_NODE, flow.getExecutionId(),
flow.getProjectId(), flow.getVersion(), flowId, node.getId(),
node.getStartTime(), node.getEndTime(), node.getStatus().getNumVal(),
inputParam, node.getAttempt());
- } catch (SQLException e) {
+ } catch (final SQLException e) {
throw new ExecutorManagerException("Error writing job " + node.getId(), e);
}
}
@Override
- public void updateExecutableNode(ExecutableNode node)
+ public void updateExecutableNode(final ExecutableNode node)
throws ExecutorManagerException {
final String UPSERT_EXECUTION_NODE =
"UPDATE execution_jobs "
@@ -538,36 +536,36 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
+ "WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?";
byte[] outputParam = null;
- Props outputProps = node.getOutputProps();
+ final Props outputProps = node.getOutputProps();
if (outputProps != null) {
try {
- String jsonString =
+ final String jsonString =
JSONUtils.toJSON(PropsUtils.toHierarchicalMap(outputProps));
outputParam = GZIPUtils.gzipString(jsonString, "UTF-8");
- } catch (IOException e) {
- throw new ExecutorManagerException("Error encoding input params");
+ } catch (final IOException e) {
+ throw new ExecutorManagerException("Error encoding output params", e);
}
}
- QueryRunner runner = createQueryRunner();
+ final QueryRunner runner = createQueryRunner();
try {
runner.update(UPSERT_EXECUTION_NODE, node.getStartTime(), node
.getEndTime(), node.getStatus().getNumVal(), outputParam, node
.getExecutableFlow().getExecutionId(), node.getParentFlow()
.getFlowPath(), node.getId(), node.getAttempt());
- } catch (SQLException e) {
+ } catch (final SQLException e) {
throw new ExecutorManagerException("Error updating job " + node.getId(),
e);
}
}
@Override
- public List<ExecutableJobInfo> fetchJobInfoAttempts(int execId, String jobId)
+ public List<ExecutableJobInfo> fetchJobInfoAttempts(final int execId, final String jobId)
throws ExecutorManagerException {
- QueryRunner runner = createQueryRunner();
+ final QueryRunner runner = createQueryRunner();
try {
- List<ExecutableJobInfo> info =
+ final List<ExecutableJobInfo> info =
runner.query(
FetchExecutableJobHandler.FETCH_EXECUTABLE_NODE_ATTEMPTS,
new FetchExecutableJobHandler(), execId, jobId);
@@ -575,122 +573,122 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
return null;
}
return info;
- } catch (SQLException e) {
+ } catch (final SQLException e) {
throw new ExecutorManagerException("Error querying job info " + jobId, e);
}
}
@Override
- public ExecutableJobInfo fetchJobInfo(int execId, String jobId, int attempts)
+ public ExecutableJobInfo fetchJobInfo(final int execId, final String jobId, final int attempts)
throws ExecutorManagerException {
- QueryRunner runner = createQueryRunner();
+ final QueryRunner runner = createQueryRunner();
try {
- List<ExecutableJobInfo> info =
+ final List<ExecutableJobInfo> info =
runner.query(FetchExecutableJobHandler.FETCH_EXECUTABLE_NODE,
new FetchExecutableJobHandler(), execId, jobId, attempts);
if (info == null || info.isEmpty()) {
return null;
}
return info.get(0);
- } catch (SQLException e) {
+ } catch (final SQLException e) {
throw new ExecutorManagerException("Error querying job info " + jobId, e);
}
}
@Override
- public Props fetchExecutionJobInputProps(int execId, String jobId)
+ public Props fetchExecutionJobInputProps(final int execId, final String jobId)
throws ExecutorManagerException {
- QueryRunner runner = createQueryRunner();
+ final QueryRunner runner = createQueryRunner();
try {
- Pair<Props, Props> props =
+ final Pair<Props, Props> props =
runner.query(
FetchExecutableJobPropsHandler.FETCH_INPUT_PARAM_EXECUTABLE_NODE,
new FetchExecutableJobPropsHandler(), execId, jobId);
return props.getFirst();
- } catch (SQLException e) {
+ } catch (final SQLException e) {
throw new ExecutorManagerException("Error querying job params " + execId
+ " " + jobId, e);
}
}
@Override
- public Props fetchExecutionJobOutputProps(int execId, String jobId)
+ public Props fetchExecutionJobOutputProps(final int execId, final String jobId)
throws ExecutorManagerException {
- QueryRunner runner = createQueryRunner();
+ final QueryRunner runner = createQueryRunner();
try {
- Pair<Props, Props> props =
+ final Pair<Props, Props> props =
runner
.query(
FetchExecutableJobPropsHandler.FETCH_OUTPUT_PARAM_EXECUTABLE_NODE,
new FetchExecutableJobPropsHandler(), execId, jobId);
return props.getFirst();
- } catch (SQLException e) {
+ } catch (final SQLException e) {
throw new ExecutorManagerException("Error querying job params " + execId
+ " " + jobId, e);
}
}
@Override
- public Pair<Props, Props> fetchExecutionJobProps(int execId, String jobId)
+ public Pair<Props, Props> fetchExecutionJobProps(final int execId, final String jobId)
throws ExecutorManagerException {
- QueryRunner runner = createQueryRunner();
+ final QueryRunner runner = createQueryRunner();
try {
- Pair<Props, Props> props =
+ final Pair<Props, Props> props =
runner
.query(
FetchExecutableJobPropsHandler.FETCH_INPUT_OUTPUT_PARAM_EXECUTABLE_NODE,
new FetchExecutableJobPropsHandler(), execId, jobId);
return props;
- } catch (SQLException e) {
+ } catch (final SQLException e) {
throw new ExecutorManagerException("Error querying job params " + execId
+ " " + jobId, e);
}
}
@Override
- public List<ExecutableJobInfo> fetchJobHistory(int projectId, String jobId,
- int skip, int size) throws ExecutorManagerException {
- QueryRunner runner = createQueryRunner();
+ public List<ExecutableJobInfo> fetchJobHistory(final int projectId, final String jobId,
+ final int skip, final int size) throws ExecutorManagerException {
+ final QueryRunner runner = createQueryRunner();
try {
- List<ExecutableJobInfo> info =
+ final List<ExecutableJobInfo> info =
runner.query(FetchExecutableJobHandler.FETCH_PROJECT_EXECUTABLE_NODE,
new FetchExecutableJobHandler(), projectId, jobId, skip, size);
if (info == null || info.isEmpty()) {
return null;
}
return info;
- } catch (SQLException e) {
+ } catch (final SQLException e) {
throw new ExecutorManagerException("Error querying job info " + jobId, e);
}
}
@Override
- public LogData fetchLogs(int execId, String name, int attempt, int startByte,
- int length) throws ExecutorManagerException {
- QueryRunner runner = createQueryRunner();
+ public LogData fetchLogs(final int execId, final String name, final int attempt, final int startByte,
+ final int length) throws ExecutorManagerException {
+ final QueryRunner runner = createQueryRunner();
- FetchLogsHandler handler =
+ final FetchLogsHandler handler =
new FetchLogsHandler(startByte, length + startByte);
try {
- LogData result =
+ final LogData result =
runner.query(FetchLogsHandler.FETCH_LOGS, handler, execId, name,
attempt, startByte, startByte + length);
return result;
- } catch (SQLException e) {
+ } catch (final SQLException e) {
throw new ExecutorManagerException("Error fetching logs " + execId
+ " : " + name, e);
}
}
@Override
- public List<Object> fetchAttachments(int execId, String jobId, int attempt)
+ public List<Object> fetchAttachments(final int execId, final String jobId, final int attempt)
throws ExecutorManagerException {
- QueryRunner runner = createQueryRunner();
+ final QueryRunner runner = createQueryRunner();
try {
- String attachments =
+ final String attachments =
runner
.query(
FetchExecutableJobAttachmentsHandler.FETCH_ATTACHMENTS_EXECUTABLE_NODE,
@@ -699,51 +697,50 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
return null;
}
- @SuppressWarnings("unchecked")
- List<Object> attachmentList =
+ @SuppressWarnings("unchecked")
+ final List<Object> attachmentList =
(List<Object>) JSONUtils.parseJSONFromString(attachments);
return attachmentList;
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new ExecutorManagerException(
"Error converting job attachments to JSON " + jobId, e);
- } catch (SQLException e) {
- throw new ExecutorManagerException(
- "Error query job attachments " + jobId, e);
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException(
+ "Error querying job attachments " + jobId, e);
}
}
@Override
- public void uploadLogFile(int execId, String name, int attempt, File... files)
+ public void uploadLogFile(final int execId, final String name, final int attempt, final File... files)
throws ExecutorManagerException {
- Connection connection = getConnection();
+ final Connection connection = getConnection();
try {
uploadLogFile(connection, execId, name, attempt, files,
- defaultEncodingType);
+ this.defaultEncodingType);
connection.commit();
- } catch (SQLException e) {
+ } catch (final SQLException e) {
throw new ExecutorManagerException("Error committing log", e);
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new ExecutorManagerException("Error committing log", e);
} finally {
DbUtils.closeQuietly(connection);
}
}
- private void uploadLogFile(Connection connection, int execId, String name,
- int attempt, File[] files, EncodingType encType)
+ private void uploadLogFile(final Connection connection, final int execId, final String name,
+ final int attempt, final File[] files, final EncodingType encType)
throws ExecutorManagerException, IOException {
- // 50K buffer... if logs are greater than this, we chunk.
- // However, we better prevent large log files from being uploaded somehow
+ // 50K buffer... if logs are larger than this, we chunk.
+ // However, we should prevent large log files from being uploaded in the first place.
- byte[] buffer = new byte[50 * 1024];
+ final byte[] buffer = new byte[50 * 1024];
int pos = 0;
int length = buffer.length;
int startByte = 0;
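+ // pos is the next write offset within buffer; startByte is the absolute
+ // offset of the current chunk within the concatenated log output.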
try {
for (int i = 0; i < files.length; ++i) {
- File file = files[i];
+ final File file = files[i];
- BufferedInputStream bufferedStream =
+ final BufferedInputStream bufferedStream =
new BufferedInputStream(new FileInputStream(file));
try {
int size = bufferedStream.read(buffer, pos, length);
@@ -773,22 +770,22 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
uploadLogPart(connection, execId, name, attempt, startByte, startByte
+ pos, encType, buffer, pos);
}
- } catch (SQLException e) {
+ } catch (final SQLException e) {
throw new ExecutorManagerException("Error writing log part.", e);
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new ExecutorManagerException("Error chunking", e);
}
}
- private void uploadLogPart(Connection connection, int execId, String name,
- int attempt, int startByte, int endByte, EncodingType encType,
- byte[] buffer, int length) throws SQLException, IOException {
+ private void uploadLogPart(final Connection connection, final int execId, final String name,
+ final int attempt, final int startByte, final int endByte, final EncodingType encType,
+ final byte[] buffer, final int length) throws SQLException, IOException {
final String INSERT_EXECUTION_LOGS =
"INSERT INTO execution_logs "
+ "(exec_id, name, attempt, enc_type, start_byte, end_byte, "
+ "log, upload_time) VALUES (?,?,?,?,?,?,?,?)";
- QueryRunner runner = new QueryRunner();
+ final QueryRunner runner = new QueryRunner();
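+ // Each chunk row is compressed independently, so ranged reads can decode
+ // individual rows without reassembling the full log first.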
byte[] buf = buffer;
if (encType == EncodingType.GZIP) {
buf = GZIPUtils.gzipBytes(buf, 0, length);
@@ -802,32 +799,32 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
}
@Override
- public void uploadAttachmentFile(ExecutableNode node, File file)
+ public void uploadAttachmentFile(final ExecutableNode node, final File file)
throws ExecutorManagerException {
- Connection connection = getConnection();
+ final Connection connection = getConnection();
try {
- uploadAttachmentFile(connection, node, file, defaultEncodingType);
+ uploadAttachmentFile(connection, node, file, this.defaultEncodingType);
connection.commit();
- } catch (SQLException e) {
+ } catch (final SQLException e) {
throw new ExecutorManagerException("Error committing attachments ", e);
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new ExecutorManagerException("Error uploading attachments ", e);
} finally {
DbUtils.closeQuietly(connection);
}
}
- private void uploadAttachmentFile(Connection connection, ExecutableNode node,
- File file, EncodingType encType) throws SQLException, IOException {
+ private void uploadAttachmentFile(final Connection connection, final ExecutableNode node,
+ final File file, final EncodingType encType) throws SQLException, IOException {
- String jsonString = FileUtils.readFileToString(file);
- byte[] attachments = GZIPUtils.gzipString(jsonString, "UTF-8");
+ final String jsonString = FileUtils.readFileToString(file);
+ final byte[] attachments = GZIPUtils.gzipString(jsonString, "UTF-8");
final String UPDATE_EXECUTION_NODE_ATTACHMENTS =
"UPDATE execution_jobs " + "SET attachments=? "
+ "WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?";
- QueryRunner runner = new QueryRunner();
+ final QueryRunner runner = new QueryRunner();
runner.update(connection, UPDATE_EXECUTION_NODE_ATTACHMENTS, attachments,
node.getExecutableFlow().getExecutionId(), node.getParentFlow()
.getNestedId(), node.getId(), node.getAttempt());
@@ -837,7 +834,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
Connection connection = null;
try {
connection = super.getDBConnection(false);
- } catch (Exception e) {
+ } catch (final Exception e) {
DbUtils.closeQuietly(connection);
throw new ExecutorManagerException("Error getting DB connection.", e);
}
@@ -853,14 +850,14 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
*/
@Override
public List<Executor> fetchAllExecutors() throws ExecutorManagerException {
- QueryRunner runner = createQueryRunner();
- FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+ final QueryRunner runner = createQueryRunner();
+ final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
try {
- List<Executor> executors =
+ final List<Executor> executors =
runner.query(FetchExecutorHandler.FETCH_ALL_EXECUTORS, executorHandler);
return executors;
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new ExecutorManagerException("Error fetching executors", e);
}
}
@@ -873,15 +870,15 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
*/
@Override
public List<Executor> fetchActiveExecutors() throws ExecutorManagerException {
- QueryRunner runner = createQueryRunner();
- FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+ final QueryRunner runner = createQueryRunner();
+ final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
try {
- List<Executor> executors =
+ final List<Executor> executors =
runner.query(FetchExecutorHandler.FETCH_ACTIVE_EXECUTORS,
executorHandler);
return executors;
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new ExecutorManagerException("Error fetching active executors", e);
}
}
@@ -892,13 +889,13 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
* @see azkaban.executor.ExecutorLoader#fetchExecutor(java.lang.String, int)
*/
@Override
- public Executor fetchExecutor(String host, int port)
+ public Executor fetchExecutor(final String host, final int port)
throws ExecutorManagerException {
- QueryRunner runner = createQueryRunner();
- FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+ final QueryRunner runner = createQueryRunner();
+ final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
try {
- List<Executor> executors =
+ final List<Executor> executors =
runner.query(FetchExecutorHandler.FETCH_EXECUTOR_BY_HOST_PORT,
executorHandler, host, port);
if (executors.isEmpty()) {
@@ -906,7 +903,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
} else {
return executors.get(0);
}
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new ExecutorManagerException(String.format(
"Error fetching executor %s:%d", host, port), e);
}
@@ -918,12 +915,12 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
* @see azkaban.executor.ExecutorLoader#fetchExecutor(int)
*/
@Override
- public Executor fetchExecutor(int executorId) throws ExecutorManagerException {
- QueryRunner runner = createQueryRunner();
- FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+ public Executor fetchExecutor(final int executorId) throws ExecutorManagerException {
+ final QueryRunner runner = createQueryRunner();
+ final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
try {
- List<Executor> executors =
+ final List<Executor> executors =
runner.query(FetchExecutorHandler.FETCH_EXECUTOR_BY_ID,
executorHandler, executorId);
if (executors.isEmpty()) {
@@ -931,7 +928,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
} else {
return executors.get(0);
}
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new ExecutorManagerException(String.format(
"Error fetching executor with id: %d", executorId), e);
}
@@ -943,20 +940,20 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
* @see azkaban.executor.ExecutorLoader#updateExecutor(int)
*/
@Override
- public void updateExecutor(Executor executor) throws ExecutorManagerException {
+ public void updateExecutor(final Executor executor) throws ExecutorManagerException {
final String UPDATE =
"UPDATE executors SET host=?, port=?, active=? where id=?";
- QueryRunner runner = createQueryRunner();
+ final QueryRunner runner = createQueryRunner();
try {
- int rows =
+ final int rows =
runner.update(UPDATE, executor.getHost(), executor.getPort(),
executor.isActive(), executor.getId());
if (rows == 0) {
throw new ExecutorManagerException("No executor with id :"
+ executor.getId());
}
- } catch (SQLException e) {
- throw new ExecutorManagerException("Error inactivating executor "
- + executor.getId(), e);
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error updating executor "
+ + executor.getId(), e);
}
@@ -968,7 +965,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
* @see azkaban.executor.ExecutorLoader#addExecutor(java.lang.String, int)
*/
@Override
- public Executor addExecutor(String host, int port)
+ public Executor addExecutor(final String host, final int port)
throws ExecutorManagerException {
// verify whether the executor already exists
Executor executor = fetchExecutor(host, port);
@@ -984,13 +981,13 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
return executor;
}
- private void addExecutorHelper(String host, int port)
+ private void addExecutorHelper(final String host, final int port)
throws ExecutorManagerException {
final String INSERT = "INSERT INTO executors (host, port) values (?,?)";
- QueryRunner runner = createQueryRunner();
+ final QueryRunner runner = createQueryRunner();
try {
runner.update(INSERT, host, port);
- } catch (SQLException e) {
+ } catch (final SQLException e) {
throw new ExecutorManagerException(String.format("Error adding %s:%d ",
host, port), e);
}
@@ -1003,16 +1000,16 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
* @see azkaban.executor.ExecutorLoader#removeExecutor(String, int)
*/
@Override
- public void removeExecutor(String host, int port) throws ExecutorManagerException {
+ public void removeExecutor(final String host, final int port) throws ExecutorManagerException {
final String DELETE = "DELETE FROM executors WHERE host=? AND port=?";
- QueryRunner runner = createQueryRunner();
+ final QueryRunner runner = createQueryRunner();
try {
- int rows = runner.update(DELETE, host, port);
+ final int rows = runner.update(DELETE, host, port);
if (rows == 0) {
throw new ExecutorManagerException("No executor with host, port :"
+ "(" + host + "," + port + ")");
}
- } catch (SQLException e) {
+ } catch (final SQLException e) {
throw new ExecutorManagerException("Error removing executor with host, port : "
+ "(" + host + "," + port + ")", e);
}
@@ -1026,17 +1023,17 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
* java.lang.String)
*/
@Override
- public void postExecutorEvent(Executor executor, EventType type, String user,
- String message) throws ExecutorManagerException{
- QueryRunner runner = createQueryRunner();
+ public void postExecutorEvent(final Executor executor, final EventType type, final String user,
+ final String message) throws ExecutorManagerException {
+ final QueryRunner runner = createQueryRunner();
- final String INSERT_PROJECT_EVENTS =
- "INSERT INTO executor_events (executor_id, event_type, event_time, username, message) values (?,?,?,?,?)";
+ final String INSERT_EXECUTOR_EVENTS =
+ "INSERT INTO executor_events (executor_id, event_type, event_time, username, message) values (?,?,?,?,?)";
- Date updateDate = new Date();
+ final Date updateDate = new Date();
try {
- runner.update(INSERT_PROJECT_EVENTS, executor.getId(), type.getNumVal(),
+ runner.update(INSERT_EXECUTOR_EVENTS, executor.getId(), type.getNumVal(),
updateDate, user, message);
- } catch (SQLException e) {
+ } catch (final SQLException e) {
throw new ExecutorManagerException("Failed to post executor event", e);
}
}
@@ -1048,17 +1045,17 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
* int, int)
*/
@Override
- public List<ExecutorLogEvent> getExecutorEvents(Executor executor, int num,
- int offset) throws ExecutorManagerException {
- QueryRunner runner = createQueryRunner();
+ public List<ExecutorLogEvent> getExecutorEvents(final Executor executor, final int num,
+ final int offset) throws ExecutorManagerException {
+ final QueryRunner runner = createQueryRunner();
- ExecutorLogsResultHandler logHandler = new ExecutorLogsResultHandler();
+ final ExecutorLogsResultHandler logHandler = new ExecutorLogsResultHandler();
List<ExecutorLogEvent> events = null;
try {
events =
runner.query(ExecutorLogsResultHandler.SELECT_EXECUTOR_EVENTS_ORDER,
logHandler, executor.getId(), num, offset);
- } catch (SQLException e) {
+ } catch (final SQLException e) {
throw new ExecutorManagerException(
"Failed to fetch events for executor id : " + executor.getId(), e);
}
@@ -1073,27 +1070,27 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
* @see azkaban.executor.ExecutorLoader#assignExecutor(int, int)
*/
@Override
- public void assignExecutor(int executorId, int executionId)
+ public void assignExecutor(final int executorId, final int executionId)
throws ExecutorManagerException {
final String UPDATE =
"UPDATE execution_flows SET executor_id=? where exec_id=?";
- QueryRunner runner = createQueryRunner();
+ final QueryRunner runner = createQueryRunner();
try {
- Executor executor = fetchExecutor(executorId);
+ final Executor executor = fetchExecutor(executorId);
if (executor == null) {
throw new ExecutorManagerException(String.format(
"Failed to assign non-existent executor Id: %d to execution : %d ",
executorId, executionId));
}
- int rows = runner.update(UPDATE, executorId, executionId);
+ final int rows = runner.update(UPDATE, executorId, executionId);
if (rows == 0) {
throw new ExecutorManagerException(String.format(
"Failed to assign executor Id: %d to non-existent execution : %d ",
executorId, executionId));
}
- } catch (SQLException e) {
+ } catch (final SQLException e) {
throw new ExecutorManagerException("Error updating executor id "
+ executorId, e);
}
@@ -1106,75 +1103,116 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
* @see azkaban.executor.ExecutorLoader#fetchExecutorByExecutionId(int)
*/
@Override
- public Executor fetchExecutorByExecutionId(int executionId)
+ public Executor fetchExecutorByExecutionId(final int executionId)
throws ExecutorManagerException {
- QueryRunner runner = createQueryRunner();
- FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+ final QueryRunner runner = createQueryRunner();
+ final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
Executor executor = null;
try {
- List<Executor> executors =
+ final List<Executor> executors =
runner.query(FetchExecutorHandler.FETCH_EXECUTION_EXECUTOR,
executorHandler, executionId);
if (executors.size() > 0) {
executor = executors.get(0);
}
- } catch (SQLException e) {
+ } catch (final SQLException e) {
throw new ExecutorManagerException(
"Error fetching executor for exec_id : " + executionId, e);
}
return executor;
}
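+ /**
+ * Removes execution log chunks uploaded before the given cutoff (epoch
+ * millis). A cleanup job might, for example, pass
+ * System.currentTimeMillis() minus its configured retention period.
+ */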
+ @Override
+ public int removeExecutionLogsByTime(final long millis)
+ throws ExecutorManagerException {
+ final String DELETE_BY_TIME =
+ "DELETE FROM execution_logs WHERE upload_time < ?";
+
+ final QueryRunner runner = createQueryRunner();
+ int updateNum = 0;
+ try {
+ updateNum = runner.update(DELETE_BY_TIME, millis);
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException(
+ "Error deleting old execution_logs before " + millis, e);
+ }
+
+ return updateNum;
+ }
+
+ /**
+ *
+ * {@inheritDoc}
+ * @see azkaban.executor.ExecutorLoader#unassignExecutor(int)
+ */
+ @Override
+ public void unassignExecutor(final int executionId) throws ExecutorManagerException {
+ final String UPDATE =
+ "UPDATE execution_flows SET executor_id=NULL where exec_id=?";
+
+ final QueryRunner runner = createQueryRunner();
+ try {
+ final int rows = runner.update(UPDATE, executionId);
+ if (rows == 0) {
+ throw new ExecutorManagerException(String.format(
+ "Failed to unassign executor for execution : %d ", executionId));
+ }
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error updating execution id "
+ + executionId, e);
+ }
+ }
+
private static class LastInsertID implements ResultSetHandler<Long> {
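+ // MySQL-specific: LAST_INSERT_ID() returns the auto-increment id generated
+ // by the most recent INSERT on the same connection.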
- private static String LAST_INSERT_ID = "SELECT LAST_INSERT_ID()";
+ private static final String LAST_INSERT_ID = "SELECT LAST_INSERT_ID()";
@Override
- public Long handle(ResultSet rs) throws SQLException {
+ public Long handle(final ResultSet rs) throws SQLException {
if (!rs.next()) {
return -1L;
}
- long id = rs.getLong(1);
+ final long id = rs.getLong(1);
return id;
}
}
private static class FetchLogsHandler implements ResultSetHandler<LogData> {
- private static String FETCH_LOGS =
+ private static final String FETCH_LOGS =
"SELECT exec_id, name, attempt, enc_type, start_byte, end_byte, log "
+ "FROM execution_logs "
+ "WHERE exec_id=? AND name=? AND attempt=? AND end_byte > ? "
+ "AND start_byte <= ? ORDER BY start_byte";
- private int startByte;
- private int endByte;
+ private final int startByte;
+ private final int endByte;
- public FetchLogsHandler(int startByte, int endByte) {
+ public FetchLogsHandler(final int startByte, final int endByte) {
this.startByte = startByte;
this.endByte = endByte;
}
@Override
- public LogData handle(ResultSet rs) throws SQLException {
+ public LogData handle(final ResultSet rs) throws SQLException {
if (!rs.next()) {
return null;
}
- ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+ final ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
do {
// int execId = rs.getInt(1);
// String name = rs.getString(2);
- @SuppressWarnings("unused")
- int attempt = rs.getInt(3);
- EncodingType encType = EncodingType.fromInteger(rs.getInt(4));
- int startByte = rs.getInt(5);
- int endByte = rs.getInt(6);
+ final int attempt = rs.getInt(3);
+ final EncodingType encType = EncodingType.fromInteger(rs.getInt(4));
+ final int startByte = rs.getInt(5);
+ final int endByte = rs.getInt(6);
- byte[] data = rs.getBytes(7);
+ final byte[] data = rs.getBytes(7);
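+ // Clip this chunk to the requested window: skip bytes before this.startByte
+ // and ignore anything at or past this.endByte.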
- int offset =
+ final int offset =
this.startByte > startByte ? this.startByte - startByte : 0;
- int length =
+ final int length =
this.endByte < endByte ? this.endByte - startByte - offset
: endByte - startByte - offset;
try {
@@ -1184,56 +1222,56 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
}
byteStream.write(buffer, offset, length);
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new SQLException(e);
}
} while (rs.next());
- byte[] buffer = byteStream.toByteArray();
- Pair<Integer, Integer> result =
+ final byte[] buffer = byteStream.toByteArray();
+ final Pair<Integer, Integer> result =
FileIOUtils.getUtf8Range(buffer, 0, buffer.length);
- return new LogData(startByte + result.getFirst(), result.getSecond(),
+ return new LogData(this.startByte + result.getFirst(), result.getSecond(),
new String(buffer, result.getFirst(), result.getSecond(), StandardCharsets.UTF_8));
}
}
private static class FetchExecutableJobHandler implements
ResultSetHandler<List<ExecutableJobInfo>> {
- private static String FETCH_EXECUTABLE_NODE =
+ private static final String FETCH_EXECUTABLE_NODE =
"SELECT exec_id, project_id, version, flow_id, job_id, "
+ "start_time, end_time, status, attempt "
+ "FROM execution_jobs WHERE exec_id=? "
+ "AND job_id=? AND attempt=?";
- private static String FETCH_EXECUTABLE_NODE_ATTEMPTS =
+ private static final String FETCH_EXECUTABLE_NODE_ATTEMPTS =
"SELECT exec_id, project_id, version, flow_id, job_id, "
+ "start_time, end_time, status, attempt FROM execution_jobs "
+ "WHERE exec_id=? AND job_id=?";
- private static String FETCH_PROJECT_EXECUTABLE_NODE =
+ private static final String FETCH_PROJECT_EXECUTABLE_NODE =
"SELECT exec_id, project_id, version, flow_id, job_id, "
+ "start_time, end_time, status, attempt FROM execution_jobs "
+ "WHERE project_id=? AND job_id=? "
+ "ORDER BY exec_id DESC LIMIT ?, ? ";
@Override
- public List<ExecutableJobInfo> handle(ResultSet rs) throws SQLException {
+ public List<ExecutableJobInfo> handle(final ResultSet rs) throws SQLException {
if (!rs.next()) {
return Collections.<ExecutableJobInfo> emptyList();
}
- List<ExecutableJobInfo> execNodes = new ArrayList<ExecutableJobInfo>();
+ final List<ExecutableJobInfo> execNodes = new ArrayList<>();
do {
- int execId = rs.getInt(1);
- int projectId = rs.getInt(2);
- int version = rs.getInt(3);
- String flowId = rs.getString(4);
- String jobId = rs.getString(5);
- long startTime = rs.getLong(6);
- long endTime = rs.getLong(7);
- Status status = Status.fromInteger(rs.getInt(8));
- int attempt = rs.getInt(9);
-
- ExecutableJobInfo info =
+ final int execId = rs.getInt(1);
+ final int projectId = rs.getInt(2);
+ final int version = rs.getInt(3);
+ final String flowId = rs.getString(4);
+ final String jobId = rs.getString(5);
+ final long startTime = rs.getLong(6);
+ final long endTime = rs.getLong(7);
+ final Status status = Status.fromInteger(rs.getInt(8));
+ final int attempt = rs.getInt(9);
+
+ final ExecutableJobInfo info =
new ExecutableJobInfo(execId, projectId, version, flowId, jobId,
startTime, endTime, status, attempt);
execNodes.add(info);
@@ -1245,19 +1283,19 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
private static class FetchExecutableJobAttachmentsHandler implements
ResultSetHandler<String> {
- private static String FETCH_ATTACHMENTS_EXECUTABLE_NODE =
+ private static final String FETCH_ATTACHMENTS_EXECUTABLE_NODE =
"SELECT attachments FROM execution_jobs WHERE exec_id=? AND job_id=?";
@Override
- public String handle(ResultSet rs) throws SQLException {
+ public String handle(final ResultSet rs) throws SQLException {
String attachmentsJson = null;
if (rs.next()) {
try {
- byte[] attachments = rs.getBytes(1);
+ final byte[] attachments = rs.getBytes(1);
if (attachments != null) {
attachmentsJson = GZIPUtils.unGzipString(attachments, "UTF-8");
}
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new SQLException("Error decoding job attachments", e);
}
}
@@ -1267,62 +1305,61 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
private static class FetchExecutableJobPropsHandler implements
ResultSetHandler<Pair<Props, Props>> {
- private static String FETCH_OUTPUT_PARAM_EXECUTABLE_NODE =
+ private static final String FETCH_OUTPUT_PARAM_EXECUTABLE_NODE =
"SELECT output_params FROM execution_jobs WHERE exec_id=? AND job_id=?";
- private static String FETCH_INPUT_PARAM_EXECUTABLE_NODE =
+ private static final String FETCH_INPUT_PARAM_EXECUTABLE_NODE =
"SELECT input_params FROM execution_jobs WHERE exec_id=? AND job_id=?";
- private static String FETCH_INPUT_OUTPUT_PARAM_EXECUTABLE_NODE =
+ private static final String FETCH_INPUT_OUTPUT_PARAM_EXECUTABLE_NODE =
"SELECT input_params, output_params "
+ "FROM execution_jobs WHERE exec_id=? AND job_id=?";
@SuppressWarnings("unchecked")
@Override
- public Pair<Props, Props> handle(ResultSet rs) throws SQLException {
+ public Pair<Props, Props> handle(final ResultSet rs) throws SQLException {
if (!rs.next()) {
- return new Pair<Props, Props>(null, null);
+ return new Pair<>(null, null);
}
if (rs.getMetaData().getColumnCount() > 1) {
- byte[] input = rs.getBytes(1);
- byte[] output = rs.getBytes(2);
+ final byte[] input = rs.getBytes(1);
+ final byte[] output = rs.getBytes(2);
Props inputProps = null;
Props outputProps = null;
try {
if (input != null) {
- String jsonInputString = GZIPUtils.unGzipString(input, "UTF-8");
+ final String jsonInputString = GZIPUtils.unGzipString(input, "UTF-8");
inputProps =
PropsUtils.fromHierarchicalMap((Map<String, Object>) JSONUtils
.parseJSONFromString(jsonInputString));
}
if (output != null) {
- String jsonOutputString = GZIPUtils.unGzipString(output, "UTF-8");
+ final String jsonOutputString = GZIPUtils.unGzipString(output, "UTF-8");
outputProps =
PropsUtils.fromHierarchicalMap((Map<String, Object>) JSONUtils
.parseJSONFromString(jsonOutputString));
}
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new SQLException("Error decoding param data", e);
}
- return new Pair<Props, Props>(inputProps, outputProps);
+ return new Pair<>(inputProps, outputProps);
} else {
- byte[] params = rs.getBytes(1);
+ final byte[] params = rs.getBytes(1);
Props props = null;
try {
if (params != null) {
- String jsonProps = GZIPUtils.unGzipString(params, "UTF-8");
+ final String jsonProps = GZIPUtils.unGzipString(params, "UTF-8");
props =
PropsUtils.fromHierarchicalMap((Map<String, Object>) JSONUtils
.parseJSONFromString(jsonProps));
}
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new SQLException("Error decoding param data", e);
}
- return new Pair<Props, Props>(props, null);
+ return new Pair<>(props, null);
}
}
}
@@ -1333,47 +1370,47 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
private static class FetchQueuedExecutableFlows implements
ResultSetHandler<List<Pair<ExecutionReference, ExecutableFlow>>> {
// Select queued unassigned flows
- private static String FETCH_QUEUED_EXECUTABLE_FLOW =
+ private static final String FETCH_QUEUED_EXECUTABLE_FLOW =
"SELECT exec_id, enc_type, flow_data FROM execution_flows"
+ " Where executor_id is NULL AND status = "
+ Status.PREPARING.getNumVal();
@Override
- public List<Pair<ExecutionReference, ExecutableFlow>> handle(ResultSet rs)
+ public List<Pair<ExecutionReference, ExecutableFlow>> handle(final ResultSet rs)
throws SQLException {
if (!rs.next()) {
return Collections.emptyList();
}
- List<Pair<ExecutionReference, ExecutableFlow>> execFlows =
+ final List<Pair<ExecutionReference, ExecutableFlow>> execFlows =
new ArrayList<>();
do {
- int id = rs.getInt(1);
- int encodingType = rs.getInt(2);
- byte[] data = rs.getBytes(3);
+ final int id = rs.getInt(1);
+ final int encodingType = rs.getInt(2);
+ final byte[] data = rs.getBytes(3);
if (data == null) {
logger.error("Found a flow with empty data blob exec_id: " + id);
} else {
- EncodingType encType = EncodingType.fromInteger(encodingType);
- Object flowObj;
+ final EncodingType encType = EncodingType.fromInteger(encodingType);
+ final Object flowObj;
try {
// Convoluted way to inflate strings. Should find common package or
// helper function.
if (encType == EncodingType.GZIP) {
// Decompress the sucker.
- String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+ final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
flowObj = JSONUtils.parseJSONFromString(jsonString);
} else {
- String jsonString = new String(data, "UTF-8");
+ final String jsonString = new String(data, "UTF-8");
flowObj = JSONUtils.parseJSONFromString(jsonString);
}
- ExecutableFlow exFlow =
+ final ExecutableFlow exFlow =
ExecutableFlow.createExecutableFlowFromObject(flowObj);
- ExecutionReference ref = new ExecutionReference(id);
+ final ExecutionReference ref = new ExecutionReference(id);
execFlows.add(new Pair<>(ref, exFlow));
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new SQLException("Error retrieving flow data " + id, e);
}
}
@@ -1386,40 +1423,40 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
private static class FetchRecentlyFinishedFlows implements
ResultSetHandler<List<ExecutableFlow>> {
// Execution_flows table is already indexed by end_time
- private static String FETCH_RECENTLY_FINISHED_FLOW =
+ private static final String FETCH_RECENTLY_FINISHED_FLOW =
"SELECT exec_id, enc_type, flow_data FROM execution_flows "
+ "WHERE end_time > ? AND status IN (?, ?, ?)";
@Override
public List<ExecutableFlow> handle(
- ResultSet rs) throws SQLException {
+ final ResultSet rs) throws SQLException {
if (!rs.next()) {
return Collections.emptyList();
}
- List<ExecutableFlow> execFlows = new ArrayList<>();
+ final List<ExecutableFlow> execFlows = new ArrayList<>();
do {
- int id = rs.getInt(1);
- int encodingType = rs.getInt(2);
- byte[] data = rs.getBytes(3);
+ final int id = rs.getInt(1);
+ final int encodingType = rs.getInt(2);
+ final byte[] data = rs.getBytes(3);
if (data != null) {
- EncodingType encType = EncodingType.fromInteger(encodingType);
- Object flowObj;
+ final EncodingType encType = EncodingType.fromInteger(encodingType);
+ final Object flowObj;
try {
if (encType == EncodingType.GZIP) {
- String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+ final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
flowObj = JSONUtils.parseJSONFromString(jsonString);
} else {
- String jsonString = new String(data, "UTF-8");
+ final String jsonString = new String(data, "UTF-8");
flowObj = JSONUtils.parseJSONFromString(jsonString);
}
- ExecutableFlow exFlow =
+ final ExecutableFlow exFlow =
ExecutableFlow.createExecutableFlowFromObject(flowObj);
execFlows.add(exFlow);
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new SQLException("Error retrieving flow data " + id, e);
}
}
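// A caller presumably binds the end_time cutoff plus three terminal statuses
// for the IN (?, ?, ?) clause. A sketch under that assumption — the wrapper
// name fetchRecentlyFinishedFlows and the exact status set are not confirmed
// by this patch:
private List<ExecutableFlow> fetchRecentlyFinishedFlows(final Duration maxAge)
    throws SQLException {
  final QueryRunner runner = createQueryRunner();
  // end_time is stored as epoch millis, so look back maxAge from now.
  return runner.query(FetchRecentlyFinishedFlows.FETCH_RECENTLY_FINISHED_FLOW,
      new FetchRecentlyFinishedFlows(),
      System.currentTimeMillis() - maxAge.toMillis(),
      Status.SUCCEEDED.getNumVal(),
      Status.KILLED.getNumVal(),
      Status.FAILED.getNumVal());
}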
@@ -1432,7 +1469,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
private static class FetchActiveExecutableFlows implements
ResultSetHandler<Map<Integer, Pair<ExecutionReference, ExecutableFlow>>> {
// Select running and executor assigned flows
- private static String FETCH_ACTIVE_EXECUTABLE_FLOW =
+ private static final String FETCH_ACTIVE_EXECUTABLE_FLOW =
"SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, et.host host, "
+ "et.port port, et.id executorId, et.active executorStatus"
+ " FROM execution_flows ex"
@@ -1445,45 +1482,45 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
@Override
public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> handle(
- ResultSet rs) throws SQLException {
+ final ResultSet rs) throws SQLException {
if (!rs.next()) {
return Collections.emptyMap();
}
- Map<Integer, Pair<ExecutionReference, ExecutableFlow>> execFlows =
+ final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> execFlows =
new HashMap<>();
do {
- int id = rs.getInt(1);
- int encodingType = rs.getInt(2);
- byte[] data = rs.getBytes(3);
- String host = rs.getString(4);
- int port = rs.getInt(5);
- int executorId = rs.getInt(6);
- boolean executorStatus = rs.getBoolean(7);
+ final int id = rs.getInt(1);
+ final int encodingType = rs.getInt(2);
+ final byte[] data = rs.getBytes(3);
+ final String host = rs.getString(4);
+ final int port = rs.getInt(5);
+ final int executorId = rs.getInt(6);
+ final boolean executorStatus = rs.getBoolean(7);
if (data == null) {
execFlows.put(id, null);
} else {
- EncodingType encType = EncodingType.fromInteger(encodingType);
- Object flowObj;
+ final EncodingType encType = EncodingType.fromInteger(encodingType);
+ final Object flowObj;
try {
// Convoluted way to inflate strings. Should find common package or
// helper function.
if (encType == EncodingType.GZIP) {
// Decompress the sucker.
- String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+ final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
flowObj = JSONUtils.parseJSONFromString(jsonString);
} else {
- String jsonString = new String(data, "UTF-8");
+ final String jsonString = new String(data, "UTF-8");
flowObj = JSONUtils.parseJSONFromString(jsonString);
}
- ExecutableFlow exFlow =
+ final ExecutableFlow exFlow =
ExecutableFlow.createExecutableFlowFromObject(flowObj);
- Executor executor = new Executor(executorId, host, port, executorStatus);
- ExecutionReference ref = new ExecutionReference(id, executor);
+ final Executor executor = new Executor(executorId, host, port, executorStatus);
+ final ExecutionReference ref = new ExecutionReference(id, executor);
execFlows.put(id, new Pair<>(ref, exFlow));
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new SQLException("Error retrieving flow data " + id, e);
}
}
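// Driving this handler takes no bind parameters; a sketch of the presumed
// entry point, using the createQueryRunner() shown elsewhere in this diff
// (fetchActiveFlows is a hypothetical wrapper name):
private Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows()
    throws SQLException {
  final QueryRunner runner = createQueryRunner();
  return runner.query(FetchActiveExecutableFlows.FETCH_ACTIVE_EXECUTABLE_FLOW,
      new FetchActiveExecutableFlows());
}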
@@ -1495,7 +1532,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
private static class FetchActiveExecutableFlowByExecId implements
ResultSetHandler<List<Pair<ExecutionReference, ExecutableFlow>>> {
- private static String FETCH_ACTIVE_EXECUTABLE_FLOW_BY_EXECID =
+ private static final String FETCH_ACTIVE_EXECUTABLE_FLOW_BY_EXECID =
"SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, et.host host, "
+ "et.port port, et.id executorId, et.active executorStatus"
+ " FROM execution_flows ex"
@@ -1507,43 +1544,43 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
+ Status.FAILED.getNumVal() + ")";
@Override
- public List<Pair<ExecutionReference, ExecutableFlow>> handle(ResultSet rs)
+ public List<Pair<ExecutionReference, ExecutableFlow>> handle(final ResultSet rs)
throws SQLException {
if (!rs.next()) {
return Collections.emptyList();
}
- List<Pair<ExecutionReference, ExecutableFlow>> execFlows =
+ final List<Pair<ExecutionReference, ExecutableFlow>> execFlows =
new ArrayList<>();
do {
- int id = rs.getInt(1);
- int encodingType = rs.getInt(2);
- byte[] data = rs.getBytes(3);
- String host = rs.getString(4);
- int port = rs.getInt(5);
- int executorId = rs.getInt(6);
- boolean executorStatus = rs.getBoolean(7);
+ final int id = rs.getInt(1);
+ final int encodingType = rs.getInt(2);
+ final byte[] data = rs.getBytes(3);
+ final String host = rs.getString(4);
+ final int port = rs.getInt(5);
+ final int executorId = rs.getInt(6);
+ final boolean executorStatus = rs.getBoolean(7);
if (data == null) {
logger.error("Found a flow with empty data blob exec_id: " + id);
} else {
- EncodingType encType = EncodingType.fromInteger(encodingType);
- Object flowObj;
+ final EncodingType encType = EncodingType.fromInteger(encodingType);
+ final Object flowObj;
try {
if (encType == EncodingType.GZIP) {
- String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+ final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
flowObj = JSONUtils.parseJSONFromString(jsonString);
} else {
- String jsonString = new String(data, "UTF-8");
+ final String jsonString = new String(data, "UTF-8");
flowObj = JSONUtils.parseJSONFromString(jsonString);
}
- ExecutableFlow exFlow =
+ final ExecutableFlow exFlow =
ExecutableFlow.createExecutableFlowFromObject(flowObj);
- Executor executor = new Executor(executorId, host, port, executorStatus);
- ExecutionReference ref = new ExecutionReference(id, executor);
+ final Executor executor = new Executor(executorId, host, port, executorStatus);
+ final ExecutionReference ref = new ExecutionReference(id, executor);
execFlows.add(new Pair<>(ref, exFlow));
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new SQLException("Error retrieving flow data " + id, e);
}
}
@@ -1555,9 +1592,9 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
private static class FetchExecutableFlows implements
ResultSetHandler<List<ExecutableFlow>> {
- private static String FETCH_BASE_EXECUTABLE_FLOW_QUERY =
+ private static final String FETCH_BASE_EXECUTABLE_FLOW_QUERY =
"SELECT exec_id, enc_type, flow_data FROM execution_flows ";
- private static String FETCH_EXECUTABLE_FLOW =
+ private static final String FETCH_EXECUTABLE_FLOW =
"SELECT exec_id, enc_type, flow_data FROM execution_flows "
+ "WHERE exec_id=?";
// private static String FETCH_ACTIVE_EXECUTABLE_FLOW =
@@ -1565,49 +1602,49 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
// +
// "FROM execution_flows ex " +
// "INNER JOIN active_executing_flows ax ON ex.exec_id = ax.exec_id";
- private static String FETCH_ALL_EXECUTABLE_FLOW_HISTORY =
+ private static final String FETCH_ALL_EXECUTABLE_FLOW_HISTORY =
"SELECT exec_id, enc_type, flow_data FROM execution_flows "
+ "ORDER BY exec_id DESC LIMIT ?, ?";
- private static String FETCH_EXECUTABLE_FLOW_HISTORY =
+ private static final String FETCH_EXECUTABLE_FLOW_HISTORY =
"SELECT exec_id, enc_type, flow_data FROM execution_flows "
+ "WHERE project_id=? AND flow_id=? "
+ "ORDER BY exec_id DESC LIMIT ?, ?";
- private static String FETCH_EXECUTABLE_FLOW_BY_STATUS =
+ private static final String FETCH_EXECUTABLE_FLOW_BY_STATUS =
"SELECT exec_id, enc_type, flow_data FROM execution_flows "
+ "WHERE project_id=? AND flow_id=? AND status=? "
+ "ORDER BY exec_id DESC LIMIT ?, ?";
@Override
- public List<ExecutableFlow> handle(ResultSet rs) throws SQLException {
+ public List<ExecutableFlow> handle(final ResultSet rs) throws SQLException {
if (!rs.next()) {
return Collections.<ExecutableFlow> emptyList();
}
- List<ExecutableFlow> execFlows = new ArrayList<ExecutableFlow>();
+ final List<ExecutableFlow> execFlows = new ArrayList<>();
do {
- int id = rs.getInt(1);
- int encodingType = rs.getInt(2);
- byte[] data = rs.getBytes(3);
+ final int id = rs.getInt(1);
+ final int encodingType = rs.getInt(2);
+ final byte[] data = rs.getBytes(3);
if (data != null) {
- EncodingType encType = EncodingType.fromInteger(encodingType);
- Object flowObj;
+ final EncodingType encType = EncodingType.fromInteger(encodingType);
+ final Object flowObj;
try {
// Convoluted way to inflate strings. Should find common package
// or helper function.
if (encType == EncodingType.GZIP) {
// Decompress the sucker.
- String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+ final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
flowObj = JSONUtils.parseJSONFromString(jsonString);
} else {
- String jsonString = new String(data, "UTF-8");
+ final String jsonString = new String(data, "UTF-8");
flowObj = JSONUtils.parseJSONFromString(jsonString);
}
- ExecutableFlow exFlow =
+ final ExecutableFlow exFlow =
ExecutableFlow.createExecutableFlowFromObject(flowObj);
execFlows.add(exFlow);
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new SQLException("Error retrieving flow data " + id, e);
}
}
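// Note the LIMIT spelling here: the history queries use MySQL's
// "LIMIT skip, num", so the skip value binds before the page size. A paging
// sketch (fetchFlowHistoryPage is a hypothetical name):
private List<ExecutableFlow> fetchFlowHistoryPage(final int projectId,
    final String flowId, final int skip, final int num) throws SQLException {
  final QueryRunner runner = createQueryRunner();
  return runner.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_HISTORY,
      new FetchExecutableFlows(), projectId, flowId, skip, num);
}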
@@ -1618,17 +1655,17 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
}
private static class IntHandler implements ResultSetHandler<Integer> {
- private static String NUM_EXECUTIONS =
+ private static final String NUM_EXECUTIONS =
"SELECT COUNT(1) FROM execution_flows";
- private static String NUM_FLOW_EXECUTIONS =
+ private static final String NUM_FLOW_EXECUTIONS =
"SELECT COUNT(1) FROM execution_flows WHERE project_id=? AND flow_id=?";
- private static String NUM_JOB_EXECUTIONS =
+ private static final String NUM_JOB_EXECUTIONS =
"SELECT COUNT(1) FROM execution_jobs WHERE project_id=? AND job_id=?";
- private static String FETCH_EXECUTOR_ID =
+ private static final String FETCH_EXECUTOR_ID =
"SELECT executor_id FROM execution_flows WHERE exec_id=?";
@Override
- public Integer handle(ResultSet rs) throws SQLException {
+ public Integer handle(final ResultSet rs) throws SQLException {
if (!rs.next()) {
return 0;
}
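// One IntHandler instance serves all four count/lookup queries; only the SQL
// and bind parameters change. A representative caller (countFlowExecutions is
// a hypothetical name):
private int countFlowExecutions(final int projectId, final String flowId)
    throws SQLException {
  final QueryRunner runner = createQueryRunner();
  // IntHandler returns 0 for an empty result set, so unboxing is safe.
  return runner.query(IntHandler.NUM_FLOW_EXECUTIONS, new IntHandler(),
      projectId, flowId);
}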
@@ -1636,56 +1673,37 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
}
}
- @Override
- public int removeExecutionLogsByTime(long millis)
- throws ExecutorManagerException {
- final String DELETE_BY_TIME =
- "DELETE FROM execution_logs WHERE upload_time < ?";
-
- QueryRunner runner = createQueryRunner();
- int updateNum = 0;
- try {
- updateNum = runner.update(DELETE_BY_TIME, millis);
- } catch (SQLException e) {
- e.printStackTrace();
- throw new ExecutorManagerException(
- "Error deleting old execution_logs before " + millis, e);
- }
-
- return updateNum;
- }
-
/**
* JDBC ResultSetHandler to fetch records from executors table
*/
private static class FetchExecutorHandler implements
ResultSetHandler<List<Executor>> {
- private static String FETCH_ALL_EXECUTORS =
+ private static final String FETCH_ALL_EXECUTORS =
"SELECT id, host, port, active FROM executors";
- private static String FETCH_ACTIVE_EXECUTORS =
+ private static final String FETCH_ACTIVE_EXECUTORS =
"SELECT id, host, port, active FROM executors where active=true";
- private static String FETCH_EXECUTOR_BY_ID =
+ private static final String FETCH_EXECUTOR_BY_ID =
"SELECT id, host, port, active FROM executors where id=?";
- private static String FETCH_EXECUTOR_BY_HOST_PORT =
+ private static final String FETCH_EXECUTOR_BY_HOST_PORT =
"SELECT id, host, port, active FROM executors where host=? AND port=?";
- private static String FETCH_EXECUTION_EXECUTOR =
+ private static final String FETCH_EXECUTION_EXECUTOR =
"SELECT ex.id, ex.host, ex.port, ex.active FROM "
+ " executors ex INNER JOIN execution_flows ef "
+ "on ex.id = ef.executor_id where exec_id=?";
@Override
- public List<Executor> handle(ResultSet rs) throws SQLException {
+ public List<Executor> handle(final ResultSet rs) throws SQLException {
if (!rs.next()) {
return Collections.<Executor> emptyList();
}
- List<Executor> executors = new ArrayList<Executor>();
+ final List<Executor> executors = new ArrayList<>();
do {
- int id = rs.getInt(1);
- String host = rs.getString(2);
- int port = rs.getInt(3);
- boolean active = rs.getBoolean(4);
- Executor executor = new Executor(id, host, port, active);
+ final int id = rs.getInt(1);
+ final String host = rs.getString(2);
+ final int port = rs.getInt(3);
+ final boolean active = rs.getBoolean(4);
+ final Executor executor = new Executor(id, host, port, active);
executors.add(executor);
} while (rs.next());
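// The single-row queries presumably reuse this list handler and take the
// first element. A sketch for the host/port lookup (fetchExecutorByHostPort
// is a hypothetical name):
private Executor fetchExecutorByHostPort(final String host, final int port)
    throws SQLException {
  final QueryRunner runner = createQueryRunner();
  final List<Executor> executors =
      runner.query(FetchExecutorHandler.FETCH_EXECUTOR_BY_HOST_PORT,
          new FetchExecutorHandler(), host, port);
  // host + port should identify at most one executor row.
  return executors.isEmpty() ? null : executors.get(0);
}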
@@ -1698,25 +1716,25 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
*/
private static class ExecutorLogsResultHandler implements
ResultSetHandler<List<ExecutorLogEvent>> {
- private static String SELECT_EXECUTOR_EVENTS_ORDER =
+ private static final String SELECT_EXECUTOR_EVENTS_ORDER =
"SELECT executor_id, event_type, event_time, username, message FROM executor_events "
+ " WHERE executor_id=? ORDER BY event_time LIMIT ? OFFSET ?";
@Override
- public List<ExecutorLogEvent> handle(ResultSet rs) throws SQLException {
+ public List<ExecutorLogEvent> handle(final ResultSet rs) throws SQLException {
if (!rs.next()) {
return Collections.<ExecutorLogEvent> emptyList();
}
- ArrayList<ExecutorLogEvent> events = new ArrayList<ExecutorLogEvent>();
+ final ArrayList<ExecutorLogEvent> events = new ArrayList<>();
do {
- int executorId = rs.getInt(1);
- int eventType = rs.getInt(2);
- Date eventTime = rs.getDate(3);
- String username = rs.getString(4);
- String message = rs.getString(5);
+ final int executorId = rs.getInt(1);
+ final int eventType = rs.getInt(2);
+ final Date eventTime = rs.getDate(3);
+ final String username = rs.getString(4);
+ final String message = rs.getString(5);
- ExecutorLogEvent event =
+ final ExecutorLogEvent event =
new ExecutorLogEvent(executorId, username, eventTime,
EventType.fromInteger(eventType), message);
events.add(event);
@@ -1725,27 +1743,4 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
return events;
}
}
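// The LIMIT ?/OFFSET ? pair above makes the events query pageable. A paging
// caller could look like this sketch (getExecutorEventsPage is a hypothetical
// name):
private List<ExecutorLogEvent> getExecutorEventsPage(final int executorId,
    final int num, final int offset) throws SQLException {
  final QueryRunner runner = createQueryRunner();
  return runner.query(ExecutorLogsResultHandler.SELECT_EXECUTOR_EVENTS_ORDER,
      new ExecutorLogsResultHandler(), executorId, num, offset);
}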
-
- /**
- *
- * {@inheritDoc}
- * @see azkaban.executor.ExecutorLoader#unassignExecutor(int)
- */
- @Override
- public void unassignExecutor(int executionId) throws ExecutorManagerException {
- final String UPDATE =
- "UPDATE execution_flows SET executor_id=NULL where exec_id=?";
-
- QueryRunner runner = createQueryRunner();
- try {
- int rows = runner.update(UPDATE, executionId);
- if (rows == 0) {
- throw new ExecutorManagerException(String.format(
- "Failed to unassign executor for execution : %d ", executionId));
- }
- } catch (SQLException e) {
- throw new ExecutorManagerException("Error updating execution id "
- + executionId, e);
- }
- }
}