azkaban-developers

Merge pull request #1 from azkaban/multipleexecutors Merge

8/28/2015 3:01:57 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/Executor.java b/azkaban-common/src/main/java/azkaban/executor/Executor.java
new file mode 100644
index 0000000..31070bf
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/Executor.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import azkaban.utils.Utils;
+
+/**
+ * Class to represent an AzkabanExecutorServer details for ExecutorManager
+ *
+ * @author gaggarwa
+ */
+public class Executor {
+  private final int id;
+  private final String host;
+  private final int port;
+  private boolean isActive;
+
+  // TODO: ExecutorStats to be added
+
+  /**
+   * <pre>
+   * Construct an Executor Object
+   * Note: port should be a within unsigned 2 byte
+   * integer range
+   * </pre>
+   *
+   * @param executor_id
+   * @param executor_host
+   * @param executor_port
+   */
+  public Executor(int id, String host, int port, boolean isActive) {
+    if (!Utils.isValidPort(port)) {
+      throw new IllegalArgumentException(String.format(
+        "Invalid port number %d for host %s, executor_id %d", port, host, id));
+    }
+
+    this.id = id;
+    this.host = host;
+    this.port = port;
+    this.isActive = isActive;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + (isActive ? 1231 : 1237);
+    result = prime * result + ((host == null) ? 0 : host.hashCode());
+    result = prime * result + id;
+    result = prime * result + port;
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (!(obj instanceof Executor))
+      return false;
+    Executor other = (Executor) obj;
+    if (isActive != other.isActive)
+      return false;
+    if (host == null) {
+      if (other.host != null)
+        return false;
+    } else if (!host.equals(other.host))
+      return false;
+    if (id != other.id)
+      return false;
+    if (port != other.port)
+      return false;
+    return true;
+  }
+
+  public String getHost() {
+    return host;
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  public boolean isActive() {
+    return isActive;
+  }
+
+  public int getId() {
+    return id;
+  }
+
+  public void setActive(boolean isActive) {
+    this.isActive = isActive;
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index 6dc0e11..1bfa910 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -20,6 +20,7 @@ import java.io.File;
 import java.util.List;
 import java.util.Map;
 
+import azkaban.executor.ExecutorLogEvent.EventType;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
@@ -47,6 +48,123 @@ public interface ExecutorLoader {
       String flowContains, String userNameContains, int status, long startData,
       long endData, int skip, int num) throws ExecutorManagerException;
 
+  /**
+   * <pre>
+   * Fetch all executors from executors table
+   * Note:-
+   * 1 throws an Exception in case of a SQL issue
+   * 2 returns an empty list in case of no executor
+   * </pre>
+   *
+   * @return List<Executor>
+   * @throws ExecutorManagerException
+   */
+  public List<Executor> fetchAllExecutors() throws ExecutorManagerException;
+
+  /**
+   * <pre>
+   * Fetch all executors from executors table with active = true
+   * Note:-
+   * 1 throws an Exception in case of a SQL issue
+   * 2 returns an empty list in case of no active executor
+   * </pre>
+   *
+   * @return List<Executor>
+   * @throws ExecutorManagerException
+   */
+  public List<Executor> fetchActiveExecutors() throws ExecutorManagerException;
+
+  /**
+   * <pre>
+   * Fetch executor from executors with a given (host, port)
+   * Note:
+   * 1. throws an Exception in case of a SQL issue
+   * 2. return null when no executor is found
+   * with the given (host,port)
+   * </pre>
+   *
+   * @return Executor
+   * @throws ExecutorManagerException
+   */
+  public Executor fetchExecutor(String host, int port)
+    throws ExecutorManagerException;
+
+  /**
+   * <pre>
+   * Fetch executor from executors with a given executorId
+   * Note:
+   * 1. throws an Exception in case of a SQL issue
+   * 2. return null when no executor is found with the given executorId
+   * </pre>
+   *
+   * @return Executor
+   * @throws ExecutorManagerException
+   */
+  public Executor fetchExecutor(int executorId) throws ExecutorManagerException;
+
+  /**
+   * <pre>
+   * create an executor and insert in executors table.
+   * Note:-
+   * 1. throws an Exception in case of a SQL issue
+   * 2. throws an Exception if a executor with (host, port) already exist
+   * 3. return null when no executor is found with the given executorId
+   * </pre>
+   *
+   * @return Executor
+   * @throws ExecutorManagerException
+   */
+  public Executor addExecutor(String host, int port)
+    throws ExecutorManagerException;
+
+  /**
+   * <pre>
+   * create an executor and insert in executors table.
+   * Note:-
+   * 1. throws an Exception in case of a SQL issue
+   * 2. throws an Exception if there is no executor with the given id
+   * 3. return null when no executor is found with the given executorId
+   * </pre>
+   *
+   * @param executorId
+   * @throws ExecutorManagerException
+   */
+  public void updateExecutor(Executor executor) throws ExecutorManagerException;
+
+  /**
+   * <pre>
+   * Log an event in executor_event audit table Note:- throws an Exception in
+   * case of a SQL issue
+   * Note: throws an Exception in case of a SQL issue
+   * </pre>
+   *
+   * @param executor
+   * @param type
+   * @param user
+   * @param message
+   * @return isSuccess
+   */
+  public void postExecutorEvent(Executor executor, EventType type, String user,
+    String message) throws ExecutorManagerException;
+
+  /**
+   * <pre>
+   * This method is to fetch events recorded in executor audit table, inserted
+   * by postExecutorEvents with a given executor, starting from skip
+   * Note:-
+   * 1. throws an Exception in case of a SQL issue
+   * 2. Returns an empty list in case of no events
+   * </pre>
+   *
+   * @param executor
+   * @param num
+   * @param skip
+   * @return List<ExecutorLogEvent>
+   * @throws ExecutorManagerException
+   */
+  List<ExecutorLogEvent> getExecutorEvents(Executor executor, int num,
+    int offset) throws ExecutorManagerException;
+
   public void addActiveExecutableReference(ExecutionReference ref)
       throws ExecutorManagerException;
 
@@ -105,5 +223,4 @@ public interface ExecutorLoader {
 
   public int removeExecutionLogsByTime(long millis)
       throws ExecutorManagerException;
-
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLogEvent.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLogEvent.java
new file mode 100644
index 0000000..5598a0d
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLogEvent.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.util.Date;
+
+/**
+ * Class to represent events on Azkaban executors
+ *
+ * @author gaggarwa
+ */
+public class ExecutorLogEvent {
+  /**
+   * Log event type messages. Do not change the numeric representation of each
+   * enum. Only represent from 0 to 255 different codes.
+   */
+  public static enum EventType {
+    ERROR(128), HOST_UPDATE(1), PORT_UPDATE(2), ACTIVATION(3), INACTIVATION(4),
+    CREATED(5);
+
+    private int numVal;
+
+    EventType(int numVal) {
+      this.numVal = numVal;
+    }
+
+    public int getNumVal() {
+      return numVal;
+    }
+
+    public static EventType fromInteger(int x)
+        throws IllegalArgumentException {
+      switch (x) {
+      case 1:
+        return HOST_UPDATE;
+      case 2:
+        return PORT_UPDATE;
+      case 3:
+        return ACTIVATION;
+      case 4:
+        return INACTIVATION;
+      case 5:
+        return CREATED;
+      case 128:
+        return ERROR;
+      default:
+        throw new IllegalArgumentException(String.format(
+          "inalid status code %d", x));
+      }
+    }
+  }
+
+  private final int executorId;
+  private final String user;
+  private final Date time;
+  private final EventType type;
+  private final String message;
+
+  public ExecutorLogEvent(int executorId, String user, Date time,
+    EventType type, String message) {
+    this.executorId = executorId;
+    this.user = user;
+    this.time = time;
+    this.type = type;
+    this.message = message;
+  }
+
+  public int getExecutorId() {
+    return executorId;
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  public Date getTime() {
+    return time;
+  }
+
+  public EventType getType() {
+    return type;
+  }
+
+  public String getMessage() {
+    return message;
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 8d230ba..2a1cf26 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -21,12 +21,14 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.lang.annotation.Inherited;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -40,6 +42,7 @@ import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
 
 import azkaban.database.AbstractJdbcLoader;
+import azkaban.executor.ExecutorLogEvent.EventType;
 import azkaban.utils.FileIOUtils;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.GZIPUtils;
@@ -773,6 +776,206 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     return connection;
   }
 
+
+  /**
+   *
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorLoader#fetchActiveExecutors()
+   */
+  @Override
+  public List<Executor> fetchAllExecutors() throws ExecutorManagerException {
+    QueryRunner runner = createQueryRunner();
+    FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+
+    try {
+      List<Executor> executors =
+        runner.query(FetchExecutorHandler.FETCH_ALL_EXECUTORS, executorHandler);
+      return executors;
+    } catch (Exception e) {
+      throw new ExecutorManagerException("Error fetching executors", e);
+    }
+  }
+
+  /**
+   *
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorLoader#fetchActiveExecutors()
+   */
+  @Override
+  public List<Executor> fetchActiveExecutors() throws ExecutorManagerException {
+    QueryRunner runner = createQueryRunner();
+    FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+
+    try {
+      List<Executor> executors =
+        runner.query(FetchExecutorHandler.FETCH_ACTIVE_EXECUTORS,
+          executorHandler);
+      return executors;
+    } catch (Exception e) {
+      throw new ExecutorManagerException("Error fetching active executors", e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorLoader#fetchExecutor(java.lang.String, int)
+   */
+  @Override
+  public Executor fetchExecutor(String host, int port)
+    throws ExecutorManagerException {
+    QueryRunner runner = createQueryRunner();
+    FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+
+    try {
+      List<Executor> executors =
+        runner.query(FetchExecutorHandler.FETCH_EXECUTOR_BY_HOST_PORT,
+          executorHandler, host, port);
+      if (executors.isEmpty()) {
+        return null;
+      } else {
+        return executors.get(0);
+      }
+    } catch (Exception e) {
+      throw new ExecutorManagerException(String.format(
+        "Error fetching executor %s:%d", host, port), e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorLoader#fetchExecutor(int)
+   */
+  @Override
+  public Executor fetchExecutor(int executorId) throws ExecutorManagerException {
+    QueryRunner runner = createQueryRunner();
+    FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+
+    try {
+      List<Executor> executors =
+        runner.query(FetchExecutorHandler.FETCH_EXECUTOR_BY_ID,
+          executorHandler, executorId);
+      if (executors.isEmpty()) {
+        return null;
+      } else {
+        return executors.get(0);
+      }
+    } catch (Exception e) {
+      throw new ExecutorManagerException(String.format(
+        "Error fetching executor with id: %d", executorId), e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorLoader#updateExecutor(int)
+   */
+  @Override
+  public void updateExecutor(Executor executor) throws ExecutorManagerException {
+    final String UPDATE =
+      "UPDATE executors SET host=?, port=?, active=? where id=?";
+
+    QueryRunner runner = createQueryRunner();
+    try {
+      int rows =
+        runner.update(UPDATE, executor.getHost(), executor.getPort(),
+          executor.isActive(), executor.getId());
+      if (rows == 0) {
+        throw new ExecutorManagerException("No executor with id :"
+          + executor.getId());
+      }
+    } catch (SQLException e) {
+      throw new ExecutorManagerException("Error inactivating executor "
+        + executor.getId(), e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorLoader#addExecutor(java.lang.String, int)
+   */
+  @Override
+  public Executor addExecutor(String host, int port)
+    throws ExecutorManagerException {
+    // verify, if executor already exists
+    Executor executor = fetchExecutor(host, port);
+    if (executor != null) {
+      throw new ExecutorManagerException(String.format(
+        "Executor %s:%d already exist", host, port));
+    }
+    // add new executor
+    addExecutorHelper(host, port);
+    // fetch newly added executor
+    executor = fetchExecutor(host, port);
+
+    return executor;
+  }
+
+  private void addExecutorHelper(String host, int port)
+    throws ExecutorManagerException {
+    final String INSERT = "INSERT INTO executors (host, port) values (?,?)";
+    QueryRunner runner = createQueryRunner();
+    try {
+      runner.update(INSERT, host, port);
+    } catch (SQLException e) {
+      throw new ExecutorManagerException(String.format("Error adding %s:%d ",
+        host, port), e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorLoader#postExecutorEvent(azkaban.executor.Executor,
+   *      azkaban.executor.ExecutorLogEvent.EventType, java.lang.String,
+   *      java.lang.String)
+   */
+  @Override
+  public void postExecutorEvent(Executor executor, EventType type, String user,
+    String message) throws ExecutorManagerException{
+    QueryRunner runner = createQueryRunner();
+
+    final String INSERT_PROJECT_EVENTS =
+      "INSERT INTO executor_events (executor_id, event_type, event_time, username, message) values (?,?,?,?,?)";
+    Date updateDate = new Date();
+    try {
+      runner.update(INSERT_PROJECT_EVENTS, executor.getId(), type.getNumVal(),
+        updateDate, user, message);
+    } catch (SQLException e) {
+      throw new ExecutorManagerException("Failed to post executor event", e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorLoader#getExecutorEvents(azkaban.executor.Executor,
+   *      int, int)
+   */
+  @Override
+  public List<ExecutorLogEvent> getExecutorEvents(Executor executor, int num,
+    int offset) throws ExecutorManagerException {
+    QueryRunner runner = createQueryRunner();
+
+    ExecutorLogsResultHandler logHandler = new ExecutorLogsResultHandler();
+    List<ExecutorLogEvent> events = null;
+    try {
+      events =
+        runner.query(ExecutorLogsResultHandler.SELECT_EXECUTOR_EVENTS_ORDER,
+          logHandler, executor.getId(), num, offset);
+    } catch (SQLException e) {
+      throw new ExecutorManagerException(
+        "Failed to fetch events for executor id : " + executor.getId(), e);
+    }
+
+    return events;
+  }
+
   private static class LastInsertID implements ResultSetHandler<Long> {
     private static String LAST_INSERT_ID = "SELECT LAST_INSERT_ID()";
 
@@ -1134,4 +1337,71 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
 
     return updateNum;
   }
+
+  /**
+   * JDBC ResultSetHandler to fetch records from executors table
+   */
+  private static class FetchExecutorHandler implements
+    ResultSetHandler<List<Executor>> {
+    private static String FETCH_ALL_EXECUTORS =
+      "SELECT id, host, port, active FROM executors";
+    private static String FETCH_ACTIVE_EXECUTORS =
+      "SELECT id, host, port, active FROM executors where active=true";
+    private static String FETCH_EXECUTOR_BY_ID =
+      "SELECT id, host, port, active FROM executors where id=?";
+    private static String FETCH_EXECUTOR_BY_HOST_PORT =
+      "SELECT id, host, port, active FROM executors where host=? AND port=?";
+
+    @Override
+    public List<Executor> handle(ResultSet rs) throws SQLException {
+      if (!rs.next()) {
+        return Collections.<Executor> emptyList();
+      }
+
+      List<Executor> executors = new ArrayList<Executor>();
+      do {
+        int id = rs.getInt(1);
+        String host = rs.getString(2);
+        int port = rs.getInt(3);
+        boolean active = rs.getBoolean(4);
+        Executor executor = new Executor(id, host, port, active);
+        executors.add(executor);
+      } while (rs.next());
+
+      return executors;
+    }
+  }
+
+  /**
+   * JDBC ResultSetHandler to fetch records from executor_events table
+   */
+  private static class ExecutorLogsResultHandler implements
+    ResultSetHandler<List<ExecutorLogEvent>> {
+    private static String SELECT_EXECUTOR_EVENTS_ORDER =
+      "SELECT executor_id, event_type, event_time, username, message FROM executor_events "
+        + " WHERE executor_id=? ORDER BY event_time LIMIT ? OFFSET ?";
+
+    @Override
+    public List<ExecutorLogEvent> handle(ResultSet rs) throws SQLException {
+      if (!rs.next()) {
+        return Collections.<ExecutorLogEvent> emptyList();
+      }
+
+      ArrayList<ExecutorLogEvent> events = new ArrayList<ExecutorLogEvent>();
+      do {
+        int executorId = rs.getInt(1);
+        int eventType = rs.getInt(2);
+        Date eventTime = rs.getDate(3);
+        String username = rs.getString(4);
+        String message = rs.getString(5);
+
+        ExecutorLogEvent event =
+          new ExecutorLogEvent(executorId, username, eventTime,
+            EventType.fromInteger(eventType), message);
+        events.add(event);
+      } while (rs.next());
+
+      return events;
+    }
+  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
index 134602b..e8e2dac 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
@@ -172,10 +172,10 @@ public class JavaProcessJob extends ProcessJob {
 
       if (xmx > sizeMaxXmx) {
         throw new Exception(String.format("%s: Xmx value has exceeded the allowed limit (max Xmx = %s)",
-                getId(), maxXms));
+                getId(), maxXmx));
       }
     }
 
     return new Pair<Long, Long>(xms, xmx);
   }
-}
\ No newline at end of file
+}
diff --git a/azkaban-common/src/main/java/azkaban/utils/Utils.java b/azkaban-common/src/main/java/azkaban/utils/Utils.java
index 7227b0b..138b80f 100644
--- a/azkaban-common/src/main/java/azkaban/utils/Utils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/Utils.java
@@ -111,6 +111,19 @@ public class Utils {
     System.exit(exitCode);
   }
 
+  /**
+   * Tests whether a port is valid or not
+   *
+   * @param port
+   * @return true, if port is valid
+   */
+  public static boolean isValidPort(int port) {
+    if (port >= 1 && port <= 65535) {
+      return true;
+    }
+    return false;
+  }
+
   public static File createTempDir() {
     return createTempDir(new File(System.getProperty("java.io.tmpdir")));
   }
@@ -410,7 +423,7 @@ public class Utils {
   }
 
   /**
-   * @param strMemSize : memory string in the format such as 1G, 500M, 3000K, 5000 
+   * @param strMemSize : memory string in the format such as 1G, 500M, 3000K, 5000
    * @return : long value of memory amount in kb
    */
   public static long parseMemString(String strMemSize) {
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index f453f0c..71be50f 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -21,8 +21,10 @@ import java.io.IOException;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 
 import javax.sql.DataSource;
@@ -30,17 +32,18 @@ import javax.sql.DataSource;
 import org.apache.commons.dbutils.DbUtils;
 import org.apache.commons.dbutils.QueryRunner;
 import org.apache.commons.dbutils.ResultSetHandler;
-
 import org.joda.time.DateTime;
-
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 
 import azkaban.database.DataSourceUtils;
+import azkaban.executor.ExecutorLogEvent.EventType;
 import azkaban.flow.Flow;
 import azkaban.project.Project;
+import azkaban.user.User;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Pair;
@@ -117,12 +120,31 @@ public class JdbcExecutorLoaderTest {
       return;
     }
 
-    DbUtils.closeQuietly(connection);
+    try {
+      runner.query(connection, "SELECT COUNT(1) FROM executors",
+          countHandler);
+    } catch (SQLException e) {
+      e.printStackTrace();
+      testDBExists = false;
+      DbUtils.closeQuietly(connection);
+      return;
+    }
 
-    clearDB();
+    try {
+      runner.query(connection, "SELECT COUNT(1) FROM executor_events",
+          countHandler);
+    } catch (SQLException e) {
+      e.printStackTrace();
+      testDBExists = false;
+      DbUtils.closeQuietly(connection);
+      return;
+    }
+
+    DbUtils.closeQuietly(connection);
   }
 
-  private static void clearDB() {
+  @After
+  public void clearDB() {
     if (!testDBExists) {
       return;
     }
@@ -178,6 +200,24 @@ public class JdbcExecutorLoaderTest {
       return;
     }
 
+    try {
+      runner.update(connection, "DELETE FROM executors");
+    } catch (SQLException e) {
+      e.printStackTrace();
+      testDBExists = false;
+      DbUtils.closeQuietly(connection);
+      return;
+    }
+
+    try {
+      runner.update(connection, "DELETE FROM executor_events");
+    } catch (SQLException e) {
+      e.printStackTrace();
+      testDBExists = false;
+      DbUtils.closeQuietly(connection);
+      return;
+    }
+
     DbUtils.closeQuietly(connection);
   }
 
@@ -186,6 +226,7 @@ public class JdbcExecutorLoaderTest {
     if (!isTestSetup()) {
       return;
     }
+    Assert.assertEquals(1, 0);
 
     ExecutorLoader loader = createLoader();
     ExecutableFlow flow = createExecutableFlow("exec1");
@@ -293,6 +334,252 @@ public class JdbcExecutorLoaderTest {
 
   }
 
+
+  /* Test all executors fetch from empty executors */
+  @Test
+  public void testFetchEmptyExecutors() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    List<Executor> executors = loader.fetchAllExecutors();
+    Assert.assertEquals(executors.size(), 0);
+  }
+
+  /* Test active executors fetch from empty executors */
+  @Test
+  public void testFetchEmptyActiveExecutors() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    List<Executor> executors = loader.fetchActiveExecutors();
+    Assert.assertEquals(executors.size(), 0);
+  }
+
+  /* Test missing executor fetch with search by executor id */
+  @Test
+  public void testFetchMissingExecutorId() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    Executor executor = loader.fetchExecutor(0);
+    Assert.assertEquals(executor, null);
+  }
+
+  /* Test missing executor fetch with search by host:port */
+  @Test
+  public void testFetchMissingExecutorHostPort() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    Executor executor = loader.fetchExecutor("localhost", 12345);
+    Assert.assertEquals(executor, null);
+  }
+
+  /* Test executor events fetch from with no logged executor */
+  @Test
+  public void testFetchEmptyExecutorEvents() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    Executor executor = new Executor(1, "localhost", 12345, true);
+    List<ExecutorLogEvent> executorEvents =
+      loader.getExecutorEvents(executor, 5, 0);
+    Assert.assertEquals(executorEvents.size(), 0);
+  }
+
+  /* Test logging ExecutorEvents */
+  @Test
+  public void testExecutorEvents() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    int skip = 1;
+    User user = new User("testUser");
+    Executor executor = new Executor(1, "localhost", 12345, true);
+    String message = "My message ";
+    EventType[] events =
+      { EventType.CREATED, EventType.HOST_UPDATE, EventType.INACTIVATION };
+
+    for (EventType event : events) {
+      loader.postExecutorEvent(executor, event, user.getUserId(),
+        message + event.getNumVal());
+    }
+
+    List<ExecutorLogEvent> eventLogs =
+      loader.getExecutorEvents(executor, 10, skip);
+    Assert.assertTrue(eventLogs.size() == 2);
+
+    for (int index = 0; index < eventLogs.size(); ++index) {
+      ExecutorLogEvent eventLog = eventLogs.get(index);
+      Assert.assertEquals(eventLog.getExecutorId(), executor.getId());
+      Assert.assertEquals(eventLog.getUser(), user.getUserId());
+      Assert.assertEquals(eventLog.getType(), events[index + skip]);
+      Assert.assertEquals(eventLog.getMessage(),
+        message + events[index + skip].getNumVal());
+    }
+  }
+
+  /* Test to add duplicate executors */
+  @Test
+  public void testDuplicateAddExecutor() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    try {
+      String host = "localhost";
+      int port = 123456;
+      loader.addExecutor(host, port);
+      loader.addExecutor(host, port);
+      Assert.fail("Expecting exception, but didn't get one");
+    } catch (ExecutorManagerException ex) {
+      System.out.println("Test true");
+    }
+  }
+
+  /* Test to try update a non-existent executor */
+  @Test
+  public void testMissingExecutorUpdate() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    try {
+      Executor executor = new Executor(1, "localhost", 1234, true);
+      loader.updateExecutor(executor);
+      Assert.fail("Expecting exception, but didn't get one");
+    } catch (ExecutorManagerException ex) {
+      System.out.println("Test true");
+    }
+  }
+
+  /* Test add & fetch by Id Executors */
+  @Test
+  public void testSingleExecutorFetchById() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    List<Executor> executors = addTestExecutors(loader);
+    for (Executor executor : executors) {
+      Executor fetchedExecutor = loader.fetchExecutor(executor.getId());
+      Assert.assertEquals(executor, fetchedExecutor);
+    }
+  }
+
+  /* Test fetch all executors */
+  @Test
+  public void testFetchAllExecutors() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    List<Executor> executors = addTestExecutors(loader);
+
+    executors.get(0).setActive(false);
+    loader.updateExecutor(executors.get(0));
+
+    List<Executor> fetchedExecutors = loader.fetchAllExecutors();
+    Assert.assertEquals(executors.size(), fetchedExecutors.size());
+
+    Assert.assertArrayEquals(executors.toArray(), fetchedExecutors.toArray());
+  }
+
+  /* Test fetch only active executors */
+  @Test
+  public void testFetchActiveExecutors() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    List<Executor> executors = addTestExecutors(loader);
+
+    executors.get(0).setActive(false);
+    loader.updateExecutor(executors.get(0));
+
+    List<Executor> fetchedExecutors = loader.fetchActiveExecutors();
+    Assert.assertEquals(executors.size(), fetchedExecutors.size() + 1);
+    executors.remove(0);
+
+    Assert.assertArrayEquals(executors.toArray(), fetchedExecutors.toArray());
+  }
+
+  /* Test add & fetch by host:port Executors */
+  @Test
+  public void testSingleExecutorFetchHostPort() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    List<Executor> executors = addTestExecutors(loader);
+    for (Executor executor : executors) {
+      Executor fetchedExecutor =
+        loader.fetchExecutor(executor.getHost(), executor.getPort());
+      Assert.assertEquals(executor, fetchedExecutor);
+    }
+  }
+
+  /* Helper method used in methods testing jdbc interface for executors table */
+  private List<Executor> addTestExecutors(ExecutorLoader loader)
+    throws ExecutorManagerException {
+    List<Executor> executors = new ArrayList<Executor>();
+    executors.add(loader.addExecutor("localhost1", 12345));
+    executors.add(loader.addExecutor("localhost2", 12346));
+    executors.add(loader.addExecutor("localhost1", 12347));
+    return executors;
+  }
+
+  /* Test Executor Inactivation */
+  @Test
+  public void testExecutorInactivation() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+
+    ExecutorLoader loader = createLoader();
+    Executor executor = loader.addExecutor("localhost1", 12345);
+    Assert.assertTrue(executor.isActive());
+
+    executor.setActive(false);
+    loader.updateExecutor(executor);
+
+    Executor fetchedExecutor = loader.fetchExecutor(executor.getId());
+
+    Assert.assertEquals(executor.getHost(), fetchedExecutor.getHost());
+    Assert.assertEquals(executor.getId(), fetchedExecutor.getId());
+    Assert.assertEquals(executor.getPort(), fetchedExecutor.getPort());
+    Assert.assertFalse(fetchedExecutor.isActive());
+  }
+
+  /* Test Executor reactivation */
+  @Test
+  public void testExecutorActivation() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+
+    ExecutorLoader loader = createLoader();
+    Executor executor = loader.addExecutor("localhost1", 12345);
+    Assert.assertTrue(executor.isActive());
+
+    executor.setActive(false);
+    loader.updateExecutor(executor);
+    Executor fetchedExecutor = loader.fetchExecutor(executor.getId());
+    Assert.assertFalse(fetchedExecutor.isActive());
+
+    executor.setActive(true);
+    loader.updateExecutor(executor);
+    fetchedExecutor = loader.fetchExecutor(executor.getId());
+
+    Assert.assertEquals(executor, fetchedExecutor);
+  }
+
   @Test
   public void testActiveReference() throws Exception {
     if (!isTestSetup()) {
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index b9ad178..f2ffee8 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -17,10 +17,13 @@
 package azkaban.executor;
 
 import java.io.File;
+import java.util.ArrayList;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import azkaban.executor.ExecutorLogEvent.EventType;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
@@ -36,6 +39,10 @@ public class MockExecutorLoader implements ExecutorLoader {
   HashMap<String, Integer> jobUpdateCount = new HashMap<String, Integer>();
   Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows =
       new HashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
+  List<Executor> executors = new ArrayList<Executor>();
+  int executorIdCounter = 0;
+  Map<Integer, ArrayList<ExecutorLogEvent>> executorEvents =
+    new HashMap<Integer, ArrayList<ExecutorLogEvent>>();
 
   @Override
   public void uploadExecutableFlow(ExecutableFlow flow)
@@ -249,4 +256,82 @@ public class MockExecutorLoader implements ExecutorLoader {
 
   }
 
+  @Override
+  public List<Executor> fetchActiveExecutors() throws ExecutorManagerException {
+    List<Executor> activeExecutors = new ArrayList<Executor>();
+    for (Executor executor : executors) {
+      if (executor.isActive()) {
+        activeExecutors.add(executor);
+      }
+    }
+    return activeExecutors;
+  }
+
+  @Override
+  public Executor fetchExecutor(String host, int port)
+    throws ExecutorManagerException {
+    for (Executor executor : executors) {
+      if (executor.getHost().equals(host) && executor.getPort() == port) {
+        return executor;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public Executor fetchExecutor(int executorId) throws ExecutorManagerException {
+    for (Executor executor : executors) {
+      if (executor.getId() == executorId) {
+        return executor;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public Executor addExecutor(String host, int port)
+    throws ExecutorManagerException {
+    if (fetchExecutor(host, port) != null) {
+
+    }
+    executorIdCounter++;
+    Executor executor = new Executor(executorIdCounter, host, port, true);
+    return executor;
+  }
+
+  @Override
+  public void postExecutorEvent(Executor executor, EventType type, String user,
+    String message) throws ExecutorManagerException {
+    ExecutorLogEvent event =
+      new ExecutorLogEvent(executor.getId(), user, new Date(), type, message);
+
+    if (!executorEvents.containsKey(executor.getId())) {
+      executorEvents.put(executor.getId(), new ArrayList<ExecutorLogEvent>());
+    }
+
+    executorEvents.get(executor.getId()).add(event);
+  }
+
+  @Override
+  public List<ExecutorLogEvent> getExecutorEvents(Executor executor, int num,
+    int skip) throws ExecutorManagerException {
+    if (!executorEvents.containsKey(executor.getId())) {
+      List<ExecutorLogEvent> events = executorEvents.get(executor.getId());
+      return events.subList(skip, Math.min(num + skip - 1, events.size() - 1));
+    }
+    return null;
+  }
+
+  @Override
+  public void updateExecutor(Executor executor) throws ExecutorManagerException {
+    Executor oldExecutor = fetchExecutor(executor.getId());
+    executors.remove(oldExecutor);
+    executors.add(executor);
+  }
+
+  @Override
+  public List<Executor> fetchAllExecutors() throws ExecutorManagerException {
+    return executors;
+  }
+
 }
diff --git a/azkaban-common/src/test/java/azkaban/utils/UtilsTest.java b/azkaban-common/src/test/java/azkaban/utils/UtilsTest.java
new file mode 100644
index 0000000..4ea0930
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/utils/UtilsTest.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test class for azkaban.utils.Utils
+ */
+public class UtilsTest {
+
+  /* Test negative port case */
+  @Test
+  public void testNegativePort() {
+    Assert.assertFalse(Utils.isValidPort(-1));
+    Assert.assertFalse(Utils.isValidPort(-10));
+  }
+
+  /* Test zero port case */
+  @Test
+  public void testZeroPort() {
+    Assert.assertFalse(Utils.isValidPort(0));
+  }
+
+  /* Test port beyond limit */
+  @Test
+  public void testOverflowPort() {
+    Assert.assertFalse(Utils.isValidPort(70000));
+    Assert.assertFalse(Utils.isValidPort(65536));
+  }
+
+  /* Test happy isValidPort case*/
+  @Test
+  public void testValidPort() {
+    Assert.assertTrue(Utils.isValidPort(1023));
+    Assert.assertTrue(Utils.isValidPort(10000));
+    Assert.assertTrue(Utils.isValidPort(3030));
+    Assert.assertTrue(Utils.isValidPort(1045));
+  }
+}
diff --git a/azkaban-sql/src/sql/create.executor_events.sql b/azkaban-sql/src/sql/create.executor_events.sql
new file mode 100644
index 0000000..1ff4799
--- /dev/null
+++ b/azkaban-sql/src/sql/create.executor_events.sql
@@ -0,0 +1,9 @@
+CREATE TABLE executor_events (
+  executor_id INT NOT NULL,
+  event_type TINYINT NOT NULL,
+  event_time DATETIME NOT NULL,
+  username VARCHAR(64),
+  message VARCHAR(512)
+);
+
+CREATE INDEX executor_log ON executor_events(executor_id, event_time);
\ No newline at end of file
diff --git a/azkaban-sql/src/sql/create.executors.sql b/azkaban-sql/src/sql/create.executors.sql
new file mode 100644
index 0000000..8e59ce3
--- /dev/null
+++ b/azkaban-sql/src/sql/create.executors.sql
@@ -0,0 +1,10 @@
+CREATE TABLE executors (
+  id INT NOT NULL PRIMARY KEY AUTO_INCREMENT,
+  host VARCHAR(64) NOT NULL,
+  port INT NOT NULL,
+  active BOOLEAN DEFAULT true,
+  UNIQUE (host, port),
+  UNIQUE INDEX executor_id (id)
+);
+
+CREATE INDEX executor_connection ON executors(host, port);