azkaban-developers

One refactor for Executor operations from JdbcExecutorLoader

8/17/2017 8:47:52 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorDao.java
new file mode 100644
index 0000000..dd26850
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorDao.java
@@ -0,0 +1,213 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import azkaban.database.AbstractJdbcLoader;
+import azkaban.db.DatabaseOperator;
+import azkaban.metrics.CommonMetrics;
+import azkaban.utils.Props;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.log4j.Logger;
+
+@Singleton
+public class ExecutorDao extends AbstractJdbcLoader {
+
+  private static final Logger logger = Logger.getLogger(ExecutorDao.class);
+  private final DatabaseOperator dbOperator;
+
+  @Inject
+  public ExecutorDao(final Props props, final CommonMetrics commonMetrics,
+                     final DatabaseOperator dbOperator) {
+    super(props, commonMetrics);
+    this.dbOperator = dbOperator;
+  }
+
+  public List<Executor> fetchAllExecutors() throws ExecutorManagerException {
+    final QueryRunner runner = createQueryRunner();
+    final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+
+    try {
+      final List<Executor> executors =
+          runner.query(FetchExecutorHandler.FETCH_ALL_EXECUTORS, executorHandler);
+      return executors;
+    } catch (final Exception e) {
+      throw new ExecutorManagerException("Error fetching executors", e);
+    }
+  }
+
+  public List<Executor> fetchActiveExecutors() throws ExecutorManagerException {
+    final QueryRunner runner = createQueryRunner();
+    final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+
+    try {
+      final List<Executor> executors =
+          runner.query(FetchExecutorHandler.FETCH_ACTIVE_EXECUTORS,
+              executorHandler);
+      return executors;
+    } catch (final Exception e) {
+      throw new ExecutorManagerException("Error fetching active executors", e);
+    }
+  }
+
+  public Executor fetchExecutor(final String host, final int port)
+      throws ExecutorManagerException {
+    final QueryRunner runner = createQueryRunner();
+    final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+
+    try {
+      final List<Executor> executors =
+          runner.query(FetchExecutorHandler.FETCH_EXECUTOR_BY_HOST_PORT,
+              executorHandler, host, port);
+      if (executors.isEmpty()) {
+        return null;
+      } else {
+        return executors.get(0);
+      }
+    } catch (final Exception e) {
+      throw new ExecutorManagerException(String.format(
+          "Error fetching executor %s:%d", host, port), e);
+    }
+  }
+
+  public Executor fetchExecutor(final int executorId) throws ExecutorManagerException {
+    final QueryRunner runner = createQueryRunner();
+    final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+
+    try {
+      final List<Executor> executors =
+          runner.query(FetchExecutorHandler.FETCH_EXECUTOR_BY_ID,
+              executorHandler, executorId);
+      if (executors.isEmpty()) {
+        return null;
+      } else {
+        return executors.get(0);
+      }
+    } catch (final Exception e) {
+      throw new ExecutorManagerException(String.format(
+          "Error fetching executor with id: %d", executorId), e);
+    }
+  }
+
+  public Executor fetchExecutorByExecutionId(final int executionId)
+      throws ExecutorManagerException {
+    final QueryRunner runner = createQueryRunner();
+    final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+    Executor executor = null;
+    try {
+      final List<Executor> executors =
+          runner.query(FetchExecutorHandler.FETCH_EXECUTION_EXECUTOR,
+              executorHandler, executionId);
+      if (executors.size() > 0) {
+        executor = executors.get(0);
+      }
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException(
+          "Error fetching executor for exec_id : " + executionId, e);
+    }
+    return executor;
+  }
+
+  public Executor addExecutor(final String host, final int port)
+      throws ExecutorManagerException {
+    // verify, if executor already exists
+    Executor executor = fetchExecutor(host, port);
+    if (executor != null) {
+      throw new ExecutorManagerException(String.format(
+          "Executor %s:%d already exist", host, port));
+    }
+    // add new executor
+    addExecutorHelper(host, port);
+    // fetch newly added executor
+    executor = fetchExecutor(host, port);
+
+    return executor;
+  }
+
+  private void addExecutorHelper(final String host, final int port)
+      throws ExecutorManagerException {
+    final String INSERT = "INSERT INTO executors (host, port) values (?,?)";
+    final QueryRunner runner = createQueryRunner();
+    try {
+      runner.update(INSERT, host, port);
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException(String.format("Error adding %s:%d ",
+          host, port), e);
+    }
+  }
+
+  public void removeExecutor(final String host, final int port) throws ExecutorManagerException {
+    final String DELETE = "DELETE FROM executors WHERE host=? AND port=?";
+    final QueryRunner runner = createQueryRunner();
+    try {
+      final int rows = runner.update(DELETE, host, port);
+      if (rows == 0) {
+        throw new ExecutorManagerException("No executor with host, port :"
+            + "(" + host + "," + port + ")");
+      }
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Error removing executor with host, port : "
+          + "(" + host + "," + port + ")", e);
+    }
+  }
+
+  /**
+   * JDBC ResultSetHandler to fetch records from executors table
+   */
+  public static class FetchExecutorHandler implements
+      ResultSetHandler<List<Executor>> {
+
+    static String FETCH_ALL_EXECUTORS =
+        "SELECT id, host, port, active FROM executors";
+    static String FETCH_ACTIVE_EXECUTORS =
+        "SELECT id, host, port, active FROM executors where active=true";
+    static String FETCH_EXECUTOR_BY_ID =
+        "SELECT id, host, port, active FROM executors where id=?";
+    static String FETCH_EXECUTOR_BY_HOST_PORT =
+        "SELECT id, host, port, active FROM executors where host=? AND port=?";
+    static String FETCH_EXECUTION_EXECUTOR =
+        "SELECT ex.id, ex.host, ex.port, ex.active FROM "
+            + " executors ex INNER JOIN execution_flows ef "
+            + "on ex.id = ef.executor_id  where exec_id=?";
+
+    @Override
+    public List<Executor> handle(final ResultSet rs) throws SQLException {
+      if (!rs.next()) {
+        return Collections.emptyList();
+      }
+
+      final List<Executor> executors = new ArrayList<>();
+      do {
+        final int id = rs.getInt(1);
+        final String host = rs.getString(2);
+        final int port = rs.getInt(3);
+        final boolean active = rs.getBoolean(4);
+        final Executor executor = new Executor(id, host, port, active);
+        executors.add(executor);
+      } while (rs.next());
+
+      return executors;
+    }
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 207796c..9c1d579 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -58,13 +58,17 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     ExecutorLoader {
   private static final Logger logger = Logger
       .getLogger(JdbcExecutorLoader.class);
-  private final ExecutionFlowDBManager executionFlowDBManager;
+  private final ExecutionFlowDao executionFlowDao;
+  private final ExecutorDao executorDao;
   private EncodingType defaultEncodingType = EncodingType.GZIP;
 
   @Inject
-  public JdbcExecutorLoader(final Props props, final CommonMetrics commonMetrics, final ExecutionFlowDBManager executionFlowDBManager) {
+  public JdbcExecutorLoader(final Props props, final CommonMetrics commonMetrics,
+                            final ExecutionFlowDao executionFlowDao,
+                            final ExecutorDao executorDao) {
     super(props, commonMetrics);
-    this.executionFlowDBManager = executionFlowDBManager;
+    this.executionFlowDao = executionFlowDao;
+    this.executorDao = executorDao;
   }
 
   public EncodingType getDefaultEncodingType() {
@@ -78,19 +82,19 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
   @Override
   public synchronized void uploadExecutableFlow(final ExecutableFlow flow)
       throws ExecutorManagerException {
-    this.executionFlowDBManager.uploadExecutableFlow(flow);
+    this.executionFlowDao.uploadExecutableFlow(flow);
   }
 
   @Override
   public void updateExecutableFlow(final ExecutableFlow flow)
       throws ExecutorManagerException {
-    this.executionFlowDBManager.updateExecutableFlow(flow);
+    this.executionFlowDao.updateExecutableFlow(flow);
   }
 
   @Override
   public ExecutableFlow fetchExecutableFlow(final int id)
       throws ExecutorManagerException {
-    return this.executionFlowDBManager.fetchExecutableFlow(id);
+    return this.executionFlowDao.fetchExecutableFlow(id);
   }
 
   /**
@@ -220,26 +224,26 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
   @Override
   public List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
                                                final int skip, final int num) throws ExecutorManagerException {
-    return this.executionFlowDBManager.fetchFlowHistory(projectId, flowId, skip, num);
+    return this.executionFlowDao.fetchFlowHistory(projectId, flowId, skip, num);
   }
 
   @Override
   public List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
                                                final int skip, final int num, final Status status) throws ExecutorManagerException {
-    return this.executionFlowDBManager.fetchFlowHistory(projectId, flowId, skip, num, status);
+    return this.executionFlowDao.fetchFlowHistory(projectId, flowId, skip, num, status);
   }
 
   @Override
   public List<ExecutableFlow> fetchFlowHistory(final int skip, final int num)
       throws ExecutorManagerException {
-    return this.executionFlowDBManager.fetchFlowHistory(skip,num);
+    return this.executionFlowDao.fetchFlowHistory(skip,num);
   }
 
   @Override
   public List<ExecutableFlow> fetchFlowHistory(final String projContain,
                                                final String flowContains, final String userNameContains, final int status, final long startTime,
                                                final long endTime, final int skip, final int num) throws ExecutorManagerException {
-    return this.executionFlowDBManager.fetchFlowHistory(projContain, flowContains,
+    return this.executionFlowDao.fetchFlowHistory(projContain, flowContains,
         userNameContains, status, startTime, endTime, skip, num);
   }
 
@@ -648,16 +652,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
    */
   @Override
   public List<Executor> fetchAllExecutors() throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
-    final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
-
-    try {
-      final List<Executor> executors =
-        runner.query(FetchExecutorHandler.FETCH_ALL_EXECUTORS, executorHandler);
-      return executors;
-    } catch (final Exception e) {
-      throw new ExecutorManagerException("Error fetching executors", e);
-    }
+    return this.executorDao.fetchAllExecutors();
   }
 
   /**
@@ -668,17 +663,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
    */
   @Override
   public List<Executor> fetchActiveExecutors() throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
-    final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
-
-    try {
-      final List<Executor> executors =
-        runner.query(FetchExecutorHandler.FETCH_ACTIVE_EXECUTORS,
-          executorHandler);
-      return executors;
-    } catch (final Exception e) {
-      throw new ExecutorManagerException("Error fetching active executors", e);
-    }
+    return this.executorDao.fetchActiveExecutors();
   }
 
   /**
@@ -689,22 +674,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
   @Override
   public Executor fetchExecutor(final String host, final int port)
     throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
-    final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
-
-    try {
-      final List<Executor> executors =
-        runner.query(FetchExecutorHandler.FETCH_EXECUTOR_BY_HOST_PORT,
-          executorHandler, host, port);
-      if (executors.isEmpty()) {
-        return null;
-      } else {
-        return executors.get(0);
-      }
-    } catch (final Exception e) {
-      throw new ExecutorManagerException(String.format(
-        "Error fetching executor %s:%d", host, port), e);
-    }
+    return this.executorDao.fetchExecutor(host, port);
   }
 
   /**
@@ -714,22 +684,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
    */
   @Override
   public Executor fetchExecutor(final int executorId) throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
-    final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
-
-    try {
-      final List<Executor> executors =
-        runner.query(FetchExecutorHandler.FETCH_EXECUTOR_BY_ID,
-          executorHandler, executorId);
-      if (executors.isEmpty()) {
-        return null;
-      } else {
-        return executors.get(0);
-      }
-    } catch (final Exception e) {
-      throw new ExecutorManagerException(String.format(
-        "Error fetching executor with id: %d", executorId), e);
-    }
+    return this.executorDao.fetchExecutor(executorId);
   }
 
   /**
@@ -764,30 +719,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
   @Override
   public Executor addExecutor(final String host, final int port)
     throws ExecutorManagerException {
-    // verify, if executor already exists
-    Executor executor = fetchExecutor(host, port);
-    if (executor != null) {
-      throw new ExecutorManagerException(String.format(
-        "Executor %s:%d already exist", host, port));
-    }
-    // add new executor
-    addExecutorHelper(host, port);
-    // fetch newly added executor
-    executor = fetchExecutor(host, port);
-
-    return executor;
-  }
-
-  private void addExecutorHelper(final String host, final int port)
-    throws ExecutorManagerException {
-    final String INSERT = "INSERT INTO executors (host, port) values (?,?)";
-    final QueryRunner runner = createQueryRunner();
-    try {
-      runner.update(INSERT, host, port);
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException(String.format("Error adding %s:%d ",
-        host, port), e);
-    }
+    return this.executorDao.addExecutor(host, port);
   }
 
 
@@ -798,18 +730,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
    */
   @Override
   public void removeExecutor(final String host, final int port) throws ExecutorManagerException {
-    final String DELETE = "DELETE FROM executors WHERE host=? AND port=?";
-    final QueryRunner runner = createQueryRunner();
-    try {
-      final int rows = runner.update(DELETE, host, port);
-      if (rows == 0) {
-        throw new ExecutorManagerException("No executor with host, port :"
-            + "(" + host + "," + port + ")");
-      }
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException("Error removing executor with host, port : "
-          + "(" + host + "," + port + ")", e);
-    }
+    this.executorDao.removeExecutor(host, port);
   }
 
   /**
@@ -902,21 +823,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
   @Override
   public Executor fetchExecutorByExecutionId(final int executionId)
     throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
-    final FetchExecutorHandler executorHandler = new FetchExecutorHandler();
-    Executor executor = null;
-    try {
-      final List<Executor> executors =
-        runner.query(FetchExecutorHandler.FETCH_EXECUTION_EXECUTOR,
-          executorHandler, executionId);
-      if (executors.size() > 0) {
-        executor = executors.get(0);
-      }
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException(
-        "Error fetching executor for exec_id : " + executionId, e);
-    }
-    return executor;
+    return this.executorDao.fetchExecutorByExecutionId(executionId);
   }
 
   @Override
@@ -1406,45 +1313,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     }
   }
 
-  /**
-   * JDBC ResultSetHandler to fetch records from executors table
-   */
-  private static class FetchExecutorHandler implements
-    ResultSetHandler<List<Executor>> {
-    private static final String FETCH_ALL_EXECUTORS =
-      "SELECT id, host, port, active FROM executors";
-    private static final String FETCH_ACTIVE_EXECUTORS =
-      "SELECT id, host, port, active FROM executors where active=true";
-    private static final String FETCH_EXECUTOR_BY_ID =
-      "SELECT id, host, port, active FROM executors where id=?";
-    private static final String FETCH_EXECUTOR_BY_HOST_PORT =
-      "SELECT id, host, port, active FROM executors where host=? AND port=?";
-    private static final String FETCH_EXECUTION_EXECUTOR =
-      "SELECT ex.id, ex.host, ex.port, ex.active FROM "
-        + " executors ex INNER JOIN execution_flows ef "
-        + "on ex.id = ef.executor_id  where exec_id=?";
-
-    @Override
-    public List<Executor> handle(final ResultSet rs) throws SQLException {
-      if (!rs.next()) {
-        return Collections.<Executor> emptyList();
-      }
-
-      final List<Executor> executors = new ArrayList<>();
-      do {
-        final int id = rs.getInt(1);
-        final String host = rs.getString(2);
-        final int port = rs.getInt(3);
-        final boolean active = rs.getBoolean(4);
-        final Executor executor = new Executor(id, host, port, active);
-        executors.add(executor);
-      } while (rs.next());
-
-      return executors;
-    }
-  }
-
-  /**
+ /**
    * JDBC ResultSetHandler to fetch records from executor_events table
    */
   private static class ExecutorLogsResultHandler implements
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index 9da2af7..c57c268 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -1056,7 +1056,7 @@ public class JdbcExecutorLoaderTest {
 
     //TODO kunkun-tang: temporary work-around here. This Test is to be deprecated.
     return new JdbcExecutorLoader(props,
-        new CommonMetrics(new MetricsManager(new MetricRegistry())), null);
+        new CommonMetrics(new MetricsManager(new MetricRegistry())), null, null);
   }
 
   private boolean isTestSetup() {
diff --git a/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java b/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
index efa6c12..6718abc 100644
--- a/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
@@ -29,8 +29,9 @@ import azkaban.database.AzkabanDatabaseSetup;
 import azkaban.database.AzkabanDatabaseUpdater;
 import azkaban.db.DatabaseOperator;
 import azkaban.executor.AlerterHolder;
-import azkaban.executor.ExecutionFlowDBManager;
+import azkaban.executor.ExecutionFlowDao;
 import azkaban.executor.Executor;
+import azkaban.executor.ExecutorDao;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManager;
 import azkaban.project.ProjectLoader;
@@ -122,7 +123,7 @@ public class AzkabanWebServerTest {
     executorLoader.updateExecutor(executor);
 
     assertNotNull(injector.getInstance(AzkabanWebServer.class));
-    assertNotNull(injector.getInstance(ExecutionFlowDBManager.class));
+    assertNotNull(injector.getInstance(ExecutionFlowDao.class));
 
     //Test if triggermanager is singletonly guiced. If not, the below test will fail.
     assertSingleton(ExecutorLoader.class, injector);
@@ -135,7 +136,8 @@ public class AzkabanWebServerTest {
     assertSingleton(TriggerManager.class, injector);
     assertSingleton(AlerterHolder.class, injector);
     assertSingleton(Emailer.class, injector);
-    assertSingleton(ExecutionFlowDBManager.class, injector);
+    assertSingleton(ExecutionFlowDao.class, injector);
+    assertSingleton(ExecutorDao.class, injector);
 
     SERVICE_PROVIDER.unsetInjector();
   }