azkaban-aplcache
Changes
src/java/azkaban/executor/ExecutorManager.java 50(+36 -14)
src/java/azkaban/executor/FlowRunner.java 257(+149 -108)
src/web/css/azkaban.css 20(+18 -2)
src/web/js/azkaban.ajax.utils.js 30(+30 -0)
src/web/js/azkaban.exflow.view.js 84(+69 -15)
Details
src/java/azkaban/executor/ExecutorManager.java 50(+36 -14)
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index b7a47f6..77806bf 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -342,30 +342,33 @@ public class ExecutorManager {
runningFlows.put(flow.getExecutionId(), flow);
}
- public void cancelFlow(ExecutableFlow flow) throws ExecutorManagerException {
+ public void cancelFlow(ExecutableFlow flow, String user) throws ExecutorManagerException {
+ logger.info("Calling cancel");
String response = null;
try {
- response = callExecutionServer("cancel", flow);
+ response = callExecutionServer("cancel", flow, user);
} catch (IOException e) {
e.printStackTrace();
throw new ExecutorManagerException("Error cancelling flow.", e);
}
}
- public void pauseFlow(ExecutableFlow flow) throws ExecutorManagerException {
+ public void pauseFlow(ExecutableFlow flow, String user) throws ExecutorManagerException {
+ logger.info("Calling pause");
String response = null;
try {
- response = callExecutionServer("pause", flow);
+ response = callExecutionServer("pause", flow, user);
} catch (IOException e) {
e.printStackTrace();
throw new ExecutorManagerException("Error cancelling flow.", e);
}
}
- public void resumeFlow(ExecutableFlow flow) throws ExecutorManagerException {
+ public void resumeFlow(ExecutableFlow flow, String user) throws ExecutorManagerException {
+ logger.info("Calling resume");
String response = null;
try {
- response = callExecutionServer("resume", flow);
+ response = callExecutionServer("resume", flow, user);
} catch (IOException e) {
e.printStackTrace();
throw new ExecutorManagerException("Error cancelling flow.", e);
@@ -374,16 +377,24 @@ public class ExecutorManager {
}
private String callExecutionServer(String action, ExecutableFlow flow) throws IOException{
+ return callExecutionServer(action, flow, null);
+ }
+
+ private String callExecutionServer(String action, ExecutableFlow flow, String user) throws IOException{
URIBuilder builder = new URIBuilder();
builder.setScheme("http")
.setHost(url)
.setPort(portNumber)
.setPath("/executor")
.setParameter("sharedToken", token)
- .setParameter("action", "resume")
+ .setParameter("action", action)
.setParameter("execid", flow.getExecutionId())
.setParameter("execpath", flow.getExecutionPath());
+ if (user != null) {
+ builder.setParameter("user", user);
+ }
+
URI uri = null;
try {
uri = builder.build();
@@ -393,7 +404,7 @@ public class ExecutorManager {
ResponseHandler<String> responseHandler = new BasicResponseHandler();
- logger.info("Submitting flow " + flow.getExecutionId() + " for execution.");
+ logger.info("Remotely querying " + flow.getExecutionId() + " for status.");
HttpClient httpclient = new DefaultHttpClient();
HttpGet httpget = new HttpGet(uri);
String response = null;
@@ -590,7 +601,7 @@ public class ExecutorManager {
// Then we're taking a substring of length - 6 to lop off the bottom 5 digits effectively partitioning
// by 100000 millisec. We do this to have quicker searchs by pulling partitions, not full directories.
int index = execID.indexOf('.');
- return execID.substring(0, index - 5);
+ return execID.substring(0, index - 6);
}
private void cleanFinishedJob(ExecutableFlow exFlow) throws ExecutorManagerException {
@@ -690,11 +701,14 @@ public class ExecutorManager {
}
continue;
}
+ catch (Exception e) {
+ e.printStackTrace();
+ }
Object executorResponseObj;
try {
executorResponseObj = JSONUtils.parseJSONFromString(responseString);
- } catch (IOException e) {
+ } catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
continue;
@@ -705,7 +719,7 @@ public class ExecutorManager {
String status = (String)response.get("status");
try {
- ExecutableFlowLoader.updateFlowStatusFromFile(executionDir, exFlow);
+ ExecutableFlowLoader.updateFlowStatusFromFile(executionDir, exFlow, true);
} catch (ExecutorManagerException e) {
// TODO Auto-generated catch block
e.printStackTrace();
@@ -718,7 +732,7 @@ public class ExecutorManager {
// Cleanup
logger.info("Flow " + exFlow.getExecutionId() + " has succeeded. Cleaning Up.");
try {
- ExecutableFlowLoader.updateFlowStatusFromFile(executionDir, exFlow);
+ ExecutableFlowLoader.updateFlowStatusFromFile(executionDir, exFlow, true);
cleanFinishedJob(exFlow);
} catch (ExecutorManagerException e) {
e.printStackTrace();
@@ -826,8 +840,8 @@ public class ExecutorManager {
reference.flowId = (String)obj.get("flowId");
reference.userId = (String)obj.get("userId");
reference.execPath = (String)obj.get("execPath");
- reference.startTime = (Long)obj.get("startTime");
- reference.endTime = (Long)obj.get("endTime");
+ reference.startTime = getLongFromObject(obj.get("startTime"));
+ reference.endTime = getLongFromObject(obj.get("endTime"));
reference.status = Status.valueOf((String)obj.get("status"));
return reference;
}
@@ -881,4 +895,12 @@ public class ExecutorManager {
this.status = status;
}
}
+
+ private static long getLongFromObject(Object obj) {
+ if (obj instanceof Integer) {
+ return Long.valueOf((Integer)obj);
+ }
+
+ return (Long)obj;
+ }
}
src/java/azkaban/executor/FlowRunner.java 257(+149 -108)
diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index f450b80..d0c0dc6 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -33,17 +33,19 @@ import azkaban.utils.ExecutableFlowLoader;
import azkaban.utils.Props;
public class FlowRunner extends EventHandler implements Runnable {
- private static final Layout DEFAULT_LAYOUT = new PatternLayout( "%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
+ private static final Layout DEFAULT_LAYOUT = new PatternLayout(
+ "%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
public static final int NUM_CONCURRENT_THREADS = 10;
private ExecutableFlow flow;
private ExecutorService executorService;
private BlockingQueue<JobRunner> jobsToRun = new LinkedBlockingQueue<JobRunner>();
- private List<JobRunner> pausedJobsToRun = Collections.synchronizedList(new ArrayList<JobRunner>());
+ private List<JobRunner> pausedJobsToRun = Collections
+ .synchronizedList(new ArrayList<JobRunner>());
private int numThreads = NUM_CONCURRENT_THREADS;
- private boolean cancelled = true;
- private boolean paused = true;
+ private boolean cancelled = false;
+ private boolean paused = false;
private Map<String, JobRunner> runningJobs;
private JobRunnerEventListener listener;
@@ -55,33 +57,33 @@ public class FlowRunner extends EventHandler implements Runnable {
private Logger logger;
private Layout loggerLayout = DEFAULT_LAYOUT;
private Appender flowAppender;
-
+
private Thread currentThread;
-
+
public enum FailedFlowOptions {
- FINISH_RUNNING_JOBS,
- KILL_ALL
+ FINISH_RUNNING_JOBS, KILL_ALL
}
-
+
private FailedFlowOptions failedOptions = FailedFlowOptions.FINISH_RUNNING_JOBS;
-
+
public FlowRunner(ExecutableFlow flow) {
this.flow = flow;
this.basePath = new File(flow.getExecutionPath());
this.executorService = Executors.newFixedThreadPool(numThreads);
this.runningJobs = new ConcurrentHashMap<String, JobRunner>();
this.listener = new JobRunnerEventListener(this);
-
+
createLogger();
}
-
+
public ExecutableFlow getFlow() {
return flow;
}
-
+
private void createLogger() {
// Create logger
- String loggerName = System.currentTimeMillis() + "." + flow.getExecutionId();
+ String loggerName = System.currentTimeMillis() + "."
+ + flow.getExecutionId();
logger = Logger.getLogger(loggerName);
// Create file appender
@@ -97,74 +99,97 @@ public class FlowRunner extends EventHandler implements Runnable {
logger.error("Could not open log file in " + basePath, e);
}
}
-
+
private void closeLogger() {
logger.removeAppender(flowAppender);
flowAppender.close();
}
-
- public void cancel() {
- logger.info("Cancel Invoked");
+
+ public synchronized void cancel(String user) {
+ logger.info("Cancel called by " + user);
cancelled = true;
-
+
executorService.shutdownNow();
-
+
+ if (pausedJobsToRun.size() > 0) {
+ logger.info("Cancelling... Clearing paused jobs queue of size "
+ + pausedJobsToRun.size());
+ pausedJobsToRun.clear();
+ }
+
// Loop through job runners
- for (JobRunner runner: runningJobs.values()) {
- if (runner.getStatus() == Status.WAITING || runner.getStatus() == Status.RUNNING) {
+ for (JobRunner runner : runningJobs.values()) {
+ if (runner.getStatus() == Status.WAITING
+ || runner.getStatus() == Status.RUNNING
+ || runner.getStatus() == Status.PAUSED) {
+ logger.info("Cancelling... Killing job "
+ + runner.getNode().getId() + " with status "
+ + runner.getStatus());
runner.cancel();
}
}
- flow.setStatus(Status.KILLED);
+ logger.info("Flow cancelled.");
+ if (flow.getStatus() != Status.FAILED) {
+ flow.setStatus(Status.KILLED);
+ }
+ flow.setEndTime(System.currentTimeMillis());
+ this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED));
}
-
- public synchronized void pause() {
- logger.info("Pause flow");
- if (flow.getStatus() == Status.RUNNING || flow.getStatus() == Status.WAITING) {
+
+ public synchronized void pause(String user) {
+ if (flow.getStatus() == Status.RUNNING
+ || flow.getStatus() == Status.WAITING) {
+ logger.info("Flow paused by " + user);
paused = true;
flow.setStatus(Status.PAUSED);
}
}
-
- public synchronized void resume() {
- logger.info("Resume flow");
+
+ public synchronized void resume(String user) {
+ if (isCancelled()) {
+ logger.info("Cannot resume cancelled flow.");
+ return;
+ }
+
if (flow.getStatus() == Status.PAUSED) {
- flow.setStatus(Status.RUNNING);
+ paused = false;
+ logger.info("Flow resumed by " + user);
jobsToRun.addAll(pausedJobsToRun);
+ flow.setStatus(Status.RUNNING);
}
}
-
+
public boolean isCancelled() {
return cancelled;
}
-
+
private synchronized void commitFlow() {
int count = commitCount.getAndIncrement();
try {
- ExecutableFlowLoader.writeExecutableFlowFile(this.basePath, flow, count);
+ ExecutableFlowLoader.writeExecutableFlowFile(this.basePath, flow,
+ count);
} catch (ExecutorManagerException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
-
+
@Override
public void run() {
currentThread = Thread.currentThread();
-
+
flow.setStatus(Status.RUNNING);
flow.setStartTime(System.currentTimeMillis());
logger.info("Starting Flow");
this.fireEventListeners(Event.create(this, Type.FLOW_STARTED));
-
+
// Load all shared props
try {
logger.info("Loading all shared properties");
loadAllProperties(flow);
- }
- catch (IOException e) {
+ } catch (IOException e) {
flow.setStatus(Status.FAILED);
logger.error("Property loading failed due to " + e.getMessage());
logger.error("Exiting Prematurely.");
@@ -175,14 +200,15 @@ public class FlowRunner extends EventHandler implements Runnable {
// Set up starting nodes
try {
logger.info("Queuing starting jobs.");
- for (String startNode: flow.getStartNodes()) {
+ for (String startNode : flow.getStartNodes()) {
ExecutableNode node = flow.getExecutableNode(startNode);
JobRunner jobRunner = createJobRunner(node, null);
jobsToRun.add(jobRunner);
runningJobs.put(startNode, jobRunner);
}
} catch (IOException e) {
- logger.error("Starting job queueing failed due to " + e.getMessage());
+ logger.error("Starting job queueing failed due to "
+ + e.getMessage());
flow.setStatus(Status.FAILED);
jobsToRun.clear();
runningJobs.clear();
@@ -190,9 +216,9 @@ public class FlowRunner extends EventHandler implements Runnable {
this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED));
return;
}
-
+
// Main loop
- while(!runningJobs.isEmpty()) {
+ while (!runningJobs.isEmpty()) {
JobRunner runner = null;
try {
runner = jobsToRun.poll(5, TimeUnit.MINUTES);
@@ -200,12 +226,11 @@ public class FlowRunner extends EventHandler implements Runnable {
logger.info("FlowRunner thread has been interrupted.");
if (runningJobs.isEmpty()) {
break;
- }
- else {
+ } else {
continue;
}
}
-
+
if (runner != null) {
try {
ExecutableNode node = runner.getNode();
@@ -216,21 +241,21 @@ public class FlowRunner extends EventHandler implements Runnable {
// Should reject if I shutdown executor.
break;
}
-
+
// Just to make sure we back off so we don't flood.
synchronized (this) {
try {
wait(5);
} catch (InterruptedException e) {
-
+
}
}
}
}
-
+
logger.info("Finishing up flow. Awaiting Termination");
executorService.shutdown();
-
+
while (executorService.isTerminated()) {
try {
executorService.awaitTermination(1, TimeUnit.SECONDS);
@@ -242,15 +267,16 @@ public class FlowRunner extends EventHandler implements Runnable {
flow.setEndTime(System.currentTimeMillis());
if (flow.getStatus() == Status.RUNNING) {
- logger.info("Flow finished successfully in " + (flow.getEndTime() - flow.getStartTime()) + " ms.");
+ logger.info("Flow finished successfully in "
+ + (flow.getEndTime() - flow.getStartTime()) + " ms.");
flow.setStatus(Status.SUCCEEDED);
- }
- else if (flow.getStatus() == Status.KILLED) {
- logger.info("Flow was killed in " + (flow.getEndTime() - flow.getStartTime()) + " ms.");
+ } else if (flow.getStatus() == Status.KILLED) {
+ logger.info("Flow was killed in "
+ + (flow.getEndTime() - flow.getStartTime()) + " ms.");
flow.setStatus(Status.KILLED);
- }
- else {
- logger.info("Flow finished with failures in " + (flow.getEndTime() - flow.getStartTime()) + " ms.");
+ } else {
+ logger.info("Flow finished with failures in "
+ + (flow.getEndTime() - flow.getStartTime()) + " ms.");
flow.setStatus(Status.FAILED);
}
@@ -258,102 +284,114 @@ public class FlowRunner extends EventHandler implements Runnable {
this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED));
closeLogger();
}
-
- private JobRunner createJobRunner(ExecutableNode node, Props previousOutput) throws IOException {
+
+ private JobRunner createJobRunner(ExecutableNode node, Props previousOutput)
+ throws IOException {
String source = node.getJobPropsSource();
String propsSource = node.getPropsSource();
- Props parentProps = propsSource == null ? null : sharedProps.get(propsSource);
-
+ Props parentProps = propsSource == null ? null : sharedProps
+ .get(propsSource);
+
// We add the previous job output and put into this props.
if (previousOutput != null) {
Props earliestParent = previousOutput.getEarliestAncestor();
earliestParent.setParent(parentProps);
-
+
parentProps = earliestParent;
}
-
+
File propsFile = new File(basePath, source);
Props jobProps = new Props(parentProps, propsFile);
-
+
JobRunner jobRunner = new JobRunner(node, jobProps, basePath);
jobRunner.addListener(listener);
-
+
return jobRunner;
}
-
+
private void loadAllProperties(ExecutableFlow flow) throws IOException {
// First load all the properties
- for (FlowProps fprops: flow.getFlowProps()) {
+ for (FlowProps fprops : flow.getFlowProps()) {
String source = fprops.getSource();
File propsFile = new File(basePath, source);
-
+
Props props = new Props(null, propsFile);
sharedProps.put(source, props);
}
// Resolve parents
- for (FlowProps fprops: flow.getFlowProps()) {
+ for (FlowProps fprops : flow.getFlowProps()) {
if (fprops.getInheritedSource() != null) {
String source = fprops.getSource();
String inherit = fprops.getInheritedSource();
-
+
Props props = sharedProps.get(source);
Props inherits = sharedProps.get(inherit);
-
+
props.setParent(inherits);
}
}
}
-
+
private void interrupt() {
currentThread.interrupt();
}
-
+
private void handleSucceededJob(ExecutableNode node) {
- for(String dependent: node.getOutNodes()) {
+ if (this.isCancelled()) {
+ return;
+ }
+
+ for (String dependent : node.getOutNodes()) {
ExecutableNode dependentNode = flow.getExecutableNode(dependent);
-
+
// Check all dependencies
boolean ready = true;
- for (String dependency: dependentNode.getInNodes()) {
- ExecutableNode dependencyNode = flow.getExecutableNode(dependency);
- if (dependencyNode.getStatus() != Status.SUCCEEDED &&
- dependencyNode.getStatus() != Status.DISABLED) {
+ for (String dependency : dependentNode.getInNodes()) {
+ ExecutableNode dependencyNode = flow
+ .getExecutableNode(dependency);
+ if (dependencyNode.getStatus() != Status.SUCCEEDED
+ && dependencyNode.getStatus() != Status.DISABLED) {
ready = false;
break;
}
}
-
+
if (ready) {
Props previousOutput = null;
// Iterate the in nodes again and create the dependencies
- for (String dependency: node.getInNodes()) {
+ for (String dependency : node.getInNodes()) {
Props output = outputProps.get(dependency);
if (output != null) {
output = Props.clone(output);
-
+
output.setParent(previousOutput);
previousOutput = output;
}
}
-
+
JobRunner runner = null;
try {
- runner = this.createJobRunner(dependentNode, previousOutput);
+ runner = this
+ .createJobRunner(dependentNode, previousOutput);
} catch (IOException e) {
- logger.error("JobRunner creation failed due to " + e.getMessage());
+ logger.error("JobRunner creation failed due to "
+ + e.getMessage());
dependentNode.setStatus(Status.FAILED);
handleFailedJob(dependentNode);
return;
}
-
+
runningJobs.put(dependentNode.getId(), runner);
if (paused) {
dependentNode.setStatus(Status.PAUSED);
pausedJobsToRun.add(runner);
- }
- else {
+ logger.info("Flow is paused so adding "
+ + dependentNode.getId() + " to paused list.");
+ } else {
+ logger.info("Flow is not paused so adding "
+ + dependentNode.getId() + " to paused list.");
jobsToRun.add(runner);
}
}
@@ -361,53 +399,56 @@ public class FlowRunner extends EventHandler implements Runnable {
runningJobs.remove(node.getId());
}
-
+
private void handleFailedJob(ExecutableNode node) {
System.err.println("Job " + node.getId() + " failed.");
this.fireEventListeners(Event.create(this, Type.FLOW_FAILED_FINISHING));
-
+
switch (failedOptions) {
- // We finish running current jobs and then fail. Do not accept new jobs.
- case FINISH_RUNNING_JOBS:
- runningJobs.clear();
- executorService.shutdown();
+ // We finish running current jobs and then fail. Do not accept new jobs.
+ case FINISH_RUNNING_JOBS:
+ runningJobs.clear();
+ executorService.shutdown();
break;
- // We kill all running jobs and fail immediately
- case KILL_ALL:
- this.cancel();
+ // We kill all running jobs and fail immediately
+ case KILL_ALL:
+ this.cancel("azkaban");
break;
}
-
+
runningJobs.remove(node.getId());
}
-
+
private class JobRunnerEventListener implements EventListener {
private FlowRunner flowRunner;
-
+
public JobRunnerEventListener(FlowRunner flowRunner) {
this.flowRunner = flowRunner;
}
@Override
public synchronized void handleEvent(Event event) {
- JobRunner runner = (JobRunner)event.getRunner();
+ JobRunner runner = (JobRunner) event.getRunner();
ExecutableNode node = runner.getNode();
String jobID = node.getId();
- System.out.println("Event " + jobID + " " + event.getType().toString());
+ System.out.println("Event " + jobID + " "
+ + event.getType().toString());
- // On Job success, we add the output props and then set up the next run.
+ // On Job success, we add the output props and then set up the next
+ // run.
if (event.getType() == Type.JOB_SUCCEEDED) {
- logger.info("Job Succeeded " + jobID + " in " + (node.getEndTime() - node.getStartTime()) + " ms");
+ logger.info("Job Succeeded " + jobID + " in "
+ + (node.getEndTime() - node.getStartTime()) + " ms");
Props props = runner.getOutputProps();
outputProps.put(jobID, props);
flowRunner.handleSucceededJob(runner.getNode());
- }
- else if (event.getType() == Type.JOB_FAILED) {
- logger.info("Job Failed " + jobID + " in " + (node.getEndTime() - node.getStartTime()) + " ms");
+ } else if (event.getType() == Type.JOB_FAILED) {
+ logger.info("Job Failed " + jobID + " in "
+ + (node.getEndTime() - node.getStartTime()) + " ms");
logger.info(jobID + " FAILED");
flowRunner.handleFailedJob(runner.getNode());
}
-
+
flowRunner.commitFlow();
if (runningJobs.isEmpty()) {
System.out.println("There are no more running jobs.");
diff --git a/src/java/azkaban/executor/FlowRunnerManager.java b/src/java/azkaban/executor/FlowRunnerManager.java
index 509f64a..d334083 100644
--- a/src/java/azkaban/executor/FlowRunnerManager.java
+++ b/src/java/azkaban/executor/FlowRunnerManager.java
@@ -56,24 +56,24 @@ public class FlowRunnerManager {
executorService.submit(runner);
}
- public void cancelFlow(String id) throws ExecutorManagerException {
+ public void cancelFlow(String id, String user) throws ExecutorManagerException {
FlowRunner runner = runningFlows.get(id);
if (runner != null) {
- runner.cancel();
+ runner.cancel(user);
}
}
- public void pauseFlow(String id) throws ExecutorManagerException {
+ public void pauseFlow(String id, String user) throws ExecutorManagerException {
FlowRunner runner = runningFlows.get(id);
if (runner != null) {
- runner.pause();
+ runner.pause(user);
}
}
- public void resumeFlow(String id) throws ExecutorManagerException {
+ public void resumeFlow(String id, String user) throws ExecutorManagerException {
FlowRunner runner = runningFlows.get(id);
if (runner != null) {
- runner.resume();
+ runner.resume(user);
}
}
diff --git a/src/java/azkaban/executor/JobRunner.java b/src/java/azkaban/executor/JobRunner.java
index 64bea74..ca0ddb6 100644
--- a/src/java/azkaban/executor/JobRunner.java
+++ b/src/java/azkaban/executor/JobRunner.java
@@ -91,7 +91,7 @@ public class JobRunner extends EventHandler implements Runnable {
wait(5000);
}
catch (InterruptedException e) {
-
+ logger.info("Job cancelled.");
}
}
// Run Job
@@ -109,9 +109,11 @@ public class JobRunner extends EventHandler implements Runnable {
closeLogger();
}
- public void cancel() {
+ public synchronized void cancel() {
// Cancel code here
-
+ // will just interrupt, I guess, until the code is finished.
+ this.notifyAll();
+
node.setStatus(Status.KILLED);
}
diff --git a/src/java/azkaban/utils/ExecutableFlowLoader.java b/src/java/azkaban/utils/ExecutableFlowLoader.java
index 3130c55..8ff1785 100644
--- a/src/java/azkaban/utils/ExecutableFlowLoader.java
+++ b/src/java/azkaban/utils/ExecutableFlowLoader.java
@@ -23,7 +23,7 @@ public class ExecutableFlowLoader {
* @throws ExecutorManagerException
*/
public static ExecutableFlow loadExecutableFlowFromDir(File exDir) throws ExecutorManagerException {
- File flowFile = getLatestExecutableFlowDir(exDir);
+ File flowFile = getLatestExecutableFlowDir(exDir, false);
Object exFlowObj = getFlowObjectFromFile(flowFile);
int updateNumber = getFlowUpdateNumber(flowFile);
@@ -77,7 +77,7 @@ public class ExecutableFlowLoader {
* @return
* @throws ExecutorManagerException
*/
- private static File getLatestExecutableFlowDir(File exDir) throws ExecutorManagerException {
+ private static File getLatestExecutableFlowDir(File exDir, boolean cleanOldUpdates) throws ExecutorManagerException {
String exFlowName = exDir.getName();
String flowFileName = "_" + exFlowName + ".flow";
@@ -88,6 +88,17 @@ public class ExecutableFlowLoader {
logger.error("Execution flow " + exFlowName + " missing flow file.");
throw new ExecutorManagerException("Execution flow " + exFlowName + " missing flow file.");
}
+
+ // Remove updates between first and last index.
+ if (cleanOldUpdates) {
+ if (exFlowFiles.length > 3) {
+ for (int i=1; i < exFlowFiles.length - 1; ++i) {
+ File file = exFlowFiles[i];
+ file.delete();
+ }
+ }
+ }
+
File lastExFlow = exFlowFiles[exFlowFiles.length-1];
return lastExFlow;
}
@@ -100,14 +111,14 @@ public class ExecutableFlowLoader {
* @return
* @throws ExecutorManagerException
*/
- public static boolean updateFlowStatusFromFile(File exDir, ExecutableFlow flow) throws ExecutorManagerException {
- File file = getLatestExecutableFlowDir(exDir);
- System.out.println("Loading from: " + file);
+ public static boolean updateFlowStatusFromFile(File exDir, ExecutableFlow flow, boolean cleanOldUpdates) throws ExecutorManagerException {
+ File file = getLatestExecutableFlowDir(exDir, cleanOldUpdates);
int number = getFlowUpdateNumber(file);
if (flow.getUpdateNumber() >= number) {
return false;
}
+ System.out.println("Loading from: " + file);
Object exFlowObj = getFlowObjectFromFile(file);
flow.updateExecutableFlowFromObject(exFlowObj);
flow.setUpdateNumber(number);
diff --git a/src/java/azkaban/webapp/AzkabanExecutorServer.java b/src/java/azkaban/webapp/AzkabanExecutorServer.java
index 54293d9..e850dd8 100644
--- a/src/java/azkaban/webapp/AzkabanExecutorServer.java
+++ b/src/java/azkaban/webapp/AzkabanExecutorServer.java
@@ -314,13 +314,19 @@ public class AzkabanExecutorServer {
handleAjaxFlowStatus(respMap, execid);
}
else if (action.equals("cancel")) {
-
+ String user = getParam(req, "user");
+ logger.info("Cancel called.");
+ handleAjaxCancel(respMap, execid, user);
}
else if (action.equals("pause")) {
-
+ String user = getParam(req, "user");
+ logger.info("Paused called.");
+ handleAjaxPause(respMap, execid, user);
}
else if (action.equals("resume")) {
-
+ String user = getParam(req, "user");
+ logger.info("Resume called.");
+ handleAjaxResume(respMap, execid, user);
}
}
@@ -350,9 +356,30 @@ public class AzkabanExecutorServer {
}
}
- private void handleAjaxPause(Map<String, Object> respMap, String execid) throws ServletException {
+ private void handleAjaxPause(Map<String, Object> respMap, String execid, String user) throws ServletException {
+
try {
- flowRunnerManager.submitFlow(execid, execpath);
+ flowRunnerManager.pauseFlow(execid, user);
+ respMap.put("status", "success");
+ } catch (ExecutorManagerException e) {
+ e.printStackTrace();
+ respMap.put("error", e.getMessage());
+ }
+ }
+
+ private void handleAjaxResume(Map<String, Object> respMap, String execid, String user) throws ServletException {
+ try {
+ flowRunnerManager.resumeFlow(execid, user);
+ respMap.put("status", "success");
+ } catch (ExecutorManagerException e) {
+ e.printStackTrace();
+ respMap.put("error", e.getMessage());
+ }
+ }
+
+ private void handleAjaxCancel(Map<String, Object> respMap, String execid, String user) throws ServletException {
+ try {
+ flowRunnerManager.cancelFlow(execid, user);
respMap.put("status", "success");
} catch (ExecutorManagerException e) {
e.printStackTrace();
diff --git a/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java b/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
index 16497b9..c8c5f5b 100644
--- a/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
@@ -173,7 +173,12 @@ public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
if (project == null) {
return;
}
-
+
+ try {
+ executorManager.cancelFlow(exFlow, user.getUserId());
+ } catch (ExecutorManagerException e) {
+ ret.put("error", e.getMessage());
+ }
}
private void ajaxRestartFlow(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException{
@@ -182,6 +187,7 @@ public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
return;
}
+
}
private void ajaxPauseFlow(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException{
@@ -190,6 +196,11 @@ public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
return;
}
+ try {
+ executorManager.pauseFlow(exFlow, user.getUserId());
+ } catch (ExecutorManagerException e) {
+ ret.put("error", e.getMessage());
+ }
}
private void ajaxResumeFlow(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException{
@@ -198,6 +209,11 @@ public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
return;
}
+ try {
+ executorManager.resumeFlow(exFlow, user.getUserId());
+ } catch (ExecutorManagerException e) {
+ ret.put("resume", e.getMessage());
+ }
}
private void ajaxFetchExecutableFlowUpdate(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException{
diff --git a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
index 4492bca..5b31f73 100644
--- a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
@@ -2,6 +2,7 @@ package azkaban.webapp.servlet;
import java.io.IOException;
import java.io.Writer;
+import java.util.HashMap;
import java.util.UUID;
import javax.servlet.ServletException;
@@ -57,7 +58,14 @@ public abstract class LoginAbstractAzkabanServlet extends
logger.info("Found session " + session.getUser());
handleGet(req, resp, session);
} else {
- handleLogin(req, resp);
+ if (hasParam(req, "ajax")) {
+ HashMap<String, String> retVal = new HashMap<String, String>();
+ retVal.put("error", "session");
+ this.writeJSON(resp, retVal);
+ }
+ else {
+ handleLogin(req, resp);
+ }
}
}
diff --git a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
index a49c36f..e81170d 100644
--- a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
@@ -10,6 +10,7 @@
<script type="text/javascript" src="${context}/js/backbone-0.5.3-min.js"></script>
<script type="text/javascript" src="${context}/js/jquery.simplemodal.js"></script>
<script type="text/javascript" src="${context}/js/jquery.contextMenu.js"></script>
+ <script type="text/javascript" src="${context}/js/azkaban.ajax.utils.js"></script>
<script type="text/javascript" src="${context}/js/azkaban.nav.js"></script>
<script type="text/javascript" src="${context}/js/azkaban.layout.js"></script>
<script type="text/javascript" src="${context}/js/azkaban.exflow.view.js"></script>
@@ -116,6 +117,21 @@
<tr><td class="first">Duration</td><td id="duration">-</td></tr>
</table>
</div>
+
+ <div id="messageDialog" class="modal">
+ <h3 id="messageTitle">Error</h3>
+ <div class="messageDiv">
+ <p id="messageBox"></p>
+ </div>
+ </div>
+
+ <div id="invalid-session" class="modal">
+ <h3>Invalid Session</h3>
+ <p>Session has expired. Please re-login.</p>
+ <div class="actions">
+ <a class="yes btn2" id="login-btn" href="#">Re-login</a>
+ </div>
+ </div>
</body>
</html>
diff --git a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
index 9f0ceca..83e720c 100644
--- a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
@@ -10,6 +10,7 @@
<script type="text/javascript" src="${context}/js/backbone-0.5.3-min.js"></script>
<script type="text/javascript" src="${context}/js/jquery.simplemodal.js"></script>
<script type="text/javascript" src="${context}/js/jquery.contextMenu.js"></script>
+ <script type="text/javascript" src="${context}/js/azkaban.ajax.utils.js"></script>
<script type="text/javascript" src="${context}/js/azkaban.nav.js"></script>
<script type="text/javascript" src="${context}/js/azkaban.layout.js"></script>
<script type="text/javascript" src="${context}/js/azkaban.flow.view.js"></script>
src/web/css/azkaban.css 20(+18 -2)
diff --git a/src/web/css/azkaban.css b/src/web/css/azkaban.css
index 3aeda0c..aea3d0b 100644
--- a/src/web/css/azkaban.css
+++ b/src/web/css/azkaban.css
@@ -1250,6 +1250,10 @@ tr:hover td {
background-position: 0px 0px;
}
+#list ul li.KILLED .icon {
+ background-position: 0px 0px;
+}
+
#list ul li a {
font-size: 10pt;
margin-left: 5px;
@@ -1268,6 +1272,10 @@ tr:hover td {
background-position: 16px 0px;
}
+#messageDialog .messageDiv {
+ margin: 20px;
+}
+
table.parameters tr td.first {
font-weight: bold;
}
@@ -1373,6 +1381,10 @@ svg .FAILED circle {
fill: #CC0000;
}
+svg .KILLED circle {
+ fill: #CC0000;
+}
+
svg .SUCCEEDED circle {
fill: #00CC33;
}
@@ -1488,7 +1500,11 @@ span.sublabel {
}
#flow-status table td.FAILED {
- color: #CC0000;
+ color: #CC0000;
+}
+
+#flow-status table td.PAUSED {
+ color: #FF6600;
}
#flow-status table td.FAILED_FINISHING {
@@ -1586,7 +1602,7 @@ td .status.DISABLED {
}
td .status.KILLED {
- background-color: #000;
+ background-color: #CC0000;
}
td .status.UNKNOWN {
src/web/js/azkaban.ajax.utils.js 30(+30 -0)
diff --git a/src/web/js/azkaban.ajax.utils.js b/src/web/js/azkaban.ajax.utils.js
new file mode 100644
index 0000000..c087086
--- /dev/null
+++ b/src/web/js/azkaban.ajax.utils.js
@@ -0,0 +1,30 @@
+function ajaxCall(requestURL, data, callback) {
+ $.get(
+ requestURL,
+ data,
+ function(data) {
+ if (data.error == "session") {
+ // We need to relogin.
+ var errorDialog = document.getElementById("invalid-session");
+ if (errorDialog) {
+ $(errorDialog).modal({
+ closeHTML: "<a href='#' title='Close' class='modal-close'>x</a>",
+ position: ["20%",],
+ containerId: 'confirm-container',
+ containerCss: {
+ 'height': '220px',
+ 'width': '565px'
+ },
+ onClose: function (dialog) {
+ window.location.reload();
+ }
+ });
+ }
+ }
+ else {
+ callback.call(this,data);
+ }
+ },
+ "json"
+ );
+}
src/web/js/azkaban.exflow.view.js 84(+69 -15)
diff --git a/src/web/js/azkaban.exflow.view.js b/src/web/js/azkaban.exflow.view.js
index 92bcf19..9c55b1e 100644
--- a/src/web/js/azkaban.exflow.view.js
+++ b/src/web/js/azkaban.exflow.view.js
@@ -182,19 +182,75 @@ azkaban.FlowTabView= Backbone.View.extend({
}
},
handleCancelClick : function(evt) {
-
+ var requestURL = contextURL + "/executor";
+ ajaxCall(
+ requestURL,
+ {"execid": execId, "ajax":"cancelFlow"},
+ function(data) {
+ console.log("cancel clicked");
+ if (data.error) {
+ showDialog("Error", data.error);
+ }
+ else {
+ showDialog("Cancelled", "Flow has been cancelled.");
+ }
+ }
+ );
},
handleRestartClick : function(evt) {
-
},
handlePauseClick : function(evt) {
-
+ var requestURL = contextURL + "/executor";
+ ajaxCall(
+ requestURL,
+ {"execid": execId, "ajax":"pauseFlow"},
+ function(data) {
+ console.log("pause clicked");
+ if (data.error) {
+ showDialog("Error", data.error);
+ }
+ else {
+ showDialog("Paused", "Flow has been paused.");
+ }
+ }
+ );
},
handleResumeClick : function(evt) {
-
+ var requestURL = contextURL + "/executor";
+ ajaxCall(
+ requestURL,
+ {"execid": execId, "ajax":"resumeFlow"},
+ function(data) {
+ console.log("pause clicked");
+ if (data.error) {
+ showDialog("Error", data.error);
+ }
+ else {
+ showDialog("Resumed", "Flow has been resumed.");
+ }
+ }
+ );
}
});
+var showDialog = function(title, message) {
+ $('#messageTitle').text(title);
+
+ $('#messageBox').text(message);
+
+ $('#messageDialog').modal({
+ closeHTML: "<a href='#' title='Close' class='modal-close'>x</a>",
+ position: ["20%",],
+ containerId: 'confirm-container',
+ containerCss: {
+ 'height': '220px',
+ 'width': '565px'
+ },
+ onShow: function (dialog) {
+ }
+ });
+}
+
var jobListView;
azkaban.JobListView = Backbone.View.extend({
events: {
@@ -765,10 +821,10 @@ var updaterFunction = function() {
var requestURL = contextURL + "/executor";
var oldData = graphModel.get("data");
var nodeMap = graphModel.get("nodeMap");
- var keepRunning = oldData.status != "SUCCEEDED" && oldData.status != "FAILED";
-
+ var keepRunning = oldData.status != "SUCCEEDED" && oldData.status != "FAILED" && oldData.status != "KILLED";
+
if (keepRunning) {
- $.get(
+ ajaxCall(
requestURL,
{"execid": execId, "ajax":"fetchexecflowupdate", "lastUpdateTime": updateTime},
function(data) {
@@ -792,14 +848,13 @@ var updaterFunction = function() {
}
graphModel.set({"update": data});
- },
- "json"
- );
+ }
+ );
var data = graphModel.get("data");
if (data.status != "SUCCEEDED" && data.status != "FAILED" ) {
// 10 sec updates
- setTimeout(function() {updaterFunction();}, 10000);
+ setTimeout(function() {updaterFunction();}, 5000);
}
else {
console.log("Flow finished, so no more updates");
@@ -821,7 +876,7 @@ $(function() {
executionListView = new azkaban.ExecutionListView({el: $('#jobListView'), model:graphModel});
var requestURL = contextURL + "/executor";
- $.get(
+ ajaxCall(
requestURL,
{"execid": execId, "ajax":"fetchexecflow"},
function(data) {
@@ -850,8 +905,7 @@ $(function() {
}
}
- setTimeout(function() {updaterFunction()}, 5000);
- },
- "json"
+ setTimeout(function() {updaterFunction()}, 2500);
+ }
);
});