azkaban-uncached
Changes
src/java/azkaban/executor/ExecutableFlow.java 56(+22 -34)
src/java/azkaban/executor/ExecutorManager.java 521(+377 -144)
src/java/azkaban/executor/FlowRunner.java 33(+30 -3)
src/java/azkaban/executor/JobRunner.java 34(+27 -7)
src/java/azkaban/webapp/AzkabanExecutorServer.java 101(+66 -35)
src/web/css/azkaban.css 2(+1 -1)
Details
diff --git a/src/java/azkaban/executor/ConnectorParams.java b/src/java/azkaban/executor/ConnectorParams.java
new file mode 100644
index 0000000..233128e
--- /dev/null
+++ b/src/java/azkaban/executor/ConnectorParams.java
@@ -0,0 +1,34 @@
+package azkaban.executor;
+
+/**
+ * String and index constants for the HTTP exchange with the executor.
+ *
+ * ExecutorManager uses the *_PARAM names as query parameters on "/executor"
+ * requests and reads the RESPONSE_* / STATUS_PARAM keys from the JSON reply.
+ * Fields of an interface are implicitly public, static and final, so the
+ * modifiers are not repeated on each declaration.
+ */
+public interface ConnectorParams {
+  // Query parameter names sent with every request.
+  String ACTION_PARAM = "action";
+  String EXECID_PARAM = "execid";
+  String SHAREDTOKEN_PARAM = "token";
+  String USER_PARAM = "user";
+
+  // Values for ACTION_PARAM.
+  String STATUS_ACTION = "status";
+  String EXECUTE_ACTION = "execute";
+  String CANCEL_ACTION = "cancel";
+  String PAUSE_ACTION = "pause";
+  String RESUME_ACTION = "resume";
+  String PING_ACTION = "ping";
+
+  // Further parameter / response keys. STATUS_PARAM and EXECPATH_PARAM are
+  // used by ExecutorManager; the others are presumably consumed by the
+  // executor server -- confirm against AzkabanExecutorServer.
+  String START_PARAM = "start";
+  String END_PARAM = "end";
+  String STATUS_PARAM = "status";
+  String NODES_PARAM = "nodes";
+  String EXECPATH_PARAM = "execpath";
+
+  // Keys and values found in the executor's reply.
+  String RESPONSE_NOTFOUND = "notfound";
+  String RESPONSE_ERROR = "error";
+  String RESPONSE_SUCCESS = "success";
+  String RESPONSE_ALIVE = "alive";
+  String RESPONSE_UPDATETIME = "lasttime";
+
+  // Positions inside a per-node record (presumably an array under
+  // NODES_PARAM -- TODO confirm with the producer of that payload).
+  int NODE_NAME_INDEX = 0;
+  int NODE_STATUS_INDEX = 1;
+  int NODE_START_INDEX = 2;
+  int NODE_END_INDEX = 3;
+
+  // Name of the marker file dropped into an execution directory when the
+  // manager force-fails the flow.
+  String FORCED_FAILED_MARKER = ".failed";
+}
diff --git a/src/java/azkaban/executor/event/Event.java b/src/java/azkaban/executor/event/Event.java
index 555fd51..2b52f72 100644
--- a/src/java/azkaban/executor/event/Event.java
+++ b/src/java/azkaban/executor/event/Event.java
@@ -9,7 +9,8 @@ public class Event {
JOB_SUCCEEDED,
JOB_FAILED,
JOB_KILLED,
- JOB_SKIPPED
+ JOB_SKIPPED,
+ ERROR
}
private final Object runner;
src/java/azkaban/executor/ExecutableFlow.java 56(+22 -34)
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 73129d2..791df64 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -12,6 +12,7 @@ import azkaban.flow.Edge;
import azkaban.flow.Flow;
import azkaban.flow.FlowProps;
import azkaban.flow.Node;
+import azkaban.utils.JSONUtils;
import azkaban.utils.Props;
public class ExecutableFlow {
@@ -19,7 +20,6 @@ public class ExecutableFlow {
private String flowId;
private String projectId;
private String executionPath;
- private long lastCheckedTime;
private HashMap<String, FlowProps> flowProps = new HashMap<String, FlowProps>();
private HashMap<String, ExecutableNode> executableNodes = new HashMap<String, ExecutableNode>();
@@ -32,11 +32,11 @@ public class ExecutableFlow {
private long submitTime = -1;
private long startTime = -1;
private long endTime = -1;
+ private long updateTime = -1;
private int updateNumber = 0;
private Status flowStatus = Status.READY;
private String submitUser;
- private boolean submitted = false;
private boolean notifyOnFirstFailure = true;
private boolean notifyOnLastFailure = false;
@@ -66,12 +66,12 @@ public class ExecutableFlow {
public ExecutableFlow() {
}
- public long getLastCheckedTime() {
- return lastCheckedTime;
+ public long getUpdateTime() {
+ return updateTime;
}
- public void setLastCheckedTime(long lastCheckedTime) {
- this.lastCheckedTime = lastCheckedTime;
+ public void setUpdateTime(long updateTime) {
+ this.updateTime = updateTime;
}
public List<ExecutableNode> getExecutableNodes() {
@@ -117,8 +117,12 @@ public class ExecutableFlow {
targetNode.addInNode(edge.getSourceId());
}
- successEmails = new ArrayList<String>(flow.getSuccessEmails());
- failureEmails = new ArrayList<String>(flow.getFailureEmails());
+ if (flow.getSuccessEmails() != null) {
+ successEmails = new ArrayList<String>(flow.getSuccessEmails());
+ }
+ if (flow.getFailureEmails() != null) {
+ failureEmails = new ArrayList<String>(flow.getFailureEmails());
+ }
flowProps.putAll(flow.getAllFlowProps());
}
@@ -293,9 +297,9 @@ public class ExecutableFlow {
exFlow.executionPath = (String)flowObj.get("executionPath");
exFlow.flowId = (String)flowObj.get("flowId");
exFlow.projectId = (String)flowObj.get("projectId");
- exFlow.submitTime = getLongFromObject(flowObj.get("submitTime"));
- exFlow.startTime = getLongFromObject(flowObj.get("startTime"));
- exFlow.endTime = getLongFromObject(flowObj.get("endTime"));
+ exFlow.submitTime = JSONUtils.getLongFromObject(flowObj.get("submitTime"));
+ exFlow.startTime = JSONUtils.getLongFromObject(flowObj.get("startTime"));
+ exFlow.endTime = JSONUtils.getLongFromObject(flowObj.get("endTime"));
exFlow.flowStatus = Status.valueOf((String)flowObj.get("status"));
exFlow.submitUser = (String)flowObj.get("submitUser");
if (flowObj.containsKey("flowParameters")) {
@@ -340,21 +344,13 @@ public class ExecutableFlow {
return exFlow;
}
- private static long getLongFromObject(Object obj) {
- if (obj instanceof Integer) {
- return Long.valueOf((Integer)obj);
- }
-
- return (Long)obj;
- }
-
@SuppressWarnings("unchecked")
public void updateExecutableFlowFromObject(Object obj) {
HashMap<String, Object> flowObj = (HashMap<String,Object>)obj;
- submitTime = getLongFromObject(flowObj.get("submitTime"));
- startTime = getLongFromObject(flowObj.get("startTime"));
- endTime = getLongFromObject(flowObj.get("endTime"));
+ submitTime = JSONUtils.getLongFromObject(flowObj.get("submitTime"));
+ startTime = JSONUtils.getLongFromObject(flowObj.get("startTime"));
+ endTime = JSONUtils.getLongFromObject(flowObj.get("endTime"));
flowStatus = Status.valueOf((String)flowObj.get("status"));
List<Object> nodes = (List<Object>)flowObj.get("nodes");
@@ -390,14 +386,6 @@ public class ExecutableFlow {
this.submitUser = submitUser;
}
- public boolean isSubmitted() {
- return submitted;
- }
-
- public void setSubmitted(boolean submitted) {
- this.submitted = submitted;
- }
-
public void setPipelineLevel(int level) {
pipelineLevel = level;
}
@@ -514,8 +502,8 @@ public class ExecutableFlow {
exNode.inNodes.addAll( (List<String>)objMap.get("inNodes") );
exNode.outNodes.addAll( (List<String>)objMap.get("outNodes") );
- exNode.startTime = getLongFromObject(objMap.get("startTime"));
- exNode.endTime = getLongFromObject(objMap.get("endTime"));
+ exNode.startTime = JSONUtils.getLongFromObject(objMap.get("startTime"));
+ exNode.endTime = JSONUtils.getLongFromObject(objMap.get("endTime"));
exNode.level = (Integer)objMap.get("level");
exNode.flow = flow;
@@ -528,8 +516,8 @@ public class ExecutableFlow {
HashMap<String, Object> objMap = (HashMap<String,Object>)obj;
status = Status.valueOf((String)objMap.get("status"));
- startTime = getLongFromObject(objMap.get("startTime"));
- endTime = getLongFromObject(objMap.get("endTime"));
+ startTime = JSONUtils.getLongFromObject(objMap.get("startTime"));
+ endTime = JSONUtils.getLongFromObject(objMap.get("endTime"));
}
public long getStartTime() {
src/java/azkaban/executor/ExecutorManager.java 521(+377 -144)
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 819d17b..03dace2 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -6,6 +6,7 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
+import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.net.URI;
@@ -17,6 +18,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -38,8 +40,8 @@ import org.apache.http.impl.client.BasicResponseHandler;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.log4j.Logger;
+import azkaban.executor.ExecutableFlow.ExecutableNode;
import azkaban.executor.ExecutableFlow.Status;
-import azkaban.executor.ExecutorManager.ExecutionReference;
import azkaban.flow.Flow;
import azkaban.utils.ExecutableFlowLoader;
import azkaban.utils.JSONUtils;
@@ -55,9 +57,8 @@ public class ExecutorManager {
private static final String ARCHIVE_DIR = ".archive";
private static Logger logger = Logger.getLogger(ExecutorManager.class);
// 30 seconds of retry before failure.
- private static final long ACCESS_ERROR_THRESHOLD = 30000;
- private static final int UPDATE_THREAD_MS = 1000;
-
+ private static final long ACCESS_ERROR_THRESHOLD_MS = 30000;
+
// log read buffer.
private static final int LOG_BUFFER_READ_SIZE = 10*1024;
@@ -69,10 +70,14 @@ public class ExecutorManager {
private String url = "localhost";
private ConcurrentHashMap<String, ExecutableFlow> runningFlows = new ConcurrentHashMap<String, ExecutableFlow>();
+ private ConcurrentHashMap<String, ExecutionReference> runningReference = new ConcurrentHashMap<String, ExecutionReference>();
private CacheManager manager = CacheManager.create();
private Cache recentFlowsCache;
private static final int LIVE_SECONDS = 600;
+ private Object BlockObj = new Object();
+
+ ExecutingManagerUpdaterThread executingManager;
public ExecutorManager(Props props) throws IOException, ExecutorManagerException {
basePath = new File(props.getString("execution.directory"));
@@ -102,7 +107,7 @@ public class ExecutorManager {
counter.set(0);
loadActiveExecutions();
- ExecutingManagerUpdaterThread executingManager = new ExecutingManagerUpdaterThread();
+ executingManager = new ExecutingManagerUpdaterThread();
executingManager.start();
}
@@ -214,7 +219,6 @@ public class ExecutorManager {
Arrays.sort(archivePartitionsDir, new Comparator<File>() {
@Override
public int compare(File arg0, File arg1) {
- // TODO Auto-generated method stub
return arg1.getName().compareTo(arg0.getName());
}});
@@ -237,7 +241,6 @@ public class ExecutorManager {
searchFlows.add(ref);
}
} catch (IOException e) {
- // TODO Auto-generated catch block
e.printStackTrace();
}
@@ -263,14 +266,19 @@ public class ExecutorManager {
for (File activeFlowDir: activeFlowDirs) {
if (activeFlowDir.isDirectory()) {
ExecutionReference reference = ExecutionReference.readFromDirectory(activeFlowDir);
+ if (reference.getExecutorUrl() == null) {
+ reference.setExecutorPort(portNumber);
+ reference.setExecutorUrl(url);
+ }
ExecutableFlow flow = this.getFlowFromReference(reference);
if (flow == null) {
logger.error("Flow " + reference.getExecId() + " not found.");
}
- flow.setLastCheckedTime(System.currentTimeMillis());
- flow.setSubmitted(true);
+ reference.setLastCheckedTime(System.currentTimeMillis());
+
if (flow != null) {
+ runningReference.put(reference.getExecId(), reference);
runningFlows.put(flow.getExecutionId(), flow);
}
}
@@ -290,11 +298,14 @@ public class ExecutorManager {
// Find execution
File executionDir;
String executionId;
- int count = counter.getAndIncrement() % 100000;
- String countString = String.format("%05d", count);
+
+ int count = 0;
+
do {
- executionId = String.valueOf(System.currentTimeMillis()) + "." + countString + "." + flowId;
+ String countString = String.format("%05d", count);
+ executionId = String.valueOf(System.currentTimeMillis()) + "." + countString + "." + projectId + "." + flowId;
executionDir = new File(projectExecutionDir, executionId);
+ count++;
}
while(executionDir.exists());
@@ -321,6 +332,22 @@ public class ExecutorManager {
return flow;
}
+ String[] split = executionId.split("\\.");
+ // get project file from split.
+ String projectId = split[2];
+ File projectPath = new File(basePath, projectId);
+ if (projectPath.exists()) {
+ // Execution path sometimes looks like timestamp.count.projectId.flowId. Except flowId could have ..
+ String flowId = executionId.substring(split[0].length() + split[1].length() + projectId.length() + 3);
+ File flowPath = new File(projectPath, flowId);
+ if (flowPath.exists()) {
+ File executionPath = new File(flowPath, executionId);
+ if (executionPath.exists()) {
+ return ExecutableFlowLoader.loadExecutableFlowFromDir(executionPath);
+ }
+ }
+ }
+
// Check active
File baseActiveDir = new File(basePath, ACTIVE_DIR);
File referenceDir = new File(baseActiveDir, executionId);
@@ -355,7 +382,7 @@ public class ExecutorManager {
return null;
}
- private synchronized void addActiveExecutionReference(ExecutableFlow flow) throws ExecutorManagerException {
+ private synchronized ExecutionReference addActiveExecutionReference(ExecutableFlow flow) throws ExecutorManagerException {
File activeDirectory = new File(basePath, ACTIVE_DIR);
if (!activeDirectory.exists()) {
activeDirectory.mkdirs();
@@ -367,67 +394,191 @@ public class ExecutorManager {
// We don't really need to save the reference,
ExecutionReference reference = new ExecutionReference(flow);
+ reference.setExecutorUrl(url);
+ reference.setExecutorPort(portNumber);
try {
reference.writeToDirectory(referenceDir);
} catch (IOException e) {
throw new ExecutorManagerException("Couldn't write execution to directory.", e);
}
- runningFlows.put(flow.getExecutionId(), flow);
+
+ return reference;
}
- public void cancelFlow(ExecutableFlow flow, String user) throws ExecutorManagerException {
+ /**
+  * Marks a flow as FAILED on behalf of an executor that can no longer be
+  * reached or no longer knows the flow.
+  *
+  * Steps: drop the FORCED_FAILED_MARKER file into the execution directory,
+  * reload the latest status from disk, return early if the flow already
+  * finished, append a note to the flow log, then settle every unfinished
+  * node (UNKNOWN/READY become KILLED, anything else still open becomes
+  * FAILED) and fill in missing start/end times.
+  *
+  * @param exFlow the flow to fail; its nodes, end time and status are mutated
+  * @throws ExecutorManagerException if the status cannot be reloaded from disk
+  */
+ private void forceFlowFailure(ExecutableFlow exFlow) throws ExecutorManagerException {
+   String logFileName = "_flow." + exFlow.getExecutionId() + ".log";
+   File executionDir = new File(exFlow.getExecutionPath());
+
+   // Add a marker to the directory as an indicator to zombie processes that this is off limits.
+   File forcedFailed = new File(executionDir, ConnectorParams.FORCED_FAILED_MARKER);
+   if (!forcedFailed.exists()) {
+     try {
+       forcedFailed.createNewFile();
+     } catch (IOException e) {
+       logger.error("Error creating failed marker in execution directory",e);
+     }
+   }
+
+   // Load last update
+   ExecutableFlowLoader.updateFlowStatusFromFile(executionDir, exFlow, true);
+
+   // Return if already finished.
+   if (exFlow.getStatus() == Status.FAILED ||
+       exFlow.getStatus() == Status.SUCCEEDED ||
+       exFlow.getStatus() == Status.KILLED) {
+     return;
+   }
+
+   // Finish log file. Best effort: a failure to append must not stop the
+   // flow from being failed, but the writer has to be closed on every path,
+   // otherwise the file handle leaks and the buffered text may never be flushed.
+   File logFile = new File(executionDir, logFileName);
+   if (logFile.exists()) {
+     FileWriter writer = null;
+     try {
+       writer = new FileWriter(logFile, true);
+       writer.append("\n" + System.currentTimeMillis() + " ERROR: Can't reach executor. Killing Flow!!!!");
+     } catch (IOException e) {
+       logger.error("Error appending to flow log " + logFile, e);
+     } finally {
+       if (writer != null) {
+         try {
+           writer.close();
+         } catch (IOException e1) {
+           logger.error("Error closing flow log " + logFile, e1);
+         }
+       }
+     }
+   }
+
+   // We mark every untouched job with KILLED, and running jobs with FAILED.
+   long time = System.currentTimeMillis();
+   for (ExecutableNode node: exFlow.getExecutableNodes()) {
+     switch(node.getStatus()) {
+       case SUCCEEDED:
+       case FAILED:
+       case KILLED:
+       case SKIPPED:
+       case DISABLED:
+         // Already settled; leave status and times alone.
+         continue;
+       case UNKNOWN:
+       case READY:
+         node.setStatus(Status.KILLED);
+         break;
+       default:
+         node.setStatus(Status.FAILED);
+         break;
+     }
+
+     // -1 means "never set"; stamp with the time of the forced failure.
+     if (node.getStartTime() == -1) {
+       node.setStartTime(time);
+     }
+     if (node.getEndTime() == -1) {
+       node.setEndTime(time);
+     }
+   }
+
+   if (exFlow.getEndTime() == -1) {
+     exFlow.setEndTime(time);
+   }
+
+   exFlow.setStatus(Status.FAILED);
+ }
+
+ /**
+ * Asks the executor that owns the given execution to cancel it.
+ *
+ * @param execId id of a currently running execution
+ * @param user user requesting the cancel; when null the submitting user stored on the reference is sent instead
+ * @throws ExecutorManagerException if the execution is not running, the executor cannot be reached, or it answers with an error
+ */
+ @SuppressWarnings("unchecked")
+ public void cancelFlow(String execId, String user) throws ExecutorManagerException {
logger.info("Calling cancel");
- String response = null;
+ ExecutionReference reference = runningReference.get(execId);
+ if (reference == null) {
+ throw new ExecutorManagerException("Execution " + execId + " not running.");
+ }
+
+ Map<String, Object> respObj = null;
try {
- response = callExecutionServer("cancel", flow, user);
+ // Forward the requesting user rather than silently dropping the argument.
+ String requestUser = (user != null) ? user : reference.userId;
+ String response = callExecutorServer(
+ reference.getExecutorUrl(),
+ reference.getExecutorPort(),
+ ConnectorParams.CANCEL_ACTION,
+ reference.getExecId(),
+ reference.getExecPath(),
+ requestUser);
+ // callExecutorServer returns null when the HTTP request itself failed.
+ if (response == null) {
+ throw new IOException("No response from executor for execution " + execId);
+ }
+ respObj = (Map<String, Object>)JSONUtils.parseJSONFromString(response);
} catch (IOException e) {
e.printStackTrace();
throw new ExecutorManagerException("Error cancelling flow.", e);
}
+
+ if (respObj == null) {
+ throw new ExecutorManagerException("Empty response from executor for execution " + execId);
+ }
+ if (respObj.containsKey(ConnectorParams.RESPONSE_ERROR)) {
+ throw new ExecutorManagerException((String)respObj.get(ConnectorParams.RESPONSE_ERROR));
+ }
}
-
- public void pauseFlow(ExecutableFlow flow, String user) throws ExecutorManagerException {
+
+ /**
+ * Asks the executor that owns the given execution to pause it.
+ *
+ * @param execId id of a currently running execution
+ * @param user user requesting the pause; when null the submitting user stored on the reference is sent instead
+ * @throws ExecutorManagerException if the execution is not running, the executor cannot be reached, or it answers with an error
+ */
+ @SuppressWarnings("unchecked")
+ public void pauseFlow(String execId, String user) throws ExecutorManagerException {
logger.info("Calling pause");
- String response = null;
+ ExecutionReference reference = runningReference.get(execId);
+ if (reference == null) {
+ throw new ExecutorManagerException("Execution " + execId + " not running.");
+ }
+
+ Map<String, Object> respObj = null;
try {
- response = callExecutionServer("pause", flow, user);
+ // Forward the requesting user rather than silently dropping the argument.
+ String requestUser = (user != null) ? user : reference.userId;
+ String response = callExecutorServer(
+ reference.getExecutorUrl(),
+ reference.getExecutorPort(),
+ ConnectorParams.PAUSE_ACTION,
+ reference.getExecId(),
+ reference.getExecPath(),
+ requestUser);
+ // callExecutorServer returns null when the HTTP request itself failed.
+ if (response == null) {
+ throw new IOException("No response from executor for execution " + execId);
+ }
+ respObj = (Map<String, Object>)JSONUtils.parseJSONFromString(response);
} catch (IOException e) {
e.printStackTrace();
- throw new ExecutorManagerException("Error cancelling flow.", e);
+ throw new ExecutorManagerException("Error pausing flow.", e);
+ }
+
+ if (respObj == null) {
+ throw new ExecutorManagerException("Empty response from executor for execution " + execId);
+ }
+ if (respObj.containsKey(ConnectorParams.RESPONSE_ERROR)) {
+ throw new ExecutorManagerException((String)respObj.get(ConnectorParams.RESPONSE_ERROR));
}
}
-
- public void resumeFlow(ExecutableFlow flow, String user) throws ExecutorManagerException {
+
+ /**
+ * Asks the executor that owns the given execution to resume it.
+ *
+ * @param execId id of a currently running execution
+ * @param user user requesting the resume; when null the submitting user stored on the reference is sent instead
+ * @throws ExecutorManagerException if the execution is not running, the executor cannot be reached, or it answers with an error
+ */
+ @SuppressWarnings("unchecked")
+ public void resumeFlow(String execId, String user) throws ExecutorManagerException {
logger.info("Calling resume");
- String response = null;
+ ExecutionReference reference = runningReference.get(execId);
+ if (reference == null) {
+ throw new ExecutorManagerException("Execution " + execId + " not running.");
+ }
+
+ Map<String, Object> respObj = null;
try {
- response = callExecutionServer("resume", flow, user);
+ // Forward the requesting user rather than silently dropping the argument.
+ String requestUser = (user != null) ? user : reference.userId;
+ String response = callExecutorServer(
+ reference.getExecutorUrl(),
+ reference.getExecutorPort(),
+ ConnectorParams.RESUME_ACTION,
+ reference.getExecId(),
+ reference.getExecPath(),
+ requestUser);
+ // callExecutorServer returns null when the HTTP request itself failed.
+ if (response == null) {
+ throw new IOException("No response from executor for execution " + execId);
+ }
+ respObj = (Map<String, Object>)JSONUtils.parseJSONFromString(response);
} catch (IOException e) {
e.printStackTrace();
- throw new ExecutorManagerException("Error cancelling flow.", e);
+ throw new ExecutorManagerException("Error resuming flow.", e);
}
+ if (respObj == null) {
+ throw new ExecutorManagerException("Empty response from executor for execution " + execId);
+ }
+ if (respObj.containsKey(ConnectorParams.RESPONSE_ERROR)) {
+ throw new ExecutorManagerException((String)respObj.get(ConnectorParams.RESPONSE_ERROR));
+ }
}
- private String callExecutionServer(String action, ExecutableFlow flow) throws IOException{
- return callExecutionServer(action, flow, null);
+ /**
+ * Sends an action for the given execution to the executor recorded on its
+ * reference, using the reference's own url, port, execution id, execution
+ * path and user id.
+ *
+ * @param reference the execution to address
+ * @param action one of the ConnectorParams *_ACTION values
+ * @return whatever the six-argument overload returns for this request
+ * @throws IOException propagated from the six-argument overload
+ */
+ private synchronized String callExecutorServer(ExecutionReference reference, String action) throws IOException {
+ String executorHost = reference.getExecutorUrl();
+ int executorPort = reference.getExecutorPort();
+ String executionId = reference.getExecId();
+ String executionPath = reference.getExecPath();
+ String submitter = reference.userId;
+
+ return callExecutorServer(executorHost, executorPort, action, executionId, executionPath, submitter);
}
- private String callExecutionServer(String action, ExecutableFlow flow, String user) throws IOException{
+ private synchronized String callExecutorServer(String url, int port, String action, String execid, String execPath, String user) throws IOException {
URIBuilder builder = new URIBuilder();
builder.setScheme("http")
.setHost(url)
- .setPort(portNumber)
+ .setPort(port)
.setPath("/executor")
- .setParameter("sharedToken", token)
- .setParameter("action", action)
- .setParameter("execid", flow.getExecutionId())
- .setParameter("execpath", flow.getExecutionPath());
+ .setParameter(ConnectorParams.SHAREDTOKEN_PARAM, token)
+ .setParameter(ConnectorParams.ACTION_PARAM, action);
+
+ if (execid != null) {
+ builder.setParameter(ConnectorParams.EXECID_PARAM, execid);
+ }
if (user != null) {
- builder.setParameter("user", user);
+ builder.setParameter(ConnectorParams.USER_PARAM, user);
}
+ if (execPath != null) {
+ builder.setParameter(ConnectorParams.EXECPATH_PARAM, execPath);
+ }
+
URI uri = null;
try {
uri = builder.build();
@@ -437,14 +588,12 @@ public class ExecutorManager {
ResponseHandler<String> responseHandler = new BasicResponseHandler();
- logger.info("Remotely querying " + flow.getExecutionId() + " for status.");
HttpClient httpclient = new DefaultHttpClient();
HttpGet httpget = new HttpGet(uri);
String response = null;
try {
response = httpclient.execute(httpget, responseHandler);
} catch (IOException e) {
- flow.setStatus(ExecutableFlow.Status.FAILED);
e.printStackTrace();
return response;
}
@@ -462,24 +611,30 @@ public class ExecutorManager {
writeResourceFile(executionDir, flow);
ExecutableFlowLoader.writeExecutableFlowFile(executionDir, flow, null);
- addActiveExecutionReference(flow);
- flow.setLastCheckedTime(System.currentTimeMillis());
- runningFlows.put(flow.getExecutionId(), flow);
+ ExecutionReference reference = addActiveExecutionReference(flow);
logger.info("Setting up " + flow.getExecutionId() + " for execution.");
String response;
try {
- response = callExecutionServer("execute", flow);
+ response = callExecutorServer(reference, ConnectorParams.EXECUTE_ACTION);
+ reference.setSubmitted(true);
+ runningReference.put(flow.getExecutionId(), reference);
+ runningFlows.put(flow.getExecutionId(), flow);
} catch (IOException e) {
e.printStackTrace();
- flow.setStatus(ExecutableFlow.Status.FAILED);
+ // Clean up.
+ forceFlowFailure(flow);
+ cleanFinishedJob(flow);
return;
}
logger.debug("Submitted Response: " + response);
- flow.setLastCheckedTime(System.currentTimeMillis());
- flow.setSubmitted(true);
+
+ reference.setStartTime(System.currentTimeMillis());
+ synchronized(BlockObj) {
+ BlockObj.notify();
+ }
}
private long readLog(File logFile, Writer writer, long startChar, long maxSize) {
@@ -608,7 +763,6 @@ public class ExecutorManager {
try {
out.close();
} catch (IOException e) {
- // TODO Auto-generated catch block
e.printStackTrace();
}
}
@@ -725,22 +879,17 @@ public class ExecutorManager {
int index = execID.indexOf('.');
return execID.substring(0, index - 8);
}
-
- private void cleanFinishedJob(ExecutableFlow exFlow) throws ExecutorManagerException {
-
+
+ private void cleanExecutionReferenceJob(ExecutionReference reference) throws ExecutorManagerException {
// Write final file
- int updateNum = exFlow.getUpdateNumber();
- updateNum++;
- ExecutableFlowLoader.writeExecutableFlowFile(new File(exFlow.getExecutionPath()), exFlow, updateNum);
-
- String activeReferencePath = ACTIVE_DIR + File.separator + exFlow.getExecutionId();
+ String exId = reference.getExecId();
+ String activeReferencePath = ACTIVE_DIR + File.separator + exId;
File activeDirectory = new File(basePath, activeReferencePath);
if (!activeDirectory.exists()) {
logger.error("WTF!! Active reference " + activeDirectory + " directory doesn't exist.");
throw new ExecutorManagerException("Active reference " + activeDirectory + " doesn't exists.");
}
- String exId = exFlow.getExecutionId();
String partitionVal = getExecutableReferencePartition(exId);
String archiveDatePartition = ARCHIVE_DIR + File.separator + partitionVal;
@@ -749,7 +898,7 @@ public class ExecutorManager {
archivePartitionDir.mkdirs();
}
- File archiveDirectory = new File(archivePartitionDir, exFlow.getExecutionId());
+ File archiveDirectory = new File(archivePartitionDir, exId);
if (archiveDirectory.exists()) {
logger.error("Archive reference already exists. Cleaning up.");
try {
@@ -766,7 +915,6 @@ public class ExecutorManager {
throw new ExecutorManagerException("Cannot create " + archiveDirectory);
}
- ExecutionReference reference = new ExecutionReference(exFlow);
try {
reference.writeToDirectory(archiveDirectory);
} catch (IOException e) {
@@ -779,126 +927,167 @@ public class ExecutorManager {
} catch (IOException e) {
throw new ExecutorManagerException("Cannot cleanup active directory " + activeDirectory);
}
+
+ runningReference.remove(exId);
+ }
+
+ /**
+ * Finalizes a finished flow: writes its flow file once more under the next
+ * update number, copies the final start/end/status onto the active
+ * reference (if one is still registered) and archives that reference, then
+ * moves the flow from the running map to the recent-flows cache and cleans
+ * up its unused files.
+ *
+ * @param exFlow the finished flow
+ * @throws ExecutorManagerException if writing the flow file or archiving the reference fails
+ */
+ private void cleanFinishedJob(ExecutableFlow exFlow) throws ExecutorManagerException {
+ // Persist the final state under the next update number.
+ int nextUpdateNumber = exFlow.getUpdateNumber() + 1;
+ String exId = exFlow.getExecutionId();
+ File executionDir = new File(exFlow.getExecutionPath());
+ ExecutableFlowLoader.writeExecutableFlowFile(executionDir, exFlow, nextUpdateNumber);
+
+ // Bring the active reference up to date before it is archived.
+ ExecutionReference activeRef = runningReference.get(exId);
+ if (activeRef != null) {
+ activeRef.setStartTime(exFlow.getStartTime());
+ activeRef.setEndTime(exFlow.getEndTime());
+ activeRef.setStatus(exFlow.getStatus());
+ cleanExecutionReferenceJob(activeRef);
+ }
runningFlows.remove(exFlow.getExecutionId());
recentFlowsCache.put(new Element(exFlow.getExecutionId(), exFlow));
cleanupUnusedFiles(exFlow);
}
- /**
- * Thread that polls the executor for executing jobs.
- * It is also cleans up the flow execution files after it's done.
- */
+ /**
+ * Polls the executor for the status of every submitted execution in
+ * runningReference and keeps the matching ExecutableFlow in sync.
+ *
+ * Per reference and pass: skip it if it is not submitted or its flow is not
+ * registered yet; drop it if its execution directory is gone; ask the
+ * executor for status; reload the flow from disk when the executor reports
+ * a newer update time; archive the flow once it is SUCCEEDED, FAILED or
+ * KILLED; force-fail it when the executor answers 'notfound' or has been
+ * unreachable for more than ACCESS_ERROR_THRESHOLD_MS.
+ *
+ * Between passes the thread waits on BlockObj: 1000 ms when nothing is
+ * running, 100 ms otherwise. submitFlow notifies BlockObj to cut the wait short.
+ * Any unusable reply is skipped rather than dereferenced, because an
+ * uncaught exception here would end the polling thread for good.
+ */
private class ExecutingManagerUpdaterThread extends Thread {
private boolean shutdown = false;
- private int updateTimeMs = UPDATE_THREAD_MS;
+ private int waitTimeIdleMs = 1000;
+ private int waitTimeMs = 100;
+
+ @SuppressWarnings("unchecked")
public void run() {
- while (!shutdown) {
- ArrayList<ExecutableFlow> flows = new ArrayList<ExecutableFlow>(runningFlows.values());
- for(ExecutableFlow exFlow : flows) {
- if (!exFlow.isSubmitted()) {
+ while(!shutdown) {
+ for (ExecutionReference reference: runningReference.values()) {
+ // Don't do anything if not submitted
+ if (!reference.isSubmitted()) {
continue;
}
- File executionDir = new File(exFlow.getExecutionPath());
+ String execId = reference.getExecId();
+ ExecutableFlow flow = runningFlows.get(execId);
+ if (flow == null) {
+ // submitFlow registers the reference just before the flow, so this can be
+ // a transient state. Skip this pass instead of dereferencing null.
+ logger.error("Flow " + execId + " has an active reference but is not in the running flows.");
+ continue;
+ }
+ File executionDir = new File(flow.getExecutionPath());
+ // The execution dir doesn't exist. So we clean up.
if (!executionDir.exists()) {
logger.error("WTF!! Execution dir " + executionDir + " doesn't exist!");
- // @TODO probably should handle this error case somehow. Cleanup?
+ // Removing reference.
+ reference.setStatus(Status.FAILED);
+ try {
+ cleanExecutionReferenceJob(reference);
+ // The reference is gone, so nothing would ever remove the flow again.
+ runningFlows.remove(execId);
+ } catch (ExecutorManagerException e) {
+ logger.error("The execution dir " + executionDir.getPath() + " doesn't exist for " + reference.toRefString(), e);
+ }
continue;
}
-
- // Query the executor service to see if the item is running.
- String responseString = null;
+
+ // Get status from the server. If the server keeps failing to respond, we clean up after 30 seconds of errors.
+ HashMap<String,Object> map = null;
try {
- responseString = callExecutionServer("status", exFlow);
+ String string = callExecutorServer(reference, ConnectorParams.STATUS_ACTION);
+ // callExecutorServer returns null when the HTTP request itself failed;
+ // route that through the same handling as any other I/O failure.
+ if (string == null) {
+ throw new IOException("No response from executor for " + reference.toRefString());
+ }
+ map = (HashMap<String,Object>)JSONUtils.parseJSONFromString(string);
+ if (map == null) {
+ throw new IOException("Empty status response from executor for " + reference.toRefString());
+ }
+ reference.setLastCheckedTime(System.currentTimeMillis());
} catch (IOException e) {
- e.printStackTrace();
- // Connection issue. Backoff 1 sec.
- synchronized(this) {
+ if (System.currentTimeMillis() - reference.getLastCheckedTime() > ACCESS_ERROR_THRESHOLD_MS) {
+ logger.error("Error: Can't connect to server." + reference.toRefString() + ". Might be dead. Cleaning up project.", e);
+ // Cleanup. Since we haven't seen anyone.
try {
- wait(1000);
- } catch (InterruptedException ie) {
+ forceFlowFailure(flow);
+ cleanFinishedJob(flow);
+ } catch (ExecutorManagerException e1) {
+ logger.error("Foreced Fail: Error while cleaning up flow and job. " + reference.toRefString(), e1);
}
+
+ continue;
}
- continue;
- }
- catch (Exception e) {
- e.printStackTrace();
}
+ // A failure below the threshold leaves no response to inspect; try again on the next pass.
+ if (map == null) {
+ continue;
+ }
- Object executorResponseObj;
- try {
- executorResponseObj = JSONUtils.parseJSONFromString(responseString);
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ // If the response from the server is an error, we print out the response and continue.
+ String error = (String)map.get(ConnectorParams.RESPONSE_ERROR);
+ if (error != null) {
+ logger.error("Server status response for " + reference.toRefString() + " was an error: " + error);
continue;
}
- @SuppressWarnings("unchecked")
- HashMap<String, Object> response = (HashMap<String, Object>)executorResponseObj;
- String status = (String)response.get("status");
-
- try {
- ExecutableFlowLoader.updateFlowStatusFromFile(executionDir, exFlow, true);
- } catch (ExecutorManagerException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- continue;
+ // If not found, then we clean up.
+ String statusStr = (String)map.get(ConnectorParams.STATUS_PARAM);
+ boolean forceFail = false;
+ // Constant-first equals: a reply without a status must not throw.
+ if (ConnectorParams.RESPONSE_NOTFOUND.equals(statusStr)) {
+ logger.info("Server status response for " + reference.toRefString() + " was 'notfound'. Cleaning up");
+ try {
+ ExecutableFlowLoader.updateFlowStatusFromFile(executionDir, flow, true);
+ forceFail = true;
+ } catch (ExecutorManagerException e) {
+ logger.error("Error updating flow status " + flow.getExecutionId() + " from file.", e);
+ continue;
+ }
}
-
- // If it's completed, and not running, we clean up.
- if (exFlow.getStatus() == Status.SUCCEEDED || exFlow.getStatus() == Status.FAILED || exFlow.getStatus() == Status.KILLED) {
- if (status.equals("notfound")) {
- // Cleanup
- logger.info("Flow " + exFlow.getExecutionId() + " has succeeded. Cleaning Up.");
+ else {
+ // It's found, so we check the status.
+ // A reply without an update time is treated as "nothing newer" (-1).
+ Object updateTimeObj = map.get(ConnectorParams.RESPONSE_UPDATETIME);
+ long time = (updateTimeObj == null) ? -1 : JSONUtils.getLongFromObject(updateTimeObj);
+
+ if (time > flow.getUpdateTime()) {
try {
- ExecutableFlowLoader.updateFlowStatusFromFile(executionDir, exFlow, true);
- cleanFinishedJob(exFlow);
+ ExecutableFlowLoader.updateFlowStatusFromFile(executionDir, flow, true);
+ // Update reference
+ reference.setStartTime(flow.getStartTime());
+ reference.setEndTime(flow.getEndTime());
+ reference.setStatus(flow.getStatus());
} catch (ExecutorManagerException e) {
- e.printStackTrace();
- continue;
+ logger.error("Error updating flow status " + flow.getExecutionId() + " from file.", e);
}
- exFlow.setLastCheckedTime(System.currentTimeMillis());
+
+ flow.setUpdateTime(time);
}
- else {
- logger.error("Flow " + exFlow.getExecutionId() + " has succeeded, but the Executor says its still running with msg: " + status);
- if (System.currentTimeMillis() - exFlow.getLastCheckedTime() > ACCESS_ERROR_THRESHOLD) {
- exFlow.setStatus(Status.FAILED);
- exFlow.setEndTime(System.currentTimeMillis());
- logger.error("It's been " + ACCESS_ERROR_THRESHOLD + " ms since last update. Auto-failing the job.");
+ }
+
+ switch(flow.getStatus()) {
+ case SUCCEEDED:
+ case FAILED:
+ case KILLED:
+ try {
+ cleanFinishedJob(flow);
+ } catch (ExecutorManagerException e) {
+ logger.error("Error while cleaning up flow and job. " + reference.toRefString(), e);
+ }
+
+ break;
+ default:{
+ // We force the failure.
+ if (forceFail) {
+ try {
+ forceFlowFailure(flow);
+ cleanFinishedJob(flow);
+ } catch (ExecutorManagerException e) {
+ logger.error("Foreced Fail: Error while cleaning up flow and job. " + reference.toRefString(), e);
+ }
}
}
}
- else {
- // If it's not finished, and not running, we will fail it and clean up.
- if (status.equals("notfound")) {
- logger.error("Flow " + exFlow.getExecutionId() + " is running, but the Executor can't find it.");
- if (System.currentTimeMillis() - exFlow.getLastCheckedTime() > ACCESS_ERROR_THRESHOLD) {
- exFlow.setStatus(Status.FAILED);
- exFlow.setEndTime(System.currentTimeMillis());
- logger.error("It's been " + ACCESS_ERROR_THRESHOLD + " ms since last update. Auto-failing the job.");
- }
+ }
+
+ // Change to rotating queue?
+ synchronized(BlockObj) {
+ try {
+ if (runningReference.isEmpty()) {
+ BlockObj.wait(waitTimeIdleMs);
}
else {
- exFlow.setLastCheckedTime(System.currentTimeMillis());
- }
- }
-
- synchronized(this) {
- try {
- wait(updateTimeMs);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ BlockObj.wait(waitTimeMs);
}
+ } catch (InterruptedException e) {
}
}
}
}
}
+
/**
* Reference to a Flow Execution.
* It allows us to search for Flow and Project with only the Execution Id, it references
@@ -913,11 +1102,18 @@ public class ExecutorManager {
private long startTime;
private long endTime;
private Status status;
-
+ private String executorUrl;
+
+ private int executorPort;
+ private boolean isSubmitted = true;
+ private long lastCheckedTime = -1;
+
public ExecutionReference() {
+ this.lastCheckedTime = System.currentTimeMillis();
}
public ExecutionReference(ExecutableFlow flow) {
+ this();
this.execId = flow.getExecutionId();
this.projectId = flow.getProjectId();
this.flowId = flow.getFlowId();
@@ -927,6 +1123,7 @@ public class ExecutorManager {
this.startTime = flow.getStartTime();
this.endTime = flow.getEndTime();
this.status = flow.getStatus();
+ this.isSubmitted = false;
}
private Object toObject() {
@@ -939,6 +1136,8 @@ public class ExecutorManager {
obj.put("startTime", startTime);
obj.put("endTime", endTime);
obj.put("status", status);
+ obj.put("executorUrl", executorUrl);
+ obj.put("executorPort", executorPort);
return obj;
}
@@ -955,6 +1154,7 @@ public class ExecutorManager {
throw new IOException("Execution file execution.json does not exist.");
}
+ @SuppressWarnings("unchecked")
HashMap<String, Object> obj = (HashMap<String, Object>)JSONUtils.parseJSONFromFile(file);
ExecutionReference reference = new ExecutionReference();
reference.execId = (String)obj.get("execId");
@@ -962,12 +1162,22 @@ public class ExecutorManager {
reference.flowId = (String)obj.get("flowId");
reference.userId = (String)obj.get("userId");
reference.execPath = (String)obj.get("execPath");
- reference.startTime = getLongFromObject(obj.get("startTime"));
- reference.endTime = getLongFromObject(obj.get("endTime"));
+ reference.startTime = JSONUtils.getLongFromObject(obj.get("startTime"));
+ reference.endTime = JSONUtils.getLongFromObject(obj.get("endTime"));
reference.status = Status.valueOf((String)obj.get("status"));
+
+ if (obj.containsKey("executorUrl")) {
+ reference.executorUrl = (String)obj.get("executorUrl");
+ reference.executorPort = (Integer)obj.get("executorPort");
+ }
+
return reference;
}
+ /**
+ * Builds a short label for this reference in the form
+ * execId:executorUrl:executorPort. Unset string fields appear as "null".
+ *
+ * @return the colon-separated reference label
+ */
+ public String toRefString() {
+ StringBuilder label = new StringBuilder();
+ label.append(execId).append(":").append(executorUrl).append(":").append(executorPort);
+ return label.toString();
+ }
+
public String getExecId() {
return execId;
}
@@ -1016,15 +1226,38 @@ public class ExecutorManager {
public void setStatus(Status status) {
this.status = status;
}
- }
-
- private static long getLongFromObject(Object obj) {
- if (obj instanceof Integer) {
- return Long.valueOf((Integer)obj);
+
+ /**
+ * @return the executor URL held by this reference (may be null if never set)
+ */
+ public String getExecutorUrl() {
+ return executorUrl;
}
-
- return (Long)obj;
- }
+ /**
+ * @param executorUrl the executor URL to store on this reference
+ */
+ public void setExecutorUrl(String executorUrl) {
+ this.executorUrl = executorUrl;
+ }
+
+ /**
+ * @return the executor port held by this reference
+ */
+ public int getExecutorPort() {
+ return executorPort;
+ }
+
+ /**
+ * @param port the executor port to store on this reference
+ */
+ public void setExecutorPort(int port) {
+ this.executorPort = port;
+ }
+
+ /**
+ * @return the submitted flag; the field defaults to true and is set to false
+ * when the reference is constructed from an ExecutableFlow
+ */
+ public boolean isSubmitted() {
+ return isSubmitted;
+ }
+
+ /**
+ * @param isSubmitted the new value of the submitted flag
+ */
+ public void setSubmitted(boolean isSubmitted) {
+ this.isSubmitted = isSubmitted;
+ }
+
+ /**
+ * @return the last-checked timestamp in milliseconds; the no-argument
+ * constructor initializes it to the time of construction
+ */
+ public long getLastCheckedTime() {
+ return lastCheckedTime;
+ }
+
+ /**
+ * @param lastCheckedTime the last-checked timestamp to record, in milliseconds
+ */
+ public void setLastCheckedTime(long lastCheckedTime) {
+ this.lastCheckedTime = lastCheckedTime;
+ }
+ }
}
src/java/azkaban/executor/FlowRunner.java 33(+30 -3)
diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index 04f0991..51d8431 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -66,6 +66,8 @@ public class FlowRunner extends EventHandler implements Runnable {
private Props flowOverrideProps = null;
private FailureAction failedAction;
+ private boolean testMode = false;
+ private File failedMarker;
public FlowRunner(ExecutableFlow flow) {
this.flow = flow;
@@ -79,6 +81,8 @@ public class FlowRunner extends EventHandler implements Runnable {
flowOverrideProps = new Props(null, flow.getFlowParameters());
}
failedAction = flow.getFailureAction();
+ failedMarker = new File(basePath, ConnectorParams.FORCED_FAILED_MARKER);
+
createLogger();
}
@@ -201,13 +205,17 @@ public class FlowRunner extends EventHandler implements Runnable {
@Override
public void run() {
+ if (testMode) {
+ logger.info("Running in testmode");
+ }
currentThread = Thread.currentThread();
flow.setStatus(Status.RUNNING);
flow.setStartTime(System.currentTimeMillis());
logger.info("Starting Flow");
this.fireEventListeners(Event.create(this, Type.FLOW_STARTED));
-
+ boolean forceFailed = false;
+
// Load all shared props
try {
logger.info("Loading all shared properties");
@@ -252,7 +260,16 @@ public class FlowRunner extends EventHandler implements Runnable {
continue;
}
}
-
+
+ if (failedMarker.exists()) {
+ logger.error("Looks like this job will be forced failed due to error.");
+ flow.setStatus(Status.FAILED);
+ forceFailed = true;
+ executorService.shutdownNow();
+ this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED));
+ return;
+ }
+
if (runner != null) {
try {
ExecutableNode node = runner.getNode();
@@ -273,7 +290,7 @@ public class FlowRunner extends EventHandler implements Runnable {
}
}
}
-
+
logger.info("Finishing up flow. Awaiting Termination");
executorService.shutdown();
@@ -329,6 +346,8 @@ public class FlowRunner extends EventHandler implements Runnable {
Props jobProps = new Props(parentProps, propsFile);
JobRunner jobRunner = new JobRunner(node, jobProps, basePath);
+ jobRunner.setTestMode(testMode);
+
jobRunner.addListener(listener);
return jobRunner;
@@ -506,6 +525,14 @@ public class FlowRunner extends EventHandler implements Runnable {
}
}
+ /**
+ * @return whether this flow runner is in test mode; the flag is logged at the
+ * start of run() and passed on to every JobRunner this runner creates
+ */
+ public boolean isTestMode() {
+ return testMode;
+ }
+
+ /**
+ * @param testMode true to put this flow runner in test mode
+ */
+ public void setTestMode(boolean testMode) {
+ this.testMode = testMode;
+ }
+
private class JobRunnerEventListener implements EventListener {
private FlowRunner flowRunner;
diff --git a/src/java/azkaban/executor/FlowRunnerManager.java b/src/java/azkaban/executor/FlowRunnerManager.java
index 164479c..53b4b67 100644
--- a/src/java/azkaban/executor/FlowRunnerManager.java
+++ b/src/java/azkaban/executor/FlowRunnerManager.java
@@ -11,6 +11,12 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import net.sf.ehcache.Cache;
+import net.sf.ehcache.CacheManager;
+import net.sf.ehcache.Element;
+import net.sf.ehcache.config.CacheConfiguration;
+import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
+
import org.apache.log4j.Logger;
import azkaban.utils.Utils;
@@ -46,6 +52,14 @@ public class FlowRunnerManager {
private Props globalProps;
+ // Keep recently finished flows for only one minute after they finish.
+ private CacheManager manager = CacheManager.create();
+ private Cache recentFlowsCache;
+ private static final int LIVE_SECONDS = 60;
+ private static final int RECENT_FLOWS_CACHE_SIZE = 100;
+
+ private boolean testMode = false;
+
public FlowRunnerManager(Props props, Props globalProps, Mailman mailer) {
this.mailer = mailer;
@@ -53,6 +67,13 @@ public class FlowRunnerManager {
this.clientHostname = props.getString("jetty.hostname", "localhost");
this.clientPortNumber = Utils.nonNull(props.getString("jetty.ssl.port"));
+ setupCache();
+
+ testMode = props.getBoolean("test.mode", false);
+ if (testMode) {
+ logger.info("Running in testMode.");
+ }
+
basePath = new File(props.getString("execution.directory"));
numThreads = props.getInt("executor.flow.threads", DEFAULT_NUM_EXECUTING_FLOWS);
executorService = Executors.newFixedThreadPool(numThreads);
@@ -62,6 +83,19 @@ public class FlowRunnerManager {
submitterThread.start();
}
+ /**
+ * Creates the "recentFlowsCache" cache and registers it with the cache manager.
+ */
+ private void setupCache() {
+ // Configuration: FIFO eviction, no overflow to disk, not eternal, entries live
+ // for LIVE_SECONDS, no disk persistence. RECENT_FLOWS_CACHE_SIZE is the second
+ // constructor argument (presumably the in-memory element limit - confirm
+ // against the ehcache CacheConfiguration constructor).
+ CacheConfiguration cacheConfig = new CacheConfiguration("recentFlowsCache",RECENT_FLOWS_CACHE_SIZE)
+ .memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.FIFO)
+ .overflowToDisk(false)
+ .eternal(false)
+ .timeToLiveSeconds(LIVE_SECONDS)
+ .diskPersistent(false)
+ .diskExpiryThreadIntervalSeconds(0);
+
+ recentFlowsCache = new Cache(cacheConfig);
+ // The cache is added to the manager before any lookups or inserts are made.
+ manager.addCache(recentFlowsCache);
+ }
+
public void submitFlow(String id, String path) throws ExecutorManagerException {
// Load file and submit
logger.info("Flow " + id + " submitted with path " + path);
@@ -71,6 +105,8 @@ public class FlowRunnerManager {
flow.setExecutionPath(path);
FlowRunner runner = new FlowRunner(flow);
+ runner.setTestMode(testMode);
+
runningFlows.put(id, runner);
runner.setGlobalProps(globalProps);
runner.addListener(eventListener);
@@ -105,7 +141,11 @@ public class FlowRunnerManager {
public ExecutableFlow getExecutableFlow(String id) {
FlowRunner runner = runningFlows.get(id);
if (runner == null) {
- return null;
+ Element elem = recentFlowsCache.get(id);
+ if (elem == null) {
+ return null;
+ }
+ return (ExecutableFlow)elem.getObjectValue();
}
return runner.getFlow();
@@ -157,6 +197,7 @@ public class FlowRunnerManager {
logger.info("Flow " + flow.getExecutionId() + " has finished.");
runningFlows.remove(flow.getExecutionId());
+ recentFlowsCache.put(new Element(flow.getExecutionId(), flow));
}
}
}
@@ -199,7 +240,9 @@ public class FlowRunnerManager {
body += (URL + "\n");
}
- mailer.sendEmailIfPossible(senderAddress, emailList, subject, body);
+ if (!testMode) {
+ mailer.sendEmailIfPossible(senderAddress, emailList, subject, body);
+ }
} catch (UnknownHostException uhe) {
logger.error(uhe);
} catch (Exception e) {
@@ -224,7 +267,9 @@ public class FlowRunnerManager {
body += (URL + "\n");
}
- mailer.sendEmailIfPossible(senderAddress, emailList, subject, body);
+ if (!testMode) {
+ mailer.sendEmailIfPossible(senderAddress, emailList, subject, body);
+ }
} catch (UnknownHostException uhe) {
logger.error(uhe);
} catch (Exception e) {
src/java/azkaban/executor/JobRunner.java 34(+27 -7)
diff --git a/src/java/azkaban/executor/JobRunner.java b/src/java/azkaban/executor/JobRunner.java
index a1bdef0..5c79ea7 100644
--- a/src/java/azkaban/executor/JobRunner.java
+++ b/src/java/azkaban/executor/JobRunner.java
@@ -36,6 +36,7 @@ public class JobRunner extends EventHandler implements Runnable {
private Job job;
private String executionId = null;
+ private boolean testMode = false;
private static final Object logCreatorLock = new Object();
@@ -115,13 +116,24 @@ public class JobRunner extends EventHandler implements Runnable {
props.put(AbstractProcessJob.WORKING_DIR, workingDir.getAbsolutePath());
job = JobWrappingFactory.getJobWrappingFactory().buildJobExecutor(node.getId(), props, logger);
- try {
- job.run();
- } catch (Throwable e) {
- succeeded = false;
- node.setStatus(Status.FAILED);
- logError("Job run failed!");
- e.printStackTrace();
+ if (testMode) {
+ logInfo("Test Mode. Skipping.");
+ synchronized(this) {
+ try {
+ wait(5000);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ else {
+ try {
+ job.run();
+ } catch (Throwable e) {
+ succeeded = false;
+ node.setStatus(Status.FAILED);
+ logError("Job run failed!");
+ e.printStackTrace();
+ }
}
node.setEndTime(System.currentTimeMillis());
@@ -175,4 +187,12 @@ public class JobRunner extends EventHandler implements Runnable {
}
}
+ /**
+ * @return whether this job runner is in test mode; in test mode the job is not
+ * run and the runner instead logs a message and waits up to 5000 ms
+ */
+ public boolean isTestMode() {
+ return testMode;
+ }
+
+ /**
+ * @param testMode true to skip the real job execution when this runner runs
+ */
+ public void setTestMode(boolean testMode) {
+ this.testMode = testMode;
+ }
+
}
diff --git a/src/java/azkaban/utils/ExecutableFlowLoader.java b/src/java/azkaban/utils/ExecutableFlowLoader.java
index 8ff1785..e7571d7 100644
--- a/src/java/azkaban/utils/ExecutableFlowLoader.java
+++ b/src/java/azkaban/utils/ExecutableFlowLoader.java
@@ -169,6 +169,7 @@ public class ExecutableFlowLoader {
}
tempFlowFile.renameTo(flowFile);
+ flow.setUpdateTime(System.currentTimeMillis());
return flowFile;
}
diff --git a/src/java/azkaban/utils/JSONUtils.java b/src/java/azkaban/utils/JSONUtils.java
index 5d2c199..1207a21 100644
--- a/src/java/azkaban/utils/JSONUtils.java
+++ b/src/java/azkaban/utils/JSONUtils.java
@@ -138,4 +138,12 @@ public class JSONUtils {
return null;
}
}
+
+ /**
+ * Converts a boxed numeric value (for example one read from a parsed JSON map)
+ * into a primitive long.
+ *
+ * Any java.lang.Number is accepted and converted with Number.longValue(), so
+ * Integer and Long behave as before and other numeric types no longer fail.
+ * Fractional values are truncated by longValue().
+ *
+ * @param obj the value to convert; expected to be a Number
+ * @return the value as a long
+ * @throws NullPointerException if obj is null
+ * @throws ClassCastException if obj is not a Number
+ */
+ public static long getLongFromObject(Object obj) {
+ if (obj instanceof Number) {
+ return ((Number)obj).longValue();
+ }
+
+ // Null or non-numeric input: fail exactly as the plain cast always has
+ // (NullPointerException when unboxing null, ClassCastException otherwise).
+ return (Long)obj;
+ }
}
src/java/azkaban/webapp/AzkabanExecutorServer.java 101(+66 -35)
diff --git a/src/java/azkaban/webapp/AzkabanExecutorServer.java b/src/java/azkaban/webapp/AzkabanExecutorServer.java
index c7b1d6f..6afc9cc 100644
--- a/src/java/azkaban/webapp/AzkabanExecutorServer.java
+++ b/src/java/azkaban/webapp/AzkabanExecutorServer.java
@@ -39,6 +39,7 @@ import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
import org.mortbay.thread.QueuedThreadPool;
+import azkaban.executor.ConnectorParams;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.FlowRunnerManager;
@@ -71,7 +72,6 @@ public class AzkabanExecutorServer {
private File tempDir;
private Server server;
-
/**
* Constructor
*
@@ -263,13 +263,11 @@ public class AzkabanExecutorServer {
return null;
}
- public static class ExecutorServlet extends HttpServlet {
+ public static class ExecutorServlet extends HttpServlet implements ConnectorParams {
+ private static final long serialVersionUID = 1L;
private static final Logger logger = Logger.getLogger(ExecutorServlet.class.getName());
public static final String JSON_MIME_TYPE = "application/json";
-
- public enum State {
- FAILED, SUCCEEDED, RUNNING, WAITING, IGNORED, READY
- }
+
private String sharedToken;
private AzkabanExecutorServer application;
private FlowRunnerManager flowRunnerManager;
@@ -303,43 +301,42 @@ public class AzkabanExecutorServer {
public void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
HashMap<String,Object> respMap= new HashMap<String,Object>();
- String token = getParam(req, "sharedToken");
+ String token = getParam(req, SHAREDTOKEN_PARAM);
if (!token.equals(sharedToken)) {
respMap.put("error", "Mismatched token. Will not run.");
}
- else if (!hasParam(req, "action")) {
+ else if (!hasParam(req, ACTION_PARAM)) {
respMap.put("error", "Parameter action not set");
}
- else if (!hasParam(req, "execid")) {
- respMap.put("error", "Parameter execid not set.");
- }
else {
- String action = getParam(req, "action");
- String execid = getParam(req, "execid");
+ String action = getParam(req, ACTION_PARAM);
+ String execid = getParam(req, EXECID_PARAM, null);
+ String user = getParam(req, USER_PARAM, null);
- // Handle execute
- if (action.equals("execute")) {
+ if (action.equals(PING_ACTION)) {
+ respMap.put("status", "alive");
+ }
+ else if (action.equals(EXECUTE_ACTION)) {
handleAjaxExecute(req, respMap, execid);
}
- // Handle Status
- else if (action.equals("status")) {
+ else if (action.equals(STATUS_ACTION)) {
handleAjaxFlowStatus(respMap, execid);
}
- else if (action.equals("cancel")) {
- String user = getParam(req, "user");
+ else if (action.equals(CANCEL_ACTION)) {
logger.info("Cancel called.");
handleAjaxCancel(respMap, execid, user);
}
- else if (action.equals("pause")) {
- String user = getParam(req, "user");
+ else if (action.equals(PAUSE_ACTION)) {
logger.info("Paused called.");
handleAjaxPause(respMap, execid, user);
}
- else if (action.equals("resume")) {
- String user = getParam(req, "user");
+ else if (action.equals(RESUME_ACTION)) {
logger.info("Resume called.");
handleAjaxResume(respMap, execid, user);
}
+ else {
+ respMap.put("error", "action: '" + action + "' not supported.");
+ }
}
writeJSON(resp, respMap);
@@ -347,55 +344,80 @@ public class AzkabanExecutorServer {
}
private void handleAjaxExecute(HttpServletRequest req, Map<String, Object> respMap, String execid) throws ServletException {
- String execpath = getParam(req, "execpath");
+ if (execid == null) {
+ respMap.put(RESPONSE_ERROR, EXECID_PARAM + " has not been set");
+ return;
+ }
+
+ String execpath = getParam(req, EXECPATH_PARAM);
logger.info("Submitted " + execid + " with " + execpath);
try {
flowRunnerManager.submitFlow(execid, execpath);
- respMap.put("status", "success");
+ respMap.put(STATUS_PARAM, RESPONSE_SUCCESS);
} catch (ExecutorManagerException e) {
e.printStackTrace();
- respMap.put("error", e.getMessage());
+ respMap.put(RESPONSE_ERROR, e.getMessage());
}
}
private void handleAjaxFlowStatus(Map<String, Object> respMap, String execid) {
+ if (execid == null) {
+ respMap.put(RESPONSE_ERROR, EXECID_PARAM + " has not been set");
+ return;
+ }
+
ExecutableFlow flow = flowRunnerManager.getExecutableFlow(execid);
if (flow == null) {
- respMap.put("status", "notfound");
+ respMap.put(STATUS_PARAM, RESPONSE_NOTFOUND);
}
else {
- respMap.put("status", flow.getStatus().toString());
+ respMap.put(STATUS_PARAM, flow.getStatus().toString());
+ respMap.put(RESPONSE_UPDATETIME, flow.getUpdateTime());
}
}
private void handleAjaxPause(Map<String, Object> respMap, String execid, String user) throws ServletException {
-
+ if (execid == null || user == null) {
+ respMap.put(RESPONSE_ERROR, "execid or user has not been set");
+ return;
+ }
+
try {
flowRunnerManager.pauseFlow(execid, user);
- respMap.put("status", "success");
+ respMap.put(STATUS_PARAM, RESPONSE_SUCCESS);
} catch (ExecutorManagerException e) {
e.printStackTrace();
- respMap.put("error", e.getMessage());
+ respMap.put(RESPONSE_ERROR, e.getMessage());
}
}
private void handleAjaxResume(Map<String, Object> respMap, String execid, String user) throws ServletException {
+ if (execid == null || user == null) {
+ respMap.put(RESPONSE_ERROR, "execid or user has not been set");
+ return;
+ }
+
try {
flowRunnerManager.resumeFlow(execid, user);
- respMap.put("status", "success");
+ respMap.put(STATUS_PARAM, RESPONSE_SUCCESS);
} catch (ExecutorManagerException e) {
e.printStackTrace();
- respMap.put("error", e.getMessage());
+ respMap.put(RESPONSE_ERROR, e.getMessage());
}
}
private void handleAjaxCancel(Map<String, Object> respMap, String execid, String user) throws ServletException {
+ if (execid == null || user == null) {
+ respMap.put(RESPONSE_ERROR, "execid or user has not been set");
+ return;
+ }
+
try {
flowRunnerManager.cancelFlow(execid, user);
- respMap.put("status", "success");
+ respMap.put(STATUS_PARAM, RESPONSE_SUCCESS);
} catch (ExecutorManagerException e) {
e.printStackTrace();
- respMap.put("error", e.getMessage());
+ respMap.put(RESPONSE_ERROR, e.getMessage());
}
}
@@ -419,6 +441,15 @@ public class AzkabanExecutorServer {
else
return p;
}
+
+ /**
+ * Looks up a request parameter, falling back to a default when it is absent.
+ *
+ * @param request the servlet request to read from
+ * @param name the parameter name
+ * @param defaultVal the value returned when the request has no such parameter
+ * @return the parameter value, or defaultVal if the parameter is null
+ */
+ public String getParam(HttpServletRequest request, String name, String defaultVal ) {
+ String value = request.getParameter(name);
+ if (value != null) {
+ return value;
+ }
+
+ return defaultVal;
+ }
}
}
diff --git a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
index ac000b5..751cea4 100644
--- a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
@@ -118,6 +118,24 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
}
/**
+ * Returns the named parameter from the http servlet request, or a fallback
+ * value when the request does not carry it.
+ *
+ * @param request the servlet request to read the parameter from
+ * @param name the name of the parameter to look up
+ * @param defaultVal the value returned when the parameter is missing
+ *
+ * @return the parameter value, or defaultVal if request.getParameter(name) is null
+ */
+ public String getParam(HttpServletRequest request, String name, String defaultVal){
+ String value = request.getParameter(name);
+ return value != null ? value : defaultVal;
+ }
+
+
+ /**
* Returns the param and parses it into an int. Will throw an exception if
* not found, or a parse error if the type is incorrect.
*
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 86ec0ba..f9bb3f6 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -415,7 +415,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
}
try {
- executorManager.cancelFlow(exFlow, user.getUserId());
+ executorManager.cancelFlow(exFlow.getExecutionId(), user.getUserId());
} catch (ExecutorManagerException e) {
ret.put("error", e.getMessage());
}
@@ -444,7 +444,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
}
try {
- executorManager.pauseFlow(exFlow, user.getUserId());
+ executorManager.pauseFlow(exFlow.getExecutionId(), user.getUserId());
} catch (ExecutorManagerException e) {
ret.put("error", e.getMessage());
}
@@ -457,7 +457,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
}
try {
- executorManager.resumeFlow(exFlow, user.getUserId());
+ executorManager.resumeFlow(exFlow.getExecutionId(), user.getUserId());
} catch (ExecutorManagerException e) {
ret.put("resume", e.getMessage());
}
diff --git a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
index 89486ec..62f5f70 100644
--- a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
@@ -21,19 +21,15 @@ import azkaban.webapp.session.Session;
* Abstract Servlet that handles auto login when the session hasn't been
* verified.
*/
-public abstract class LoginAbstractAzkabanServlet extends
- AbstractAzkabanServlet {
+public abstract class LoginAbstractAzkabanServlet extends AbstractAzkabanServlet {
private static final long serialVersionUID = 1L;
- private static final Logger logger = Logger
- .getLogger(LoginAbstractAzkabanServlet.class.getName());
+ private static final Logger logger = Logger.getLogger(LoginAbstractAzkabanServlet.class.getName());
private static final String SESSION_ID_NAME = "azkaban.browser.session.id";
@Override
- protected void doGet(HttpServletRequest req, HttpServletResponse resp)
- throws ServletException, IOException {
-
+ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
// Set session id
Session session = getSessionFromRequest(req);
if (hasParam(req, "logout")) {
@@ -89,7 +85,6 @@ public abstract class LoginAbstractAzkabanServlet extends
}
private void handleLogin(HttpServletRequest req, HttpServletResponse resp, String errorMsg) throws ServletException, IOException {
-
Page page = newPage(req, resp,
"azkaban/webapp/servlet/velocity/login.vm");
if (errorMsg != null) {
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index 211ea49..cb917f1 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -23,14 +23,11 @@ import org.joda.time.ReadablePeriod;
import org.joda.time.Seconds;
import org.joda.time.format.DateTimeFormat;
-import azkaban.executor.ExecutorManager.ExecutionReference;
import azkaban.flow.Flow;
import azkaban.project.Project;
import azkaban.project.ProjectManager;
-import azkaban.project.ProjectManagerException;
import azkaban.user.User;
import azkaban.user.Permission.Type;
-import azkaban.webapp.servlet.HistoryServlet.PageSelection;
import azkaban.webapp.session.Session;
import azkaban.scheduler.ScheduleManager;
import azkaban.scheduler.ScheduledFlow;
src/web/css/azkaban.css 2(+1 -1)
diff --git a/src/web/css/azkaban.css b/src/web/css/azkaban.css
index 90e59be..9d88d40 100644
--- a/src/web/css/azkaban.css
+++ b/src/web/css/azkaban.css
@@ -1929,7 +1929,7 @@ td .status.FAILED {
}
td .status.READY {
- background-color: #C82123;
+ background-color: #CCC;
}
td .status.RUNNING {
diff --git a/src/web/js/azkaban.exflow.options.view.js b/src/web/js/azkaban.exflow.options.view.js
index f18211f..7ba5aff 100644
--- a/src/web/js/azkaban.exflow.options.view.js
+++ b/src/web/js/azkaban.exflow.options.view.js
@@ -131,8 +131,12 @@ azkaban.ExecuteFlowView = Backbone.View.extend({
alert(data.error);
}
else {
- $('#successEmails').val(data.successEmails.join());
- $('#failureEmails').val(data.failureEmails.join());
+ if (data.successEmails) {
+ $('#successEmails').val(data.successEmails.join());
+ }
+ if (data.failureEmails) {
+ $('#failureEmails').val(data.failureEmails.join());
+ }
if (data.failureAction) {
$('#failureAction').val(data.failureAction);