azkaban-uncached
Changes
src/java/azkaban/execapp/FlowRunner.java 28(+23 -5)
src/java/azkaban/project/ProjectManager.java 16(+16 -0)
src/java/azkaban/sla/SLAManager.java 104(+98 -6)
src/web/css/azkaban.css 44(+44 -0)
src/web/js/azkaban.jobedit.view.js 223(+223 -0)
Details
src/java/azkaban/execapp/FlowRunner.java 28(+23 -5)
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 9682f16..d912e8a 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -33,6 +33,8 @@ import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
import azkaban.flow.FlowProps;
import azkaban.jobtype.JobTypeManager;
+import azkaban.project.ProjectLoader;
+import azkaban.project.ProjectManagerException;
import azkaban.utils.Pair;
import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
@@ -45,6 +47,7 @@ public class FlowRunner extends EventHandler implements Runnable {
private ExecutorService executorService;
private ExecutorLoader executorLoader;
+ private ProjectLoader projectLoader;
private ExecutableFlow flow;
private Thread currentThread;
@@ -77,10 +80,11 @@ public class FlowRunner extends EventHandler implements Runnable {
private boolean flowFinished = false;
private boolean flowCancelled = false;
- public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, JobTypeManager jobtypeManager) throws ExecutorManagerException {
+ public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader, JobTypeManager jobtypeManager) throws ExecutorManagerException {
this.execId = flow.getExecutionId();
this.flow = flow;
this.executorLoader = executorLoader;
+ this.projectLoader = projectLoader;
this.executorService = Executors.newFixedThreadPool(numThreads);
this.execDir = new File(flow.getExecutionPath());
this.jobtypeManager = jobtypeManager;
@@ -313,13 +317,27 @@ public class FlowRunner extends EventHandler implements Runnable {
// Load job file.
File path = new File(execDir, source);
Props prop = null;
+
+ // load the override props if any
try {
- prop = new Props(null, path);
- prop.setParent(parentProps);
- } catch (IOException e) {
+ prop = projectLoader.fetchProjectProperty(flow.getProjectId(), flow.getVersion(), node.getJobId()+".jor");
+ }
+ catch(ProjectManagerException e) {
e.printStackTrace();
- logger.error("Error loading job file " + source + " for job " + node.getJobId());
+ logger.error("Error loading job override property for job " + node.getJobId());
+ }
+ if(prop == null) {
+ // if no override prop, load the original one on disk
+ try {
+ prop = new Props(null, path);
+ } catch (IOException e) {
+ e.printStackTrace();
+ logger.error("Error loading job file " + source + " for job " + node.getJobId());
+ }
}
+ // setting this fake source as this will be used to determine the location of log files.
+ prop.setSource(path.getPath());
+ prop.setParent(parentProps);
// should have one prop with system secrets, the other user level props
JobRunner jobRunner = new JobRunner(node, prop, path.getParentFile(), executorLoader, jobtypeManager);
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index 5054c48..750503e 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -243,7 +243,7 @@ public class FlowRunnerManager implements EventListener {
setupFlow(flow);
// Setup flow runner
- FlowRunner runner = new FlowRunner(flow, executorLoader, jobtypeManager);
+ FlowRunner runner = new FlowRunner(flow, executorLoader, projectLoader, jobtypeManager);
runner.setGlobalProps(globalProps);
runner.addListener(this);
diff --git a/src/java/azkaban/project/JdbcProjectLoader.java b/src/java/azkaban/project/JdbcProjectLoader.java
index 90f3363..2d9c475 100644
--- a/src/java/azkaban/project/JdbcProjectLoader.java
+++ b/src/java/azkaban/project/JdbcProjectLoader.java
@@ -749,6 +749,43 @@ public class JdbcProjectLoader implements ProjectLoader {
}
}
+ @Override
+ public void updateProjectProperty(Project project, Props props) throws ProjectManagerException {
+ Connection connection = getConnection();
+ try {
+ updateProjectProperty(connection, project, props.getSource(), props);
+ connection.commit();
+ }
+ catch (SQLException e) {
+ throw new ProjectManagerException("Error uploading project property files", e);
+ }
+ catch (IOException e) {
+ throw new ProjectManagerException("Error uploading project property file", e);
+ }
+ finally {
+ DbUtils.closeQuietly(connection);
+ }
+ }
+
+ private void updateProjectProperty(Connection connection, Project project, String name, Props props) throws ProjectManagerException, IOException {
+ QueryRunner runner = new QueryRunner();
+ final String UPDATE_PROPERTIES = "UPDATE project_properties SET property=? WHERE project_id=? AND version=? AND name=?";
+
+ String propertyJSON = PropsUtils.toJSONString(props, true);
+ byte[] data = propertyJSON.getBytes("UTF-8");
+ logger.info("UTF-8 size:" + data.length);
+ if (defaultEncodingType == EncodingType.GZIP) {
+ data = GZIPUtils.gzipBytes(data);
+ }
+
+ try {
+ runner.update(connection, UPDATE_PROPERTIES, data, project.getId(), project.getVersion(), name);
+ connection.commit();
+ } catch (SQLException e) {
+ throw new ProjectManagerException("Error updating property " + project.getName() + " version " + project.getVersion(), e);
+ }
+ }
+
private void uploadProjectProperty(Connection connection, Project project, String name, Props props) throws ProjectManagerException, IOException {
QueryRunner runner = new QueryRunner();
final String INSERT_PROPERTIES = "INSERT INTO project_properties (project_id, version, name, modified_time, encoding_type, property) values (?,?,?,?,?,?)";
@@ -769,6 +806,25 @@ public class JdbcProjectLoader implements ProjectLoader {
}
@Override
+ public Props fetchProjectProperty(int projectId, int projectVer, String propsName) throws ProjectManagerException {
+ QueryRunner runner = new QueryRunner(dataSource);
+
+ ProjectPropertiesResultsHandler handler = new ProjectPropertiesResultsHandler();
+ try {
+ List<Pair<String, Props>> properties =
+ runner.query(ProjectPropertiesResultsHandler.SELECT_PROJECT_PROPERTY, handler, projectId, projectVer, propsName);
+
+ if (properties == null || properties.isEmpty()) {
+ return null;
+ }
+
+ return properties.get(0).getSecond();
+ } catch (SQLException e) {
+ throw new ProjectManagerException("Error fetching property " + propsName, e);
+ }
+ }
+
+ @Override
public Props fetchProjectProperty(Project project, String propsName) throws ProjectManagerException {
QueryRunner runner = new QueryRunner(dataSource);
diff --git a/src/java/azkaban/project/ProjectLoader.java b/src/java/azkaban/project/ProjectLoader.java
index bd4c6d9..4711dc0 100644
--- a/src/java/azkaban/project/ProjectLoader.java
+++ b/src/java/azkaban/project/ProjectLoader.java
@@ -214,5 +214,9 @@ public interface ProjectLoader {
* @throws ProjectManagerException
*/
public void cleanOlderProjectVersion(int projectId, int version) throws ProjectManagerException;
+
+ public void updateProjectProperty(Project project, Props props) throws ProjectManagerException;
+
+ Props fetchProjectProperty(int projectId, int projectVer, String propsName) throws ProjectManagerException;
}
\ No newline at end of file
diff --git a/src/java/azkaban/project/ProjectLogEvent.java b/src/java/azkaban/project/ProjectLogEvent.java
index 1d109bb..c6e8577 100644
--- a/src/java/azkaban/project/ProjectLogEvent.java
+++ b/src/java/azkaban/project/ProjectLogEvent.java
@@ -7,7 +7,7 @@ public class ProjectLogEvent {
* Only represent from 0 to 255 different codes.
*/
public static enum EventType {
- ERROR(128), CREATED(1), DELETED(2), USER_PERMISSION(3), GROUP_PERMISSION(4), DESCRIPTION(5), UPLOADED(6), SCHEDULE(7);
+ ERROR(128), CREATED(1), DELETED(2), USER_PERMISSION(3), GROUP_PERMISSION(4), DESCRIPTION(5), UPLOADED(6), SCHEDULE(7), SLA(8);
private int numVal;
@@ -35,6 +35,8 @@ public class ProjectLogEvent {
return UPLOADED;
case 7:
return SCHEDULE;
+ case 8:
+ return SLA;
case 128:
return ERROR;
default:
src/java/azkaban/project/ProjectManager.java 16(+16 -0)
diff --git a/src/java/azkaban/project/ProjectManager.java b/src/java/azkaban/project/ProjectManager.java
index f1ce489..e526474 100644
--- a/src/java/azkaban/project/ProjectManager.java
+++ b/src/java/azkaban/project/ProjectManager.java
@@ -200,6 +200,22 @@ public class ProjectManager {
public Props getProperties(Project project, String source) throws ProjectManagerException {
return projectLoader.fetchProjectProperty(project, source);
}
+
+ public Props getJobOverrideProperty(Project project, String jobName) throws ProjectManagerException {
+ return projectLoader.fetchProjectProperty(project, jobName+".jor");
+ }
+
+ public void setJobOverrideProperty(Project project, Props prop, String jobName) throws ProjectManagerException {
+ prop.setSource(jobName+".jor");
+ Props oldProps = projectLoader.fetchProjectProperty(project, prop.getSource());
+ if(oldProps == null) {
+ projectLoader.uploadProjectProperty(project, prop);
+ }
+ else {
+ projectLoader.updateProjectProperty(project, prop);
+ }
+ return;
+ }
public void updateProjectPermission(Project project, String name, Permission perm, boolean group, User modifier) throws ProjectManagerException {
logger.info("User " + modifier.getUserId() + " updating permissions for project " + project.getName() + " for " + name + " " + perm.toString());
diff --git a/src/java/azkaban/scheduler/Schedule.java b/src/java/azkaban/scheduler/Schedule.java
index 1c80c58..b3382d3 100644
--- a/src/java/azkaban/scheduler/Schedule.java
+++ b/src/java/azkaban/scheduler/Schedule.java
@@ -307,6 +307,11 @@ public class Schedule{
return projectName + "." + flowName + " (" + projectId + ")";
}
+ public String toString() {
+ return projectName + "." + flowName + " (" + projectId + ")" + " to be run at (starting) " +
+ new DateTime(firstSchedTime).toDateTimeISO() + " with recurring period of " + (period == null ? "non-recurring" : createPeriodString(period));
+ }
+
public Pair<Integer, String> getScheduleId() {
return new Pair<Integer, String>(getProjectId(), getFlowName());
}
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index ad67def..e908b4f 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -406,7 +406,7 @@ public class ScheduleManager {
try {
executorManager.submitExecutableFlow(exflow);
logger.info("Scheduler has invoked " + exflow.getExecutionId());
- } catch (ExecutorManagerException e) {
+ } catch (Exception e) {
logger.error("Scheduler invoked flow " + exflow.getExecutionId() + " has failed.");
logger.error(e.getMessage());
return;
src/java/azkaban/sla/SLAManager.java 104(+98 -6)
diff --git a/src/java/azkaban/sla/SLAManager.java b/src/java/azkaban/sla/SLAManager.java
index ae17595..61815e3 100644
--- a/src/java/azkaban/sla/SLAManager.java
+++ b/src/java/azkaban/sla/SLAManager.java
@@ -223,10 +223,17 @@ public class SLAManager {
List<SlaSetting> removeSettings = new ArrayList<SLA.SlaSetting>();
for(SlaSetting set : jobSettings) {
ExecutableNode node = exflow.getExecutableNode(set.getId());
- if(node.getStartTime() != -1 || executorManager.isFinished(exflow)) {
- submitSla(execId, set.getId(), new DateTime(node.getStartTime()).plus(set.getDuration()), runningSLA.getEmails(), set.getActions(), null, set.getRule());
+ if(node != null) {
+ if(node.getStartTime() != -1 || executorManager.isFinished(exflow)) {
+ submitSla(execId, set.getId(), new DateTime(node.getStartTime()).plus(set.getDuration()), runningSLA.getEmails(), set.getActions(), null, set.getRule());
+ removeSettings.add(set);
+ logger.info("Job " + set.getId() + " already started, monitoring SLA.");
+ }
+ }
+ else {
+ mailer.sendSlaEmail(s, "The SLA setting for flow/job is no longer valid as flow structure has changed. Execution " + s.getExecId());
removeSettings.add(set);
- logger.info("Job " + set.getId() + " already started, monitoring SLA.");
+
}
}
for(SlaSetting remove : removeSettings) {
@@ -246,7 +253,10 @@ public class SLAManager {
}
else {
if(!metSla(runningSLA, exflow)) {
- takeSLAActions(runningSLA, exflow);
+ takeSLAFailActions(runningSLA, exflow);
+ }
+ else {
+ takeSLASuccessActions(runningSLA, exflow);
}
@@ -319,7 +329,7 @@ public class SLAManager {
}
}
- private void takeSLAActions(SLA s, ExecutableFlow exflow) {
+ private void takeSLAFailActions(SLA s, ExecutableFlow exflow) {
logger.info("SLA " + s.toString() + " missed! Taking predefined actions");
List<SlaAction> actions = s.getActions();
for(SlaAction act : actions) {
@@ -333,7 +343,7 @@ public class SLAManager {
} else if(act.equals(SlaAction.KILL)) {
try {
executorManager.cancelFlow(exflow, "azkaban");
- //sendSlaKillEmail(s, exflow);
+ sendSlaKillEmail(s, exflow);
} catch (ExecutorManagerException e) {
// TODO Auto-generated catch block
logger.error("Cancel flow failed." + e.getCause());
@@ -342,6 +352,11 @@ public class SLAManager {
}
}
+ private void takeSLASuccessActions(SLA s, ExecutableFlow exflow) {
+ sendSlaSuccessEmail(s, exflow);
+
+ }
+
private void sendSlaAlertEmail(SLA s, ExecutableFlow exflow) {
String message = null;
ExecutableNode exnode;
@@ -380,6 +395,83 @@ public class SLAManager {
mailer.sendSlaEmail(s, message);
}
+ private void sendSlaSuccessEmail(SLA s, ExecutableFlow exflow) {
+ String message = null;
+ ExecutableNode exnode;
+ switch(s.getRule()) {
+ case FINISH:
+ if(s.getJobName().equals("")) {
+ message = "Flow " + exflow.getFlowId() + " finished within the set SLA." + String.format("%n");
+ message += "Flow started at " + new DateTime(exflow.getStartTime()).toDateTimeISO() + String.format("%n");
+ message += "Flow status at " + s.getCheckTime().toDateTimeISO() + " is " + exflow.getStatus();
+ }
+ else {
+ exnode = exflow.getExecutableNode(s.getJobName());
+ message = "Job " + s.getJobName() + " of flow " + exflow.getFlowId() + " finished within the set SLA." + String.format("%n");
+ message += "Job started at " + new DateTime(exnode.getStartTime()).toDateTimeISO() + String.format("%n");
+ message += "Job status at " + s.getCheckTime().toDateTimeISO() + " is " + exnode.getStatus();
+ }
+ break;
+ case SUCCESS:
+ if(s.getJobName().equals("")) {
+ message = "Flow " + exflow.getFlowId() + " successfully finished within the set SLA." + String.format("%n");
+ message += "Flow started at " + new DateTime(exflow.getStartTime()).toDateTimeISO() + String.format(" %n");
+ message += "Flow status at " + s.getCheckTime().toDateTimeISO() + " is " + exflow.getStatus();
+ }
+ else {
+ exnode = exflow.getExecutableNode(s.getJobName());
+ message = "Job " + s.getJobName() + " of flow " + exflow.getFlowId() + " successfully finished within the set SLA." + String.format("%n");
+ message += "Job started at " + new DateTime(exnode.getStartTime()).toDateTimeISO() + String.format("%n");
+ message += "Job status at " + s.getCheckTime().toDateTimeISO() + " is " + exnode.getStatus();
+ }
+ break;
+ default:
+ logger.error("Unknown SLA rules!");
+ message = "Unknown SLA was not met!";
+ break;
+ }
+ mailer.sendSlaEmail(s, message);
+ }
+
+ private void sendSlaKillEmail(SLA s, ExecutableFlow exflow) {
+ String message = null;
+ ExecutableNode exnode;
+ switch(s.getRule()) {
+ case FINISH:
+ if(s.getJobName().equals("")) {
+ message = "Flow " + exflow.getFlowId() + " failed to finish with set SLA and is killed. " + String.format("%n");
+ message += "Flow started at " + new DateTime(exflow.getStartTime()).toDateTimeISO() + String.format("%n");
+ message += "Flow status at " + s.getCheckTime().toDateTimeISO() + " is " + exflow.getStatus();
+ }
+ else {
+ exnode = exflow.getExecutableNode(s.getJobName());
+ message = "Job " + s.getJobName() + " of flow " + exflow.getFlowId() + " failed to finish with set SLA and is killed. " + String.format("%n");
+ message += "Job started at " + new DateTime(exnode.getStartTime()).toDateTimeISO() + String.format("%n");
+ message += "Job status at " + s.getCheckTime().toDateTimeISO() + " is " + exnode.getStatus();
+ }
+ break;
+ case SUCCESS:
+ if(s.getJobName().equals("")) {
+ message = "Flow " + exflow.getFlowId() + " didn't finish successfully with set SLA and is killed. " + String.format("%n");
+ message += "Flow started at " + new DateTime(exflow.getStartTime()).toDateTimeISO() + String.format(" %n");
+ message += "Flow status at " + s.getCheckTime().toDateTimeISO() + " is " + exflow.getStatus();
+ }
+ else {
+ exnode = exflow.getExecutableNode(s.getJobName());
+ message = "Job " + s.getJobName() + " of flow " + exflow.getFlowId() + " didn't finish successfully with set SLA and is killed. " + String.format("%n");
+ message += "Job started at " + new DateTime(exnode.getStartTime()).toDateTimeISO() + String.format("%n");
+ message += "Job status at " + s.getCheckTime().toDateTimeISO() + " is " + exnode.getStatus();
+ }
+ break;
+ default:
+ logger.error("Unknown SLA rules!");
+ message = "Unknown SLA was not met!";
+ break;
+ }
+ mailer.sendSlaEmail(s, message);
+ }
+
+
public class SLAPreRunner extends Thread {
private final List<SLA> preSlas;
private AtomicBoolean stillAlive = new AtomicBoolean(true);
diff --git a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index 4002e4a..2f1403b 100644
--- a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -217,6 +217,16 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
ajaxFetchFlowExecutions(project, ret, req);
}
}
+ else if (ajaxName.equals("fetchJobInfo")) {
+ if (handleAjaxPermission(project, user, Type.READ, ret)) {
+ ajaxFetchJobInfo(project, ret, req);
+ }
+ }
+ else if (ajaxName.equals("setJobOverrideProperty")) {
+ if (handleAjaxPermission(project, user, Type.WRITE, ret)) {
+ ajaxSetJobOverrideProperty(project, ret, req);
+ }
+ }
else {
ret.put("error", "Cannot execute command " + ajaxName);
}
@@ -375,6 +385,86 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
}
}
+ private void ajaxFetchJobInfo(Project project, HashMap<String, Object> ret, HttpServletRequest req) throws ServletException {
+
+ String flowName = getParam(req, "flowName");
+ String jobName = getParam(req, "jobName");
+
+ Flow flow = project.getFlow(flowName);
+ if(flow == null) {
+ ret.put("error", "Flow " + flowName + " not found in project " + project.getName());
+ return;
+ }
+
+ Node node = flow.getNode(jobName);
+ if(node == null) {
+ ret.put("error", "Job " + jobName + " not found in flow " + flowName);
+ return;
+ }
+
+ Props prop;
+ try {
+ prop = projectManager.getProperties(project, node.getJobSource());
+ } catch (ProjectManagerException e) {
+ ret.put("error", "Failed to retrieve job properties!");
+ return;
+ }
+
+ Props overrideProp;
+ try {
+ overrideProp = projectManager.getJobOverrideProperty(project, jobName);
+ } catch (ProjectManagerException e) {
+ ret.put("error", "Failed to retrieve job override properties!");
+ return;
+ }
+
+ ret.put("jobName", node.getId());
+ ret.put("jobType", prop.get("type"));
+
+ if(overrideProp == null) {
+ overrideProp = new Props(prop);
+ }
+
+ Map<String, String> generalParams = new HashMap<String, String>();
+ Map<String, String> overrideParams = new HashMap<String, String>();
+ for(String ps : prop.getKeySet()) {
+ generalParams.put(ps, prop.getString(ps));
+ }
+ for(String ops : overrideProp.getKeySet()) {
+// generalParams.put(ops, overrideProp.getString(ops));
+ overrideParams.put(ops, overrideProp.getString(ops));
+ }
+ ret.put("generalParams", generalParams);
+ ret.put("overrideParams", overrideParams);
+ }
+
+ private void ajaxSetJobOverrideProperty(Project project, HashMap<String, Object> ret, HttpServletRequest req) throws ServletException {
+ String flowName = getParam(req, "flowName");
+ String jobName = getParam(req, "jobName");
+
+ Flow flow = project.getFlow(flowName);
+ if(flow == null) {
+ ret.put("error", "Flow " + flowName + " not found in project " + project.getName());
+ return;
+ }
+
+ Node node = flow.getNode(jobName);
+ if(node == null) {
+ ret.put("error", "Job " + jobName + " not found in flow " + flowName);
+ return;
+ }
+
+ Map<String, String> jobParamGroup = this.getParamGroup(req, "jobOverride");
+ @SuppressWarnings("unchecked")
+ Props overrideParams = new Props(null, jobParamGroup);
+ try {
+ projectManager.setJobOverrideProperty(project, overrideParams, jobName);
+ } catch (ProjectManagerException e) {
+ ret.put("error", "Failed to upload job override property");
+ }
+
+ }
+
private void ajaxFetchProjectFlows(Project project, HashMap<String, Object> ret, HttpServletRequest req) throws ServletException {
ArrayList<Map<String,Object>> flowList = new ArrayList<Map<String,Object>>();
for (Flow flow: project.getFlows()) {
@@ -751,6 +841,14 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
}
else {
Props prop = projectManager.getProperties(project, node.getJobSource());
+ Props overrideProp = projectManager.getJobOverrideProperty(project, jobName);
+ if(overrideProp == null) {
+ overrideProp = new Props();
+ }
+ Props comboProp = new Props(prop);
+ for(String key : overrideProp.getKeySet()) {
+ comboProp.put(key, overrideProp.get(key));
+ }
page.add("jobid", node.getId());
page.add("jobtype", node.getType());
@@ -791,11 +889,10 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
page.add("properties", source);
}
-
ArrayList<Pair<String,String>> parameters = new ArrayList<Pair<String, String>>();
// Parameter
- for (String key : prop.getKeySet()) {
- String value = prop.get(key);
+ for (String key : comboProp.getKeySet()) {
+ String value = comboProp.get(key);
parameters.add(new Pair<String,String>(key, value));
}
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index b5da554..489299a 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -160,6 +160,10 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
sched.setSlaOptions(slaOptions);
scheduleManager.insertSchedule(sched);
+ if(slaOptions != null) {
+ projectManager.postProjectEvent(project, EventType.SLA, user.getUserId(), "SLA for flow " + flowName + " has been added/changed.");
+ }
+
} catch (ServletException e) {
ret.put("error", e);
}
@@ -433,7 +437,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
}
Schedule schedule = scheduleManager.scheduleFlow(projectId, projectName, flowName, "ready", firstSchedTime.getMillis(), timezone, thePeriod, submitTime.getMillis(), firstSchedTime.getMillis(), firstSchedTime.getMillis(), user.getUserId(), flowOptions, slaOptions);
logger.info("User '" + user.getUserId() + "' has scheduled " + "[" + projectName + flowName + " (" + projectId +")" + "].");
- projectManager.postProjectEvent(project, EventType.SCHEDULE, user.getUserId(), "Schedule " + schedule.getScheduleName() + " has been added.");
+ projectManager.postProjectEvent(project, EventType.SCHEDULE, user.getUserId(), "Schedule " + schedule.toString() + " has been added.");
ret.put("status", "success");
ret.put("message", projectName + "." + flowName + " scheduled.");
@@ -502,8 +506,8 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
}
Schedule schedule = scheduleManager.scheduleFlow(projectId, projectName, flowName, "ready", firstSchedTime.getMillis(), firstSchedTime.getZone(), thePeriod, DateTime.now().getMillis(), firstSchedTime.getMillis(), firstSchedTime.getMillis(), user.getUserId(), flowOptions, slaOptions);
logger.info("User '" + user.getUserId() + "' has scheduled " + "[" + projectName + flowName + " (" + projectId +")" + "].");
- projectManager.postProjectEvent(project, EventType.SCHEDULE, user.getUserId(), "Schedule " + schedule.getScheduleName() + " has been added.");
-
+ projectManager.postProjectEvent(project, EventType.SCHEDULE, user.getUserId(), "Schedule " + schedule.toString() + " has been added.");
+
ret.put("status", "success");
ret.put("message", projectName + "." + flowName + " scheduled.");
}
diff --git a/src/java/azkaban/webapp/servlet/velocity/jobpage.vm b/src/java/azkaban/webapp/servlet/velocity/jobpage.vm
index b5f7401..536cdb5 100644
--- a/src/java/azkaban/webapp/servlet/velocity/jobpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/jobpage.vm
@@ -25,6 +25,7 @@
<script type="text/javascript" src="${context}/js/backbone-0.5.3-min.js"></script>
<script type="text/javascript" src="${context}/js/jquery.simplemodal.js"></script>
<script type="text/javascript" src="${context}/js/azkaban.nav.js"></script>
+ <script type="text/javascript" src="${context}/js/azkaban.jobedit.view.js"></script>
<script type="text/javascript">
var contextURL = "${context}";
var currentTime = ${currentTime};
@@ -60,6 +61,7 @@
</div>
<a id="jobs-logs-btn" class="btn2" href="${context}/manager?project=${project.name}&job=$jobid&history">Job History</a>
+ <a id="edit-job-btn" class="btn1" onclick='jobEditView.show("${project.name}", "${flowid}", "${jobid}")'>Job Edit</a>
</div>
</div>
@@ -115,5 +117,55 @@
</div>
#end
+
+ <!-- modal content -->
+
+ <div id="jobEditModalBackground" class="modalBackground2">
+ <div id="job-edit-pane" class="modal modalContainer2">
+ <a href='#' title='Close' class='modal-close'>x</a>
+ <h3>Job Description Edit</h3>
+ <div class="optionsPane">
+ <div id="generalPanel" class="generalPanel panel">
+ <div id="mustHave">
+ <h4>Job Essentials</h4>
+ <dl>
+ <dt>Job Name</dt>
+ <dd>
+ <label id="jobName"></label>
+ </dd>
+ <dt>Job Type</dt>
+ <dd>
+ <label id="jobType"></label>
+ </dd>
+ </dl>
+ </div>
+ <br></br>
+ <!--div id="jobTypeSpecific">
+ <h4>Job Type Specific parameters</h4>
+ </div-->
+ <div id="generalJobSetting">
+ <h4>General Job Settings (Be Aware: A Job May Be Shared By Multiple Flows. The Change Will Be Global!)</h4>
+ <div class="tableDiv">
+ <table id="generalProps">
+ <thead>
+ <tr>
+ <th>Name</th>
+ <th>Value</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr id="addRow"><td id="addRow-col" colspan="2"><span class="addIcon"></span><a href="#">Add Row</a></td></tr>
+ </tbody>
+ </table>
+ </div>
+ </div>
+ </div>
+ </div>
+ <div class="actions">
+ <a class="yes btn1" id="set-btn" >Set/Change Job Description</a>
+ <a class="no simplemodal-close btn3" id="cancel-btn" >Cancel</a>
+ </div>
+ </div>
+ </div>
</body>
</html>
\ No newline at end of file
src/web/css/azkaban.css 44(+44 -0)
diff --git a/src/web/css/azkaban.css b/src/web/css/azkaban.css
index 67d3287..9f38ce3 100644
--- a/src/web/css/azkaban.css
+++ b/src/web/css/azkaban.css
@@ -1529,6 +1529,50 @@ tr:hover td {
}
+#job-edit-pane {
+ left: 100px;
+ right: 100px;
+ top: 50px;
+ bottom: 40px;
+}
+
+#job-edit-pane .list {
+ width: 255px;
+}
+
+#job-edit-pane .optionsPane {
+ position: absolute;
+ top: 85px;
+ background-color: #FFF;
+ left: 0px;
+ right: 0px;
+ bottom: 0px;
+}
+
+#job-edit-pane .panel {
+ position: absolute;
+ width: 100%;
+ top: 0px;
+ bottom: 65px;
+}
+
+#job-edit-pane .generalPanel.panel {
+ background-color: #F4F4F4;
+ padding-top: 15px;
+}
+
+#job-edit-pane h3 {
+ margin-left: 20px;
+ font-size: 14pt;
+ border-bottom: 1px solid #CCC;
+}
+
+#job-edit-pane h4 {
+ margin-left: 20px;
+ font-size: 12pt;
+ border-bottom: 1px solid #CCC;
+}
+
#schedule-options {
left: 100px;
right: 100px;
src/web/js/azkaban.jobedit.view.js 223(+223 -0)
diff --git a/src/web/js/azkaban.jobedit.view.js b/src/web/js/azkaban.jobedit.view.js
new file mode 100644
index 0000000..08d248b
--- /dev/null
+++ b/src/web/js/azkaban.jobedit.view.js
@@ -0,0 +1,223 @@
+$.namespace('azkaban');
+
+var jobEditView;
+azkaban.JobEditView = Backbone.View.extend({
+ events : {
+ "click" : "closeEditingTarget",
+ "click #set-btn": "handleSet",
+ "click #cancel-btn": "handleCancel",
+ "click .modal-close": "handleCancel",
+ "click #addRow": "handleAddRow",
+ "click table .editable": "handleEditColumn",
+ "click table .removeIcon": "handleRemoveColumn"
+ },
+ initialize: function(setting) {
+ this.projectURL = contextURL + "manager"
+ this.generalParams = {}
+ this.overrideParams = {}
+ },
+ handleCancel: function(evt) {
+ $('#jobEditModalBackground').hide();
+ $('#job-edit-pane').hide();
+ var tbl = document.getElementById("generalProps").tBodies[0];
+ var rows = tbl.rows;
+ var len = rows.length;
+ for(var i=0; i < len-1; i++) {
+ tbl.deleteRow(0);
+ }
+ },
+ show: function(projectName, flowName, jobName) {
+ this.projectName = projectName;
+ this.flowName = flowName;
+ this.jobName = jobName;
+
+ var projectURL = this.projectURL
+
+
+ $('#jobEditModalBackground').show();
+ $('#job-edit-pane').show();
+
+ var handleAddRow = this.handleAddRow;
+
+// var overrideParams;
+// var generalParams;
+// this.overrideParams = overrideParams;
+// this.generalParams = generalParams;
+ var fetchJobInfo = {"project": this.projectName, "ajax":"fetchJobInfo", "flowName":this.flowName, "jobName":this.jobName};
+
+ var mythis = this;
+
+ $.get(
+ projectURL,
+ fetchJobInfo,
+ function(data) {
+ if (data.error) {
+ alert(data.error);
+ }
+ else {
+ document.getElementById('jobName').innerHTML = data.jobName;
+ document.getElementById('jobType').innerHTML = data.jobType;
+ var generalParams = data.generalParams;
+ var overrideParams = data.overrideParams;
+
+// for(var key in generalParams) {
+// var row = handleAddRow();
+// var td = $(row).find('span');
+// $(td[1]).text(key);
+// $(td[2]).text(generalParams[key]);
+// }
+
+ mythis.overrideParams = overrideParams;
+ mythis.generalParams = generalParams;
+
+ for(var okey in overrideParams) {
+ if(okey != 'type' && okey != 'dependencies') {
+ var row = handleAddRow();
+ var td = $(row).find('span');
+ $(td[1]).text(okey);
+ $(td[2]).text(overrideParams[okey]);
+ }
+ }
+
+ }
+ },
+ "json"
+ );
+
+ },
+ handleSet: function(evt) {
+
+ var jobOverride = {};
+ var editRows = $(".editRow");
+ for (var i = 0; i < editRows.length; ++i) {
+ var row = editRows[i];
+ var td = $(row).find('span');
+ var key = $(td[1]).text();
+ var val = $(td[2]).text();
+
+ if (key && key.length > 0) {
+ jobOverride[key] = val;
+ }
+ }
+
+ var overrideParams = this.overrideParams
+ var generalParams = this.generalParams
+
+ jobOverride['type'] = overrideParams['type']
+ if('dependencies' in overrideParams) {
+ jobOverride['dependencies'] = overrideParams['dependencies']
+ }
+
+ var project = this.projectName
+ var flowName = this.flowName
+ var jobName = this.jobName
+
+ var jobOverrideData = {
+ project: project,
+ flowName: flowName,
+ jobName: jobName,
+ ajax: "setJobOverrideProperty",
+ jobOverride: jobOverride
+ };
+
+ var projectURL = this.projectURL
+ var redirectURL = projectURL+'?project='+project+'&flow='+flowName+'&job='+jobName;
+
+ $.get(
+ projectURL,
+ jobOverrideData,
+ function(data) {
+ if (data.error) {
+ alert(data.error);
+ }
+ else {
+ window.location = redirectURL;
+ }
+ },
+ "json"
+ );
+ },
+ handleAddRow: function(evt) {
+ var tr = document.createElement("tr");
+ var tdName = document.createElement("td");
+ var tdValue = document.createElement("td");
+
+ var icon = document.createElement("span");
+ $(icon).addClass("removeIcon");
+ var nameData = document.createElement("span");
+ $(nameData).addClass("spanValue");
+ var valueData = document.createElement("span");
+ $(valueData).addClass("spanValue");
+
+ $(tdName).append(icon);
+ $(tdName).append(nameData);
+ $(tdName).addClass("name");
+ $(tdName).addClass("editable");
+
+ $(tdValue).append(valueData);
+ $(tdValue).addClass("editable");
+
+ $(tr).addClass("editRow");
+ $(tr).append(tdName);
+ $(tr).append(tdValue);
+
+ $(tr).insertBefore("#addRow");
+ return tr;
+
+ },
+ handleEditColumn : function(evt) {
+ var curTarget = evt.currentTarget;
+
+ if (this.editingTarget != curTarget) {
+ this.closeEditingTarget();
+
+ var text = $(curTarget).children(".spanValue").text();
+ $(curTarget).empty();
+
+ var input = document.createElement("input");
+ $(input).attr("type", "text");
+ $(input).css("width", "100%");
+ $(input).val(text);
+ $(curTarget).addClass("editing");
+ $(curTarget).append(input);
+ $(input).focus();
+ this.editingTarget = curTarget;
+ }
+ },
+ handleRemoveColumn : function(evt) {
+ var curTarget = evt.currentTarget;
+ // Should be the table
+ var row = curTarget.parentElement.parentElement;
+ $(row).remove();
+ },
+ closeEditingTarget: function(evt) {
+ if (this.editingTarget != null && this.editingTarget != evt.target && this.editingTarget != evt.target.parentElement ) {
+ var input = $(this.editingTarget).children("input")[0];
+ var text = $(input).val();
+ $(input).remove();
+
+ var valueData = document.createElement("span");
+ $(valueData).addClass("spanValue");
+ $(valueData).text(text);
+
+ if ($(this.editingTarget).hasClass("name")) {
+ var icon = document.createElement("span");
+ $(icon).addClass("removeIcon");
+ $(this.editingTarget).append(icon);
+ }
+
+ $(this.editingTarget).removeClass("editing");
+ $(this.editingTarget).append(valueData);
+ this.editingTarget = null;
+ }
+ }
+});
+
+$(function() {
+
+
+ jobEditView = new azkaban.JobEditView({el:$('#job-edit-pane')});
+
+
+
+});
\ No newline at end of file
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest.java b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
index feaec68..9a9d80c 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
@@ -21,12 +21,14 @@ import azkaban.executor.ExecutorLoader;
import azkaban.flow.Flow;
import azkaban.jobtype.JobTypeManager;
+import azkaban.project.ProjectLoader;
import azkaban.test.executor.JavaJob;
import azkaban.utils.JSONUtils;
public class FlowRunnerTest {
private File workingDir;
private JobTypeManager jobtypeManager;
+ private ProjectLoader fakeProjectLoader;
public FlowRunnerTest() {
}
@@ -41,6 +43,7 @@ public class FlowRunnerTest {
workingDir.mkdirs();
jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
jobtypeManager.registerJobType("java", JavaJob.class);
+ fakeProjectLoader = new MockProjectLoader(workingDir);
}
@After
@@ -55,6 +58,8 @@ public class FlowRunnerTest {
@Test
public void exec1Normal() throws Exception {
MockExecutorLoader loader = new MockExecutorLoader();
+ //just making compile. may not work at all.
+
EventCollectorListener eventCollector = new EventCollectorListener();
FlowRunner runner = createFlowRunner(loader, eventCollector, "exec1");
@@ -349,7 +354,7 @@ public class FlowRunnerTest {
//MockProjectLoader projectLoader = new MockProjectLoader(new File(flow.getExecutionPath()));
loader.uploadExecutableFlow(flow);
- FlowRunner runner = new FlowRunner(flow, loader, jobtypeManager);
+ FlowRunner runner = new FlowRunner(flow, loader, fakeProjectLoader, jobtypeManager);
runner.addListener(eventCollector);
return runner;
@@ -361,7 +366,7 @@ public class FlowRunnerTest {
//MockProjectLoader projectLoader = new MockProjectLoader(new File(exFlow.getExecutionPath()));
loader.uploadExecutableFlow(exFlow);
- FlowRunner runner = new FlowRunner(exFlow, loader, jobtypeManager);
+ FlowRunner runner = new FlowRunner(exFlow, loader, fakeProjectLoader, jobtypeManager);
runner.addListener(eventCollector);
return runner;
diff --git a/unit/java/azkaban/test/execapp/MockProjectLoader.java b/unit/java/azkaban/test/execapp/MockProjectLoader.java
index a9c560d..0739bd3 100644
--- a/unit/java/azkaban/test/execapp/MockProjectLoader.java
+++ b/unit/java/azkaban/test/execapp/MockProjectLoader.java
@@ -195,4 +195,18 @@ public class MockProjectLoader implements ProjectLoader {
// TODO Auto-generated method stub
}
+
+ @Override
+ public void updateProjectProperty(Project project, Props props)
+ throws ProjectManagerException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public Props fetchProjectProperty(int projectId, int projectVer,
+ String propsName) throws ProjectManagerException {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
\ No newline at end of file