azkaban-uncached
Changes
src/java/azkaban/execapp/FlowRunner.java 286(+246 -40)
src/java/azkaban/execapp/JobRunner.java 18(+11 -7)
src/java/azkaban/executor/ExecutableNode.java 162(+146 -16)
src/java/azkaban/executor/JdbcExecutorLoader.java 152(+132 -20)
src/java/azkaban/utils/PropsUtils.java 17(+17 -0)
src/sql/create_execution_job.sql 6(+4 -2)
src/sql/create_execution_logs.sql 2(+2 -0)
src/sql/update_2.0_to_2.01.sql 13(+13 -0)
src/web/js/azkaban.exflow.view.js 35(+34 -1)
unit/build.xml 26(+24 -2)
unit/executions/exectest2/myjob1.job 4(+4 -0)
unit/executions/exectest2/myjob3.job 5(+5 -0)
unit/executions/exectest2/myjob4.job 5(+5 -0)
unit/executions/exectest2/myjob5.job 5(+5 -0)
Details
diff --git a/src/java/azkaban/execapp/ExecutorServlet.java b/src/java/azkaban/execapp/ExecutorServlet.java
index 5556d2a..4fd55c2 100644
--- a/src/java/azkaban/execapp/ExecutorServlet.java
+++ b/src/java/azkaban/execapp/ExecutorServlet.java
@@ -95,6 +95,10 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
logger.info("Resume called.");
handleAjaxResume(respMap, execid, user);
}
+ else if (action.equals(MODIFY_EXECUTION_ACTION)) {
+ logger.info("Modify Execution Action");
+ handleModifyExecutionRequest(respMap, execid, user, req);
+ }
else {
respMap.put("error", "action: '" + action + "' not supported.");
}
@@ -107,6 +111,38 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
resp.flushBuffer();
}
+ private void handleModifyExecutionRequest(Map<String, Object> respMap, int execId, String user, HttpServletRequest req) throws ServletException {
+ if (!hasParam(req, MODIFY_EXECUTION_ACTION_TYPE)) {
+ respMap.put(RESPONSE_ERROR, "Modification type not set.");
+ return;
+ }
+ String modificationType = getParam(req, MODIFY_EXECUTION_ACTION_TYPE);
+ String modifiedJobList = getParam(req, MODIFY_JOBS_LIST);
+ String[] jobIds = modifiedJobList.split("\\s*,\\s*");
+
+ try {
+ if (MODIFY_RETRY_JOBS.equals(modificationType)) {
+ flowRunnerManager.retryJobs(execId, user, jobIds);
+ }
+ else if (MODIFY_CANCEL_JOBS.equals(modificationType)) {
+ // TODO: not yet implemented
+ }
+ else if (MODIFY_DISABLE_JOBS.equals(modificationType)) {
+ // TODO: not yet implemented
+ }
+ else if (MODIFY_ENABLE_JOBS.equals(modificationType)) {
+ // TODO: not yet implemented
+ }
+ else if (MODIFY_PAUSE_JOBS.equals(modificationType)) {
+ // TODO: not yet implemented
+ }
+ else if (MODIFY_RESUME_JOBS.equals(modificationType)) {
+ // TODO: not yet implemented
+ }
+ } catch (ExecutorManagerException e) {
+ respMap.put("error", e.getMessage());
+ }
+ }
+
private void handleFetchLogEvent(int execId, HttpServletRequest req, HttpServletResponse resp, Map<String, Object> respMap) throws ServletException {
String type = getParam(req, "type");
int startByte = getIntParam(req, "offset");
@@ -121,13 +157,14 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
result = flowRunnerManager.readFlowLogs(execId, startByte, length);
respMap.putAll(result.toObject());
} catch (Exception e) {
- respMap.put("error", e.getMessage());
+ respMap.put(RESPONSE_ERROR, e.getMessage());
}
}
else {
+ int attempt = getIntParam(req, "attempt", 0);
String jobId = getParam(req, "jobId");
try {
- LogData result = flowRunnerManager.readJobLogs(execId, jobId, startByte, length);
+ LogData result = flowRunnerManager.readJobLogs(execId, jobId, attempt, startByte, length);
respMap.putAll(result.toObject());
} catch (Exception e) {
respMap.put("error", e.getMessage());
src/java/azkaban/execapp/FlowRunner.java 286(+246 -40)
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index c3449d8..91ac757 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -33,6 +33,7 @@ import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
import azkaban.flow.FlowProps;
import azkaban.jobtype.JobTypeManager;
+import azkaban.utils.Pair;
import azkaban.utils.Props;
public class FlowRunner extends EventHandler implements Runnable {
@@ -63,9 +64,12 @@ public class FlowRunner extends EventHandler implements Runnable {
private JobRunnerEventListener listener = new JobRunnerEventListener();
private BlockingQueue<JobRunner> jobsToRun = new LinkedBlockingQueue<JobRunner>();
private Map<String, JobRunner> runningJob = new ConcurrentHashMap<String, JobRunner>();
- private Map<String, JobRunner> allJobs = new ConcurrentHashMap<String, JobRunner>();
+ private Map<Pair<String, Integer>, JobRunner> allJobs = new ConcurrentHashMap<Pair<String, Integer>, JobRunner>();
private List<JobRunner> pausedJobsToRun = Collections.synchronizedList(new ArrayList<JobRunner>());
+ // Used for individual job pausing
+ private Map<String, ExecutableNode> pausedNode = new ConcurrentHashMap<String, ExecutableNode>();
+
private Object actionSyncObj = new Object();
private boolean flowPaused = false;
private boolean flowFailed = false;
@@ -173,7 +177,7 @@ public class FlowRunner extends EventHandler implements Runnable {
flowAppender.close();
try {
- executorLoader.uploadLogFile(execId, "", logFile);
+ executorLoader.uploadLogFile(execId, "", 0, logFile);
} catch (ExecutorManagerException e) {
e.printStackTrace();
}
@@ -238,7 +242,7 @@ public class FlowRunner extends EventHandler implements Runnable {
}
else {
runningJob.put(node.getJobId(), runner);
- allJobs.put(node.getJobId(), runner);
+ allJobs.put(new Pair<String, Integer>(node.getJobId(), node.getAttempt()), runner);
executorService.submit(runner);
logger.info("Job Started " + node.getJobId());
}
@@ -363,12 +367,13 @@ public class FlowRunner extends EventHandler implements Runnable {
public void cancel(String user) {
synchronized(actionSyncObj) {
+ logger.info("Flow cancelled by " + user);
flowPaused = false;
flowCancelled = true;
for (JobRunner runner: pausedJobsToRun) {
ExecutableNode node = runner.getNode();
- logger.info("Resumed flow is cancelled. Job killed " + node.getJobId());
+ logger.info("Resumed flow is cancelled. Job killed " + node.getJobId() + " by " + user);
node.setStatus(Status.KILLED);
jobsToRun.add(runner);
@@ -382,11 +387,192 @@ public class FlowRunner extends EventHandler implements Runnable {
flow.setStatus(Status.KILLED);
}
+ for (ExecutableNode node: pausedNode.values()) {
+ node.setStatus(Status.KILLED);
+ node.setPaused(false);
+ queueNextJob(node);
+ }
+ pausedNode.clear();
+
updateFlow();
interrupt();
}
}
+ public void cancelJob(String jobId, String user) throws ExecutorManagerException {
+ synchronized(actionSyncObj) {
+ logger.info("Cancel of job " + jobId + " called by user " + user);
+ JobRunner runner = runningJob.get(jobId);
+ ExecutableNode node = flow.getExecutableNode(jobId);
+ if (runner != null) {
+ runner.cancel();
+ }
+ else {
+ Status status = node.getStatus();
+ if(status == Status.FAILED || status == Status.SUCCEEDED || status == Status.SKIPPED) {
+ throw new ExecutorManagerException("Can't cancel finished job " + jobId + " with status " + status);
+ }
+
+ node.setStatus(Status.KILLED);
+ if (node.isPaused()) {
+ node.setPaused(false);
+ pausedNode.remove(jobId);
+ queueNextJob(node);
+ }
+ }
+ }
+ }
+
+ public void resumeJob(String jobId, String user) throws ExecutorManagerException {
+ synchronized(actionSyncObj) {
+ if (runningJob.containsKey(jobId)) {
+ throw new ExecutorManagerException("Resume of job " + jobId + " failed since it's already running. User " + user);
+ }
+ else {
+ logger.info("Resume of job " + jobId + " requested by " + user);
+ ExecutableNode node = flow.getExecutableNode(jobId);
+ if (node == null) {
+ throw new ExecutorManagerException("Job " + jobId + " doesn't exist in execution " + flow.getExecutionId() + ". Cannot pause.");
+ }
+
+ if (node.isPaused()) {
+ node.setPaused(false);
+ // Remove the stale entry so a later cancel() doesn't kill an already-resumed job.
+ if (pausedNode.remove(jobId) != null) {
+ queueNextJob(node);
+ }
+
+ updateFlow();
+ }
+ }
+ }
+ }
+
+ public void pauseJob(String jobId, String user) throws ExecutorManagerException {
+ synchronized(actionSyncObj) {
+ if (runningJob.containsKey(jobId)) {
+ throw new ExecutorManagerException("Pause of job " + jobId + " failed since it's already running. User " + user);
+ }
+ else {
+ logger.info("Pause of job " + jobId + " requested by " + user);
+ ExecutableNode node = flow.getExecutableNode(jobId);
+ if (node == null) {
+ throw new ExecutorManagerException("Job " + jobId + " doesn't exist in execution " + flow.getExecutionId() + ". Cannot pause.");
+ }
+
+ long startTime = node.getStartTime();
+ if (startTime < 0) {
+ node.setPaused(true);
+ updateFlow();
+ }
+ else {
+ throw new ExecutorManagerException("Cannot pause job " + jobId + " that's started.");
+ }
+ }
+ }
+ }
+
+ public void disableJob(String jobId, String user) throws ExecutorManagerException {
+ // Disable and then check to see if it's set.
+ synchronized(actionSyncObj) {
+ if (runningJob.containsKey(jobId)) {
+ throw new ExecutorManagerException("Disable of job " + jobId + " failed since it's already running. User " + user);
+ }
+ else {
+ logger.info("Disable of job " + jobId + " requested by " + user);
+ ExecutableNode node = flow.getExecutableNode(jobId);
+ if (node == null) {
+ throw new ExecutorManagerException("Job " + jobId + " doesn't exist in execution " + flow.getExecutionId() + ". Cannot disable.");
+ }
+
+ Status status = node.getStatus();
+ if (status == Status.DISABLED || status == Status.READY) {
+ node.setStatus(Status.DISABLED);
+ updateFlow();
+ }
+ else {
+ throw new ExecutorManagerException("Cannot disable job " + jobId + " with status " + status.toString());
+ }
+ }
+ }
+ }
+
+ public void enableJob(String jobId, String user) throws ExecutorManagerException {
+ // Enable and then check to see if it's set.
+ synchronized(actionSyncObj) {
+ if (runningJob.containsKey(jobId)) {
+ throw new ExecutorManagerException("Enable of job " + jobId + " failed since it's already running. User " + user);
+ }
+ else {
+ logger.info("Enable of job " + jobId + " requested by " + user);
+ ExecutableNode node = flow.getExecutableNode(jobId);
+ if (node == null) {
+ throw new ExecutorManagerException("Job " + jobId + " doesn't exist in execution " + flow.getExecutionId() + ". Cannot enable.");
+ }
+
+ Status status = node.getStatus();
+ if (status == Status.DISABLED || status == Status.READY) {
+ node.setStatus(Status.READY);
+ updateFlow();
+ }
+ else {
+ throw new ExecutorManagerException("Cannot enable job " + jobId + " with status " + status.toString());
+ }
+ }
+ }
+ }
+
+ public void retryJobs(String[] jobIds, String user) {
+ synchronized(actionSyncObj) {
+ for (String jobId: jobIds) {
+ if (runningJob.containsKey(jobId)) {
+ logger.error("Cannot retry job " + jobId + " since it's already running. User " + user);
+ continue;
+ }
+ else {
+ logger.info("Retry of job " + jobId + " requested by " + user);
+ ExecutableNode node = flow.getExecutableNode(jobId);
+ if (node == null) {
+ logger.error("Job " + jobId + " doesn't exist in execution " + flow.getExecutionId() + ". Cannot disable.");
+ }
+
+ Status status = node.getStatus();
+ if (status == Status.FAILED || status == Status.READY || status == Status.KILLED) {
+ node.resetForRetry();
+ reEnableDependents(node);
+ }
+ else {
+ logger.error("Cannot retry a job that hasn't finished. " + jobId);
+ }
+ }
+ }
+
+ boolean isFailureFound = false;
+ for (ExecutableNode node: flow.getExecutableNodes()) {
+ Status nodeStatus = node.getStatus();
+ if (nodeStatus == Status.FAILED || nodeStatus == Status.KILLED) {
+ isFailureFound = true;
+ break;
+ }
+ }
+
+ if (!isFailureFound) {
+ flow.setStatus(Status.RUNNING);
+ }
+
+ updateFlow();
+ }
+ }
+
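+ // Example: given a chain A -> B -> C where B FAILED and C was KILLED as a
+ // result, retrying B resets it to READY and this walk re-enables C as well.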
+ private void reEnableDependents(ExecutableNode node) {
+ for(String dependent: node.getOutNodes()) {
+ ExecutableNode dependentNode = flow.getExecutableNode(dependent);
+
+ if (dependentNode.getStatus() == Status.KILLED) {
+ dependentNode.setStatus(Status.READY);
+ dependentNode.setUpdateTime(System.currentTimeMillis());
+ reEnableDependents(dependentNode);
+ }
+ }
+ }
+
private void interrupt() {
currentThread.interrupt();
}
@@ -435,50 +621,70 @@ public class FlowRunner extends EventHandler implements Runnable {
return Status.READY;
}
- private synchronized void queueNextJobs(ExecutableNode node) {
- for (String dependent : node.getOutNodes()) {
+ /**
+ * Iterates through the finished job's dependents and queues any that are ready to run.
+ *
+ * @param finishedNode
+ */
+ private synchronized void queueNextJobs(ExecutableNode finishedNode) {
+ for (String dependent : finishedNode.getOutNodes()) {
ExecutableNode dependentNode = flow.getExecutableNode(dependent);
-
- Status nextStatus = getImpliedStatus(dependentNode);
- if (nextStatus == null) {
- // Not yet ready or not applicable
- continue;
- }
+ queueNextJob(dependentNode);
+ }
+ }
- dependentNode.setStatus(nextStatus);
+ /**
+ * Queues node for running if it's ready to be run.
+ *
+ * @param node
+ */
+ private void queueNextJob(ExecutableNode node) {
+ Status nextStatus = getImpliedStatus(node);
+ if (nextStatus == null) {
+ // Not yet ready or not applicable
+ return;
+ }
- Props previousOutput = null;
- // Iterate the in nodes again and create the dependencies
- for (String dependency : dependentNode.getInNodes()) {
- Props output = jobOutputProps.get(dependency);
- if (output != null) {
- output = Props.clone(output);
- output.setParent(previousOutput);
- previousOutput = output;
- }
+ node.setStatus(nextStatus);
+
+ Props previousOutput = null;
+ // Iterate the in nodes again and create the dependencies
+ for (String dependency : node.getInNodes()) {
+ Props output = jobOutputProps.get(dependency);
+ if (output != null) {
+ output = Props.clone(output);
+ output.setParent(previousOutput);
+ previousOutput = output;
}
+ }
- JobRunner runner = this.createJobRunner(dependentNode, previousOutput);
- synchronized(actionSyncObj) {
- if (flowPaused) {
- if (dependentNode.getStatus() != Status.DISABLED && dependentNode.getStatus() != Status.KILLED) {
- dependentNode.setStatus(Status.PAUSED);
- }
- pausedJobsToRun.add(runner);
- logger.info("Job Paused " + dependentNode.getJobId());
+ synchronized(actionSyncObj) {
+ // Individually paused jobs are parked here until they are resumed or cancelled.
+ if (node.isPaused()) {
+ pausedNode.put(node.getJobId(), node);
+ logger.info("Job Paused " + node.getJobId());
+ return;
+ }
+
+ JobRunner runner = this.createJobRunner(node, previousOutput);
+ if (flowPaused) {
+ if (node.getStatus() != Status.DISABLED && node.getStatus() != Status.KILLED) {
+ node.setStatus(Status.PAUSED);
}
- else {
- logger.info("Adding " + dependentNode.getJobId() + " to run queue.");
- if (dependentNode.getStatus() != Status.DISABLED && dependentNode.getStatus() != Status.KILLED) {
- dependentNode.setStatus(Status.QUEUED);
- }
-
- jobsToRun.add(runner);
+ pausedJobsToRun.add(runner);
+ logger.info("Flow Paused. Pausing " + node.getJobId());
+ }
+ else {
+ logger.info("Adding " + node.getJobId() + " to run queue.");
+ if (node.getStatus() != Status.DISABLED && node.getStatus() != Status.KILLED) {
+ node.setStatus(Status.QUEUED);
}
+
+ jobsToRun.add(runner);
}
}
}
-
+
private class JobRunnerEventListener implements EventListener {
public JobRunnerEventListener() {
}
@@ -562,8 +768,8 @@ public class FlowRunner extends EventHandler implements Runnable {
return logFile;
}
- public File getJobLogFile(String jobId) {
- JobRunner runner = allJobs.get(jobId);
+ public File getJobLogFile(String jobId, int attempt) {
+ JobRunner runner = allJobs.get(new Pair<String, Integer>(jobId, attempt));
if (runner == null) {
return null;
}
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index dda8674..5054c48 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -324,6 +324,66 @@ public class FlowRunnerManager implements EventListener {
runner.resume(user);
}
+ public void pauseJob(int execId, String jobId, String user) throws ExecutorManagerException {
+ FlowRunner runner = runningFlows.get(execId);
+
+ if (runner == null) {
+ throw new ExecutorManagerException("Execution " + execId + " is not running.");
+ }
+
+ runner.pauseJob(jobId, user);
+ }
+
+ public void resumeJob(int execId, String jobId, String user) throws ExecutorManagerException {
+ FlowRunner runner = runningFlows.get(execId);
+
+ if (runner == null) {
+ throw new ExecutorManagerException("Execution " + execId + " is not running.");
+ }
+
+ runner.resumeJob(jobId, user);
+ }
+
+ public void retryJobs(int execId, String user, String ... jobId) throws ExecutorManagerException {
+ FlowRunner runner = runningFlows.get(execId);
+
+ if (runner == null) {
+ throw new ExecutorManagerException("Execution " + execId + " is not running.");
+ }
+
+ runner.retryJobs(jobId, user);
+ }
+
+ public void disableJob(int execId, String user, String jobId) throws ExecutorManagerException {
+ FlowRunner runner = runningFlows.get(execId);
+
+ if (runner == null) {
+ throw new ExecutorManagerException("Execution " + execId + " is not running.");
+ }
+
+ runner.disableJob(jobId, user);
+ }
+
+ public void enableJob(int execId, String user, String jobId) throws ExecutorManagerException {
+ FlowRunner runner = runningFlows.get(execId);
+
+ if (runner == null) {
+ throw new ExecutorManagerException("Execution " + execId + " is not running.");
+ }
+
+ runner.enableJob(jobId, user);
+ }
+
+ public void cancelJob(int execId, String user, String jobId) throws ExecutorManagerException {
+ FlowRunner runner = runningFlows.get(execId);
+
+ if (runner == null) {
+ throw new ExecutorManagerException("Execution " + execId + " is not running.");
+ }
+
+ runner.cancelJob(jobId, user);
+ }
+
public ExecutableFlow getExecutableFlow(int execId) {
FlowRunner runner = runningFlows.get(execId);
if (runner == null) {
@@ -384,7 +444,7 @@ public class FlowRunnerManager implements EventListener {
throw new ExecutorManagerException("Error reading file. Log directory doesn't exist.");
}
- public LogData readJobLogs(int execId, String jobId, int startByte, int length) throws ExecutorManagerException {
+ public LogData readJobLogs(int execId, String jobId, int attempt, int startByte, int length) throws ExecutorManagerException {
FlowRunner runner = runningFlows.get(execId);
if (runner == null) {
throw new ExecutorManagerException("Running flow " + execId + " not found.");
@@ -397,7 +457,7 @@ public class FlowRunnerManager implements EventListener {
if (!dir.exists()) {
throw new ExecutorManagerException("Execution dir file doesn't exist. Probably has beend deleted");
}
- File logFile = runner.getJobLogFile(jobId);
+ File logFile = runner.getJobLogFile(jobId, attempt);
if (logFile != null && logFile.exists()) {
return FileIOUtils.readUtf8File(logFile, startByte, length);
}
diff --git a/src/java/azkaban/execapp/FlowWatcher.java b/src/java/azkaban/execapp/FlowWatcher.java
new file mode 100644
index 0000000..5bcb4d0
--- /dev/null
+++ b/src/java/azkaban/execapp/FlowWatcher.java
@@ -0,0 +1,9 @@
+package azkaban.execapp;
+
+/**
+ * Class that watches and updates execution flows that are being listened to by
+ * other executing flows.
+ */
+public class FlowWatcher {
+
+}
src/java/azkaban/execapp/JobRunner.java 18(+11 -7)
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 5aa5a04..0b8d3c6 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -83,14 +83,13 @@ public class JobRunner extends EventHandler implements Runnable {
logger = Logger.getLogger(loggerName);
// Create file appender
-
- String logName = "_job." + executionId + "." + node.getJobId() + ".log";
+ String logName;
+ if (node.getAttempt() > 0) {
+ logName = "_job." + executionId + "." + node.getAttempt() + "." + node.getJobId() + ".log";
+ }
+ else {
+ logName = "_job." + executionId + "." + node.getJobId() + ".log";
+ }
logFile = new File(workingDir, logName);
String absolutePath = logFile.getAbsolutePath();
jobAppender = null;
try {
- FileAppender fileAppender = new FileAppender(loggerLayout, absolutePath, false);
+ FileAppender fileAppender = new FileAppender(loggerLayout, absolutePath, true);
jobAppender = fileAppender;
logger.addAppender(jobAppender);
@@ -110,7 +109,7 @@ public class JobRunner extends EventHandler implements Runnable {
private void writeStatus() {
try {
node.setUpdateTime(System.currentTimeMillis());
- loader.updateExecutableNode(node, outputProps);
+ loader.updateExecutableNode(node);
} catch (ExecutorManagerException e) {
logger.error("Error writing node properties", e);
}
@@ -159,7 +158,7 @@ public class JobRunner extends EventHandler implements Runnable {
if (logFile != null) {
try {
- loader.uploadLogFile(executionId, node.getJobId(), logFile);
+ loader.uploadLogFile(executionId, node.getJobId(), node.getAttempt(), logFile);
} catch (ExecutorManagerException e) {
System.err.println("Error writing out logs for job " + node.getJobId());
}
@@ -192,7 +191,12 @@ public class JobRunner extends EventHandler implements Runnable {
return false;
}
- logInfo("Starting job " + node.getJobId() + " at " + node.getStartTime());
+ if (node.getAttempt() > 0) {
+ logInfo("Starting job " + node.getJobId() + " attempt " + node.getAttempt() + " at " + node.getStartTime());
+ }
+ else {
+ logInfo("Starting job " + node.getJobId() + " at " + node.getStartTime());
+ }
node.setStatus(Status.RUNNING);
// Ability to specify working directory
@@ -220,6 +224,7 @@ public class JobRunner extends EventHandler implements Runnable {
node.setStatus(Status.SUCCEEDED);
if (job != null) {
outputProps = job.getJobGeneratedProperties();
+ node.setOutputProps(outputProps);
}
}
@@ -266,5 +271,4 @@ public class JobRunner extends EventHandler implements Runnable {
public File getLogFile() {
return logFile;
}
-
}
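With the per-attempt log naming above, execution 45 of job "myjob" would write _job.45.myjob.log on its first run and _job.45.1.myjob.log on its first retry; since the FileAppender is now opened in append mode, reopening an existing file no longer truncates it.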
diff --git a/src/java/azkaban/executor/ConnectorParams.java b/src/java/azkaban/executor/ConnectorParams.java
index 8c1360e..6bda5cd 100644
--- a/src/java/azkaban/executor/ConnectorParams.java
+++ b/src/java/azkaban/executor/ConnectorParams.java
@@ -31,6 +31,16 @@ public interface ConnectorParams {
public static final String PING_ACTION = "ping";
public static final String LOG_ACTION = "log";
+ public static final String MODIFY_EXECUTION_ACTION = "modifyExecution";
+ public static final String MODIFY_EXECUTION_ACTION_TYPE = "modifyType";
+ public static final String MODIFY_RETRY_JOBS = "retryJobs";
+ public static final String MODIFY_CANCEL_JOBS = "cancelJobs";
+ public static final String MODIFY_DISABLE_JOBS = "skipJobs";
+ public static final String MODIFY_ENABLE_JOBS = "enableJobs";
+ public static final String MODIFY_PAUSE_JOBS = "pauseJobs";
+ public static final String MODIFY_RESUME_JOBS = "resumeJobs";
+ public static final String MODIFY_JOBS_LIST = "jobIds";
+
public static final String START_PARAM = "start";
public static final String END_PARAM = "end";
public static final String STATUS_PARAM = "status";
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index ec05459..de2843a 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -118,6 +118,15 @@ public class ExecutableFlow {
this.setFlow(flow);
}
+ public ExecutableFlow(int executionId, Flow flow) {
+ this.projectId = flow.getProjectId();
+ this.flowId = flow.getId();
+ this.version = flow.getVersion();
+ this.executionId = executionId;
+
+ this.setFlow(flow);
+ }
+
public ExecutableFlow() {
}
@@ -207,6 +216,15 @@ public class ExecutableFlow {
exNode.setStatus(status);
return true;
}
+
+ public void setProxyNodes(int externalExecutionId, String nodeId) {
+ ExecutableNode exNode = executableNodes.get(nodeId);
+ if (exNode == null) {
+ return;
+ }
+
+ exNode.setExternalExecutionId(externalExecutionId);
+ }
public int getExecutionId() {
return executionId;
@@ -214,6 +232,10 @@ public class ExecutableFlow {
public void setExecutionId(int executionId) {
this.executionId = executionId;
+
+ for(ExecutableNode node: executableNodes.values()) {
+ node.setExecutionId(executionId);
+ }
}
public String getFlowId() {
diff --git a/src/java/azkaban/executor/ExecutableJobInfo.java b/src/java/azkaban/executor/ExecutableJobInfo.java
index 99f271e..b716b5f 100644
--- a/src/java/azkaban/executor/ExecutableJobInfo.java
+++ b/src/java/azkaban/executor/ExecutableJobInfo.java
@@ -14,8 +14,9 @@ public class ExecutableJobInfo {
private final long startTime;
private final long endTime;
private final Status status;
+ private final int attempt;
- public ExecutableJobInfo(int execId, int projectId, int version, String flowId, String jobId, long startTime, long endTime, Status status) {
+ public ExecutableJobInfo(int execId, int projectId, int version, String flowId, String jobId, long startTime, long endTime, Status status, int attempt) {
this.execId = execId;
this.projectId = projectId;
this.startTime = startTime;
@@ -24,6 +25,7 @@ public class ExecutableJobInfo {
this.version = version;
this.flowId = flowId;
this.jobId = jobId;
+ this.attempt = attempt;
}
public int getProjectId() {
@@ -58,6 +60,10 @@ public class ExecutableJobInfo {
return status;
}
+ public int getAttempt() {
+ return attempt;
+ }
+
public Map<String, Object> toObject() {
HashMap<String, Object> map = new HashMap<String, Object>();
map.put("execId", execId);
@@ -67,6 +73,7 @@ public class ExecutableJobInfo {
map.put("startTime", startTime);
map.put("endTime", endTime);
map.put("status", status.toString());
+ map.put("attempt", attempt);
return map;
}
src/java/azkaban/executor/ExecutableNode.java 162(+146 -16)
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index 5541bb2..66526af 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -20,11 +20,13 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import azkaban.executor.ExecutableFlow.Status;
import azkaban.flow.Node;
import azkaban.utils.JSONUtils;
+import azkaban.utils.Props;
public class ExecutableNode {
private String jobId;
@@ -32,17 +34,23 @@ public class ExecutableNode {
private String type;
private String jobPropsSource;
private String inheritPropsSource;
- private String outputPropsSource;
private Status status = Status.READY;
private long startTime = -1;
private long endTime = -1;
private long updateTime = -1;
private int level = 0;
private ExecutableFlow flow;
+ private Props outputProps;
+ private int attempt = 0;
+ private boolean paused = false;
private Set<String> inNodes = new HashSet<String>();
private Set<String> outNodes = new HashSet<String>();
+ // Used if proxy node
+ private Integer externalExecutionId;
+ private List<Attempt> pastAttempts = null;
+
public ExecutableNode(Node node, ExecutableFlow flow) {
jobId = node.getId();
executionId = flow.getExecutionId();
@@ -57,6 +65,20 @@ public class ExecutableNode {
public ExecutableNode() {
}
+ public void resetForRetry() {
+ Attempt pastAttempt = new Attempt(attempt, startTime, endTime, status);
+ attempt++;
+ if (pastAttempts == null) {
+ pastAttempts = new ArrayList<Attempt>();
+ }
+ // Record every prior attempt, not just the first one.
+ pastAttempts.add(pastAttempt);
+
+ startTime = -1;
+ endTime = -1;
+ updateTime = System.currentTimeMillis();
+ status = Status.READY;
+ }
+
public void setExecutableFlow(ExecutableFlow flow) {
this.flow = flow;
}
@@ -64,11 +86,11 @@ public class ExecutableNode {
public void setExecutionId(int id) {
executionId = id;
}
-
+
public int getExecutionId() {
return executionId;
}
-
+
public String getJobId() {
return jobId;
}
@@ -76,15 +98,15 @@ public class ExecutableNode {
public void setJobId(String id) {
this.jobId = id;
}
-
+
public void addInNode(String exNode) {
inNodes.add(exNode);
}
-
+
public void addOutNode(String exNode) {
outNodes.add(exNode);
}
-
+
public Set<String> getOutNodes() {
return outNodes;
}
@@ -96,11 +118,11 @@ public class ExecutableNode {
public Status getStatus() {
return status;
}
-
+
public void setStatus(Status status) {
this.status = status;
}
-
+
public Object toObject() {
HashMap<String, Object> objMap = new HashMap<String, Object>();
objMap.put("id", jobId);
@@ -114,9 +136,15 @@ public class ExecutableNode {
objMap.put("endTime", endTime);
objMap.put("updateTime", updateTime);
objMap.put("level", level);
+ objMap.put("externalExecutionId", externalExecutionId);
+ objMap.put("paused", paused);
- if (outputPropsSource != null) {
- objMap.put("outputSource", outputPropsSource);
+ if (pastAttempts != null) {
+ ArrayList<Object> attemptsList = new ArrayList<Object>(pastAttempts.size());
+ for (Attempt attempts : pastAttempts) {
+ attemptsList.add(attempts.toObject());
+ }
+ objMap.put("pastAttempts", attemptsList);
}
return objMap;
@@ -131,7 +159,6 @@ public class ExecutableNode {
exNode.jobId = (String)objMap.get("id");
exNode.jobPropsSource = (String)objMap.get("jobSource");
exNode.inheritPropsSource = (String)objMap.get("propSource");
- exNode.outputPropsSource = (String)objMap.get("outputSource");
exNode.type = (String)objMap.get("jobType");
exNode.status = Status.valueOf((String)objMap.get("status"));
@@ -143,11 +170,28 @@ public class ExecutableNode {
exNode.updateTime = JSONUtils.getLongFromObject(objMap.get("updateTime"));
exNode.level = (Integer)objMap.get("level");
+ exNode.externalExecutionId = (Integer)objMap.get("externalExecutionId");
+
exNode.flow = flow;
+ Boolean paused = (Boolean)objMap.get("paused");
+ if (paused!=null) {
+ exNode.paused = paused;
+ }
+
+ List<Object> pastAttempts = (List<Object>)objMap.get("pastAttempts");
+ if (pastAttempts!=null) {
+ ArrayList<Attempt> attempts = new ArrayList<Attempt>();
+ for (Object attemptObj: pastAttempts) {
+ Attempt attempt = Attempt.fromObject(attemptObj);
+ attempts.add(attempt);
+ }
+
+ exNode.pastAttempts = attempts;
+ }
return exNode;
}
-
+
@SuppressWarnings("unchecked")
public void updateNodeFromObject(Object obj) {
HashMap<String, Object> objMap = (HashMap<String,Object>)obj;
@@ -156,7 +200,7 @@ public class ExecutableNode {
startTime = JSONUtils.getLongFromObject(objMap.get("startTime"));
endTime = JSONUtils.getLongFromObject(objMap.get("endTime"));
}
-
+
public long getStartTime() {
return startTime;
}
@@ -172,7 +216,7 @@ public class ExecutableNode {
public void setEndTime(long endTime) {
this.endTime = endTime;
}
-
+
public String getJobPropsSource() {
return jobPropsSource;
}
@@ -180,11 +224,11 @@ public class ExecutableNode {
public String getPropsSource() {
return inheritPropsSource;
}
-
+
public int getLevel() {
return level;
}
-
+
public ExecutableFlow getFlow() {
return flow;
}
@@ -196,4 +240,90 @@ public class ExecutableNode {
public void setUpdateTime(long updateTime) {
this.updateTime = updateTime;
}
+
+ public void setOutputProps(Props output) {
+ this.outputProps = output;
+ }
+
+ public Props getOutputProps() {
+ return outputProps;
+ }
+
+ public Integer getExternalExecutionId() {
+ return externalExecutionId;
+ }
+
+ public void setExternalExecutionId(Integer externalExecutionId) {
+ this.externalExecutionId = externalExecutionId;
+ }
+
+ public List<Attempt> getPastAttemptList() {
+ return pastAttempts;
+ }
+
+ public int getAttempt() {
+ return attempt;
+ }
+
+ public void setAttempt(int attempt) {
+ this.attempt = attempt;
+ }
+
+ public boolean isPaused() {
+ return paused;
+ }
+
+ public void setPaused(boolean paused) {
+ this.paused = paused;
+ }
+
+ public static class Attempt {
+ private int attempt = 0;
+ private long startTime = -1;
+ private long endTime = -1;
+ private Status status;
+
+ public Attempt(int attempt, long startTime, long endTime, Status status) {
+ this.attempt = attempt;
+ this.startTime = startTime;
+ this.endTime = endTime;
+ this.status = status;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public long getEndTime() {
+ return endTime;
+ }
+
+ public Status getStatus() {
+ return status;
+ }
+
+ public int getAttempt() {
+ return attempt;
+ }
+
+ public static Attempt fromObject(Object obj) {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> map = (Map<String, Object>)obj;
+ int attempt = (Integer)map.get("attempt");
+ long startTime = JSONUtils.getLongFromObject(map.get("startTime"));
+ long endTime = JSONUtils.getLongFromObject(map.get("endTime"));
+ Status status = Status.valueOf((String)map.get("status"));
+
+ return new Attempt(attempt, startTime, endTime, status);
+ }
+
+ public Map<String, Object> toObject() {
+ HashMap<String,Object> attempts = new HashMap<String,Object>();
+ attempts.put("attempt", attempt);
+ attempts.put("startTime", startTime);
+ attempts.put("endTime", endTime);
+ attempts.put("status", status.toString());
+ return attempts;
+ }
+ }
}
\ No newline at end of file
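As serialized by Attempt.toObject(), one pastAttempts entry looks like the following (timestamps are illustrative):

    {"attempt": 0, "startTime": 1347923452000, "endTime": 1347923519000, "status": "FAILED"}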
diff --git a/src/java/azkaban/executor/ExecutionReference.java b/src/java/azkaban/executor/ExecutionReference.java
index 89c1d38..930d11d 100644
--- a/src/java/azkaban/executor/ExecutionReference.java
+++ b/src/java/azkaban/executor/ExecutionReference.java
@@ -1,7 +1,5 @@
package azkaban.executor;
-import java.util.HashMap;
-
public class ExecutionReference {
private final int execId;
private final String host;
diff --git a/src/java/azkaban/executor/ExecutorLoader.java b/src/java/azkaban/executor/ExecutorLoader.java
index 0ce8016..13eea40 100644
--- a/src/java/azkaban/executor/ExecutorLoader.java
+++ b/src/java/azkaban/executor/ExecutorLoader.java
@@ -43,23 +43,32 @@ public interface ExecutorLoader {
public boolean updateExecutableReference(int execId, long updateTime) throws ExecutorManagerException;
- public LogData fetchLogs(int execId, String name, int startByte, int endByte) throws ExecutorManagerException;
+ public LogData fetchLogs(int execId, String name, int attempt, int startByte, int endByte) throws ExecutorManagerException;
- public void uploadLogFile(int execId, String name, File ... files) throws ExecutorManagerException;
+ public void uploadLogFile(int execId, String name, int attempt, File ... files) throws ExecutorManagerException;
public void updateExecutableFlow(ExecutableFlow flow) throws ExecutorManagerException;
public void uploadExecutableNode(ExecutableNode node, Props inputParams) throws ExecutorManagerException;
- public ExecutableJobInfo fetchJobInfo(int execId, String jobId) throws ExecutorManagerException;
+ public List<ExecutableJobInfo> fetchJobInfoAttempts(int execId, String jobId) throws ExecutorManagerException;
+ public ExecutableJobInfo fetchJobInfo(int execId, String jobId, int attempt) throws ExecutorManagerException;
+
public List<ExecutableJobInfo> fetchJobHistory(int projectId, String jobId, int skip, int size) throws ExecutorManagerException;
- public void updateExecutableNode(ExecutableNode node, Props outputParams) throws ExecutorManagerException;
+ public void updateExecutableNode(ExecutableNode node) throws ExecutorManagerException;
public int fetchNumExecutableFlows(int projectId, String flowId) throws ExecutorManagerException;
public int fetchNumExecutableFlows() throws ExecutorManagerException;
public int fetchNumExecutableNodes(int projectId, String jobId) throws ExecutorManagerException;
+
+ public Props fetchExecutionJobInputProps(int execId, String jobId) throws ExecutorManagerException;
+
+ public Props fetchExecutionJobOutputProps(int execId, String jobId) throws ExecutorManagerException;
+
+ public Pair<Props, Props> fetchExecutionJobProps(int execId, String jobId) throws ExecutorManagerException;
+
}
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index fe807d3..a5add77 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.lang.StringUtils;
import org.apache.http.client.HttpClient;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.methods.HttpGet;
@@ -159,12 +160,12 @@ public class ExecutorManager {
return LogData.createLogDataFromObject(result);
}
else {
- LogData value = executorLoader.fetchLogs(exFlow.getExecutionId(), "", offset, length);
+ LogData value = executorLoader.fetchLogs(exFlow.getExecutionId(), "", 0, offset, length);
return value;
}
}
- public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId, int offset, int length) throws ExecutorManagerException {
+ public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException {
Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
if (pair != null) {
@@ -172,13 +173,14 @@ public class ExecutorManager {
Pair<String,String> jobIdParam = new Pair<String,String>("jobId", jobId);
Pair<String,String> offsetParam = new Pair<String,String>("offset", String.valueOf(offset));
Pair<String,String> lengthParam = new Pair<String,String>("length", String.valueOf(length));
-
+ Pair<String,String> attemptParam = new Pair<String,String>("attempt", String.valueOf(attempt));
+
@SuppressWarnings("unchecked")
- Map<String, Object> result = callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION, typeParam, jobIdParam, offsetParam, lengthParam);
+ Map<String, Object> result = callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION, typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
return LogData.createLogDataFromObject(result);
}
else {
- LogData value = executorLoader.fetchLogs(exFlow.getExecutionId(), jobId, offset, length);
+ LogData value = executorLoader.fetchLogs(exFlow.getExecutionId(), jobId, attempt, offset, length);
return value;
}
}
@@ -213,6 +215,58 @@ public class ExecutorManager {
}
}
+ public void pauseExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
+ modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_PAUSE_JOBS, userId, jobIds);
+ }
+
+ public void resumeExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
+ modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RESUME_JOBS, userId, jobIds);
+ }
+
+ public void retryExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
+ modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RETRY_JOBS, userId, jobIds);
+ }
+
+ public void disableExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
+ modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_DISABLE_JOBS, userId, jobIds);
+ }
+
+ public void enableExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
+ modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_ENABLE_JOBS, userId, jobIds);
+ }
+
+ public void cancelExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
+ modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_CANCEL_JOBS, userId, jobIds);
+ }
+
+ @SuppressWarnings("unchecked")
+ private Map<String, Object> modifyExecutingJobs(ExecutableFlow exFlow, String command, String userId, String ... jobIds) throws ExecutorManagerException {
+ synchronized(exFlow) {
+ Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
+ if (pair == null) {
+ throw new ExecutorManagerException("Execution " + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId() + " isn't running.");
+ }
+
+ for (String jobId: jobIds) {
+ if (!jobId.isEmpty()) {
+ ExecutableNode node = exFlow.getExecutableNode(jobId);
+ if (node == null) {
+ throw new ExecutorManagerException("Job " + jobId + " doesn't exist in execution " + exFlow.getExecutionId() + ".");
+ }
+ }
+ }
+ String ids = StringUtils.join(jobIds, ',');
+ Map<String, Object> response = callExecutorServer(
+ pair.getFirst(),
+ ConnectorParams.MODIFY_EXECUTION_ACTION,
+ userId,
+ new Pair<String,String>(ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command),
+ new Pair<String,String>(ConnectorParams.MODIFY_JOBS_LIST, ids));
+
+ return response;
+ }
+ }
+
public void submitExecutableFlow(ExecutableFlow exflow) throws ExecutorManagerException {
synchronized(exflow) {
logger.info("Submitting execution flow " + exflow.getFlowId());
@@ -258,6 +312,14 @@ public class ExecutorManager {
}
}
+ private Map<String, Object> callExecutorServer(ExecutionReference ref, String action, String user, Pair<String,String> ... params) throws ExecutorManagerException {
+ try {
+ return callExecutorServer(ref.getHost(), ref.getPort(), action, ref.getExecId(), user, params);
+ } catch (IOException e) {
+ throw new ExecutorManagerException(e);
+ }
+ }
+
private Map<String, Object> callExecutorServer(String host, int port, String action, Integer executionId, String user, Pair<String,String> ... params) throws IOException {
URIBuilder builder = new URIBuilder();
builder.setScheme("http")
src/java/azkaban/executor/JdbcExecutorLoader.java 152(+132 -20)
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index e7f7bf1..87478ac 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -152,7 +152,6 @@ public class JdbcExecutorLoader implements ExecutorLoader {
if (encType == EncodingType.GZIP) {
data = GZIPUtils.gzipBytes(stringData);
}
- logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
}
catch (IOException e) {
throw new ExecutorManagerException("Error encoding the execution flow.");
@@ -248,6 +247,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
@Override
public List<ExecutableFlow> fetchFlowHistory(int skip, int num) throws ExecutorManagerException {
QueryRunner runner = new QueryRunner(dataSource);
+
FetchExecutableFlows flowHandler = new FetchExecutableFlows();
try {
@@ -391,7 +391,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
@Override
public void uploadExecutableNode(ExecutableNode node, Props inputProps) throws ExecutorManagerException {
- final String INSERT_EXECUTION_NODE = "INSERT INTO execution_jobs (exec_id, project_id, version, flow_id, job_id, start_time, end_time, status, input_params) VALUES (?,?,?,?,?,?,?,?,?)";
+ final String INSERT_EXECUTION_NODE = "INSERT INTO execution_jobs (exec_id, project_id, version, flow_id, job_id, start_time, end_time, status, input_params, attempt) VALUES (?,?,?,?,?,?,?,?,?,?)";
byte[] inputParam = null;
if (inputProps != null) {
@@ -416,17 +416,20 @@ public class JdbcExecutorLoader implements ExecutorLoader {
node.getStartTime(),
node.getEndTime(),
node.getStatus().getNumVal(),
- inputParam);
+ inputParam,
+ node.getAttempt()
+ );
} catch (SQLException e) {
throw new ExecutorManagerException("Error writing job " + node.getJobId(), e);
}
}
@Override
- public void updateExecutableNode(ExecutableNode node, Props outputProps) throws ExecutorManagerException {
+ public void updateExecutableNode(ExecutableNode node) throws ExecutorManagerException {
- final String UPSERT_EXECUTION_NODE = "UPDATE execution_jobs SET start_time=?, end_time=?, status=?, output_params=? WHERE exec_id=? AND job_id=?";
+ final String UPSERT_EXECUTION_NODE = "UPDATE execution_jobs SET start_time=?, end_time=?, status=?, output_params=? WHERE exec_id=? AND job_id=? AND attempt=?";
byte[] outputParam = null;
+ Props outputProps = node.getOutputProps();
if (outputProps != null) {
try {
String jsonString = JSONUtils.toJSON(PropsUtils.toHierarchicalMap(outputProps));
@@ -444,7 +447,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
node.getEndTime(),
node.getStatus().getNumVal(),
outputParam,
- node.getExecutionId(),
+ node.getFlow().getExecutionId(),
- node.getJobId());
+ node.getJobId(),
+ node.getAttempt());
} catch (SQLException e) {
throw new ExecutorManagerException("Error updating job " + node.getJobId(), e);
@@ -452,7 +455,23 @@ public class JdbcExecutorLoader implements ExecutorLoader {
}
@Override
- public ExecutableJobInfo fetchJobInfo(int execId, String jobId) throws ExecutorManagerException {
+ public List<ExecutableJobInfo> fetchJobInfoAttempts(int execId, String jobId) throws ExecutorManagerException {
+ QueryRunner runner = new QueryRunner(dataSource);
+
+ try {
+ List<ExecutableJobInfo> info = runner.query(FetchExecutableJobHandler.FETCH_EXECUTABLE_NODE_ATTEMPTS, new FetchExecutableJobHandler(), execId, jobId);
+ if (info == null || info.isEmpty()) {
+ return null;
+ }
+
+ return info;
+ } catch (SQLException e) {
+ throw new ExecutorManagerException("Error querying job info " + jobId, e);
+ }
+ }
+
+ @Override
+ public ExecutableJobInfo fetchJobInfo(int execId, String jobId, int attempts) throws ExecutorManagerException {
QueryRunner runner = new QueryRunner(dataSource);
try {
@@ -468,6 +487,42 @@ public class JdbcExecutorLoader implements ExecutorLoader {
}
@Override
+ public Props fetchExecutionJobInputProps(int execId, String jobId) throws ExecutorManagerException {
+ QueryRunner runner = new QueryRunner(dataSource);
+ try {
+ Pair<Props, Props> props = runner.query(FetchExecutableJobPropsHandler.FETCH_INPUT_PARAM_EXECUTABLE_NODE, new FetchExecutableJobPropsHandler(), execId, jobId);
+ return props.getFirst();
+ }
+ catch (SQLException e) {
+ throw new ExecutorManagerException("Error querying job params " + execId + " " + jobId, e);
+ }
+ }
+
+ @Override
+ public Props fetchExecutionJobOutputProps(int execId, String jobId) throws ExecutorManagerException {
+ QueryRunner runner = new QueryRunner(dataSource);
+ try {
+ Pair<Props, Props> props = runner.query(FetchExecutableJobPropsHandler.FETCH_OUTPUT_PARAM_EXECUTABLE_NODE, new FetchExecutableJobPropsHandler(), execId, jobId);
+ return props.getFirst();
+ }
+ catch (SQLException e) {
+ throw new ExecutorManagerException("Error querying job params " + execId + " " + jobId, e);
+ }
+ }
+
+ @Override
+ public Pair<Props, Props> fetchExecutionJobProps(int execId, String jobId) throws ExecutorManagerException {
+ QueryRunner runner = new QueryRunner(dataSource);
+ try {
+ Pair<Props, Props> props = runner.query(FetchExecutableJobPropsHandler.FETCH_INPUT_OUTPUT_PARAM_EXECUTABLE_NODE, new FetchExecutableJobPropsHandler(), execId, jobId);
+ return props;
+ }
+ catch (SQLException e) {
+ throw new ExecutorManagerException("Error querying job params " + execId + " " + jobId, e);
+ }
+ }
+
+ @Override
public List<ExecutableJobInfo> fetchJobHistory(int projectId, String jobId, int skip, int size) throws ExecutorManagerException {
QueryRunner runner = new QueryRunner(dataSource);
@@ -484,13 +539,13 @@ public class JdbcExecutorLoader implements ExecutorLoader {
}
@Override
- public LogData fetchLogs(int execId, String name, int startByte, int length) throws ExecutorManagerException {
+ public LogData fetchLogs(int execId, String name, int attempt, int startByte, int length) throws ExecutorManagerException {
QueryRunner runner = new QueryRunner(dataSource);
FetchLogsHandler handler = new FetchLogsHandler(startByte, length + startByte);
try {
- LogData result = runner.query(FetchLogsHandler.FETCH_LOGS, handler, execId, name, startByte, startByte + length);
+ LogData result = runner.query(FetchLogsHandler.FETCH_LOGS, handler, execId, name, attempt, startByte, startByte + length);
return result;
} catch (SQLException e) {
throw new ExecutorManagerException("Error fetching logs " + execId + " : " + name, e);
@@ -498,10 +553,10 @@ public class JdbcExecutorLoader implements ExecutorLoader {
}
@Override
- public void uploadLogFile(int execId, String name, File ... files) throws ExecutorManagerException {
+ public void uploadLogFile(int execId, String name, int attempt, File ... files) throws ExecutorManagerException {
Connection connection = getConnection();
try {
- uploadLogFile(connection, execId, name, files, defaultEncodingType);
+ uploadLogFile(connection, execId, name, attempt, files, defaultEncodingType);
connection.commit();
}
catch (SQLException e) {
@@ -515,7 +570,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
}
}
- private void uploadLogFile(Connection connection, int execId, String name, File[] files, EncodingType encType) throws ExecutorManagerException, IOException {
+ private void uploadLogFile(Connection connection, int execId, String name, int attempt, File[] files, EncodingType encType) throws ExecutorManagerException, IOException {
// 50K buffer... if logs are greater than this, we chunk.
// However, we better prevent large log files from being uploaded somehow
byte[] buffer = new byte[50*1024];
@@ -532,7 +587,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
while (size >= 0) {
if (pos + size == buffer.length) {
// Flush here.
- uploadLogPart(connection, execId, name, startByte, startByte + buffer.length, encType, buffer, buffer.length);
+ uploadLogPart(connection, execId, name, attempt, startByte, startByte + buffer.length, encType, buffer, buffer.length);
pos = 0;
length = buffer.length;
@@ -549,7 +604,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
// Final commit of buffer.
if (pos > 0) {
- uploadLogPart(connection, execId, name, startByte, startByte + pos, encType, buffer, pos);
+ uploadLogPart(connection, execId, name, attempt, startByte, startByte + pos, encType, buffer, pos);
}
}
catch (SQLException e) {
@@ -564,8 +619,8 @@ public class JdbcExecutorLoader implements ExecutorLoader {
}
- private void uploadLogPart(Connection connection, int execId, String name, int startByte, int endByte, EncodingType encType, byte[] buffer, int length) throws SQLException, IOException {
- final String INSERT_EXECUTION_LOGS = "INSERT INTO execution_logs (exec_id, name, enc_type, start_byte, end_byte, log) VALUES (?,?,?,?,?,?)";
+ private void uploadLogPart(Connection connection, int execId, String name, int attempt, int startByte, int endByte, EncodingType encType, byte[] buffer, int length) throws SQLException, IOException {
+ final String INSERT_EXECUTION_LOGS = "INSERT INTO execution_logs (exec_id, name, attempt, enc_type, start_byte, end_byte, log) VALUES (?,?,?,?,?,?,?)";
QueryRunner runner = new QueryRunner();
byte[] buf = buffer;
@@ -576,7 +631,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
buf = Arrays.copyOf(buffer, length);
}
- runner.update(connection, INSERT_EXECUTION_LOGS, execId, name, encType.getNumVal(), startByte, startByte + length, buf);
+ runner.update(connection, INSERT_EXECUTION_LOGS, execId, name, attempt, encType.getNumVal(), startByte, startByte + length, buf);
}
private Connection getConnection() throws ExecutorManagerException {
@@ -608,7 +663,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
}
private static class FetchLogsHandler implements ResultSetHandler<LogData> {
- private static String FETCH_LOGS = "SELECT exec_id, name, enc_type, start_byte, end_byte, log FROM execution_logs WHERE exec_id=? AND name=? AND end_byte > ? AND start_byte <= ? ORDER BY start_byte";
+ private static String FETCH_LOGS = "SELECT exec_id, name, attempt, enc_type, start_byte, end_byte, log FROM execution_logs WHERE exec_id=? AND name=? AND attempt=? AND end_byte > ? AND start_byte <= ? ORDER BY start_byte";
private int startByte;
private int endByte;
@@ -657,8 +712,9 @@ public class JdbcExecutorLoader implements ExecutorLoader {
}
private static class FetchExecutableJobHandler implements ResultSetHandler<List<ExecutableJobInfo>> {
- private static String FETCH_EXECUTABLE_NODE = "SELECT exec_id, project_id, version, flow_id, job_id, start_time, end_time, status FROM execution_jobs WHERE exec_id=? AND job_id=?";
- private static String FETCH_PROJECT_EXECUTABLE_NODE = "SELECT exec_id, project_id, version, flow_id, job_id, start_time, end_time, status FROM execution_jobs WHERE project_id=? AND job_id=? ORDER BY exec_id DESC LIMIT ?, ? ";
+ private static String FETCH_EXECUTABLE_NODE = "SELECT exec_id, project_id, version, flow_id, job_id, start_time, end_time, status, attempt FROM execution_jobs WHERE exec_id=? AND job_id=? AND attempt=?";
+ private static String FETCH_EXECUTABLE_NODE_ATTEMPTS = "SELECT exec_id, project_id, version, flow_id, job_id, start_time, end_time, status, attempt FROM execution_jobs WHERE exec_id=? AND job_id=?";
+ private static String FETCH_PROJECT_EXECUTABLE_NODE = "SELECT exec_id, project_id, version, flow_id, job_id, start_time, end_time, status, attempt FROM execution_jobs WHERE project_id=? AND job_id=? ORDER BY exec_id DESC LIMIT ?, ? ";
@Override
public List<ExecutableJobInfo> handle(ResultSet rs) throws SQLException {
@@ -676,8 +732,9 @@ public class JdbcExecutorLoader implements ExecutorLoader {
long startTime = rs.getLong(6);
long endTime = rs.getLong(7);
Status status = Status.fromInteger(rs.getInt(8));
+ int attempt = rs.getInt(9);
- ExecutableJobInfo info = new ExecutableJobInfo(execId, projectId, version, flowId, jobId, startTime, endTime, status);
+ ExecutableJobInfo info = new ExecutableJobInfo(execId, projectId, version, flowId, jobId, startTime, endTime, status, attempt);
execNodes.add(info);
} while (rs.next());
@@ -685,6 +742,61 @@ public class JdbcExecutorLoader implements ExecutorLoader {
}
}
+ private static class FetchExecutableJobPropsHandler implements ResultSetHandler<Pair<Props, Props>> {
+ private static String FETCH_OUTPUT_PARAM_EXECUTABLE_NODE = "SELECT output_params FROM execution_jobs WHERE exec_id=? AND job_id=?";
+ private static String FETCH_INPUT_PARAM_EXECUTABLE_NODE = "SELECT input_params FROM execution_jobs WHERE exec_id=? AND job_id=?";
+ private static String FETCH_INPUT_OUTPUT_PARAM_EXECUTABLE_NODE = "SELECT input_params, output_params FROM execution_jobs WHERE exec_id=? AND job_id=?";
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Pair<Props, Props> handle(ResultSet rs) throws SQLException {
+ if (!rs.next()) {
+ return new Pair<Props, Props>(null, null);
+ }
+
+ if (rs.getMetaData().getColumnCount() > 1) {
+ byte[] input = rs.getBytes(1);
+ byte[] output = rs.getBytes(2);
+
+ Props inputProps = null;
+ Props outputProps = null;
+ try {
+ if (input != null) {
+ String jsonInputString = GZIPUtils.unGzipString(input, "UTF-8");
+ inputProps = PropsUtils.fromHierarchicalMap(
+ (Map<String, Object>)JSONUtils.parseJSONFromString(jsonInputString));
+
+ }
+ if (output != null) {
+ String jsonOutputString = GZIPUtils.unGzipString(output, "UTF-8");
+ outputProps = PropsUtils.fromHierarchicalMap(
+ (Map<String, Object>)JSONUtils.parseJSONFromString(jsonOutputString));
+ }
+ } catch (IOException e) {
+ throw new SQLException("Error decoding param data", e);
+ }
+
+ return new Pair<Props, Props>(inputProps, outputProps);
+ }
+ else {
+ byte[] params = rs.getBytes(1);
+ Props props = null;
+ try {
+ if (params != null) {
+ String jsonProps = GZIPUtils.unGzipString(params, "UTF-8");
+
+ props = PropsUtils.fromHierarchicalMap(
+ (Map<String, Object>)JSONUtils.parseJSONFromString(jsonProps));
+ }
+ } catch (IOException e) {
+ throw new SQLException("Error decoding param data", e);
+ }
+
+ return new Pair<Props,Props>(props, null);
+ }
+ }
+
+ }
private static class FetchActiveExecutableFlows implements ResultSetHandler<Map<Integer, Pair<ExecutionReference,ExecutableFlow>>> {
private static String FETCH_ACTIVE_EXECUTABLE_FLOW = "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, ax.host host, ax.port port, ax.update_time axUpdateTime FROM execution_flows ex INNER JOIN active_executing_flows ax ON ex.exec_id = ax.exec_id";
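Log uploads are chunked against the 50 KB (50*1024 = 51200 byte) buffer, so a 120 KB (122880 byte) log for a given (exec_id, name, attempt) is stored in execution_logs as three rows covering bytes [0, 51200), [51200, 102400), and [102400, 122880); FETCH_LOGS then reassembles any requested range by filtering on start_byte/end_byte and ordering by start_byte.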
diff --git a/src/java/azkaban/jobtype/JobTypeManager.java b/src/java/azkaban/jobtype/JobTypeManager.java
index a04db08..d43b0da 100644
--- a/src/java/azkaban/jobtype/JobTypeManager.java
+++ b/src/java/azkaban/jobtype/JobTypeManager.java
@@ -183,7 +183,6 @@ public class JobTypeManager
@SuppressWarnings("unchecked")
private void loadJob(File dir, Props commonConf, Props commonSysConf) throws JobTypeManagerException{
-
Props conf = null;
Props sysConf = null;
File confFile = findFilefromDir(dir, JOBTYPECONFFILE);
@@ -250,7 +249,6 @@ public class JobTypeManager
public Job buildJobExecutor(String jobId, Props jobProps, Logger logger) throws JobTypeManagerException
{
-
Job job;
try {
String jobType = jobProps.getString("type");
@@ -260,13 +258,13 @@ public class JobTypeManager
String.format("The 'type' parameter for job[%s] is null or empty", jobProps, logger));
}
- logger.info("Building " + jobType + " job executor. ");
+ logger.info("Building " + jobType + " job executor. ");
Class<? extends Object> executorClass = jobToClass.get(jobType);
if (executorClass == null) {
throw new JobExecutionException(
- String.format("Could not construct job[%s] of type[%s].", jobProps, jobType));
+ String.format("Job type '" + jobType + "' is unrecognized. Could not construct job[%s] of type[%s].", jobProps, jobType));
}
Props sysConf = jobtypeSysProps.get(jobType);
src/java/azkaban/utils/PropsUtils.java 17(+17 -0)
diff --git a/src/java/azkaban/utils/PropsUtils.java b/src/java/azkaban/utils/PropsUtils.java
index b81d0e8..e97c221 100644
--- a/src/java/azkaban/utils/PropsUtils.java
+++ b/src/java/azkaban/utils/PropsUtils.java
@@ -233,6 +233,23 @@ public class PropsUtils {
}
}
+ @SuppressWarnings("unchecked")
+ public static Props fromHierarchicalMap(Map<String, Object> propsMap) {
+ if (propsMap == null) {
+ return null;
+ }
+
+ String source = (String)propsMap.get("source");
+ Map<String, String> propsParams = (Map<String,String>)propsMap.get("props");
+
+ Map<String,Object> parent = (Map<String,Object>)propsMap.get("parent");
+ Props parentProps = fromHierarchicalMap(parent);
+
+ Props props = new Props(parentProps, propsParams);
+ props.setSource(source);
+ return props;
+ }
+
public static Map<String,Object> toHierarchicalMap(Props props) {
Map<String,Object> propsMap = new HashMap<String,Object>();
propsMap.put("source", props.getSource());
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index 2babb79..430af3d 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -428,6 +428,7 @@ public class AzkabanWebServer implements AzkabanServer {
public void run() {
logger.info("Shutting down http server...");
try {
+ app.getScheduleManager().shutdown();
server.stop();
server.destroy();
}
@@ -435,6 +436,7 @@ public class AzkabanWebServer implements AzkabanServer {
logger.error("Error while shutting down http server.", e);
}
logger.info("kk thx bye.");
+ System.exit(0);
}
});
logger.info("Server running on port " + sslPortNumber + ".");
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 581f5c1..df9dff9 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -32,7 +32,6 @@ import javax.servlet.http.HttpServletResponse;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutableFlow.FailureAction;
-import azkaban.executor.ExecutionReference;
import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutableFlow.Status;
import azkaban.executor.ExecutorManagerException;
@@ -264,6 +263,9 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
else if (ajaxName.equals("fetchExecJobLogs")) {
ajaxFetchJobLogs(req, resp, ret, session.getUser(), exFlow);
}
+ else if (ajaxName.equals("retryFailedJobs")) {
+ ajaxRestartFailed(req, resp, ret, session.getUser(), exFlow);
+ }
else if (ajaxName.equals("flowInfo")) {
//String projectName = getParam(req, "project");
//Project project = projectManager.getProject(projectName);
@@ -295,6 +297,27 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
}
}
+ private void ajaxRestartFailed(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException {
+ Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.EXECUTE);
+ if (project == null) {
+ return;
+ }
+
+ if (exFlow.getStatus() == Status.FAILED || exFlow.getStatus() == Status.SUCCEEDED) {
+ ret.put("error", "Flow has already finished. Please re-execute.");
+ return;
+ }
+
+ String jobs = getParam(req, "jobIds");
+ String[] jobIds = jobs.split("\\s*,\\s*");
+
+ try {
+ executorManager.retryExecutingJobs(exFlow, user.getUserId(), jobIds);
+ } catch (ExecutorManagerException e) {
+ ret.put("error", e.getMessage());
+ }
+ }
+
/**
* Gets the logs through plain text stream to reduce memory overhead.
*
@@ -349,11 +372,19 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
int offset = this.getIntParam(req, "offset");
int length = this.getIntParam(req, "length");
+
String jobId = this.getParam(req, "jobId");
resp.setCharacterEncoding("utf-8");
try {
- LogData data = executorManager.getExecutionJobLog(exFlow, jobId, offset, length);
+ ExecutableNode node = exFlow.getExecutableNode(jobId);
+ if (node == null) {
+ ret.put("error", "Job " + jobId + " doesn't exist in " + exFlow.getExecutionId());
+ return;
+ }
+
+ int attempt = this.getIntParam(req, "attempt", node.getAttempt());
+ LogData data = executorManager.getExecutionJobLog(exFlow, jobId, offset, length, attempt);
if (data == null) {
ret.put("length", 0);
ret.put("offset", offset);
@@ -552,6 +583,15 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
nodeObj.put("startTime", node.getStartTime());
nodeObj.put("endTime", node.getEndTime());
+ // Add past attempts
+ if (node.getPastAttemptList() != null) {
+ ArrayList<Object> pastAttempts = new ArrayList<Object>();
+ for (ExecutableNode.Attempt attempt: node.getPastAttemptList()) {
+ pastAttempts.add(attempt.toObject());
+ }
+ nodeObj.put("pastAttempts", pastAttempts);
+ }
+
nodeList.add(nodeObj);
// Add edges
@@ -651,13 +691,23 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
}
}
}
- else {
- // Setup disabled
- Map<String, String> paramGroup = this.getParamGroup(req, "disable");
- for (Map.Entry<String, String> entry: paramGroup.entrySet()) {
- boolean nodeDisabled = Boolean.parseBoolean(entry.getValue());
-
- exflow.setNodeStatus(entry.getKey(), nodeDisabled ? Status.DISABLED : Status.READY);
+ else if (hasParam(req, "disabled")) {
+ String disabled = getParam(req, "disabled");
+ String[] disabledNodes = disabled.split("\\s*,\\s*");
+
+ for (String node: disabledNodes) {
+ if (!node.isEmpty()) {
+ exflow.setNodeStatus(node, Status.DISABLED);
+ }
+ }
+ }
+
+ if (hasParam(req, "restartExecutionId")) {
+ int externalExecutionId = getIntParam(req, "restartExecutionId");
+ String proxyJobs = getParam(req, "proxyJobs");
+ String[] proxyJobsArray = proxyJobs.split("\\s*,\\s*");
+ for (String nodeId: proxyJobsArray) {
+ exflow.setProxyNodes(externalExecutionId, nodeId);
}
}
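The new retryFailedJobs action rides the existing executor ajax channel, so it can be exercised directly over HTTP. A hedged sketch of such a call; the host, port, execution id, job list, and session id are all placeholders, and session.id is assumed to be the usual Azkaban session parameter:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class RetryFailedJobsCall {
    public static void main(String[] args) throws Exception {
        // Placeholder values throughout; jobIds uses the comma-separated
        // format that ajaxRestartFailed splits on.
        URL url = new URL("http://localhost:8081/executor"
                + "?ajax=retryFailedJobs&execid=1234"
                + "&jobIds=myjob2-fail20,myjob2-fail30"
                + "&session.id=SESSION_ID_HERE");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), "UTF-8"));
        String line;
        while ((line = in.readLine()) != null) {
            System.out.println(line); // JSON body; {"error": "..."} on failure
        }
        in.close();
    }
}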
diff --git a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
index 3a2e851..735a251 100644
--- a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
@@ -84,6 +84,7 @@
<li><div id="pausebtn" class="btn2">Pause</div></li>
<li><div id="resumebtn" class="btn2">Resume</div></li>
<li><div id="cancelbtn" class="btn6">Cancel</div></li>
+ <li><div id="retrybtn" class="btn1">Retry Failed</div></li>
<li><div id="executebtn" class="btn1">Prepare Execution</div></li>
</ul>
</div>
@@ -142,6 +143,7 @@
#parse( "azkaban/webapp/servlet/velocity/executionoptionspanel.vm" )
#end
+
<ul id="jobMenu" class="contextMenu">
<li class="open"><a href="#open">Open...</a></li>
<li class="openwindow"><a href="#openwindow">Open in New Window...</a></li>
src/sql/create_execution_job.sql 6(+4 -2)
diff --git a/src/sql/create_execution_job.sql b/src/sql/create_execution_job.sql
index bcaf90a..b263128 100644
--- a/src/sql/create_execution_job.sql
+++ b/src/sql/create_execution_job.sql
@@ -4,13 +4,15 @@ CREATE TABLE execution_jobs (
version INT NOT NULL,
flow_id VARCHAR(128) NOT NULL,
job_id VARCHAR(128) NOT NULL,
+ attempt INT,
start_time BIGINT,
end_time BIGINT,
status TINYINT,
input_params LONGBLOB,
output_params LONGBLOB,
attachments LONGBLOB,
- PRIMARY KEY (exec_id, job_id),
+ PRIMARY KEY (exec_id, job_id, attempt),
+ INDEX exec_job (exec_id, job_id),
INDEX exec_id (exec_id),
INDEX job_id (project_id, job_id)
-) ENGINE=InnoDB;
\ No newline at end of file
+) ENGINE=InnoDB;
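With attempt folded into the primary key, each retry of a job becomes its own row, and "latest attempt" lookups become an ORDER BY over the new exec_job index. A sketch of the kind of JDBC query this schema enables; the query string is illustrative, not one the loader actually issues:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class LatestAttemptQuery {
    private static final String FETCH_LATEST_ATTEMPT =
            "SELECT attempt, status, start_time, end_time FROM execution_jobs "
            + "WHERE exec_id=? AND job_id=? ORDER BY attempt DESC LIMIT 1";

    public static int latestAttempt(Connection conn, int execId, String jobId)
            throws SQLException {
        PreparedStatement stmt = conn.prepareStatement(FETCH_LATEST_ATTEMPT);
        try {
            stmt.setInt(1, execId);
            stmt.setString(2, jobId);
            ResultSet rs = stmt.executeQuery();
            return rs.next() ? rs.getInt("attempt") : -1; // -1: no attempts recorded
        } finally {
            stmt.close();
        }
    }
}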
src/sql/create_execution_logs.sql 2(+2 -0)
diff --git a/src/sql/create_execution_logs.sql b/src/sql/create_execution_logs.sql
index 330d4b2..07e0b7b 100644
--- a/src/sql/create_execution_logs.sql
+++ b/src/sql/create_execution_logs.sql
@@ -1,10 +1,12 @@
CREATE TABLE execution_logs (
exec_id INT NOT NULL,
name VARCHAR(128),
+ attempt INT,
enc_type TINYINT,
start_byte INT,
end_byte INT,
log LONGBLOB,
+ PRIMARY KEY (exec_id, name, attempt),
INDEX log_index (exec_id, name),
INDEX byte_log_index(exec_id, name, start_byte, end_byte)
) ENGINE=InnoDB;
src/sql/update_2.0_to_2.01.sql 13(+13 -0)
diff --git a/src/sql/update_2.0_to_2.01.sql b/src/sql/update_2.0_to_2.01.sql
new file mode 100644
index 0000000..010f5ed
--- /dev/null
+++ b/src/sql/update_2.0_to_2.01.sql
@@ -0,0 +1,13 @@
+/*
+For 2.01: adds the attempt column to execution_jobs and execution_logs
+and rebuilds each table's primary key to include it.
+*/
+ALTER TABLE execution_jobs ADD COLUMN attempt INT DEFAULT 0;
+ALTER TABLE execution_jobs DROP PRIMARY KEY;
+ALTER TABLE execution_jobs ADD PRIMARY KEY(exec_id, job_id, attempt);
+ALTER TABLE execution_jobs ADD INDEX exec_job (exec_id, job_id);
+
+ALTER TABLE execution_logs ADD COLUMN attempt INT DEFAULT 0;
+ALTER TABLE execution_logs DROP PRIMARY KEY;
+ALTER TABLE execution_logs ADD PRIMARY KEY(exec_id, name, attempt);
+
+
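Because the column is added with DEFAULT 0, MySQL backfills every existing row to attempt 0 before the primary keys are rebuilt, so the DROP/ADD PRIMARY KEY pairs cannot collide on legacy data. A small post-migration sanity check, sketched in JDBC:

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;

public class MigrationCheck {
    public static void check(Connection conn) throws Exception {
        Statement stmt = conn.createStatement();
        ResultSet rs = stmt.executeQuery(
                "SELECT COUNT(*) FROM execution_jobs WHERE attempt IS NULL");
        rs.next();
        // Expect 0: the DEFAULT 0 backfill leaves no NULL attempts behind.
        System.out.println("NULL attempts after migration: " + rs.getInt(1));
        stmt.close();
    }
}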
diff --git a/src/web/js/azkaban.exflow.options.view.js b/src/web/js/azkaban.exflow.options.view.js
index d8e80fa..dc9547a 100644
--- a/src/web/js/azkaban.exflow.options.view.js
+++ b/src/web/js/azkaban.exflow.options.view.js
@@ -248,11 +248,19 @@ azkaban.ExecuteFlowView = Backbone.View.extend({
}
}
+ var disabled = "";
+ var disabledMap = this.cloneModel.get('disabled');
+ for (var dis in disabledMap) {
+ if (disabledMap[dis]) {
+ disabled += dis + ",";
+ }
+ }
+
var executingData = {
project: projectName,
ajax: "executeFlow",
flow: flowName,
- disable: this.cloneModel.get('disabled'),
+ disabled: disabled,
failureAction: failureAction,
failureEmails: failureEmails,
successEmails: successEmails,
src/web/js/azkaban.exflow.view.js 35(+34 -1)
diff --git a/src/web/js/azkaban.exflow.view.js b/src/web/js/azkaban.exflow.view.js
index 91c72aa..cd6382c 100644
--- a/src/web/js/azkaban.exflow.view.js
+++ b/src/web/js/azkaban.exflow.view.js
@@ -76,6 +76,7 @@ azkaban.FlowTabView= Backbone.View.extend({
"click #executebtn" : "handleRestartClick",
"click #pausebtn" : "handlePauseClick",
"click #resumebtn" : "handleResumeClick",
+ "click #retrybtn" : "handleRetryClick"
},
initialize : function(settings) {
$("#cancelbtn").hide();
@@ -130,6 +131,7 @@ azkaban.FlowTabView= Backbone.View.extend({
$("#executebtn").hide();
$("#pausebtn").hide();
$("#resumebtn").hide();
+ $("#retrybtn").hide();
if(data.status=="SUCCEEDED") {
$("#executebtn").show();
@@ -139,7 +141,8 @@ azkaban.FlowTabView= Backbone.View.extend({
}
else if (data.status=="FAILED_FINISHING") {
$("#cancelbtn").show();
- $("#executebtn").show();
+ $("#executebtn").hide();
+ $("#retrybtn").show();
}
else if (data.status=="RUNNING") {
$("#cancelbtn").show();
@@ -174,6 +177,36 @@ azkaban.FlowTabView= Backbone.View.extend({
}
);
},
+ handleRetryClick : function(evt) {
+ var graphData = graphModel.get("data");
+
+ var failedJobs = new Array();
+ var failedJobStr = "";
+ var nodes = graphData.nodes;
+ for (var i = 0; i < nodes.length; ++i) {
+ var node = nodes[i];
+ if(node.status=='FAILED') {
+ failedJobs.push(node.id);
+ }
+ }
+ failedJobStr = failedJobs.join();
+
+ var requestURL = contextURL + "/executor";
+ ajaxCall(
+ requestURL,
+ {"execid": execId, "ajax":"retryFailedJobs", "jobIds":failedJobStr},
+ function(data) {
+ console.log("cancel clicked");
+ if (data.error) {
+ showDialog("Error", data.error);
+ }
+ else {
+ showDialog("Retry", "Flow has been retried.");
+ setTimeout(function() {updateStatus();}, 1100);
+ }
+ }
+ );
+ },
handleRestartClick : function(evt) {
executeFlowView.show();
},
unit/build.xml 26(+24 -2)
diff --git a/unit/build.xml b/unit/build.xml
index d949de5..b35b866 100644
--- a/unit/build.xml
+++ b/unit/build.xml
@@ -8,9 +8,18 @@
<property name="java.src.dir" value="${base.dir}/unit/java" />
<property name="job.conf.dir" value="${base.dir}/unit/executions/exectest1" />
-
+ <property name="job.conf.dir2" value="${base.dir}/unit/executions/exectest2" />
+
<property environment="env" />
+ <path id="main.classpath">
+ <fileset dir="../lib">
+ <include name="*.jar" />
+ </fileset>
+
+ <pathelement path="${dist.classes.dir}" />
+ </path>
+
<!-- set the build number based on environment variable, otherwise blank -->
<property environment="env" description="System environment variables (including those set by Hudson)" />
@@ -29,7 +38,8 @@
<mkdir dir="${dist.packages.dir}" />
<javac fork="true" destdir="${dist.classes.dir}"
- target="1.6" debug="true" deprecation="false" failonerror="true" srcdir="${java.src.dir}/azkaban/test/executor" includes="*Job.java">
+ target="1.6" debug="true" deprecation="false" failonerror="true" srcdir="${java.src.dir}/azkaban/test/executor" includes="SleepJavaJob.java">
+ <classpath refid="main.classpath" />
</javac>
</target>
@@ -54,4 +64,16 @@
</zip>
</target>
+ <target name="package-exectest2" depends="jars" description="Creates a test zip">
+ <delete dir="${dist.packages.dir}" />
+ <mkdir dir="${dist.packages.dir}" />
+
+ <!-- Tarball it -->
+ <zip destfile="${dist.packages.dir}/exectest2.zip">
+ <zipfileset dir="${dist.jar.dir}" />
+ <zipfileset dir="${job.conf.dir2}" />
+ </zip>
+ </target>
+
+
</project>
diff --git a/unit/executions/exectest2/failflow.job b/unit/executions/exectest2/failflow.job
new file mode 100644
index 0000000..bfd7ce6
--- /dev/null
+++ b/unit/executions/exectest2/failflow.job
@@ -0,0 +1,5 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=5
+fail=false
+dependencies=myjob3,myjob5
\ No newline at end of file
unit/executions/exectest2/myjob1.job 4(+4 -0)
diff --git a/unit/executions/exectest2/myjob1.job b/unit/executions/exectest2/myjob1.job
new file mode 100644
index 0000000..917929e
--- /dev/null
+++ b/unit/executions/exectest2/myjob1.job
@@ -0,0 +1,4 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=5
+fail=false
diff --git a/unit/executions/exectest2/myjob2-fail20.job b/unit/executions/exectest2/myjob2-fail20.job
new file mode 100644
index 0000000..4e02239
--- /dev/null
+++ b/unit/executions/exectest2/myjob2-fail20.job
@@ -0,0 +1,6 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=20
+fail=true
+passRetry=2
+dependencies=myjob1
\ No newline at end of file
diff --git a/unit/executions/exectest2/myjob2-fail30.job b/unit/executions/exectest2/myjob2-fail30.job
new file mode 100644
index 0000000..908bfed
--- /dev/null
+++ b/unit/executions/exectest2/myjob2-fail30.job
@@ -0,0 +1,6 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=30
+fail=true
+passRetry=2
+dependencies=myjob1
\ No newline at end of file
diff --git a/unit/executions/exectest2/myjob2-pass50.job b/unit/executions/exectest2/myjob2-pass50.job
new file mode 100644
index 0000000..d9553ca
--- /dev/null
+++ b/unit/executions/exectest2/myjob2-pass50.job
@@ -0,0 +1,5 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=50
+fail=false
+dependencies=myjob1
\ No newline at end of file
unit/executions/exectest2/myjob3.job 5(+5 -0)
diff --git a/unit/executions/exectest2/myjob3.job b/unit/executions/exectest2/myjob3.job
new file mode 100644
index 0000000..fe614fa
--- /dev/null
+++ b/unit/executions/exectest2/myjob3.job
@@ -0,0 +1,5 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=240
+fail=false
+dependencies=myjob2-pass50
unit/executions/exectest2/myjob4.job 5(+5 -0)
diff --git a/unit/executions/exectest2/myjob4.job b/unit/executions/exectest2/myjob4.job
new file mode 100644
index 0000000..d81f9f2
--- /dev/null
+++ b/unit/executions/exectest2/myjob4.job
@@ -0,0 +1,5 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=20
+fail=false
+dependencies=myjob2-fail20
\ No newline at end of file
unit/executions/exectest2/myjob5.job 5(+5 -0)
diff --git a/unit/executions/exectest2/myjob5.job b/unit/executions/exectest2/myjob5.job
new file mode 100644
index 0000000..9b17129
--- /dev/null
+++ b/unit/executions/exectest2/myjob5.job
@@ -0,0 +1,5 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=20
+fail=false
+dependencies=myjob4,myjob2-fail30
\ No newline at end of file
diff --git a/unit/java/azkaban/test/execapp/MockExecutorLoader.java b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
index 8030d93..124c5d9 100644
--- a/unit/java/azkaban/test/execapp/MockExecutorLoader.java
+++ b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
@@ -60,7 +60,7 @@ public class MockExecutorLoader implements ExecutorLoader {
}
@Override
- public void uploadLogFile(int execId, String name, File... files) throws ExecutorManagerException {
+ public void uploadLogFile(int execId, String name, int attempt, File... files) throws ExecutorManagerException {
}
@@ -80,7 +80,7 @@ public class MockExecutorLoader implements ExecutorLoader {
}
@Override
- public void updateExecutableNode(ExecutableNode node, Props outputParams) throws ExecutorManagerException {
+ public void updateExecutableNode(ExecutableNode node) throws ExecutorManagerException {
ExecutableNode foundNode = nodes.get(node.getJobId());
foundNode.setEndTime(node.getEndTime());
foundNode.setStartTime(node.getStartTime());
@@ -118,7 +118,7 @@ public class MockExecutorLoader implements ExecutorLoader {
}
@Override
- public ExecutableJobInfo fetchJobInfo(int execId, String jobId)
+ public ExecutableJobInfo fetchJobInfo(int execId, String jobId, int attempt)
throws ExecutorManagerException {
// TODO Auto-generated method stub
return null;
@@ -131,7 +131,7 @@ public class MockExecutorLoader implements ExecutorLoader {
}
@Override
- public LogData fetchLogs(int execId, String name, int startByte, int endByte) throws ExecutorManagerException {
+ public LogData fetchLogs(int execId, String name, int attempt, int startByte, int endByte) throws ExecutorManagerException {
// TODO Auto-generated method stub
return null;
}
@@ -162,5 +162,29 @@ public class MockExecutorLoader implements ExecutorLoader {
return 0;
}
+ @Override
+ public Props fetchExecutionJobInputProps(int execId, String jobId) throws ExecutorManagerException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Props fetchExecutionJobOutputProps(int execId, String jobId) throws ExecutorManagerException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Pair<Props, Props> fetchExecutionJobProps(int execId, String jobId) throws ExecutorManagerException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public List<ExecutableJobInfo> fetchJobInfoAttempts(int execId, String jobId) throws ExecutorManagerException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
}
\ No newline at end of file
diff --git a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
index 19175a1..ba673dd 100644
--- a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
+++ b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
@@ -221,6 +221,7 @@ public class JdbcExecutorLoaderTest {
Assert.assertEquals(new HashSet<String>(flow.getEndNodes()), new HashSet<String>(fetchFlow.getEndNodes()));
}
+
@Test
public void testUploadExecutableNode() throws Exception {
if (!isTestSetup()) {
@@ -228,16 +229,17 @@ public class JdbcExecutorLoaderTest {
}
ExecutorLoader loader = createLoader();
- ExecutableFlow flow = createExecutableFlow("exec1");
+ ExecutableFlow flow = createExecutableFlow(10, "exec1");
flow.setExecutionId(10);
File jobFile = new File(flowDir, "job10.job");
Props props = new Props(null, jobFile);
props.put("test","test2");
ExecutableNode oldNode = flow.getExecutableNode("job10");
+ oldNode.setStartTime(System.currentTimeMillis());
loader.uploadExecutableNode(oldNode, props);
- ExecutableJobInfo info = loader.fetchJobInfo(10, "job10");
+ ExecutableJobInfo info = loader.fetchJobInfo(10, "job10", 0);
Assert.assertEquals(flow.getExecutionId(), info.getExecId());
Assert.assertEquals(flow.getProjectId(), info.getProjectId());
Assert.assertEquals(flow.getVersion(), info.getVersion());
@@ -246,6 +248,23 @@ public class JdbcExecutorLoaderTest {
Assert.assertEquals(oldNode.getStatus(), info.getStatus());
Assert.assertEquals(oldNode.getStartTime(), info.getStartTime());
Assert.assertEquals("endTime = " + oldNode.getEndTime() + " info endTime = " + info.getEndTime(), oldNode.getEndTime(), info.getEndTime());
+
+ // Fetch props
+ Props outputProps = new Props();
+ outputProps.put("hello", "output");
+ oldNode.setOutputProps(outputProps);
+ oldNode.setEndTime(System.currentTimeMillis());
+ loader.updateExecutableNode(oldNode);
+
+ Props fInputProps = loader.fetchExecutionJobInputProps(10, "job10");
+ Props fOutputProps = loader.fetchExecutionJobOutputProps(10, "job10");
+ Pair<Props,Props> inOutProps = loader.fetchExecutionJobProps(10, "job10");
+
+ Assert.assertEquals(fInputProps.get("test"), "test2");
+ Assert.assertEquals(fOutputProps.get("hello"), "output");
+ Assert.assertEquals(inOutProps.getFirst().get("test"), "test2");
+ Assert.assertEquals(inOutProps.getSecond().get("hello"), "output");
+
}
@Test
@@ -307,19 +326,19 @@ public class JdbcExecutorLoaderTest {
File[] smalllog = {new File(logDir, "log1.log"), new File(logDir, "log2.log"), new File(logDir, "log3.log")};
ExecutorLoader loader = createLoader();
- loader.uploadLogFile(1, "smallFiles", smalllog);
+ loader.uploadLogFile(1, "smallFiles", 0, smalllog);
- LogData data = loader.fetchLogs(1, "smallFiles", 0, 50000);
+ LogData data = loader.fetchLogs(1, "smallFiles", 0, 0, 50000);
Assert.assertNotNull(data);
Assert.assertEquals("Logs length is " + data.getLength(), data.getLength(), 53);
System.out.println(data.toString());
- LogData data2 = loader.fetchLogs(1, "smallFiles", 10, 20);
- Assert.assertNotNull(data2);
- Assert.assertEquals("Logs length is " + data2.getLength(), data2.getLength(), 10);
-
+ LogData data2 = loader.fetchLogs(1, "smallFiles", 0, 10, 20);
System.out.println(data2.toString());
+ Assert.assertNotNull(data2);
+ Assert.assertEquals("Logs length is " + data2.getLength(), data2.getLength(), 20);
+
}
@Test
@@ -330,30 +349,41 @@ public class JdbcExecutorLoaderTest {
File[] largelog = {new File(logDir, "largeLog1.log"), new File(logDir, "largeLog2.log"), new File(logDir, "largeLog3.log")};
ExecutorLoader loader = createLoader();
- loader.uploadLogFile(1, "largeFiles", largelog);
+ loader.uploadLogFile(1, "largeFiles",0, largelog);
- LogData logsResult = loader.fetchLogs(1, "largeFiles", 0, 64000);
+ LogData logsResult = loader.fetchLogs(1, "largeFiles", 0, 0, 64000);
Assert.assertNotNull(logsResult);
Assert.assertEquals("Logs length is " + logsResult.getLength(), logsResult.getLength(), 64000);
- LogData logsResult2 = loader.fetchLogs(1, "largeFiles", 1000, 64000);
+ LogData logsResult2 = loader.fetchLogs(1, "largeFiles", 0, 1000, 64000);
Assert.assertNotNull(logsResult2);
- Assert.assertEquals("Logs length is " + logsResult2.getLength(), logsResult2.getLength(), 63000);
+ Assert.assertEquals("Logs length is " + logsResult2.getLength(), logsResult2.getLength(), 64000);
- LogData logsResult3 = loader.fetchLogs(1, "largeFiles", 330000, 400000);
+ LogData logsResult3 = loader.fetchLogs(1, "largeFiles", 0, 330000, 400000);
Assert.assertNotNull(logsResult3);
Assert.assertEquals("Logs length is " + logsResult3.getLength(), logsResult3.getLength(), 5493);
- LogData logsResult4 = loader.fetchLogs(1, "largeFiles", 340000, 400000);
+ LogData logsResult4 = loader.fetchLogs(1, "largeFiles", 0, 340000, 400000);
Assert.assertNull(logsResult4);
- LogData logsResult5 = loader.fetchLogs(1, "largeFiles", 153600, 204800);
+ LogData logsResult5 = loader.fetchLogs(1, "largeFiles", 0, 153600, 204800);
Assert.assertNotNull(logsResult5);
- Assert.assertEquals("Logs length is " + logsResult5.getLength(), logsResult5.getLength(), 51200);
+ Assert.assertEquals("Logs length is " + logsResult5.getLength(), logsResult5.getLength(), 181893);
- LogData logsResult6 = loader.fetchLogs(1, "largeFiles", 150000, 250000);
+ LogData logsResult6 = loader.fetchLogs(1, "largeFiles", 0, 150000, 250000);
Assert.assertNotNull(logsResult6);
- Assert.assertEquals("Logs length is " + logsResult6.getLength(), logsResult6.getLength(), 100000);
+ Assert.assertEquals("Logs length is " + logsResult6.getLength(), logsResult6.getLength(), 185493);
+ }
+
+ private ExecutableFlow createExecutableFlow(int executionId, String flowName) throws IOException {
+ File jsonFlowFile = new File(flowDir, flowName + ".flow");
+ @SuppressWarnings("unchecked")
+ HashMap<String, Object> flowObj = (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
+
+ Flow flow = Flow.flowFromObject(flowObj);
+ ExecutableFlow execFlow = new ExecutableFlow(executionId, flow);
+
+ return execFlow;
}
private ExecutableFlow createExecutableFlow(String flowName) throws IOException {
diff --git a/unit/java/azkaban/test/executor/SleepJavaJob.java b/unit/java/azkaban/test/executor/SleepJavaJob.java
index 29b0bb8..0bba1ec 100644
--- a/unit/java/azkaban/test/executor/SleepJavaJob.java
+++ b/unit/java/azkaban/test/executor/SleepJavaJob.java
@@ -1,12 +1,17 @@
package azkaban.test.executor;
+import java.io.File;
+import java.io.FileFilter;
import java.util.Map;
public class SleepJavaJob {
private boolean fail;
private String seconds;
+ private int attempts;
+ private String id;
public SleepJavaJob(String id, Map<String, String> parameters) {
+ this.id = id;
String failStr = parameters.get("fail");
if (failStr == null || failStr.equals("false")) {
fail = false;
@@ -15,6 +20,13 @@ public class SleepJavaJob {
fail = true;
}
+ String attemptString = parameters.get("passRetry");
+ if (attemptString == null) {
+ attempts = -1;
+ }
+ else {
+ attempts = Integer.valueOf(attemptString);
+ }
seconds = parameters.get("seconds");
System.out.println("Properly created");
}
@@ -34,8 +46,19 @@ public class SleepJavaJob {
}
}
+ File file = new File("");
+ File[] attemptFiles = file.listFiles(new FileFilter() {
+ @Override
+ public boolean accept(File pathname) {
+ return pathname.getName().startsWith(id);
+ }});
+
if (fail) {
- throw new Exception("I failed because I had to.");
+ if (attempts <= 0 || attemptFiles == null || attemptFiles.length < attempts) {
+ File attemptFile = new File(file, id + "." + (attemptFiles == null ? 0 : attemptFiles.length));
+ attemptFile.mkdir();
+ throw new Exception("I failed because I had to.");
+ }
}
}
@@ -46,4 +69,6 @@ public class SleepJavaJob {
this.notifyAll();
}
}
+
+
}
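To make the passRetry bookkeeping above concrete: each failed run drops a marker directory named <id>.<n> in the working directory, and once at least passRetry markers exist the job stops throwing, which is what lets the retried exectest2 flows eventually succeed. A standalone walk-through under that reading, with a temp directory standing in for the job's working directory:

import java.io.File;
import java.io.FileFilter;

public class PassRetryWalkthrough {
    public static void main(String[] args) {
        File workDir = new File(System.getProperty("java.io.tmpdir"), "passretry-demo");
        workDir.mkdirs();
        // Clean slate so the walk-through is repeatable.
        File[] old = workDir.listFiles();
        if (old != null) {
            for (File f : old) {
                f.delete();
            }
        }
        final String id = "myjob2-fail20";
        int passRetry = 2;
        for (int run = 0; run < 3; run++) {
            File[] markers = workDir.listFiles(new FileFilter() {
                @Override
                public boolean accept(File pathname) {
                    return pathname.getName().startsWith(id);
                }
            });
            int count = markers == null ? 0 : markers.length;
            if (count < passRetry) {
                // Record this failure, exactly like the job's marker mkdir.
                new File(workDir, id + "." + count).mkdir();
                System.out.println("attempt " + run + ": fail");
            } else {
                System.out.println("attempt " + run + ": pass"); // run 2 passes
            }
        }
    }
}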