Details
diff --git a/azkaban-common/src/main/java/azkaban/database/DataSourceUtils.java b/azkaban-common/src/main/java/azkaban/database/DataSourceUtils.java
index a8e36aa..8267104 100644
--- a/azkaban-common/src/main/java/azkaban/database/DataSourceUtils.java
+++ b/azkaban-common/src/main/java/azkaban/database/DataSourceUtils.java
@@ -129,11 +129,13 @@ public class DataSourceUtils {
private static MonitorThread monitorThread = null;
+ private final String url;
+
private MySQLBasicDataSource(String host, int port, String dbName,
String user, String password, int numConnections) {
super();
- String url = "jdbc:mysql://" + (host + ":" + port + "/" + dbName);
+ url = "jdbc:mysql://" + (host + ":" + port + "/" + dbName);
addConnectionProperty("useUnicode", "yes");
addConnectionProperty("characterEncoding", "UTF-8");
setDriverClassName("com.mysql.jdbc.Driver");
@@ -196,10 +198,7 @@ public class DataSourceUtils {
PreparedStatement query = connection.prepareStatement("SELECT 1");
query.execute();
} catch (SQLException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- logger
- .error("MySQL connection test failed. Please check MySQL connection health!");
+ logger.error("Unable to reach MySQL server on " + url);
} finally {
DbUtils.closeQuietly(connection);
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java b/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
index 8abbd61..3f51d75 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
@@ -34,6 +34,8 @@ public interface ConnectorParams {
public static final String ATTACHMENTS_ACTION = "attachments";
public static final String METADATA_ACTION = "metadata";
public static final String RELOAD_JOBTYPE_PLUGINS_ACTION = "reloadJobTypePlugins";
+ public static final String ACTIVATE = "activate";
+ public static final String DEACTIVATE = "deactivate";
public static final String MODIFY_EXECUTION_ACTION = "modifyExecution";
public static final String MODIFY_EXECUTION_ACTION_TYPE = "modifyType";
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index 80e8167..0606467 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -26,25 +26,25 @@ import azkaban.utils.Pair;
import azkaban.utils.Props;
public interface ExecutorLoader {
- public void uploadExecutableFlow(ExecutableFlow flow)
+ void uploadExecutableFlow(ExecutableFlow flow)
throws ExecutorManagerException;
- public ExecutableFlow fetchExecutableFlow(int execId)
+ ExecutableFlow fetchExecutableFlow(int execId)
throws ExecutorManagerException;
- public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows()
+ Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows()
throws ExecutorManagerException;
- public List<ExecutableFlow> fetchFlowHistory(int skip, int num)
+ List<ExecutableFlow> fetchFlowHistory(int skip, int num)
throws ExecutorManagerException;
- public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId,
+ List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId,
int skip, int num) throws ExecutorManagerException;
- public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId,
+ List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId,
int skip, int num, Status status) throws ExecutorManagerException;
- public List<ExecutableFlow> fetchFlowHistory(String projContain,
+ List<ExecutableFlow> fetchFlowHistory(String projContain,
String flowContains, String userNameContains, int status, long startData,
long endData, int skip, int num) throws ExecutorManagerException;
@@ -59,7 +59,7 @@ public interface ExecutorLoader {
* @return List<Executor>
* @throws ExecutorManagerException
*/
- public List<Executor> fetchAllExecutors() throws ExecutorManagerException;
+ List<Executor> fetchAllExecutors() throws ExecutorManagerException;
/**
* <pre>
@@ -72,7 +72,7 @@ public interface ExecutorLoader {
* @return List<Executor>
* @throws ExecutorManagerException
*/
- public List<Executor> fetchActiveExecutors() throws ExecutorManagerException;
+ List<Executor> fetchActiveExecutors() throws ExecutorManagerException;
/**
* <pre>
@@ -86,7 +86,7 @@ public interface ExecutorLoader {
* @return Executor
* @throws ExecutorManagerException
*/
- public Executor fetchExecutor(String host, int port)
+ Executor fetchExecutor(String host, int port)
throws ExecutorManagerException;
/**
@@ -100,7 +100,7 @@ public interface ExecutorLoader {
* @return Executor
* @throws ExecutorManagerException
*/
- public Executor fetchExecutor(int executorId) throws ExecutorManagerException;
+ Executor fetchExecutor(int executorId) throws ExecutorManagerException;
/**
* <pre>
@@ -114,7 +114,7 @@ public interface ExecutorLoader {
* @return Executor
* @throws ExecutorManagerException
*/
- public Executor addExecutor(String host, int port)
+ Executor addExecutor(String host, int port)
throws ExecutorManagerException;
/**
@@ -129,7 +129,7 @@ public interface ExecutorLoader {
* @param executorId
* @throws ExecutorManagerException
*/
- public void updateExecutor(Executor executor) throws ExecutorManagerException;
+ void updateExecutor(Executor executor) throws ExecutorManagerException;
/**
* <pre>
@@ -144,7 +144,7 @@ public interface ExecutorLoader {
* @param message
* @return isSuccess
*/
- public void postExecutorEvent(Executor executor, EventType type, String user,
+ void postExecutorEvent(Executor executor, EventType type, String user,
String message) throws ExecutorManagerException;
/**
@@ -165,10 +165,10 @@ public interface ExecutorLoader {
List<ExecutorLogEvent> getExecutorEvents(Executor executor, int num,
int offset) throws ExecutorManagerException;
- public void addActiveExecutableReference(ExecutionReference ref)
+ void addActiveExecutableReference(ExecutionReference ref)
throws ExecutorManagerException;
- public void removeActiveExecutableReference(int execId)
+ void removeActiveExecutableReference(int execId)
throws ExecutorManagerException;
@@ -183,7 +183,7 @@ public interface ExecutorLoader {
* @param execId
* @throws ExecutorManagerException
*/
- public void unassignExecutor(int executionId) throws ExecutorManagerException;
+ void unassignExecutor(int executionId) throws ExecutorManagerException;
/**
* <pre>
@@ -197,7 +197,7 @@ public interface ExecutorLoader {
* @param execId
* @throws ExecutorManagerException
*/
- public void assignExecutor(int executorId, int execId)
+ void assignExecutor(int executorId, int execId)
throws ExecutorManagerException;
/**
@@ -212,7 +212,7 @@ public interface ExecutorLoader {
* @return fetched Executor
* @throws ExecutorManagerException
*/
- public Executor fetchExecutorByExecutionId(int executionId)
+ Executor fetchExecutorByExecutionId(int executionId)
throws ExecutorManagerException;
/**
@@ -226,59 +226,59 @@ public interface ExecutorLoader {
* @return List of queued flows and corresponding execution reference
* @throws ExecutorManagerException
*/
- public List<Pair<ExecutionReference, ExecutableFlow>> fetchQueuedFlows()
+ List<Pair<ExecutionReference, ExecutableFlow>> fetchQueuedFlows()
throws ExecutorManagerException;
- public boolean updateExecutableReference(int execId, long updateTime)
+ boolean updateExecutableReference(int execId, long updateTime)
throws ExecutorManagerException;
- public LogData fetchLogs(int execId, String name, int attempt, int startByte,
+ LogData fetchLogs(int execId, String name, int attempt, int startByte,
int endByte) throws ExecutorManagerException;
- public List<Object> fetchAttachments(int execId, String name, int attempt)
+ List<Object> fetchAttachments(int execId, String name, int attempt)
throws ExecutorManagerException;
- public void uploadLogFile(int execId, String name, int attempt, File... files)
+ void uploadLogFile(int execId, String name, int attempt, File... files)
throws ExecutorManagerException;
- public void uploadAttachmentFile(ExecutableNode node, File file)
+ void uploadAttachmentFile(ExecutableNode node, File file)
throws ExecutorManagerException;
- public void updateExecutableFlow(ExecutableFlow flow)
+ void updateExecutableFlow(ExecutableFlow flow)
throws ExecutorManagerException;
- public void uploadExecutableNode(ExecutableNode node, Props inputParams)
+ void uploadExecutableNode(ExecutableNode node, Props inputParams)
throws ExecutorManagerException;
- public List<ExecutableJobInfo> fetchJobInfoAttempts(int execId, String jobId)
+ List<ExecutableJobInfo> fetchJobInfoAttempts(int execId, String jobId)
throws ExecutorManagerException;
- public ExecutableJobInfo fetchJobInfo(int execId, String jobId, int attempt)
+ ExecutableJobInfo fetchJobInfo(int execId, String jobId, int attempt)
throws ExecutorManagerException;
- public List<ExecutableJobInfo> fetchJobHistory(int projectId, String jobId,
+ List<ExecutableJobInfo> fetchJobHistory(int projectId, String jobId,
int skip, int size) throws ExecutorManagerException;
- public void updateExecutableNode(ExecutableNode node)
+ void updateExecutableNode(ExecutableNode node)
throws ExecutorManagerException;
- public int fetchNumExecutableFlows(int projectId, String flowId)
+ int fetchNumExecutableFlows(int projectId, String flowId)
throws ExecutorManagerException;
- public int fetchNumExecutableFlows() throws ExecutorManagerException;
+ int fetchNumExecutableFlows() throws ExecutorManagerException;
- public int fetchNumExecutableNodes(int projectId, String jobId)
+ int fetchNumExecutableNodes(int projectId, String jobId)
throws ExecutorManagerException;
- public Props fetchExecutionJobInputProps(int execId, String jobId)
+ Props fetchExecutionJobInputProps(int execId, String jobId)
throws ExecutorManagerException;
- public Props fetchExecutionJobOutputProps(int execId, String jobId)
+ Props fetchExecutionJobOutputProps(int execId, String jobId)
throws ExecutorManagerException;
- public Pair<Props, Props> fetchExecutionJobProps(int execId, String jobId)
+ Pair<Props, Props> fetchExecutionJobProps(int execId, String jobId)
throws ExecutorManagerException;
- public int removeExecutionLogsByTime(long millis)
+ int removeExecutionLogsByTime(long millis)
throws ExecutorManagerException;
}
diff --git a/azkaban-common/src/main/java/azkaban/server/AbstractServiceServlet.java b/azkaban-common/src/main/java/azkaban/server/AbstractServiceServlet.java
index c993db9..9546fb9 100644
--- a/azkaban-common/src/main/java/azkaban/server/AbstractServiceServlet.java
+++ b/azkaban-common/src/main/java/azkaban/server/AbstractServiceServlet.java
@@ -37,8 +37,7 @@ public class AbstractServiceServlet extends HttpServlet {
@Override
public void init(ServletConfig config) throws ServletException {
application =
- (AzkabanServer) config.getServletContext().getAttribute(
- ServerConstants.AZKABAN_SERVLET_CONTEXT_KEY);
+ (AzkabanServer) config.getServletContext().getAttribute(Constants.AZKABAN_SERVLET_CONTEXT_KEY);
if (application == null) {
throw new IllegalStateException(
diff --git a/azkaban-common/src/main/java/azkaban/server/AzkabanServer.java b/azkaban-common/src/main/java/azkaban/server/AzkabanServer.java
index 7f50989..d9ca84b 100644
--- a/azkaban-common/src/main/java/azkaban/server/AzkabanServer.java
+++ b/azkaban-common/src/main/java/azkaban/server/AzkabanServer.java
@@ -34,10 +34,6 @@ import azkaban.server.session.SessionCache;
public abstract class AzkabanServer {
private static final Logger logger = Logger.getLogger(AzkabanServer.class);
- public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
- public static final String AZKABAN_PRIVATE_PROPERTIES_FILE =
- "azkaban.private.properties";
- public static final String DEFAULT_CONF_PATH = "conf";
private static Props azkabanProperties = null;
public static Props loadProps(String[] args) {
@@ -83,8 +79,8 @@ public abstract class AzkabanServer {
private static Props loadAzkabanConfigurationFromDirectory(File dir) {
File azkabanPrivatePropsFile =
- new File(dir, AZKABAN_PRIVATE_PROPERTIES_FILE);
- File azkabanPropsFile = new File(dir, AZKABAN_PROPERTIES_FILE);
+ new File(dir, Constants.AZKABAN_PRIVATE_PROPERTIES_FILE);
+ File azkabanPropsFile = new File(dir, Constants.AZKABAN_PROPERTIES_FILE);
Props props = null;
try {
@@ -128,7 +124,7 @@ public abstract class AzkabanServer {
return null;
}
- File confPath = new File(azkabanHome, DEFAULT_CONF_PATH);
+ File confPath = new File(azkabanHome, Constants.DEFAULT_CONF_PATH);
if (!confPath.exists() || !confPath.isDirectory() || !confPath.canRead()) {
logger
.error(azkabanHome + " does not contain a readable conf directory.");
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 47fbfad..5f832d5 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -16,8 +16,20 @@
package azkaban.execapp;
+import com.google.common.base.Throwables;
+
+import org.apache.log4j.Logger;
+import org.joda.time.DateTimeZone;
+import org.mortbay.jetty.Connector;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.ServletHolder;
+import org.mortbay.thread.QueuedThreadPool;
+
+import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
+import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
@@ -32,14 +44,6 @@ import javax.management.MBeanInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import org.apache.log4j.Logger;
-import org.joda.time.DateTimeZone;
-import org.mortbay.jetty.Connector;
-import org.mortbay.jetty.Server;
-import org.mortbay.jetty.servlet.Context;
-import org.mortbay.jetty.servlet.ServletHolder;
-import org.mortbay.thread.QueuedThreadPool;
-
import azkaban.execapp.event.JobCallbackManager;
import azkaban.execapp.jmx.JmxFlowRunnerManager;
import azkaban.execapp.jmx.JmxJobMBeanManager;
@@ -48,7 +52,9 @@ import azkaban.execapp.metric.NumFailedJobMetric;
import azkaban.execapp.metric.NumQueuedFlowMetric;
import azkaban.execapp.metric.NumRunningFlowMetric;
import azkaban.execapp.metric.NumRunningJobMetric;
+import azkaban.executor.Executor;
import azkaban.executor.ExecutorLoader;
+import azkaban.executor.ExecutorManagerException;
import azkaban.executor.JdbcExecutorLoader;
import azkaban.jmx.JmxJettyServer;
import azkaban.metric.IMetricEmitter;
@@ -58,11 +64,15 @@ import azkaban.metric.inmemoryemitter.InMemoryMetricEmitter;
import azkaban.project.JdbcProjectLoader;
import azkaban.project.ProjectLoader;
import azkaban.server.AzkabanServer;
-import azkaban.server.ServerConstants;
+import azkaban.server.Constants;
import azkaban.utils.Props;
import azkaban.utils.SystemMemoryInfo;
import azkaban.utils.Utils;
+import static azkaban.server.Constants.AZKABAN_EXECUTOR_PORT_FILENAME;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
public class AzkabanExecutorServer {
private static final String CUSTOM_JMX_ATTRIBUTE_PROCESSOR_PROPERTY =
"jmx.attribute.processor.class";
@@ -70,15 +80,9 @@ public class AzkabanExecutorServer {
.getLogger(AzkabanExecutorServer.class);
private static final int MAX_FORM_CONTENT_SIZE = 10 * 1024 * 1024;
- public static final String AZKABAN_HOME = "AZKABAN_HOME";
- public static final String DEFAULT_CONF_PATH = "conf";
- public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
- public static final String AZKABAN_PRIVATE_PROPERTIES_FILE =
- "azkaban.private.properties";
public static final String JOBTYPE_PLUGIN_DIR = "azkaban.jobtype.plugin.dir";
public static final String METRIC_INTERVAL =
"executor.metric.milisecinterval.";
- public static final int DEFAULT_PORT_NUMBER = 12321;
public static final int DEFAULT_HEADER_BUFFER_SIZE = 4096;
private static final String DEFAULT_TIMEZONE_ID = "default.timezone.id";
@@ -86,14 +90,13 @@ public class AzkabanExecutorServer {
private static AzkabanExecutorServer app;
- private ExecutorLoader executionLoader;
- private ProjectLoader projectLoader;
- private FlowRunnerManager runnerManager;
- private Props props;
- private Props executorGlobalProps;
- private Server server;
+ private final ExecutorLoader executionLoader;
+ private final ProjectLoader projectLoader;
+ private final FlowRunnerManager runnerManager;
+ private final Props props;
+ private final Server server;
- private ArrayList<ObjectName> registeredMBeans = new ArrayList<ObjectName>();
+ private final ArrayList<ObjectName> registeredMBeans = new ArrayList<ObjectName>();
private MBeanServer mbeanServer;
/**
@@ -103,11 +106,45 @@ public class AzkabanExecutorServer {
*/
public AzkabanExecutorServer(Props props) throws Exception {
this.props = props;
+ server = createJettyServer(props);
+
+ executionLoader = new JdbcExecutorLoader(props);
+ projectLoader = new JdbcProjectLoader(props);
+ runnerManager = new FlowRunnerManager(props, executionLoader, projectLoader, getClass().getClassLoader());
- int portNumber = props.getInt("executor.port", DEFAULT_PORT_NUMBER);
+ JmxJobMBeanManager.getInstance().initialize(props);
+
+ // make sure this happens before
+ configureJobCallback(props);
+
+ configureMBeanServer();
+ configureMetricReports();
+
+ SystemMemoryInfo.init(props.getInt("executor.memCheck.interval", 30));
+
+ loadCustomJMXAttributeProcessor(props);
+
+ try {
+ server.start();
+ } catch (Exception e) {
+ logger.error(e);
+ Utils.croak(e.getMessage(), 1);
+ }
+
+ insertExecutorEntryIntoDB();
+ dumpPortToFile();
+ logger.info("Started Executor Server on " + getExecutorHostPort());
+ }
+
+ private Server createJettyServer(Props props) {
int maxThreads = props.getInt("executor.maxThreads", DEFAULT_THREAD_NUMBER);
- server = new Server(portNumber);
+ /*
+ * Default to a port number 0 (zero)
+ * The Jetty server automatically finds an unused port when the port number is set to zero
+ * TODO: This is using a highly outdated version of jetty [year 2010]. needs to be updated.
+ */
+ Server server = new Server(props.getInt("executor.port", 0));
QueuedThreadPool httpThreadPool = new QueuedThreadPool(maxThreads);
server.setThreadPool(httpThreadPool);
@@ -134,34 +171,35 @@ public class AzkabanExecutorServer {
root.addServlet(new ServletHolder(new StatsServlet()), "/stats");
root.addServlet(new ServletHolder(new ServerStatisticsServlet()), "/serverStatistics");
- root.setAttribute(ServerConstants.AZKABAN_SERVLET_CONTEXT_KEY, this);
-
- executionLoader = createExecLoader(props);
- projectLoader = createProjectLoader(props);
- runnerManager =
- new FlowRunnerManager(props, executionLoader, projectLoader, this
- .getClass().getClassLoader());
-
- JmxJobMBeanManager.getInstance().initialize(props);
-
- // make sure this happens before
- configureJobCallback(props);
-
- configureMBeanServer();
- configureMetricReports();
-
- SystemMemoryInfo.init(props.getInt("executor.memCheck.interval", 30));
-
- loadCustomJMXAttributeProcessor(props);
+ root.setAttribute(Constants.AZKABAN_SERVLET_CONTEXT_KEY, this);
+ return server;
+ }
+ private void insertExecutorEntryIntoDB() {
try {
- server.start();
- } catch (Exception e) {
- logger.warn(e);
- Utils.croak(e.getMessage(), 1);
+ final String host = requireNonNull(getHost());
+ final int port = getPort();
+ checkState(port != -1);
+ final Executor executor = executionLoader.fetchExecutor(host, port);
+ if (executor == null) {
+ executionLoader.addExecutor(host, port);
+ }
+ // If executor already exists, ignore it
+ } catch (ExecutorManagerException e) {
+ logger.error("Error inserting executor entry into DB", e);
+ Throwables.propagate(e);
}
+ }
- logger.info("Azkaban Executor Server started on port " + portNumber);
+ private void dumpPortToFile() {
+ // By default this should write to the working directory
+ try (BufferedWriter writer = new BufferedWriter(new FileWriter(AZKABAN_EXECUTOR_PORT_FILENAME))) {
+ writer.write(String.valueOf(getPort()));
+ writer.write("\n");
+ } catch (IOException e) {
+ logger.error(e);
+ Throwables.propagate(e);
+ }
}
private void configureJobCallback(Props props) {
@@ -259,14 +297,6 @@ public class AzkabanExecutorServer {
}
}
- private ExecutorLoader createExecLoader(Props props) {
- return new JdbcExecutorLoader(props);
- }
-
- private ProjectLoader createProjectLoader(Props props) {
- return new JdbcProjectLoader(props);
- }
-
public void stopServer() throws Exception {
server.stop();
server.destroy();
@@ -289,10 +319,6 @@ public class AzkabanExecutorServer {
return props;
}
- public Props getExecutorGlobalProps() {
- return executorGlobalProps;
- }
-
/**
* Returns the currently executing executor server, if one exists.
*
@@ -392,7 +418,7 @@ public class AzkabanExecutorServer {
return null;
}
- File confPath = new File(azkabanHome, DEFAULT_CONF_PATH);
+ File confPath = new File(azkabanHome, Constants.DEFAULT_CONF_PATH);
if (!confPath.exists() || !confPath.isDirectory() || !confPath.canRead()) {
logger
.error(azkabanHome + " does not contain a readable conf directory.");
@@ -409,13 +435,12 @@ public class AzkabanExecutorServer {
/**
* Loads the Azkaban conf file int a Props object
*
- * @param path
* @return
*/
private static Props loadAzkabanConfigurationFromDirectory(File dir) {
File azkabanPrivatePropsFile =
- new File(dir, AZKABAN_PRIVATE_PROPERTIES_FILE);
- File azkabanPropsFile = new File(dir, AZKABAN_PROPERTIES_FILE);
+ new File(dir, Constants.AZKABAN_PRIVATE_PROPERTIES_FILE);
+ File azkabanPropsFile = new File(dir, Constants.AZKABAN_PROPERTIES_FILE);
Props props = null;
try {
@@ -503,17 +528,39 @@ public class AzkabanExecutorServer {
}
}
+
/**
- * Returns host:port combination for currently running executor
- * @return
+ * Get the hostname
+ *
+ * @return hostname
*/
- public String getExecutorHostPort() {
+ public String getHost() {
String host = "unkownHost";
try {
host = InetAddress.getLocalHost().getCanonicalHostName();
} catch (Exception e) {
logger.error("Failed to fetch LocalHostName");
}
- return host + ":" + props.getInt("executor.port", DEFAULT_PORT_NUMBER);
+ return host;
+ }
+
+ /**
+ * Get the current server port
+ * @return the port at which the executor server is running
+ */
+ public int getPort() {
+ final Connector[] connectors = server.getConnectors();
+ checkState(connectors.length >= 1, "Server must have at least 1 connector");
+
+ // The first connector is created upon initializing the server. That's the one that has the port.
+ return connectors[0].getLocalPort();
+ }
+
+ /**
+ * Returns host:port combination for currently running executor
+ * @return
+ */
+ public String getExecutorHostPort() {
+ return getHost() + ":" + getPort();
}
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
index 6b04a85..482d36f 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
@@ -16,6 +16,8 @@
package azkaban.execapp;
+import com.google.common.base.Preconditions;
+
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
@@ -35,8 +37,10 @@ import org.codehaus.jackson.map.ObjectMapper;
import azkaban.executor.ConnectorParams;
import azkaban.executor.ExecutableFlowBase;
+import azkaban.executor.Executor;
+import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
-import azkaban.server.ServerConstants;
+import azkaban.server.Constants;
import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.JSONUtils;
@@ -58,7 +62,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
public void init(ServletConfig config) throws ServletException {
application =
(AzkabanExecutorServer) config.getServletContext().getAttribute(
- ServerConstants.AZKABAN_SERVLET_CONTEXT_KEY);
+ Constants.AZKABAN_SERVLET_CONTEXT_KEY);
if (application == null) {
throw new IllegalStateException(
@@ -95,6 +99,12 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
} else if (action.equals(RELOAD_JOBTYPE_PLUGINS_ACTION)) {
logger.info("Reloading Jobtype plugins");
handleReloadJobTypePlugins(respMap);
+ } else if (action.equals(ACTIVATE)) {
+ logger.warn("Setting ACTIVE flag to true");
+ setActive(true, respMap);
+ } else if (action.equals(DEACTIVATE)) {
+ logger.warn("Setting ACTIVE flag to false");
+ setActive(false, respMap);
} else {
int execid = Integer.parseInt(getParam(req, EXECID_PARAM));
String user = getParam(req, USER_PARAM, null);
@@ -338,6 +348,25 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
}
}
+ private void setActive(boolean value, Map<String, Object> respMap)
+ throws ServletException {
+ try {
+ ExecutorLoader executorLoader = application.getExecutorLoader();
+ Executor executor = executorLoader.fetchExecutor(application.getHost(), application.getPort());
+ Preconditions.checkState(executor != null, "Unable to obtain self entry in DB");
+ if (executor.isActive() != value) {
+ executor.setActive(value);
+ executorLoader.updateExecutor(executor);
+ } else {
+ logger.warn("Set active action ignored. Executor is already " + (value? "active" : "inactive"));
+ }
+ respMap.put(STATUS_PARAM, RESPONSE_SUCCESS);
+ } catch (Exception e) {
+ logger.error(e);
+ respMap.put(RESPONSE_ERROR, e.getMessage());
+ }
+ }
+
@Override
public void doPost(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/JMXHttpServlet.java b/azkaban-exec-server/src/main/java/azkaban/execapp/JMXHttpServlet.java
index 6412e0b..e3d42cd 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/JMXHttpServlet.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/JMXHttpServlet.java
@@ -34,7 +34,7 @@ import org.apache.log4j.Logger;
import azkaban.executor.ConnectorParams;
import azkaban.server.HttpRequestUtils;
-import azkaban.server.ServerConstants;
+import azkaban.server.Constants;
import azkaban.utils.JSONUtils;
public class JMXHttpServlet extends HttpServlet implements ConnectorParams {
@@ -45,7 +45,7 @@ public class JMXHttpServlet extends HttpServlet implements ConnectorParams {
public void init(ServletConfig config) throws ServletException {
server =
(AzkabanExecutorServer) config.getServletContext().getAttribute(
- ServerConstants.AZKABAN_SERVLET_CONTEXT_KEY);
+ Constants.AZKABAN_SERVLET_CONTEXT_KEY);
}
public boolean hasParam(HttpServletRequest request, String param) {
diff --git a/azkaban-sql/src/sql/create.executors.sql b/azkaban-sql/src/sql/create.executors.sql
index 8e59ce3..b5516a6 100644
--- a/azkaban-sql/src/sql/create.executors.sql
+++ b/azkaban-sql/src/sql/create.executors.sql
@@ -2,7 +2,7 @@ CREATE TABLE executors (
id INT NOT NULL PRIMARY KEY AUTO_INCREMENT,
host VARCHAR(64) NOT NULL,
port INT NOT NULL,
- active BOOLEAN DEFAULT true,
+ active BOOLEAN DEFAULT false,
UNIQUE (host, port),
UNIQUE INDEX executor_id (id)
);
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
index c425913..243512c 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -67,7 +67,7 @@ import azkaban.scheduler.ScheduleLoader;
import azkaban.scheduler.ScheduleManager;
import azkaban.scheduler.TriggerBasedScheduleLoader;
import azkaban.server.AzkabanServer;
-import azkaban.server.ServerConstants;
+import azkaban.server.Constants;
import azkaban.server.session.SessionCache;
import azkaban.trigger.JdbcTriggerLoader;
import azkaban.trigger.TriggerLoader;
@@ -804,7 +804,7 @@ public class AzkabanWebServer extends AzkabanServer {
// TODO: find something else to do the job
app.getTriggerManager().start();
- root.setAttribute(ServerConstants.AZKABAN_SERVLET_CONTEXT_KEY, app);
+ root.setAttribute(Constants.AZKABAN_SERVLET_CONTEXT_KEY, app);
try {
server.start();
} catch (Exception e) {