diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorDao.java
index dd26850..35a1882 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorDao.java
@@ -16,10 +16,7 @@
package azkaban.executor;
-import azkaban.database.AbstractJdbcLoader;
import azkaban.db.DatabaseOperator;
-import azkaban.metrics.CommonMetrics;
-import azkaban.utils.Props;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
@@ -27,78 +24,60 @@ import java.util.Collections;
import java.util.List;
import javax.inject.Inject;
import javax.inject.Singleton;
-import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
import org.apache.log4j.Logger;
@Singleton
-public class ExecutorDao extends AbstractJdbcLoader {
+public class ExecutorDao {
private static final Logger logger = Logger.getLogger(ExecutorDao.class);
private final DatabaseOperator dbOperator;
@Inject
- public ExecutorDao(final Props props, final CommonMetrics commonMetrics,
- final DatabaseOperator dbOperator) {
- super(props, commonMetrics);
+ public ExecutorDao (final DatabaseOperator dbOperator) {
this.dbOperator = dbOperator;
}
- public List<Executor> fetchAllExecutors() throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
- final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
-
+ List<Executor> fetchAllExecutors() throws ExecutorManagerException {
try {
- final List<Executor> executors =
- runner.query(FetchExecutorHandler.FETCH_ALL_EXECUTORS, executorHandler);
- return executors;
+ return this.dbOperator
+ .query(FetchExecutorHandler.FETCH_ALL_EXECUTORS, new FetchExecutorHandler());
} catch (final Exception e) {
throw new ExecutorManagerException("Error fetching executors", e);
}
}
- public List<Executor> fetchActiveExecutors() throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
- final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
-
+ List<Executor> fetchActiveExecutors() throws ExecutorManagerException {
try {
- final List<Executor> executors =
- runner.query(FetchExecutorHandler.FETCH_ACTIVE_EXECUTORS,
- executorHandler);
- return executors;
- } catch (final Exception e) {
+ return this.dbOperator
+ .query(FetchExecutorHandler.FETCH_ACTIVE_EXECUTORS, new FetchExecutorHandler());
+ } catch (final SQLException e) {
throw new ExecutorManagerException("Error fetching active executors", e);
}
}
public Executor fetchExecutor(final String host, final int port)
throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
- final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
-
try {
final List<Executor> executors =
- runner.query(FetchExecutorHandler.FETCH_EXECUTOR_BY_HOST_PORT,
- executorHandler, host, port);
+ this.dbOperator.query(FetchExecutorHandler.FETCH_EXECUTOR_BY_HOST_PORT,
+ new FetchExecutorHandler(), host, port);
if (executors.isEmpty()) {
return null;
} else {
return executors.get(0);
}
- } catch (final Exception e) {
+ } catch (final SQLException e) {
throw new ExecutorManagerException(String.format(
"Error fetching executor %s:%d", host, port), e);
}
}
public Executor fetchExecutor(final int executorId) throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
- final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
-
try {
- final List<Executor> executors =
- runner.query(FetchExecutorHandler.FETCH_EXECUTOR_BY_ID,
- executorHandler, executorId);
+ final List<Executor> executors = this.dbOperator
+ .query(FetchExecutorHandler.FETCH_EXECUTOR_BY_ID,
+ new FetchExecutorHandler(), executorId);
if (executors.isEmpty()) {
return null;
} else {
@@ -110,58 +89,69 @@ public class ExecutorDao extends AbstractJdbcLoader {
}
}
- public Executor fetchExecutorByExecutionId(final int executionId)
+ Executor fetchExecutorByExecutionId(final int executionId)
throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
- Executor executor = null;
try {
- final List<Executor> executors =
- runner.query(FetchExecutorHandler.FETCH_EXECUTION_EXECUTOR,
+ final List<Executor> executors = this.dbOperator
+ .query(FetchExecutorHandler.FETCH_EXECUTION_EXECUTOR,
executorHandler, executionId);
if (executors.size() > 0) {
- executor = executors.get(0);
+ return executors.get(0);
+ } else {
+ return null;
}
} catch (final SQLException e) {
throw new ExecutorManagerException(
"Error fetching executor for exec_id : " + executionId, e);
}
- return executor;
}
- public Executor addExecutor(final String host, final int port)
+ Executor addExecutor(final String host, final int port)
throws ExecutorManagerException {
// verify, if executor already exists
- Executor executor = fetchExecutor(host, port);
- if (executor != null) {
+ if (fetchExecutor(host, port) != null) {
throw new ExecutorManagerException(String.format(
"Executor %s:%d already exist", host, port));
}
// add new executor
addExecutorHelper(host, port);
- // fetch newly added executor
- executor = fetchExecutor(host, port);
- return executor;
+ // fetch newly added executor
+ return fetchExecutor(host, port);
}
private void addExecutorHelper(final String host, final int port)
throws ExecutorManagerException {
final String INSERT = "INSERT INTO executors (host, port) values (?,?)";
- final QueryRunner runner = createQueryRunner();
try {
- runner.update(INSERT, host, port);
+ this.dbOperator.update(INSERT, host, port);
} catch (final SQLException e) {
throw new ExecutorManagerException(String.format("Error adding %s:%d ",
host, port), e);
}
}
- public void removeExecutor(final String host, final int port) throws ExecutorManagerException {
+ public void updateExecutor(final Executor executor) throws ExecutorManagerException {
+ final String UPDATE =
+ "UPDATE executors SET host=?, port=?, active=? where id=?";
+
+ try {
+ final int rows = this.dbOperator.update(UPDATE, executor.getHost(), executor.getPort(),
+ executor.isActive(), executor.getId());
+ if (rows == 0) {
+ throw new ExecutorManagerException("No executor with id :" + executor.getId());
+ }
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error inactivating executor "
+ + executor.getId(), e);
+ }
+ }
+
+ void removeExecutor(final String host, final int port) throws ExecutorManagerException {
final String DELETE = "DELETE FROM executors WHERE host=? AND port=?";
- final QueryRunner runner = createQueryRunner();
try {
- final int rows = runner.update(DELETE, host, port);
+ final int rows = this.dbOperator.update(DELETE, host, port);
if (rows == 0) {
throw new ExecutorManagerException("No executor with host, port :"
+ "(" + host + "," + port + ")");
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 162c34e..00c3f86 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -339,22 +339,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
@Override
public void updateExecutor(final Executor executor) throws ExecutorManagerException {
- final String UPDATE =
- "UPDATE executors SET host=?, port=?, active=? where id=?";
-
- final QueryRunner runner = createQueryRunner();
- try {
- final int rows =
- runner.update(UPDATE, executor.getHost(), executor.getPort(),
- executor.isActive(), executor.getId());
- if (rows == 0) {
- throw new ExecutorManagerException("No executor with id :"
- + executor.getId());
- }
- } catch (final SQLException e) {
- throw new ExecutorManagerException("Error inactivating executor "
- + executor.getId(), e);
- }
+ this.executorDao.updateExecutor(executor);
}
@Override
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorDaoTest.java
new file mode 100644
index 0000000..b97ea20
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorDaoTest.java
@@ -0,0 +1,190 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import azkaban.db.DatabaseOperator;
+import azkaban.test.Utils;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ExecutorDaoTest {
+
+ private static DatabaseOperator dbOperator;
+ private ExecutorDao executorDao;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ dbOperator = Utils.initTestDB();
+ }
+
+ @AfterClass
+ public static void destroyDB() throws Exception {
+ try {
+ dbOperator.update("DROP ALL OBJECTS");
+ dbOperator.update("SHUTDOWN");
+ } catch (final SQLException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Before
+ public void setup() {
+ this.executorDao = new ExecutorDao(dbOperator);
+ }
+
+ @After
+ public void clearDB() {
+ try {
+ dbOperator.update("delete from executors");
+ } catch (final SQLException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /* Test all executors fetch from empty executors */
+ @Test
+ public void testFetchEmptyExecutors() throws Exception {
+ final List<Executor> executors = this.executorDao.fetchAllExecutors();
+ assertThat(executors.size()).isEqualTo(0);
+ }
+
+ /* Test active executors fetch from empty executors */
+ @Test
+ public void testFetchEmptyActiveExecutors() throws Exception {
+ final List<Executor> executors = this.executorDao.fetchActiveExecutors();
+ assertThat(executors.size()).isEqualTo(0);
+ }
+
+ /* Test missing executor fetch with search by executor id */
+ @Test
+ public void testFetchMissingExecutorId() throws Exception {
+ final Executor executor = this.executorDao.fetchExecutor(0);
+ assertThat(executor).isEqualTo(null);
+ }
+
+ /* Test missing executor fetch with search by host:port */
+ @Test
+ public void testFetchMissingExecutorHostPort() throws Exception {
+ final Executor executor = this.executorDao.fetchExecutor("localhost", 12345);
+ assertThat(executor).isEqualTo(null);
+ }
+
+ /* Test to add duplicate executors */
+ @Test
+ public void testDuplicateAddExecutor() throws Exception {
+ final String host = "localhost";
+ final int port = 12345;
+ this.executorDao.addExecutor(host, port);
+ assertThatThrownBy(() -> this.executorDao.addExecutor(host, port))
+ .isInstanceOf(ExecutorManagerException.class)
+ .hasMessageContaining("already exist");
+ }
+
+ /* Test to try update a non-existent executor */
+ @Test
+ public void testMissingExecutorUpdate() throws Exception {
+ final Executor executor = new Executor(1, "localhost", 1234, true);
+ assertThatThrownBy(() -> this.executorDao.updateExecutor(executor))
+ .isInstanceOf(ExecutorManagerException.class)
+ .hasMessageContaining("No executor with id");
+ }
+
+ /* Test add & fetch by Id Executors */
+ @Test
+ public void testSingleExecutorFetchById() throws Exception {
+ final List<Executor> executors = addTestExecutors();
+ for (final Executor executor : executors) {
+ final Executor fetchedExecutor = this.executorDao.fetchExecutor(executor.getId());
+ assertThat(executor).isEqualTo(fetchedExecutor);
+ }
+ }
+
+ /* Test fetch all executors */
+ @Test
+ public void testFetchAllExecutors() throws Exception {
+ final List<Executor> executors = addTestExecutors();
+ executors.get(0).setActive(false);
+ this.executorDao.updateExecutor(executors.get(0));
+ final List<Executor> fetchedExecutors = this.executorDao.fetchAllExecutors();
+ assertThat(executors.size()).isEqualTo(fetchedExecutors.size());
+ assertThat(executors.toArray()).isEqualTo(fetchedExecutors.toArray());
+ }
+
+ /* Test fetch only active executors */
+ @Test
+ public void testFetchActiveExecutors() throws Exception {
+ final List<Executor> executors = addTestExecutors();
+
+ executors.get(0).setActive(true);
+ this.executorDao.updateExecutor(executors.get(0));
+ final List<Executor> fetchedExecutors = this.executorDao.fetchActiveExecutors();
+ assertThat(executors.size()).isEqualTo(fetchedExecutors.size() + 2);
+ assertThat(executors.get(0)).isEqualTo(fetchedExecutors.get(0));
+ }
+
+ /* Test add & fetch by host:port Executors */
+ @Test
+ public void testSingleExecutorFetchHostPort() throws Exception {
+ final List<Executor> executors = addTestExecutors();
+ for (final Executor executor : executors) {
+ final Executor fetchedExecutor =
+ this.executorDao.fetchExecutor(executor.getHost(), executor.getPort());
+ assertThat(executor).isEqualTo(fetchedExecutor);
+ }
+ }
+
+ /* Helper method used in methods testing jdbc interface for executors table */
+ private List<Executor> addTestExecutors()
+ throws ExecutorManagerException {
+ final List<Executor> executors = new ArrayList<>();
+ executors.add(this.executorDao.addExecutor("localhost1", 12345));
+ executors.add(this.executorDao.addExecutor("localhost2", 12346));
+ executors.add(this.executorDao.addExecutor("localhost1", 12347));
+ return executors;
+ }
+
+ /* Test Removing Executor */
+ @Test
+ public void testRemovingExecutor() throws Exception {
+ final Executor executor = this.executorDao.addExecutor("localhost1", 12345);
+ assertThat(executor).isNotNull();
+ this.executorDao.removeExecutor("localhost1", 12345);
+ final Executor fetchedExecutor = this.executorDao.fetchExecutor("localhost1", 12345);
+ assertThat(fetchedExecutor).isNull();
+ }
+
+ /* Test Executor reactivation */
+ @Test
+ public void testExecutorActivation() throws Exception {
+ final Executor executor = this.executorDao.addExecutor("localhost1", 12345);
+ assertThat(executor.isActive()).isFalse();
+
+ executor.setActive(true);
+ this.executorDao.updateExecutor(executor);
+ final Executor fetchedExecutor = this.executorDao.fetchExecutor(executor.getId());
+ assertThat(fetchedExecutor.isActive()).isTrue();
+ }
+}