Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDBManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDBManager.java
new file mode 100644
index 0000000..51f3a5f
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDBManager.java
@@ -0,0 +1,307 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import azkaban.database.EncodingType;
+import azkaban.db.DatabaseOperator;
+import azkaban.db.SQLTransaction;
+import azkaban.utils.GZIPUtils;
+import azkaban.utils.JSONUtils;
+import com.google.inject.Singleton;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import javax.inject.Inject;
+import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.log4j.Logger;
+
+@Singleton
+public class ExecutionFlowDBManager {
+
+ private static final Logger logger = Logger.getLogger(ExecutionFlowDBManager.class);
+ private final DatabaseOperator dbOperator;
+
+ @Inject
+ public ExecutionFlowDBManager(final DatabaseOperator dbOperator) {
+ this.dbOperator = dbOperator;
+ }
+
+ public synchronized void uploadExecutableFlow(final ExecutableFlow flow)
+ throws ExecutorManagerException {
+ final String INSERT_EXECUTABLE_FLOW = "INSERT INTO execution_flows "
+ + "(project_id, flow_id, version, status, submit_time, submit_user, update_time) "
+ + "values (?,?,?,?,?,?,?)";
+ final long submitTime = System.currentTimeMillis();
+ flow.setStatus(Status.PREPARING);
+
+ /**
+ * Why we need a transaction to get last insert ID?
+ * Because "SELECT LAST_INSERT_ID()" needs to have the same connection
+ * as inserting the new entry.
+ * See https://dev.mysql.com/doc/refman/5.7/en/information-functions.html#function_last-insert-id
+ */
+ final SQLTransaction<Long> insertAndGetLastID = transOperator -> {
+ transOperator.update(INSERT_EXECUTABLE_FLOW, flow.getProjectId(),
+ flow.getFlowId(), flow.getVersion(), Status.PREPARING.getNumVal(),
+ submitTime, flow.getSubmitUser(), submitTime);
+ transOperator.getConnection().commit();
+ return transOperator.getLastInsertId();
+ };
+
+ try {
+ final long id = this.dbOperator.transaction(insertAndGetLastID);
+ logger.info("Flow given " + flow.getFlowId() + " given id " + id);
+ flow.setExecutionId((int) id);
+ updateExecutableFlow(flow);
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error creating execution.", e);
+ }
+ }
+
+ List<ExecutableFlow> fetchFlowHistory(final int skip, final int num)
+ throws ExecutorManagerException {
+ try {
+ return this.dbOperator.query(FetchExecutableFlows.FETCH_ALL_EXECUTABLE_FLOW_HISTORY,
+ new FetchExecutableFlows(), skip, num);
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error fetching flow History", e);
+ }
+ }
+
+ List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
+ final int skip, final int num)
+ throws ExecutorManagerException {
+ try {
+ return this.dbOperator.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_HISTORY,
+ new FetchExecutableFlows(), projectId, flowId, skip, num);
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error fetching flow history", e);
+ }
+ }
+
+ List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
+ final int skip, final int num,
+ final Status status)
+ throws ExecutorManagerException {
+ try {
+ return this.dbOperator.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_BY_STATUS,
+ new FetchExecutableFlows(), projectId, flowId, status.getNumVal(), skip, num);
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error fetching active flows", e);
+ }
+ }
+
+ List<ExecutableFlow> fetchFlowHistory(final String projContain, final String flowContains,
+ final String userNameContains, final int status,
+ final long startTime, final long endTime,
+ final int skip, final int num)
+ throws ExecutorManagerException {
+ String query = FetchExecutableFlows.FETCH_BASE_EXECUTABLE_FLOW_QUERY;
+ final List<Object> params = new ArrayList<>();
+
+ boolean first = true;
+ if (projContain != null && !projContain.isEmpty()) {
+ query += " ef JOIN projects p ON ef.project_id = p.id WHERE name LIKE ?";
+ params.add('%' + projContain + '%');
+ first = false;
+ }
+
+ // todo kunkun-tang: we don't need the below complicated logics. We should just use a simple way.
+ if (flowContains != null && !flowContains.isEmpty()) {
+ if (first) {
+ query += " WHERE ";
+ first = false;
+ } else {
+ query += " AND ";
+ }
+
+ query += " flow_id LIKE ?";
+ params.add('%' + flowContains + '%');
+ }
+
+ if (userNameContains != null && !userNameContains.isEmpty()) {
+ if (first) {
+ query += " WHERE ";
+ first = false;
+ } else {
+ query += " AND ";
+ }
+ query += " submit_user LIKE ?";
+ params.add('%' + userNameContains + '%');
+ }
+
+ if (status != 0) {
+ if (first) {
+ query += " WHERE ";
+ first = false;
+ } else {
+ query += " AND ";
+ }
+ query += " status = ?";
+ params.add(status);
+ }
+
+ if (startTime > 0) {
+ if (first) {
+ query += " WHERE ";
+ first = false;
+ } else {
+ query += " AND ";
+ }
+ query += " start_time > ?";
+ params.add(startTime);
+ }
+
+ if (endTime > 0) {
+ if (first) {
+ query += " WHERE ";
+ } else {
+ query += " AND ";
+ }
+ query += " end_time < ?";
+ params.add(endTime);
+ }
+
+ if (skip > -1 && num > 0) {
+ query += " ORDER BY exec_id DESC LIMIT ?, ?";
+ params.add(skip);
+ params.add(num);
+ }
+
+ try {
+ return this.dbOperator.query(query, new FetchExecutableFlows(), params.toArray());
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error fetching active flows", e);
+ }
+ }
+
+ void updateExecutableFlow(final ExecutableFlow flow) throws ExecutorManagerException {
+ updateExecutableFlow(flow, EncodingType.GZIP);
+ }
+
+ private void updateExecutableFlow(final ExecutableFlow flow, final EncodingType encType)
+ throws ExecutorManagerException {
+ final String UPDATE_EXECUTABLE_FLOW_DATA =
+ "UPDATE execution_flows "
+ + "SET status=?,update_time=?,start_time=?,end_time=?,enc_type=?,flow_data=? "
+ + "WHERE exec_id=?";
+
+ final String json = JSONUtils.toJSON(flow.toObject());
+ byte[] data = null;
+ try {
+ final byte[] stringData = json.getBytes("UTF-8");
+ data = stringData;
+ // Todo kunkun-tang: use a common method to transform stringData to data.
+ if (encType == EncodingType.GZIP) {
+ data = GZIPUtils.gzipBytes(stringData);
+ }
+ } catch (final IOException e) {
+ throw new ExecutorManagerException("Error encoding the execution flow.");
+ }
+
+ try {
+ this.dbOperator.update(UPDATE_EXECUTABLE_FLOW_DATA, flow.getStatus()
+ .getNumVal(), flow.getUpdateTime(), flow.getStartTime(), flow
+ .getEndTime(), encType.getNumVal(), data, flow.getExecutionId());
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error updating flow.", e);
+ }
+ }
+
+ public ExecutableFlow fetchExecutableFlow(final int execId) throws ExecutorManagerException {
+ final FetchExecutableFlows flowHandler = new FetchExecutableFlows();
+ try {
+ final List<ExecutableFlow> properties = this.dbOperator
+ .query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW, flowHandler, execId);
+ if (properties.isEmpty()) {
+ return null;
+ } else {
+ return properties.get(0);
+ }
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error fetching flow id " + execId, e);
+ }
+ }
+
+ public static class FetchExecutableFlows implements
+ ResultSetHandler<List<ExecutableFlow>> {
+
+ static String FETCH_BASE_EXECUTABLE_FLOW_QUERY =
+ "SELECT exec_id, enc_type, flow_data FROM execution_flows ";
+ static String FETCH_EXECUTABLE_FLOW =
+ "SELECT exec_id, enc_type, flow_data FROM execution_flows "
+ + "WHERE exec_id=?";
+ static String FETCH_ALL_EXECUTABLE_FLOW_HISTORY =
+ "SELECT exec_id, enc_type, flow_data FROM execution_flows "
+ + "ORDER BY exec_id DESC LIMIT ?, ?";
+ static String FETCH_EXECUTABLE_FLOW_HISTORY =
+ "SELECT exec_id, enc_type, flow_data FROM execution_flows "
+ + "WHERE project_id=? AND flow_id=? "
+ + "ORDER BY exec_id DESC LIMIT ?, ?";
+ static String FETCH_EXECUTABLE_FLOW_BY_STATUS =
+ "SELECT exec_id, enc_type, flow_data FROM execution_flows "
+ + "WHERE project_id=? AND flow_id=? AND status=? "
+ + "ORDER BY exec_id DESC LIMIT ?, ?";
+
+ @Override
+ public List<ExecutableFlow> handle(final ResultSet rs) throws SQLException {
+ if (!rs.next()) {
+ return Collections.emptyList();
+ }
+
+ final List<ExecutableFlow> execFlows = new ArrayList<>();
+ do {
+ final int id = rs.getInt(1);
+ final int encodingType = rs.getInt(2);
+ final byte[] data = rs.getBytes(3);
+
+ if (data != null) {
+ final EncodingType encType = EncodingType.fromInteger(encodingType);
+ final Object flowObj;
+
+ /**
+ * The below code is a duplicate against many places, like azkaban.database.EncodingType
+ * TODO kunkun-tang: Extract these duplicates to a single static method.
+ */
+ try {
+ // Convoluted way to inflate strings. Should find common package
+ // or helper function.
+ if (encType == EncodingType.GZIP) {
+ // Decompress the sucker.
+ final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+ flowObj = JSONUtils.parseJSONFromString(jsonString);
+ } else {
+ final String jsonString = new String(data, "UTF-8");
+ flowObj = JSONUtils.parseJSONFromString(jsonString);
+ }
+
+ final ExecutableFlow exFlow =
+ ExecutableFlow.createExecutableFlowFromObject(flowObj);
+ execFlows.add(exFlow);
+ } catch (final IOException e) {
+ throw new SQLException("Error retrieving flow data " + id, e);
+ }
+ }
+ } while (rs.next());
+
+ return execFlows;
+ }
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index e2d6c6b..db8afa2 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -58,12 +58,13 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
ExecutorLoader {
private static final Logger logger = Logger
.getLogger(JdbcExecutorLoader.class);
-
+ private final ExecutionFlowDBManager executionFlowDBManager;
private EncodingType defaultEncodingType = EncodingType.GZIP;
@Inject
- public JdbcExecutorLoader(final Props props, final CommonMetrics commonMetrics) {
+ public JdbcExecutorLoader(final Props props, final CommonMetrics commonMetrics, final ExecutionFlowDBManager executionFlowDBManager) {
super(props, commonMetrics);
+ this.executionFlowDBManager = executionFlowDBManager;
}
public EncodingType getDefaultEncodingType() {
@@ -77,91 +78,13 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
@Override
public synchronized void uploadExecutableFlow(final ExecutableFlow flow)
throws ExecutorManagerException {
- final Connection connection = getConnection();
- try {
- uploadExecutableFlow(connection, flow, this.defaultEncodingType);
- } catch (final IOException e) {
- throw new ExecutorManagerException("Error uploading flow", e);
- } finally {
- DbUtils.closeQuietly(connection);
- }
- }
-
- private synchronized void uploadExecutableFlow(final Connection connection,
- final ExecutableFlow flow, final EncodingType encType)
- throws ExecutorManagerException, IOException {
- final String INSERT_EXECUTABLE_FLOW =
- "INSERT INTO execution_flows "
- + "(project_id, flow_id, version, status, submit_time, submit_user, update_time) "
- + "values (?,?,?,?,?,?,?)";
- final QueryRunner runner = new QueryRunner();
- final long submitTime = System.currentTimeMillis();
-
- final long id;
- try {
- flow.setStatus(Status.PREPARING);
- runner.update(connection, INSERT_EXECUTABLE_FLOW, flow.getProjectId(),
- flow.getFlowId(), flow.getVersion(), Status.PREPARING.getNumVal(),
- submitTime, flow.getSubmitUser(), submitTime);
- connection.commit();
- id =
- runner.query(connection, LastInsertID.LAST_INSERT_ID,
- new LastInsertID());
-
- if (id == -1L) {
- throw new ExecutorManagerException(
- "Execution id is not properly created.");
- }
- logger.info("Flow given " + flow.getFlowId() + " given id " + id);
- flow.setExecutionId((int) id);
-
- updateExecutableFlow(connection, flow, encType);
- } catch (final SQLException e) {
- throw new ExecutorManagerException("Error creating execution.", e);
- }
+ this.executionFlowDBManager.uploadExecutableFlow(flow);
}
@Override
public void updateExecutableFlow(final ExecutableFlow flow)
throws ExecutorManagerException {
- final Connection connection = this.getConnection();
-
- try {
- updateExecutableFlow(connection, flow, this.defaultEncodingType);
- } finally {
- DbUtils.closeQuietly(connection);
- }
- }
-
- private void updateExecutableFlow(final Connection connection, final ExecutableFlow flow,
- final EncodingType encType) throws ExecutorManagerException {
- final String UPDATE_EXECUTABLE_FLOW_DATA =
- "UPDATE execution_flows "
- + "SET status=?,update_time=?,start_time=?,end_time=?,enc_type=?,flow_data=? "
- + "WHERE exec_id=?";
- final QueryRunner runner = new QueryRunner();
-
- final String json = JSONUtils.toJSON(flow.toObject());
- byte[] data = null;
- try {
- final byte[] stringData = json.getBytes("UTF-8");
- data = stringData;
-
- if (encType == EncodingType.GZIP) {
- data = GZIPUtils.gzipBytes(stringData);
- }
- } catch (final IOException e) {
- throw new ExecutorManagerException("Error encoding the execution flow.");
- }
-
- try {
- runner.update(connection, UPDATE_EXECUTABLE_FLOW_DATA, flow.getStatus()
- .getNumVal(), flow.getUpdateTime(), flow.getStartTime(), flow
- .getEndTime(), encType.getNumVal(), data, flow.getExecutionId());
- connection.commit();
- } catch (final SQLException e) {
- throw new ExecutorManagerException("Error updating flow.", e);
- }
+ this.executionFlowDBManager.updateExecutableFlow(flow);
}
@Override
@@ -311,138 +234,27 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
@Override
public List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
final int skip, final int num) throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
- final FetchExecutableFlows flowHandler = new FetchExecutableFlows();
-
- try {
- final List<ExecutableFlow> properties =
- runner.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_HISTORY,
- flowHandler, projectId, flowId, skip, num);
- return properties;
- } catch (final SQLException e) {
- throw new ExecutorManagerException("Error fetching active flows", e);
- }
+ return this.executionFlowDBManager.fetchFlowHistory(projectId, flowId, skip, num);
}
@Override
public List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
final int skip, final int num, final Status status) throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
- final FetchExecutableFlows flowHandler = new FetchExecutableFlows();
-
- try {
- final List<ExecutableFlow> properties =
- runner.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_BY_STATUS,
- flowHandler, projectId, flowId, status.getNumVal(), skip, num);
- return properties;
- } catch (final SQLException e) {
- throw new ExecutorManagerException("Error fetching active flows", e);
- }
+ return this.executionFlowDBManager.fetchFlowHistory(projectId, flowId, skip, num, status);
}
@Override
public List<ExecutableFlow> fetchFlowHistory(final int skip, final int num)
throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
-
- final FetchExecutableFlows flowHandler = new FetchExecutableFlows();
-
- try {
- final List<ExecutableFlow> properties =
- runner.query(FetchExecutableFlows.FETCH_ALL_EXECUTABLE_FLOW_HISTORY,
- flowHandler, skip, num);
- return properties;
- } catch (final SQLException e) {
- throw new ExecutorManagerException("Error fetching active flows", e);
- }
+ return this.executionFlowDBManager.fetchFlowHistory(skip,num);
}
@Override
public List<ExecutableFlow> fetchFlowHistory(final String projContain,
final String flowContains, final String userNameContains, final int status, final long startTime,
final long endTime, final int skip, final int num) throws ExecutorManagerException {
- String query = FetchExecutableFlows.FETCH_BASE_EXECUTABLE_FLOW_QUERY;
- final ArrayList<Object> params = new ArrayList<>();
-
- boolean first = true;
- if (projContain != null && !projContain.isEmpty()) {
- query += " ef JOIN projects p ON ef.project_id = p.id WHERE name LIKE ?";
- params.add('%' + projContain + '%');
- first = false;
- }
-
- if (flowContains != null && !flowContains.isEmpty()) {
- if (first) {
- query += " WHERE ";
- first = false;
- } else {
- query += " AND ";
- }
-
- query += " flow_id LIKE ?";
- params.add('%' + flowContains + '%');
- }
-
- if (userNameContains != null && !userNameContains.isEmpty()) {
- if (first) {
- query += " WHERE ";
- first = false;
- } else {
- query += " AND ";
- }
- query += " submit_user LIKE ?";
- params.add('%' + userNameContains + '%');
- }
-
- if (status != 0) {
- if (first) {
- query += " WHERE ";
- first = false;
- } else {
- query += " AND ";
- }
- query += " status = ?";
- params.add(status);
- }
-
- if (startTime > 0) {
- if (first) {
- query += " WHERE ";
- first = false;
- } else {
- query += " AND ";
- }
- query += " start_time > ?";
- params.add(startTime);
- }
-
- if (endTime > 0) {
- if (first) {
- query += " WHERE ";
- first = false;
- } else {
- query += " AND ";
- }
- query += " end_time < ?";
- params.add(endTime);
- }
-
- if (skip > -1 && num > 0) {
- query += " ORDER BY exec_id DESC LIMIT ?, ?";
- params.add(skip);
- params.add(num);
- }
-
- final QueryRunner runner = createQueryRunner();
- final FetchExecutableFlows flowHandler = new FetchExecutableFlows();
-
- try {
- final List<ExecutableFlow> properties =
- runner.query(query, flowHandler, params.toArray());
- return properties;
- } catch (final SQLException e) {
- throw new ExecutorManagerException("Error fetching active flows", e);
- }
+ return this.executionFlowDBManager.fetchFlowHistory(projContain, flowContains,
+ userNameContains, status, startTime, endTime, skip, num);
}
@Override
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDBManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDBManagerTest.java
new file mode 100644
index 0000000..6c87b35
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDBManagerTest.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import static azkaban.db.AzDBTestUtility.EmbeddedH2BasicDataSource;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import azkaban.database.AzkabanDatabaseSetup;
+import azkaban.db.AzkabanDataSource;
+import azkaban.db.DatabaseOperator;
+import azkaban.db.DatabaseOperatorImpl;
+import azkaban.utils.Props;
+import azkaban.utils.TestUtils;
+import java.io.File;
+import java.sql.SQLException;
+import java.util.HashSet;
+import java.util.List;
+import org.apache.commons.dbutils.QueryRunner;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ExecutionFlowDBManagerTest {
+
+ private static final Props props = new Props();
+ private static DatabaseOperator dbOperator;
+ private ExecutionFlowDBManager executionFlowDBManager;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ final AzkabanDataSource dataSource = new EmbeddedH2BasicDataSource();
+ dbOperator = new DatabaseOperatorImpl(new QueryRunner(dataSource));
+
+ final String sqlScriptsDir = new File("../azkaban-db/src/main/sql/").getCanonicalPath();
+ props.put("database.sql.scripts.dir", sqlScriptsDir);
+
+ // TODO kunkun-tang: Need to refactor AzkabanDatabaseSetup to accept datasource in azkaban-db
+ final azkaban.database.AzkabanDataSource dataSourceForSetupDB =
+ new azkaban.database.AzkabanConnectionPoolTest.EmbeddedH2BasicDataSource();
+ final AzkabanDatabaseSetup setup = new AzkabanDatabaseSetup(dataSourceForSetupDB, props);
+ setup.loadTableInfo();
+ setup.updateDatabase(true, false);
+ }
+
+ @AfterClass
+ public static void destroyDB() throws Exception {
+ try {
+ dbOperator.update("DROP ALL OBJECTS");
+ } catch (final SQLException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Before
+ public void setup() {
+ this.executionFlowDBManager = new ExecutionFlowDBManager(dbOperator);
+ }
+
+ @After
+ public void clearDB() {
+ try {
+ dbOperator.update("DELETE FROM execution_flows");
+ } catch (final SQLException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private ExecutableFlow createTestFlow() throws Exception {
+ return TestUtils.createExecutableFlow("exectest1", "exec1");
+ }
+
+ @Test
+ public void testUploadAndFetchExecutionFlows() throws Exception {
+
+ final ExecutableFlow flow = createTestFlow();
+ this.executionFlowDBManager.uploadExecutableFlow(flow);
+
+ final ExecutableFlow fetchFlow =
+ this.executionFlowDBManager.fetchExecutableFlow(flow.getExecutionId());
+
+ assertThat(flow).isNotSameAs(fetchFlow);
+ assertTwoFlowSame(flow, fetchFlow);
+ }
+
+
+ @Test
+ public void testUpdateExecutableFlow() throws Exception {
+ final ExecutableFlow flow = createTestFlow();
+ this.executionFlowDBManager.uploadExecutableFlow(flow);
+
+ final ExecutableFlow fetchFlow =
+ this.executionFlowDBManager.fetchExecutableFlow(flow.getExecutionId());
+
+ fetchFlow.setEndTime(System.currentTimeMillis());
+ fetchFlow.setStatus(Status.SUCCEEDED);
+ this.executionFlowDBManager.updateExecutableFlow(fetchFlow);
+ final ExecutableFlow fetchFlow2 =
+ this.executionFlowDBManager.fetchExecutableFlow(flow.getExecutionId());
+
+ assertTwoFlowSame(fetchFlow, fetchFlow2);
+ }
+
+ @Test
+ public void fetchFlowHistory() throws Exception {
+ final ExecutableFlow flow = createTestFlow();
+ this.executionFlowDBManager.uploadExecutableFlow(flow);
+ final List<ExecutableFlow> flowList1 = this.executionFlowDBManager.fetchFlowHistory(0,2 );
+ assertThat(flowList1.size()).isEqualTo(1);
+
+ final List<ExecutableFlow> flowList2 = this.executionFlowDBManager
+ .fetchFlowHistory(flow.getProjectId(), flow.getId(),0,2 );
+ assertThat(flowList2.size()).isEqualTo(1);
+
+ final ExecutableFlow fetchFlow =
+ this.executionFlowDBManager.fetchExecutableFlow(flow.getExecutionId());
+ assertTwoFlowSame(flowList1.get(0), flowList2.get(0));
+ assertTwoFlowSame(flowList1.get(0), fetchFlow);
+ }
+
+ private void assertTwoFlowSame(final ExecutableFlow flow1, final ExecutableFlow flow2) {
+ assertThat(flow1.getExecutionId()).isEqualTo(flow2.getExecutionId());
+ assertThat(flow1.getStatus()).isEqualTo(flow2.getStatus());
+ assertThat(flow1.getEndTime()).isEqualTo(flow2.getEndTime());
+ assertThat(flow1.getStartTime()).isEqualTo(flow2.getStartTime());
+ assertThat(flow1.getSubmitTime()).isEqualTo(flow2.getStartTime());
+ assertThat(flow1.getFlowId()).isEqualTo(flow2.getFlowId());
+ assertThat(flow1.getProjectId()).isEqualTo(flow2.getProjectId());
+ assertThat(flow1.getVersion()).isEqualTo(flow2.getVersion());
+ assertThat(flow1.getExecutionOptions().getFailureAction())
+ .isEqualTo(flow2.getExecutionOptions().getFailureAction());
+ assertThat(new HashSet<>(flow1.getEndNodes())).isEqualTo(new HashSet<>(flow2.getEndNodes()));
+ }
+}
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index 3b3f1ca..9da2af7 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -1006,7 +1006,6 @@ public class JdbcExecutorLoaderTest {
logsResult6.getLength(), 185493);
}
- @SuppressWarnings("static-access")
@Ignore @Test
public void testRemoveExecutionLogsByTime() throws ExecutorManagerException,
IOException, InterruptedException {
@@ -1055,8 +1054,9 @@ public class JdbcExecutorLoaderTest {
props.put("mysql.password", password);
props.put("mysql.numconnections", numConnections);
+ //TODO kunkun-tang: temporary work-around here. This Test is to be deprecated.
return new JdbcExecutorLoader(props,
- new CommonMetrics(new MetricsManager(new MetricRegistry())));
+ new CommonMetrics(new MetricsManager(new MetricRegistry())), null);
}
private boolean isTestSetup() {
diff --git a/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java b/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
index b8347dc..efa6c12 100644
--- a/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
@@ -29,6 +29,7 @@ import azkaban.database.AzkabanDatabaseSetup;
import azkaban.database.AzkabanDatabaseUpdater;
import azkaban.db.DatabaseOperator;
import azkaban.executor.AlerterHolder;
+import azkaban.executor.ExecutionFlowDBManager;
import azkaban.executor.Executor;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManager;
@@ -121,6 +122,7 @@ public class AzkabanWebServerTest {
executorLoader.updateExecutor(executor);
assertNotNull(injector.getInstance(AzkabanWebServer.class));
+ assertNotNull(injector.getInstance(ExecutionFlowDBManager.class));
//Test if triggermanager is singletonly guiced. If not, the below test will fail.
assertSingleton(ExecutorLoader.class, injector);
@@ -133,6 +135,7 @@ public class AzkabanWebServerTest {
assertSingleton(TriggerManager.class, injector);
assertSingleton(AlerterHolder.class, injector);
assertSingleton(Emailer.class, injector);
+ assertSingleton(ExecutionFlowDBManager.class, injector);
SERVICE_PROVIDER.unsetInjector();
}