azkaban-developers

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/Executor.java b/azkaban-common/src/main/java/azkaban/executor/Executor.java
index 8ffb8f7..31070bf 100644
--- a/azkaban-common/src/main/java/azkaban/executor/Executor.java
+++ b/azkaban-common/src/main/java/azkaban/executor/Executor.java
@@ -16,6 +16,8 @@
 
 package azkaban.executor;
 
+import azkaban.utils.Utils;
+
 /**
  * Class to represent an AzkabanExecutorServer details for ExecutorManager
  *
@@ -23,27 +25,40 @@ package azkaban.executor;
  */
 public class Executor {
   private final int id;
-  private String host;
-  private int port;
-  private boolean active = true;
+  private final String host;
+  private final int port;
+  private boolean isActive;
 
   // TODO: ExecutorStats to be added
 
-  public Executor(int id) {
-    this.id = id;
-  }
+  /**
+   * <pre>
+   * Construct an Executor Object
+   * Note: port should be a within unsigned 2 byte
+   * integer range
+   * </pre>
+   *
+   * @param executor_id
+   * @param executor_host
+   * @param executor_port
+   */
+  public Executor(int id, String host, int port, boolean isActive) {
+    if (!Utils.isValidPort(port)) {
+      throw new IllegalArgumentException(String.format(
+        "Invalid port number %d for host %s, executor_id %d", port, host, id));
+    }
 
-  public Executor(int id, String host, int port) {
     this.id = id;
     this.host = host;
     this.port = port;
+    this.isActive = isActive;
   }
 
   @Override
   public int hashCode() {
     final int prime = 31;
     int result = 1;
-    result = prime * result + (active ? 1231 : 1237);
+    result = prime * result + (isActive ? 1231 : 1237);
     result = prime * result + ((host == null) ? 0 : host.hashCode());
     result = prime * result + id;
     result = prime * result + port;
@@ -59,7 +74,7 @@ public class Executor {
     if (!(obj instanceof Executor))
       return false;
     Executor other = (Executor) obj;
-    if (active != other.active)
+    if (isActive != other.isActive)
       return false;
     if (host == null) {
       if (other.host != null)
@@ -77,28 +92,19 @@ public class Executor {
     return host;
   }
 
-  public void setHost(String host) {
-    this.host = host;
-  }
-
   public int getPort() {
     return port;
   }
 
-  public void setPort(int port) {
-    this.port = port;
-  }
-
   public boolean isActive() {
-    return active;
-  }
-
-  public void setActive(boolean active) {
-    this.active = active;
+    return isActive;
   }
 
   public int getId() {
     return id;
   }
 
+  public void setActive(boolean isActive) {
+    this.isActive = isActive;
+  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index e2d4fac..1bfa910 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -49,7 +49,25 @@ public interface ExecutorLoader {
       long endData, int skip, int num) throws ExecutorManagerException;
 
   /**
+   * <pre>
+   * Fetch all executors from executors table
+   * Note:-
+   * 1 throws an Exception in case of a SQL issue
+   * 2 returns an empty list in case of no executor
+   * </pre>
+   *
+   * @return List<Executor>
+   * @throws ExecutorManagerException
+   */
+  public List<Executor> fetchAllExecutors() throws ExecutorManagerException;
+
+  /**
+   * <pre>
    * Fetch all executors from executors table with active = true
+   * Note:-
+   * 1 throws an Exception in case of a SQL issue
+   * 2 returns an empty list in case of no active executor
+   * </pre>
    *
    * @return List<Executor>
    * @throws ExecutorManagerException
@@ -57,7 +75,13 @@ public interface ExecutorLoader {
   public List<Executor> fetchActiveExecutors() throws ExecutorManagerException;
 
   /**
+   * <pre>
    * Fetch executor from executors with a given (host, port)
+   * Note:
+   * 1. throws an Exception in case of a SQL issue
+   * 2. return null when no executor is found
+   * with the given (host,port)
+   * </pre>
    *
    * @return Executor
    * @throws ExecutorManagerException
@@ -66,7 +90,12 @@ public interface ExecutorLoader {
     throws ExecutorManagerException;
 
   /**
+   * <pre>
    * Fetch executor from executors with a given executorId
+   * Note:
+   * 1. throws an Exception in case of a SQL issue
+   * 2. return null when no executor is found with the given executorId
+   * </pre>
    *
    * @return Executor
    * @throws ExecutorManagerException
@@ -74,8 +103,13 @@ public interface ExecutorLoader {
   public Executor fetchExecutor(int executorId) throws ExecutorManagerException;
 
   /**
-   * create an executor and insert in executors table. Fails, if a executor with
-   * (host, port) already exist
+   * <pre>
+   * create an executor and insert in executors table.
+   * Note:-
+   * 1. throws an Exception in case of a SQL issue
+   * 2. throws an Exception if a executor with (host, port) already exist
+   * 3. return null when no executor is found with the given executorId
+   * </pre>
    *
    * @return Executor
    * @throws ExecutorManagerException
@@ -84,24 +118,25 @@ public interface ExecutorLoader {
     throws ExecutorManagerException;
 
   /**
-   * inactivate an executor in executors table
-   *
-   * @param executorId
-   * @throws ExecutorManagerException
-   */
-  public void inactivateExecutor(int executorId)
-    throws ExecutorManagerException;
-
-  /**
-   * Re-activate an executor in executors table
+   * <pre>
+   * create an executor and insert in executors table.
+   * Note:-
+   * 1. throws an Exception in case of a SQL issue
+   * 2. throws an Exception if there is no executor with the given id
+   * 3. return null when no executor is found with the given executorId
+   * </pre>
    *
    * @param executorId
    * @throws ExecutorManagerException
    */
-  void activateExecutor(int executorId) throws ExecutorManagerException;
+  public void updateExecutor(Executor executor) throws ExecutorManagerException;
 
   /**
-   * Log an event in executor_event audit table
+   * <pre>
+   * Log an event in executor_event audit table Note:- throws an Exception in
+   * case of a SQL issue
+   * Note: throws an Exception in case of a SQL issue
+   * </pre>
    *
    * @param executor
    * @param type
@@ -109,11 +144,17 @@ public interface ExecutorLoader {
    * @param message
    * @return isSuccess
    */
-  boolean postEvent(Executor executor, EventType type, String user,
-    String message);
+  public void postExecutorEvent(Executor executor, EventType type, String user,
+    String message) throws ExecutorManagerException;
 
   /**
-   * Fetch num events associated with a given executor, starting from skip
+   * <pre>
+   * This method is to fetch events recorded in executor audit table, inserted
+   * by postExecutorEvents with a given executor, starting from skip
+   * Note:-
+   * 1. throws an Exception in case of a SQL issue
+   * 2. Returns an empty list in case of no events
+   * </pre>
    *
    * @param executor
    * @param num
@@ -121,8 +162,8 @@ public interface ExecutorLoader {
    * @return List<ExecutorLogEvent>
    * @throws ExecutorManagerException
    */
-  List<ExecutorLogEvent> getExecutorEvents(Executor executor, int num, int skip)
-    throws ExecutorManagerException;
+  List<ExecutorLogEvent> getExecutorEvents(Executor executor, int num,
+    int offset) throws ExecutorManagerException;
 
   public void addActiveExecutableReference(ExecutionReference ref)
       throws ExecutorManagerException;
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLogEvent.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLogEvent.java
index 7842441..5598a0d 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLogEvent.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLogEvent.java
@@ -42,7 +42,8 @@ public class ExecutorLogEvent {
       return numVal;
     }
 
-    public static EventType fromInteger(int x) {
+    public static EventType fromInteger(int x)
+        throws IllegalArgumentException {
       switch (x) {
       case 1:
         return HOST_UPDATE;
@@ -57,7 +58,8 @@ public class ExecutorLogEvent {
       case 128:
         return ERROR;
       default:
-        return ERROR;
+        throw new IllegalArgumentException(String.format(
+          "inalid status code %d", x));
       }
     }
   }
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 9b12724..2a1cf26 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -784,6 +784,26 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
    * @see azkaban.executor.ExecutorLoader#fetchActiveExecutors()
    */
   @Override
+  public List<Executor> fetchAllExecutors() throws ExecutorManagerException {
+    QueryRunner runner = createQueryRunner();
+    FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+
+    try {
+      List<Executor> executors =
+        runner.query(FetchExecutorHandler.FETCH_ALL_EXECUTORS, executorHandler);
+      return executors;
+    } catch (Exception e) {
+      throw new ExecutorManagerException("Error fetching executors", e);
+    }
+  }
+
+  /**
+   *
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorLoader#fetchActiveExecutors()
+   */
+  @Override
   public List<Executor> fetchActiveExecutors() throws ExecutorManagerException {
     QueryRunner runner = createQueryRunner();
     FetchExecutorHandler executorHandler = new FetchExecutorHandler();
@@ -793,7 +813,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
         runner.query(FetchExecutorHandler.FETCH_ACTIVE_EXECUTORS,
           executorHandler);
       return executors;
-    } catch (SQLException e) {
+    } catch (Exception e) {
       throw new ExecutorManagerException("Error fetching active executors", e);
     }
   }
@@ -818,7 +838,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
       } else {
         return executors.get(0);
       }
-    } catch (SQLException e) {
+    } catch (Exception e) {
       throw new ExecutorManagerException(String.format(
         "Error fetching executor %s:%d", host, port), e);
     }
@@ -843,7 +863,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
       } else {
         return executors.get(0);
       }
-    } catch (SQLException e) {
+    } catch (Exception e) {
       throw new ExecutorManagerException(String.format(
         "Error fetching executor with id: %d", executorId), e);
     }
@@ -852,38 +872,25 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
   /**
    * {@inheritDoc}
    *
-   * @see azkaban.executor.ExecutorLoader#inactivateExecutor(int)
+   * @see azkaban.executor.ExecutorLoader#updateExecutor(int)
    */
   @Override
-  public void inactivateExecutor(int executorId)
-    throws ExecutorManagerException {
-    final String INACTIVATE = "UPDATE executors SET active=false where id=?";
+  public void updateExecutor(Executor executor) throws ExecutorManagerException {
+    final String UPDATE =
+      "UPDATE executors SET host=?, port=?, active=? where id=?";
 
     QueryRunner runner = createQueryRunner();
     try {
-      runner.update(INACTIVATE, executorId);
+      int rows =
+        runner.update(UPDATE, executor.getHost(), executor.getPort(),
+          executor.isActive(), executor.getId());
+      if (rows == 0) {
+        throw new ExecutorManagerException("No executor with id :"
+          + executor.getId());
+      }
     } catch (SQLException e) {
       throw new ExecutorManagerException("Error inactivating executor "
-        + executorId, e);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @see azkaban.executor.ExecutorLoader#activateExecutor(int)
-   */
-  @Override
-  public void activateExecutor(int executorId)
-    throws ExecutorManagerException {
-    final String ACTIVATE = "UPDATE executors SET active=true where id=?";
-
-    QueryRunner runner = createQueryRunner();
-    try {
-      runner.update(ACTIVATE, executorId);
-    } catch (SQLException e) {
-      throw new ExecutorManagerException("Error activating executor "
-        + executorId, e);
+        + executor.getId(), e);
     }
   }
 
@@ -911,30 +918,26 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
 
   private void addExecutorHelper(String host, int port)
     throws ExecutorManagerException {
-    final String INSERT =
-      "INSERT INTO executors " + "(host, port) values (?,?)";
+    final String INSERT = "INSERT INTO executors (host, port) values (?,?)";
     QueryRunner runner = createQueryRunner();
-    String errString = String.format("Error adding %s:%d ", host, port);
     try {
-      int rows = runner.update(INSERT, host, port);
-      if (rows == 0) {
-        throw new ExecutorManagerException(errString);
-      }
+      runner.update(INSERT, host, port);
     } catch (SQLException e) {
-      throw new ExecutorManagerException(errString, e);
+      throw new ExecutorManagerException(String.format("Error adding %s:%d ",
+        host, port), e);
     }
   }
 
   /**
    * {@inheritDoc}
    *
-   * @see azkaban.executor.ExecutorLoader#postEvent(azkaban.executor.Executor,
+   * @see azkaban.executor.ExecutorLoader#postExecutorEvent(azkaban.executor.Executor,
    *      azkaban.executor.ExecutorLogEvent.EventType, java.lang.String,
    *      java.lang.String)
    */
   @Override
-  public boolean postEvent(Executor executor, EventType type, String user,
-    String message) {
+  public void postExecutorEvent(Executor executor, EventType type, String user,
+    String message) throws ExecutorManagerException{
     QueryRunner runner = createQueryRunner();
 
     final String INSERT_PROJECT_EVENTS =
@@ -944,11 +947,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
       runner.update(INSERT_PROJECT_EVENTS, executor.getId(), type.getNumVal(),
         updateDate, user, message);
     } catch (SQLException e) {
-      e.printStackTrace();
-      return false;
+      throw new ExecutorManagerException("Failed to post executor event", e);
     }
-
-    return true;
   }
 
   /**
@@ -959,7 +959,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
    */
   @Override
   public List<ExecutorLogEvent> getExecutorEvents(Executor executor, int num,
-    int skip) throws ExecutorManagerException {
+    int offset) throws ExecutorManagerException {
     QueryRunner runner = createQueryRunner();
 
     ExecutorLogsResultHandler logHandler = new ExecutorLogsResultHandler();
@@ -967,9 +967,10 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     try {
       events =
         runner.query(ExecutorLogsResultHandler.SELECT_EXECUTOR_EVENTS_ORDER,
-          logHandler, executor.getId(), num, skip);
+          logHandler, executor.getId(), num, offset);
     } catch (SQLException e) {
-      logger.error(e);
+      throw new ExecutorManagerException(
+        "Failed to fetch events for executor id : " + executor.getId(), e);
     }
 
     return events;
@@ -1342,6 +1343,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
    */
   private static class FetchExecutorHandler implements
     ResultSetHandler<List<Executor>> {
+    private static String FETCH_ALL_EXECUTORS =
+      "SELECT id, host, port, active FROM executors";
     private static String FETCH_ACTIVE_EXECUTORS =
       "SELECT id, host, port, active FROM executors where active=true";
     private static String FETCH_EXECUTOR_BY_ID =
@@ -1361,8 +1364,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
         String host = rs.getString(2);
         int port = rs.getInt(3);
         boolean active = rs.getBoolean(4);
-        Executor executor = new Executor(id, host, port);
-        executor.setActive(active);
+        Executor executor = new Executor(id, host, port, active);
         executors.add(executor);
       } while (rs.next());
 
diff --git a/azkaban-common/src/main/java/azkaban/utils/Utils.java b/azkaban-common/src/main/java/azkaban/utils/Utils.java
index 7227b0b..138b80f 100644
--- a/azkaban-common/src/main/java/azkaban/utils/Utils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/Utils.java
@@ -111,6 +111,19 @@ public class Utils {
     System.exit(exitCode);
   }
 
+  /**
+   * Tests whether a port is valid or not
+   *
+   * @param port
+   * @return true, if port is valid
+   */
+  public static boolean isValidPort(int port) {
+    if (port >= 1 && port <= 65535) {
+      return true;
+    }
+    return false;
+  }
+
   public static File createTempDir() {
     return createTempDir(new File(System.getProperty("java.io.tmpdir")));
   }
@@ -410,7 +423,7 @@ public class Utils {
   }
 
   /**
-   * @param strMemSize : memory string in the format such as 1G, 500M, 3000K, 5000 
+   * @param strMemSize : memory string in the format such as 1G, 500M, 3000K, 5000
    * @return : long value of memory amount in kb
    */
   public static long parseMemString(String strMemSize) {
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index fed5c6b..81b5a87 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -334,13 +334,25 @@ public class JdbcExecutorLoaderTest {
 
   }
 
-  /* Test active executors fetch from empty executors */
+
+  /* Test all executors fetch from empty executors */
   @Test
   public void testFetchEmptyExecutors() throws Exception {
     if (!isTestSetup()) {
       return;
     }
     ExecutorLoader loader = createLoader();
+    List<Executor> executors = loader.fetchAllExecutors();
+    Assert.assertEquals(executors.size(), 0);
+  }
+
+  /* Test active executors fetch from empty executors */
+  @Test
+  public void testFetchEmptyActiveExecutors() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
     List<Executor> executors = loader.fetchActiveExecutors();
     Assert.assertEquals(executors.size(), 0);
   }
@@ -374,7 +386,7 @@ public class JdbcExecutorLoaderTest {
       return;
     }
     ExecutorLoader loader = createLoader();
-    Executor executor = new Executor(1, "localhost", 12345);
+    Executor executor = new Executor(1, "localhost", 12345, true);
     List<ExecutorLogEvent> executorEvents =
       loader.getExecutorEvents(executor, 5, 0);
     Assert.assertEquals(executorEvents.size(), 0);
@@ -389,13 +401,13 @@ public class JdbcExecutorLoaderTest {
     ExecutorLoader loader = createLoader();
     int skip = 1;
     User user = new User("testUser");
-    Executor executor = new Executor(1, "localhost", 12345);
+    Executor executor = new Executor(1, "localhost", 12345, true);
     String message = "My message ";
     EventType[] events =
       { EventType.CREATED, EventType.HOST_UPDATE, EventType.INACTIVATION };
 
     for (EventType event : events) {
-      loader.postEvent(executor, event, user.getUserId(),
+      loader.postExecutorEvent(executor, event, user.getUserId(),
         message + event.getNumVal());
     }
 
@@ -433,6 +445,23 @@ public class JdbcExecutorLoaderTest {
     clearDB();
   }
 
+  /* Test to try update a non-existent executor */
+  @Test
+  public void testMissingExecutorUpdate() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    try {
+      Executor executor = new Executor(1, "localhost", 1234, true);
+      loader.updateExecutor(executor);
+      Assert.fail("Expecting exception, but didn't get one");
+    } catch (ExecutorManagerException ex) {
+      System.out.println("Test true");
+    }
+    clearDB();
+  }
+
   /* Test add & fetch by Id Executors */
   @Test
   public void testSingleExecutorFetchById() throws Exception {
@@ -448,6 +477,25 @@ public class JdbcExecutorLoaderTest {
     clearDB();
   }
 
+  /* Test fetch all executors */
+  @Test
+  public void testFetchAllExecutors() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    List<Executor> executors = addTestExecutors(loader);
+
+    executors.get(0).setActive(false);
+    loader.updateExecutor(executors.get(0));
+
+    List<Executor> fetchedExecutors = loader.fetchAllExecutors();
+    Assert.assertEquals(executors.size(), fetchedExecutors.size());
+
+    Assert.assertArrayEquals(executors.toArray(), fetchedExecutors.toArray());
+    clearDB();
+  }
+
   /* Test fetch only active executors */
   @Test
   public void testFetchActiveExecutors() throws Exception {
@@ -457,7 +505,8 @@ public class JdbcExecutorLoaderTest {
     ExecutorLoader loader = createLoader();
     List<Executor> executors = addTestExecutors(loader);
 
-    loader.inactivateExecutor(executors.get(0).getId());
+    executors.get(0).setActive(false);
+    loader.updateExecutor(executors.get(0));
 
     List<Executor> fetchedExecutors = loader.fetchActiveExecutors();
     Assert.assertEquals(executors.size(), fetchedExecutors.size() + 1);
@@ -504,7 +553,9 @@ public class JdbcExecutorLoaderTest {
     Executor executor = loader.addExecutor("localhost1", 12345);
     Assert.assertTrue(executor.isActive());
 
-    loader.inactivateExecutor(executor.getId());
+    executor.setActive(false);
+    loader.updateExecutor(executor);
+
     Executor fetchedExecutor = loader.fetchExecutor(executor.getId());
 
     Assert.assertEquals(executor.getHost(), fetchedExecutor.getHost());
@@ -525,11 +576,13 @@ public class JdbcExecutorLoaderTest {
     Executor executor = loader.addExecutor("localhost1", 12345);
     Assert.assertTrue(executor.isActive());
 
-    loader.inactivateExecutor(executor.getId());
+    executor.setActive(false);
+    loader.updateExecutor(executor);
     Executor fetchedExecutor = loader.fetchExecutor(executor.getId());
     Assert.assertFalse(fetchedExecutor.isActive());
 
-    loader.activateExecutor(executor.getId());
+    executor.setActive(true);
+    loader.updateExecutor(executor);
     fetchedExecutor = loader.fetchExecutor(executor.getId());
 
     Assert.assertEquals(executor, fetchedExecutor);
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index 7f6eeff..f2ffee8 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -295,33 +295,21 @@ public class MockExecutorLoader implements ExecutorLoader {
 
     }
     executorIdCounter++;
-    Executor executor = new Executor(executorIdCounter, host, port);
-    executors.indexOf(executor);
+    Executor executor = new Executor(executorIdCounter, host, port, true);
     return executor;
   }
 
   @Override
-  public void inactivateExecutor(int executorId)
-    throws ExecutorManagerException {
-    Executor executor = fetchExecutor(executorId);
-    if (executor != null) {
-      executor.setActive(false);
-    }
-  }
-
-  @Override
-  public boolean postEvent(Executor executor, EventType type, String user,
-    String message) {
+  public void postExecutorEvent(Executor executor, EventType type, String user,
+    String message) throws ExecutorManagerException {
     ExecutorLogEvent event =
-      new ExecutorLogEvent(executor.getId(), user,
-        new Date(), type, message);
+      new ExecutorLogEvent(executor.getId(), user, new Date(), type, message);
 
     if (!executorEvents.containsKey(executor.getId())) {
       executorEvents.put(executor.getId(), new ArrayList<ExecutorLogEvent>());
     }
 
     executorEvents.get(executor.getId()).add(event);
-    return true;
   }
 
   @Override
@@ -335,11 +323,15 @@ public class MockExecutorLoader implements ExecutorLoader {
   }
 
   @Override
-  public void activateExecutor(int executorId) throws ExecutorManagerException {
-    Executor executor = fetchExecutor(executorId);
-    if (executor != null) {
-      executor.setActive(true);
-    }
+  public void updateExecutor(Executor executor) throws ExecutorManagerException {
+    Executor oldExecutor = fetchExecutor(executor.getId());
+    executors.remove(oldExecutor);
+    executors.add(executor);
+  }
+
+  @Override
+  public List<Executor> fetchAllExecutors() throws ExecutorManagerException {
+    return executors;
   }
 
 }
diff --git a/azkaban-common/src/test/java/azkaban/utils/UtilsTest.java b/azkaban-common/src/test/java/azkaban/utils/UtilsTest.java
new file mode 100644
index 0000000..4ea0930
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/utils/UtilsTest.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test class for azkaban.utils.Utils
+ */
+public class UtilsTest {
+
+  /* Test negative port case */
+  @Test
+  public void testNegativePort() {
+    Assert.assertFalse(Utils.isValidPort(-1));
+    Assert.assertFalse(Utils.isValidPort(-10));
+  }
+
+  /* Test zero port case */
+  @Test
+  public void testZeroPort() {
+    Assert.assertFalse(Utils.isValidPort(0));
+  }
+
+  /* Test port beyond limit */
+  @Test
+  public void testOverflowPort() {
+    Assert.assertFalse(Utils.isValidPort(70000));
+    Assert.assertFalse(Utils.isValidPort(65536));
+  }
+
+  /* Test happy isValidPort case*/
+  @Test
+  public void testValidPort() {
+    Assert.assertTrue(Utils.isValidPort(1023));
+    Assert.assertTrue(Utils.isValidPort(10000));
+    Assert.assertTrue(Utils.isValidPort(3030));
+    Assert.assertTrue(Utils.isValidPort(1045));
+  }
+}