azkaban-aplcache

Continue DB API migration as a follow-up of #1357 (#1362) This

8/21/2017 5:32:14 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorDao.java
index dd26850..35a1882 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorDao.java
@@ -16,10 +16,7 @@
 
 package azkaban.executor;
 
-import azkaban.database.AbstractJdbcLoader;
 import azkaban.db.DatabaseOperator;
-import azkaban.metrics.CommonMetrics;
-import azkaban.utils.Props;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
@@ -27,78 +24,60 @@ import java.util.Collections;
 import java.util.List;
 import javax.inject.Inject;
 import javax.inject.Singleton;
-import org.apache.commons.dbutils.QueryRunner;
 import org.apache.commons.dbutils.ResultSetHandler;
 import org.apache.log4j.Logger;
 
 @Singleton
-public class ExecutorDao extends AbstractJdbcLoader {
+public class ExecutorDao {
 
   private static final Logger logger = Logger.getLogger(ExecutorDao.class);
   private final DatabaseOperator dbOperator;
 
   @Inject
-  public ExecutorDao(final Props props, final CommonMetrics commonMetrics,
-                     final DatabaseOperator dbOperator) {
-    super(props, commonMetrics);
+  public ExecutorDao (final DatabaseOperator dbOperator) {
     this.dbOperator = dbOperator;
   }
 
-  public List<Executor> fetchAllExecutors() throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
-    final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
-
+  List<Executor> fetchAllExecutors() throws ExecutorManagerException {
     try {
-      final List<Executor> executors =
-          runner.query(FetchExecutorHandler.FETCH_ALL_EXECUTORS, executorHandler);
-      return executors;
+      return this.dbOperator
+          .query(FetchExecutorHandler.FETCH_ALL_EXECUTORS, new FetchExecutorHandler());
     } catch (final Exception e) {
       throw new ExecutorManagerException("Error fetching executors", e);
     }
   }
 
-  public List<Executor> fetchActiveExecutors() throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
-    final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
-
+  List<Executor> fetchActiveExecutors() throws ExecutorManagerException {
     try {
-      final List<Executor> executors =
-          runner.query(FetchExecutorHandler.FETCH_ACTIVE_EXECUTORS,
-              executorHandler);
-      return executors;
-    } catch (final Exception e) {
+      return this.dbOperator
+          .query(FetchExecutorHandler.FETCH_ACTIVE_EXECUTORS, new FetchExecutorHandler());
+    } catch (final SQLException e) {
       throw new ExecutorManagerException("Error fetching active executors", e);
     }
   }
 
   public Executor fetchExecutor(final String host, final int port)
       throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
-    final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
-
     try {
       final List<Executor> executors =
-          runner.query(FetchExecutorHandler.FETCH_EXECUTOR_BY_HOST_PORT,
-              executorHandler, host, port);
+          this.dbOperator.query(FetchExecutorHandler.FETCH_EXECUTOR_BY_HOST_PORT,
+              new FetchExecutorHandler(), host, port);
       if (executors.isEmpty()) {
         return null;
       } else {
         return executors.get(0);
       }
-    } catch (final Exception e) {
+    } catch (final SQLException e) {
       throw new ExecutorManagerException(String.format(
           "Error fetching executor %s:%d", host, port), e);
     }
   }
 
   public Executor fetchExecutor(final int executorId) throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
-    final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
-
     try {
-      final List<Executor> executors =
-          runner.query(FetchExecutorHandler.FETCH_EXECUTOR_BY_ID,
-              executorHandler, executorId);
+      final List<Executor> executors = this.dbOperator
+          .query(FetchExecutorHandler.FETCH_EXECUTOR_BY_ID,
+              new FetchExecutorHandler(), executorId);
       if (executors.isEmpty()) {
         return null;
       } else {
@@ -110,58 +89,69 @@ public class ExecutorDao extends AbstractJdbcLoader {
     }
   }
 
-  public Executor fetchExecutorByExecutionId(final int executionId)
+  Executor fetchExecutorByExecutionId(final int executionId)
       throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
     final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
-    Executor executor = null;
     try {
-      final List<Executor> executors =
-          runner.query(FetchExecutorHandler.FETCH_EXECUTION_EXECUTOR,
+      final List<Executor> executors = this.dbOperator
+          .query(FetchExecutorHandler.FETCH_EXECUTION_EXECUTOR,
               executorHandler, executionId);
       if (executors.size() > 0) {
-        executor = executors.get(0);
+        return executors.get(0);
+      } else {
+        return null;
       }
     } catch (final SQLException e) {
       throw new ExecutorManagerException(
           "Error fetching executor for exec_id : " + executionId, e);
     }
-    return executor;
   }
 
-  public Executor addExecutor(final String host, final int port)
+  Executor addExecutor(final String host, final int port)
       throws ExecutorManagerException {
     // verify, if executor already exists
-    Executor executor = fetchExecutor(host, port);
-    if (executor != null) {
+    if (fetchExecutor(host, port) != null) {
       throw new ExecutorManagerException(String.format(
           "Executor %s:%d already exist", host, port));
     }
     // add new executor
     addExecutorHelper(host, port);
-    // fetch newly added executor
-    executor = fetchExecutor(host, port);
 
-    return executor;
+    // fetch newly added executor
+    return fetchExecutor(host, port);
   }
 
   private void addExecutorHelper(final String host, final int port)
       throws ExecutorManagerException {
     final String INSERT = "INSERT INTO executors (host, port) values (?,?)";
-    final QueryRunner runner = createQueryRunner();
     try {
-      runner.update(INSERT, host, port);
+      this.dbOperator.update(INSERT, host, port);
     } catch (final SQLException e) {
       throw new ExecutorManagerException(String.format("Error adding %s:%d ",
           host, port), e);
     }
   }
 
-  public void removeExecutor(final String host, final int port) throws ExecutorManagerException {
+  public void updateExecutor(final Executor executor) throws ExecutorManagerException {
+    final String UPDATE =
+        "UPDATE executors SET host=?, port=?, active=? where id=?";
+
+    try {
+      final int rows = this.dbOperator.update(UPDATE, executor.getHost(), executor.getPort(),
+          executor.isActive(), executor.getId());
+      if (rows == 0) {
+        throw new ExecutorManagerException("No executor with id :" + executor.getId());
+      }
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Error inactivating executor "
+          + executor.getId(), e);
+    }
+  }
+
+  void removeExecutor(final String host, final int port) throws ExecutorManagerException {
     final String DELETE = "DELETE FROM executors WHERE host=? AND port=?";
-    final QueryRunner runner = createQueryRunner();
     try {
-      final int rows = runner.update(DELETE, host, port);
+      final int rows = this.dbOperator.update(DELETE, host, port);
       if (rows == 0) {
         throw new ExecutorManagerException("No executor with host, port :"
             + "(" + host + "," + port + ")");
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 162c34e..00c3f86 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -339,22 +339,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
 
  @Override
   public void updateExecutor(final Executor executor) throws ExecutorManagerException {
-    final String UPDATE =
-      "UPDATE executors SET host=?, port=?, active=? where id=?";
-
-    final QueryRunner runner = createQueryRunner();
-    try {
-      final int rows =
-        runner.update(UPDATE, executor.getHost(), executor.getPort(),
-          executor.isActive(), executor.getId());
-      if (rows == 0) {
-        throw new ExecutorManagerException("No executor with id :"
-          + executor.getId());
-      }
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException("Error inactivating executor "
-        + executor.getId(), e);
-    }
+    this.executorDao.updateExecutor(executor);
   }
 
   @Override
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
index d1f1263..08e95de 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
@@ -50,6 +50,7 @@ public class ExecutionFlowDaoTest {
   public static void destroyDB() throws Exception {
     try {
       dbOperator.update("DROP ALL OBJECTS");
+      dbOperator.update("SHUTDOWN");
     } catch (final SQLException e) {
       e.printStackTrace();
     }
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorDaoTest.java
new file mode 100644
index 0000000..b97ea20
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorDaoTest.java
@@ -0,0 +1,190 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import azkaban.db.DatabaseOperator;
+import azkaban.test.Utils;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ExecutorDaoTest {
+
+  private static DatabaseOperator dbOperator;
+  private ExecutorDao executorDao;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    dbOperator = Utils.initTestDB();
+  }
+
+  @AfterClass
+  public static void destroyDB() throws Exception {
+    try {
+      dbOperator.update("DROP ALL OBJECTS");
+      dbOperator.update("SHUTDOWN");
+    } catch (final SQLException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Before
+  public void setup() {
+    this.executorDao = new ExecutorDao(dbOperator);
+  }
+
+  @After
+  public void clearDB() {
+    try {
+      dbOperator.update("delete from executors");
+    } catch (final SQLException e) {
+      e.printStackTrace();
+    }
+  }
+
+  /* Test all executors fetch from empty executors */
+  @Test
+  public void testFetchEmptyExecutors() throws Exception {
+    final List<Executor> executors = this.executorDao.fetchAllExecutors();
+    assertThat(executors.size()).isEqualTo(0);
+  }
+
+  /* Test active executors fetch from empty executors */
+  @Test
+  public void testFetchEmptyActiveExecutors() throws Exception {
+    final List<Executor> executors = this.executorDao.fetchActiveExecutors();
+    assertThat(executors.size()).isEqualTo(0);
+  }
+
+  /* Test missing executor fetch with search by executor id */
+  @Test
+  public void testFetchMissingExecutorId() throws Exception {
+    final Executor executor = this.executorDao.fetchExecutor(0);
+    assertThat(executor).isEqualTo(null);
+  }
+
+  /* Test missing executor fetch with search by host:port */
+  @Test
+  public void testFetchMissingExecutorHostPort() throws Exception {
+    final Executor executor = this.executorDao.fetchExecutor("localhost", 12345);
+    assertThat(executor).isEqualTo(null);
+  }
+
+  /* Test to add duplicate executors */
+  @Test
+  public void testDuplicateAddExecutor() throws Exception {
+    final String host = "localhost";
+    final int port = 12345;
+    this.executorDao.addExecutor(host, port);
+    assertThatThrownBy(() -> this.executorDao.addExecutor(host, port))
+        .isInstanceOf(ExecutorManagerException.class)
+        .hasMessageContaining("already exist");
+  }
+
+  /* Test to try update a non-existent executor */
+  @Test
+  public void testMissingExecutorUpdate() throws Exception {
+    final Executor executor = new Executor(1, "localhost", 1234, true);
+    assertThatThrownBy(() -> this.executorDao.updateExecutor(executor))
+        .isInstanceOf(ExecutorManagerException.class)
+        .hasMessageContaining("No executor with id");
+  }
+
+  /* Test add & fetch by Id Executors */
+  @Test
+  public void testSingleExecutorFetchById() throws Exception {
+    final List<Executor> executors = addTestExecutors();
+    for (final Executor executor : executors) {
+      final Executor fetchedExecutor = this.executorDao.fetchExecutor(executor.getId());
+      assertThat(executor).isEqualTo(fetchedExecutor);
+    }
+  }
+
+  /* Test fetch all executors */
+  @Test
+  public void testFetchAllExecutors() throws Exception {
+    final List<Executor> executors = addTestExecutors();
+    executors.get(0).setActive(false);
+    this.executorDao.updateExecutor(executors.get(0));
+    final List<Executor> fetchedExecutors = this.executorDao.fetchAllExecutors();
+    assertThat(executors.size()).isEqualTo(fetchedExecutors.size());
+    assertThat(executors.toArray()).isEqualTo(fetchedExecutors.toArray());
+  }
+
+  /* Test fetch only active executors */
+  @Test
+  public void testFetchActiveExecutors() throws Exception {
+    final List<Executor> executors = addTestExecutors();
+
+    executors.get(0).setActive(true);
+    this.executorDao.updateExecutor(executors.get(0));
+    final List<Executor> fetchedExecutors = this.executorDao.fetchActiveExecutors();
+    assertThat(executors.size()).isEqualTo(fetchedExecutors.size() + 2);
+    assertThat(executors.get(0)).isEqualTo(fetchedExecutors.get(0));
+  }
+
+  /* Test add & fetch by host:port Executors */
+  @Test
+  public void testSingleExecutorFetchHostPort() throws Exception {
+    final List<Executor> executors = addTestExecutors();
+    for (final Executor executor : executors) {
+      final Executor fetchedExecutor =
+          this.executorDao.fetchExecutor(executor.getHost(), executor.getPort());
+      assertThat(executor).isEqualTo(fetchedExecutor);
+    }
+  }
+
+  /* Helper method used in methods testing jdbc interface for executors table */
+  private List<Executor> addTestExecutors()
+      throws ExecutorManagerException {
+    final List<Executor> executors = new ArrayList<>();
+    executors.add(this.executorDao.addExecutor("localhost1", 12345));
+    executors.add(this.executorDao.addExecutor("localhost2", 12346));
+    executors.add(this.executorDao.addExecutor("localhost1", 12347));
+    return executors;
+  }
+
+  /* Test Removing Executor */
+  @Test
+  public void testRemovingExecutor() throws Exception {
+    final Executor executor = this.executorDao.addExecutor("localhost1", 12345);
+    assertThat(executor).isNotNull();
+    this.executorDao.removeExecutor("localhost1", 12345);
+    final Executor fetchedExecutor = this.executorDao.fetchExecutor("localhost1", 12345);
+    assertThat(fetchedExecutor).isNull();
+  }
+
+  /* Test Executor reactivation */
+  @Test
+  public void testExecutorActivation() throws Exception {
+    final Executor executor = this.executorDao.addExecutor("localhost1", 12345);
+    assertThat(executor.isActive()).isFalse();
+
+    executor.setActive(true);
+    this.executorDao.updateExecutor(executor);
+    final Executor fetchedExecutor = this.executorDao.fetchExecutor(executor.getId());
+    assertThat(fetchedExecutor.isActive()).isTrue();
+  }
+}
diff --git a/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java b/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
index c6bafed..c90e149 100644
--- a/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
@@ -54,6 +54,7 @@ public class JdbcProjectImplTest {
   public static void destroyDB() {
     try {
       dbOperator.update("DROP ALL OBJECTS");
+      dbOperator.update("SHUTDOWN");
     } catch (final SQLException e) {
       e.printStackTrace();
     }