Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/Executor.java b/azkaban-common/src/main/java/azkaban/executor/Executor.java
index 04a3434..9e8d441 100644
@@ -97,9 +97,9 @@ public class Executor implements Comparable<Executor> {
@Override
public String toString() {
- return String.format("%s:%s (id: %s)",
+ return String.format("%s:%s (id: %s), active=%s",
null == this.host || this.host.length() == 0 ? "(empty)" : this.host,
- this.port, this.id);
+ this.port, this.id, this.isActive);
}
public String getHost() {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index d583d59..ad0c167 100644
@@ -277,7 +277,10 @@ public class AzkabanExecutorServer {
checkState(port != -1);
final Executor executor = this.executionLoader.fetchExecutor(host, port);
if (executor == null) {
+ logger.info("This executor wasn't found in the DB. Adding self.");
this.executionLoader.addExecutor(host, port);
+ } else {
+ logger.info("This executor is already in the DB. Found: " + executor);
}
// If executor already exists, ignore it
} catch (final ExecutorManagerException e) {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
index f0b77cc..29ec76a 100644
@@ -27,7 +27,6 @@ import azkaban.executor.ExecutorManagerException;
import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.JSONUtils;
-import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
@@ -372,18 +371,8 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
private void setActiveInternal(final boolean value)
throws ExecutorManagerException {
- final ExecutorLoader executorLoader = this.application.getExecutorLoader();
- final Executor executor = executorLoader.fetchExecutor(this.application.getHost(),
- this.application.getPort());
- Preconditions.checkState(executor != null, "Unable to obtain self entry in DB");
- if (executor.isActive() != value) {
- executor.setActive(value);
- executorLoader.updateExecutor(executor);
- this.flowRunnerManager.setExecutorActive(value);
- } else {
- logger.warn(
- "Set active action ignored. Executor is already " + (value ? "active" : "inactive"));
- }
+ this.flowRunnerManager.setExecutorActive(value,
+ this.application.getHost(), this.application.getPort());
}
/**
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 3cb2311..32e475b 100644
@@ -26,6 +26,7 @@ import azkaban.execapp.event.RemoteFlowWatcher;
import azkaban.execapp.metric.NumFailedFlowMetric;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutionOptions;
+import azkaban.executor.Executor;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.Status;
@@ -48,6 +49,7 @@ import azkaban.utils.Props;
import azkaban.utils.ThreadPoolExecutingListener;
import azkaban.utils.TrackingThreadPool;
import azkaban.utils.UndefinedPropertyException;
+import com.google.common.base.Preconditions;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
@@ -320,13 +322,20 @@ public class FlowRunnerManager implements EventListener,
return allProjects;
}
- // todo chengren311: this method will be invoked by executor activate API, but in SOLO mode
- // the API is not called. So we should either have everything run in "multi-executor" mode
- // or make SOLO server mode call the API.
- public void setExecutorActive(final boolean isActive) {
- this.isExecutorActive = isActive;
- if (this.isExecutorActive) {
- this.installedProjects = this.loadExistingProjectsAsCache();
+ public void setExecutorActive(final boolean isActive, final String host, final int port)
+ throws ExecutorManagerException {
+ final Executor executor = this.executorLoader.fetchExecutor(host, port);
+ Preconditions.checkState(executor != null, "Unable to obtain self entry in DB");
+ if (executor.isActive() != isActive) {
+ executor.setActive(isActive);
+ this.executorLoader.updateExecutor(executor);
+ this.isExecutorActive = isActive;
+ if (this.isExecutorActive) {
+ this.installedProjects = this.loadExistingProjectsAsCache();
+ }
+ } else {
+ logger.info(
+ "Set active action ignored. Executor is already " + (isActive ? "active" : "inactive"));
}
}
diff --git a/azkaban-solo-server/src/main/java/azkaban/soloserver/AzkabanSingleServer.java b/azkaban-solo-server/src/main/java/azkaban/soloserver/AzkabanSingleServer.java
index fab9267..124fe51 100644
@@ -124,6 +124,10 @@ public class AzkabanSingleServer {
AzkabanExecutorServer.launch(this.executor);
log.info("Azkaban Exec Server started...");
+ this.executor.getFlowRunnerManager()
+ .setExecutorActive(true, this.executor.getHost(), this.executor.getPort());
+ log.info("Azkaban Exec Server activated...");
+
AzkabanWebServer.launch(this.webServer);
log.info("Azkaban Web Server started...");
}
diff --git a/azkaban-solo-server/src/main/resources/conf/azkaban.properties b/azkaban-solo-server/src/main/resources/conf/azkaban.properties
index 624c6c7..9fe1c3d 100644
@@ -43,3 +43,4 @@ executor.connector.stats=true
azkaban.jobtype.plugin.dir=plugins/jobtypes
# Number of executions to be displayed
azkaban.display.execution_page_size=16
+azkaban.use.multiple.executors=true