azkaban-aplcache

Executor to use any port if not configured. Added REST APIs to

11/14/2016 10:55:58 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/database/DataSourceUtils.java b/azkaban-common/src/main/java/azkaban/database/DataSourceUtils.java
index a8e36aa..8267104 100644
--- a/azkaban-common/src/main/java/azkaban/database/DataSourceUtils.java
+++ b/azkaban-common/src/main/java/azkaban/database/DataSourceUtils.java
@@ -129,11 +129,13 @@ public class DataSourceUtils {
 
     private static MonitorThread monitorThread = null;
 
+    private final String url;
+
     private MySQLBasicDataSource(String host, int port, String dbName,
         String user, String password, int numConnections) {
       super();
 
-      String url = "jdbc:mysql://" + (host + ":" + port + "/" + dbName);
+      url = "jdbc:mysql://" + (host + ":" + port + "/" + dbName);
       addConnectionProperty("useUnicode", "yes");
       addConnectionProperty("characterEncoding", "UTF-8");
       setDriverClassName("com.mysql.jdbc.Driver");
@@ -196,10 +198,7 @@ public class DataSourceUtils {
           PreparedStatement query = connection.prepareStatement("SELECT 1");
           query.execute();
         } catch (SQLException e) {
-          // TODO Auto-generated catch block
-          e.printStackTrace();
-          logger
-              .error("MySQL connection test failed. Please check MySQL connection health!");
+          logger.error("Unable to reach MySQL server on " + url);
         } finally {
           DbUtils.closeQuietly(connection);
         }
diff --git a/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java b/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
index 8abbd61..3f51d75 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
@@ -34,6 +34,8 @@ public interface ConnectorParams {
   public static final String ATTACHMENTS_ACTION = "attachments";
   public static final String METADATA_ACTION = "metadata";
   public static final String RELOAD_JOBTYPE_PLUGINS_ACTION = "reloadJobTypePlugins";
+  public static final String ACTIVATE = "activate";
+  public static final String DEACTIVATE = "deactivate";
 
   public static final String MODIFY_EXECUTION_ACTION = "modifyExecution";
   public static final String MODIFY_EXECUTION_ACTION_TYPE = "modifyType";
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index 80e8167..0606467 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -26,25 +26,25 @@ import azkaban.utils.Pair;
 import azkaban.utils.Props;
 
 public interface ExecutorLoader {
-  public void uploadExecutableFlow(ExecutableFlow flow)
+  void uploadExecutableFlow(ExecutableFlow flow)
       throws ExecutorManagerException;
 
-  public ExecutableFlow fetchExecutableFlow(int execId)
+  ExecutableFlow fetchExecutableFlow(int execId)
       throws ExecutorManagerException;
 
-  public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows()
+  Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows()
       throws ExecutorManagerException;
 
-  public List<ExecutableFlow> fetchFlowHistory(int skip, int num)
+  List<ExecutableFlow> fetchFlowHistory(int skip, int num)
       throws ExecutorManagerException;
 
-  public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId,
+  List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId,
       int skip, int num) throws ExecutorManagerException;
 
-  public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId,
+  List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId,
       int skip, int num, Status status) throws ExecutorManagerException;
 
-  public List<ExecutableFlow> fetchFlowHistory(String projContain,
+  List<ExecutableFlow> fetchFlowHistory(String projContain,
       String flowContains, String userNameContains, int status, long startData,
       long endData, int skip, int num) throws ExecutorManagerException;
 
@@ -59,7 +59,7 @@ public interface ExecutorLoader {
    * @return List<Executor>
    * @throws ExecutorManagerException
    */
-  public List<Executor> fetchAllExecutors() throws ExecutorManagerException;
+  List<Executor> fetchAllExecutors() throws ExecutorManagerException;
 
   /**
    * <pre>
@@ -72,7 +72,7 @@ public interface ExecutorLoader {
    * @return List<Executor>
    * @throws ExecutorManagerException
    */
-  public List<Executor> fetchActiveExecutors() throws ExecutorManagerException;
+  List<Executor> fetchActiveExecutors() throws ExecutorManagerException;
 
   /**
    * <pre>
@@ -86,7 +86,7 @@ public interface ExecutorLoader {
    * @return Executor
    * @throws ExecutorManagerException
    */
-  public Executor fetchExecutor(String host, int port)
+  Executor fetchExecutor(String host, int port)
     throws ExecutorManagerException;
 
   /**
@@ -100,7 +100,7 @@ public interface ExecutorLoader {
    * @return Executor
    * @throws ExecutorManagerException
    */
-  public Executor fetchExecutor(int executorId) throws ExecutorManagerException;
+  Executor fetchExecutor(int executorId) throws ExecutorManagerException;
 
   /**
    * <pre>
@@ -114,7 +114,7 @@ public interface ExecutorLoader {
    * @return Executor
    * @throws ExecutorManagerException
    */
-  public Executor addExecutor(String host, int port)
+  Executor addExecutor(String host, int port)
     throws ExecutorManagerException;
 
   /**
@@ -129,7 +129,7 @@ public interface ExecutorLoader {
    * @param executorId
    * @throws ExecutorManagerException
    */
-  public void updateExecutor(Executor executor) throws ExecutorManagerException;
+  void updateExecutor(Executor executor) throws ExecutorManagerException;
 
   /**
    * <pre>
@@ -144,7 +144,7 @@ public interface ExecutorLoader {
    * @param message
    * @return isSuccess
    */
-  public void postExecutorEvent(Executor executor, EventType type, String user,
+  void postExecutorEvent(Executor executor, EventType type, String user,
     String message) throws ExecutorManagerException;
 
   /**
@@ -165,10 +165,10 @@ public interface ExecutorLoader {
   List<ExecutorLogEvent> getExecutorEvents(Executor executor, int num,
     int offset) throws ExecutorManagerException;
 
-  public void addActiveExecutableReference(ExecutionReference ref)
+  void addActiveExecutableReference(ExecutionReference ref)
       throws ExecutorManagerException;
 
-  public void removeActiveExecutableReference(int execId)
+  void removeActiveExecutableReference(int execId)
       throws ExecutorManagerException;
 
 
@@ -183,7 +183,7 @@ public interface ExecutorLoader {
    * @param execId
    * @throws ExecutorManagerException
    */
-  public void unassignExecutor(int executionId) throws ExecutorManagerException;
+  void unassignExecutor(int executionId) throws ExecutorManagerException;
 
   /**
    * <pre>
@@ -197,7 +197,7 @@ public interface ExecutorLoader {
    * @param execId
    * @throws ExecutorManagerException
    */
-  public void assignExecutor(int executorId, int execId)
+  void assignExecutor(int executorId, int execId)
     throws ExecutorManagerException;
 
   /**
@@ -212,7 +212,7 @@ public interface ExecutorLoader {
    * @return fetched Executor
    * @throws ExecutorManagerException
    */
-  public Executor fetchExecutorByExecutionId(int executionId)
+  Executor fetchExecutorByExecutionId(int executionId)
     throws ExecutorManagerException;
 
   /**
@@ -226,59 +226,59 @@ public interface ExecutorLoader {
    * @return List of queued flows and corresponding execution reference
    * @throws ExecutorManagerException
    */
-  public List<Pair<ExecutionReference, ExecutableFlow>> fetchQueuedFlows()
+  List<Pair<ExecutionReference, ExecutableFlow>> fetchQueuedFlows()
     throws ExecutorManagerException;
 
-  public boolean updateExecutableReference(int execId, long updateTime)
+  boolean updateExecutableReference(int execId, long updateTime)
       throws ExecutorManagerException;
 
-  public LogData fetchLogs(int execId, String name, int attempt, int startByte,
+  LogData fetchLogs(int execId, String name, int attempt, int startByte,
       int endByte) throws ExecutorManagerException;
 
-  public List<Object> fetchAttachments(int execId, String name, int attempt)
+  List<Object> fetchAttachments(int execId, String name, int attempt)
       throws ExecutorManagerException;
 
-  public void uploadLogFile(int execId, String name, int attempt, File... files)
+  void uploadLogFile(int execId, String name, int attempt, File... files)
       throws ExecutorManagerException;
 
-  public void uploadAttachmentFile(ExecutableNode node, File file)
+  void uploadAttachmentFile(ExecutableNode node, File file)
       throws ExecutorManagerException;
 
-  public void updateExecutableFlow(ExecutableFlow flow)
+  void updateExecutableFlow(ExecutableFlow flow)
       throws ExecutorManagerException;
 
-  public void uploadExecutableNode(ExecutableNode node, Props inputParams)
+  void uploadExecutableNode(ExecutableNode node, Props inputParams)
       throws ExecutorManagerException;
 
-  public List<ExecutableJobInfo> fetchJobInfoAttempts(int execId, String jobId)
+  List<ExecutableJobInfo> fetchJobInfoAttempts(int execId, String jobId)
       throws ExecutorManagerException;
 
-  public ExecutableJobInfo fetchJobInfo(int execId, String jobId, int attempt)
+  ExecutableJobInfo fetchJobInfo(int execId, String jobId, int attempt)
       throws ExecutorManagerException;
 
-  public List<ExecutableJobInfo> fetchJobHistory(int projectId, String jobId,
+  List<ExecutableJobInfo> fetchJobHistory(int projectId, String jobId,
       int skip, int size) throws ExecutorManagerException;
 
-  public void updateExecutableNode(ExecutableNode node)
+  void updateExecutableNode(ExecutableNode node)
       throws ExecutorManagerException;
 
-  public int fetchNumExecutableFlows(int projectId, String flowId)
+  int fetchNumExecutableFlows(int projectId, String flowId)
       throws ExecutorManagerException;
 
-  public int fetchNumExecutableFlows() throws ExecutorManagerException;
+  int fetchNumExecutableFlows() throws ExecutorManagerException;
 
-  public int fetchNumExecutableNodes(int projectId, String jobId)
+  int fetchNumExecutableNodes(int projectId, String jobId)
       throws ExecutorManagerException;
 
-  public Props fetchExecutionJobInputProps(int execId, String jobId)
+  Props fetchExecutionJobInputProps(int execId, String jobId)
       throws ExecutorManagerException;
 
-  public Props fetchExecutionJobOutputProps(int execId, String jobId)
+  Props fetchExecutionJobOutputProps(int execId, String jobId)
       throws ExecutorManagerException;
 
-  public Pair<Props, Props> fetchExecutionJobProps(int execId, String jobId)
+  Pair<Props, Props> fetchExecutionJobProps(int execId, String jobId)
       throws ExecutorManagerException;
 
-  public int removeExecutionLogsByTime(long millis)
+  int removeExecutionLogsByTime(long millis)
       throws ExecutorManagerException;
 }
diff --git a/azkaban-common/src/main/java/azkaban/server/AbstractServiceServlet.java b/azkaban-common/src/main/java/azkaban/server/AbstractServiceServlet.java
index c993db9..9546fb9 100644
--- a/azkaban-common/src/main/java/azkaban/server/AbstractServiceServlet.java
+++ b/azkaban-common/src/main/java/azkaban/server/AbstractServiceServlet.java
@@ -37,8 +37,7 @@ public class AbstractServiceServlet extends HttpServlet {
   @Override
   public void init(ServletConfig config) throws ServletException {
     application =
-        (AzkabanServer) config.getServletContext().getAttribute(
-            ServerConstants.AZKABAN_SERVLET_CONTEXT_KEY);
+        (AzkabanServer) config.getServletContext().getAttribute(Constants.AZKABAN_SERVLET_CONTEXT_KEY);
 
     if (application == null) {
       throw new IllegalStateException(
diff --git a/azkaban-common/src/main/java/azkaban/server/AzkabanServer.java b/azkaban-common/src/main/java/azkaban/server/AzkabanServer.java
index 7f50989..d9ca84b 100644
--- a/azkaban-common/src/main/java/azkaban/server/AzkabanServer.java
+++ b/azkaban-common/src/main/java/azkaban/server/AzkabanServer.java
@@ -34,10 +34,6 @@ import azkaban.server.session.SessionCache;
 
 public abstract class AzkabanServer {
   private static final Logger logger = Logger.getLogger(AzkabanServer.class);
-  public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
-  public static final String AZKABAN_PRIVATE_PROPERTIES_FILE =
-      "azkaban.private.properties";
-  public static final String DEFAULT_CONF_PATH = "conf";
   private static Props azkabanProperties = null;
 
   public static Props loadProps(String[] args) {
@@ -83,8 +79,8 @@ public abstract class AzkabanServer {
 
   private static Props loadAzkabanConfigurationFromDirectory(File dir) {
     File azkabanPrivatePropsFile =
-        new File(dir, AZKABAN_PRIVATE_PROPERTIES_FILE);
-    File azkabanPropsFile = new File(dir, AZKABAN_PROPERTIES_FILE);
+        new File(dir, Constants.AZKABAN_PRIVATE_PROPERTIES_FILE);
+    File azkabanPropsFile = new File(dir, Constants.AZKABAN_PROPERTIES_FILE);
 
     Props props = null;
     try {
@@ -128,7 +124,7 @@ public abstract class AzkabanServer {
       return null;
     }
 
-    File confPath = new File(azkabanHome, DEFAULT_CONF_PATH);
+    File confPath = new File(azkabanHome, Constants.DEFAULT_CONF_PATH);
     if (!confPath.exists() || !confPath.isDirectory() || !confPath.canRead()) {
       logger
           .error(azkabanHome + " does not contain a readable conf directory.");
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 47fbfad..5f832d5 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -16,8 +16,20 @@
 
 package azkaban.execapp;
 
+import com.google.common.base.Throwables;
+
+import org.apache.log4j.Logger;
+import org.joda.time.DateTimeZone;
+import org.mortbay.jetty.Connector;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.ServletHolder;
+import org.mortbay.thread.QueuedThreadPool;
+
+import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
@@ -32,14 +44,6 @@ import javax.management.MBeanInfo;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
-import org.apache.log4j.Logger;
-import org.joda.time.DateTimeZone;
-import org.mortbay.jetty.Connector;
-import org.mortbay.jetty.Server;
-import org.mortbay.jetty.servlet.Context;
-import org.mortbay.jetty.servlet.ServletHolder;
-import org.mortbay.thread.QueuedThreadPool;
-
 import azkaban.execapp.event.JobCallbackManager;
 import azkaban.execapp.jmx.JmxFlowRunnerManager;
 import azkaban.execapp.jmx.JmxJobMBeanManager;
@@ -48,7 +52,9 @@ import azkaban.execapp.metric.NumFailedJobMetric;
 import azkaban.execapp.metric.NumQueuedFlowMetric;
 import azkaban.execapp.metric.NumRunningFlowMetric;
 import azkaban.execapp.metric.NumRunningJobMetric;
+import azkaban.executor.Executor;
 import azkaban.executor.ExecutorLoader;
+import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.JdbcExecutorLoader;
 import azkaban.jmx.JmxJettyServer;
 import azkaban.metric.IMetricEmitter;
@@ -58,11 +64,15 @@ import azkaban.metric.inmemoryemitter.InMemoryMetricEmitter;
 import azkaban.project.JdbcProjectLoader;
 import azkaban.project.ProjectLoader;
 import azkaban.server.AzkabanServer;
-import azkaban.server.ServerConstants;
+import azkaban.server.Constants;
 import azkaban.utils.Props;
 import azkaban.utils.SystemMemoryInfo;
 import azkaban.utils.Utils;
 
+import static azkaban.server.Constants.AZKABAN_EXECUTOR_PORT_FILENAME;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
 public class AzkabanExecutorServer {
   private static final String CUSTOM_JMX_ATTRIBUTE_PROCESSOR_PROPERTY =
       "jmx.attribute.processor.class";
@@ -70,15 +80,9 @@ public class AzkabanExecutorServer {
       .getLogger(AzkabanExecutorServer.class);
   private static final int MAX_FORM_CONTENT_SIZE = 10 * 1024 * 1024;
 
-  public static final String AZKABAN_HOME = "AZKABAN_HOME";
-  public static final String DEFAULT_CONF_PATH = "conf";
-  public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
-  public static final String AZKABAN_PRIVATE_PROPERTIES_FILE =
-      "azkaban.private.properties";
   public static final String JOBTYPE_PLUGIN_DIR = "azkaban.jobtype.plugin.dir";
   public static final String METRIC_INTERVAL =
       "executor.metric.milisecinterval.";
-  public static final int DEFAULT_PORT_NUMBER = 12321;
   public static final int DEFAULT_HEADER_BUFFER_SIZE = 4096;
 
   private static final String DEFAULT_TIMEZONE_ID = "default.timezone.id";
@@ -86,14 +90,13 @@ public class AzkabanExecutorServer {
 
   private static AzkabanExecutorServer app;
 
-  private ExecutorLoader executionLoader;
-  private ProjectLoader projectLoader;
-  private FlowRunnerManager runnerManager;
-  private Props props;
-  private Props executorGlobalProps;
-  private Server server;
+  private final ExecutorLoader executionLoader;
+  private final ProjectLoader projectLoader;
+  private final FlowRunnerManager runnerManager;
+  private final Props props;
+  private final Server server;
 
-  private ArrayList<ObjectName> registeredMBeans = new ArrayList<ObjectName>();
+  private final ArrayList<ObjectName> registeredMBeans = new ArrayList<ObjectName>();
   private MBeanServer mbeanServer;
 
   /**
@@ -103,11 +106,45 @@ public class AzkabanExecutorServer {
    */
   public AzkabanExecutorServer(Props props) throws Exception {
     this.props = props;
+    server = createJettyServer(props);
+
+    executionLoader = new JdbcExecutorLoader(props);
+    projectLoader = new JdbcProjectLoader(props);
+    runnerManager = new FlowRunnerManager(props, executionLoader, projectLoader, getClass().getClassLoader());
 
-    int portNumber = props.getInt("executor.port", DEFAULT_PORT_NUMBER);
+    JmxJobMBeanManager.getInstance().initialize(props);
+
+    // make sure this happens before
+    configureJobCallback(props);
+
+    configureMBeanServer();
+    configureMetricReports();
+
+    SystemMemoryInfo.init(props.getInt("executor.memCheck.interval", 30));
+
+    loadCustomJMXAttributeProcessor(props);
+
+    try {
+      server.start();
+    } catch (Exception e) {
+      logger.error(e);
+      Utils.croak(e.getMessage(), 1);
+    }
+
+    insertExecutorEntryIntoDB();
+    dumpPortToFile();
+    logger.info("Started Executor Server on " + getExecutorHostPort());
+  }
+
+  private Server createJettyServer(Props props) {
     int maxThreads = props.getInt("executor.maxThreads", DEFAULT_THREAD_NUMBER);
 
-    server = new Server(portNumber);
+    /*
+     * Default to a port number 0 (zero)
+     * The Jetty server automatically finds an unused port when the port number is set to zero
+     * TODO: This is using a highly outdated version of jetty [year 2010]. needs to be updated.
+     */
+    Server server = new Server(props.getInt("executor.port", 0));
     QueuedThreadPool httpThreadPool = new QueuedThreadPool(maxThreads);
     server.setThreadPool(httpThreadPool);
 
@@ -134,34 +171,35 @@ public class AzkabanExecutorServer {
     root.addServlet(new ServletHolder(new StatsServlet()), "/stats");
     root.addServlet(new ServletHolder(new ServerStatisticsServlet()), "/serverStatistics");
 
-    root.setAttribute(ServerConstants.AZKABAN_SERVLET_CONTEXT_KEY, this);
-
-    executionLoader = createExecLoader(props);
-    projectLoader = createProjectLoader(props);
-    runnerManager =
-        new FlowRunnerManager(props, executionLoader, projectLoader, this
-            .getClass().getClassLoader());
-
-    JmxJobMBeanManager.getInstance().initialize(props);
-
-    // make sure this happens before
-    configureJobCallback(props);
-
-    configureMBeanServer();
-    configureMetricReports();
-
-    SystemMemoryInfo.init(props.getInt("executor.memCheck.interval", 30));
-
-    loadCustomJMXAttributeProcessor(props);
+    root.setAttribute(Constants.AZKABAN_SERVLET_CONTEXT_KEY, this);
+    return server;
+  }
 
+  private void insertExecutorEntryIntoDB() {
     try {
-      server.start();
-    } catch (Exception e) {
-      logger.warn(e);
-      Utils.croak(e.getMessage(), 1);
+      final String host = requireNonNull(getHost());
+      final int port = getPort();
+      checkState(port != -1);
+      final Executor executor = executionLoader.fetchExecutor(host, port);
+      if (executor == null) {
+        executionLoader.addExecutor(host, port);
+      }
+      // If executor already exists, ignore it
+    } catch (ExecutorManagerException e) {
+      logger.error("Error inserting executor entry into DB", e);
+      Throwables.propagate(e);
     }
+  }
 
-    logger.info("Azkaban Executor Server started on port " + portNumber);
+  private void dumpPortToFile() {
+    // By default this should write to the working directory
+    try (BufferedWriter writer = new BufferedWriter(new FileWriter(AZKABAN_EXECUTOR_PORT_FILENAME))) {
+      writer.write(String.valueOf(getPort()));
+      writer.write("\n");
+    } catch (IOException e) {
+      logger.error(e);
+      Throwables.propagate(e);
+    }
   }
 
   private void configureJobCallback(Props props) {
@@ -259,14 +297,6 @@ public class AzkabanExecutorServer {
     }
   }
 
-  private ExecutorLoader createExecLoader(Props props) {
-    return new JdbcExecutorLoader(props);
-  }
-
-  private ProjectLoader createProjectLoader(Props props) {
-    return new JdbcProjectLoader(props);
-  }
-
   public void stopServer() throws Exception {
     server.stop();
     server.destroy();
@@ -289,10 +319,6 @@ public class AzkabanExecutorServer {
     return props;
   }
 
-  public Props getExecutorGlobalProps() {
-    return executorGlobalProps;
-  }
-
   /**
    * Returns the currently executing executor server, if one exists.
    *
@@ -392,7 +418,7 @@ public class AzkabanExecutorServer {
       return null;
     }
 
-    File confPath = new File(azkabanHome, DEFAULT_CONF_PATH);
+    File confPath = new File(azkabanHome, Constants.DEFAULT_CONF_PATH);
     if (!confPath.exists() || !confPath.isDirectory() || !confPath.canRead()) {
       logger
           .error(azkabanHome + " does not contain a readable conf directory.");
@@ -409,13 +435,12 @@ public class AzkabanExecutorServer {
   /**
    * Loads the Azkaban conf file int a Props object
    *
-   * @param path
    * @return
    */
   private static Props loadAzkabanConfigurationFromDirectory(File dir) {
     File azkabanPrivatePropsFile =
-        new File(dir, AZKABAN_PRIVATE_PROPERTIES_FILE);
-    File azkabanPropsFile = new File(dir, AZKABAN_PROPERTIES_FILE);
+        new File(dir, Constants.AZKABAN_PRIVATE_PROPERTIES_FILE);
+    File azkabanPropsFile = new File(dir, Constants.AZKABAN_PROPERTIES_FILE);
 
     Props props = null;
     try {
@@ -503,17 +528,39 @@ public class AzkabanExecutorServer {
     }
   }
 
+
   /**
-   * Returns host:port combination for currently running executor
-   * @return
+   * Get the hostname
+   *
+   * @return hostname
    */
-  public String getExecutorHostPort() {
+  public String getHost() {
     String host = "unkownHost";
     try {
       host = InetAddress.getLocalHost().getCanonicalHostName();
     } catch (Exception e) {
       logger.error("Failed to fetch LocalHostName");
     }
-    return host + ":" + props.getInt("executor.port", DEFAULT_PORT_NUMBER);
+    return host;
+  }
+
+  /**
+   * Get the current server port
+   * @return the port at which the executor server is running
+   */
+  public int getPort() {
+    final Connector[] connectors = server.getConnectors();
+    checkState(connectors.length >= 1, "Server must have at least 1 connector");
+
+    // The first connector is created upon initializing the server. That's the one that has the port.
+    return connectors[0].getLocalPort();
+  }
+
+  /**
+   * Returns host:port combination for currently running executor
+   * @return
+   */
+  public String getExecutorHostPort() {
+    return getHost() + ":" + getPort();
   }
 }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
index 6b04a85..482d36f 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
@@ -16,6 +16,8 @@
 
 package azkaban.execapp;
 
+import com.google.common.base.Preconditions;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
@@ -35,8 +37,10 @@ import org.codehaus.jackson.map.ObjectMapper;
 
 import azkaban.executor.ConnectorParams;
 import azkaban.executor.ExecutableFlowBase;
+import azkaban.executor.Executor;
+import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManagerException;
-import azkaban.server.ServerConstants;
+import azkaban.server.Constants;
 import azkaban.utils.FileIOUtils.JobMetaData;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.JSONUtils;
@@ -58,7 +62,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
   public void init(ServletConfig config) throws ServletException {
     application =
         (AzkabanExecutorServer) config.getServletContext().getAttribute(
-            ServerConstants.AZKABAN_SERVLET_CONTEXT_KEY);
+            Constants.AZKABAN_SERVLET_CONTEXT_KEY);
 
     if (application == null) {
       throw new IllegalStateException(
@@ -95,6 +99,12 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
         } else if (action.equals(RELOAD_JOBTYPE_PLUGINS_ACTION)) {
           logger.info("Reloading Jobtype plugins");
           handleReloadJobTypePlugins(respMap);
+        } else if (action.equals(ACTIVATE)) {
+          logger.warn("Setting ACTIVE flag to true");
+          setActive(true, respMap);
+        } else if (action.equals(DEACTIVATE)) {
+          logger.warn("Setting ACTIVE flag to false");
+          setActive(false, respMap);
         } else {
           int execid = Integer.parseInt(getParam(req, EXECID_PARAM));
           String user = getParam(req, USER_PARAM, null);
@@ -338,6 +348,25 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
     }
   }
 
+  private void setActive(boolean value, Map<String, Object> respMap)
+      throws ServletException {
+    try {
+      ExecutorLoader executorLoader = application.getExecutorLoader();
+      Executor executor = executorLoader.fetchExecutor(application.getHost(), application.getPort());
+      Preconditions.checkState(executor != null, "Unable to obtain self entry in DB");
+      if (executor.isActive() != value) {
+        executor.setActive(value);
+        executorLoader.updateExecutor(executor);
+      } else {
+        logger.warn("Set active action ignored. Executor is already " + (value? "active" : "inactive"));
+      }
+      respMap.put(STATUS_PARAM, RESPONSE_SUCCESS);
+    } catch (Exception e) {
+      logger.error(e);
+      respMap.put(RESPONSE_ERROR, e.getMessage());
+    }
+  }
+
   @Override
   public void doPost(HttpServletRequest req, HttpServletResponse resp)
       throws ServletException, IOException {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/JMXHttpServlet.java b/azkaban-exec-server/src/main/java/azkaban/execapp/JMXHttpServlet.java
index 6412e0b..e3d42cd 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/JMXHttpServlet.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/JMXHttpServlet.java
@@ -34,7 +34,7 @@ import org.apache.log4j.Logger;
 
 import azkaban.executor.ConnectorParams;
 import azkaban.server.HttpRequestUtils;
-import azkaban.server.ServerConstants;
+import azkaban.server.Constants;
 import azkaban.utils.JSONUtils;
 
 public class JMXHttpServlet extends HttpServlet implements ConnectorParams {
@@ -45,7 +45,7 @@ public class JMXHttpServlet extends HttpServlet implements ConnectorParams {
   public void init(ServletConfig config) throws ServletException {
     server =
         (AzkabanExecutorServer) config.getServletContext().getAttribute(
-            ServerConstants.AZKABAN_SERVLET_CONTEXT_KEY);
+            Constants.AZKABAN_SERVLET_CONTEXT_KEY);
   }
 
   public boolean hasParam(HttpServletRequest request, String param) {
diff --git a/azkaban-sql/src/sql/create.executors.sql b/azkaban-sql/src/sql/create.executors.sql
index 8e59ce3..b5516a6 100644
--- a/azkaban-sql/src/sql/create.executors.sql
+++ b/azkaban-sql/src/sql/create.executors.sql
@@ -2,7 +2,7 @@ CREATE TABLE executors (
   id INT NOT NULL PRIMARY KEY AUTO_INCREMENT,
   host VARCHAR(64) NOT NULL,
   port INT NOT NULL,
-  active BOOLEAN DEFAULT true,
+  active BOOLEAN DEFAULT false,
   UNIQUE (host, port),
   UNIQUE INDEX executor_id (id)
 );
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
index c425913..243512c 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -67,7 +67,7 @@ import azkaban.scheduler.ScheduleLoader;
 import azkaban.scheduler.ScheduleManager;
 import azkaban.scheduler.TriggerBasedScheduleLoader;
 import azkaban.server.AzkabanServer;
-import azkaban.server.ServerConstants;
+import azkaban.server.Constants;
 import azkaban.server.session.SessionCache;
 import azkaban.trigger.JdbcTriggerLoader;
 import azkaban.trigger.TriggerLoader;
@@ -804,7 +804,7 @@ public class AzkabanWebServer extends AzkabanServer {
     // TODO: find something else to do the job
     app.getTriggerManager().start();
 
-    root.setAttribute(ServerConstants.AZKABAN_SERVLET_CONTEXT_KEY, app);
+    root.setAttribute(Constants.AZKABAN_SERVLET_CONTEXT_KEY, app);
     try {
       server.start();
     } catch (Exception e) {