azkaban-developers

Details

diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index d358b02..dcf4de2 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -35,7 +35,7 @@ public class ExecutableFlow {
 	private boolean submitted = false;
 	
 	public enum Status {
-		FAILED, FAILED_FINISHING, SUCCEEDED, RUNNING, WAITING, KILLED, DISABLED, READY, UNKNOWN
+		FAILED, FAILED_FINISHING, SUCCEEDED, RUNNING, WAITING, KILLED, DISABLED, READY, PAUSED, UNKNOWN
 	}
 	
 	public ExecutableFlow(String id, Flow flow) {
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index bcbb360..b7a47f6 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -34,7 +34,6 @@ import org.apache.log4j.Logger;
 
 import azkaban.executor.ExecutableFlow.Status;
 import azkaban.flow.Flow;
-import azkaban.project.Project;
 import azkaban.utils.ExecutableFlowLoader;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Props;
@@ -343,26 +342,45 @@ public class ExecutorManager {
 		runningFlows.put(flow.getExecutionId(), flow);
 	}
 	
-	public void executeFlow(ExecutableFlow flow) throws ExecutorManagerException {
-		String executionPath = flow.getExecutionPath();
-		File executionDir = new File(executionPath);
-		flow.setSubmitTime(System.currentTimeMillis());
-		
-		writeResourceFile(executionDir, flow);
-		ExecutableFlowLoader.writeExecutableFlowFile(executionDir, flow, null);
-		addActiveExecutionReference(flow);
-		flow.setLastCheckedTime(System.currentTimeMillis());
-		runningFlows.put(flow.getExecutionId(), flow);
-		
-		logger.info("Setting up " + flow.getExecutionId() + " for execution.");
-		
+	public void cancelFlow(ExecutableFlow flow) throws ExecutorManagerException {
+		String response = null;
+		try {
+			response = callExecutionServer("cancel", flow);
+		} catch (IOException e) {
+			e.printStackTrace();
+			throw new ExecutorManagerException("Error cancelling flow.", e);
+		}
+	}
+	
+	public void pauseFlow(ExecutableFlow flow) throws ExecutorManagerException {
+		String response = null;
+		try {
+			response = callExecutionServer("pause", flow);
+		} catch (IOException e) {
+			e.printStackTrace();
+			throw new ExecutorManagerException("Error cancelling flow.", e);
+		}
+	}
+	
+	public void resumeFlow(ExecutableFlow flow) throws ExecutorManagerException {
+		String response = null;
+		try {
+			response = callExecutionServer("resume", flow);
+		} catch (IOException e) {
+			e.printStackTrace();
+			throw new ExecutorManagerException("Error cancelling flow.", e);
+		}
+
+	}
+	
+	private String callExecutionServer(String action, ExecutableFlow flow) throws IOException{
 		URIBuilder builder = new URIBuilder();
 		builder.setScheme("http")
 			.setHost(url)
 			.setPort(portNumber)
 			.setPath("/executor")
 			.setParameter("sharedToken", token)
-			.setParameter("action", "execute")
+			.setParameter("action", "resume")
 			.setParameter("execid", flow.getExecutionId())
 			.setParameter("execpath", flow.getExecutionPath());
 		
@@ -370,11 +388,9 @@ public class ExecutorManager {
 		try {
 			uri = builder.build();
 		} catch (URISyntaxException e) {
-			e.printStackTrace();
-			flow.setStatus(ExecutableFlow.Status.FAILED);
-			return;
+			throw new IOException(e);
 		}
-
+		
 		ResponseHandler<String> responseHandler = new BasicResponseHandler();
 		
 		logger.info("Submitting flow " + flow.getExecutionId() + " for execution.");
@@ -386,15 +402,40 @@ public class ExecutorManager {
 		} catch (IOException e) {
 			flow.setStatus(ExecutableFlow.Status.FAILED);
 			e.printStackTrace();
-			return;
+			return response;
 		}
 		finally {
 			httpclient.getConnectionManager().shutdown();
 		}
 		
+		return response;
+	}
+	
+	public void executeFlow(ExecutableFlow flow) throws ExecutorManagerException {
+		String executionPath = flow.getExecutionPath();
+		File executionDir = new File(executionPath);
+		flow.setSubmitTime(System.currentTimeMillis());
+		
+		writeResourceFile(executionDir, flow);
+		ExecutableFlowLoader.writeExecutableFlowFile(executionDir, flow, null);
+		addActiveExecutionReference(flow);
+		flow.setLastCheckedTime(System.currentTimeMillis());
+		runningFlows.put(flow.getExecutionId(), flow);
+		
+		logger.info("Setting up " + flow.getExecutionId() + " for execution.");
+		
+		String response;
+		try {
+			response = callExecutionServer("execute", flow);
+		} catch (IOException e) {
+			e.printStackTrace();
+			flow.setStatus(ExecutableFlow.Status.FAILED);
+			return;
+		}
+		
+		logger.debug("Submitted Response: " + response);
 		flow.setLastCheckedTime(System.currentTimeMillis());
 		flow.setSubmitted(true);
-		logger.debug("Submitted Response: " + response);
 	}
 	
 	public void cleanupAll(ExecutableFlow exflow) throws ExecutorManagerException{
@@ -543,41 +584,6 @@ public class ExecutorManager {
 			}
 		}
 	}
-
-	private String getFlowStatusInExecutor(ExecutableFlow flow) throws IOException {
-		URIBuilder builder = new URIBuilder();
-		builder.setScheme("http")
-			.setHost(url)
-			.setPort(portNumber)
-			.setPath("/executor")
-			.setParameter("sharedToken", token)
-			.setParameter("action", "status")
-			.setParameter("execid", flow.getExecutionId());
-		
-		URI uri = null;
-		try {
-			uri = builder.build();
-		} catch (URISyntaxException e) {
-			e.printStackTrace();
-			throw new IOException("Bad URI", e);
-		}
-		
-		HttpClient httpclient = new DefaultHttpClient();
-		HttpGet httpget = new HttpGet(uri);
-		String response = null;
-		ResponseHandler<String> responseHandler = new BasicResponseHandler();
-		
-		try {
-			response = httpclient.execute(httpget, responseHandler);
-		} catch (IOException e) {
-			e.printStackTrace();
-			throw new IOException("Connection problem", e);
-		}
-		finally {
-			httpclient.getConnectionManager().shutdown();
-		}
-		return response;
-	}
 	
 	private String getExecutableReferencePartition(String execID) {
 		// We're partitioning the archive by the first part of the id, which should be a timestamp.
@@ -672,7 +678,7 @@ public class ExecutorManager {
 					// Query the executor service to see if the item is running.
 					String responseString = null;
 					try {
-						responseString = getFlowStatusInExecutor(exFlow);
+						responseString = callExecutionServer("status", exFlow);
 					} catch (IOException e) {
 						e.printStackTrace();
 						// Connection issue. Backoff 1 sec.
diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index 0ffb5a1..f450b80 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -2,7 +2,10 @@ package azkaban.executor;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -37,8 +40,10 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private ExecutableFlow flow;
 	private ExecutorService executorService;
 	private BlockingQueue<JobRunner> jobsToRun = new LinkedBlockingQueue<JobRunner>();
+	private List<JobRunner> pausedJobsToRun = Collections.synchronizedList(new ArrayList<JobRunner>());
 	private int numThreads = NUM_CONCURRENT_THREADS;
 	private boolean cancelled = true;
+	private boolean paused = true;
 
 	private Map<String, JobRunner> runningJobs;
 	private JobRunnerEventListener listener;
@@ -114,6 +119,22 @@ public class FlowRunner extends EventHandler implements Runnable {
 		flow.setStatus(Status.KILLED);
 	}
 	
+	public synchronized void pause() {
+		logger.info("Pause flow");
+		if (flow.getStatus() == Status.RUNNING || flow.getStatus() == Status.WAITING) {
+			paused = true;
+			flow.setStatus(Status.PAUSED);
+		}
+	}
+	
+	public synchronized void resume() {
+		logger.info("Resume flow");
+		if (flow.getStatus() == Status.PAUSED) {
+			flow.setStatus(Status.RUNNING);
+			jobsToRun.addAll(pausedJobsToRun);
+		}
+	}
+	
 	public boolean isCancelled() {
 		return cancelled;
 	}
@@ -328,7 +349,13 @@ public class FlowRunner extends EventHandler implements Runnable {
 				}
 			
 				runningJobs.put(dependentNode.getId(), runner);
-				jobsToRun.add(runner);
+				if (paused) {
+					dependentNode.setStatus(Status.PAUSED);
+					pausedJobsToRun.add(runner);
+				}
+				else {
+					jobsToRun.add(runner);
+				}
 			}
 		}
 
diff --git a/src/java/azkaban/executor/FlowRunnerManager.java b/src/java/azkaban/executor/FlowRunnerManager.java
index 8a353e3..509f64a 100644
--- a/src/java/azkaban/executor/FlowRunnerManager.java
+++ b/src/java/azkaban/executor/FlowRunnerManager.java
@@ -63,6 +63,20 @@ public class FlowRunnerManager {
 		}
 	}
 	
+	public void pauseFlow(String id) throws ExecutorManagerException {
+		FlowRunner runner = runningFlows.get(id);
+		if (runner != null) {
+			runner.pause();
+		}
+	}
+	
+	public void resumeFlow(String id) throws ExecutorManagerException {
+		FlowRunner runner = runningFlows.get(id);
+		if (runner != null) {
+			runner.resume();
+		}
+	}
+	
 	public FlowRunner getFlowRunner(String id) {
 		return runningFlows.get(id);
 	}
diff --git a/src/java/azkaban/webapp/AzkabanExecutorServer.java b/src/java/azkaban/webapp/AzkabanExecutorServer.java
index d84ea11..54293d9 100644
--- a/src/java/azkaban/webapp/AzkabanExecutorServer.java
+++ b/src/java/azkaban/webapp/AzkabanExecutorServer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.TimeZone;
 
 import javax.servlet.ServletConfig;
@@ -306,26 +307,20 @@ public class AzkabanExecutorServer {
 				
 				// Handle execute
 				if (action.equals("execute")) {
-					String execpath = getParam(req, "execpath");
-					
-					logger.info("Submitted " + execid + " with " + execpath);
-					try {
-						flowRunnerManager.submitFlow(execid, execpath);
-						respMap.put("status", "success");
-					} catch (ExecutorManagerException e) {
-						e.printStackTrace();
-						respMap.put("error", e.getMessage());
-					}
+					handleAjaxExecute(req, respMap, execid);
 				}
 				// Handle Status
 				else if (action.equals("status")) {
-					ExecutableFlow flow = flowRunnerManager.getExecutableFlow(execid);
-					if (flow == null) {
-						respMap.put("status", "notfound");
-					}
-					else {
-						respMap.put("status", flow.getStatus().toString());
-					}
+					handleAjaxFlowStatus(respMap, execid);
+				}
+				else if (action.equals("cancel")) {
+					
+				}
+				else if (action.equals("pause")) {
+					
+				}
+				else if (action.equals("resume")) {
+					
 				}
 			}
 
@@ -333,6 +328,38 @@ public class AzkabanExecutorServer {
 			resp.flushBuffer();
 		}
 		
+		private void handleAjaxExecute(HttpServletRequest req, Map<String, Object> respMap, String execid) throws ServletException {
+			String execpath = getParam(req, "execpath");
+			logger.info("Submitted " + execid + " with " + execpath);
+			try {
+				flowRunnerManager.submitFlow(execid, execpath);
+				respMap.put("status", "success");
+			} catch (ExecutorManagerException e) {
+				e.printStackTrace();
+				respMap.put("error", e.getMessage());
+			}
+		}
+		
+		private void handleAjaxFlowStatus(Map<String, Object> respMap, String execid) {
+			ExecutableFlow flow = flowRunnerManager.getExecutableFlow(execid);
+			if (flow == null) {
+				respMap.put("status", "notfound");
+			}
+			else {
+				respMap.put("status", flow.getStatus().toString());
+			}
+		}
+		
+		private void handleAjaxPause(Map<String, Object> respMap, String execid) throws ServletException {
+			try {
+				flowRunnerManager.submitFlow(execid, execpath);
+				respMap.put("status", "success");
+			} catch (ExecutorManagerException e) {
+				e.printStackTrace();
+				respMap.put("error", e.getMessage());
+			}
+		}
+		
 		@Override
 		public void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
 			
diff --git a/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java b/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
index 9cbd74f..16497b9 100644
--- a/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
@@ -124,11 +124,37 @@ public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
 		String ajaxName = getParam(req, "ajax");
 		
 		if (hasParam(req, "execid")) {
-			if (ajaxName.equals("fetchexecflow")) {
-				ajaxFetchExecutableFlow(req, resp, ret, session.getUser());
+			String execid = getParam(req, "execid");
+			ExecutableFlow exFlow = null;
+
+			try {
+				exFlow = executorManager.getExecutableFlow(execid);
+			} catch (ExecutorManagerException e) {
+				ret.put("error", "Error fetching execution '" + execid + "': " + e.getMessage());
 			}
-			else if (ajaxName.equals("fetchexecflowupdate")) {
-				ajaxFetchExecutableFlowUpdate(req, resp, ret, session.getUser());
+
+			if (exFlow == null) {
+				ret.put("error", "Cannot find execution '" + execid + "'");
+			}
+			else {
+				if (ajaxName.equals("fetchexecflow")) {
+					ajaxFetchExecutableFlow(req, resp, ret, session.getUser(), exFlow);
+				}
+				else if (ajaxName.equals("fetchexecflowupdate")) {
+					ajaxFetchExecutableFlowUpdate(req, resp, ret, session.getUser(), exFlow);
+				}
+				else if (ajaxName.equals("cancelFlow")) {
+					ajaxCancelFlow(req, resp, ret, session.getUser(), exFlow);
+				}
+				else if (ajaxName.equals("restartFlow")) {
+					ajaxRestartFlow(req, resp, ret, session.getUser(), exFlow);
+				}
+				else if (ajaxName.equals("pauseFlow")) {
+					ajaxPauseFlow(req, resp, ret, session.getUser(), exFlow);
+				}
+				else if (ajaxName.equals("resumeFlow")) {
+					ajaxResumeFlow(req, resp, ret, session.getUser(), exFlow);
+				}
 			}
 		}
 		else {
@@ -142,22 +168,42 @@ public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
 		this.writeJSON(resp, ret);
 	}
 
-	private void ajaxFetchExecutableFlowUpdate(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user) throws ServletException{
-		String execid = getParam(req, "execid");
-		Long lastUpdateTime = Long.parseLong(getParam(req, "lastUpdateTime"));
-		
-		System.out.println("Fetching " + execid);
-		
-		ExecutableFlow exFlow = null;
-		try {
-			exFlow = executorManager.getExecutableFlow(execid);
-		} catch (ExecutorManagerException e) {
-			ret.put("error", "Error fetching execution '" + execid + "': " + e.getMessage());
+	private void ajaxCancelFlow(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException{
+		Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.EXECUTE);
+		if (project == null) {
+			return;
 		}
-		if (exFlow == null) {
-			ret.put("error", "Cannot find execution '" + execid + "'");
+
+	}
+
+	private void ajaxRestartFlow(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException{
+		Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.EXECUTE);
+		if (project == null) {
 			return;
 		}
+
+	}
+
+	private void ajaxPauseFlow(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException{
+		Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.EXECUTE);
+		if (project == null) {
+			return;
+		}
+
+	}
+
+	private void ajaxResumeFlow(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException{
+		Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.EXECUTE);
+		if (project == null) {
+			return;
+		}
+
+	}
+	
+	private void ajaxFetchExecutableFlowUpdate(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException{
+		Long lastUpdateTime = Long.parseLong(getParam(req, "lastUpdateTime"));
+		
+		System.out.println("Fetching " + exFlow.getExecutionId());
 		
 		Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.READ);
 		if (project == null) {
@@ -187,20 +233,9 @@ public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
 		ret.put("submitTime", exFlow.getSubmitTime());
 	}
 	
-	private void ajaxFetchExecutableFlow(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user) throws ServletException {
-		String execid = getParam(req, "execid");
-		System.out.println("Fetching " + execid);
-		ExecutableFlow exFlow = null;
-		try {
-			exFlow = executorManager.getExecutableFlow(execid);
-		} catch (ExecutorManagerException e) {
-			ret.put("error", "Error fetching execution '" + execid + "': " + e.getMessage());
-		}
-		if (exFlow == null) {
-			ret.put("error", "Cannot find execution '" + execid + "'");
-			return;
-		}
-		
+	private void ajaxFetchExecutableFlow(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException {
+		System.out.println("Fetching " + exFlow.getExecutionId());
+
 		Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.READ);
 		if (project == null) {
 			return;
diff --git a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
index 5ede56c..a49c36f 100644
--- a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
@@ -57,6 +57,12 @@
 							<li class="lidivider">|</li>
 							<li><a id="jobslistViewLink" href="#jobslist">Job List</a></li>
 						</ul>
+						<ul id="actionsBtns" class="buttons">
+							<li><div id="pausebtn" class="btn2">Pause</div></li>
+							<li><div id="resumebtn" class="btn2">Resume</div></li>
+							<li><div id="cancelbtn" class="btn6">Cancel</div></li>
+							<li><div id="restartbtn" class="btn1">Restart</div></li>
+						</ul>
 					</div>
 					<div id="graphView">
 						<div class="relative">
diff --git a/src/web/css/azkaban.css b/src/web/css/azkaban.css
index 13163f8..3aeda0c 100644
--- a/src/web/css/azkaban.css
+++ b/src/web/css/azkaban.css
@@ -327,6 +327,7 @@ tr:hover td {
 .btn3,
 .btn4,
 .btn5,
+.btn6,
 .btn-disabled {
   -moz-border-radius: 3px;
   -webkit-border-radius: 3px;
@@ -373,7 +374,6 @@ tr:hover td {
   margin-right: 25px;
 }
 
-
 /* blue */
 .btn2 {
   background: -moz-linear-gradient(center top , #73AEC9 0pt, #73AEC9 1px, #338AB0 1px, #0571A6 100%) repeat scroll 0 0 transparent;
@@ -432,6 +432,29 @@ tr:hover td {
   color: #EEE;
 }
 
+/* red */
+.btn6 {
+  background: #cc9999;
+  background: -moz-linear-gradient(top, #cc6161 0, #cc6161 1px, #cc0000 1px, #931100 100%);
+  background: -o-linear-gradient(top, #cc6161 0, #cc6161 1px, #cc0000 1px, #931100 100%);
+  background: -webkit-gradient(linear, left top, left bottom, color-stop(0,#cc6161), color-stop(5%,#cc6161), color-stop(5%,#cc0000), color-stop(100%,#931100));
+  background: linear-gradient(top, #cc6161 0, #cc6161 1px, #cc0000 1px, #931100 100%);
+  filter: progid:DXImageTransform.Microsoft.gradient( startColorstr='#cc0000', endColorstr='#931100',GradientType=0 );
+  border-color: #931100;
+  color: #fff;
+}
+
+.btn6:hover {
+  background: #cc9999;
+  background: -moz-linear-gradient(top, #cc6161 0, #cc6161 1px, #CC4343 1px, #912C21 100%);
+  background: -o-linear-gradient(top, #cc6161 0, #cc6161 1px, #CC4343 1px, #912C21 100%);
+  background: -webkit-gradient(linear, left top, left bottom, color-stop(0,#cc6161), color-stop(5%,#cc6161), color-stop(5%,#CC4343), color-stop(100%,#912C21));
+  background: linear-gradient(top, #cc6161 0, #cc6161 1px, #CC4343 1px, #931100 100%);
+  filter: progid:DXImageTransform.Microsoft.gradient( startColorstr='#CC4343', endColorstr='#912C21',GradientType=0 );
+  border-color: #912C21;
+  color: #fff;
+}
+
 /* confirm modal */
 .modal {
   display: none;
@@ -1063,8 +1086,18 @@ tr:hover td {
 
 .headertabs ul li {
 	float: left;
-	margin-left: 12px;
 	margin-top: 5px;
+	margin-left: 12px;
+}
+
+.headertabs ul.buttons {
+	float: right;
+	margin-right: 10px;
+}
+
+.headertabs ul.buttons li {
+	margin-top: 1px;
+	margin-left: 2px;
 }
 
 .headertabs ul li.lidivider {
diff --git a/src/web/js/azkaban.exflow.view.js b/src/web/js/azkaban.exflow.view.js
index c639026..92bcf19 100644
--- a/src/web/js/azkaban.exflow.view.js
+++ b/src/web/js/azkaban.exflow.view.js
@@ -1,6 +1,6 @@
 $.namespace('azkaban');
 
-var statusList = ["FAILED", "FAILED_FINISHING", "SUCCEEDED", "RUNNING", "WAITING", "KILLED", "DISABLED", "READY", "UNKNOWN"];
+var statusList = ["FAILED", "FAILED_FINISHING", "SUCCEEDED", "RUNNING", "WAITING", "KILLED", "DISABLED", "READY", "UNKNOWN", "PAUSED"];
 var statusStringMap = {
 	"FAILED": "Failed",
 	"SUCCEEDED": "Success",
@@ -10,7 +10,8 @@ var statusStringMap = {
 	"KILLED": "Killed",
 	"DISABLED": "Disabled",
 	"READY": "Ready",
-	"UNKNOWN": "Unknown"
+	"UNKNOWN": "Unknown",
+	"PAUSED": "Paused"
 };
 
 var handleJobMenuClick = function(action, el, pos) {
@@ -109,9 +110,21 @@ var flowTabView;
 azkaban.FlowTabView= Backbone.View.extend({
   events : {
   	"click #graphViewLink" : "handleGraphLinkClick",
-  	"click #jobslistViewLink" : "handleJobslistLinkClick"
+  	"click #jobslistViewLink" : "handleJobslistLinkClick",
+  	"click #cancelbtn" : "handleCancelClick",
+  	"click #restartbtn" : "handleRestartClick",
+  	"click #pausebtn" : "handlePauseClick",
+  	"click #resumebtn" : "handleResumeClick",
   },
   initialize : function(settings) {
+  	$("#cancelbtn").hide();
+  	$("#restartbtn").hide();
+  	$("#pausebtn").hide();
+  	$("#resumebtn").hide();
+  
+ 	this.model.bind('change:graph', this.handleFlowStatusChange, this);
+	this.model.bind('change:update', this.handleFlowStatusChange, this);
+	
   	var selectedView = settings.selectedView;
   	if (selectedView == "jobslist") {
   		this.handleJobslistLinkClick();
@@ -119,7 +132,6 @@ azkaban.FlowTabView= Backbone.View.extend({
   	else {
   		this.handleGraphLinkClick();
   	}
-
   },
   render: function() {
   	console.log("render graph");
@@ -137,6 +149,49 @@ azkaban.FlowTabView= Backbone.View.extend({
   	
   	$("#graphView").hide();
   	$("#jobListView").show();
+  },
+  handleFlowStatusChange: function() {
+  	var data = this.model.get("data");
+  	$("#cancelbtn").hide();
+  	$("#restartbtn").hide();
+  	$("#pausebtn").hide();
+  	$("#resumebtn").hide();
+
+  	if(data.status=="SUCCEEDED") {
+  	  	$("#restartbtn").show();
+  	}
+  	else if (data.status=="FAILED") {
+  		$("#restartbtn").show();
+  	}
+  	else if (data.status=="FAILED_FINISHING") {
+  		$("#cancelbtn").show();
+  	}
+  	else if (data.status=="RUNNING") {
+  		$("#cancelbtn").show();
+  		$("#pausebtn").show();
+  	}
+  	else if (data.status=="PAUSED") {
+  		$("#cancelbtn").show();
+  		$("#resumebtn").show();
+  	}
+  	else if (data.status=="WAITING") {
+  		$("#cancelbtn").show();
+  	}
+  	else if (data.status=="KILLED") {
+  		$("#restartbtn").show();
+  	}
+  },
+  handleCancelClick : function(evt) {
+  	
+  },
+  handleRestartClick : function(evt) {
+  	
+  },
+  handlePauseClick : function(evt) {
+  	
+  },
+  handleResumeClick : function(evt) {
+  	
   }
 });
 
@@ -758,9 +813,8 @@ var updaterFunction = function() {
 $(function() {
 	var selected;
 
-	flowTabView = new azkaban.FlowTabView({el:$( '#headertabs') });
-
 	graphModel = new azkaban.GraphModel();
+	flowTabView = new azkaban.FlowTabView({el:$( '#headertabs'), model: graphModel});
 	svgGraphView = new azkaban.SvgGraphView({el:$('#svgDiv'), model: graphModel});
 	jobsListView = new azkaban.JobListView({el:$('#jobList'), model: graphModel});
 	statusView = new azkaban.StatusView({el:$('#flow-status'), model: graphModel});