azkaban-uncached

Details

diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 9682f16..d912e8a 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -33,6 +33,8 @@ import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.flow.FlowProps;
 import azkaban.jobtype.JobTypeManager;
+import azkaban.project.ProjectLoader;
+import azkaban.project.ProjectManagerException;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
@@ -45,6 +47,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 
 	private ExecutorService executorService;
 	private ExecutorLoader executorLoader;
+	private ProjectLoader projectLoader;
 
 	private ExecutableFlow flow;
 	private Thread currentThread;
@@ -77,10 +80,11 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private boolean flowFinished = false;
 	private boolean flowCancelled = false;
 	
-	public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, JobTypeManager jobtypeManager) throws ExecutorManagerException {
+	public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader, JobTypeManager jobtypeManager) throws ExecutorManagerException {
 		this.execId = flow.getExecutionId();
 		this.flow = flow;
 		this.executorLoader = executorLoader;
+		this.projectLoader = projectLoader;
 		this.executorService = Executors.newFixedThreadPool(numThreads);
 		this.execDir = new File(flow.getExecutionPath());
 		this.jobtypeManager = jobtypeManager;
@@ -313,13 +317,27 @@ public class FlowRunner extends EventHandler implements Runnable {
 		// Load job file.
 		File path = new File(execDir, source);
 		Props prop = null;
+		
+		// load the override props if any
 		try {
-			prop = new Props(null, path);
-			prop.setParent(parentProps);
-		} catch (IOException e) {
+			prop = projectLoader.fetchProjectProperty(flow.getProjectId(), flow.getVersion(), node.getJobId()+".jor");
+		}
+		catch(ProjectManagerException e) {
 			e.printStackTrace();
-			logger.error("Error loading job file " + source + " for job " + node.getJobId());
+			logger.error("Error loading job override property for job " + node.getJobId());
+		}
+		if(prop == null) {
+			// if no override prop, load the original one on disk
+			try {
+				prop = new Props(null, path);				
+			} catch (IOException e) {
+				e.printStackTrace();
+				logger.error("Error loading job file " + source + " for job " + node.getJobId());
+			}
 		}
+		// setting this fake source as this will be used to determine the location of log files.
+		prop.setSource(path.getPath());
+		prop.setParent(parentProps);
 		
 		// should have one prop with system secrets, the other user level props
 		JobRunner jobRunner = new JobRunner(node, prop, path.getParentFile(), executorLoader, jobtypeManager);
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index 5054c48..750503e 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -243,7 +243,7 @@ public class FlowRunnerManager implements EventListener {
 		setupFlow(flow);
 		
 		// Setup flow runner
-		FlowRunner runner = new FlowRunner(flow, executorLoader, jobtypeManager);
+		FlowRunner runner = new FlowRunner(flow, executorLoader, projectLoader, jobtypeManager);
 		runner.setGlobalProps(globalProps);
 		runner.addListener(this);
 		
diff --git a/src/java/azkaban/project/JdbcProjectLoader.java b/src/java/azkaban/project/JdbcProjectLoader.java
index 90f3363..2d9c475 100644
--- a/src/java/azkaban/project/JdbcProjectLoader.java
+++ b/src/java/azkaban/project/JdbcProjectLoader.java
@@ -749,6 +749,43 @@ public class JdbcProjectLoader implements ProjectLoader {
 		}
 	}
 	
+	@Override
+	public void updateProjectProperty(Project project, Props props) throws ProjectManagerException {
+		Connection connection = getConnection();
+		try {
+			updateProjectProperty(connection, project, props.getSource(), props);
+			connection.commit();
+		}
+		catch (SQLException e) {
+			throw new ProjectManagerException("Error uploading project property files", e);
+		} 
+		catch (IOException e) {
+			throw new ProjectManagerException("Error uploading project property file", e);
+		}
+		finally {
+			DbUtils.closeQuietly(connection);
+		}
+	}
+	
+	private void updateProjectProperty(Connection connection, Project project, String name, Props props) throws ProjectManagerException, IOException {
+		QueryRunner runner = new QueryRunner();
+		final String UPDATE_PROPERTIES = "UPDATE project_properties SET property=? WHERE project_id=? AND version=? AND name=?";
+		
+		String propertyJSON = PropsUtils.toJSONString(props, true);
+		byte[] data = propertyJSON.getBytes("UTF-8");
+		logger.info("UTF-8 size:" + data.length);
+		if (defaultEncodingType == EncodingType.GZIP) {
+			data = GZIPUtils.gzipBytes(data);
+		}
+
+		try {
+			runner.update(connection, UPDATE_PROPERTIES, data, project.getId(), project.getVersion(), name);
+			connection.commit();
+		} catch (SQLException e) {
+			throw new ProjectManagerException("Error updating property " + project.getName() + " version " + project.getVersion(), e);
+		}
+	}
+	
 	private void uploadProjectProperty(Connection connection, Project project, String name, Props props) throws ProjectManagerException, IOException {
 		QueryRunner runner = new QueryRunner();
 		final String INSERT_PROPERTIES = "INSERT INTO project_properties (project_id, version, name, modified_time, encoding_type, property) values (?,?,?,?,?,?)";
@@ -769,6 +806,25 @@ public class JdbcProjectLoader implements ProjectLoader {
 	}
 
 	@Override
+	public Props fetchProjectProperty(int projectId, int projectVer, String propsName) throws ProjectManagerException {
+		QueryRunner runner = new QueryRunner(dataSource);
+		
+		ProjectPropertiesResultsHandler handler = new ProjectPropertiesResultsHandler();
+		try {
+			List<Pair<String, Props>> properties = 
+					runner.query(ProjectPropertiesResultsHandler.SELECT_PROJECT_PROPERTY, handler,  projectId, projectVer, propsName);
+
+			if (properties == null || properties.isEmpty()) {
+				return null;
+			}
+			
+			return properties.get(0).getSecond();
+		} catch (SQLException e) {
+			throw new ProjectManagerException("Error fetching property " + propsName, e);
+		}
+	}
+	
+	@Override
 	public Props fetchProjectProperty(Project project, String propsName) throws ProjectManagerException {
 		QueryRunner runner = new QueryRunner(dataSource);
 		
diff --git a/src/java/azkaban/project/ProjectLoader.java b/src/java/azkaban/project/ProjectLoader.java
index bd4c6d9..4711dc0 100644
--- a/src/java/azkaban/project/ProjectLoader.java
+++ b/src/java/azkaban/project/ProjectLoader.java
@@ -214,5 +214,9 @@ public interface ProjectLoader {
 	 * @throws ProjectManagerException
 	 */
 	public void cleanOlderProjectVersion(int projectId, int version) throws ProjectManagerException;
+
+	public void updateProjectProperty(Project project, Props props) throws ProjectManagerException;
+
+	Props fetchProjectProperty(int projectId, int projectVer, String propsName) throws ProjectManagerException;
 	
 }
\ No newline at end of file
diff --git a/src/java/azkaban/project/ProjectLogEvent.java b/src/java/azkaban/project/ProjectLogEvent.java
index 1d109bb..c6e8577 100644
--- a/src/java/azkaban/project/ProjectLogEvent.java
+++ b/src/java/azkaban/project/ProjectLogEvent.java
@@ -7,7 +7,7 @@ public class ProjectLogEvent {
 	 * Only represent from 0 to 255 different codes.
 	 */
 	public static enum EventType {
-		ERROR(128), CREATED(1), DELETED(2), USER_PERMISSION(3), GROUP_PERMISSION(4), DESCRIPTION(5), UPLOADED(6), SCHEDULE(7);
+		ERROR(128), CREATED(1), DELETED(2), USER_PERMISSION(3), GROUP_PERMISSION(4), DESCRIPTION(5), UPLOADED(6), SCHEDULE(7), SLA(8);
 
 		private int numVal;
 
@@ -35,6 +35,8 @@ public class ProjectLogEvent {
 				return UPLOADED;
 			case 7:
 				return SCHEDULE;
+			case 8:
+				return SLA;
 			case 128:
 				return ERROR;
 			default:
diff --git a/src/java/azkaban/project/ProjectManager.java b/src/java/azkaban/project/ProjectManager.java
index f1ce489..e526474 100644
--- a/src/java/azkaban/project/ProjectManager.java
+++ b/src/java/azkaban/project/ProjectManager.java
@@ -200,6 +200,22 @@ public class ProjectManager {
 	public Props getProperties(Project project, String source) throws ProjectManagerException {
 		return projectLoader.fetchProjectProperty(project, source);
 	}
+	
+	public Props getJobOverrideProperty(Project project, String jobName) throws ProjectManagerException {
+		return projectLoader.fetchProjectProperty(project, jobName+".jor");
+	}
+	
+	public void setJobOverrideProperty(Project project, Props prop, String jobName) throws ProjectManagerException {
+		prop.setSource(jobName+".jor");
+		Props oldProps = projectLoader.fetchProjectProperty(project, prop.getSource());
+		if(oldProps == null) {
+			projectLoader.uploadProjectProperty(project, prop);
+		}
+		else {
+			projectLoader.updateProjectProperty(project, prop);
+		}
+		return;
+	}
 
 	public void updateProjectPermission(Project project, String name, Permission perm, boolean group, User modifier) throws ProjectManagerException {
 		logger.info("User " + modifier.getUserId() + " updating permissions for project " + project.getName() + " for " + name + " " + perm.toString());
diff --git a/src/java/azkaban/scheduler/Schedule.java b/src/java/azkaban/scheduler/Schedule.java
index 1c80c58..b3382d3 100644
--- a/src/java/azkaban/scheduler/Schedule.java
+++ b/src/java/azkaban/scheduler/Schedule.java
@@ -307,6 +307,11 @@ public class Schedule{
 		return projectName + "." + flowName + " (" + projectId + ")";
 	}
 	
+	public String toString() {
+		return projectName + "." + flowName + " (" + projectId + ")" + " to be run at (starting) " + 
+				new DateTime(firstSchedTime).toDateTimeISO() + " with recurring period of " + (period == null ? "non-recurring" : createPeriodString(period));
+	}
+	
 	public Pair<Integer, String> getScheduleId() {
 		return new Pair<Integer, String>(getProjectId(), getFlowName());
 	}
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index ad67def..e908b4f 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -406,7 +406,7 @@ public class ScheduleManager {
 										try {
 											executorManager.submitExecutableFlow(exflow);
 											logger.info("Scheduler has invoked " + exflow.getExecutionId());
-										} catch (ExecutorManagerException e) {	
+										} catch (Exception e) {	
 											logger.error("Scheduler invoked flow " + exflow.getExecutionId() + " has failed.");
 											logger.error(e.getMessage());
 											return;
diff --git a/src/java/azkaban/sla/SLAManager.java b/src/java/azkaban/sla/SLAManager.java
index ae17595..61815e3 100644
--- a/src/java/azkaban/sla/SLAManager.java
+++ b/src/java/azkaban/sla/SLAManager.java
@@ -223,10 +223,17 @@ public class SLAManager {
 									List<SlaSetting> removeSettings = new ArrayList<SLA.SlaSetting>();
 									for(SlaSetting set : jobSettings) {
 										ExecutableNode node = exflow.getExecutableNode(set.getId());
-										if(node.getStartTime() != -1 || executorManager.isFinished(exflow)) {
-											submitSla(execId, set.getId(), new DateTime(node.getStartTime()).plus(set.getDuration()), runningSLA.getEmails(), set.getActions(), null, set.getRule());
+										if(node != null) {
+											if(node.getStartTime() != -1 || executorManager.isFinished(exflow)) {
+												submitSla(execId, set.getId(), new DateTime(node.getStartTime()).plus(set.getDuration()), runningSLA.getEmails(), set.getActions(), null, set.getRule());
+												removeSettings.add(set);
+												logger.info("Job " + set.getId() + " already started, monitoring SLA.");
+											}
+										}
+										else {
+											mailer.sendSlaEmail(s, "The SLA setting for flow/job is no longer valid as flow structure has changed. Execution " + s.getExecId());
 											removeSettings.add(set);
-											logger.info("Job " + set.getId() + " already started, monitoring SLA.");
+											
 										}
 									}
 									for(SlaSetting remove : removeSettings) {
@@ -246,7 +253,10 @@ public class SLAManager {
 								}
 								else {
 									if(!metSla(runningSLA, exflow)) {
-										takeSLAActions(runningSLA, exflow);
+										takeSLAFailActions(runningSLA, exflow);
+									}
+									else {
+										takeSLASuccessActions(runningSLA, exflow);
 									}
 
 
@@ -319,7 +329,7 @@ public class SLAManager {
 		}
 	}
 
-	private void takeSLAActions(SLA s, ExecutableFlow exflow) {
+	private void takeSLAFailActions(SLA s, ExecutableFlow exflow) {
 		logger.info("SLA " + s.toString() + " missed! Taking predefined actions");
 		List<SlaAction> actions = s.getActions();
 		for(SlaAction act : actions) {
@@ -333,7 +343,7 @@ public class SLAManager {
 			} else if(act.equals(SlaAction.KILL)) {
 				try {
 					executorManager.cancelFlow(exflow, "azkaban");
-					//sendSlaKillEmail(s, exflow);
+					sendSlaKillEmail(s, exflow);
 				} catch (ExecutorManagerException e) {
 					// TODO Auto-generated catch block
 					logger.error("Cancel flow failed." + e.getCause());
@@ -342,6 +352,11 @@ public class SLAManager {
 		}
 	}
 	
+	private void takeSLASuccessActions(SLA s, ExecutableFlow exflow) {
+		sendSlaSuccessEmail(s, exflow);
+		
+	}
+	
 	private void sendSlaAlertEmail(SLA s, ExecutableFlow exflow) {
 		String message = null;
 		ExecutableNode exnode;
@@ -380,6 +395,83 @@ public class SLAManager {
 		mailer.sendSlaEmail(s, message);
 	}
 	
+	private void sendSlaSuccessEmail(SLA s, ExecutableFlow exflow) {
+		String message = null;
+		ExecutableNode exnode;
+		switch(s.getRule()) {
+			case FINISH:
+				if(s.getJobName().equals("")) {
+					message = "Flow " + exflow.getFlowId() + " finished within the set SLA." + String.format("%n");
+					message += "Flow started at " + new DateTime(exflow.getStartTime()).toDateTimeISO() + String.format("%n");
+					message += "Flow status at " + s.getCheckTime().toDateTimeISO() + " is " + exflow.getStatus();
+				}
+				else {
+					exnode = exflow.getExecutableNode(s.getJobName());
+					message = "Job " + s.getJobName() + " of flow " + exflow.getFlowId() + " finished within the set SLA." + String.format("%n");
+					message += "Job started at " + new DateTime(exnode.getStartTime()).toDateTimeISO() + String.format("%n");
+					message += "Job status at " + s.getCheckTime().toDateTimeISO() + " is " + exnode.getStatus();
+				}
+				break;
+			case SUCCESS:
+				if(s.getJobName().equals("")) {
+					message = "Flow " + exflow.getFlowId() + " successfully finished within the set SLA." + String.format("%n");
+					message += "Flow started at " + new DateTime(exflow.getStartTime()).toDateTimeISO() + String.format("  %n");
+					message += "Flow status at " + s.getCheckTime().toDateTimeISO() + " is " + exflow.getStatus();
+				}
+				else {
+					exnode = exflow.getExecutableNode(s.getJobName());
+					message = "Job " + s.getJobName() + " of flow " + exflow.getFlowId() + " successfully finished within the set SLA." + String.format("%n"); 
+					message += "Job started at " + new DateTime(exnode.getStartTime()).toDateTimeISO() + String.format("%n");
+					message += "Job status at " + s.getCheckTime().toDateTimeISO() + " is " + exnode.getStatus();
+				}
+				break;
+			default:
+				logger.error("Unknown SLA rules!");
+				message = "Unknown SLA was not met!";
+				break;
+		}
+		mailer.sendSlaEmail(s, message);
+	}
+	
+	private void sendSlaKillEmail(SLA s, ExecutableFlow exflow) {
+		String message = null;
+		ExecutableNode exnode;
+		switch(s.getRule()) {
+			case FINISH:
+				if(s.getJobName().equals("")) {
+					message = "Flow " + exflow.getFlowId() + " failed to finish with set SLA and is killed. " + String.format("%n");
+					message += "Flow started at " + new DateTime(exflow.getStartTime()).toDateTimeISO() + String.format("%n");
+					message += "Flow status at " + s.getCheckTime().toDateTimeISO() + " is " + exflow.getStatus();
+				}
+				else {
+					exnode = exflow.getExecutableNode(s.getJobName());
+						message = "Job " + s.getJobName() + " of flow " + exflow.getFlowId() + " failed to finish with set SLA and is killed. " + String.format("%n");
+						message += "Job started at " + new DateTime(exnode.getStartTime()).toDateTimeISO() + String.format("%n");
+						message += "Job status at " + s.getCheckTime().toDateTimeISO() + " is " + exnode.getStatus();
+				}
+				break;
+			case SUCCESS:
+				if(s.getJobName().equals("")) {
+					message = "Flow " + exflow.getFlowId() + " didn't finish successfully with set SLA and is killed. " + String.format("%n");
+					message += "Flow started at " + new DateTime(exflow.getStartTime()).toDateTimeISO() + String.format("  %n");
+					message += "Flow status at " + s.getCheckTime().toDateTimeISO() + " is " + exflow.getStatus();
+				}
+				else {
+					exnode = exflow.getExecutableNode(s.getJobName());
+					message = "Job " + s.getJobName() + " of flow " + exflow.getFlowId() + " didn't finish successfully with set SLA and is killed. " + String.format("%n"); 
+					message += "Job started at " + new DateTime(exnode.getStartTime()).toDateTimeISO() + String.format("%n");
+					message += "Job status at " + s.getCheckTime().toDateTimeISO() + " is " + exnode.getStatus();
+				}
+				break;
+			default:
+				logger.error("Unknown SLA rules!");
+				message = "Unknown SLA was not met!";
+				break;
+		}
+		mailer.sendSlaEmail(s, message);
+	}
+	
+	
 	public class SLAPreRunner extends Thread {
 		private final List<SLA> preSlas;
 		private AtomicBoolean stillAlive = new AtomicBoolean(true);
diff --git a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index 4002e4a..2f1403b 100644
--- a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -217,6 +217,16 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 				ajaxFetchFlowExecutions(project, ret, req);
 			}
 		}
+		else if (ajaxName.equals("fetchJobInfo")) {
+			if (handleAjaxPermission(project, user, Type.READ, ret)) {
+				ajaxFetchJobInfo(project, ret, req);
+			}
+		}
+		else if (ajaxName.equals("setJobOverrideProperty")) {
+			if (handleAjaxPermission(project, user, Type.WRITE, ret)) {
+				ajaxSetJobOverrideProperty(project, ret, req);
+			}
+		}
 		else {
 			ret.put("error", "Cannot execute command " + ajaxName);
 		}
@@ -375,6 +385,86 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 		}
 	}
 	
+	private void ajaxFetchJobInfo(Project project, HashMap<String, Object> ret, HttpServletRequest req) throws ServletException {
+
+		String flowName = getParam(req, "flowName");
+		String jobName = getParam(req, "jobName");
+		
+		Flow flow = project.getFlow(flowName);
+		if(flow == null) {
+			ret.put("error", "Flow " + flowName + " not found in project " + project.getName());
+			return;
+		}
+		
+		Node node = flow.getNode(jobName);
+		if(node == null) {
+			ret.put("error", "Job " + jobName + " not found in flow " + flowName);
+			return;
+		}
+		
+		Props prop;
+		try {
+			prop = projectManager.getProperties(project, node.getJobSource());
+		} catch (ProjectManagerException e) {
+			ret.put("error", "Failed to retrieve job properties!");
+			return;
+		}
+		
+		Props overrideProp;
+		try {
+			overrideProp = projectManager.getJobOverrideProperty(project, jobName);
+		} catch (ProjectManagerException e) {
+			ret.put("error", "Failed to retrieve job override properties!");
+			return;
+		}
+		
+		ret.put("jobName", node.getId());
+		ret.put("jobType", prop.get("type"));
+		
+		if(overrideProp == null) {
+			overrideProp = new Props(prop);
+		}
+		
+		Map<String, String> generalParams = new HashMap<String, String>();
+		Map<String, String> overrideParams = new HashMap<String, String>();
+		for(String ps : prop.getKeySet()) {
+			generalParams.put(ps, prop.getString(ps));
+		}
+		for(String ops : overrideProp.getKeySet()) {
+//			generalParams.put(ops, overrideProp.getString(ops));
+			overrideParams.put(ops, overrideProp.getString(ops));
+		}
+		ret.put("generalParams", generalParams);
+		ret.put("overrideParams", overrideParams);
+	}
+	
+	private void ajaxSetJobOverrideProperty(Project project, HashMap<String, Object> ret, HttpServletRequest req) throws ServletException {
+		String flowName = getParam(req, "flowName");
+		String jobName = getParam(req, "jobName");
+		
+		Flow flow = project.getFlow(flowName);
+		if(flow == null) {
+			ret.put("error", "Flow " + flowName + " not found in project " + project.getName());
+			return;
+		}
+		
+		Node node = flow.getNode(jobName);
+		if(node == null) {
+			ret.put("error", "Job " + jobName + " not found in flow " + flowName);
+			return;
+		}
+		
+		Map<String, String> jobParamGroup = this.getParamGroup(req, "jobOverride");
+		@SuppressWarnings("unchecked")
+		Props overrideParams = new Props(null, jobParamGroup);
+		try {
+			projectManager.setJobOverrideProperty(project, overrideParams, jobName);
+		} catch (ProjectManagerException e) {
+			ret.put("error", "Failed to upload job override property");
+		}
+
+	}
+		
 	private void ajaxFetchProjectFlows(Project project, HashMap<String, Object> ret, HttpServletRequest req) throws ServletException {
 		ArrayList<Map<String,Object>> flowList = new ArrayList<Map<String,Object>>();
 		for (Flow flow: project.getFlows()) {
@@ -751,6 +841,14 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 					}
 					else {
 						Props prop = projectManager.getProperties(project, node.getJobSource());
+						Props overrideProp = projectManager.getJobOverrideProperty(project, jobName);
+						if(overrideProp == null) {
+							overrideProp = new Props();
+						}
+						Props comboProp = new Props(prop);
+						for(String key : overrideProp.getKeySet()) {
+							comboProp.put(key, overrideProp.get(key));
+						}
 						page.add("jobid", node.getId());
 						page.add("jobtype", node.getType());
 						
@@ -791,11 +889,10 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 							page.add("properties", source);
 						}
 						
-
 						ArrayList<Pair<String,String>> parameters = new ArrayList<Pair<String, String>>();
 						// Parameter
-						for (String key : prop.getKeySet()) {
-							String value = prop.get(key);
+						for (String key : comboProp.getKeySet()) {
+							String value = comboProp.get(key);
 							parameters.add(new Pair<String,String>(key, value));
 						}
 						
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index b5da554..489299a 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -160,6 +160,10 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 			sched.setSlaOptions(slaOptions);
 			scheduleManager.insertSchedule(sched);
 
+			if(slaOptions != null) {
+				projectManager.postProjectEvent(project, EventType.SLA, user.getUserId(), "SLA for flow " + flowName + " has been added/changed.");
+			}
+			
 		} catch (ServletException e) {
 			ret.put("error", e);
 		}
@@ -433,7 +437,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		}
 		Schedule schedule = scheduleManager.scheduleFlow(projectId, projectName, flowName, "ready", firstSchedTime.getMillis(), timezone, thePeriod, submitTime.getMillis(), firstSchedTime.getMillis(), firstSchedTime.getMillis(), user.getUserId(), flowOptions, slaOptions);
 		logger.info("User '" + user.getUserId() + "' has scheduled " + "[" + projectName + flowName +  " (" + projectId +")" + "].");
-		projectManager.postProjectEvent(project, EventType.SCHEDULE, user.getUserId(), "Schedule " + schedule.getScheduleName() + " has been added.");
+		projectManager.postProjectEvent(project, EventType.SCHEDULE, user.getUserId(), "Schedule " + schedule.toString() + " has been added.");
 		
 		ret.put("status", "success");
 		ret.put("message", projectName + "." + flowName + " scheduled.");
@@ -502,8 +506,8 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		}
 		Schedule schedule = scheduleManager.scheduleFlow(projectId, projectName, flowName, "ready", firstSchedTime.getMillis(), firstSchedTime.getZone(), thePeriod, DateTime.now().getMillis(), firstSchedTime.getMillis(), firstSchedTime.getMillis(), user.getUserId(), flowOptions, slaOptions);
 		logger.info("User '" + user.getUserId() + "' has scheduled " + "[" + projectName + flowName +  " (" + projectId +")" + "].");
-		projectManager.postProjectEvent(project, EventType.SCHEDULE, user.getUserId(), "Schedule " + schedule.getScheduleName() + " has been added.");
-		
+		projectManager.postProjectEvent(project, EventType.SCHEDULE, user.getUserId(), "Schedule " + schedule.toString() + " has been added.");
+
 		ret.put("status", "success");
 		ret.put("message", projectName + "." + flowName + " scheduled.");
 	}
diff --git a/src/java/azkaban/webapp/servlet/velocity/jobpage.vm b/src/java/azkaban/webapp/servlet/velocity/jobpage.vm
index b5f7401..536cdb5 100644
--- a/src/java/azkaban/webapp/servlet/velocity/jobpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/jobpage.vm
@@ -25,6 +25,7 @@
 		<script type="text/javascript" src="${context}/js/backbone-0.5.3-min.js"></script>
 		<script type="text/javascript" src="${context}/js/jquery.simplemodal.js"></script>
 		<script type="text/javascript" src="${context}/js/azkaban.nav.js"></script>
+		<script type="text/javascript" src="${context}/js/azkaban.jobedit.view.js"></script>
 		<script type="text/javascript">
 			var contextURL = "${context}";
 			var currentTime = ${currentTime};
@@ -60,6 +61,7 @@
 					</div>
 					
 					<a id="jobs-logs-btn" class="btn2" href="${context}/manager?project=${project.name}&job=$jobid&history">Job History</a>
+					<a id="edit-job-btn" class="btn1" onclick='jobEditView.show("${project.name}", "${flowid}", "${jobid}")'>Job Edit</a>
 				</div>
 			</div>
 			
@@ -115,5 +117,55 @@
 	</div>
 
 #end
+
+		<!-- modal content -->
+
+		<div id="jobEditModalBackground" class="modalBackground2">
+			<div id="job-edit-pane" class="modal modalContainer2">
+				<a href='#' title='Close' class='modal-close'>x</a>
+					<h3>Job Description Edit</h3>					
+					<div class="optionsPane">
+						<div id="generalPanel" class="generalPanel panel">
+							<div id="mustHave">
+								<h4>Job Essentials</h4>
+								<dl>
+									<dt>Job Name</dt>
+									<dd>
+										<label id="jobName"></label>
+									</dd>
+									<dt>Job Type</dt>
+									<dd>
+										<label id="jobType"></label>
+									</dd>
+								</dl>
+							</div>
+							<br></br>
+							<!--div id="jobTypeSpecific">
+								<h4>Job Type Specific parameters</h4>
+							</div-->
+							<div id="generalJobSetting">
+								<h4>General Job Settings (Be Aware: A Job May Be Shared By Multiple Flows. The Change Will Be Global!)</h4>
+								<div class="tableDiv">
+									<table id="generalProps">
+										<thead>
+											<tr>
+												<th>Name</th>
+												<th>Value</th>
+											</tr>
+										</thead>
+										<tbody>
+											<tr id="addRow"><td id="addRow-col" colspan="2"><span class="addIcon"></span><a href="#">Add Row</a></td></tr>
+										</tbody>
+									</table>
+								</div>
+							</div>
+						</div>
+					</div>
+					<div class="actions">
+						<a class="yes btn1" id="set-btn" >Set/Change Job Description</a>
+						<a class="no simplemodal-close btn3" id="cancel-btn" >Cancel</a>
+					</div>
+			</div>
+		</div>
 	</body>
 </html>
\ No newline at end of file
diff --git a/src/web/css/azkaban.css b/src/web/css/azkaban.css
index 67d3287..9f38ce3 100644
--- a/src/web/css/azkaban.css
+++ b/src/web/css/azkaban.css
@@ -1529,6 +1529,50 @@ tr:hover td {
 }
 
 
+#job-edit-pane {
+	left: 100px;
+	right: 100px;
+	top: 50px;
+	bottom: 40px;
+}
+
+#job-edit-pane .list {
+	width: 255px;
+}
+
+#job-edit-pane .optionsPane {
+	position: absolute;
+	top: 85px;
+	background-color: #FFF;
+	left: 0px;
+	right: 0px;
+	bottom: 0px;
+}
+
+#job-edit-pane .panel {
+	position: absolute;
+	width: 100%;
+	top: 0px;
+	bottom: 65px;
+}
+
+#job-edit-pane .generalPanel.panel {
+	background-color: #F4F4F4;
+	padding-top: 15px;
+}
+
+#job-edit-pane h3 {
+	margin-left: 20px;
+	font-size: 14pt;
+	border-bottom: 1px solid #CCC;
+}
+
+#job-edit-pane h4 {
+	margin-left: 20px;
+	font-size: 12pt;
+	border-bottom: 1px solid #CCC;
+}
+
 #schedule-options {
 	left: 100px;
 	right: 100px;
diff --git a/src/web/js/azkaban.jobedit.view.js b/src/web/js/azkaban.jobedit.view.js
new file mode 100644
index 0000000..08d248b
--- /dev/null
+++ b/src/web/js/azkaban.jobedit.view.js
@@ -0,0 +1,223 @@
+$.namespace('azkaban');
+
+var jobEditView;
+azkaban.JobEditView = Backbone.View.extend({
+	events : {
+		"click" : "closeEditingTarget",
+		"click #set-btn": "handleSet",	
+		"click #cancel-btn": "handleCancel",
+		"click .modal-close": "handleCancel",
+		"click #addRow": "handleAddRow",
+		"click table .editable": "handleEditColumn",
+		"click table .removeIcon": "handleRemoveColumn"
+	},
+	initialize: function(setting) {
+		this.projectURL = contextURL + "manager"
+		this.generalParams = {}
+		this.overrideParams = {}
+	},
+	handleCancel: function(evt) {
+		$('#jobEditModalBackground').hide();
+		$('#job-edit-pane').hide();
+		var tbl = document.getElementById("generalProps").tBodies[0];
+		var rows = tbl.rows;
+		var len = rows.length;
+		for(var i=0; i < len-1; i++) {
+			tbl.deleteRow(0);
+		}
+	},
+	show: function(projectName, flowName, jobName) {
+		this.projectName = projectName;
+		this.flowName = flowName;
+		this.jobName = jobName;
+		
+		var projectURL = this.projectURL
+		
+		
+		$('#jobEditModalBackground').show();
+		$('#job-edit-pane').show();
+		
+		var handleAddRow = this.handleAddRow;
+		
+//		var overrideParams;
+//		var generalParams;
+//		this.overrideParams = overrideParams;
+//		this.generalParams = generalParams;
+		var fetchJobInfo = {"project": this.projectName, "ajax":"fetchJobInfo", "flowName":this.flowName, "jobName":this.jobName};
+		
+		var mythis = this;
+		
+		$.get(
+				projectURL,
+				fetchJobInfo,
+				function(data) {
+					if (data.error) {
+						alert(data.error);
+					}
+					else {
+						document.getElementById('jobName').innerHTML = data.jobName;				
+						document.getElementById('jobType').innerHTML = data.jobType;
+						var generalParams = data.generalParams;
+						var overrideParams = data.overrideParams;
+						
+//						for(var key in generalParams) {
+//							var row = handleAddRow();
+//							var td = $(row).find('span');
+//							$(td[1]).text(key);
+//							$(td[2]).text(generalParams[key]);
+//						}
+						
+						mythis.overrideParams = overrideParams;
+						mythis.generalParams = generalParams;
+						
+						for(var okey in overrideParams) {
+							if(okey != 'type' && okey != 'dependencies') {
+								var row = handleAddRow();
+								var td = $(row).find('span');
+								$(td[1]).text(okey);
+								$(td[2]).text(overrideParams[okey]);
+							}
+						}
+						
+					}
+				},
+				"json"
+			);
+
+	},
+	handleSet: function(evt) {
+
+		var jobOverride = {};
+	  	var editRows = $(".editRow");
+		for (var i = 0; i < editRows.length; ++i) {
+			var row = editRows[i];
+			var td = $(row).find('span');
+			var key = $(td[1]).text();
+			var val = $(td[2]).text();
+
+			if (key && key.length > 0) {
+				jobOverride[key] = val;
+			}
+		}
+		
+		var overrideParams = this.overrideParams
+		var generalParams = this.generalParams
+		
+		jobOverride['type'] = overrideParams['type']
+		if('dependencies' in overrideParams) {
+			jobOverride['dependencies'] = overrideParams['dependencies']
+		}
+		
+		var project = this.projectName
+		var flowName = this.flowName
+		var jobName = this.jobName
+		
+		var jobOverrideData = {
+			project: project,
+			flowName: flowName,
+			jobName: jobName,
+			ajax: "setJobOverrideProperty",
+			jobOverride: jobOverride
+		};
+
+		var projectURL = this.projectURL
+		var redirectURL = projectURL+'?project='+project+'&flow='+flowName+'&job='+jobName;
+		
+		$.get(
+			projectURL,
+			jobOverrideData,
+			function(data) {
+				if (data.error) {
+					alert(data.error);
+				}
+				else {
+					window.location = redirectURL;
+				}
+			},
+			"json"
+		);
+	},
+	handleAddRow: function(evt) {
+		var tr = document.createElement("tr");
+	  	var tdName = document.createElement("td");
+	    var tdValue = document.createElement("td");
+
+	    var icon = document.createElement("span");
+	    $(icon).addClass("removeIcon");
+	    var nameData = document.createElement("span");
+	    $(nameData).addClass("spanValue");
+	    var valueData = document.createElement("span");
+	    $(valueData).addClass("spanValue");
+
+		$(tdName).append(icon);
+		$(tdName).append(nameData);
+		$(tdName).addClass("name");
+		$(tdName).addClass("editable");
+
+		$(tdValue).append(valueData);
+	    $(tdValue).addClass("editable");
+
+		$(tr).addClass("editRow");
+	  	$(tr).append(tdName);
+	  	$(tr).append(tdValue);
+
+	  	$(tr).insertBefore("#addRow");
+	  	return tr;
+
+	},
+	handleEditColumn : function(evt) {
+		var curTarget = evt.currentTarget;
+	
+		if (this.editingTarget != curTarget) {
+			this.closeEditingTarget();
+			
+			var text = $(curTarget).children(".spanValue").text();
+			$(curTarget).empty();
+						
+			var input = document.createElement("input");
+			$(input).attr("type", "text");
+			$(input).css("width", "100%");
+			$(input).val(text);
+			$(curTarget).addClass("editing");
+			$(curTarget).append(input);
+			$(input).focus();
+			this.editingTarget = curTarget;
+		}
+	},
+	handleRemoveColumn : function(evt) {
+		var curTarget = evt.currentTarget;
+		// Should be the table
+		var row = curTarget.parentElement.parentElement;
+		$(row).remove();
+	},
+	closeEditingTarget: function(evt) {
+		if (this.editingTarget != null && this.editingTarget != evt.target && this.editingTarget != evt.target.parentElement ) {
+	  		var input = $(this.editingTarget).children("input")[0];
+	  		var text = $(input).val();
+	  		$(input).remove();
+
+		    var valueData = document.createElement("span");
+		    $(valueData).addClass("spanValue");
+		    $(valueData).text(text);
+
+	  		if ($(this.editingTarget).hasClass("name")) {
+		  		var icon = document.createElement("span");
+		    	$(icon).addClass("removeIcon");
+		    	$(this.editingTarget).append(icon);
+		    }
+
+		    $(this.editingTarget).removeClass("editing");
+		    $(this.editingTarget).append(valueData);
+		    this.editingTarget = null;
+	  	}
+	}
+});
+
+$(function() {
+
+
+	jobEditView = new azkaban.JobEditView({el:$('#job-edit-pane')});
+	
+	 
+	
+});
\ No newline at end of file
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest.java b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
index feaec68..9a9d80c 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
@@ -21,12 +21,14 @@ import azkaban.executor.ExecutorLoader;
 
 import azkaban.flow.Flow;
 import azkaban.jobtype.JobTypeManager;
+import azkaban.project.ProjectLoader;
 import azkaban.test.executor.JavaJob;
 import azkaban.utils.JSONUtils;
 
 public class FlowRunnerTest {
 	private File workingDir;
 	private JobTypeManager jobtypeManager;
+	private ProjectLoader fakeProjectLoader;
 	public FlowRunnerTest() {
 		
 	}
@@ -41,6 +43,7 @@ public class FlowRunnerTest {
 		workingDir.mkdirs();
 		jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
 		jobtypeManager.registerJobType("java", JavaJob.class);
+		fakeProjectLoader = new MockProjectLoader(workingDir);
 	}
 	
 	@After
@@ -55,6 +58,8 @@ public class FlowRunnerTest {
 	@Test
 	public void exec1Normal() throws Exception {
 		MockExecutorLoader loader = new MockExecutorLoader();
+		//just making compile. may not work at all.
+		
 		EventCollectorListener eventCollector = new EventCollectorListener();
 		FlowRunner runner = createFlowRunner(loader, eventCollector, "exec1");
 		
@@ -349,7 +354,7 @@ public class FlowRunnerTest {
 		//MockProjectLoader projectLoader = new MockProjectLoader(new File(flow.getExecutionPath()));
 		
 		loader.uploadExecutableFlow(flow);
-		FlowRunner runner = new FlowRunner(flow, loader, jobtypeManager);
+		FlowRunner runner = new FlowRunner(flow, loader, fakeProjectLoader, jobtypeManager);
 		runner.addListener(eventCollector);
 		
 		return runner;
@@ -361,7 +366,7 @@ public class FlowRunnerTest {
 		//MockProjectLoader projectLoader = new MockProjectLoader(new File(exFlow.getExecutionPath()));
 		
 		loader.uploadExecutableFlow(exFlow);
-		FlowRunner runner = new FlowRunner(exFlow, loader, jobtypeManager);
+		FlowRunner runner = new FlowRunner(exFlow, loader, fakeProjectLoader, jobtypeManager);
 		runner.addListener(eventCollector);
 		
 		return runner;
diff --git a/unit/java/azkaban/test/execapp/MockProjectLoader.java b/unit/java/azkaban/test/execapp/MockProjectLoader.java
index a9c560d..0739bd3 100644
--- a/unit/java/azkaban/test/execapp/MockProjectLoader.java
+++ b/unit/java/azkaban/test/execapp/MockProjectLoader.java
@@ -195,4 +195,18 @@ public class MockProjectLoader implements ProjectLoader {
 		// TODO Auto-generated method stub
 		
 	}
+
+	@Override
+	public void updateProjectProperty(Project project, Props props)
+			throws ProjectManagerException {
+		// TODO Auto-generated method stub
+		
+	}
+
+	@Override
+	public Props fetchProjectProperty(int projectId, int projectVer,
+			String propsName) throws ProjectManagerException {
+		// TODO Auto-generated method stub
+		return null;
+	}
 }
\ No newline at end of file