diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
index f73a30a..123f9e4 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
@@ -60,7 +60,7 @@ import azkaban.utils.SwapQueue;
/**
* Class that handles the running of a ExecutableFlow DAG
- *
+ *
*/
public class FlowRunner extends EventHandler implements Runnable {
private static final Layout DEFAULT_LAYOUT = new PatternLayout(
@@ -122,7 +122,7 @@ public class FlowRunner extends EventHandler implements Runnable {
/**
* Constructor. This will create its own ExecutorService for thread pools
- *
+ *
* @param flow
* @param executorLoader
* @param projectLoader
@@ -138,7 +138,7 @@ public class FlowRunner extends EventHandler implements Runnable {
/**
* Constructor. If executorService is null, then it will create it's own for
* thread pools.
- *
+ *
* @param flow
* @param executorLoader
* @param projectLoader
@@ -356,7 +356,7 @@ public class FlowRunner extends EventHandler implements Runnable {
/**
* Main method that executes the jobs.
- *
+ *
* @throws Exception
*/
private void runFlow() throws Exception {
@@ -725,7 +725,7 @@ public class FlowRunner extends EventHandler implements Runnable {
/**
* Determines what the state of the next node should be. Returns null if the
* node should not be run.
- *
+ *
* @param node
* @return
*/
@@ -1094,8 +1094,4 @@ public class FlowRunner extends EventHandler implements Runnable {
public int getNumRunningJobs() {
return activeJobRunners.size();
}
-
- public int getExecutionId() {
- return execId;
- }
}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 0f4c840..a044f70 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -27,16 +27,16 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
+import azkaban.project.ProjectLoader;
import azkaban.event.Event;
import azkaban.event.EventListener;
import azkaban.execapp.event.FlowWatcher;
@@ -48,28 +48,18 @@ import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypeManagerException;
-import azkaban.project.ProjectLoader;
import azkaban.utils.FileIOUtils;
import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
-import azkaban.utils.ThreadPoolExecutingListener;
-import azkaban.utils.TrackingThreadPool;
/**
* Execution manager for the server side execution.
- *
+ *
*/
-public class FlowRunnerManager implements EventListener,
- ThreadPoolExecutingListener {
- private static final String EXECUTOR_USE_BOUNDED_THREADPOOL_QUEUE =
- "executor.use.bounded.threadpool.queue";
- private static final String EXECUTOR_THREADPOOL_WORKQUEUE_SIZE =
- "executor.threadpool.workqueue.size";
- private static final String EXECUTOR_FLOW_THREADS = "executor.flow.threads";
- private static final String FLOW_NUM_JOB_THREADS = "flow.num.job.threads";
+public class FlowRunnerManager implements EventListener {
private static Logger logger = Logger.getLogger(FlowRunnerManager.class);
private File executionDirectory;
private File projectDirectory;
@@ -80,18 +70,16 @@ public class FlowRunnerManager implements EventListener,
private static final int DEFAULT_NUM_EXECUTING_FLOWS = 30;
private Map<Pair<Integer, Integer>, ProjectVersion> installedProjects =
new ConcurrentHashMap<Pair<Integer, Integer>, ProjectVersion>();
- private Map<Future<?>, Integer> submittedFlows =
- new ConcurrentHashMap<Future<?>, Integer>();
private Map<Integer, FlowRunner> runningFlows =
new ConcurrentHashMap<Integer, FlowRunner>();
private Map<Integer, ExecutableFlow> recentlyFinishedFlows =
new ConcurrentHashMap<Integer, ExecutableFlow>();
-
+ private LinkedBlockingQueue<FlowRunner> flowQueue =
+ new LinkedBlockingQueue<FlowRunner>();
private int numThreads = DEFAULT_NUM_EXECUTING_FLOWS;
- private int threadPoolQueueSize = -1;
-
- private TrackingThreadPool executorService;
+ private ExecutorService executorService;
+ private SubmitterThread submitterThread;
private CleanerThread cleanerThread;
private int numJobThreadPerFlow = 10;
@@ -104,6 +92,7 @@ public class FlowRunnerManager implements EventListener,
private final Props azkabanProps;
+ private volatile long lastSubmitterThreadCheckTime = -1; // written by SubmitterThread.run(), read by getLastSubmitterThreadCheckTime()
private long lastCleanerThreadCheckTime = -1;
private long executionDirRetention = 1 * 24 * 60 * 60 * 1000;
@@ -143,10 +132,10 @@ public class FlowRunnerManager implements EventListener,
// azkaban.temp.dir
numThreads =
- props.getInt(EXECUTOR_FLOW_THREADS, DEFAULT_NUM_EXECUTING_FLOWS);
+ props.getInt("executor.flow.threads", DEFAULT_NUM_EXECUTING_FLOWS);
numJobThreadPerFlow =
- props.getInt(FLOW_NUM_JOB_THREADS, numJobThreadPerFlow);
- executorService = createExecutorService(numThreads);
+ props.getInt("flow.num.job.threads", numJobThreadPerFlow);
+ executorService = Executors.newFixedThreadPool(numThreads);
this.executorLoader = executorLoader;
this.projectLoader = projectLoader;
@@ -157,6 +146,9 @@ public class FlowRunnerManager implements EventListener,
this.validateProxyUser =
azkabanProps.getBoolean("proxy.user.lock.down", false);
+ submitterThread = new SubmitterThread(flowQueue);
+ submitterThread.start();
+
cleanerThread = new CleanerThread();
cleanerThread.start();
@@ -173,32 +165,6 @@ public class FlowRunnerManager implements EventListener,
parentClassLoader);
}
- private TrackingThreadPool createExecutorService(int nThreads) {
- boolean useNewThreadPool =
- azkabanProps.getBoolean(EXECUTOR_USE_BOUNDED_THREADPOOL_QUEUE, false);
- logger.info("useNewThreadPool: " + useNewThreadPool);
-
- if (useNewThreadPool) {
- threadPoolQueueSize =
- azkabanProps.getInt(EXECUTOR_THREADPOOL_WORKQUEUE_SIZE, nThreads);
- logger.info("workQueueSize: " + threadPoolQueueSize);
-
- // using a bounded queue for the work queue. The default rejection policy
- // {@ThreadPoolExecutor.AbortPolicy} is used
- TrackingThreadPool executor =
- new TrackingThreadPool(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>(threadPoolQueueSize), this);
-
- return executor;
- } else {
- // the old way of using unbounded task queue.
- // if the running tasks are taking a long time or stuck, this queue
- // will be very very long.
- return new TrackingThreadPool(nThreads, nThreads, 0L,
- TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), this);
- }
- }
-
private Map<Pair<Integer, Integer>, ProjectVersion> loadExistingProjects() {
Map<Pair<Integer, Integer>, ProjectVersion> allProjects =
new HashMap<Pair<Integer, Integer>, ProjectVersion>();
@@ -236,6 +202,34 @@ public class FlowRunnerManager implements EventListener,
this.globalProps = globalProps;
}
+ private class SubmitterThread extends Thread { // takes FlowRunners off the queue and submits them to executorService
+ private final BlockingQueue<FlowRunner> queue;
+ private volatile boolean shutdown = false; // set by shutdown() on another thread, polled by run()
+
+ public SubmitterThread(BlockingQueue<FlowRunner> queue) {
+ this.setName("FlowRunnerManager-Submitter-Thread");
+ this.queue = queue;
+ }
+
+ @SuppressWarnings("unused")
+ public void shutdown() {
+ shutdown = true;
+ this.interrupt(); // wakes run() if it is blocked in queue.take()
+ }
+
+ public void run() {
+ while (!shutdown) {
+ try {
+ lastSubmitterThreadCheckTime = System.currentTimeMillis();
+ FlowRunner flowRunner = queue.take(); // blocks until a flow has been queued
+ executorService.submit(flowRunner); // NOTE(review): an unchecked exception here (e.g. rejection) is not caught and ends this thread
+ } catch (InterruptedException e) {
+ logger.info("Interrupted. Probably to shut down.");
+ }
+ }
+ }
+ }
+
private class CleanerThread extends Thread {
// Every hour, clean execution dir.
private static final long EXECUTION_DIR_CLEAN_INTERVAL_MS = 60 * 60 * 1000;
@@ -436,18 +430,18 @@ public class FlowRunnerManager implements EventListener,
}
int numJobThreads = numJobThreadPerFlow;
- if (options.getFlowParameters().containsKey(FLOW_NUM_JOB_THREADS)) {
+ if (options.getFlowParameters().containsKey("flow.num.job.threads")) {
try {
int numJobs =
Integer.valueOf(options.getFlowParameters().get(
- FLOW_NUM_JOB_THREADS));
+ "flow.num.job.threads"));
if (numJobs > 0 && numJobs <= numJobThreads) {
numJobThreads = numJobs;
}
} catch (Exception e) {
throw new ExecutorManagerException(
"Failed to set the number of job threads "
- + options.getFlowParameters().get(FLOW_NUM_JOB_THREADS)
+ + options.getFlowParameters().get("flow.num.job.threads")
+ " for flow " + execId, e);
}
}
@@ -467,18 +461,7 @@ public class FlowRunnerManager implements EventListener,
// Finally, queue the sucker.
runningFlows.put(execId, runner);
-
- try {
- // The executorService already has a queue
- Future<?> future = executorService.submit(runner);
- // keep track of this future
- submittedFlows.put(future, runner.getExecutionId());
- } catch (RejectedExecutionException re) {
- throw new ExecutorManagerException(
- "Azkaban server can't execute any more flows. "
- + "The number of running flows has reached the system configured limit."
- + "Please notify Azkaban administrators");
- }
+ flowQueue.add(runner);
}
private void setupFlow(ExecutableFlow flow) throws ExecutorManagerException {
@@ -588,7 +571,6 @@ public class FlowRunnerManager implements EventListener,
logger.info("Flow " + flow.getExecutionId()
+ " is finished. Adding it to recently finished flows list.");
runningFlows.remove(flow.getExecutionId());
- submittedFlows.remove(flow.getExecutionId());
}
}
@@ -731,10 +713,22 @@ public class FlowRunnerManager implements EventListener,
return lastCleanerThreadCheckTime;
}
+ public long getLastSubmitterThreadCheckTime() { // last timestamp recorded by the submitter loop
+ return this.lastSubmitterThreadCheckTime;
+ }
+
+ public boolean isSubmitterThreadActive() { // reports submitterThread.isAlive()
+ return submitterThread.isAlive();
+ }
+
public boolean isCleanerThreadActive() {
return this.cleanerThread.isAlive();
}
+ public State getSubmitterThreadState() { // state as reported by submitterThread.getState()
+ return submitterThread.getState();
+ }
+
public State getCleanerThreadState() {
return this.cleanerThread.getState();
}
@@ -743,73 +737,26 @@ public class FlowRunnerManager implements EventListener,
return executorService.isShutdown();
}
- public int getNumQueuedFlows() {
- return executorService.getQueue().size();
- }
-
- public int getNumRunningFlows() {
- return executorService.getActiveCount();
+ public int getNumExecutingFlows() { // number of entries currently held in runningFlows
+ return this.runningFlows.size();
}
public String getRunningFlowIds() {
- Set<Runnable> inProgressTasks = executorService.getInProgressTasks();
-
- List<Integer> runningFlowIds =
- new ArrayList<Integer>(inProgressTasks.size());
-
- for (Runnable task : inProgressTasks) {
- Integer execId = submittedFlows.get(task);
- if (execId != null) {
- runningFlowIds.add(execId);
- } else {
- logger.warn("getRunningFlowIds: got null execId for task: " + task);
- }
- }
-
- Collections.sort(runningFlowIds);
- return runningFlowIds.toString();
+ List<Integer> sortedIds = new ArrayList<Integer>(runningFlows.keySet()); // snapshot of the execution ids
+ Collections.sort(sortedIds); // ascending order
+ return sortedIds.toString();
}
- public String getQueuedFlowIds() {
- List<Integer> flowIdList =
- new ArrayList<Integer>(executorService.getQueue().size());
-
- for (Runnable task : executorService.getQueue()) {
- Integer execId = submittedFlows.get(task);
- if (execId != null) {
- flowIdList.add(execId);
- } else {
- logger
- .warn("getQueuedFlowIds: got null execId for queuedTask: " + task);
- }
+ public int getNumExecutingJobs() { // sums getNumRunningJobs() over every FlowRunner in runningFlows
+ int total = 0;
+ for (FlowRunner flowRunner : runningFlows.values()) {
+ total += flowRunner.getNumRunningJobs();
}
- Collections.sort(flowIdList);
- return flowIdList.toString();
- }
- public int getMaxNumRunningFlows() {
- return numThreads;
- }
-
- public int getTheadPoolQueueSize() {
- return threadPoolQueueSize;
+ return total;
}
public void reloadJobTypePlugins() throws JobTypeManagerException {
jobtypeManager.loadPlugins();
}
-
- public int getTotalNumExecutedFlows() {
- return executorService.getTotalTasks();
- }
-
- @Override
- public void beforeExecute(Runnable r) {
- }
-
- @Override
- public void afterExecute(Runnable r) {
- submittedFlows.remove(r);
- }
-
}
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index 21319d7..c5ae77c 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -27,14 +27,12 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import org.apache.log4j.Logger;
-
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutionOptions;
-import azkaban.executor.ExecutionOptions.FailureAction;
import azkaban.executor.ExecutorManagerAdapter;
+import azkaban.executor.ExecutionOptions.FailureAction;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.Status;
import azkaban.flow.Flow;
@@ -46,8 +44,8 @@ import azkaban.scheduler.ScheduleManagerException;
import azkaban.server.HttpRequestUtils;
import azkaban.server.session.Session;
import azkaban.user.Permission;
-import azkaban.user.Permission.Type;
import azkaban.user.User;
+import azkaban.user.Permission.Type;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.webapp.AzkabanWebServer;
import azkaban.webapp.plugin.PluginRegistry;
@@ -55,9 +53,6 @@ import azkaban.webapp.plugin.ViewerPlugin;
public class ExecutorServlet extends LoginAbstractAzkabanServlet {
private static final long serialVersionUID = 1L;
-
- private static final Logger logger = Logger.getLogger(ExecutorServlet.class);
-
private ProjectManager projectManager;
private ExecutorManagerAdapter executorManager;
private ScheduleManager scheduleManager;
@@ -354,7 +349,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
/**
* Gets the logs through plain text stream to reduce memory overhead.
- *
+ *
* @param req
* @param resp
* @param user
@@ -394,7 +389,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
/**
* Gets the logs through ajax plain text stream to reduce memory overhead.
- *
+ *
* @param req
* @param resp
* @param user
@@ -714,7 +709,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
HttpServletResponse resp, HashMap<String, Object> ret, User user,
ExecutableFlow exFlow) throws ServletException {
Long lastUpdateTime = Long.parseLong(getParam(req, "lastUpdateTime"));
- logger.info("Fetching " + exFlow.getExecutionId());
+ System.out.println("Fetching " + exFlow.getExecutionId());
Project project =
getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.READ);
@@ -734,7 +729,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
private void ajaxFetchExecutableFlow(HttpServletRequest req,
HttpServletResponse resp, HashMap<String, Object> ret, User user,
ExecutableFlow exFlow) throws ServletException {
- logger.info("Fetching " + exFlow.getExecutionId());
+ System.out.println("Fetching " + exFlow.getExecutionId());
Project project =
getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.READ);