azkaban-aplcache
Changes
src/java/azkaban/executor/ExecutableFlow.java 64(+43 -21)
src/java/azkaban/executor/ExecutorManager.java 505(+343 -162)
src/java/azkaban/executor/FlowRunner.java 93(+74 -19)
src/java/azkaban/executor/JobRunner.java 20(+15 -5)
src/java/azkaban/utils/ExecutableFlowLoader.java 158(+134 -24)
src/java/azkaban/utils/JSONUtils.java 14(+14 -0)
Details
src/java/azkaban/executor/ExecutableFlow.java 64(+43 -21)
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 997d5fc..45ad983 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -18,21 +18,22 @@ public class ExecutableFlow {
private String flowId;
private String projectId;
private String executionPath;
-
+
private HashMap<String, FlowProps> flowProps = new HashMap<String, FlowProps>();
private HashMap<String, ExecutableNode> executableNodes = new HashMap<String, ExecutableNode>();;
- private ArrayList<String> startNodes = new ArrayList<String>();
+ private ArrayList<String> startNodes;
+ private ArrayList<String> endNodes;
private long submitTime = -1;
private long startTime = -1;
private long endTime = -1;
+ private int updateNumber = 0;
private Status flowStatus = Status.UNKNOWN;
-
private String submitUser;
public enum Status {
- FAILED, SUCCEEDED, RUNNING, WAITING, KILLED, IGNORED, READY, UNKNOWN
+ FAILED, FAILED_FINISHING, SUCCEEDED, RUNNING, WAITING, KILLED, DISABLED, READY, UNKNOWN
}
public ExecutableFlow(String id, Flow flow) {
@@ -58,6 +59,14 @@ public class ExecutableFlow {
return flowProps.values();
}
+ public int getUpdateNumber() {
+ return updateNumber;
+ }
+
+ public void setUpdateNumber(int number) {
+ updateNumber = number;
+ }
+
private void setFlow(Flow flow) {
for (Node node: flow.getNodes()) {
String id = node.getId();
@@ -73,15 +82,35 @@ public class ExecutableFlow {
targetNode.addInNode(edge.getSourceId());
}
- for (ExecutableNode node : executableNodes.values()) {
- if (node.getInNodes().size()==0) {
- startNodes.add(node.id);
+ flowProps.putAll(flow.getAllFlowProps());
+ }
+
+ public List<String> getStartNodes() {
+ if (startNodes == null) {
+ startNodes = new ArrayList<String>();
+ for (ExecutableNode node: executableNodes.values()) {
+ if (node.getInNodes().isEmpty()) {
+ startNodes.add(node.id);
+ }
}
}
- flowProps.putAll(flow.getAllFlowProps());
+ return startNodes;
}
-
+
+ public List<String> getEndNodes() {
+ if (endNodes == null) {
+ endNodes = new ArrayList<String>();
+ for (ExecutableNode node: executableNodes.values()) {
+ if (node.getOutNodes().isEmpty()) {
+ endNodes.add(node.id);
+ }
+ }
+ }
+
+ return endNodes;
+ }
+
public void setStatus(String nodeId, Status status) {
ExecutableNode exNode = executableNodes.get(nodeId);
exNode.setStatus(status);
@@ -151,10 +180,6 @@ public class ExecutableFlow {
this.flowStatus = flowStatus;
}
- public List<String> getStartNodes() {
- return new ArrayList<String>(startNodes);
- }
-
public Map<String,Object> toObject() {
HashMap<String, Object> flowObj = new HashMap<String, Object>();
flowObj.put("type", "executableflow");
@@ -167,7 +192,6 @@ public class ExecutableFlow {
flowObj.put("endTime", endTime);
flowObj.put("status", flowStatus.toString());
flowObj.put("submitUser", submitUser);
- flowObj.put("startNodes", startNodes);
ArrayList<Object> props = new ArrayList<Object>();
for (FlowProps fprop: flowProps.values()) {
@@ -222,8 +246,6 @@ public class ExecutableFlow {
FlowProps flowProps = new FlowProps(inheritedSource, source);
exFlow.flowProps.put(source, flowProps);
}
-
- exFlow.startNodes.addAll((List<String>)flowObj.get("startNodes"));
return exFlow;
}
@@ -240,9 +262,9 @@ public class ExecutableFlow {
public void updateExecutableFlowFromObject(Object obj) {
HashMap<String, Object> flowObj = (HashMap<String,Object>)obj;
- submitTime = (Long)flowObj.get("submitTime");
- startTime = (Long)flowObj.get("startTime");
- endTime = (Long)flowObj.get("endTime");
+ submitTime = getLongFromObject(flowObj.get("submitTime"));
+ startTime = getLongFromObject(flowObj.get("startTime"));
+ endTime = getLongFromObject(flowObj.get("endTime"));
flowStatus = Status.valueOf((String)flowObj.get("status"));
List<Object> nodes = (List<Object>)flowObj.get("nodes");
@@ -385,8 +407,8 @@ public class ExecutableFlow {
HashMap<String, Object> objMap = (HashMap<String,Object>)obj;
status = Status.valueOf((String)objMap.get("status"));
- startTime = (Long)objMap.get("startTime");
- endTime = (Long)objMap.get("endTime");
+ startTime = getLongFromObject(objMap.get("startTime"));
+ endTime = getLongFromObject(objMap.get("endTime"));
}
public long getStartTime() {
src/java/azkaban/executor/ExecutorManager.java 505(+343 -162)
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index a38f5c9..5893857 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -12,18 +12,23 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;
-import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
+import org.apache.http.client.ResponseHandler;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.BasicResponseHandler;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.log4j.Logger;
+import azkaban.executor.ExecutableFlow.Status;
import azkaban.flow.Flow;
import azkaban.utils.ExecutableFlowLoader;
import azkaban.utils.JSONUtils;
@@ -43,9 +48,11 @@ public class ExecutorManager {
private int portNumber;
private String url = "localhost";
- private HashMap<String, ExecutableFlow> runningFlows = new HashMap<String, ExecutableFlow>();
+ private ConcurrentHashMap<String, ExecutableFlow> runningFlows = new ConcurrentHashMap<String, ExecutableFlow>();
+ private LinkedList<ExecutableFlow> recentlyFinished = new LinkedList<ExecutableFlow>();
+ private int recentlyFinishedSize = 10;
- public ExecutorManager(Props props) {
+ public ExecutorManager(Props props) throws IOException, ExecutorManagerException {
basePath = new File(props.getString("execution.directory"));
if (!basePath.exists()) {
logger.info("Execution directory " + basePath + " not found.");
@@ -57,134 +64,93 @@ public class ExecutorManager {
}
}
- portNumber = props.getInt("executor.port", AzkabanExecutorServer.DEFAULT_PORT_NUMBER);
- token = props.getString("executor.shared.token", "");
- counter.set(0);
- loadActiveExecutions();
- }
-
- public List<ExecutableFlow> getExecutableFlowByProject(String projectId, int from, int maxResults) {
- File activeFlows = new File(basePath, projectId + File.separatorChar + "active");
-
- if (!activeFlows.exists()) {
- return Collections.emptyList();
+ File activePath = new File(basePath, "active");
+ if(!activePath.exists() && !activePath.mkdirs()) {
+ throw new RuntimeException("Execution directory " + activePath + " does not exist and cannot be created.");
}
- File[] executionFiles = activeFlows.listFiles();
- if (executionFiles.length == 0 || from >= executionFiles.length) {
- return Collections.emptyList();
+ File archivePath = new File(basePath, "archive");
+ if(!archivePath.exists() && !archivePath.mkdirs()) {
+ throw new RuntimeException("Execution directory " + archivePath + " does not exist and cannot be created.");
}
-
- Arrays.sort(executionFiles);
-
- ArrayList<ExecutableFlow> executionFlows = new ArrayList<ExecutableFlow>();
- int index = (executionFiles.length - from - 1);
- for (int count = 0; count < maxResults && index >= 0; ++count, --index) {
- File exDir = executionFiles[index];
- ExecutableFlow flow = ExecutableFlowLoader.loadExecutableFlowFromDir(exDir);
-
- if (flow != null) {
- executionFlows.add(flow);
- }
- else {
- logger.info("Skipping loading " + exDir + ". Couldn't load execution.");
- }
- }
+ portNumber = props.getInt("executor.port", AzkabanExecutorServer.DEFAULT_PORT_NUMBER);
+ token = props.getString("executor.shared.token", "");
+ counter.set(0);
+ loadActiveExecutions();
- return executionFlows;
+ ExecutingManagerUpdaterThread executingManager = new ExecutingManagerUpdaterThread();
+ executingManager.start();
}
- public int getExecutableFlowByProjectFlow(String projectId, String flowName, int from, int maxResults, List<ExecutableFlow> results) {
- File activeFlows = new File(basePath, projectId + File.separatorChar + "active");
-
- if (!activeFlows.exists()) {
+ public int getExecutableFlows(String projectId, String flowId, int from, int maxResults, List<ExecutableFlow> output) {
+ String projectPath = projectId + File.separator + flowId;
+ File flowProjectPath = new File(basePath, projectPath);
+
+ if (!flowProjectPath.exists()) {
return 0;
}
- File[] executionFiles = activeFlows.listFiles(new SuffixFilter(flowName, false));
- //File[] executionFiles = activeFlows.listFiles();
+ File[] executionFiles = flowProjectPath.listFiles();
if (executionFiles.length == 0 || from >= executionFiles.length) {
return 0;
}
+
+ // Sorts the file in ascending order, so we read the list backwards.
Arrays.sort(executionFiles);
-
- int count = 0;
- for (int index = executionFiles.length - from - 1; count < maxResults && index>=0; --index ) {
+ int index = (executionFiles.length - from - 1);
+
+ for (int count = 0; count < maxResults && index >= 0; ++count, --index) {
File exDir = executionFiles[index];
- ExecutableFlow flow = ExecutableFlowLoader.loadExecutableFlowFromDir(exDir);
-
- if (flow != null) {
- results.add(flow);
- count++;
+ try {
+ ExecutableFlow flow = ExecutableFlowLoader.loadExecutableFlowFromDir(exDir);
+ output.add(flow);
}
- else {
- logger.info("Skipping loading " + exDir + ". Couldn't load execution.");
+ catch (ExecutorManagerException e) {
+ logger.error("Skipping loading " + exDir + ". Couldn't load execution.", e);
}
}
return executionFiles.length;
}
-//
-// private ExecutableFlow loadExecutableFlowFromDir(File exDir) {
-// logger.info("Loading execution " + exDir.getName());
-// String exFlowName = exDir.getName();
-//
-// String flowFileName = "_" + exFlowName + ".flow";
-// File[] exFlowFiles = exDir.listFiles(new PrefixFilter(flowFileName));
-// Arrays.sort(exFlowFiles);
-//
-// if (exFlowFiles.length <= 0) {
-// logger.error("Execution flow " + exFlowName + " missing flow file.");
-// return null;
-// }
-// File lastExFlow = exFlowFiles[exFlowFiles.length-1];
-//
-// Object exFlowObj = null;
-// try {
-// exFlowObj = JSONUtils.parseJSONFromFile(lastExFlow);
-// } catch (IOException e) {
-// logger.error("Error loading execution flow " + exFlowName + ". Problems parsing json file.");
-// return null;
-// }
-//
-// ExecutableFlow flow = ExecutableFlow.createExecutableFlowFromObject(exFlowObj);
-// return flow;
-// }
-//
- private void loadActiveExecutions() {
- File[] executingProjects = basePath.listFiles();
- for (File project: executingProjects) {
- File activeFlows = new File(project, "active");
- if (!activeFlows.exists()) {
- continue;
- }
-
- for (File exflow: activeFlows.listFiles()) {
- logger.info("Loading execution " + exflow.getName());
- ExecutableFlow flow = ExecutableFlowLoader.loadExecutableFlowFromDir(exflow);
+
+
+ private void loadActiveExecutions() throws IOException, ExecutorManagerException {
+ File activeFlows = new File(basePath, "active");
+ File[] activeFlowDirs = activeFlows.listFiles();
+
+ for (File activeFlowDir: activeFlowDirs) {
+ if (activeFlowDir.isDirectory()) {
+ ExecutionReference reference = ExecutionReference.readFromDirectory(activeFlowDir);
+ ExecutableFlow flow = this.getFlowFromReference(reference);
if (flow != null) {
- logger.info("Adding active execution flow " + flow.getExecutionId());
runningFlows.put(flow.getExecutionId(), flow);
}
+ else {
+ logger.error("Flow " + reference.getExecId() + " not found.");
+ }
+ }
+ else {
+ logger.info("Path " + activeFlowDir + " not a directory.");
}
}
}
public synchronized ExecutableFlow createExecutableFlow(Flow flow) {
String projectId = flow.getProjectId();
+ String flowId = flow.getId();
- File projectExecutionDir = new File(basePath, projectId);
- String id = flow.getId();
-
+ String flowExecutionDir = projectId + File.separator + flowId;
+ File projectExecutionDir = new File(basePath, flowExecutionDir);
+
// Find execution
File executionDir;
String executionId;
int count = counter.getAndIncrement();
String countString = String.format("%05d", count);
do {
- executionId = String.valueOf(System.currentTimeMillis()) + "." + countString + "." + id;
+ executionId = String.valueOf(System.currentTimeMillis()) + "." + countString + "." + flowId;
executionDir = new File(projectExecutionDir, executionId);
}
while(executionDir.exists());
@@ -195,7 +161,8 @@ public class ExecutorManager {
public synchronized void setupExecutableFlow(ExecutableFlow exflow) throws ExecutorManagerException {
String executionId = exflow.getExecutionId();
- String projectFlowDir = exflow.getProjectId() + File.separator + "active" + File.separator + executionId;
+
+ String projectFlowDir = exflow.getProjectId() + File.separator + exflow.getFlowId() + File.separator + executionId;
File executionPath = new File(basePath, projectFlowDir);
if (executionPath.exists()) {
throw new ExecutorManagerException("Execution path " + executionPath + " exists. Probably a simultaneous execution.");
@@ -203,13 +170,63 @@ public class ExecutorManager {
executionPath.mkdirs();
exflow.setExecutionPath(executionPath.getPath());
- runningFlows.put(executionId, exflow);
}
-
- public synchronized ExecutableFlow getExecutableFlow(String flowId) throws ExecutorManagerException {
- ExecutableFlow flow = runningFlows.get(flowId);
+
+ public synchronized ExecutableFlow getExecutableFlow(String executionId) throws ExecutorManagerException {
+ ExecutableFlow flow = runningFlows.get(executionId);
+ if (flow != null) {
+ return flow;
+ }
- return flow;
+ // Check active
+ File baseActiveDir = new File(basePath, "active");
+ File referenceDir = new File(baseActiveDir, executionId);
+
+ if (!referenceDir.exists()) {
+ File baseArchiveDir = new File(basePath, "archive");
+ referenceDir = new File(baseArchiveDir, executionId);
+ if (!referenceDir.exists()) {
+ throw new ExecutorManagerException("Execution id '" + executionId + "' not found. Searching " + referenceDir);
+ }
+ }
+
+ ExecutionReference reference = null;
+ try {
+ reference = ExecutionReference.readFromDirectory(referenceDir);
+ } catch (IOException e) {
+ throw new ExecutorManagerException(e.getMessage(), e);
+ }
+
+
+ return getFlowFromReference(reference);
+ }
+
+ private ExecutableFlow getFlowFromReference(ExecutionReference reference) throws ExecutorManagerException {
+ File executionPath = new File(reference.getExecPath());
+ if (executionPath.exists()) {
+ return ExecutableFlowLoader.loadExecutableFlowFromDir(executionPath);
+ }
+ return null;
+ }
+
+ private synchronized void addActiveExecutionReference(ExecutableFlow flow) throws ExecutorManagerException {
+ File activeDirectory = new File(basePath, "active");
+ if (!activeDirectory.exists()) {
+ activeDirectory.mkdirs();
+ }
+
+ // Create execution reference directory
+ File referenceDir = new File(activeDirectory, flow.getExecutionId());
+ referenceDir.mkdirs();
+
+ // We don't really need to save the reference,
+ ExecutionReference reference = new ExecutionReference(flow);
+ try {
+ reference.writeToDirectory(referenceDir);
+ } catch (IOException e) {
+ throw new ExecutorManagerException("Couldn't write execution to directory.", e);
+ }
+ runningFlows.put(flow.getExecutionId(), flow);
}
public void executeFlow(ExecutableFlow flow) throws ExecutorManagerException {
@@ -217,16 +234,20 @@ public class ExecutorManager {
File executionDir = new File(executionPath);
flow.setSubmitTime(System.currentTimeMillis());
- File resourceFile = writeResourceFile(executionDir, flow);
- File executableFlowFile = writeExecutableFlowFile(executionDir, flow);
+ writeResourceFile(executionDir, flow);
+ ExecutableFlowLoader.writeExecutableFlowFile(executionDir, flow, null);
+ addActiveExecutionReference(flow);
+ runningFlows.put(flow.getExecutionId(), flow);
+
logger.info("Setting up " + flow.getExecutionId() + " for execution.");
URIBuilder builder = new URIBuilder();
builder.setScheme("http")
.setHost(url)
.setPort(portNumber)
- .setPath("/submit")
+ .setPath("/executor")
.setParameter("sharedToken", token)
+ .setParameter("action", "execute")
.setParameter("execid", flow.getExecutionId())
.setParameter("execpath", flow.getExecutionPath());
@@ -239,17 +260,24 @@ public class ExecutorManager {
return;
}
+ ResponseHandler<String> responseHandler = new BasicResponseHandler();
+
logger.info("Submitting flow " + flow.getExecutionId() + " for execution.");
HttpClient httpclient = new DefaultHttpClient();
HttpGet httpget = new HttpGet(uri);
- HttpResponse response = null;
+ String response = null;
try {
- response = httpclient.execute(httpget);
+ response = httpclient.execute(httpget, responseHandler);
} catch (IOException e) {
flow.setStatus(ExecutableFlow.Status.FAILED);
e.printStackTrace();
return;
}
+ finally {
+ httpclient.getConnectionManager().shutdown();
+ }
+
+ logger.debug("Submitted Response: " + response);
}
public void cleanupAll(ExecutableFlow exflow) throws ExecutorManagerException{
@@ -298,36 +326,6 @@ public class ExecutorManager {
return resourceFile;
}
- private File writeExecutableFlowFile(File executionDir, ExecutableFlow flow) throws ExecutorManagerException {
- // Write out the execution file
- String flowFileName = "_" + flow.getExecutionId() + ".flow";
- File flowFile = new File(executionDir, flowFileName);
- if (flowFile.exists()) {
- throw new ExecutorManagerException("The flow file " + flowFileName + " already exists. Race condition?");
- }
-
- BufferedOutputStream out = null;
- try {
- logger.info("Writing executable file " + flowFile);
- out = new BufferedOutputStream(new FileOutputStream(flowFile));
- JSONUtils.toJSON(flow.toObject(), out, true);
- } catch (IOException e) {
- throw new ExecutorManagerException(e.getMessage(), e);
- }
- finally {
- if (out != null) {
- try {
- out.close();
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }
-
- return flowFile;
- }
-
private HashMap<String, Object> createResourcesList(File baseDir, File dir, Set<String> sourceFiles) {
boolean containsSource = false;
@@ -430,47 +428,230 @@ public class ExecutorManager {
}
}
- private class ExecutingFlow implements Runnable {
- public void run() {
-
+ private String getFlowStatusInExecutor(ExecutableFlow flow) throws IOException {
+ URIBuilder builder = new URIBuilder();
+ builder.setScheme("http")
+ .setHost(url)
+ .setPort(portNumber)
+ .setPath("/executor")
+ .setParameter("sharedToken", token)
+ .setParameter("action", "status")
+ .setParameter("execid", flow.getExecutionId());
+
+ URI uri = null;
+ try {
+ uri = builder.build();
+ } catch (URISyntaxException e) {
+ e.printStackTrace();
+ throw new IOException("Bad URI", e);
}
- }
-
- private void updateRunningJobs() {
+ HttpClient httpclient = new DefaultHttpClient();
+ HttpGet httpget = new HttpGet(uri);
+ String response = null;
+ ResponseHandler<String> responseHandler = new BasicResponseHandler();
+
+ try {
+ response = httpclient.execute(httpget, responseHandler);
+ } catch (IOException e) {
+ e.printStackTrace();
+ throw new IOException("Connection problem", e);
+ }
+ finally {
+ httpclient.getConnectionManager().shutdown();
+ }
+ return response;
}
- private String createUniqueId(String projectId, String flowId) {
- return null;
+ private void cleanFinishedJob(ExecutableFlow exFlow) throws ExecutorManagerException {
+
+ // Write final file
+ int updateNum = exFlow.getUpdateNumber();
+ updateNum++;
+ ExecutableFlowLoader.writeExecutableFlowFile(new File(exFlow.getExecutionPath()), exFlow, updateNum);
+
+ String activeReferencePath = "active" + File.separator + exFlow.getExecutionId();
+ File activeDirectory = new File(basePath, activeReferencePath);
+ if (!activeDirectory.exists()) {
+ logger.error("WTF!! Active reference " + activeDirectory + " directory doesn't exist.");
+ throw new ExecutorManagerException("Active reference " + activeDirectory + " doesn't exists.");
+ }
+
+ String archiveReferencePath = "archive" + File.separator + exFlow.getExecutionId();
+ File archiveDirectory = new File(basePath, archiveReferencePath);
+ if (archiveDirectory.exists()) {
+ logger.error("WTF!! Archive reference already exists!");
+ throw new ExecutorManagerException("Active reference " + archiveDirectory + " already exists.");
+ }
+
+ // Move file.
+ if (!activeDirectory.renameTo(archiveDirectory)) {
+ throw new ExecutorManagerException("Cannot move " + activeDirectory + " to " + archiveDirectory);
+ }
+
+ runningFlows.remove(exFlow.getExecutionId());
+ cleanupUnusedFiles(exFlow);
}
- private static class PrefixFilter implements FileFilter {
- private String prefix;
-
- public PrefixFilter(String prefix) {
- this.prefix = prefix;
- }
+ private class ExecutingManagerUpdaterThread extends Thread {
+ private boolean shutdown = false;
+ private int updateTimeMs = 100;
+ public void run() {
+ while (!shutdown) {
+ ArrayList<ExecutableFlow> flows = new ArrayList<ExecutableFlow>(runningFlows.values());
+ for(ExecutableFlow exFlow : flows) {
+ File executionDir = new File(exFlow.getExecutionPath());
+
+ if (!executionDir.exists()) {
+ logger.error("WTF!! Execution dir " + executionDir + " doesn't exist!");
+ // @TODO probably should handle this error case somehow. Cleanup?
+ continue;
+ }
- @Override
- public boolean accept(File pathname) {
- String name = pathname.getName();
+ // Query the executor service to see if the item is running.
+ String responseString = null;
+ try {
+ responseString = getFlowStatusInExecutor(exFlow);
+ } catch (IOException e) {
+ // Connection issue. Backoff 1 sec.
+ synchronized(this) {
+ try {
+ wait(1000);
+ } catch (InterruptedException ie) {
+ }
+ }
+ continue;
+ }
+
+ Object executorResponseObj;
+ try {
+ executorResponseObj = JSONUtils.parseJSONFromString(responseString);
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ continue;
+ }
+
+ @SuppressWarnings("unchecked")
+ HashMap<String, Object> response = (HashMap<String, Object>)executorResponseObj;
+ String status = (String)response.get("status");
+
+ try {
+ ExecutableFlowLoader.updateFlowStatusFromFile(executionDir, exFlow);
+ } catch (ExecutorManagerException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ continue;
+ }
- return pathname.isFile() && !pathname.isHidden() && name.length() >= prefix.length() && name.startsWith(prefix);
+ // If it's completed, and not running, we clean up.
+ if (exFlow.getStatus() == Status.SUCCEEDED || exFlow.getStatus() == Status.FAILED || exFlow.getStatus() == Status.KILLED) {
+ if (status.equals("notfound")) {
+ // Cleanup
+ logger.info("Flow " + exFlow.getExecutionId() + " has succeeded. Cleaning Up.");
+ try {
+ cleanFinishedJob(exFlow);
+ } catch (ExecutorManagerException e) {
+ e.printStackTrace();
+ continue;
+ }
+ }
+ else {
+ logger.error("Flow " + exFlow.getExecutionId() + " has succeeded, but the Executor says its still running");
+ }
+ }
+ else {
+ // If it's not finished, and not running, we will fail it and clean up.
+ if (status.equals("notfound")) {
+ logger.error("Flow " + exFlow.getExecutionId() + " is running, but the Executor can't find it.");
+ exFlow.setEndTime(System.currentTimeMillis());
+ exFlow.setStatus(Status.FAILED);
+ }
+ }
+
+ synchronized(this) {
+ try {
+ wait(updateTimeMs);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ }
+ }
}
}
- private static class SuffixFilter implements FileFilter {
- private String suffix;
- private boolean filesOnly = false;
-
- public SuffixFilter(String suffix, boolean filesOnly) {
- this.suffix = suffix;
+ private static class ExecutionReference {
+ private String execId;
+ private String projectId;
+ private String flowId;
+ private String userId;
+ private String execPath;
+
+ public ExecutionReference() {
+ }
+
+ public ExecutionReference(ExecutableFlow flow) {
+ this.execId = flow.getExecutionId();
+ this.projectId = flow.getProjectId();
+ this.flowId = flow.getFlowId();
+ this.userId = flow.getSubmitUser();
+ this.execPath = flow.getExecutionPath();
}
+
+ private Object toObject() {
+ HashMap<String, Object> obj = new HashMap<String, Object>();
+ obj.put("execId", execId);
+ obj.put("projectId", projectId);
+ obj.put("flowId", flowId);
+ obj.put("userId", userId);
+ obj.put("execPath", execPath);
+ return obj;
+ }
+
+ public void writeToDirectory(File directory) throws IOException {
+ File file = new File(directory, "execution.json");
+ if (!file.exists()) {
+ JSONUtils.toJSON(this.toObject(), file);
+ }
+ }
+
+ public static ExecutionReference readFromDirectory(File directory) throws IOException {
+ File file = new File(directory, "execution.json");
+ if (!file.exists()) {
+ throw new IOException("Execution file execution.json does not exist.");
+ }
+
+ HashMap<String, Object> obj = (HashMap<String, Object>)JSONUtils.parseJSONFromFile(file);
+ ExecutionReference reference = new ExecutionReference();
+ reference.execId = (String)obj.get("execId");
+ reference.projectId = (String)obj.get("projectId");
+ reference.flowId = (String)obj.get("flowId");
+ reference.userId = (String)obj.get("userId");
+ reference.execPath = (String)obj.get("execPath");
- @Override
- public boolean accept(File pathname) {
- String name = pathname.getName();
- return (pathname.isFile() || !filesOnly) && !pathname.isHidden() && name.length() >= suffix.length() && name.endsWith(suffix);
+ return reference;
+ }
+
+ public String getExecId() {
+ return execId;
+ }
+
+ public String getProjectId() {
+ return projectId;
+ }
+
+ public String getFlowId() {
+ return flowId;
+ }
+
+ public String getUserId() {
+ return userId;
+ }
+
+ public String getExecPath() {
+ return execPath;
}
}
}
src/java/azkaban/executor/FlowRunner.java 93(+74 -19)
diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index 26a9ff2..43dd743 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -3,11 +3,15 @@ package azkaban.executor;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import azkaban.executor.ExecutableFlow.ExecutableNode;
import azkaban.executor.ExecutableFlow.Status;
@@ -16,6 +20,7 @@ import azkaban.executor.event.Event.Type;
import azkaban.executor.event.EventHandler;
import azkaban.executor.event.EventListener;
import azkaban.flow.FlowProps;
+import azkaban.utils.ExecutableFlowLoader;
import azkaban.utils.Props;
public class FlowRunner extends EventHandler implements Runnable {
@@ -26,18 +31,18 @@ public class FlowRunner extends EventHandler implements Runnable {
private BlockingQueue<JobRunner> jobsToRun = new LinkedBlockingQueue<JobRunner>();
private int numThreads = NUM_CONCURRENT_THREADS;
private boolean cancelled = true;
- private boolean done = false;
private Map<String, JobRunner> jobRunnersMap;
private JobRunnerEventListener listener;
private Map<String, Props> sharedProps = new HashMap<String, Props>();
private Map<String, Props> outputProps = new HashMap<String, Props>();
private File basePath;
-
+ private AtomicInteger commitCount = new AtomicInteger(0);
+ private HashSet<String> finalNodes = new HashSet<String>();
+
public enum FailedFlowOptions {
FINISH_RUNNING_JOBS,
- COMPLETE_ALL_DEPENDENCIES,
- CANCEL_ALL
+ KILL_ALL
}
private FailedFlowOptions failedOptions = FailedFlowOptions.FINISH_RUNNING_JOBS;
@@ -55,7 +60,7 @@ public class FlowRunner extends EventHandler implements Runnable {
}
public void cancel() {
- done = true;
+ finalNodes.clear();
cancelled = true;
executorService.shutdownNow();
@@ -74,8 +79,21 @@ public class FlowRunner extends EventHandler implements Runnable {
return cancelled;
}
+ private synchronized void commitFlow() {
+ int count = commitCount.getAndIncrement();
+
+ try {
+ ExecutableFlowLoader.writeExecutableFlowFile(this.basePath, flow, count);
+ } catch (ExecutorManagerException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
@Override
public void run() {
+ flow.setStatus(Status.RUNNING);
+ flow.setStartTime(System.currentTimeMillis());
this.fireEventListeners(Event.create(this, Type.FLOW_STARTED));
// Load all shared props
@@ -104,18 +122,48 @@ public class FlowRunner extends EventHandler implements Runnable {
return;
}
- while(!done) {
+ // When this is empty, we will stop.
+ finalNodes.addAll(flow.getEndNodes());
+
+ // Main loop
+ while(!finalNodes.isEmpty()) {
JobRunner runner = null;
try {
runner = jobsToRun.take();
} catch (InterruptedException e) {
}
- if (!done && runner != null) {
- executorService.submit(runner);
+ if (!finalNodes.isEmpty() && runner != null) {
+ try {
+ ExecutableNode node = runner.getNode();
+ node.setStatus(Status.WAITING);
+ executorService.submit(runner);
+ finalNodes.remove(node.getId());
+ } catch (RejectedExecutionException e) {
+ // Should reject if I shutdown executor.
+ break;
+ }
}
}
+ executorService.shutdown();
+ while (executorService.isTerminated()) {
+ try {
+ executorService.awaitTermination(1, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ if (flow.getStatus() == Status.RUNNING) {
+ flow.setStatus(Status.SUCCEEDED);
+ }
+ else {
+ flow.setStatus(Status.FAILED);
+ }
+ flow.setEndTime(System.currentTimeMillis());
+ commitFlow();
this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED));
}
@@ -136,7 +184,7 @@ public class FlowRunner extends EventHandler implements Runnable {
File propsFile = new File(basePath, source);
Props jobProps = new Props(parentProps, propsFile);
- JobRunner jobRunner = new JobRunner(node, jobProps);
+ JobRunner jobRunner = new JobRunner(node, jobProps, new File(flow.getExecutionPath()));
jobRunner.addListener(listener);
jobRunnersMap.put(node.getId(), jobRunner);
@@ -176,7 +224,7 @@ public class FlowRunner extends EventHandler implements Runnable {
for (String dependency: dependentNode.getInNodes()) {
ExecutableNode dependencyNode = flow.getExecutableNode(dependency);
if (dependencyNode.getStatus() != Status.SUCCEEDED &&
- dependencyNode.getStatus() != Status.IGNORED) {
+ dependencyNode.getStatus() != Status.DISABLED) {
ready = false;
break;
}
@@ -213,14 +261,20 @@ public class FlowRunner extends EventHandler implements Runnable {
private void handleFailedJob(ExecutableNode node) {
System.err.println("Job " + node.getId() + " failed.");
this.fireEventListeners(Event.create(this, Type.FLOW_FAILED_FINISHING));
- if (failedOptions == FailedFlowOptions.FINISH_RUNNING_JOBS) {
- done = true;
- }
- else if (failedOptions == FailedFlowOptions.CANCEL_ALL) {
- this.cancel();
- }
- else if (failedOptions == FailedFlowOptions.COMPLETE_ALL_DEPENDENCIES) {
+
+ switch (failedOptions) {
+ // We finish running current jobs and then fail. Do not accept new jobs.
+ case FINISH_RUNNING_JOBS:
+ finalNodes.clear();
+ executorService.shutdown();
+ this.notify();
+ break;
+ // We kill all running jobs and fail immediately
+ case KILL_ALL:
+ this.cancel();
+ break;
}
+
}
private class JobRunnerEventListener implements EventListener {
@@ -236,16 +290,17 @@ public class FlowRunner extends EventHandler implements Runnable {
String jobID = runner.getNode().getId();
System.out.println("Event " + jobID + " " + event.getType().toString());
+ // On Job success, we add the output props and then set up the next run.
if (event.getType() == Type.JOB_SUCCEEDED) {
- // Continue adding items.
Props props = runner.getOutputProps();
outputProps.put(jobID, props);
-
flowRunner.handleSucceededJob(runner.getNode());
}
else if (event.getType() == Type.JOB_FAILED) {
flowRunner.handleFailedJob(runner.getNode());
}
+
+ flowRunner.commitFlow();
}
}
}
\ No newline at end of file
diff --git a/src/java/azkaban/executor/FlowRunnerManager.java b/src/java/azkaban/executor/FlowRunnerManager.java
index f07af9d..b4879cd 100644
--- a/src/java/azkaban/executor/FlowRunnerManager.java
+++ b/src/java/azkaban/executor/FlowRunnerManager.java
@@ -10,6 +10,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.apache.log4j.Logger;
import azkaban.executor.event.Event;
+import azkaban.executor.event.Event.Type;
import azkaban.executor.event.EventListener;
import azkaban.utils.ExecutableFlowLoader;
import azkaban.utils.Props;
@@ -65,6 +66,15 @@ public class FlowRunnerManager {
return runningFlows.get(id);
}
+ public ExecutableFlow getExecutableFlow(String id) {
+ FlowRunner runner = runningFlows.get(id);
+ if (runner == null) {
+ return null;
+ }
+
+ return runner.getFlow();
+ }
+
private class SubmitterThread extends Thread {
private BlockingQueue<FlowRunner> queue;
private boolean shutdown = false;
@@ -102,7 +112,13 @@ public class FlowRunnerManager {
public synchronized void handleEvent(Event event) {
FlowRunner runner = (FlowRunner)event.getRunner();
ExecutableFlow flow = runner.getFlow();
+
System.out.println("Event " + flow.getExecutionId() + " " + flow.getFlowId() + " " + event.getType());
+ if (event.getType() == Type.FLOW_FINISHED) {
+ logger.info("Flow " + flow.getExecutionId() + " has finished.");
+ runningFlows.remove(flow.getExecutionId());
+ }
+
}
}
}
src/java/azkaban/executor/JobRunner.java 20(+15 -5)
diff --git a/src/java/azkaban/executor/JobRunner.java b/src/java/azkaban/executor/JobRunner.java
index 7e2d45a..24615a5 100644
--- a/src/java/azkaban/executor/JobRunner.java
+++ b/src/java/azkaban/executor/JobRunner.java
@@ -1,5 +1,7 @@
package azkaban.executor;
+import java.io.File;
+
import azkaban.executor.ExecutableFlow.ExecutableNode;
import azkaban.executor.ExecutableFlow.Status;
import azkaban.executor.event.Event;
@@ -11,11 +13,13 @@ public class JobRunner extends EventHandler implements Runnable {
private Props props;
private Props outputProps;
private ExecutableNode node;
+ private File workingDir;
- public JobRunner(ExecutableNode node, Props props) {
+ public JobRunner(ExecutableNode node, Props props, File workingDir) {
this.props = props;
this.node = node;
this.node.setStatus(Status.WAITING);
+ this.workingDir = workingDir;
}
public ExecutableNode getNode() {
@@ -24,17 +28,23 @@ public class JobRunner extends EventHandler implements Runnable {
@Override
public void run() {
- if (node.getStatus() == Status.KILLED) {
+ if (node.getStatus() == Status.DISABLED) {
+ this.fireEventListeners(Event.create(this, Type.JOB_SUCCEEDED));
+ return;
+ }
+ else if (node.getStatus() == Status.KILLED) {
this.fireEventListeners(Event.create(this, Type.JOB_KILLED));
return;
}
-
- this.fireEventListeners(Event.create(this, Type.JOB_STARTED));
+ node.setStartTime(System.currentTimeMillis());
node.setStatus(Status.RUNNING);
+ this.fireEventListeners(Event.create(this, Type.JOB_STARTED));
+
// Run Job
boolean succeeded = true;
+ node.setEndTime(System.currentTimeMillis());
if (succeeded) {
node.setStatus(Status.SUCCEEDED);
this.fireEventListeners(Event.create(this, Type.JOB_SUCCEEDED));
@@ -44,7 +54,7 @@ public class JobRunner extends EventHandler implements Runnable {
this.fireEventListeners(Event.create(this, Type.JOB_FAILED));
}
}
-
+
public void cancel() {
// Cancel code here
src/java/azkaban/utils/ExecutableFlowLoader.java 158(+134 -24)
diff --git a/src/java/azkaban/utils/ExecutableFlowLoader.java b/src/java/azkaban/utils/ExecutableFlowLoader.java
index 613f3d4..3f862b4 100644
--- a/src/java/azkaban/utils/ExecutableFlowLoader.java
+++ b/src/java/azkaban/utils/ExecutableFlowLoader.java
@@ -1,18 +1,83 @@
package azkaban.utils;
+import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileFilter;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Arrays;
import org.apache.log4j.Logger;
import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutorManagerException;
public class ExecutableFlowLoader {
private static final Logger logger = Logger.getLogger(ExecutableFlowLoader.class.getName());
- public static ExecutableFlow loadExecutableFlowFromDir(File exDir) {
+ /**
+ * Loads and create ExecutableFlow from the latest execution file.
+ *
+ * @param exDir
+ * @return
+ * @throws ExecutorManagerException
+ */
+ public static ExecutableFlow loadExecutableFlowFromDir(File exDir) throws ExecutorManagerException {
+ File flowFile = getLatestExecutableFlowDir(exDir);
+ Object exFlowObj = getFlowObjectFromFile(flowFile);
+
+ int updateNumber = getFlowUpdateNumber(flowFile);
+ ExecutableFlow flow = ExecutableFlow.createExecutableFlowFromObject(exFlowObj);
+ flow.setUpdateNumber(updateNumber);
+ flow.setExecutionPath(exDir.getPath());
+ return flow;
+ }
+
+ /**
+ * Get the latest update number from file.
+ * @param file
+ * @return
+ */
+ private static int getFlowUpdateNumber(File file) {
+ String[] namesplit = file.getName().split("\\.");
+
+ Integer number = 0;
+ try {
+ number = Integer.parseInt(namesplit[namesplit.length - 1]);
+ }
+ catch(NumberFormatException e) {
+ }
+
+ return number;
+ }
+
+ /**
+ * Get Flow object from file
+ *
+ * @param file
+ * @return
+ * @throws ExecutorManagerException
+ */
+ private static Object getFlowObjectFromFile(File file) throws ExecutorManagerException {
+ Object exFlowObj = null;
+ try {
+ exFlowObj = JSONUtils.parseJSONFromFile(file);
+ } catch (IOException e) {
+ logger.error("Error loading execution flow " + file.getName() + ". Problems parsing json file.");
+ throw new ExecutorManagerException(e.getMessage(), e);
+ }
+
+ return exFlowObj;
+ }
+
+ /**
+ * Get the latest executable flow dir
+ *
+ * @param exDir
+ * @return
+ * @throws ExecutorManagerException
+ */
+ private static File getLatestExecutableFlowDir(File exDir) throws ExecutorManagerException {
String exFlowName = exDir.getName();
String flowFileName = "_" + exFlowName + ".flow";
@@ -21,23 +86,83 @@ public class ExecutableFlowLoader {
if (exFlowFiles.length <= 0) {
logger.error("Execution flow " + exFlowName + " missing flow file.");
- return null;
+ throw new ExecutorManagerException("Execution flow " + exFlowName + " missing flow file.");
}
File lastExFlow = exFlowFiles[exFlowFiles.length-1];
+ return lastExFlow;
+ }
+
+ /**
+ * Update Flow status
+ *
+ * @param exDir
+ * @param flow
+ * @return
+ * @throws ExecutorManagerException
+ */
+ public static boolean updateFlowStatusFromFile(File exDir, ExecutableFlow flow) throws ExecutorManagerException {
+ File file = getLatestExecutableFlowDir(exDir);
+ int number = getFlowUpdateNumber(file);
+ if (flow.getUpdateNumber() >= number) {
+ return false;
+ }
- Object exFlowObj = null;
+ Object exFlowObj = getFlowObjectFromFile(file);
+ flow.updateExecutableFlowFromObject(exFlowObj);
+ flow.setUpdateNumber(number);
+
+ return true;
+ }
+
+ /**
+ * Write executable flow file
+ *
+ * @param executionDir
+ * @param flow
+ * @param commitValue
+ * @return
+ * @throws ExecutorManagerException
+ */
+ public static File writeExecutableFlowFile(File executionDir, ExecutableFlow flow, Integer commitValue) throws ExecutorManagerException {
+ // Write out the execution file
+ String flowFileName = "_" + flow.getExecutionId() + ".flow";
+ if (commitValue != null) {
+ String countString = String.format("%05d", commitValue);
+ flowFileName += "." + countString;
+ }
+
+ File flowFile = new File(executionDir, flowFileName);
+ if (flowFile.exists()) {
+ throw new ExecutorManagerException("The flow file " + flowFileName + " already exists. Race condition?");
+ }
+
+ File tempFlowFile = new File(executionDir, "_tmp" + flowFileName);
+ BufferedOutputStream out = null;
try {
- exFlowObj = JSONUtils.parseJSONFromFile(lastExFlow);
+ logger.debug("Writing executable file " + flowFile);
+ out = new BufferedOutputStream(new FileOutputStream(tempFlowFile));
+ JSONUtils.toJSON(flow.toObject(), out, true);
} catch (IOException e) {
- logger.error("Error loading execution flow " + exFlowName + ". Problems parsing json file.");
- return null;
+ throw new ExecutorManagerException(e.getMessage(), e);
+ }
+ finally {
+ if (out != null) {
+ try {
+ out.close();
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
}
- ExecutableFlow flow = ExecutableFlow.createExecutableFlowFromObject(exFlowObj);
- flow.setExecutionPath(exDir.getPath());
- return flow;
+ tempFlowFile.renameTo(flowFile);
+ return flowFile;
}
+ /**
+ *
+ */
private static class PrefixFilter implements FileFilter {
private String prefix;
@@ -52,20 +177,5 @@ public class ExecutableFlowLoader {
return pathname.isFile() && !pathname.isHidden() && name.length() >= prefix.length() && name.startsWith(prefix);
}
}
-
-
- private static class SuffixFilter implements FileFilter {
- private String suffix;
- private boolean filesOnly = false;
-
- public SuffixFilter(String suffix, boolean filesOnly) {
- this.suffix = suffix;
- }
- @Override
- public boolean accept(File pathname) {
- String name = pathname.getName();
- return (pathname.isFile() || !filesOnly) && !pathname.isHidden() && name.length() >= suffix.length() && name.endsWith(suffix);
- }
- }
}
src/java/azkaban/utils/JSONUtils.java 14(+14 -0)
diff --git a/src/java/azkaban/utils/JSONUtils.java b/src/java/azkaban/utils/JSONUtils.java
index 93e0cd2..d682dbe 100644
--- a/src/java/azkaban/utils/JSONUtils.java
+++ b/src/java/azkaban/utils/JSONUtils.java
@@ -1,6 +1,10 @@
package azkaban.utils;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
@@ -56,6 +60,16 @@ public class JSONUtils {
}
}
+ public static void toJSON(Object obj, File file) throws IOException {
+ toJSON(obj, file, false);
+ }
+
+ public static void toJSON(Object obj, File file, boolean prettyPrint) throws IOException {
+ BufferedOutputStream stream = new BufferedOutputStream(new FileOutputStream(file));
+ toJSON(obj, stream, prettyPrint);
+ stream.close();
+ }
+
public static Object parseJSONFromString(String json) throws IOException {
ObjectMapper mapper = new ObjectMapper();
JsonFactory factory = new JsonFactory();
diff --git a/src/java/azkaban/webapp/AzkabanExecutorServer.java b/src/java/azkaban/webapp/AzkabanExecutorServer.java
index 5fa9611..0c189b1 100644
--- a/src/java/azkaban/webapp/AzkabanExecutorServer.java
+++ b/src/java/azkaban/webapp/AzkabanExecutorServer.java
@@ -77,7 +77,7 @@ public class AzkabanExecutorServer {
String sharedToken = props.getString("executor.shared.token", "");
ServletHolder executorHolder = new ServletHolder(new ExecutorServlet(sharedToken));
- root.addServlet(executorHolder, "/submit");
+ root.addServlet(executorHolder, "/executor");
root.setAttribute(AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY, this);
runnerManager = new FlowRunnerManager(props);
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index d14ed46..63deab0 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -103,14 +103,14 @@ public class AzkabanWebServer {
* Constructor usually called by tomcat AzkabanServletContext to create the
* initial server
*/
- public AzkabanWebServer() {
+ public AzkabanWebServer() throws Exception {
this(loadConfigurationFromAzkabanHome());
}
/**
* Constructor
*/
- public AzkabanWebServer(Props props) {
+ public AzkabanWebServer(Props props) throws Exception {
this.props = props;
velocityEngine = configureVelocityEngine(props.getBoolean( VELOCITY_DEV_MODE_PARAM, false));
sessionCache = new SessionCache(props);
@@ -179,7 +179,7 @@ public class AzkabanWebServer {
return manager;
}
- private ExecutorManager loadExecutorManager(Props props) {
+ private ExecutorManager loadExecutorManager(Props props) throws Exception {
ExecutorManager execManager = new ExecutorManager(props);
return execManager;
}
@@ -268,7 +268,7 @@ public class AzkabanWebServer {
*
* @param args
*/
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
OptionParser parser = new OptionParser();
OptionSpec<String> configDirectory = parser
diff --git a/src/java/azkaban/webapp/servlet/AzkabanServletContextListener.java b/src/java/azkaban/webapp/servlet/AzkabanServletContextListener.java
index 7c5745a..b8fad67 100644
--- a/src/java/azkaban/webapp/servlet/AzkabanServletContextListener.java
+++ b/src/java/azkaban/webapp/servlet/AzkabanServletContextListener.java
@@ -37,10 +37,15 @@ public class AzkabanServletContextListener implements ServletContextListener {
}
/**
- * Load the app
+ * Load the app for use in non jetty containers.
*/
public void contextInitialized(ServletContextEvent event) {
- this.app = new AzkabanWebServer();
+ try {
+ this.app = new AzkabanWebServer();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
event.getServletContext().setAttribute(AZKABAN_SERVLET_CONTEXT_KEY, this.app);
}
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 260d8fb..75b1ebc 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -1,6 +1,7 @@
package azkaban.webapp.servlet;
import java.io.IOException;
+import java.io.OutputStream;
import java.util.HashMap;
import javax.servlet.ServletConfig;
@@ -12,6 +13,7 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
+import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.FlowRunnerManager;
import azkaban.utils.Props;
@@ -50,7 +52,8 @@ public class ExecutorServlet extends HttpServlet {
protected void writeJSON(HttpServletResponse resp, Object obj) throws IOException {
resp.setContentType(JSON_MIME_TYPE);
ObjectMapper mapper = new ObjectMapper();
- mapper.writeValue(resp.getOutputStream(), obj);
+ OutputStream stream = resp.getOutputStream();
+ mapper.writeValue(stream, obj);
}
@Override
@@ -61,26 +64,43 @@ public class ExecutorServlet extends HttpServlet {
if (!token.equals(sharedToken)) {
respMap.put("error", "Mismatched token. Will not run.");
}
+ else if (!hasParam(req, "action")) {
+ respMap.put("error", "Parameter action not set");
+ }
else if (!hasParam(req, "execid")) {
respMap.put("error", "Parameter execid not set.");
}
- else if (!hasParam(req, "execpath")) {
- respMap.put("error", "Parameter execpath not set.");
- }
else {
+ String action = getParam(req, "action");
String execid = getParam(req, "execid");
- String execpath = getParam(req, "execpath");
- logger.info("Submitted " + execid + " with " + execpath);
- try {
- flowRunnerManager.submitFlow(execid, execpath);
- } catch (ExecutorManagerException e) {
- e.printStackTrace();
- respMap.put("error", e.getMessage());
+ // Handle execute
+ if (action.equals("execute")) {
+ String execpath = getParam(req, "execpath");
+
+ logger.info("Submitted " + execid + " with " + execpath);
+ try {
+ flowRunnerManager.submitFlow(execid, execpath);
+ respMap.put("status", "success");
+ } catch (ExecutorManagerException e) {
+ e.printStackTrace();
+ respMap.put("error", e.getMessage());
+ }
+ }
+ // Handle Status
+ else if (action.equals("status")) {
+ ExecutableFlow flow = flowRunnerManager.getExecutableFlow(execid);
+ if (flow == null) {
+ respMap.put("status", "notfound");
+ }
+ else {
+ respMap.put("status", flow.getStatus().toString());
+ }
}
}
writeJSON(resp, respMap);
+ resp.flushBuffer();
}
@Override
diff --git a/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java b/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
index 038ee96..4703822 100644
--- a/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
@@ -207,7 +207,7 @@ public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
Map<String, String> paramGroup = this.getParamGroup(req, "disabled");
for (Map.Entry<String, String> entry: paramGroup.entrySet()) {
boolean nodeDisabled = Boolean.parseBoolean(entry.getValue());
- exflow.setStatus(entry.getKey(), nodeDisabled ? Status.IGNORED : Status.READY);
+ exflow.setStatus(entry.getKey(), nodeDisabled ? Status.DISABLED : Status.READY);
}
// Create directory
@@ -237,6 +237,7 @@ public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
return;
}
+
try {
executorManager.executeFlow(exflow);
} catch (ExecutorManagerException e) {
@@ -249,6 +250,7 @@ public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
ret.put("error", e.getMessage());
return;
}
+
String execId = exflow.getExecutionId();
// The following is just a test for cleanup
diff --git a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index c6affe7..480af43 100644
--- a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -1,6 +1,5 @@
package azkaban.webapp.servlet;
-import java.awt.geom.Point2D;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
@@ -197,7 +196,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
int length = Integer.valueOf(getParam(req, "length"));
ArrayList<ExecutableFlow> exFlows = new ArrayList<ExecutableFlow>();
- int total = executorManager.getExecutableFlowByProjectFlow(project.getName(), flowId, from, length, exFlows);
+ int total = executorManager.getExecutableFlows(project.getName(), flowId, from, length, exFlows);
ret.put("flow", flowId);
ret.put("total", total);