diff --git a/azkaban-common/src/main/java/azkaban/executor/ActiveExecutingFlowsDao.java b/azkaban-common/src/main/java/azkaban/executor/ActiveExecutingFlowsDao.java
new file mode 100644
index 0000000..0fdc1fc
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ActiveExecutingFlowsDao.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import azkaban.database.AbstractJdbcLoader;
+import azkaban.db.DatabaseOperator;
+import azkaban.metrics.CommonMetrics;
+import azkaban.utils.Props;
+import java.sql.SQLException;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.apache.commons.dbutils.QueryRunner;
+
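+/**
+ * DAO for the active_executing_flows table: adds, removes and refreshes
+ * (via update_time) references to executions that are currently running.
+ */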
+@Singleton
+public class ActiveExecutingFlowsDao extends AbstractJdbcLoader {
+
+ private final DatabaseOperator dbOperator;
+
+ @Inject
+ ActiveExecutingFlowsDao(final Props props, final CommonMetrics commonMetrics,
+ final DatabaseOperator dbOperator) {
+ super(props, commonMetrics);
+ this.dbOperator = dbOperator;
+ }
+
+ void addActiveExecutableReference(final ExecutionReference reference)
+ throws ExecutorManagerException {
+ final String INSERT =
+ "INSERT INTO active_executing_flows "
+ + "(exec_id, update_time) values (?,?)";
+ final QueryRunner runner = createQueryRunner();
+
+ try {
+ runner.update(INSERT, reference.getExecId(), reference.getUpdateTime());
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException(
+ "Error updating active flow reference " + reference.getExecId(), e);
+ }
+ }
+
+ void removeActiveExecutableReference(final int execid)
+ throws ExecutorManagerException {
+ final String DELETE = "DELETE FROM active_executing_flows WHERE exec_id=?";
+
+ final QueryRunner runner = createQueryRunner();
+ try {
+ runner.update(DELETE, execid);
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException(
+ "Error deleting active flow reference " + execid, e);
+ }
+ }
+
+ boolean updateExecutableReference(final int execId, final long updateTime)
+ throws ExecutorManagerException {
+    final String UPDATE =
+        "UPDATE active_executing_flows SET update_time=? WHERE exec_id=?";
+
+    final QueryRunner runner = createQueryRunner();
+    int updateNum = 0;
+    try {
+      updateNum = runner.update(UPDATE, updateTime, execId);
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException(
+          "Error updating active flow reference " + execId, e);
+ }
+
+ // Should be 1.
+ return updateNum > 0;
+ }
+}
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionJobDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionJobDao.java
new file mode 100644
index 0000000..5c226c1
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionJobDao.java
@@ -0,0 +1,418 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import azkaban.database.AbstractJdbcLoader;
+import azkaban.db.DatabaseOperator;
+import azkaban.metrics.CommonMetrics;
+import azkaban.utils.GZIPUtils;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+import azkaban.utils.PropsUtils;
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.apache.commons.dbutils.DbUtils;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+
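+/**
+ * DAO for per-job rows in the execution_jobs table: job status and timing,
+ * gzipped JSON input/output params, and job attachments.
+ */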
+@Singleton
+public class ExecutionJobDao extends AbstractJdbcLoader {
+
+  private static final Logger logger = Logger.getLogger(ExecutionJobDao.class);
+ private final DatabaseOperator dbOperator;
+ private final EncodingType defaultEncodingType = EncodingType.GZIP;
+
+ @Inject
+ ExecutionJobDao(final Props props, final CommonMetrics commonMetrics,
+ final DatabaseOperator databaseOperator) {
+ super(props, commonMetrics);
+ this.dbOperator = databaseOperator;
+ }
+
+ void uploadExecutableNode(final ExecutableNode node, final Props inputProps)
+ throws ExecutorManagerException {
+ final String INSERT_EXECUTION_NODE =
+ "INSERT INTO execution_jobs "
+ + "(exec_id, project_id, version, flow_id, job_id, start_time, "
+ + "end_time, status, input_params, attempt) VALUES (?,?,?,?,?,?,?,?,?,?)";
+
+ byte[] inputParam = null;
+ if (inputProps != null) {
+ try {
+ final String jsonString =
+ JSONUtils.toJSON(PropsUtils.toHierarchicalMap(inputProps));
+ inputParam = GZIPUtils.gzipString(jsonString, "UTF-8");
+ } catch (final IOException e) {
+ throw new ExecutorManagerException("Error encoding input params");
+ }
+ }
+
+ final ExecutableFlow flow = node.getExecutableFlow();
+ final String flowId = node.getParentFlow().getFlowPath();
+    logger.info("Uploading flowId " + flowId);
+ final QueryRunner runner = createQueryRunner();
+ try {
+ runner.update(INSERT_EXECUTION_NODE, flow.getExecutionId(),
+ flow.getProjectId(), flow.getVersion(), flowId, node.getId(),
+ node.getStartTime(), node.getEndTime(), node.getStatus().getNumVal(),
+ inputParam, node.getAttempt());
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error writing job " + node.getId(), e);
+ }
+ }
+
+ void updateExecutableNode(final ExecutableNode node)
+ throws ExecutorManagerException {
+ final String UPSERT_EXECUTION_NODE =
+ "UPDATE execution_jobs "
+ + "SET start_time=?, end_time=?, status=?, output_params=? "
+ + "WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?";
+
+ byte[] outputParam = null;
+ final Props outputProps = node.getOutputProps();
+ if (outputProps != null) {
+ try {
+ final String jsonString =
+ JSONUtils.toJSON(PropsUtils.toHierarchicalMap(outputProps));
+ outputParam = GZIPUtils.gzipString(jsonString, "UTF-8");
+ } catch (final IOException e) {
+ throw new ExecutorManagerException("Error encoding input params");
+ }
+ }
+
+ final QueryRunner runner = createQueryRunner();
+ try {
+ runner.update(UPSERT_EXECUTION_NODE, node.getStartTime(), node
+ .getEndTime(), node.getStatus().getNumVal(), outputParam, node
+ .getExecutableFlow().getExecutionId(), node.getParentFlow()
+ .getFlowPath(), node.getId(), node.getAttempt());
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error updating job " + node.getId(),
+ e);
+ }
+ }
+
+ List<ExecutableJobInfo> fetchJobInfoAttempts(final int execId, final String jobId)
+ throws ExecutorManagerException {
+ final QueryRunner runner = createQueryRunner();
+
+ try {
+ final List<ExecutableJobInfo> info =
+ runner.query(
+ FetchExecutableJobHandler.FETCH_EXECUTABLE_NODE_ATTEMPTS,
+ new FetchExecutableJobHandler(), execId, jobId);
+ if (info == null || info.isEmpty()) {
+ return null;
+ }
+ return info;
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error querying job info " + jobId, e);
+ }
+ }
+
+ ExecutableJobInfo fetchJobInfo(final int execId, final String jobId, final int attempts)
+ throws ExecutorManagerException {
+ final QueryRunner runner = createQueryRunner();
+
+ try {
+ final List<ExecutableJobInfo> info =
+ runner.query(FetchExecutableJobHandler.FETCH_EXECUTABLE_NODE,
+ new FetchExecutableJobHandler(), execId, jobId, attempts);
+ if (info == null || info.isEmpty()) {
+ return null;
+ }
+ return info.get(0);
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error querying job info " + jobId, e);
+ }
+ }
+
+ Props fetchExecutionJobInputProps(final int execId, final String jobId)
+ throws ExecutorManagerException {
+ final QueryRunner runner = createQueryRunner();
+ try {
+ final Pair<Props, Props> props =
+ runner.query(
+ FetchExecutableJobPropsHandler.FETCH_INPUT_PARAM_EXECUTABLE_NODE,
+ new FetchExecutableJobPropsHandler(), execId, jobId);
+ return props.getFirst();
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error querying job params " + execId
+ + " " + jobId, e);
+ }
+ }
+
+ Props fetchExecutionJobOutputProps(final int execId, final String jobId)
+ throws ExecutorManagerException {
+ final QueryRunner runner = createQueryRunner();
+ try {
+ final Pair<Props, Props> props =
+ runner
+ .query(
+ FetchExecutableJobPropsHandler.FETCH_OUTPUT_PARAM_EXECUTABLE_NODE,
+ new FetchExecutableJobPropsHandler(), execId, jobId);
+ return props.getFirst();
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error querying job params " + execId
+ + " " + jobId, e);
+ }
+ }
+
+ Pair<Props, Props> fetchExecutionJobProps(final int execId, final String jobId)
+ throws ExecutorManagerException {
+ final QueryRunner runner = createQueryRunner();
+ try {
+ final Pair<Props, Props> props = runner .query(
+ FetchExecutableJobPropsHandler.FETCH_INPUT_OUTPUT_PARAM_EXECUTABLE_NODE,
+ new FetchExecutableJobPropsHandler(), execId, jobId);
+ return props;
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error querying job params " + execId
+ + " " + jobId, e);
+ }
+ }
+
+ List<ExecutableJobInfo> fetchJobHistory(final int projectId, final String jobId,
+ final int skip, final int size)
+ throws ExecutorManagerException {
+ final QueryRunner runner = createQueryRunner();
+
+ try {
+ final List<ExecutableJobInfo> info =
+ runner.query(FetchExecutableJobHandler.FETCH_PROJECT_EXECUTABLE_NODE,
+ new FetchExecutableJobHandler(), projectId, jobId, skip, size);
+ if (info == null || info.isEmpty()) {
+ return null;
+ }
+ return info;
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error querying job info " + jobId, e);
+ }
+ }
+
+ List<Object> fetchAttachments(final int execId, final String jobId, final int attempt)
+ throws ExecutorManagerException {
+ final QueryRunner runner = createQueryRunner();
+
+ try {
+ final String attachments =
+ runner
+ .query(
+ FetchExecutableJobAttachmentsHandler.FETCH_ATTACHMENTS_EXECUTABLE_NODE,
+ new FetchExecutableJobAttachmentsHandler(), execId, jobId);
+ if (attachments == null) {
+ return null;
+ }
+
+ final List<Object> attachmentList =
+ (List<Object>) JSONUtils.parseJSONFromString(attachments);
+
+ return attachmentList;
+ } catch (final IOException e) {
+ throw new ExecutorManagerException(
+ "Error converting job attachments to JSON " + jobId, e);
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException(
+ "Error query job attachments " + jobId, e);
+ }
+ }
+
+ void uploadAttachmentFile(final ExecutableNode node, final File file)
+ throws ExecutorManagerException {
+ final Connection connection = getConnection();
+ try {
+ uploadAttachmentFile(connection, node, file, this.defaultEncodingType);
+ connection.commit();
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error committing attachments ", e);
+ } catch (final IOException e) {
+ throw new ExecutorManagerException("Error uploading attachments ", e);
+ } finally {
+ DbUtils.closeQuietly(connection);
+ }
+ }
+
+ private void uploadAttachmentFile(final Connection connection, final ExecutableNode node,
+ final File file, final EncodingType encType) throws SQLException, IOException {
+
+ final String jsonString = FileUtils.readFileToString(file);
+ final byte[] attachments = GZIPUtils.gzipString(jsonString, "UTF-8");
+
+ final String UPDATE_EXECUTION_NODE_ATTACHMENTS =
+ "UPDATE execution_jobs " + "SET attachments=? "
+ + "WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?";
+
+ final QueryRunner runner = new QueryRunner();
+ runner.update(connection, UPDATE_EXECUTION_NODE_ATTACHMENTS, attachments,
+ node.getExecutableFlow().getExecutionId(), node.getParentFlow()
+ .getNestedId(), node.getId(), node.getAttempt());
+ }
+
+ /**
+ * TODO kunkun-tang: Will be removed during the next refactor.
+ */
+ private Connection getConnection() throws ExecutorManagerException {
+ Connection connection = null;
+ try {
+ connection = super.getDBConnection(false);
+ } catch (final Exception e) {
+ DbUtils.closeQuietly(connection);
+ throw new ExecutorManagerException("Error getting DB connection.", e);
+ }
+ return connection;
+ }
+
+ private static class FetchExecutableJobHandler implements
+ ResultSetHandler<List<ExecutableJobInfo>> {
+ private static final String FETCH_EXECUTABLE_NODE =
+ "SELECT exec_id, project_id, version, flow_id, job_id, "
+ + "start_time, end_time, status, attempt "
+ + "FROM execution_jobs WHERE exec_id=? "
+ + "AND job_id=? AND attempt=?";
+ private static final String FETCH_EXECUTABLE_NODE_ATTEMPTS =
+ "SELECT exec_id, project_id, version, flow_id, job_id, "
+ + "start_time, end_time, status, attempt FROM execution_jobs "
+ + "WHERE exec_id=? AND job_id=?";
+ private static final String FETCH_PROJECT_EXECUTABLE_NODE =
+ "SELECT exec_id, project_id, version, flow_id, job_id, "
+ + "start_time, end_time, status, attempt FROM execution_jobs "
+ + "WHERE project_id=? AND job_id=? "
+ + "ORDER BY exec_id DESC LIMIT ?, ? ";
+
+ @Override
+ public List<ExecutableJobInfo> handle(final ResultSet rs) throws SQLException {
+ if (!rs.next()) {
+ return Collections.<ExecutableJobInfo> emptyList();
+ }
+
+ final List<ExecutableJobInfo> execNodes = new ArrayList<>();
+ do {
+ final int execId = rs.getInt(1);
+ final int projectId = rs.getInt(2);
+ final int version = rs.getInt(3);
+ final String flowId = rs.getString(4);
+ final String jobId = rs.getString(5);
+ final long startTime = rs.getLong(6);
+ final long endTime = rs.getLong(7);
+ final Status status = Status.fromInteger(rs.getInt(8));
+ final int attempt = rs.getInt(9);
+
+ final ExecutableJobInfo info =
+ new ExecutableJobInfo(execId, projectId, version, flowId, jobId,
+ startTime, endTime, status, attempt);
+ execNodes.add(info);
+ } while (rs.next());
+
+ return execNodes;
+ }
+ }
+
+ private static class FetchExecutableJobPropsHandler implements
+ ResultSetHandler<Pair<Props, Props>> {
+ private static final String FETCH_OUTPUT_PARAM_EXECUTABLE_NODE =
+ "SELECT output_params FROM execution_jobs WHERE exec_id=? AND job_id=?";
+ private static final String FETCH_INPUT_PARAM_EXECUTABLE_NODE =
+ "SELECT input_params FROM execution_jobs WHERE exec_id=? AND job_id=?";
+ private static final String FETCH_INPUT_OUTPUT_PARAM_EXECUTABLE_NODE =
+ "SELECT input_params, output_params "
+ + "FROM execution_jobs WHERE exec_id=? AND job_id=?";
+
+ @Override
+ public Pair<Props, Props> handle(final ResultSet rs) throws SQLException {
+ if (!rs.next()) {
+ return new Pair<>(null, null);
+ }
+
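+      // Two result columns means both input and output params were selected;
+      // a single column means only one of them was requested.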
+ if (rs.getMetaData().getColumnCount() > 1) {
+ final byte[] input = rs.getBytes(1);
+ final byte[] output = rs.getBytes(2);
+
+ Props inputProps = null;
+ Props outputProps = null;
+ try {
+ if (input != null) {
+ final String jsonInputString = GZIPUtils.unGzipString(input, "UTF-8");
+ inputProps =
+ PropsUtils.fromHierarchicalMap((Map<String, Object>) JSONUtils
+ .parseJSONFromString(jsonInputString));
+
+ }
+ if (output != null) {
+ final String jsonOutputString = GZIPUtils.unGzipString(output, "UTF-8");
+ outputProps =
+ PropsUtils.fromHierarchicalMap((Map<String, Object>) JSONUtils
+ .parseJSONFromString(jsonOutputString));
+ }
+ } catch (final IOException e) {
+ throw new SQLException("Error decoding param data", e);
+ }
+
+ return new Pair<>(inputProps, outputProps);
+ } else {
+ final byte[] params = rs.getBytes(1);
+ Props props = null;
+ try {
+ if (params != null) {
+ final String jsonProps = GZIPUtils.unGzipString(params, "UTF-8");
+
+ props =
+ PropsUtils.fromHierarchicalMap((Map<String, Object>) JSONUtils
+ .parseJSONFromString(jsonProps));
+ }
+ } catch (final IOException e) {
+ throw new SQLException("Error decoding param data", e);
+ }
+
+ return new Pair<>(props, null);
+ }
+ }
+ }
+
+ private static class FetchExecutableJobAttachmentsHandler implements
+ ResultSetHandler<String> {
+ private static final String FETCH_ATTACHMENTS_EXECUTABLE_NODE =
+ "SELECT attachments FROM execution_jobs WHERE exec_id=? AND job_id=?";
+
+ @Override
+ public String handle(final ResultSet rs) throws SQLException {
+ String attachmentsJson = null;
+ if (rs.next()) {
+ try {
+ final byte[] attachments = rs.getBytes(1);
+ if (attachments != null) {
+ attachmentsJson = GZIPUtils.unGzipString(attachments, "UTF-8");
+ }
+ } catch (final IOException e) {
+ throw new SQLException("Error decoding job attachments", e);
+ }
+ }
+ return attachmentsJson;
+ }
+ }
+}
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionLogsDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionLogsDao.java
new file mode 100644
index 0000000..da96fc9
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionLogsDao.java
@@ -0,0 +1,256 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import azkaban.database.AbstractJdbcLoader;
+import azkaban.db.DatabaseOperator;
+import azkaban.metrics.CommonMetrics;
+import azkaban.utils.FileIOUtils;
+import azkaban.utils.FileIOUtils.LogData;
+import azkaban.utils.GZIPUtils;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+import java.io.BufferedInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.apache.commons.dbutils.DbUtils;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.commons.io.IOUtils;
+import org.joda.time.DateTime;
+
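+/**
+ * DAO for the execution_logs table: stores log files in (optionally gzipped)
+ * chunks of up to 50KB and reassembles the requested byte range on fetch.
+ */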
+@Singleton
+public class ExecutionLogsDao extends AbstractJdbcLoader {
+
+ private final DatabaseOperator dbOperator;
+ private final EncodingType defaultEncodingType = EncodingType.GZIP;
+
+ @Inject
+ ExecutionLogsDao(final Props props, final CommonMetrics commonMetrics,
+ final DatabaseOperator dbOperator) {
+ super(props, commonMetrics);
+ this.dbOperator = dbOperator;
+ }
+
+ LogData fetchLogs(final int execId, final String name, final int attempt,
+ final int startByte,
+ final int length) throws ExecutorManagerException {
+ final QueryRunner runner = createQueryRunner();
+
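+    // The handler trims each stored log chunk to the requested
+    // [startByte, startByte + length) byte range.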
+ final FetchLogsHandler handler = new FetchLogsHandler(startByte, length + startByte);
+ try {
+ final LogData result =
+ runner.query(FetchLogsHandler.FETCH_LOGS, handler, execId, name,
+ attempt, startByte, startByte + length);
+ return result;
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error fetching logs " + execId
+ + " : " + name, e);
+ }
+ }
+
+ void uploadLogFile(final int execId, final String name, final int attempt,
+ final File... files)
+ throws ExecutorManagerException {
+ final Connection connection = getConnection();
+ try {
+ uploadLogFile(connection, execId, name, attempt, files,
+ getDefaultEncodingType());
+ connection.commit();
+ } catch (final SQLException | IOException e) {
+ throw new ExecutorManagerException("Error committing log", e);
+ } finally {
+ DbUtils.closeQuietly(connection);
+ }
+ }
+
+ void uploadLogFile(final Connection connection, final int execId, final String name,
+ final int attempt, final File[] files, final EncodingType encType)
+ throws ExecutorManagerException, IOException {
+ // 50K buffer... if logs are greater than this, we chunk.
+ // However, we better prevent large log files from being uploaded somehow
+ final byte[] buffer = new byte[50 * 1024];
+ int pos = 0;
+ int length = buffer.length;
+ int startByte = 0;
+ try {
+ for (int i = 0; i < files.length; ++i) {
+ final File file = files[i];
+
+ final BufferedInputStream bufferedStream =
+ new BufferedInputStream(new FileInputStream(file));
+ try {
+ int size = bufferedStream.read(buffer, pos, length);
+ while (size >= 0) {
+ if (pos + size == buffer.length) {
+ // Flush here.
+ uploadLogPart(connection, execId, name, attempt, startByte,
+ startByte + buffer.length, encType, buffer, buffer.length);
+
+ pos = 0;
+ length = buffer.length;
+ startByte += buffer.length;
+ } else {
+ // Usually end of file.
+ pos += size;
+ length = buffer.length - pos;
+ }
+ size = bufferedStream.read(buffer, pos, length);
+ }
+ } finally {
+ IOUtils.closeQuietly(bufferedStream);
+ }
+ }
+
+ // Final commit of buffer.
+ if (pos > 0) {
+ uploadLogPart(connection, execId, name, attempt, startByte, startByte
+ + pos, encType, buffer, pos);
+ }
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error writing log part.", e);
+ } catch (final IOException e) {
+ throw new ExecutorManagerException("Error chunking", e);
+ }
+ }
+
+ int removeExecutionLogsByTime(final long millis)
+ throws ExecutorManagerException {
+ final String DELETE_BY_TIME =
+ "DELETE FROM execution_logs WHERE upload_time < ?";
+
+ final QueryRunner runner = this.createQueryRunner();
+ int updateNum = 0;
+ try {
+ updateNum = runner.update(DELETE_BY_TIME, millis);
+ } catch (final SQLException e) {
+ e.printStackTrace();
+ throw new ExecutorManagerException(
+ "Error deleting old execution_logs before " + millis, e);
+ }
+
+ return updateNum;
+ }
+
+ // TODO kunkun-tang: Will be removed in the future refactor.
+ private EncodingType getDefaultEncodingType() {
+ return this.defaultEncodingType;
+ }
+
+ private void uploadLogPart(final Connection connection, final int execId, final String name,
+ final int attempt, final int startByte, final int endByte,
+ final EncodingType encType,
+ final byte[] buffer, final int length) throws SQLException, IOException {
+ final String INSERT_EXECUTION_LOGS =
+ "INSERT INTO execution_logs "
+ + "(exec_id, name, attempt, enc_type, start_byte, end_byte, "
+ + "log, upload_time) VALUES (?,?,?,?,?,?,?,?)";
+
+ final QueryRunner runner = new QueryRunner();
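+    // Compress the chunk when GZIP encoding is used; otherwise trim the
+    // buffer down to the bytes actually read.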
+ byte[] buf = buffer;
+ if (encType == EncodingType.GZIP) {
+ buf = GZIPUtils.gzipBytes(buf, 0, length);
+ } else if (length < buf.length) {
+ buf = Arrays.copyOf(buffer, length);
+ }
+
+ runner.update(connection, INSERT_EXECUTION_LOGS, execId, name, attempt,
+ encType.getNumVal(), startByte, startByte + length, buf, DateTime.now()
+ .getMillis());
+ }
+
+ // TODO kunkun-tang: should be removed in future refactor.
+ private Connection getConnection() throws ExecutorManagerException {
+ Connection connection = null;
+ try {
+ connection = super.getDBConnection(false);
+ } catch (final Exception e) {
+ DbUtils.closeQuietly(connection);
+ throw new ExecutorManagerException("Error getting DB connection.", e);
+ }
+ return connection;
+ }
+
+ private static class FetchLogsHandler implements ResultSetHandler<LogData> {
+
+ private static final String FETCH_LOGS =
+ "SELECT exec_id, name, attempt, enc_type, start_byte, end_byte, log "
+ + "FROM execution_logs "
+ + "WHERE exec_id=? AND name=? AND attempt=? AND end_byte > ? "
+ + "AND start_byte <= ? ORDER BY start_byte";
+
+ private final int startByte;
+ private final int endByte;
+
+ FetchLogsHandler(final int startByte, final int endByte) {
+ this.startByte = startByte;
+ this.endByte = endByte;
+ }
+
+ @Override
+ public LogData handle(final ResultSet rs) throws SQLException {
+ if (!rs.next()) {
+ return null;
+ }
+
+ final ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+
+ do {
+ // int execId = rs.getInt(1);
+ // String name = rs.getString(2);
+ final int attempt = rs.getInt(3);
+ final EncodingType encType = EncodingType.fromInteger(rs.getInt(4));
+ final int startByte = rs.getInt(5);
+ final int endByte = rs.getInt(6);
+
+ final byte[] data = rs.getBytes(7);
+
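+        // Clamp this chunk to the requested [startByte, endByte) window:
+        // 'offset' skips bytes before the requested start and 'length' stops
+        // at the requested end or the chunk end, whichever comes first.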
+ final int offset =
+ this.startByte > startByte ? this.startByte - startByte : 0;
+ final int length =
+ this.endByte < endByte ? this.endByte - startByte - offset
+ : endByte - startByte - offset;
+ try {
+ byte[] buffer = data;
+ if (encType == EncodingType.GZIP) {
+ buffer = GZIPUtils.unGzipBytes(data);
+ }
+
+ byteStream.write(buffer, offset, length);
+ } catch (final IOException e) {
+ throw new SQLException(e);
+ }
+ } while (rs.next());
+
+ final byte[] buffer = byteStream.toByteArray();
+ final Pair<Integer, Integer> result =
+ FileIOUtils.getUtf8Range(buffer, 0, buffer.length);
+
+ return new LogData(this.startByte + result.getFirst(), result.getSecond(),
+ new String(buffer, result.getFirst(), result.getSecond(), StandardCharsets.UTF_8));
+ }
+ }
+}
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorEventsDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorEventsDao.java
new file mode 100644
index 0000000..6c7f44d
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorEventsDao.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import azkaban.database.AbstractJdbcLoader;
+import azkaban.db.DatabaseOperator;
+import azkaban.executor.ExecutorLogEvent.EventType;
+import azkaban.metrics.CommonMetrics;
+import azkaban.utils.Props;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.ResultSetHandler;
+
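+/**
+ * DAO for the executor_events audit table: records events against an executor
+ * and pages through them per executor.
+ */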
+@Singleton
+public class ExecutorEventsDao extends AbstractJdbcLoader {
+
+ private final DatabaseOperator dbOperator;
+
+ @Inject
+ public ExecutorEventsDao(final Props props, final CommonMetrics commonMetrics,
+ final DatabaseOperator dbOperator) {
+ super(props, commonMetrics);
+ this.dbOperator = dbOperator;
+ }
+
+ public void postExecutorEvent(final Executor executor, final EventType type, final String user,
+ final String message) throws ExecutorManagerException {
+ final QueryRunner runner = createQueryRunner();
+
+    final String INSERT_EXECUTOR_EVENTS =
+        "INSERT INTO executor_events (executor_id, event_type, event_time, username, message) values (?,?,?,?,?)";
+    final Date updateDate = new Date();
+    try {
+      runner.update(INSERT_EXECUTOR_EVENTS, executor.getId(), type.getNumVal(),
+ updateDate, user, message);
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Failed to post executor event", e);
+ }
+ }
+
+ public List<ExecutorLogEvent> getExecutorEvents(final Executor executor, final int num,
+ final int offset)
+ throws ExecutorManagerException {
+ final QueryRunner runner = createQueryRunner();
+
+ final ExecutorLogsResultHandler logHandler = new ExecutorLogsResultHandler();
+ List<ExecutorLogEvent> events = null;
+ try {
+ events =
+ runner.query(ExecutorLogsResultHandler.SELECT_EXECUTOR_EVENTS_ORDER,
+ logHandler, executor.getId(), num, offset);
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException(
+ "Failed to fetch events for executor id : " + executor.getId(), e);
+ }
+
+ return events;
+ }
+
+ /**
+ * JDBC ResultSetHandler to fetch records from executor_events table
+ */
+ private static class ExecutorLogsResultHandler implements
+ ResultSetHandler<List<ExecutorLogEvent>> {
+
+ private static final String SELECT_EXECUTOR_EVENTS_ORDER =
+ "SELECT executor_id, event_type, event_time, username, message FROM executor_events "
+ + " WHERE executor_id=? ORDER BY event_time LIMIT ? OFFSET ?";
+
+ @Override
+ public List<ExecutorLogEvent> handle(final ResultSet rs) throws SQLException {
+ if (!rs.next()) {
+ return Collections.<ExecutorLogEvent>emptyList();
+ }
+
+ final ArrayList<ExecutorLogEvent> events = new ArrayList<>();
+ do {
+ final int executorId = rs.getInt(1);
+ final int eventType = rs.getInt(2);
+ final Date eventTime = rs.getDate(3);
+ final String username = rs.getString(4);
+ final String message = rs.getString(5);
+
+ final ExecutorLogEvent event =
+ new ExecutorLogEvent(executorId, username, eventTime,
+ EventType.fromInteger(eventType), message);
+ events.add(event);
+ } while (rs.next());
+
+ return events;
+ }
+ }
+}
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java b/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
new file mode 100644
index 0000000..d1ba681
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
@@ -0,0 +1,213 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import azkaban.database.AbstractJdbcLoader;
+import azkaban.db.DatabaseOperator;
+import azkaban.metrics.CommonMetrics;
+import azkaban.utils.GZIPUtils;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.log4j.Logger;
+
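+/**
+ * DAO that loads unfinished executions (status not SUCCEEDED, KILLED or FAILED)
+ * together with the executor they are assigned to, either all at once or by
+ * execution id.
+ */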
+@Singleton
+public class FetchActiveFlowDao extends AbstractJdbcLoader {
+
+ private static final Logger logger = Logger.getLogger(FetchActiveFlowDao.class);
+ private final DatabaseOperator dbOperator;
+
+ @Inject
+ public FetchActiveFlowDao(final Props props, final CommonMetrics commonMetrics,
+ final DatabaseOperator dbOperator) {
+ super(props, commonMetrics);
+ this.dbOperator = dbOperator;
+ }
+
+ Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows()
+ throws ExecutorManagerException {
+ final QueryRunner runner = createQueryRunner();
+ final FetchActiveExecutableFlows flowHandler = new FetchActiveExecutableFlows();
+
+ try {
+ final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> properties =
+ runner.query(FetchActiveExecutableFlows.FETCH_ACTIVE_EXECUTABLE_FLOW,
+ flowHandler);
+ return properties;
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error fetching active flows", e);
+ }
+ }
+
+ Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(final int execId)
+ throws ExecutorManagerException {
+ final QueryRunner runner = createQueryRunner();
+ final FetchActiveExecutableFlowByExecId flowHandler = new FetchActiveExecutableFlowByExecId();
+
+ try {
+ final List<Pair<ExecutionReference, ExecutableFlow>> flows =
+ runner.query(
+ FetchActiveExecutableFlowByExecId.FETCH_ACTIVE_EXECUTABLE_FLOW_BY_EXECID,
+ flowHandler, execId);
+ if (flows.isEmpty()) {
+ return null;
+ } else {
+ return flows.get(0);
+ }
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error fetching active flows by exec id", e);
+ }
+ }
+
+ private static class FetchActiveExecutableFlows implements
+ ResultSetHandler<Map<Integer, Pair<ExecutionReference, ExecutableFlow>>> {
+
+ // Select running and executor assigned flows
+ private static final String FETCH_ACTIVE_EXECUTABLE_FLOW =
+ "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, et.host host, "
+ + "et.port port, et.id executorId, et.active executorStatus"
+ + " FROM execution_flows ex"
+ + " INNER JOIN "
+ + " executors et ON ex.executor_id = et.id"
+ + " Where ex.status NOT IN ("
+ + Status.SUCCEEDED.getNumVal() + ", "
+ + Status.KILLED.getNumVal() + ", "
+ + Status.FAILED.getNumVal() + ")";
+
+ @Override
+ public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> handle(
+ final ResultSet rs) throws SQLException {
+ if (!rs.next()) {
+ return Collections.emptyMap();
+ }
+
+ final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> execFlows =
+ new HashMap<>();
+ do {
+ final int id = rs.getInt(1);
+ final int encodingType = rs.getInt(2);
+ final byte[] data = rs.getBytes(3);
+ final String host = rs.getString(4);
+ final int port = rs.getInt(5);
+ final int executorId = rs.getInt(6);
+ final boolean executorStatus = rs.getBoolean(7);
+
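+        // A null flow_data blob cannot be deserialized, so keep the execution
+        // id in the map with a null pair.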
+ if (data == null) {
+ execFlows.put(id, null);
+ } else {
+ final EncodingType encType = EncodingType.fromInteger(encodingType);
+ final Object flowObj;
+ try {
+ // Convoluted way to inflate strings. Should find common package or
+ // helper function.
+ if (encType == EncodingType.GZIP) {
+ // Decompress the sucker.
+ final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+ flowObj = JSONUtils.parseJSONFromString(jsonString);
+ } else {
+ final String jsonString = new String(data, "UTF-8");
+ flowObj = JSONUtils.parseJSONFromString(jsonString);
+ }
+
+ final ExecutableFlow exFlow =
+ ExecutableFlow.createExecutableFlowFromObject(flowObj);
+ final Executor executor = new Executor(executorId, host, port, executorStatus);
+ final ExecutionReference ref = new ExecutionReference(id, executor);
+ execFlows.put(id, new Pair<>(ref, exFlow));
+ } catch (final IOException e) {
+ throw new SQLException("Error retrieving flow data " + id, e);
+ }
+ }
+ } while (rs.next());
+
+ return execFlows;
+ }
+ }
+
+ private static class FetchActiveExecutableFlowByExecId implements
+ ResultSetHandler<List<Pair<ExecutionReference, ExecutableFlow>>> {
+
+ private static final String FETCH_ACTIVE_EXECUTABLE_FLOW_BY_EXECID =
+ "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, et.host host, "
+ + "et.port port, et.id executorId, et.active executorStatus"
+ + " FROM execution_flows ex"
+ + " INNER JOIN "
+ + " executors et ON ex.executor_id = et.id"
+ + " Where ex.exec_id = ? AND ex.status NOT IN ("
+ + Status.SUCCEEDED.getNumVal() + ", "
+ + Status.KILLED.getNumVal() + ", "
+ + Status.FAILED.getNumVal() + ")";
+
+ @Override
+ public List<Pair<ExecutionReference, ExecutableFlow>> handle(final ResultSet rs)
+ throws SQLException {
+ if (!rs.next()) {
+ return Collections.emptyList();
+ }
+
+ final List<Pair<ExecutionReference, ExecutableFlow>> execFlows =
+ new ArrayList<>();
+ do {
+ final int id = rs.getInt(1);
+ final int encodingType = rs.getInt(2);
+ final byte[] data = rs.getBytes(3);
+ final String host = rs.getString(4);
+ final int port = rs.getInt(5);
+ final int executorId = rs.getInt(6);
+ final boolean executorStatus = rs.getBoolean(7);
+
+ if (data == null) {
+ logger.error("Found a flow with empty data blob exec_id: " + id);
+ } else {
+ final EncodingType encType = EncodingType.fromInteger(encodingType);
+ final Object flowObj;
+ try {
+ if (encType == EncodingType.GZIP) {
+ final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+ flowObj = JSONUtils.parseJSONFromString(jsonString);
+ } else {
+ final String jsonString = new String(data, "UTF-8");
+ flowObj = JSONUtils.parseJSONFromString(jsonString);
+ }
+
+ final ExecutableFlow exFlow =
+ ExecutableFlow.createExecutableFlowFromObject(flowObj);
+ final Executor executor = new Executor(executorId, host, port, executorStatus);
+ final ExecutionReference ref = new ExecutionReference(id, executor);
+ execFlows.add(new Pair<>(ref, exFlow));
+ } catch (final IOException e) {
+ throw new SQLException("Error retrieving flow data " + id, e);
+ }
+ }
+ } while (rs.next());
+
+ return execFlows;
+ }
+ }
+}
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 9c1d579..3856122 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -19,39 +19,27 @@ package azkaban.executor;
import azkaban.database.AbstractJdbcLoader;
import azkaban.executor.ExecutorLogEvent.EventType;
import azkaban.metrics.CommonMetrics;
-import azkaban.utils.FileIOUtils;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.GZIPUtils;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
-import azkaban.utils.PropsUtils;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import java.io.BufferedInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.File;
-import java.io.FileInputStream;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
-import org.joda.time.DateTime;
@Singleton
public class JdbcExecutorLoader extends AbstractJdbcLoader implements
@@ -60,15 +48,30 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
.getLogger(JdbcExecutorLoader.class);
private final ExecutionFlowDao executionFlowDao;
private final ExecutorDao executorDao;
+ private final ExecutionJobDao executionJobDao;
+ private final ExecutionLogsDao executionLogsDao;
+ private final ExecutorEventsDao executorEventsDao;
+ private final ActiveExecutingFlowsDao activeExecutingFlowsDao;
+ private final FetchActiveFlowDao fetchActiveFlowDao;
private EncodingType defaultEncodingType = EncodingType.GZIP;
@Inject
public JdbcExecutorLoader(final Props props, final CommonMetrics commonMetrics,
final ExecutionFlowDao executionFlowDao,
- final ExecutorDao executorDao) {
+ final ExecutorDao executorDao,
+ final ExecutionJobDao executionJobDao,
+ final ExecutionLogsDao executionLogsDao,
+ final ExecutorEventsDao executorEventsDao,
+ final ActiveExecutingFlowsDao activeExecutingFlowsDao,
+ final FetchActiveFlowDao fetchActiveFlowDao) {
super(props, commonMetrics);
this.executionFlowDao = executionFlowDao;
this.executorDao = executorDao;
+ this.executionJobDao = executionJobDao;
+    this.executionLogsDao = executionLogsDao;
+ this.executorEventsDao = executorEventsDao;
+ this.activeExecutingFlowsDao = activeExecutingFlowsDao;
+ this.fetchActiveFlowDao = fetchActiveFlowDao;
}
public EncodingType getDefaultEncodingType() {
@@ -142,38 +145,15 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
@Override
public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows()
throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
- final FetchActiveExecutableFlows flowHandler = new FetchActiveExecutableFlows();
- try {
- final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> properties =
- runner.query(FetchActiveExecutableFlows.FETCH_ACTIVE_EXECUTABLE_FLOW,
- flowHandler);
- return properties;
- } catch (final SQLException e) {
- throw new ExecutorManagerException("Error fetching active flows", e);
- }
+ return this.fetchActiveFlowDao.fetchActiveFlows();
}
@Override
public Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(final int execId)
throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
- final FetchActiveExecutableFlowByExecId flowHandler = new FetchActiveExecutableFlowByExecId();
- try {
- final List<Pair<ExecutionReference, ExecutableFlow>> flows =
- runner.query(FetchActiveExecutableFlowByExecId.FETCH_ACTIVE_EXECUTABLE_FLOW_BY_EXECID,
- flowHandler, execId);
- if(flows.isEmpty()) {
- return null;
- }
- else {
- return flows.get(0);
- }
- } catch (final SQLException e) {
- throw new ExecutorManagerException("Error fetching active flows by exec id", e);
- }
+ return this.fetchActiveFlowDao.fetchActiveFlowByExecId(execId);
}
@Override
@@ -250,283 +230,96 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
@Override
public void addActiveExecutableReference(final ExecutionReference reference)
throws ExecutorManagerException {
- final String INSERT =
- "INSERT INTO active_executing_flows "
- + "(exec_id, update_time) values (?,?)";
- final QueryRunner runner = createQueryRunner();
- try {
- runner.update(INSERT, reference.getExecId(), reference.getUpdateTime());
- } catch (final SQLException e) {
- throw new ExecutorManagerException(
- "Error updating active flow reference " + reference.getExecId(), e);
- }
+ this.activeExecutingFlowsDao.addActiveExecutableReference(reference);
}
@Override
public void removeActiveExecutableReference(final int execid)
throws ExecutorManagerException {
- final String DELETE = "DELETE FROM active_executing_flows WHERE exec_id=?";
- final QueryRunner runner = createQueryRunner();
- try {
- runner.update(DELETE, execid);
- } catch (final SQLException e) {
- throw new ExecutorManagerException(
- "Error deleting active flow reference " + execid, e);
- }
+ this.activeExecutingFlowsDao.removeActiveExecutableReference(execid);
}
@Override
public boolean updateExecutableReference(final int execId, final long updateTime)
throws ExecutorManagerException {
- final String DELETE =
- "UPDATE active_executing_flows set update_time=? WHERE exec_id=?";
-
- final QueryRunner runner = createQueryRunner();
- int updateNum = 0;
- try {
- updateNum = runner.update(DELETE, updateTime, execId);
- } catch (final SQLException e) {
- throw new ExecutorManagerException(
- "Error deleting active flow reference " + execId, e);
- }
// Should be 1.
- return updateNum > 0;
+ return this.activeExecutingFlowsDao.updateExecutableReference(execId, updateTime);
}
@Override
public void uploadExecutableNode(final ExecutableNode node, final Props inputProps)
throws ExecutorManagerException {
- final String INSERT_EXECUTION_NODE =
- "INSERT INTO execution_jobs "
- + "(exec_id, project_id, version, flow_id, job_id, start_time, "
- + "end_time, status, input_params, attempt) VALUES (?,?,?,?,?,?,?,?,?,?)";
-
- byte[] inputParam = null;
- if (inputProps != null) {
- try {
- final String jsonString =
- JSONUtils.toJSON(PropsUtils.toHierarchicalMap(inputProps));
- inputParam = GZIPUtils.gzipString(jsonString, "UTF-8");
- } catch (final IOException e) {
- throw new ExecutorManagerException("Error encoding input params");
- }
- }
- final ExecutableFlow flow = node.getExecutableFlow();
- final String flowId = node.getParentFlow().getFlowPath();
- System.out.println("Uploading flowId " + flowId);
- final QueryRunner runner = createQueryRunner();
- try {
- runner.update(INSERT_EXECUTION_NODE, flow.getExecutionId(),
- flow.getProjectId(), flow.getVersion(), flowId, node.getId(),
- node.getStartTime(), node.getEndTime(), node.getStatus().getNumVal(),
- inputParam, node.getAttempt());
- } catch (final SQLException e) {
- throw new ExecutorManagerException("Error writing job " + node.getId(), e);
- }
+ this.executionJobDao.uploadExecutableNode(node, inputProps);
}
@Override
public void updateExecutableNode(final ExecutableNode node)
throws ExecutorManagerException {
- final String UPSERT_EXECUTION_NODE =
- "UPDATE execution_jobs "
- + "SET start_time=?, end_time=?, status=?, output_params=? "
- + "WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?";
-
- byte[] outputParam = null;
- final Props outputProps = node.getOutputProps();
- if (outputProps != null) {
- try {
- final String jsonString =
- JSONUtils.toJSON(PropsUtils.toHierarchicalMap(outputProps));
- outputParam = GZIPUtils.gzipString(jsonString, "UTF-8");
- } catch (final IOException e) {
- throw new ExecutorManagerException("Error encoding input params");
- }
- }
- final QueryRunner runner = createQueryRunner();
- try {
- runner.update(UPSERT_EXECUTION_NODE, node.getStartTime(), node
- .getEndTime(), node.getStatus().getNumVal(), outputParam, node
- .getExecutableFlow().getExecutionId(), node.getParentFlow()
- .getFlowPath(), node.getId(), node.getAttempt());
- } catch (final SQLException e) {
- throw new ExecutorManagerException("Error updating job " + node.getId(),
- e);
- }
+ this.executionJobDao.updateExecutableNode(node);
}
@Override
public List<ExecutableJobInfo> fetchJobInfoAttempts(final int execId, final String jobId)
throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
- try {
- final List<ExecutableJobInfo> info =
- runner.query(
- FetchExecutableJobHandler.FETCH_EXECUTABLE_NODE_ATTEMPTS,
- new FetchExecutableJobHandler(), execId, jobId);
- if (info == null || info.isEmpty()) {
- return null;
- }
- return info;
- } catch (final SQLException e) {
- throw new ExecutorManagerException("Error querying job info " + jobId, e);
- }
+ return this.executionJobDao.fetchJobInfoAttempts(execId, jobId);
}
@Override
public ExecutableJobInfo fetchJobInfo(final int execId, final String jobId, final int attempts)
throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
- try {
- final List<ExecutableJobInfo> info =
- runner.query(FetchExecutableJobHandler.FETCH_EXECUTABLE_NODE,
- new FetchExecutableJobHandler(), execId, jobId, attempts);
- if (info == null || info.isEmpty()) {
- return null;
- }
- return info.get(0);
- } catch (final SQLException e) {
- throw new ExecutorManagerException("Error querying job info " + jobId, e);
- }
+ return this.executionJobDao.fetchJobInfo(execId, jobId, attempts);
}
@Override
public Props fetchExecutionJobInputProps(final int execId, final String jobId)
throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
- try {
- final Pair<Props, Props> props =
- runner.query(
- FetchExecutableJobPropsHandler.FETCH_INPUT_PARAM_EXECUTABLE_NODE,
- new FetchExecutableJobPropsHandler(), execId, jobId);
- return props.getFirst();
- } catch (final SQLException e) {
- throw new ExecutorManagerException("Error querying job params " + execId
- + " " + jobId, e);
- }
+ return this.executionJobDao.fetchExecutionJobInputProps(execId, jobId);
}
@Override
public Props fetchExecutionJobOutputProps(final int execId, final String jobId)
throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
- try {
- final Pair<Props, Props> props =
- runner
- .query(
- FetchExecutableJobPropsHandler.FETCH_OUTPUT_PARAM_EXECUTABLE_NODE,
- new FetchExecutableJobPropsHandler(), execId, jobId);
- return props.getFirst();
- } catch (final SQLException e) {
- throw new ExecutorManagerException("Error querying job params " + execId
- + " " + jobId, e);
- }
+ return this.executionJobDao.fetchExecutionJobOutputProps(execId, jobId);
}
@Override
public Pair<Props, Props> fetchExecutionJobProps(final int execId, final String jobId)
throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
- try {
- final Pair<Props, Props> props =
- runner
- .query(
- FetchExecutableJobPropsHandler.FETCH_INPUT_OUTPUT_PARAM_EXECUTABLE_NODE,
- new FetchExecutableJobPropsHandler(), execId, jobId);
- return props;
- } catch (final SQLException e) {
- throw new ExecutorManagerException("Error querying job params " + execId
- + " " + jobId, e);
- }
+ return this.executionJobDao.fetchExecutionJobProps(execId, jobId);
}
@Override
public List<ExecutableJobInfo> fetchJobHistory(final int projectId, final String jobId,
final int skip, final int size) throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
- try {
- final List<ExecutableJobInfo> info =
- runner.query(FetchExecutableJobHandler.FETCH_PROJECT_EXECUTABLE_NODE,
- new FetchExecutableJobHandler(), projectId, jobId, skip, size);
- if (info == null || info.isEmpty()) {
- return null;
- }
- return info;
- } catch (final SQLException e) {
- throw new ExecutorManagerException("Error querying job info " + jobId, e);
- }
+ return this.executionJobDao.fetchJobHistory(projectId, jobId, skip, size);
}
@Override
public LogData fetchLogs(final int execId, final String name, final int attempt, final int startByte,
final int length) throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
- final FetchLogsHandler handler =
- new FetchLogsHandler(startByte, length + startByte);
- try {
- final LogData result =
- runner.query(FetchLogsHandler.FETCH_LOGS, handler, execId, name,
- attempt, startByte, startByte + length);
- return result;
- } catch (final SQLException e) {
- throw new ExecutorManagerException("Error fetching logs " + execId
- + " : " + name, e);
- }
+ return this.executionLogsDao.fetchLogs(execId, name, attempt, startByte, length);
}
@Override
public List<Object> fetchAttachments(final int execId, final String jobId, final int attempt)
throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
- try {
- final String attachments =
- runner
- .query(
- FetchExecutableJobAttachmentsHandler.FETCH_ATTACHMENTS_EXECUTABLE_NODE,
- new FetchExecutableJobAttachmentsHandler(), execId, jobId);
- if (attachments == null) {
- return null;
- }
-
- final List<Object> attachmentList =
- (List<Object>) JSONUtils.parseJSONFromString(attachments);
-
- return attachmentList;
- } catch (final IOException e) {
- throw new ExecutorManagerException(
- "Error converting job attachments to JSON " + jobId, e);
- } catch (final SQLException e) {
- throw new ExecutorManagerException(
- "Error query job attachments " + jobId, e);
- }
+ return this.executionJobDao.fetchAttachments(execId, jobId, attempt);
}
@Override
public void uploadLogFile(final int execId, final String name, final int attempt, final File... files)
throws ExecutorManagerException {
- final Connection connection = getConnection();
- try {
- uploadLogFile(connection, execId, name, attempt, files,
- this.defaultEncodingType);
- connection.commit();
- } catch (final SQLException e) {
- throw new ExecutorManagerException("Error committing log", e);
- } catch (final IOException e) {
- throw new ExecutorManagerException("Error committing log", e);
- } finally {
- DbUtils.closeQuietly(connection);
- }
+ this.executionLogsDao.uploadLogFile(execId, name, attempt, files);
}
private void uploadLogFile(final Connection connection, final int execId, final String name,
@@ -534,102 +327,13 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
throws ExecutorManagerException, IOException {
// 50K buffer... if logs are greater than this, we chunk.
// However, we better prevent large log files from being uploaded somehow
- final byte[] buffer = new byte[50 * 1024];
- int pos = 0;
- int length = buffer.length;
- int startByte = 0;
- try {
- for (int i = 0; i < files.length; ++i) {
- final File file = files[i];
-
- final BufferedInputStream bufferedStream =
- new BufferedInputStream(new FileInputStream(file));
- try {
- int size = bufferedStream.read(buffer, pos, length);
- while (size >= 0) {
- if (pos + size == buffer.length) {
- // Flush here.
- uploadLogPart(connection, execId, name, attempt, startByte,
- startByte + buffer.length, encType, buffer, buffer.length);
-
- pos = 0;
- length = buffer.length;
- startByte += buffer.length;
- } else {
- // Usually end of file.
- pos += size;
- length = buffer.length - pos;
- }
- size = bufferedStream.read(buffer, pos, length);
- }
- } finally {
- IOUtils.closeQuietly(bufferedStream);
- }
- }
-
- // Final commit of buffer.
- if (pos > 0) {
- uploadLogPart(connection, execId, name, attempt, startByte, startByte
- + pos, encType, buffer, pos);
- }
- } catch (final SQLException e) {
- throw new ExecutorManagerException("Error writing log part.", e);
- } catch (final IOException e) {
- throw new ExecutorManagerException("Error chunking", e);
- }
- }
-
- private void uploadLogPart(final Connection connection, final int execId, final String name,
- final int attempt, final int startByte, final int endByte, final EncodingType encType,
- final byte[] buffer, final int length) throws SQLException, IOException {
- final String INSERT_EXECUTION_LOGS =
- "INSERT INTO execution_logs "
- + "(exec_id, name, attempt, enc_type, start_byte, end_byte, "
- + "log, upload_time) VALUES (?,?,?,?,?,?,?,?)";
-
- final QueryRunner runner = new QueryRunner();
- byte[] buf = buffer;
- if (encType == EncodingType.GZIP) {
- buf = GZIPUtils.gzipBytes(buf, 0, length);
- } else if (length < buf.length) {
- buf = Arrays.copyOf(buffer, length);
- }
-
- runner.update(connection, INSERT_EXECUTION_LOGS, execId, name, attempt,
- encType.getNumVal(), startByte, startByte + length, buf, DateTime.now()
- .getMillis());
+ this.executionLogsDao.uploadLogFile(connection, execId, name, attempt, files, encType);
}
@Override
public void uploadAttachmentFile(final ExecutableNode node, final File file)
throws ExecutorManagerException {
- final Connection connection = getConnection();
- try {
- uploadAttachmentFile(connection, node, file, this.defaultEncodingType);
- connection.commit();
- } catch (final SQLException e) {
- throw new ExecutorManagerException("Error committing attachments ", e);
- } catch (final IOException e) {
- throw new ExecutorManagerException("Error uploading attachments ", e);
- } finally {
- DbUtils.closeQuietly(connection);
- }
- }
-
- private void uploadAttachmentFile(final Connection connection, final ExecutableNode node,
- final File file, final EncodingType encType) throws SQLException, IOException {
-
- final String jsonString = FileUtils.readFileToString(file);
- final byte[] attachments = GZIPUtils.gzipString(jsonString, "UTF-8");
-
- final String UPDATE_EXECUTION_NODE_ATTACHMENTS =
- "UPDATE execution_jobs " + "SET attachments=? "
- + "WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?";
-
- final QueryRunner runner = new QueryRunner();
- runner.update(connection, UPDATE_EXECUTION_NODE_ATTACHMENTS, attachments,
- node.getExecutableFlow().getExecutionId(), node.getParentFlow()
- .getNestedId(), node.getId(), node.getAttempt());
+ this.executionJobDao.uploadAttachmentFile(node, file);
}
private Connection getConnection() throws ExecutorManagerException {
@@ -743,17 +447,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
@Override
public void postExecutorEvent(final Executor executor, final EventType type, final String user,
final String message) throws ExecutorManagerException{
- final QueryRunner runner = createQueryRunner();
- final String INSERT_PROJECT_EVENTS =
- "INSERT INTO executor_events (executor_id, event_type, event_time, username, message) values (?,?,?,?,?)";
- final Date updateDate = new Date();
- try {
- runner.update(INSERT_PROJECT_EVENTS, executor.getId(), type.getNumVal(),
- updateDate, user, message);
- } catch (final SQLException e) {
- throw new ExecutorManagerException("Failed to post executor event", e);
- }
+ this.executorEventsDao.postExecutorEvent(executor, type, user, message);
}
/**
@@ -765,20 +460,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
@Override
public List<ExecutorLogEvent> getExecutorEvents(final Executor executor, final int num,
final int offset) throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
- final ExecutorLogsResultHandler logHandler = new ExecutorLogsResultHandler();
- List<ExecutorLogEvent> events = null;
- try {
- events =
- runner.query(ExecutorLogsResultHandler.SELECT_EXECUTOR_EVENTS_ORDER,
- logHandler, executor.getId(), num, offset);
- } catch (final SQLException e) {
- throw new ExecutorManagerException(
- "Failed to fetch events for executor id : " + executor.getId(), e);
- }
-
- return events;
+ return this.executorEventsDao.getExecutorEvents(executor, num, offset);
}
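
The removed call above runs SELECT_EXECUTOR_EVENTS_ORDER (defined in the ExecutorLogsResultHandler removed near the end of this file), which pages with LIMIT/OFFSET, so callers walk the event history in fixed-size pages. A hedged usage sketch against the loader interface (page size and variable names are illustrative):

    final int pageSize = 100;
    int offset = 0;
    List<ExecutorLogEvent> page;
    do {
      page = executorLoader.getExecutorEvents(executor, pageSize, offset);  // ordered by event_time
      for (final ExecutorLogEvent event : page) {
        // process event ...
      }
      offset += page.size();
    } while (page.size() == pageSize);
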
/**
@@ -829,20 +512,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
@Override
public int removeExecutionLogsByTime(final long millis)
throws ExecutorManagerException {
- final String DELETE_BY_TIME =
- "DELETE FROM execution_logs WHERE upload_time < ?";
-
- final QueryRunner runner = createQueryRunner();
- int updateNum = 0;
- try {
- updateNum = runner.update(DELETE_BY_TIME, millis);
- } catch (final SQLException e) {
- e.printStackTrace();
- throw new ExecutorManagerException(
- "Error deleting old execution_logs before " + millis, e);
- }
- return updateNum;
+ return this.executionLogsDao.removeExecutionLogsByTime(millis);
}
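
removeExecutionLogsByTime takes an absolute upload_time cutoff in epoch milliseconds, so the retention policy lives with the caller; for example (illustrative retention window, not a value taken from this change):

    // Drop execution logs uploaded more than ~12 weeks ago (illustrative cutoff).
    final long cutoffMs = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(12L * 7);
    final int rowsDeleted = executorLoader.removeExecutionLogsByTime(cutoffMs);
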
/**
@@ -881,193 +552,6 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
}
}
- private static class FetchLogsHandler implements ResultSetHandler<LogData> {
- private static final String FETCH_LOGS =
- "SELECT exec_id, name, attempt, enc_type, start_byte, end_byte, log "
- + "FROM execution_logs "
- + "WHERE exec_id=? AND name=? AND attempt=? AND end_byte > ? "
- + "AND start_byte <= ? ORDER BY start_byte";
-
- private final int startByte;
- private final int endByte;
-
- public FetchLogsHandler(final int startByte, final int endByte) {
- this.startByte = startByte;
- this.endByte = endByte;
- }
-
- @Override
- public LogData handle(final ResultSet rs) throws SQLException {
- if (!rs.next()) {
- return null;
- }
-
- final ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
-
- do {
- // int execId = rs.getInt(1);
- // String name = rs.getString(2);
- final int attempt = rs.getInt(3);
- final EncodingType encType = EncodingType.fromInteger(rs.getInt(4));
- final int startByte = rs.getInt(5);
- final int endByte = rs.getInt(6);
-
- final byte[] data = rs.getBytes(7);
-
- final int offset =
- this.startByte > startByte ? this.startByte - startByte : 0;
- final int length =
- this.endByte < endByte ? this.endByte - startByte - offset
- : endByte - startByte - offset;
- try {
- byte[] buffer = data;
- if (encType == EncodingType.GZIP) {
- buffer = GZIPUtils.unGzipBytes(data);
- }
-
- byteStream.write(buffer, offset, length);
- } catch (final IOException e) {
- throw new SQLException(e);
- }
- } while (rs.next());
-
- final byte[] buffer = byteStream.toByteArray();
- final Pair<Integer, Integer> result =
- FileIOUtils.getUtf8Range(buffer, 0, buffer.length);
-
- return new LogData(this.startByte + result.getFirst(), result.getSecond(),
- new String(buffer, result.getFirst(), result.getSecond(), StandardCharsets.UTF_8));
- }
- }
-
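
The offset/length arithmetic in FetchLogsHandler above clips each stored chunk to the requested [startByte, endByte) window. A worked example for a request of bytes [150, 400) over two stored chunks:

    chunk [0, 300):   offset = 150 - 0 = 150, length = 300 - 0 - 150 = 150  -> copies global bytes 150..299
    chunk [300, 600): offset = 0,             length = 400 - 300 - 0 = 100  -> copies global bytes 300..399

Together the two slices reassemble exactly the requested 250 bytes, which getUtf8Range then trims to a valid UTF-8 boundary.
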
- private static class FetchExecutableJobHandler implements
- ResultSetHandler<List<ExecutableJobInfo>> {
- private static final String FETCH_EXECUTABLE_NODE =
- "SELECT exec_id, project_id, version, flow_id, job_id, "
- + "start_time, end_time, status, attempt "
- + "FROM execution_jobs WHERE exec_id=? "
- + "AND job_id=? AND attempt=?";
- private static final String FETCH_EXECUTABLE_NODE_ATTEMPTS =
- "SELECT exec_id, project_id, version, flow_id, job_id, "
- + "start_time, end_time, status, attempt FROM execution_jobs "
- + "WHERE exec_id=? AND job_id=?";
- private static final String FETCH_PROJECT_EXECUTABLE_NODE =
- "SELECT exec_id, project_id, version, flow_id, job_id, "
- + "start_time, end_time, status, attempt FROM execution_jobs "
- + "WHERE project_id=? AND job_id=? "
- + "ORDER BY exec_id DESC LIMIT ?, ? ";
-
- @Override
- public List<ExecutableJobInfo> handle(final ResultSet rs) throws SQLException {
- if (!rs.next()) {
- return Collections.<ExecutableJobInfo> emptyList();
- }
-
- final List<ExecutableJobInfo> execNodes = new ArrayList<>();
- do {
- final int execId = rs.getInt(1);
- final int projectId = rs.getInt(2);
- final int version = rs.getInt(3);
- final String flowId = rs.getString(4);
- final String jobId = rs.getString(5);
- final long startTime = rs.getLong(6);
- final long endTime = rs.getLong(7);
- final Status status = Status.fromInteger(rs.getInt(8));
- final int attempt = rs.getInt(9);
-
- final ExecutableJobInfo info =
- new ExecutableJobInfo(execId, projectId, version, flowId, jobId,
- startTime, endTime, status, attempt);
- execNodes.add(info);
- } while (rs.next());
-
- return execNodes;
- }
- }
-
- private static class FetchExecutableJobAttachmentsHandler implements
- ResultSetHandler<String> {
- private static final String FETCH_ATTACHMENTS_EXECUTABLE_NODE =
- "SELECT attachments FROM execution_jobs WHERE exec_id=? AND job_id=?";
-
- @Override
- public String handle(final ResultSet rs) throws SQLException {
- String attachmentsJson = null;
- if (rs.next()) {
- try {
- final byte[] attachments = rs.getBytes(1);
- if (attachments != null) {
- attachmentsJson = GZIPUtils.unGzipString(attachments, "UTF-8");
- }
- } catch (final IOException e) {
- throw new SQLException("Error decoding job attachments", e);
- }
- }
- return attachmentsJson;
- }
- }
-
- private static class FetchExecutableJobPropsHandler implements
- ResultSetHandler<Pair<Props, Props>> {
- private static final String FETCH_OUTPUT_PARAM_EXECUTABLE_NODE =
- "SELECT output_params FROM execution_jobs WHERE exec_id=? AND job_id=?";
- private static final String FETCH_INPUT_PARAM_EXECUTABLE_NODE =
- "SELECT input_params FROM execution_jobs WHERE exec_id=? AND job_id=?";
- private static final String FETCH_INPUT_OUTPUT_PARAM_EXECUTABLE_NODE =
- "SELECT input_params, output_params "
- + "FROM execution_jobs WHERE exec_id=? AND job_id=?";
-
- @Override
- public Pair<Props, Props> handle(final ResultSet rs) throws SQLException {
- if (!rs.next()) {
- return new Pair<>(null, null);
- }
-
- if (rs.getMetaData().getColumnCount() > 1) {
- final byte[] input = rs.getBytes(1);
- final byte[] output = rs.getBytes(2);
-
- Props inputProps = null;
- Props outputProps = null;
- try {
- if (input != null) {
- final String jsonInputString = GZIPUtils.unGzipString(input, "UTF-8");
- inputProps =
- PropsUtils.fromHierarchicalMap((Map<String, Object>) JSONUtils
- .parseJSONFromString(jsonInputString));
-
- }
- if (output != null) {
- final String jsonOutputString = GZIPUtils.unGzipString(output, "UTF-8");
- outputProps =
- PropsUtils.fromHierarchicalMap((Map<String, Object>) JSONUtils
- .parseJSONFromString(jsonOutputString));
- }
- } catch (final IOException e) {
- throw new SQLException("Error decoding param data", e);
- }
-
- return new Pair<>(inputProps, outputProps);
- } else {
- final byte[] params = rs.getBytes(1);
- Props props = null;
- try {
- if (params != null) {
- final String jsonProps = GZIPUtils.unGzipString(params, "UTF-8");
-
- props =
- PropsUtils.fromHierarchicalMap((Map<String, Object>) JSONUtils
- .parseJSONFromString(jsonProps));
- }
- } catch (final IOException e) {
- throw new SQLException("Error decoding param data", e);
- }
-
- return new Pair<>(props, null);
- }
- }
- }
-
/**
* JDBC ResultSetHandler to fetch queued executions
*/
@@ -1170,130 +654,6 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
}
}
- private static class FetchActiveExecutableFlows implements
- ResultSetHandler<Map<Integer, Pair<ExecutionReference, ExecutableFlow>>> {
- // Select running and executor assigned flows
- private static final String FETCH_ACTIVE_EXECUTABLE_FLOW =
- "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, et.host host, "
- + "et.port port, et.id executorId, et.active executorStatus"
- + " FROM execution_flows ex"
- + " INNER JOIN "
- + " executors et ON ex.executor_id = et.id"
- + " Where ex.status NOT IN ("
- + Status.SUCCEEDED.getNumVal() + ", "
- + Status.KILLED.getNumVal() + ", "
- + Status.FAILED.getNumVal() + ")";
-
- @Override
- public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> handle(
- final ResultSet rs) throws SQLException {
- if (!rs.next()) {
- return Collections.emptyMap();
- }
-
- final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> execFlows =
- new HashMap<>();
- do {
- final int id = rs.getInt(1);
- final int encodingType = rs.getInt(2);
- final byte[] data = rs.getBytes(3);
- final String host = rs.getString(4);
- final int port = rs.getInt(5);
- final int executorId = rs.getInt(6);
- final boolean executorStatus = rs.getBoolean(7);
-
- if (data == null) {
- execFlows.put(id, null);
- } else {
- final EncodingType encType = EncodingType.fromInteger(encodingType);
- final Object flowObj;
- try {
- // Convoluted way to inflate strings. Should find common package or
- // helper function.
- if (encType == EncodingType.GZIP) {
- // Decompress the sucker.
- final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
- flowObj = JSONUtils.parseJSONFromString(jsonString);
- } else {
- final String jsonString = new String(data, "UTF-8");
- flowObj = JSONUtils.parseJSONFromString(jsonString);
- }
-
- final ExecutableFlow exFlow =
- ExecutableFlow.createExecutableFlowFromObject(flowObj);
- final Executor executor = new Executor(executorId, host, port, executorStatus);
- final ExecutionReference ref = new ExecutionReference(id, executor);
- execFlows.put(id, new Pair<>(ref, exFlow));
- } catch (final IOException e) {
- throw new SQLException("Error retrieving flow data " + id, e);
- }
- }
- } while (rs.next());
-
- return execFlows;
- }
- }
-
- private static class FetchActiveExecutableFlowByExecId implements
- ResultSetHandler<List<Pair<ExecutionReference, ExecutableFlow>>> {
- private static final String FETCH_ACTIVE_EXECUTABLE_FLOW_BY_EXECID =
- "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, et.host host, "
- + "et.port port, et.id executorId, et.active executorStatus"
- + " FROM execution_flows ex"
- + " INNER JOIN "
- + " executors et ON ex.executor_id = et.id"
- + " Where ex.exec_id = ? AND ex.status NOT IN ("
- + Status.SUCCEEDED.getNumVal() + ", "
- + Status.KILLED.getNumVal() + ", "
- + Status.FAILED.getNumVal() + ")";
-
- @Override
- public List<Pair<ExecutionReference, ExecutableFlow>> handle(final ResultSet rs)
- throws SQLException {
- if (!rs.next()) {
- return Collections.emptyList();
- }
-
- final List<Pair<ExecutionReference, ExecutableFlow>> execFlows =
- new ArrayList<>();
- do {
- final int id = rs.getInt(1);
- final int encodingType = rs.getInt(2);
- final byte[] data = rs.getBytes(3);
- final String host = rs.getString(4);
- final int port = rs.getInt(5);
- final int executorId = rs.getInt(6);
- final boolean executorStatus = rs.getBoolean(7);
-
- if (data == null) {
- logger.error("Found a flow with empty data blob exec_id: " + id);
- } else {
- final EncodingType encType = EncodingType.fromInteger(encodingType);
- final Object flowObj;
- try {
- if (encType == EncodingType.GZIP) {
- final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
- flowObj = JSONUtils.parseJSONFromString(jsonString);
- } else {
- final String jsonString = new String(data, "UTF-8");
- flowObj = JSONUtils.parseJSONFromString(jsonString);
- }
-
- final ExecutableFlow exFlow =
- ExecutableFlow.createExecutableFlowFromObject(flowObj);
- final Executor executor = new Executor(executorId, host, port, executorStatus);
- final ExecutionReference ref = new ExecutionReference(id, executor);
- execFlows.add(new Pair<>(ref, exFlow));
- } catch (final IOException e) {
- throw new SQLException("Error retrieving flow data " + id, e);
- }
- }
- } while (rs.next());
-
- return execFlows;
- }
- }
-
private static class IntHandler implements ResultSetHandler<Integer> {
private static final String NUM_EXECUTIONS =
"SELECT COUNT(1) FROM execution_flows";
@@ -1313,36 +673,4 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
}
}
- /**
- * JDBC ResultSetHandler to fetch records from executor_events table
- */
- private static class ExecutorLogsResultHandler implements
- ResultSetHandler<List<ExecutorLogEvent>> {
- private static final String SELECT_EXECUTOR_EVENTS_ORDER =
- "SELECT executor_id, event_type, event_time, username, message FROM executor_events "
- + " WHERE executor_id=? ORDER BY event_time LIMIT ? OFFSET ?";
-
- @Override
- public List<ExecutorLogEvent> handle(final ResultSet rs) throws SQLException {
- if (!rs.next()) {
- return Collections.<ExecutorLogEvent> emptyList();
- }
-
- final ArrayList<ExecutorLogEvent> events = new ArrayList<>();
- do {
- final int executorId = rs.getInt(1);
- final int eventType = rs.getInt(2);
- final Date eventTime = rs.getDate(3);
- final String username = rs.getString(4);
- final String message = rs.getString(5);
-
- final ExecutorLogEvent event =
- new ExecutorLogEvent(executorId, username, eventTime,
- EventType.fromInteger(eventType), message);
- events.add(event);
- } while (rs.next());
-
- return events;
- }
- }
}
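
Taken together, the hunks above turn JdbcExecutorLoader into a thin facade over the new DAOs. The full refactored constructor is not part of this excerpt, but from the delegations and the test change below it presumably looks roughly like this (names and parameter order inferred, not authoritative):

    public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLoader {

      private final ExecutionLogsDao executionLogsDao;
      private final ExecutionJobDao executionJobDao;
      private final ExecutorEventsDao executorEventsDao;
      // ... plus the other injected DAOs (ExecutionFlowDao, ExecutorDao,
      // FetchActiveFlowDao, ActiveExecutingFlowsDao), elided here ...

      @Inject
      public JdbcExecutorLoader(final Props props, final CommonMetrics commonMetrics,
          final ExecutionLogsDao executionLogsDao, final ExecutionJobDao executionJobDao,
          final ExecutorEventsDao executorEventsDao /* , ... remaining DAOs */) {
        super(props, commonMetrics);
        this.executionLogsDao = executionLogsDao;
        this.executionJobDao = executionJobDao;
        this.executorEventsDao = executorEventsDao;
      }

      @Override
      public int removeExecutionLogsByTime(final long millis) throws ExecutorManagerException {
        return this.executionLogsDao.removeExecutionLogsByTime(millis);   // one-line delegation
      }

      // ... other ExecutorLoader methods delegate the same way ...
    }
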
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index 1552742..e3317f1 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -1060,7 +1060,9 @@ public class JdbcExecutorLoaderTest {
//TODO kunkun-tang: temporary work-around here. This Test is to be deprecated.
return new JdbcExecutorLoader(props,
- new CommonMetrics(new MetricsManager(new MetricRegistry())), null, null);
+ new CommonMetrics(new MetricsManager(new MetricRegistry())), null
+ , null, null, null, null,
+ null, null);
}
private boolean isTestSetup() {
diff --git a/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java b/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
index 6718abc..c3c88e4 100644
--- a/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
@@ -28,12 +28,17 @@ import azkaban.AzkabanCommonModule;
import azkaban.database.AzkabanDatabaseSetup;
import azkaban.database.AzkabanDatabaseUpdater;
import azkaban.db.DatabaseOperator;
+import azkaban.executor.ActiveExecutingFlowsDao;
import azkaban.executor.AlerterHolder;
import azkaban.executor.ExecutionFlowDao;
+import azkaban.executor.ExecutionJobDao;
+import azkaban.executor.ExecutionLogsDao;
import azkaban.executor.Executor;
import azkaban.executor.ExecutorDao;
+import azkaban.executor.ExecutorEventsDao;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManager;
+import azkaban.executor.FetchActiveFlowDao;
import azkaban.project.ProjectLoader;
import azkaban.project.ProjectManager;
import azkaban.spi.Storage;
@@ -138,6 +143,11 @@ public class AzkabanWebServerTest {
assertSingleton(Emailer.class, injector);
assertSingleton(ExecutionFlowDao.class, injector);
assertSingleton(ExecutorDao.class, injector);
+ assertSingleton(ExecutionJobDao.class, injector);
+ assertSingleton(ExecutionLogsDao.class, injector);
+ assertSingleton(ExecutorEventsDao.class, injector);
+ assertSingleton(ActiveExecutingFlowsDao.class, injector);
+ assertSingleton(FetchActiveFlowDao.class, injector);
SERVICE_PROVIDER.unsetInjector();
}
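
assertSingleton is a helper defined elsewhere in this test class; it presumably resolves the type from the Guice injector twice and expects the same instance back, roughly:

    // Rough equivalent of the singleton-scope check (assumption; the helper's
    // actual body is not part of this diff).
    private static void assertSingleton(final Class<?> clazz, final Injector injector) {
      final Object first = injector.getInstance(clazz);
      final Object second = injector.getInstance(clazz);
      assertThat(first).isNotNull().isSameAs(second);
    }
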