azkaban-memoizeit
Changes
src/java/azkaban/execapp/FlowRunner.java 61(+41 -20)
src/java/azkaban/execapp/JobRunner.java 41(+22 -19)
src/java/azkaban/executor/ExecutableFlow.java 409(+81 -328)
src/java/azkaban/executor/ExecutableFlowBase.java 273(+273 -0)
src/java/azkaban/executor/ExecutableNode.java 521(+266 -255)
src/java/azkaban/scheduler/ScheduleManager.java 326(+191 -135)
src/java/azkaban/webapp/servlet/ExecutorServlet.java 628(+352 -276)
Details
diff --git a/src/java/azkaban/execapp/event/LocalFlowWatcher.java b/src/java/azkaban/execapp/event/LocalFlowWatcher.java
index afe9248..4ba5889 100644
--- a/src/java/azkaban/execapp/event/LocalFlowWatcher.java
+++ b/src/java/azkaban/execapp/event/LocalFlowWatcher.java
@@ -43,14 +43,19 @@ public class LocalFlowWatcher extends FlowWatcher {
Object data = event.getData();
if (data instanceof ExecutableNode) {
ExecutableNode node = (ExecutableNode)data;
- handleJobFinished(node.getJobId(), node.getStatus());
+
+ if (node.getId()) {
+
+ }
+
+ handleJobFinished(node.getId(), node.getStatus());
}
}
else if (event.getRunner() instanceof JobRunner) {
JobRunner runner = (JobRunner)event.getRunner();
ExecutableNode node = runner.getNode();
- handleJobFinished(node.getJobId(), node.getStatus());
+ handleJobFinished(node.getId(), node.getStatus());
}
}
else if (event.getType() == Type.FLOW_FINISHED) {
diff --git a/src/java/azkaban/execapp/event/RemoteFlowWatcher.java b/src/java/azkaban/execapp/event/RemoteFlowWatcher.java
index ec60025..cecf1bc 100644
--- a/src/java/azkaban/execapp/event/RemoteFlowWatcher.java
+++ b/src/java/azkaban/execapp/event/RemoteFlowWatcher.java
@@ -1,6 +1,6 @@
package azkaban.execapp.event;
-import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
@@ -11,7 +11,7 @@ public class RemoteFlowWatcher extends FlowWatcher {
private int execId;
private ExecutorLoader loader;
- private ExecutableFlow flow;
+ private ExecutableFlowBase flow;
private RemoteUpdaterThread thread;
private boolean isShutdown = false;
@@ -46,7 +46,7 @@ public class RemoteFlowWatcher extends FlowWatcher {
@Override
public void run() {
do {
- ExecutableFlow updateFlow = null;
+ ExecutableFlowBase updateFlow = null;
try {
updateFlow = loader.fetchExecutableFlow(execId);
} catch (ExecutorManagerException e) {
@@ -63,7 +63,7 @@ public class RemoteFlowWatcher extends FlowWatcher {
flow.setUpdateTime(updateFlow.getUpdateTime());
for (ExecutableNode node : flow.getExecutableNodes()) {
- String jobId = node.getJobId();
+ String jobId = node.getId();
ExecutableNode newNode = updateFlow.getExecutableNode(jobId);
long updateTime = node.getUpdateTime();
node.setUpdateTime(newNode.getUpdateTime());
diff --git a/src/java/azkaban/execapp/ExecutorServlet.java b/src/java/azkaban/execapp/ExecutorServlet.java
index d342f0f..f73b88c 100644
--- a/src/java/azkaban/execapp/ExecutorServlet.java
+++ b/src/java/azkaban/execapp/ExecutorServlet.java
@@ -16,7 +16,7 @@ import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
import azkaban.executor.ConnectorParams;
-import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.ExecutorManagerException;
import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
@@ -215,7 +215,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
long updateTime = JSONUtils.getLongFromObject(updateTimesList.get(i));
int execId = (Integer)execIDList.get(i);
- ExecutableFlow flow = flowRunnerManager.getExecutableFlow(execId);
+ ExecutableFlowBase flow = flowRunnerManager.getExecutableFlow(execId);
if (flow == null) {
Map<String, Object> errorResponse = new HashMap<String,Object>();
errorResponse.put(RESPONSE_ERROR, "Flow does not exist");
@@ -243,7 +243,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
}
private void handleAjaxFlowStatus(Map<String, Object> respMap, int execid) {
- ExecutableFlow flow = flowRunnerManager.getExecutableFlow(execid);
+ ExecutableFlowBase flow = flowRunnerManager.getExecutableFlow(execid);
if (flow == null) {
respMap.put(STATUS_PARAM, RESPONSE_NOTFOUND);
}
diff --git a/src/java/azkaban/execapp/FlowBlocker.java b/src/java/azkaban/execapp/FlowBlocker.java
new file mode 100644
index 0000000..2d64073
--- /dev/null
+++ b/src/java/azkaban/execapp/FlowBlocker.java
@@ -0,0 +1,7 @@
+package azkaban.execapp;
+
+import azkaban.executor.ExecutableNode;
+
+public interface FlowBlocker {
+ public boolean readyToRun(ExecutableNode node);
+}
src/java/azkaban/execapp/FlowRunner.java 61(+41 -20)
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index d38aa47..38ece4a 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -24,6 +24,7 @@ import azkaban.execapp.event.EventHandler;
import azkaban.execapp.event.EventListener;
import azkaban.execapp.event.FlowWatcher;
import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutionOptions.FailureAction;
@@ -71,7 +72,7 @@ public class FlowRunner extends EventHandler implements Runnable {
private JobRunnerEventListener listener = new JobRunnerEventListener();
private Map<String, JobRunner> jobRunners = new ConcurrentHashMap<String, JobRunner>();
private Map<String, JobRunner> activeJobRunners = new ConcurrentHashMap<String, JobRunner>();
-
+
// Used for pipelining
private Integer pipelineLevel = null;
private Integer pipelineExecId = null;
@@ -314,24 +315,24 @@ public class FlowRunner extends EventHandler implements Runnable {
Props outputProps = collectOutputProps(node);
node.setStatus(Status.QUEUED);
JobRunner runner = createJobRunner(node, outputProps);
- logger.info("Submitting job " + node.getJobId() + " to run.");
+ logger.info("Submitting job " + node.getId() + " to run.");
try {
executorService.submit(runner);
- jobRunners.put(node.getJobId(), runner);
- activeJobRunners.put(node.getJobId(), runner);
+ jobRunners.put(node.getId(), runner);
+ activeJobRunners.put(node.getId(), runner);
} catch (RejectedExecutionException e) {
logger.error(e);
};
} // If killed, then auto complete and KILL
else if (node.getStatus() == Status.KILLED) {
- logger.info("Killing " + node.getJobId() + " due to prior errors.");
+ logger.info("Killing " + node.getId() + " due to prior errors.");
node.setStartTime(currentTime);
node.setEndTime(currentTime);
fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
} // If disabled, then we auto skip
else if (node.getStatus() == Status.DISABLED) {
- logger.info("Skipping disabled job " + node.getJobId() + ".");
+ logger.info("Skipping disabled job " + node.getId() + ".");
node.setStartTime(currentTime);
node.setEndTime(currentTime);
node.setStatus(Status.SKIPPED);
@@ -422,6 +423,26 @@ public class FlowRunner extends EventHandler implements Runnable {
return jobsToRun;
}
+ private List<ExecutableNode> findReadyJobsToRun(ExecutableFlowBase flow) {
+ ArrayList<ExecutableNode> jobsToRun = new ArrayList<ExecutableNode>();
+ for (ExecutableNode node : flow.getExecutableNodes()) {
+ if (Status.isStatusFinished(node.getStatus())) {
+ continue;
+ }
+ else {
+ // Check the dependencies to see if execution conditions are met,
+ // and what the status should be set to.
+ Status impliedStatus = getImpliedStatus(node);
+ if (getImpliedStatus(node) != null) {
+ node.setStatus(impliedStatus);
+ jobsToRun.add(node);
+ }
+ }
+ }
+
+ return jobsToRun;
+ }
+
private boolean isFlowFinished() {
if (!activeJobRunners.isEmpty()) {
return false;
@@ -453,7 +474,7 @@ public class FlowRunner extends EventHandler implements Runnable {
}
private JobRunner createJobRunner(ExecutableNode node, Props previousOutput) {
- String source = node.getJobPropsSource();
+ String source = node.getJobSource();
String propsSource = node.getPropsSource();
// If no properties are set, we just set the global properties.
@@ -481,11 +502,11 @@ public class FlowRunner extends EventHandler implements Runnable {
// load the override props if any
try {
- prop = projectLoader.fetchProjectProperty(flow.getProjectId(), flow.getVersion(), node.getJobId()+".jor");
+ prop = projectLoader.fetchProjectProperty(flow.getProjectId(), flow.getVersion(), node.getId()+".jor");
}
catch(ProjectManagerException e) {
e.printStackTrace();
- logger.error("Error loading job override property for job " + node.getJobId());
+ logger.error("Error loading job override property for job " + node.getId());
}
if(prop == null) {
// if no override prop, load the original one on disk
@@ -493,7 +514,7 @@ public class FlowRunner extends EventHandler implements Runnable {
prop = new Props(null, path);
} catch (IOException e) {
e.printStackTrace();
- logger.error("Error loading job file " + source + " for job " + node.getJobId());
+ logger.error("Error loading job file " + source + " for job " + node.getId());
}
}
// setting this fake source as this will be used to determine the location of log files.
@@ -594,7 +615,7 @@ public class FlowRunner extends EventHandler implements Runnable {
ArrayList<String> failures = new ArrayList<String>();
for (ExecutableNode node: flow.getExecutableNodes()) {
if (node.getStatus() == Status.FAILED) {
- failures.add(node.getJobId());
+ failures.add(node.getId());
}
else if (node.getStatus() == Status.KILLED) {
node.setStartTime(-1);
@@ -620,7 +641,7 @@ public class FlowRunner extends EventHandler implements Runnable {
// Resets the status and increments the attempt number
node.resetForRetry();
reEnableDependents(node);
- logger.info("Re-enabling job " + node.getJobId() + " attempt " + node.getAttempt());
+ logger.info("Re-enabling job " + node.getId() + " attempt " + node.getAttempt());
}
else {
logger.error("Cannot retry job " + jobId + " since it hasn't run yet. User " + user);
@@ -734,12 +755,12 @@ public class FlowRunner extends EventHandler implements Runnable {
else if (event.getType() == Type.JOB_FINISHED) {
synchronized(mainSyncObj) {
ExecutableNode node = runner.getNode();
- activeJobRunners.remove(node.getJobId());
+ activeJobRunners.remove(node.getId());
- logger.info("Job Finished " + node.getJobId() + " with status " + node.getStatus());
+ logger.info("Job Finished " + node.getId() + " with status " + node.getStatus());
if (runner.getOutputProps() != null) {
- logger.info("Job " + node.getJobId() + " had output props.");
- jobOutputProps.put(node.getJobId(), runner.getOutputProps());
+ logger.info("Job " + node.getId() + " had output props.");
+ jobOutputProps.put(node.getId(), runner.getOutputProps());
}
updateFlow();
@@ -747,14 +768,14 @@ public class FlowRunner extends EventHandler implements Runnable {
if (node.getStatus() == Status.FAILED) {
// Retry failure if conditions are met.
if (!runner.isCancelled() && runner.getRetries() > node.getAttempt()) {
- logger.info("Job " + node.getJobId() + " will be retried. Attempt " + node.getAttempt() + " of " + runner.getRetries());
+ logger.info("Job " + node.getId() + " will be retried. Attempt " + node.getAttempt() + " of " + runner.getRetries());
node.setDelayedExecution(runner.getRetryBackoff());
node.resetForRetry();
}
else {
if (!runner.isCancelled() && runner.getRetries() > 0) {
- logger.info("Job " + node.getJobId() + " has run out of retry attempts");
+ logger.info("Job " + node.getId() + " has run out of retry attempts");
// Setting delayed execution to 0 in case this is manually re-tried.
node.setDelayedExecution(0);
}
@@ -796,7 +817,7 @@ public class FlowRunner extends EventHandler implements Runnable {
public File getJobLogFile(String jobId, int attempt) {
ExecutableNode node = flow.getExecutableNode(jobId);
- File path = new File(execDir, node.getJobPropsSource());
+ File path = new File(execDir, node.getJobSource());
String logFileName = JobRunner.createLogFileName(execId, jobId, attempt);
File logFile = new File(path.getParentFile(), logFileName);
@@ -810,7 +831,7 @@ public class FlowRunner extends EventHandler implements Runnable {
public File getJobMetaDataFile(String jobId, int attempt) {
ExecutableNode node = flow.getExecutableNode(jobId);
- File path = new File(execDir, node.getJobPropsSource());
+ File path = new File(execDir, node.getJobSource());
String metaDataFileName = JobRunner.createMetaDataFileName(execId, jobId, attempt);
File metaDataFile = new File(path.getParentFile(), metaDataFileName);
diff --git a/src/java/azkaban/execapp/FlowRunnerHelper.java b/src/java/azkaban/execapp/FlowRunnerHelper.java
new file mode 100644
index 0000000..7c8be47
--- /dev/null
+++ b/src/java/azkaban/execapp/FlowRunnerHelper.java
@@ -0,0 +1,97 @@
+package azkaban.execapp;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableFlowBase;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.Status;
+
+public class FlowRunnerHelper {
+ public static boolean isFlowFinished(ExecutableFlow flow) {
+ for (String end: flow.getEndNodes()) {
+ ExecutableNode node = flow.getExecutableNode(end);
+ if (!Status.isStatusFinished(node.getStatus()) ) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ public static List<ExecutableNode> findReadyJobsToRun(ExecutableFlowBase flow) {
+ ArrayList<ExecutableNode> jobsToRun = new ArrayList<ExecutableNode>();
+ for (ExecutableNode node : flow.getExecutableNodes()) {
+ if (Status.isStatusFinished(node.getStatus())) {
+ continue;
+ }
+ else {
+ // Check the dependencies to see if execution conditions are met,
+ // and what the status should be set to.
+ Status impliedStatus = getImpliedStatus(node);
+ if (impliedStatus != null) {
+ node.setStatus(impliedStatus);
+ jobsToRun.add(node);
+ }
+ else if (node instanceof ExecutableFlowBase && node.getStatus() == Status.RUNNING) {
+ // We want to seek into a running flow
+ }
+ }
+ }
+
+ return jobsToRun;
+ }
+
+ public static Status getImpliedStatus(ExecutableNode node) {
+ switch(node.getStatus()) {
+ case FAILED:
+ case KILLED:
+ case SKIPPED:
+ case SUCCEEDED:
+ case FAILED_SUCCEEDED:
+ case QUEUED:
+ case RUNNING:
+ return null;
+ default:
+ break;
+ }
+
+ ExecutableFlowBase flow = node.getParentFlow();
+
+ boolean shouldKill = false;
+ for (String dependency : node.getInNodes()) {
+ ExecutableNode dependencyNode = flow.getExecutableNode(dependency);
+
+ Status depStatus = dependencyNode.getStatus();
+ switch (depStatus) {
+ case FAILED:
+ case KILLED:
+ shouldKill = true;
+ case SKIPPED:
+ case SUCCEEDED:
+ case FAILED_SUCCEEDED:
+ continue;
+ case RUNNING:
+ case QUEUED:
+ case DISABLED:
+ return null;
+ default:
+ // Return null means it's not ready to run.
+ return null;
+ }
+ }
+
+ if (shouldKill || flow.getStatus() == Status.KILLED || flow.getStatus() == Status.FAILED) {
+ return Status.KILLED;
+ }
+
+ // If it's disabled but ready to run, we want to make sure it continues being disabled.
+ if (node.getStatus() == Status.DISABLED) {
+ return Status.DISABLED;
+ }
+
+ // All good to go, ready to run.
+ return Status.READY;
+ }
+}
src/java/azkaban/execapp/JobRunner.java 41(+22 -19)
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index a363284..c4ea32f 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -67,6 +67,7 @@ public class JobRunner extends EventHandler implements Runnable {
private Job job;
private int executionId = -1;
+ private String jobId;
private static final Object logCreatorLock = new Object();
private Object syncObject = new Object();
@@ -91,7 +92,9 @@ public class JobRunner extends EventHandler implements Runnable {
this.props = props;
this.node = node;
this.workingDir = workingDir;
- this.executionId = node.getExecutionId();
+
+ this.executionId = node.getParentFlow().getExecutionId();
+ this.jobId = node.getId();
this.loader = loader;
this.jobtypeManager = jobtypeManager;
}
@@ -115,10 +118,10 @@ public class JobRunner extends EventHandler implements Runnable {
this.pipelineLevel = pipelineLevel;
if (this.pipelineLevel == 1) {
- pipelineJobs.add(node.getJobId());
+ pipelineJobs.add(node.getId());
}
else if (this.pipelineLevel == 2) {
- pipelineJobs.add(node.getJobId());
+ pipelineJobs.add(node.getId());
pipelineJobs.addAll(node.getOutNodes());
}
}
@@ -142,11 +145,11 @@ public class JobRunner extends EventHandler implements Runnable {
private void createLogger() {
// Create logger
synchronized (logCreatorLock) {
- String loggerName = System.currentTimeMillis() + "." + executionId + "." + node.getJobId();
+ String loggerName = System.currentTimeMillis() + "." + this.executionId + "." + this.jobId;
logger = Logger.getLogger(loggerName);
// Create file appender
- String logName = createLogFileName(node.getExecutionId(), node.getJobId(), node.getAttempt());
+ String logName = createLogFileName(this.executionId, this.jobId, node.getAttempt());
logFile = new File(workingDir, logName);
String absolutePath = logFile.getAbsolutePath();
@@ -158,7 +161,7 @@ public class JobRunner extends EventHandler implements Runnable {
jobAppender = fileAppender;
logger.addAppender(jobAppender);
} catch (IOException e) {
- flowLogger.error("Could not open log file in " + workingDir + " for job " + node.getJobId(), e);
+ flowLogger.error("Could not open log file in " + workingDir + " for job " + this.jobId, e);
}
}
}
@@ -175,13 +178,13 @@ public class JobRunner extends EventHandler implements Runnable {
node.setUpdateTime(System.currentTimeMillis());
loader.updateExecutableNode(node);
} catch (ExecutorManagerException e) {
- flowLogger.error("Could not update job properties in db for " + node.getJobId(), e);
+ flowLogger.error("Could not update job properties in db for " + this.jobId, e);
}
}
@Override
public void run() {
- Thread.currentThread().setName("JobRunner-" + node.getJobId() + "-" + executionId);
+ Thread.currentThread().setName("JobRunner-" + this.jobId + "-" + executionId);
if (node.getStatus() == Status.DISABLED) {
node.setStartTime(System.currentTimeMillis());
@@ -220,7 +223,7 @@ public class JobRunner extends EventHandler implements Runnable {
}
}
if (!blockingStatus.isEmpty()) {
- logger.info("Pipeline job " + node.getJobId() + " waiting on " + blockedList + " in execution " + watcher.getExecId());
+ logger.info("Pipeline job " + this.jobId + " waiting on " + blockedList + " in execution " + watcher.getExecId());
for(BlockingStatus bStatus: blockingStatus) {
logger.info("Waiting on pipelined job " + bStatus.getJobId());
@@ -253,7 +256,7 @@ public class JobRunner extends EventHandler implements Runnable {
this.wait(delayStartMs);
logger.info("Execution has been delayed for " + delayStartMs + " ms. Continuing with execution.");
} catch (InterruptedException e) {
- logger.error("Job " + node.getJobId() + " was to be delayed for " + delayStartMs + ". Interrupted after " + (System.currentTimeMillis() - currentTime));
+ logger.error("Job " + this.jobId + " was to be delayed for " + delayStartMs + ". Interrupted after " + (System.currentTimeMillis() - currentTime));
}
}
@@ -286,7 +289,7 @@ public class JobRunner extends EventHandler implements Runnable {
node.setEndTime(System.currentTimeMillis());
- logInfo("Finishing job " + node.getJobId() + " at " + node.getEndTime());
+ logInfo("Finishing job " + this.jobId + " at " + node.getEndTime());
closeLogger();
writeStatus();
@@ -304,13 +307,13 @@ public class JobRunner extends EventHandler implements Runnable {
Arrays.sort(files, Collections.reverseOrder());
- loader.uploadLogFile(executionId, node.getJobId(), node.getAttempt(), files);
+ loader.uploadLogFile(executionId, this.jobId, node.getAttempt(), files);
} catch (ExecutorManagerException e) {
- flowLogger.error("Error writing out logs for job " + node.getJobId(), e);
+ flowLogger.error("Error writing out logs for job " + this.jobId, e);
}
}
else {
- flowLogger.info("Log file for job " + node.getJobId() + " is null");
+ flowLogger.info("Log file for job " + this.jobId + " is null");
}
}
fireEvent(Event.create(this, Type.JOB_FINISHED));
@@ -340,13 +343,13 @@ public class JobRunner extends EventHandler implements Runnable {
}
if (node.getAttempt() > 0) {
- logInfo("Starting job " + node.getJobId() + " attempt " + node.getAttempt() + " at " + node.getStartTime());
+ logInfo("Starting job " + this.jobId + " attempt " + node.getAttempt() + " at " + node.getStartTime());
}
else {
- logInfo("Starting job " + node.getJobId() + " at " + node.getStartTime());
+ logInfo("Starting job " + this.jobId + " at " + node.getStartTime());
}
props.put(CommonJobProperties.JOB_ATTEMPT, node.getAttempt());
- props.put(CommonJobProperties.JOB_METADATA_FILE, createMetaDataFileName(executionId, node.getJobId(), node.getAttempt()));
+ props.put(CommonJobProperties.JOB_METADATA_FILE, createMetaDataFileName(executionId, this.jobId, node.getAttempt()));
node.setStatus(Status.RUNNING);
// Ability to specify working directory
@@ -357,13 +360,13 @@ public class JobRunner extends EventHandler implements Runnable {
if(props.containsKey("user.to.proxy")) {
String jobProxyUser = props.getString("user.to.proxy");
if(proxyUsers != null && !proxyUsers.contains(jobProxyUser)) {
- logger.error("User " + jobProxyUser + " has no permission to execute this job " + node.getJobId() + "!");
+ logger.error("User " + jobProxyUser + " has no permission to execute this job " + this.jobId + "!");
return false;
}
}
try {
- job = jobtypeManager.buildJobExecutor(node.getJobId(), props, logger);
+ job = jobtypeManager.buildJobExecutor(this.jobId, props, logger);
}
catch (JobTypeManagerException e) {
logger.error("Failed to build job type, skipping this job");
src/java/azkaban/executor/ExecutableFlow.java 409(+81 -328)
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 152a546..6141959 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -13,84 +13,66 @@
* License for the specific language governing permissions and limitations under
* the License.
*/
-
package azkaban.executor;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
-import azkaban.executor.ExecutableNode.Attempt;
-import azkaban.flow.Edge;
import azkaban.flow.Flow;
import azkaban.flow.FlowProps;
-import azkaban.flow.Node;
+import azkaban.project.Project;
import azkaban.utils.JSONUtils;
-public class ExecutableFlow {
+public class ExecutableFlow extends ExecutableFlowBase {
+ public static final String EXECUTIONID_PARAM = "executionId";
+ public static final String EXECUTIONPATH_PARAM ="executionPath";
+ public static final String EXECUTIONOPTIONS_PARAM ="executionOptions";
+ public static final String PROJECTID_PARAM ="projectId";
+ public static final String SCHEDULEID_PARAM ="scheduleId";
+ public static final String SUBMITUSER_PARAM = "submitUser";
+ public static final String SUBMITTIME_PARAM = "submitUser";
+ public static final String VERSION_PARAM = "version";
+ public static final String PROXYUSERS_PARAM = "proxyUsers";
+
private int executionId = -1;
- private String flowId;
private int scheduleId = -1;
private int projectId;
private int version;
-
- private String executionPath;
-
- private HashMap<String, FlowProps> flowProps = new HashMap<String, FlowProps>();
- private HashMap<String, ExecutableNode> executableNodes = new HashMap<String, ExecutableNode>();
- private ArrayList<String> startNodes;
- private ArrayList<String> endNodes;
-
private long submitTime = -1;
- private long startTime = -1;
- private long endTime = -1;
- private long updateTime = -1;
-
- private Status flowStatus = Status.READY;
private String submitUser;
+ private String executionPath;
+ private HashMap<String, FlowProps> flowProps = new HashMap<String, FlowProps>();
private HashSet<String> proxyUsers = new HashSet<String>();
private ExecutionOptions executionOptions;
- public ExecutableFlow(Flow flow) {
- this.projectId = flow.getProjectId();
+ public ExecutableFlow(Project project, Flow flow) {
+ this.projectId = project.getId();
+ this.version = project.getVersion();
this.scheduleId = -1;
- this.flowId = flow.getId();
- this.version = flow.getVersion();
- this.setFlow(flow);
+
+ this.setFlow(project, flow);
}
- public ExecutableFlow(int executionId, Flow flow) {
- this.projectId = flow.getProjectId();
- this.scheduleId = -1;
- this.flowId = flow.getId();
- this.version = flow.getVersion();
- this.executionId = executionId;
-
- this.setFlow(flow);
+ public ExecutableFlow(Flow flow) {
+ this.setFlow(null, flow);
}
public ExecutableFlow() {
}
-
- public long getUpdateTime() {
- return updateTime;
- }
-
- public void setUpdateTime(long updateTime) {
- this.updateTime = updateTime;
- }
-
- public List<ExecutableNode> getExecutableNodes() {
- return new ArrayList<ExecutableNode>(executableNodes.values());
+
+ @Override
+ public String getId() {
+ return getFlowId();
}
- public ExecutableNode getExecutableNode(String id) {
- return executableNodes.get(id);
+ @Override
+ public ExecutableFlow getExecutableFlow() {
+ return this;
}
public Collection<FlowProps> getFlowProps() {
@@ -113,74 +95,16 @@ public class ExecutableFlow {
return executionOptions;
}
- private void setFlow(Flow flow) {
+ private void setFlow(Project project, Flow flow) {
+ super.setFlow(project, flow, null);
executionOptions = new ExecutionOptions();
-
- for (Node node: flow.getNodes()) {
- String id = node.getId();
- ExecutableNode exNode = new ExecutableNode(node, this);
- executableNodes.put(id, exNode);
- }
-
- for (Edge edge: flow.getEdges()) {
- ExecutableNode sourceNode = executableNodes.get(edge.getSourceId());
- ExecutableNode targetNode = executableNodes.get(edge.getTargetId());
-
- sourceNode.addOutNode(edge.getTargetId());
- targetNode.addInNode(edge.getSourceId());
- }
-
+
if (flow.getSuccessEmails() != null) {
executionOptions.setSuccessEmails(flow.getSuccessEmails());
}
if (flow.getFailureEmails() != null) {
executionOptions.setFailureEmails(flow.getFailureEmails());
}
- flowProps.putAll(flow.getAllFlowProps());
- }
-
- public List<String> getStartNodes() {
- if (startNodes == null) {
- startNodes = new ArrayList<String>();
- for (ExecutableNode node: executableNodes.values()) {
- if (node.getInNodes().isEmpty()) {
- startNodes.add(node.getJobId());
- }
- }
- }
-
- return startNodes;
- }
-
- public List<String> getEndNodes() {
- if (endNodes == null) {
- endNodes = new ArrayList<String>();
- for (ExecutableNode node: executableNodes.values()) {
- if (node.getOutNodes().isEmpty()) {
- endNodes.add(node.getJobId());
- }
- }
- }
-
- return endNodes;
- }
-
- public boolean setNodeStatus(String nodeId, Status status) {
- ExecutableNode exNode = executableNodes.get(nodeId);
- if (exNode == null) {
- return false;
- }
- exNode.setStatus(status);
- return true;
- }
-
- public void setProxyNodes(int externalExecutionId, String nodeId) {
- ExecutableNode exNode = executableNodes.get(nodeId);
- if (exNode == null) {
- return;
- }
-
- exNode.setExternalExecutionId(externalExecutionId);
}
public int getExecutionId() {
@@ -189,20 +113,9 @@ public class ExecutableFlow {
public void setExecutionId(int executionId) {
this.executionId = executionId;
-
- for(ExecutableNode node: executableNodes.values()) {
- node.setExecutionId(executionId);
- }
- }
-
- public String getFlowId() {
- return flowId;
- }
-
- public void setFlowId(String flowId) {
- this.flowId = flowId;
}
+ @Override
public int getProjectId() {
return projectId;
}
@@ -227,254 +140,94 @@ public class ExecutableFlow {
this.executionPath = executionPath;
}
- public long getStartTime() {
- return startTime;
+ public String getSubmitUser() {
+ return submitUser;
}
-
- public void setStartTime(long time) {
- this.startTime = time;
+
+ public void setSubmitUser(String submitUser) {
+ this.submitUser = submitUser;
}
- public long getEndTime() {
- return endTime;
+ @Override
+ public int getVersion() {
+ return version;
}
-
- public void setEndTime(long time) {
- this.endTime = time;
+
+ public void setVersion(int version) {
+ this.version = version;
}
public long getSubmitTime() {
return submitTime;
}
- public void setSubmitTime(long time) {
- this.submitTime = time;
- }
-
- public Status getStatus() {
- return flowStatus;
- }
-
- public void setStatus(Status flowStatus) {
- this.flowStatus = flowStatus;
+ public void setSubmitTime(long submitTime) {
+ this.submitTime = submitTime;
}
public Map<String,Object> toObject() {
HashMap<String, Object> flowObj = new HashMap<String, Object>();
- flowObj.put("type", "executableflow");
- flowObj.put("executionId", executionId);
- flowObj.put("executionPath", executionPath);
- flowObj.put("flowId", flowId);
- flowObj.put("projectId", projectId);
-
- if(scheduleId >= 0) {
- flowObj.put("scheduleId", scheduleId);
- }
- flowObj.put("submitTime", submitTime);
- flowObj.put("startTime", startTime);
- flowObj.put("endTime", endTime);
- flowObj.put("status", flowStatus.toString());
- flowObj.put("submitUser", submitUser);
- flowObj.put("version", version);
+ fillMapFromExecutable(flowObj);
- flowObj.put("executionOptions", this.executionOptions.toObject());
- flowObj.put("version", version);
+ flowObj.put(EXECUTIONID_PARAM, executionId);
+ flowObj.put(EXECUTIONPATH_PARAM, executionPath);
+ flowObj.put(PROJECTID_PARAM, projectId);
- ArrayList<Object> props = new ArrayList<Object>();
- for (FlowProps fprop: flowProps.values()) {
- HashMap<String, Object> propObj = new HashMap<String, Object>();
- String source = fprop.getSource();
- String inheritedSource = fprop.getInheritedSource();
-
- propObj.put("source", source);
- if (inheritedSource != null) {
- propObj.put("inherited", inheritedSource);
- }
- props.add(propObj);
+ if(scheduleId >= 0) {
+ flowObj.put(SCHEDULEID_PARAM, scheduleId);
}
- flowObj.put("properties", props);
+
+ flowObj.put(SUBMITUSER_PARAM, submitUser);
+ flowObj.put(VERSION_PARAM, version);
- ArrayList<Object> nodes = new ArrayList<Object>();
- for (ExecutableNode node: executableNodes.values()) {
- nodes.add(node.toObject());
- }
- flowObj.put("nodes", nodes);
+ flowObj.put(EXECUTIONOPTIONS_PARAM, this.executionOptions.toObject());
+ flowObj.put(VERSION_PARAM, version);
ArrayList<String> proxyUserList = new ArrayList<String>(proxyUsers);
- flowObj.put("proxyUsers", proxyUserList);
+ flowObj.put(PROXYUSERS_PARAM, proxyUserList);
- return flowObj;
- }
-
- public Object toUpdateObject(long lastUpdateTime) {
- Map<String, Object> updateData = new HashMap<String,Object>();
- updateData.put("execId", this.executionId);
- updateData.put("status", this.flowStatus.getNumVal());
- updateData.put("startTime", this.startTime);
- updateData.put("endTime", this.endTime);
- updateData.put("updateTime", this.updateTime);
-
- List<Map<String,Object>> updatedNodes = new ArrayList<Map<String,Object>>();
- for (ExecutableNode node: executableNodes.values()) {
-
- if (node.getUpdateTime() > lastUpdateTime) {
- Map<String, Object> updatedNodeMap = new HashMap<String,Object>();
- updatedNodeMap.put("jobId", node.getJobId());
- updatedNodeMap.put("status", node.getStatus().getNumVal());
- updatedNodeMap.put("startTime", node.getStartTime());
- updatedNodeMap.put("endTime", node.getEndTime());
- updatedNodeMap.put("updateTime", node.getUpdateTime());
- updatedNodeMap.put("attempt", node.getAttempt());
-
- if (node.getAttempt() > 0) {
- ArrayList<Map<String,Object>> pastAttempts = new ArrayList<Map<String,Object>>();
- for (Attempt attempt: node.getPastAttemptList()) {
- pastAttempts.add(attempt.toObject());
- }
- updatedNodeMap.put("pastAttempts", pastAttempts);
- }
-
- updatedNodes.add(updatedNodeMap);
- }
- }
-
- updateData.put("nodes", updatedNodes);
- return updateData;
- }
-
- @SuppressWarnings("unchecked")
- public void applyUpdateObject(Map<String, Object> updateData) {
- List<Map<String,Object>> updatedNodes = (List<Map<String,Object>>)updateData.get("nodes");
- for (Map<String,Object> node: updatedNodes) {
- String jobId = (String)node.get("jobId");
- Status status = Status.fromInteger((Integer)node.get("status"));
- long startTime = JSONUtils.getLongFromObject(node.get("startTime"));
- long endTime = JSONUtils.getLongFromObject(node.get("endTime"));
- long updateTime = JSONUtils.getLongFromObject(node.get("updateTime"));
-
- ExecutableNode exNode = executableNodes.get(jobId);
- exNode.setEndTime(endTime);
- exNode.setStartTime(startTime);
- exNode.setUpdateTime(updateTime);
- exNode.setStatus(status);
-
- int attempt = 0;
- if (node.containsKey("attempt")) {
- attempt = (Integer)node.get("attempt");
- if (attempt > 0) {
- exNode.updatePastAttempts((List<Object>)node.get("pastAttempts"));
- }
- }
-
- exNode.setAttempt(attempt);
- }
-
- this.flowStatus = Status.fromInteger((Integer)updateData.get("status"));
+ flowObj.put(SUBMITTIME_PARAM, submitTime);
- this.startTime = JSONUtils.getLongFromObject(updateData.get("startTime"));
- this.endTime = JSONUtils.getLongFromObject(updateData.get("endTime"));
- this.updateTime = JSONUtils.getLongFromObject(updateData.get("updateTime"));
+ return flowObj;
}
@SuppressWarnings("unchecked")
public static ExecutableFlow createExecutableFlowFromObject(Object obj) {
ExecutableFlow exFlow = new ExecutableFlow();
-
HashMap<String, Object> flowObj = (HashMap<String,Object>)obj;
- exFlow.executionId = (Integer)flowObj.get("executionId");
- exFlow.executionPath = (String)flowObj.get("executionPath");
- exFlow.flowId = (String)flowObj.get("flowId");
- exFlow.projectId = (Integer)flowObj.get("projectId");
- if (flowObj.containsKey("scheduleId")) {
- exFlow.scheduleId = (Integer)flowObj.get("scheduleId");
+
+ exFlow.fillExecutableFromMapObject(flowObj);
+ exFlow.executionId = (Integer)flowObj.get(EXECUTIONID_PARAM);
+ exFlow.executionPath = (String)flowObj.get(EXECUTIONPATH_PARAM);
+
+ exFlow.projectId = (Integer)flowObj.get(PROJECTID_PARAM);
+ if (flowObj.containsKey(SCHEDULEID_PARAM)) {
+ exFlow.scheduleId = (Integer)flowObj.get(SCHEDULEID_PARAM);
}
- exFlow.submitTime = JSONUtils.getLongFromObject(flowObj.get("submitTime"));
- exFlow.startTime = JSONUtils.getLongFromObject(flowObj.get("startTime"));
- exFlow.endTime = JSONUtils.getLongFromObject(flowObj.get("endTime"));
- exFlow.flowStatus = Status.valueOf((String)flowObj.get("status"));
- exFlow.submitUser = (String)flowObj.get("submitUser");
- exFlow.version = (Integer)flowObj.get("version");
+ exFlow.submitUser = (String)flowObj.get(SUBMITUSER_PARAM);
+ exFlow.version = (Integer)flowObj.get(VERSION_PARAM);
- if (flowObj.containsKey("executionOptions")) {
- exFlow.executionOptions = ExecutionOptions.createFromObject(flowObj.get("executionOptions"));
+ exFlow.submitTime = JSONUtils.getLongFromObject(flowObj.get(SUBMITTIME_PARAM));
+
+ if (flowObj.containsKey(EXECUTIONOPTIONS_PARAM)) {
+ exFlow.executionOptions = ExecutionOptions.createFromObject(flowObj.get(EXECUTIONOPTIONS_PARAM));
}
else {
- // for backawards compatibility should remove in a few versions.
+ // for backwards compatibility should remove in a few versions.
exFlow.executionOptions = ExecutionOptions.createFromObject(flowObj);
}
- // Copy nodes
- List<Object> nodes = (List<Object>)flowObj.get("nodes");
- for (Object nodeObj: nodes) {
- ExecutableNode node = ExecutableNode.createNodeFromObject(nodeObj, exFlow);
- exFlow.executableNodes.put(node.getJobId(), node);
- }
-
- List<Object> properties = (List<Object>)flowObj.get("properties");
- for (Object propNode : properties) {
- HashMap<String, Object> fprop = (HashMap<String, Object>)propNode;
- String source = (String)fprop.get("source");
- String inheritedSource = (String)fprop.get("inherited");
-
- FlowProps flowProps = new FlowProps(inheritedSource, source);
- exFlow.flowProps.put(source, flowProps);
- }
-
- if(flowObj.containsKey("proxyUsers")) {
- ArrayList<String> proxyUserList = (ArrayList<String>) flowObj.get("proxyUsers");
+ if(flowObj.containsKey(PROXYUSERS_PARAM)) {
+ ArrayList<String> proxyUserList = (ArrayList<String>) flowObj.get(PROXYUSERS_PARAM);
exFlow.addAllProxyUsers(proxyUserList);
}
return exFlow;
}
- @SuppressWarnings("unchecked")
- public void updateExecutableFlowFromObject(Object obj) {
- HashMap<String, Object> flowObj = (HashMap<String,Object>)obj;
-
- submitTime = JSONUtils.getLongFromObject(flowObj.get("submitTime"));
- startTime = JSONUtils.getLongFromObject(flowObj.get("startTime"));
- endTime = JSONUtils.getLongFromObject(flowObj.get("endTime"));
- flowStatus = Status.valueOf((String)flowObj.get("status"));
-
- List<Object> nodes = (List<Object>)flowObj.get("nodes");
- for (Object nodeObj: nodes) {
- HashMap<String, Object> nodeHash= (HashMap<String, Object>)nodeObj;
- String nodeId = (String)nodeHash.get("id");
- ExecutableNode node = executableNodes.get(nodeId);
- if (nodeId == null) {
- throw new RuntimeException("Node " + nodeId + " doesn't exist in flow.");
- }
-
- node.updateNodeFromObject(nodeObj);
- }
- }
-
- public Set<String> getSources() {
- HashSet<String> set = new HashSet<String>();
- for (ExecutableNode exNode: executableNodes.values()) {
- set.add(exNode.getJobPropsSource());
- }
-
- for (FlowProps props: flowProps.values()) {
- set.add(props.getSource());
- }
- return set;
- }
-
- public String getSubmitUser() {
- return submitUser;
- }
-
- public void setSubmitUser(String submitUser) {
- this.submitUser = submitUser;
- }
-
- public int getVersion() {
- return version;
- }
-
- public void setVersion(int version) {
- this.version = version;
+ public Map<String, Object> toUpdateObject(long lastUpdateTime) {
+ Map<String, Object> updateData = super.toUpdateObject(lastUpdateTime);
+ updateData.put(EXECUTIONID_PARAM, this.executionId);
+ return updateData;
}
-}
+}
\ No newline at end of file
src/java/azkaban/executor/ExecutableFlowBase.java 273(+273 -0)
diff --git a/src/java/azkaban/executor/ExecutableFlowBase.java b/src/java/azkaban/executor/ExecutableFlowBase.java
new file mode 100644
index 0000000..311c33b
--- /dev/null
+++ b/src/java/azkaban/executor/ExecutableFlowBase.java
@@ -0,0 +1,273 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.executor;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import azkaban.flow.Edge;
+import azkaban.flow.Flow;
+import azkaban.flow.FlowProps;
+import azkaban.flow.Node;
+import azkaban.flow.SpecialJobTypes;
+import azkaban.project.Project;
+
+public class ExecutableFlowBase extends ExecutableNode {
+ public static final String FLOW_ID_PARAM = "flowId";
+ public static final String NODES_PARAM = "nodes";
+ public static final String PROPERTIES_PARAM = "properties";
+ public static final String SOURCE_PARAM = "source";
+ public static final String INHERITED_PARAM = "inherited";
+
+ private HashMap<String, ExecutableNode> executableNodes = new HashMap<String, ExecutableNode>();
+ private ArrayList<String> startNodes;
+ private ArrayList<String> endNodes;
+
+ private HashMap<String, FlowProps> flowProps = new HashMap<String, FlowProps>();
+ private String flowId;
+
+ public ExecutableFlowBase(Project project, Node node, Flow flow, ExecutableFlowBase parent) {
+ super(node, parent);
+
+ setFlow(project, flow, parent);
+ }
+
+ public ExecutableFlowBase() {
+ }
+
+ public int getExecutionId() {
+ if (this.getParentFlow() != null) {
+ return this.getParentFlow().getExecutionId();
+ }
+
+ return -1;
+ }
+
+ public int getProjectId() {
+ if (this.getParentFlow() != null) {
+ return this.getParentFlow().getProjectId();
+ }
+
+ return -1;
+ }
+
+ public int getVersion() {
+ if (this.getParentFlow() != null) {
+ return this.getParentFlow().getVersion();
+ }
+
+ return -1;
+ }
+
+ public String getFlowId() {
+ return flowId;
+ }
+
+ public String getNestedId() {
+ if (this.getParentFlow() != null) {
+ return this.getParentFlow().getNestedId() + ":" + getId();
+ }
+
+ return getId();
+ }
+
+ protected void setFlow(Project project, Flow flow, ExecutableFlowBase parent) {
+ this.flowId = flow.getId();
+
+ for (Node node: flow.getNodes()) {
+ String id = node.getId();
+ if (node.getType().equals(SpecialJobTypes.EMBEDDED_FLOW_TYPE)) {
+ String embeddedFlowId = node.getEmbeddedFlowId();
+ Flow subFlow = project.getFlow(embeddedFlowId);
+
+ ExecutableFlowBase embeddedFlow = new ExecutableFlowBase(project, node, subFlow, parent);
+ executableNodes.put(id, embeddedFlow);
+ }
+ else {
+ ExecutableNode exNode = new ExecutableNode(node, parent);
+ executableNodes.put(id, exNode);
+ }
+ }
+
+ for (Edge edge: flow.getEdges()) {
+ ExecutableNode sourceNode = executableNodes.get(edge.getSourceId());
+ ExecutableNode targetNode = executableNodes.get(edge.getTargetId());
+
+ sourceNode.addOutNode(edge.getTargetId());
+ targetNode.addInNode(edge.getSourceId());
+ }
+ }
+
+ public List<ExecutableNode> getExecutableNodes() {
+ return new ArrayList<ExecutableNode>(executableNodes.values());
+ }
+
+ public ExecutableNode getExecutableNode(String id) {
+ return executableNodes.get(id);
+ }
+
+ public List<String> getStartNodes() {
+ if (startNodes == null) {
+ startNodes = new ArrayList<String>();
+ for (ExecutableNode node: executableNodes.values()) {
+ if (node.getInNodes().isEmpty()) {
+ startNodes.add(node.getId());
+ }
+ }
+ }
+
+ return startNodes;
+ }
+
+ public List<String> getEndNodes() {
+ if (endNodes == null) {
+ endNodes = new ArrayList<String>();
+ for (ExecutableNode node: executableNodes.values()) {
+ if (node.getOutNodes().isEmpty()) {
+ endNodes.add(node.getId());
+ }
+ }
+ }
+
+ return endNodes;
+ }
+
+ public Map<String,Object> toObject() {
+ Map<String,Object> mapObj = new HashMap<String,Object>();
+ fillMapFromExecutable(mapObj);
+
+ return mapObj;
+ }
+
+ protected void fillMapFromExecutable(Map<String,Object> flowObjMap) {
+ super.fillMapFromExecutable(flowObjMap);
+
+ flowObjMap.put(FLOW_ID_PARAM, flowId);
+
+ ArrayList<Object> nodes = new ArrayList<Object>();
+ for (ExecutableNode node: executableNodes.values()) {
+ nodes.add(node.toObject());
+ }
+ flowObjMap.put(NODES_PARAM, nodes);
+
+ // Flow properties
+ ArrayList<Object> props = new ArrayList<Object>();
+ for (FlowProps fprop: flowProps.values()) {
+ HashMap<String, Object> propObj = new HashMap<String, Object>();
+ String source = fprop.getSource();
+ String inheritedSource = fprop.getInheritedSource();
+
+ propObj.put(SOURCE_PARAM, source);
+ if (inheritedSource != null) {
+ propObj.put(INHERITED_PARAM, inheritedSource);
+ }
+ props.add(propObj);
+ }
+ flowObjMap.put(PROPERTIES_PARAM, props);
+ }
+
+ /**
+ * Using the parameters in the map created from a json file, fill the results of this node
+ */
+ @SuppressWarnings("unchecked")
+ public void fillExecutableFromMapObject(Map<String,Object> flowObjMap) {
+ super.fillExecutableFromMapObject(flowObjMap);
+
+ this.flowId = (String)flowObjMap.get(FLOW_ID_PARAM);
+
+ List<Object> nodes = (List<Object>)flowObjMap.get(NODES_PARAM);
+ if (nodes != null) {
+ for (Object nodeObj: nodes) {
+ Map<String,Object> nodeObjMap = (Map<String,Object>)nodeObj;
+
+ String type = (String)nodeObjMap.get(TYPE_PARAM);
+ if (type.equals(SpecialJobTypes.EMBEDDED_FLOW_TYPE)) {
+ ExecutableFlowBase exFlow = new ExecutableFlowBase();
+ exFlow.fillExecutableFromMapObject(nodeObjMap);
+ exFlow.setParentFlow(this);
+
+ executableNodes.put(exFlow.getId(), exFlow);
+ }
+ else {
+ ExecutableNode exJob = new ExecutableNode();
+ exJob.fillExecutableFromMapObject(nodeObjMap);
+ exJob.setParentFlow(this);
+
+ executableNodes.put(exJob.getId(), exJob);
+ }
+ }
+ }
+
+ List<Object> properties = (List<Object>)flowObjMap.get(PROPERTIES_PARAM);
+ for (Object propNode : properties) {
+ HashMap<String, Object> fprop = (HashMap<String, Object>)propNode;
+ String source = (String)fprop.get("source");
+ String inheritedSource = (String)fprop.get("inherited");
+
+ FlowProps flowProps = new FlowProps(inheritedSource, source);
+ this.flowProps.put(source, flowProps);
+ }
+ }
+
+ public Map<String, Object> toUpdateObject(long lastUpdateTime) {
+ Map<String, Object> updateData = super.toUpdateObject();
+
+ List<Map<String,Object>> updatedNodes = new ArrayList<Map<String,Object>>();
+ for (ExecutableNode node: executableNodes.values()) {
+ if (node instanceof ExecutableFlowBase) {
+ Map<String, Object> updatedNodeMap = ((ExecutableFlowBase)node).toUpdateObject(lastUpdateTime);
+ // We add only flows to the list which either have a good update time, or has updated descendants.
+ if (node.getUpdateTime() > lastUpdateTime || updatedNodeMap.containsKey(NODES_PARAM)) {
+ updatedNodes.add(updatedNodeMap);
+ }
+ }
+ else {
+ if (node.getUpdateTime() > lastUpdateTime) {
+ Map<String, Object> updatedNodeMap = node.toUpdateObject();
+ updatedNodes.add(updatedNodeMap);
+ }
+ }
+ }
+
+ // if there are no updated nodes, we just won't add it to the list. This is good
+ // since if this is a nested flow, the parent is given the option to include or
+ // discard these subflows.
+ if (!updatedNodes.isEmpty()) {
+ updateData.put(NODES_PARAM, updatedNodes);
+ }
+ return updateData;
+ }
+
+ @SuppressWarnings("unchecked")
+ public void applyUpdateObject(Map<String, Object> updateData) {
+ super.applyUpdateObject(updateData);
+
+ List<Map<String,Object>> updatedNodes = (List<Map<String,Object>>)updateData.get(NODES_PARAM);
+ for (Map<String,Object> node: updatedNodes) {
+
+ String id = (String)node.get(ID_PARAM);
+ if (id == null) {
+ // Legacy case
+ id = (String)node.get("jobId");
+ }
+
+ ExecutableNode exNode = executableNodes.get(id);
+ exNode.applyUpdateObject(node);
+ }
+ }
+}
\ No newline at end of file
src/java/azkaban/executor/ExecutableNode.java 521(+266 -255)
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index 32eac5d..64a0a35 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -1,19 +1,3 @@
-/*
- * Copyright 2012 LinkedIn, Inc
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
package azkaban.executor;
import java.util.ArrayList;
@@ -22,197 +6,114 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
import azkaban.flow.Node;
import azkaban.utils.JSONUtils;
import azkaban.utils.Props;
+import azkaban.utils.PropsUtils;
+/**
+ * Base Executable that nodes and flows are based.
+ */
public class ExecutableNode {
- private String jobId;
- private int executionId;
- private String type;
- private String jobPropsSource;
- private String inheritPropsSource;
+ public static final String ID_PARAM = "id";
+ public static final String STATUS_PARAM = "status";
+ public static final String STARTTIME_PARAM = "startTime";
+ public static final String ENDTIME_PARAM = "endTime";
+ public static final String UPDATETIME_PARAM = "updateTime";
+ public static final String INNODES_PARAM = "inNodes";
+ public static final String OUTNODES_PARAM = "outNodes";
+ public static final String TYPE_PARAM = "type";
+ public static final String PROPS_SOURCE_PARAM = "propSource";
+ public static final String JOB_SOURCE_PARAM = "jobSource";
+ public static final String OUTPUT_PROPS_PARAM = "outputProps";
+
+ private String id;
+ private String type = null;
private Status status = Status.READY;
private long startTime = -1;
private long endTime = -1;
private long updateTime = -1;
- private int level = 0;
- private ExecutableFlow flow;
+
+ // Path to Job File
+ private String jobSource;
+ // Path to top level props file
+ private String propsSource;
+ private Set<String> inNodes = null;
+ private Set<String> outNodes = null;
+
private Props outputProps;
- private int attempt = 0;
- private boolean paused = false;
+ public static final String ATTEMPT_PARAM = "attempt";
+ public static final String PASTATTEMPTS_PARAM = "pastAttempts";
+
+ private int attempt = 0;
private long delayExecution = 0;
-
- private Set<String> inNodes = new HashSet<String>();
- private Set<String> outNodes = new HashSet<String>();
+ private ArrayList<ExecutionAttempt> pastAttempts = null;
- // Used if proxy node
- private Integer externalExecutionId;
- private ArrayList<Attempt> pastAttempts = null;
+ // Transient. These values aren't saved, but rediscovered.
+ private ExecutableFlowBase parentFlow;
- public ExecutableNode(Node node, ExecutableFlow flow) {
- jobId = node.getId();
- executionId = flow.getExecutionId();
- type = node.getType();
- jobPropsSource = node.getJobSource();
- inheritPropsSource = node.getPropsSource();
- status = Status.READY;
- level = node.getLevel();
- this.flow = flow;
+ public ExecutableNode(Node node) {
+ this.id = node.getId();
+ this.jobSource = node.getJobSource();
+ this.propsSource = node.getPropsSource();
}
- public ExecutableNode() {
+ public ExecutableNode(Node node, ExecutableFlowBase parent) {
+ this(node.getId(), node.getJobSource(), node.getPropsSource(), parent);
}
-
- public void resetForRetry() {
- Attempt pastAttempt = new Attempt(attempt, startTime, endTime, status);
- attempt++;
+
+ public ExecutableNode(String id, String jobSource, String propsSource, ExecutableFlowBase parent) {
+ this.id = id;
+ this.jobSource = jobSource;
+ this.propsSource = propsSource;
- synchronized (this) {
- if (pastAttempts == null) {
- pastAttempts = new ArrayList<Attempt>();
- }
-
- pastAttempts.add(pastAttempt);
- }
- startTime = -1;
- endTime = -1;
- updateTime = System.currentTimeMillis();
- status = Status.READY;
+ setParentFlow(parent);
}
- public void setExecutableFlow(ExecutableFlow flow) {
- this.flow = flow;
+ public ExecutableNode() {
}
- public void setExecutionId(int id) {
- executionId = id;
- }
-
- public int getExecutionId() {
- return executionId;
- }
-
- public String getJobId() {
- return jobId;
- }
-
- public void setJobId(String id) {
- this.jobId = id;
+ public ExecutableFlow getExecutableFlow() {
+ if (parentFlow == null) {
+ return null;
+ }
+
+ return parentFlow.getExecutableFlow();
}
-
- public void addInNode(String exNode) {
- inNodes.add(exNode);
+
+ public void setParentFlow(ExecutableFlowBase flow) {
+ this.parentFlow = flow;
}
-
- public void addOutNode(String exNode) {
- outNodes.add(exNode);
+
+ public ExecutableFlowBase getParentFlow() {
+ return parentFlow;
}
-
- public Set<String> getOutNodes() {
- return outNodes;
+
+ public String getId() {
+ return id;
}
- public Set<String> getInNodes() {
- return inNodes;
+ public void setId(String id) {
+ this.id = id;
}
public Status getStatus() {
return status;
}
- public void setStatus(Status status) {
- this.status = status;
+ public String getType() {
+ return type;
}
-
- public long getDelayedExecution() {
- return delayExecution;
+
+ public void setType(String type) {
+ this.type = type;
}
- public void setDelayedExecution(long delayMs) {
- delayExecution = delayMs;
+ public void setStatus(Status status) {
+ this.status = status;
}
- public Object toObject() {
- HashMap<String, Object> objMap = new HashMap<String, Object>();
- objMap.put("id", jobId);
- objMap.put("jobSource", jobPropsSource);
- objMap.put("propSource", inheritPropsSource);
- objMap.put("jobType", type);
- objMap.put("status", status.toString());
- objMap.put("inNodes", new ArrayList<String>(inNodes));
- objMap.put("outNodes", new ArrayList<String>(outNodes));
- objMap.put("startTime", startTime);
- objMap.put("endTime", endTime);
- objMap.put("updateTime", updateTime);
- objMap.put("level", level);
- objMap.put("externalExecutionId", externalExecutionId);
- objMap.put("paused", paused);
-
- if (pastAttempts != null) {
- ArrayList<Object> attemptsList = new ArrayList<Object>(pastAttempts.size());
- for (Attempt attempts : pastAttempts) {
- attemptsList.add(attempts.toObject());
- }
- objMap.put("pastAttempts", attemptsList);
- }
-
- return objMap;
- }
-
- @SuppressWarnings("unchecked")
- public static ExecutableNode createNodeFromObject(Object obj, ExecutableFlow flow) {
- ExecutableNode exNode = new ExecutableNode();
-
- HashMap<String, Object> objMap = (HashMap<String,Object>)obj;
- exNode.executionId = flow == null ? 0 : flow.getExecutionId();
- exNode.jobId = (String)objMap.get("id");
- exNode.jobPropsSource = (String)objMap.get("jobSource");
- exNode.inheritPropsSource = (String)objMap.get("propSource");
- exNode.type = (String)objMap.get("jobType");
- exNode.status = Status.valueOf((String)objMap.get("status"));
-
- exNode.inNodes.addAll( (List<String>)objMap.get("inNodes") );
- exNode.outNodes.addAll( (List<String>)objMap.get("outNodes") );
-
- exNode.startTime = JSONUtils.getLongFromObject(objMap.get("startTime"));
- exNode.endTime = JSONUtils.getLongFromObject(objMap.get("endTime"));
- exNode.updateTime = JSONUtils.getLongFromObject(objMap.get("updateTime"));
- exNode.level = (Integer)objMap.get("level");
-
- exNode.externalExecutionId = (Integer)objMap.get("externalExecutionId");
-
- exNode.flow = flow;
- Boolean paused = (Boolean)objMap.get("paused");
- if (paused!=null) {
- exNode.paused = paused;
- }
-
- List<Object> pastAttempts = (List<Object>)objMap.get("pastAttempts");
- if (pastAttempts!=null) {
- ArrayList<Attempt> attempts = new ArrayList<Attempt>();
- for (Object attemptObj: pastAttempts) {
- Attempt attempt = Attempt.fromObject(attemptObj);
- attempts.add(attempt);
- }
-
- exNode.pastAttempts = attempts;
- }
-
- return exNode;
- }
-
- @SuppressWarnings("unchecked")
- public void updateNodeFromObject(Object obj) {
- HashMap<String, Object> objMap = (HashMap<String,Object>)obj;
- status = Status.valueOf((String)objMap.get("status"));
-
- startTime = JSONUtils.getLongFromObject(objMap.get("startTime"));
- endTime = JSONUtils.getLongFromObject(objMap.get("endTime"));
- }
-
public long getStartTime() {
return startTime;
}
@@ -228,23 +129,7 @@ public class ExecutableNode {
public void setEndTime(long endTime) {
this.endTime = endTime;
}
-
- public String getJobPropsSource() {
- return jobPropsSource;
- }
-
- public String getPropsSource() {
- return inheritPropsSource;
- }
-
- public int getLevel() {
- return level;
- }
-
- public ExecutableFlow getFlow() {
- return flow;
- }
-
+
public long getUpdateTime() {
return updateTime;
}
@@ -252,7 +137,45 @@ public class ExecutableNode {
public void setUpdateTime(long updateTime) {
this.updateTime = updateTime;
}
+
+ public void addOutNode(String exNode) {
+ if (outNodes == null) {
+ outNodes = new HashSet<String>();
+ }
+ outNodes.add(exNode);
+ }
+
+ public void addInNode(String exNode) {
+ if (inNodes == null) {
+ inNodes = new HashSet<String>();
+ }
+ inNodes.add(exNode);
+ }
+ public Set<String> getOutNodes() {
+ return outNodes;
+ }
+
+ public Set<String> getInNodes() {
+ return inNodes;
+ }
+
+ public boolean hasJobSource() {
+ return jobSource != null;
+ }
+
+ public boolean hasPropsSource() {
+ return propsSource != null;
+ }
+
+ public String getJobSource() {
+ return jobSource;
+ }
+
+ public String getPropsSource() {
+ return propsSource;
+ }
+
public void setOutputProps(Props output) {
this.outputProps = output;
}
@@ -260,16 +183,16 @@ public class ExecutableNode {
public Props getOutputProps() {
return outputProps;
}
-
- public Integer getExternalExecutionId() {
- return externalExecutionId;
+
+ public long getDelayedExecution() {
+ return delayExecution;
}
-
- public void setExternalExecutionId(Integer externalExecutionId) {
- this.externalExecutionId = externalExecutionId;
+
+ public void setDelayedExecution(long delayMs) {
+ delayExecution = delayMs;
}
-
- public List<Attempt> getPastAttemptList() {
+
+ public List<ExecutionAttempt> getPastAttemptList() {
return pastAttempts;
}
@@ -281,96 +204,184 @@ public class ExecutableNode {
this.attempt = attempt;
}
- public boolean isPaused() {
- return paused;
+ public void resetForRetry() {
+ ExecutionAttempt pastAttempt = new ExecutionAttempt(attempt, this);
+ attempt++;
+
+ synchronized (this) {
+ if (pastAttempts == null) {
+ pastAttempts = new ArrayList<ExecutionAttempt>();
+ }
+
+ pastAttempts.add(pastAttempt);
+ }
+
+ this.setStartTime(-1);
+ this.setEndTime(-1);
+ this.setUpdateTime(System.currentTimeMillis());
+ this.setStatus(Status.READY);
}
- public void setPaused(boolean paused) {
- this.paused = paused;
- }
public List<Object> getAttemptObjects() {
ArrayList<Object> array = new ArrayList<Object>();
- for (Attempt attempt: pastAttempts) {
+ for (ExecutionAttempt attempt: pastAttempts) {
array.add(attempt.toObject());
}
return array;
}
+ public Map<String,Object> toObject() {
+ Map<String,Object> mapObj = new HashMap<String,Object>();
+ fillMapFromExecutable(mapObj);
+
+ return mapObj;
+ }
- public void updatePastAttempts(List<Object> pastAttemptsList) {
- if (pastAttemptsList == null) {
- return;
+ protected void fillMapFromExecutable(Map<String,Object> objMap) {
+ objMap.put(ID_PARAM, this.id);
+ objMap.put(STATUS_PARAM, status.toString());
+ objMap.put(STARTTIME_PARAM, startTime);
+ objMap.put(ENDTIME_PARAM, endTime);
+ objMap.put(UPDATETIME_PARAM, updateTime);
+ objMap.put(TYPE_PARAM, type);
+
+ if (inNodes != null) {
+ objMap.put(INNODES_PARAM, inNodes);
+ }
+ if (outNodes != null) {
+ objMap.put(OUTNODES_PARAM, outNodes);
}
- synchronized (this) {
- if (this.pastAttempts == null) {
- this.pastAttempts = new ArrayList<Attempt>();
- }
-
- // We just check size because past attempts don't change
- if (pastAttemptsList.size() <= this.pastAttempts.size()) {
- return;
- }
-
- Object[] pastAttemptArray = pastAttemptsList.toArray();
- for (int i = this.pastAttempts.size(); i < pastAttemptArray.length; ++i) {
- Attempt attempt = Attempt.fromObject(pastAttemptArray[i]);
- this.pastAttempts.add(attempt);
+// if (hasPropsSource()) {
+// objMap.put(PROPS_SOURCE_PARAM, this.propsSource);
+// }
+ if (hasJobSource()) {
+ objMap.put(JOB_SOURCE_PARAM, this.jobSource);
+ }
+
+ if (outputProps != null) {
+ objMap.put(OUTPUT_PROPS_PARAM, PropsUtils.toStringMap(outputProps, true));
+ }
+
+ if (pastAttempts != null) {
+ ArrayList<Object> attemptsList = new ArrayList<Object>(pastAttempts.size());
+ for (ExecutionAttempt attempts : pastAttempts) {
+ attemptsList.add(attempts.toObject());
}
+ objMap.put(PASTATTEMPTS_PARAM, attemptsList);
}
-
}
-
- public static class Attempt {
- private int attempt = 0;
- private long startTime = -1;
- private long endTime = -1;
- private Status status;
+
+ @SuppressWarnings("unchecked")
+ public void fillExecutableFromMapObject(Map<String,Object> objMap) {
+ this.id = (String)objMap.get(ID_PARAM);
+ this.status = Status.valueOf((String)objMap.get(STATUS_PARAM));
+ this.startTime = JSONUtils.getLongFromObject(objMap.get(STARTTIME_PARAM));
+ this.endTime = JSONUtils.getLongFromObject(objMap.get(ENDTIME_PARAM));
+ this.updateTime = JSONUtils.getLongFromObject(objMap.get(UPDATETIME_PARAM));
+ this.type = (String)objMap.get(TYPE_PARAM);
- public Attempt(int attempt, long startTime, long endTime, Status status) {
- this.attempt = attempt;
- this.startTime = startTime;
- this.endTime = endTime;
- this.status = status;
+ if (objMap.containsKey(INNODES_PARAM)) {
+ this.inNodes = new HashSet<String>();
+ this.inNodes.addAll((List<String>)objMap.get(INNODES_PARAM));
}
- public long getStartTime() {
- return startTime;
- }
-
- public long getEndTime() {
- return endTime;
+ if (objMap.containsKey(OUTNODES_PARAM)) {
+ this.outNodes = new HashSet<String>();
+ this.outNodes.addAll((List<String>)objMap.get(OUTNODES_PARAM));
}
+//
+// if (objMap.containsKey(PROPS_SOURCE_PARAM)) {
+// this.propsSource = (String)objMap.get(PROPS_SOURCE_PARAM);
+// }
- public Status getStatus() {
- return status;
+ if (objMap.containsKey(JOB_SOURCE_PARAM)) {
+ this.jobSource = (String)objMap.get(JOB_SOURCE_PARAM);
}
- public int getAttempt() {
- return attempt;
+ if (objMap.containsKey(OUTPUT_PROPS_PARAM)) {
+ this.outputProps = new Props(null, (Map<String,String>)objMap.get(OUTPUT_PROPS_PARAM));
}
- public static Attempt fromObject(Object obj) {
- @SuppressWarnings("unchecked")
- Map<String, Object> map = (Map<String, Object>)obj;
- int attempt = (Integer)map.get("attempt");
- long startTime = JSONUtils.getLongFromObject(map.get("startTime"));
- long endTime = JSONUtils.getLongFromObject(map.get("endTime"));
- Status status = Status.valueOf((String)map.get("status"));
+ List<Object> pastAttempts = (List<Object>)objMap.get(PASTATTEMPTS_PARAM);
+ if (pastAttempts!=null) {
+ ArrayList<ExecutionAttempt> attempts = new ArrayList<ExecutionAttempt>();
+ for (Object attemptObj: pastAttempts) {
+ ExecutionAttempt attempt = ExecutionAttempt.fromObject(attemptObj);
+ attempts.add(attempt);
+ }
- return new Attempt(attempt, startTime, endTime, status);
+ this.pastAttempts = attempts;
}
+ }
+
+ public Map<String, Object> toUpdateObject() {
+ Map<String, Object> updatedNodeMap = new HashMap<String,Object>();
+ updatedNodeMap.put(ID_PARAM, getId());
+ updatedNodeMap.put(STATUS_PARAM, getStatus().getNumVal());
+ updatedNodeMap.put(STARTTIME_PARAM, getStartTime());
+ updatedNodeMap.put(ENDTIME_PARAM, getEndTime());
+ updatedNodeMap.put(UPDATETIME_PARAM, getUpdateTime());
- public Map<String, Object> toObject() {
- HashMap<String,Object> attempts = new HashMap<String,Object>();
- attempts.put("attempt", attempt);
- attempts.put("startTime", startTime);
- attempts.put("endTime", endTime);
- attempts.put("status", status.toString());
- return attempts;
+ updatedNodeMap.put(ATTEMPT_PARAM, getAttempt());
+
+ if (getAttempt() > 0) {
+ ArrayList<Map<String,Object>> pastAttempts = new ArrayList<Map<String,Object>>();
+ for (ExecutionAttempt attempt: getPastAttemptList()) {
+ pastAttempts.add(attempt.toObject());
+ }
+ updatedNodeMap.put(PASTATTEMPTS_PARAM, pastAttempts);
+ }
+
+ return updatedNodeMap;
+ }
+
+ @SuppressWarnings("unchecked")
+ public void applyUpdateObject(Map<String, Object> updateData) {
+ if (updateData.containsKey(STATUS_PARAM)) {
+ this.status = Status.fromInteger((Integer)updateData.get(STATUS_PARAM));
+ }
+ if (updateData.containsKey(STARTTIME_PARAM)) {
+ this.startTime = JSONUtils.getLongFromObject(updateData.get(STARTTIME_PARAM));
+ }
+ if (updateData.containsKey(UPDATETIME_PARAM)) {
+ this.updateTime = JSONUtils.getLongFromObject(updateData.get(UPDATETIME_PARAM));
+ }
+ if (updateData.containsKey(ENDTIME_PARAM)) {
+ this.endTime = JSONUtils.getLongFromObject(updateData.get(ENDTIME_PARAM));
+ }
+
+ if (updateData.containsKey(ATTEMPT_PARAM)) {
+ attempt = (Integer)updateData.get(ATTEMPT_PARAM);
+ if (attempt > 0) {
+ updatePastAttempts((List<Object>)updateData.get(PASTATTEMPTS_PARAM));
+ }
+ }
+ }
+
+ private void updatePastAttempts(List<Object> pastAttemptsList) {
+ if (pastAttemptsList == null) {
+ return;
+ }
+
+ synchronized (this) {
+ if (this.pastAttempts == null) {
+ this.pastAttempts = new ArrayList<ExecutionAttempt>();
+ }
+
+ // We just check size because past attempts don't change
+ if (pastAttemptsList.size() <= this.pastAttempts.size()) {
+ return;
+ }
+
+ Object[] pastAttemptArray = pastAttemptsList.toArray();
+ for (int i = this.pastAttempts.size(); i < pastAttemptArray.length; ++i) {
+ ExecutionAttempt attempt = ExecutionAttempt.fromObject(pastAttemptArray[i]);
+ this.pastAttempts.add(attempt);
+ }
}
}
-}
\ No newline at end of file
+}
diff --git a/src/java/azkaban/executor/ExecutionAttempt.java b/src/java/azkaban/executor/ExecutionAttempt.java
new file mode 100644
index 0000000..7da0623
--- /dev/null
+++ b/src/java/azkaban/executor/ExecutionAttempt.java
@@ -0,0 +1,68 @@
+package azkaban.executor;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import azkaban.utils.JSONUtils;
+
+public class ExecutionAttempt {
+ public static final String ATTEMPT_PARAM = "attempt";
+ public static final String STATUS_PARAM = "status";
+ public static final String STARTTIME_PARAM = "startTime";
+ public static final String ENDTIME_PARAM = "endTime";
+
+ private int attempt = 0;
+ private long startTime = -1;
+ private long endTime = -1;
+ private Status status;
+
+ public ExecutionAttempt(int attempt, ExecutableNode executable) {
+ this.attempt = attempt;
+ this.startTime = executable.getStartTime();
+ this.endTime = executable.getEndTime();
+ this.status = executable.getStatus();
+ }
+
+ public ExecutionAttempt(int attempt, long startTime, long endTime, Status status) {
+ this.attempt = attempt;
+ this.startTime = startTime;
+ this.endTime = endTime;
+ this.status = status;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public long getEndTime() {
+ return endTime;
+ }
+
+ public Status getStatus() {
+ return status;
+ }
+
+ public int getAttempt() {
+ return attempt;
+ }
+
+ public static ExecutionAttempt fromObject(Object obj) {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> map = (Map<String, Object>)obj;
+ int attempt = (Integer)map.get(ATTEMPT_PARAM);
+ long startTime = JSONUtils.getLongFromObject(map.get(STARTTIME_PARAM));
+ long endTime = JSONUtils.getLongFromObject(map.get(ENDTIME_PARAM));
+ Status status = Status.valueOf((String)map.get(STATUS_PARAM));
+
+ return new ExecutionAttempt(attempt, startTime, endTime, status);
+ }
+
+ public Map<String, Object> toObject() {
+ HashMap<String,Object> attempts = new HashMap<String,Object>();
+ attempts.put(ATTEMPT_PARAM, attempt);
+ attempts.put(STARTTIME_PARAM, startTime);
+ attempts.put(ENDTIME_PARAM, endTime);
+ attempts.put(STATUS_PARAM, status.toString());
+ return attempts;
+ }
+}
\ No newline at end of file
diff --git a/src/java/azkaban/executor/ExecutorMailer.java b/src/java/azkaban/executor/ExecutorMailer.java
index c6c6f41..17ac116 100644
--- a/src/java/azkaban/executor/ExecutorMailer.java
+++ b/src/java/azkaban/executor/ExecutorMailer.java
@@ -154,11 +154,11 @@ public class ExecutorMailer extends AbstractMailer {
}
}
- private List<String> findFailedJobs(ExecutableFlow flow) {
+ private List<String> findFailedJobs(ExecutableFlowBase flow) {
ArrayList<String> failedJobs = new ArrayList<String>();
for (ExecutableNode node: flow.getExecutableNodes()) {
if (node.getStatus() == Status.FAILED) {
- failedJobs.add(node.getJobId());
+ failedJobs.add(node.getId());
}
}
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 527956d..74bba73 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -361,12 +361,14 @@ public class ExecutorManager {
}
}
- public String submitExecutableFlow(ExecutableFlow exflow) throws ExecutorManagerException {
+ public String submitExecutableFlow(ExecutableFlow exflow, String userId) throws ExecutorManagerException {
synchronized(exflow) {
logger.info("Submitting execution flow " + exflow.getFlowId());
int projectId = exflow.getProjectId();
String flowId = exflow.getFlowId();
+ exflow.setSubmitUser(userId);
+ exflow.setSubmitTime(System.currentTimeMillis());
List<Integer> running = getRunningFlows(projectId, flowId);
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index 3c7332d..a231c41 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -374,7 +374,14 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
}
}
- ExecutableFlow flow = node.getFlow();
+ ExecutableFlow flow = node.getExecutableFlow();
+ String flowId = flow.getFlowId();
+
+ // if the main flow is not the parent, then we'll create a composite key for flowID
+ if (flow != node.getParentFlow()) {
+ flowId = node.getParentFlow().getNestedId();
+ }
+
QueryRunner runner = createQueryRunner();
try {
runner.update(
@@ -382,8 +389,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
flow.getExecutionId(),
flow.getProjectId(),
flow.getVersion(),
- flow.getFlowId(),
- node.getJobId(),
+ flowId,
+ node.getId(),
node.getStartTime(),
node.getEndTime(),
node.getStatus().getNumVal(),
@@ -391,7 +398,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
node.getAttempt()
);
} catch (SQLException e) {
- throw new ExecutorManagerException("Error writing job " + node.getJobId(), e);
+ throw new ExecutorManagerException("Error writing job " + node.getId(), e);
}
}
@@ -418,11 +425,11 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
node.getEndTime(),
node.getStatus().getNumVal(),
outputParam,
- node.getFlow().getExecutionId(),
- node.getJobId(),
+ node.getExecutableFlow().getExecutionId(),
+ node.getId(),
node.getAttempt());
} catch (SQLException e) {
- throw new ExecutorManagerException("Error updating job " + node.getJobId(), e);
+ throw new ExecutorManagerException("Error updating job " + node.getId(), e);
}
}
src/java/azkaban/scheduler/ScheduleManager.java 326(+191 -135)
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index c60f143..e447067 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -57,7 +57,8 @@ import azkaban.utils.Pair;
public class ScheduleManager {
private static Logger logger = Logger.getLogger(ScheduleManager.class);
- private final DateTimeFormatter _dateFormat = DateTimeFormat.forPattern("MM-dd-yyyy HH:mm:ss:SSS");
+ private final DateTimeFormatter _dateFormat = DateTimeFormat
+ .forPattern("MM-dd-yyyy HH:mm:ss:SSS");
private ScheduleLoader loader;
private Map<Pair<Integer, String>, Set<Schedule>> scheduleIdentityPairMap = new LinkedHashMap<Pair<Integer, String>, Set<Schedule>>();
@@ -66,7 +67,7 @@ public class ScheduleManager {
private final ExecutorManager executorManager;
private final ProjectManager projectManager;
private final SLAManager slaManager;
-
+
// Used for mbeans to query Scheduler status
private long lastCheckTime = -1;
private long nextWakupTime = -1;
@@ -78,10 +79,8 @@ public class ScheduleManager {
* @param loader
*/
public ScheduleManager(ExecutorManager executorManager,
- ProjectManager projectManager,
- SLAManager slaManager,
- ScheduleLoader loader)
- {
+ ProjectManager projectManager, SLAManager slaManager,
+ ScheduleLoader loader) {
this.executorManager = executorManager;
this.projectManager = projectManager;
this.slaManager = slaManager;
@@ -93,7 +92,8 @@ public class ScheduleManager {
scheduleList = loader.loadSchedules();
} catch (ScheduleManagerException e) {
// TODO Auto-generated catch block
- logger.error("Failed to load schedules" + e.getCause() + e.getMessage());
+ logger.error("Failed to load schedules" + e.getCause()
+ + e.getMessage());
e.printStackTrace();
}
@@ -126,9 +126,10 @@ public class ScheduleManager {
*
* @param id
* @return
- */
+ */
public Set<Schedule> getSchedules(int projectId, String flowId) {
- return scheduleIdentityPairMap.get(new Pair<Integer,String>(projectId, flowId));
+ return scheduleIdentityPairMap.get(new Pair<Integer, String>(projectId,
+ flowId));
}
/**
@@ -136,12 +137,11 @@ public class ScheduleManager {
*
* @param id
* @return
- */
+ */
public Schedule getSchedule(int scheduleId) {
return scheduleIDMap.get(scheduleId);
}
-
/**
* Removes the flow from the schedule if it exists.
*
@@ -149,10 +149,11 @@ public class ScheduleManager {
*/
public synchronized void removeSchedules(int projectId, String flowId) {
Set<Schedule> schedules = getSchedules(projectId, flowId);
- for(Schedule sched : schedules) {
+ for (Schedule sched : schedules) {
removeSchedule(sched);
}
}
+
/**
* Removes the flow from the schedule if it exists.
*
@@ -160,16 +161,16 @@ public class ScheduleManager {
*/
public synchronized void removeSchedule(Schedule sched) {
- Pair<Integer,String> identityPairMap = sched.getScheduleIdentityPair();
+ Pair<Integer, String> identityPairMap = sched.getScheduleIdentityPair();
Set<Schedule> schedules = scheduleIdentityPairMap.get(identityPairMap);
- if(schedules != null) {
+ if (schedules != null) {
schedules.remove(sched);
- if(schedules.size() == 0) {
+ if (schedules.size() == 0) {
scheduleIdentityPairMap.remove(identityPairMap);
}
}
scheduleIDMap.remove(sched.getScheduleId());
-
+
runner.removeRunnerSchedule(sched);
try {
loader.removeSchedule(sched);
@@ -201,40 +202,28 @@ public class ScheduleManager {
// }
// }
- public Schedule scheduleFlow(
- final int scheduleId,
- final int projectId,
- final String projectName,
- final String flowName,
- final String status,
- final long firstSchedTime,
- final DateTimeZone timezone,
- final ReadablePeriod period,
- final long lastModifyTime,
- final long nextExecTime,
- final long submitTime,
- final String submitUser
- ) {
- return scheduleFlow(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, null, null);
+ public Schedule scheduleFlow(final int scheduleId, final int projectId,
+ final String projectName, final String flowName,
+ final String status, final long firstSchedTime,
+ final DateTimeZone timezone, final ReadablePeriod period,
+ final long lastModifyTime, final long nextExecTime,
+ final long submitTime, final String submitUser) {
+ return scheduleFlow(scheduleId, projectId, projectName, flowName,
+ status, firstSchedTime, timezone, period, lastModifyTime,
+ nextExecTime, submitTime, submitUser, null, null);
}
-
- public Schedule scheduleFlow(
- final int scheduleId,
- final int projectId,
- final String projectName,
- final String flowName,
- final String status,
- final long firstSchedTime,
- final DateTimeZone timezone,
- final ReadablePeriod period,
- final long lastModifyTime,
- final long nextExecTime,
- final long submitTime,
- final String submitUser,
- ExecutionOptions execOptions,
- SlaOptions slaOptions
- ) {
- Schedule sched = new Schedule(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, execOptions, slaOptions);
+
+ public Schedule scheduleFlow(final int scheduleId, final int projectId,
+ final String projectName, final String flowName,
+ final String status, final long firstSchedTime,
+ final DateTimeZone timezone, final ReadablePeriod period,
+ final long lastModifyTime, final long nextExecTime,
+ final long submitTime, final String submitUser,
+ ExecutionOptions execOptions, SlaOptions slaOptions) {
+ Schedule sched = new Schedule(scheduleId, projectId, projectName,
+ flowName, status, firstSchedTime, timezone, period,
+ lastModifyTime, nextExecTime, submitTime, submitUser,
+ execOptions, slaOptions);
logger.info("Scheduling flow '" + sched.getScheduleName() + "' for "
+ _dateFormat.print(firstSchedTime) + " with a period of "
+ period == null ? "(non-recurring)" : period);
@@ -256,8 +245,9 @@ public class ScheduleManager {
s.updateTime();
this.runner.addRunnerSchedule(s);
scheduleIDMap.put(s.getScheduleId(), s);
- Set<Schedule> schedules = scheduleIdentityPairMap.get(s.getScheduleIdentityPair());
- if(schedules == null) {
+ Set<Schedule> schedules = scheduleIdentityPairMap.get(s
+ .getScheduleIdentityPair());
+ if (schedules == null) {
schedules = new HashSet<Schedule>();
scheduleIdentityPairMap.put(s.getScheduleIdentityPair(), schedules);
}
@@ -271,13 +261,12 @@ public class ScheduleManager {
*/
public synchronized void insertSchedule(Schedule s) {
boolean exist = s.getScheduleId() != -1;
- if(s.updateTime()) {
+ if (s.updateTime()) {
try {
- if(!exist) {
+ if (!exist) {
loader.insertSchedule(s);
internalSchedule(s);
- }
- else{
+ } else {
loader.updateSchedule(s);
internalSchedule(s);
}
@@ -285,19 +274,19 @@ public class ScheduleManager {
// TODO Auto-generated catch block
e.printStackTrace();
}
- }
- else {
- logger.error("The provided schedule is non-recurring and the scheduled time already passed. " + s.getScheduleName());
+ } else {
+ logger.error("The provided schedule is non-recurring and the scheduled time already passed. "
+ + s.getScheduleName());
}
}
-// /**
-// * Save the schedule
-// */
-// private void saveSchedule() {
-// loader.saveSchedule(getSchedule());
-// }
-
+ // /**
+ // * Save the schedule
+ // */
+ // private void saveSchedule() {
+ // loader.saveSchedule(getSchedule());
+ // }
+
/**
* Thread that simply invokes the running of flows when the schedule is
* ready.
@@ -313,7 +302,8 @@ public class ScheduleManager {
private static final int TIMEOUT_MS = 300000;
public ScheduleRunner() {
- schedules = new PriorityBlockingQueue<Schedule>(1,new ScheduleComparator());
+ schedules = new PriorityBlockingQueue<Schedule>(1,
+ new ScheduleComparator());
}
public void shutdown() {
@@ -377,99 +367,158 @@ public class ScheduleManager {
if (s == null) {
// If null, wake up every minute or so to see if
- // there's something to do. Most likely there will not be.
+ // there's something to do. Most likely there will
+ // not be.
try {
logger.info("Nothing scheduled to run. Checking again soon.");
- nextWakupTime = System.currentTimeMillis() + TIMEOUT_MS;
+ nextWakupTime = System.currentTimeMillis()
+ + TIMEOUT_MS;
this.wait(TIMEOUT_MS);
} catch (InterruptedException e) {
- // interruption should occur when items are added or removed from the queue.
+ // interruption should occur when items are
+ // added or removed from the queue.
}
} else {
- // We've passed the flow execution time, so we will run.
- if (!(new DateTime(s.getNextExecTime())).isAfterNow()) {
- // Run flow. The invocation of flows should be quick.
+ // We've passed the flow execution time, so we will
+ // run.
+ if (!(new DateTime(s.getNextExecTime()))
+ .isAfterNow()) {
+ // Run flow. The invocation of flows should be
+ // quick.
Schedule runningSched = schedules.poll();
- logger.info("Scheduler ready to run " + runningSched.toString());
+ logger.info("Scheduler ready to run "
+ + runningSched.toString());
// Execute the flow here
try {
- Project project = projectManager.getProject(runningSched.getProjectId());
+ Project project = projectManager
+ .getProject(runningSched
+ .getProjectId());
if (project == null) {
- logger.error("Scheduled Project " + runningSched.getProjectId() + " does not exist!");
- throw new RuntimeException("Error finding the scheduled project. "+ runningSched.getProjectId());
- }
- //TODO It is possible that the project is there, but the flow doesn't exist because upload a version that changes flow structure
+ logger.error("Scheduled Project "
+ + runningSched.getProjectId()
+ + " does not exist!");
+ throw new RuntimeException(
+ "Error finding the scheduled project. "
+ + runningSched
+ .getProjectId());
+ }
+ // TODO It is possible that the project is
+ // there, but the flow doesn't exist because
+ // upload a version that changes flow
+ // structure
- Flow flow = project.getFlow(runningSched.getFlowName());
+ Flow flow = project.getFlow(runningSched
+ .getFlowName());
if (flow == null) {
- logger.error("Flow " + runningSched.getScheduleName() + " cannot be found in project " + project.getName());
- throw new RuntimeException("Error finding the scheduled flow. " + runningSched.getScheduleName());
+ logger.error("Flow "
+ + runningSched
+ .getScheduleName()
+ + " cannot be found in project "
+ + project.getName());
+ throw new RuntimeException(
+ "Error finding the scheduled flow. "
+ + runningSched
+ .getScheduleName());
}
// Create ExecutableFlow
- ExecutableFlow exflow = new ExecutableFlow(flow);
- System.out.println("ScheduleManager: creating schedule: " +runningSched.getScheduleId());
- exflow.setScheduleId(runningSched.getScheduleId());
- exflow.setSubmitUser(runningSched.getSubmitUser());
- exflow.addAllProxyUsers(project.getProxyUsers());
-
- ExecutionOptions flowOptions = runningSched.getExecutionOptions();
- if(flowOptions == null) {
+ ExecutableFlow exflow = new ExecutableFlow(
+ project, flow);
+ System.out
+ .println("ScheduleManager: creating schedule: "
+ + runningSched
+ .getScheduleId());
+ exflow.setScheduleId(runningSched
+ .getScheduleId());
+ exflow.addAllProxyUsers(project
+ .getProxyUsers());
+
+ ExecutionOptions flowOptions = runningSched
+ .getExecutionOptions();
+ if (flowOptions == null) {
flowOptions = new ExecutionOptions();
- flowOptions.setConcurrentOption(ExecutionOptions.CONCURRENT_OPTION_SKIP);
+ flowOptions
+ .setConcurrentOption(ExecutionOptions.CONCURRENT_OPTION_SKIP);
}
exflow.setExecutionOptions(flowOptions);
-
- if (!flowOptions.isFailureEmailsOverridden()) {
- flowOptions.setFailureEmails(flow.getFailureEmails());
+
+ if (!flowOptions
+ .isFailureEmailsOverridden()) {
+ flowOptions.setFailureEmails(flow
+ .getFailureEmails());
}
- if (!flowOptions.isSuccessEmailsOverridden()) {
- flowOptions.setSuccessEmails(flow.getSuccessEmails());
+ if (!flowOptions
+ .isSuccessEmailsOverridden()) {
+ flowOptions.setSuccessEmails(flow
+ .getSuccessEmails());
}
-
+
try {
- executorManager.submitExecutableFlow(exflow);
- logger.info("Scheduler has invoked " + exflow.getExecutionId());
- }
- catch (ExecutorManagerException e) {
+ executorManager
+ .submitExecutableFlow(exflow, runningSched
+ .getSubmitUser());
+ logger.info("Scheduler has invoked "
+ + exflow.getExecutionId());
+ } catch (ExecutorManagerException e) {
throw e;
- }
- catch (Exception e) {
+ } catch (Exception e) {
e.printStackTrace();
- throw new ScheduleManagerException("Scheduler invoked flow " + exflow.getExecutionId() + " has failed.", e);
+ throw new ScheduleManagerException(
+ "Scheduler invoked flow "
+ + exflow.getExecutionId()
+ + " has failed.", e);
}
-
- SlaOptions slaOptions = runningSched.getSlaOptions();
- if(slaOptions != null) {
- logger.info("Submitting SLA checkings for " + runningSched.getFlowName());
+
+ SlaOptions slaOptions = runningSched
+ .getSlaOptions();
+ if (slaOptions != null) {
+ logger.info("Submitting SLA checkings for "
+ + runningSched.getFlowName());
// submit flow slas
List<SlaSetting> jobsettings = new ArrayList<SlaSetting>();
- for(SlaSetting set : slaOptions.getSettings()) {
- if(set.getId().equals("")) {
- DateTime checkTime = new DateTime(runningSched.getNextExecTime()).plus(set.getDuration());
- slaManager.submitSla(exflow.getExecutionId(), "", checkTime, slaOptions.getSlaEmails(), set.getActions(), null, set.getRule());
- }
- else {
+ for (SlaSetting set : slaOptions
+ .getSettings()) {
+ if (set.getId().equals("")) {
+ DateTime checkTime = new DateTime(
+ runningSched
+ .getNextExecTime())
+ .plus(set.getDuration());
+ slaManager
+ .submitSla(
+ exflow.getExecutionId(),
+ "",
+ checkTime,
+ slaOptions
+ .getSlaEmails(),
+ set.getActions(),
+ null,
+ set.getRule());
+ } else {
jobsettings.add(set);
}
}
- if(jobsettings.size() > 0) {
- slaManager.submitSla(exflow.getExecutionId(), "", DateTime.now(), slaOptions.getSlaEmails(), new ArrayList<SlaAction>(), jobsettings, SlaRule.WAITANDCHECKJOB);
+ if (jobsettings.size() > 0) {
+ slaManager.submitSla(
+ exflow.getExecutionId(),
+ "", DateTime.now(),
+ slaOptions.getSlaEmails(),
+ new ArrayList<SlaAction>(),
+ jobsettings,
+ SlaRule.WAITANDCHECKJOB);
}
}
-
- }
- catch (ExecutorManagerException e) {
- if (e.getReason() != null && e.getReason() == ExecutorManagerException.Reason.SkippedExecution) {
+
+ } catch (ExecutorManagerException e) {
+ if (e.getReason() != null
+ && e.getReason() == ExecutorManagerException.Reason.SkippedExecution) {
logger.info(e.getMessage());
- }
- else {
+ } else {
e.printStackTrace();
}
- }
- catch (Exception e) {
- logger.info("Scheduler failed to run job. " + e.getMessage() + e.getCause());
+ } catch (Exception e) {
+ logger.info("Scheduler failed to run job. "
+ + e.getMessage() + e.getCause());
}
removeRunnerSchedule(runningSched);
@@ -480,15 +529,18 @@ public class ScheduleManager {
if (runningSched.updateTime()) {
addRunnerSchedule(runningSched);
loader.updateSchedule(runningSched);
- }
- else {
+ } else {
removeSchedule(runningSched);
- }
+ }
} else {
// wait until flow run
- long millisWait = Math.max(0, s.getNextExecTime() - (new DateTime()).getMillis());
+ long millisWait = Math.max(
+ 0,
+ s.getNextExecTime()
+ - (new DateTime()).getMillis());
try {
- nextWakupTime = System.currentTimeMillis() + millisWait;
+ nextWakupTime = System.currentTimeMillis()
+ + millisWait;
this.wait(Math.min(millisWait, TIMEOUT_MS));
} catch (InterruptedException e) {
// interruption should occur when items are
@@ -497,9 +549,13 @@ public class ScheduleManager {
}
}
} catch (Exception e) {
- logger.error("Unexpected exception has been thrown in scheduler", e);
+ logger.error(
+ "Unexpected exception has been thrown in scheduler",
+ e);
} catch (Throwable e) {
- logger.error("Unexpected throwable has been thrown in scheduler", e);
+ logger.error(
+ "Unexpected throwable has been thrown in scheduler",
+ e);
}
}
}
@@ -526,19 +582,19 @@ public class ScheduleManager {
}
}
}
-
+
public long getLastCheckTime() {
return lastCheckTime;
}
-
+
public long getNextUpdateTime() {
return nextWakupTime;
}
-
+
public State getThreadState() {
return runner.getState();
}
-
+
public boolean isThreadActive() {
return runner.isAlive();
}
diff --git a/src/java/azkaban/utils/PropsUtils.java b/src/java/azkaban/utils/PropsUtils.java
index 5324cb7..14da916 100644
--- a/src/java/azkaban/utils/PropsUtils.java
+++ b/src/java/azkaban/utils/PropsUtils.java
@@ -28,7 +28,7 @@ import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableFlowBase;
import azkaban.flow.CommonJobProperties;
import org.apache.commons.lang.StringUtils;
@@ -208,7 +208,7 @@ public class PropsUtils {
return buffer.toString();
}
- public static Props addCommonFlowProperties(final ExecutableFlow flow) {
+ public static Props addCommonFlowProperties(final ExecutableFlowBase flow) {
Props parentProps = new Props();
parentProps.put(CommonJobProperties.FLOW_ID, flow.getFlowId());
diff --git a/src/java/azkaban/utils/StringUtils.java b/src/java/azkaban/utils/StringUtils.java
index 6684b00..a51f812 100644
--- a/src/java/azkaban/utils/StringUtils.java
+++ b/src/java/azkaban/utils/StringUtils.java
@@ -16,7 +16,6 @@
package azkaban.utils;
import java.util.Collection;
-import java.util.List;
public class StringUtils {
public static final char SINGLE_QUOTE = '\'';
src/java/azkaban/webapp/servlet/ExecutorServlet.java 628(+352 -276)
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index a542a82..e0d5e85 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -29,6 +29,7 @@ import javax.servlet.http.HttpServletResponse;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutionAttempt;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutionOptions.FailureAction;
import azkaban.executor.ExecutorManager;
@@ -57,7 +58,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
@Override
public void init(ServletConfig config) throws ServletException {
super.init(config);
- AzkabanWebServer server = (AzkabanWebServer)getApplication();
+ AzkabanWebServer server = (AzkabanWebServer) getApplication();
projectManager = server.getProjectManager();
executorManager = server.getExecutorManager();
scheduleManager = server.getScheduleManager();
@@ -65,25 +66,26 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
}
@Override
- protected void handleGet(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+ protected void handleGet(HttpServletRequest req, HttpServletResponse resp,
+ Session session) throws ServletException, IOException {
if (hasParam(req, "ajax")) {
handleAJAXAction(req, resp, session);
- }
- else if (hasParam(req, "execid")) {
+ } else if (hasParam(req, "execid")) {
if (hasParam(req, "job")) {
handleExecutionJobPage(req, resp, session);
- }
- else {
+ } else {
handleExecutionFlowPage(req, resp, session);
}
- }
- else {
+ } else {
handleExecutionsPage(req, resp, session);
}
}
-
- private void handleExecutionJobPage(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
- Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/joblogpage.vm");
+
+ private void handleExecutionJobPage(HttpServletRequest req,
+ HttpServletResponse resp, Session session) throws ServletException,
+ IOException {
+ Page page = newPage(req, resp, session,
+ "azkaban/webapp/servlet/velocity/joblogpage.vm");
User user = session.getUser();
int execId = getIntParam(req, "execid");
String jobId = getParam(req, "job");
@@ -91,48 +93,59 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
page.add("execid", execId);
page.add("jobid", jobId);
page.add("attempt", attempt);
-
+
ExecutableFlow flow = null;
try {
flow = executorManager.getExecutableFlow(execId);
if (flow == null) {
- page.add("errorMsg", "Error loading executing flow " + execId + " not found.");
+ page.add("errorMsg", "Error loading executing flow " + execId
+ + " not found.");
page.render();
return;
}
} catch (ExecutorManagerException e) {
- page.add("errorMsg", "Error loading executing flow: " + e.getMessage());
+ page.add("errorMsg",
+ "Error loading executing flow: " + e.getMessage());
page.render();
return;
}
-
+
int projectId = flow.getProjectId();
- Project project = getProjectPageByPermission(page, projectId, user, Type.READ);
+ Project project = getProjectPageByPermission(page, projectId, user,
+ Type.READ);
if (project == null) {
page.render();
return;
}
-
+
page.add("projectName", project.getName());
page.add("flowid", flow.getFlowId());
-
+
page.render();
}
-
- private void handleExecutionsPage(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
- Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/executionspage.vm");
+
+ private void handleExecutionsPage(HttpServletRequest req,
+ HttpServletResponse resp, Session session) throws ServletException,
+ IOException {
+ Page page = newPage(req, resp, session,
+ "azkaban/webapp/servlet/velocity/executionspage.vm");
List<ExecutableFlow> runningFlows = executorManager.getRunningFlows();
page.add("runningFlows", runningFlows.isEmpty() ? null : runningFlows);
-
- List<ExecutableFlow> finishedFlows = executorManager.getRecentlyFinishedFlows();
- page.add("recentlyFinished", finishedFlows.isEmpty() ? null : finishedFlows);
+
+ List<ExecutableFlow> finishedFlows = executorManager
+ .getRecentlyFinishedFlows();
+ page.add("recentlyFinished", finishedFlows.isEmpty() ? null
+ : finishedFlows);
page.add("vmutils", velocityHelper);
page.render();
}
-
- private void handleExecutionFlowPage(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
- Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/executingflowpage.vm");
+
+ private void handleExecutionFlowPage(HttpServletRequest req,
+ HttpServletResponse resp, Session session) throws ServletException,
+ IOException {
+ Page page = newPage(req, resp, session,
+ "azkaban/webapp/servlet/velocity/executingflowpage.vm");
User user = session.getUser();
int execId = getIntParam(req, "execid");
page.add("execid", execId);
@@ -141,89 +154,95 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
try {
flow = executorManager.getExecutableFlow(execId);
if (flow == null) {
- page.add("errorMsg", "Error loading executing flow " + execId + " not found.");
+ page.add("errorMsg", "Error loading executing flow " + execId
+ + " not found.");
page.render();
return;
}
} catch (ExecutorManagerException e) {
- page.add("errorMsg", "Error loading executing flow: " + e.getMessage());
+ page.add("errorMsg",
+ "Error loading executing flow: " + e.getMessage());
page.render();
return;
}
-
+
int projectId = flow.getProjectId();
- Project project = getProjectPageByPermission(page, projectId, user, Type.READ);
- if(project == null) {
+ Project project = getProjectPageByPermission(page, projectId, user,
+ Type.READ);
+ if (project == null) {
page.render();
return;
}
-
+
page.add("projectId", project.getId());
page.add("projectName", project.getName());
page.add("flowid", flow.getFlowId());
-
+
page.render();
}
-
- protected Project getProjectPageByPermission(Page page, int projectId, User user, Permission.Type type) {
+
+ protected Project getProjectPageByPermission(Page page, int projectId,
+ User user, Permission.Type type) {
Project project = projectManager.getProject(projectId);
-
+
if (project == null) {
page.add("errorMsg", "Project " + project + " not found.");
- }
- else if (!hasPermission(project, user, type)) {
- page.add("errorMsg", "User " + user.getUserId() + " doesn't have " + type.name() + " permissions on " + project.getName());
- }
- else {
+ } else if (!hasPermission(project, user, type)) {
+ page.add("errorMsg", "User " + user.getUserId() + " doesn't have "
+ + type.name() + " permissions on " + project.getName());
+ } else {
return project;
}
-
+
return null;
}
- protected Project getProjectAjaxByPermission(Map<String, Object> ret, String projectName, User user, Permission.Type type) {
+ protected Project getProjectAjaxByPermission(Map<String, Object> ret,
+ String projectName, User user, Permission.Type type) {
Project project = projectManager.getProject(projectName);
-
+
if (project == null) {
ret.put("error", "Project '" + project + "' not found.");
- }
- else if (!hasPermission(project, user, type)) {
- ret.put("error", "User '" + user.getUserId() + "' doesn't have " + type.name() + " permissions on " + project.getName());
- }
- else {
+ } else if (!hasPermission(project, user, type)) {
+ ret.put("error", "User '" + user.getUserId() + "' doesn't have "
+ + type.name() + " permissions on " + project.getName());
+ } else {
return project;
}
-
+
return null;
}
-
- protected Project getProjectAjaxByPermission(Map<String, Object> ret, int projectId, User user, Permission.Type type) {
+
+ protected Project getProjectAjaxByPermission(Map<String, Object> ret,
+ int projectId, User user, Permission.Type type) {
Project project = projectManager.getProject(projectId);
-
+
if (project == null) {
ret.put("error", "Project '" + project + "' not found.");
- }
- else if (!hasPermission(project, user, type)) {
- ret.put("error", "User '" + user.getUserId() + "' doesn't have " + type.name() + " permissions on " + project.getName());
- }
- else {
+ } else if (!hasPermission(project, user, type)) {
+ ret.put("error", "User '" + user.getUserId() + "' doesn't have "
+ + type.name() + " permissions on " + project.getName());
+ } else {
return project;
}
-
+
return null;
}
-
+
@Override
- protected void handlePost(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+ protected void handlePost(HttpServletRequest req, HttpServletResponse resp,
+ Session session) throws ServletException, IOException {
if (hasParam(req, "ajax")) {
handleAJAXAction(req, resp, session);
}
}
- private void handleAJAXAction(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+ private void handleAJAXAction(HttpServletRequest req,
+ HttpServletResponse resp, Session session) throws ServletException,
+ IOException {
HashMap<String, Object> ret = new HashMap<String, Object>();
String ajaxName = getParam(req, "ajax");
-
+
if (hasParam(req, "execid")) {
int execid = getIntParam(req, "execid");
ExecutableFlow exFlow = null;
@@ -231,64 +250,60 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
try {
exFlow = executorManager.getExecutableFlow(execid);
} catch (ExecutorManagerException e) {
- ret.put("error", "Error fetching execution '" + execid + "': " + e.getMessage());
+ ret.put("error", "Error fetching execution '" + execid + "': "
+ + e.getMessage());
}
if (exFlow == null) {
ret.put("error", "Cannot find execution '" + execid + "'");
- }
- else {
+ } else {
if (ajaxName.equals("fetchexecflow")) {
- ajaxFetchExecutableFlow(req, resp, ret, session.getUser(), exFlow);
- }
- else if (ajaxName.equals("fetchexecflowupdate")) {
- ajaxFetchExecutableFlowUpdate(req, resp, ret, session.getUser(), exFlow);
- }
- else if (ajaxName.equals("cancelFlow")) {
+ ajaxFetchExecutableFlow(req, resp, ret, session.getUser(),
+ exFlow);
+ } else if (ajaxName.equals("fetchexecflowupdate")) {
+ ajaxFetchExecutableFlowUpdate(req, resp, ret,
+ session.getUser(), exFlow);
+ } else if (ajaxName.equals("cancelFlow")) {
ajaxCancelFlow(req, resp, ret, session.getUser(), exFlow);
- }
- else if (ajaxName.equals("restartFlow")) {
+ } else if (ajaxName.equals("restartFlow")) {
ajaxRestartFlow(req, resp, ret, session.getUser(), exFlow);
- }
- else if (ajaxName.equals("pauseFlow")) {
+ } else if (ajaxName.equals("pauseFlow")) {
ajaxPauseFlow(req, resp, ret, session.getUser(), exFlow);
- }
- else if (ajaxName.equals("resumeFlow")) {
+ } else if (ajaxName.equals("resumeFlow")) {
ajaxResumeFlow(req, resp, ret, session.getUser(), exFlow);
- }
- else if (ajaxName.equals("fetchExecFlowLogs")) {
- ajaxFetchExecFlowLogs(req, resp, ret, session.getUser(), exFlow);
- }
- else if (ajaxName.equals("fetchExecJobLogs")) {
+ } else if (ajaxName.equals("fetchExecFlowLogs")) {
+ ajaxFetchExecFlowLogs(req, resp, ret, session.getUser(),
+ exFlow);
+ } else if (ajaxName.equals("fetchExecJobLogs")) {
ajaxFetchJobLogs(req, resp, ret, session.getUser(), exFlow);
- }
- else if (ajaxName.equals("retryFailedJobs")) {
+ } else if (ajaxName.equals("retryFailedJobs")) {
ajaxRestartFailed(req, resp, ret, session.getUser(), exFlow);
}
-// else if (ajaxName.equals("fetchLatestJobStatus")) {
-// ajaxFetchLatestJobStatus(req, resp, ret, session.getUser(), exFlow);
-// }
+ // else if (ajaxName.equals("fetchLatestJobStatus")) {
+ // ajaxFetchLatestJobStatus(req, resp, ret, session.getUser(),
+ // exFlow);
+ // }
else if (ajaxName.equals("flowInfo")) {
- //String projectName = getParam(req, "project");
- //Project project = projectManager.getProject(projectName);
- //String flowName = getParam(req, "flow");
- ajaxFetchExecutableFlowInfo(req, resp, ret, session.getUser(), exFlow);
+ // String projectName = getParam(req, "project");
+ // Project project = projectManager.getProject(projectName);
+ // String flowName = getParam(req, "flow");
+ ajaxFetchExecutableFlowInfo(req, resp, ret,
+ session.getUser(), exFlow);
}
}
- }
- else if (ajaxName.equals("getRunning")) {
+ } else if (ajaxName.equals("getRunning")) {
String projectName = getParam(req, "project");
String flowName = getParam(req, "flow");
- ajaxGetFlowRunning(req, resp, ret, session.getUser(), projectName, flowName);
- }
- else if (ajaxName.equals("flowInfo")) {
+ ajaxGetFlowRunning(req, resp, ret, session.getUser(), projectName,
+ flowName);
+ } else if (ajaxName.equals("flowInfo")) {
String projectName = getParam(req, "project");
String flowName = getParam(req, "flow");
- ajaxFetchFlowInfo(req, resp, ret, session.getUser(), projectName, flowName);
- }
- else {
+ ajaxFetchFlowInfo(req, resp, ret, session.getUser(), projectName,
+ flowName);
+ } else {
String projectName = getParam(req, "project");
-
+
ret.put("project", projectName);
if (ajaxName.equals("executeFlow")) {
ajaxAttemptExecuteFlow(req, resp, ret, session.getUser());
@@ -299,68 +314,78 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
}
}
-// private void ajaxFetchLatestJobStatus(HttpServletRequest req,HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) {
-// Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.READ);
-// if (project == null) {
-// ret.put("error", "Project doesn't exist or incorrect access permission.");
-// return;
-// }
-//
-// String projectName;
-// String flowName;
-// String jobName;
-// try {
-// projectName = getParam(req, "projectName");
-// flowName = getParam(req, "flowName");
-// jobName = getParam(req, "jobName");
-// } catch (Exception e) {
-// ret.put("error", e.getMessage());
-// return;
-// }
-//
-// try {
-// ExecutableNode node = exFlow.getExecutableNode(jobId);
-// if (node == null) {
-// ret.put("error", "Job " + jobId + " doesn't exist in " + exFlow.getExecutionId());
-// return;
-// }
-//
-// int attempt = this.getIntParam(req, "attempt", node.getAttempt());
-// LogData data = executorManager.getExecutionJobLog(exFlow, jobId, offset, length, attempt);
-// if (data == null) {
-// ret.put("length", 0);
-// ret.put("offset", offset);
-// ret.put("data", "");
-// }
-// else {
-// ret.put("length", data.getLength());
-// ret.put("offset", data.getOffset());
-// ret.put("data", data.getData());
-// }
-// } catch (ExecutorManagerException e) {
-// throw new ServletException(e);
-// }
-//
-// }
-
- private void ajaxRestartFailed(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException {
- Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.EXECUTE);
+ // private void ajaxFetchLatestJobStatus(HttpServletRequest
+ // req,HttpServletResponse resp, HashMap<String, Object> ret, User user,
+ // ExecutableFlow exFlow) {
+ // Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(),
+ // user, Type.READ);
+ // if (project == null) {
+ // ret.put("error",
+ // "Project doesn't exist or incorrect access permission.");
+ // return;
+ // }
+ //
+ // String projectName;
+ // String flowName;
+ // String jobName;
+ // try {
+ // projectName = getParam(req, "projectName");
+ // flowName = getParam(req, "flowName");
+ // jobName = getParam(req, "jobName");
+ // } catch (Exception e) {
+ // ret.put("error", e.getMessage());
+ // return;
+ // }
+ //
+ // try {
+ // ExecutableNode node = exFlow.getExecutableNode(jobId);
+ // if (node == null) {
+ // ret.put("error", "Job " + jobId + " doesn't exist in " +
+ // exFlow.getExecutionId());
+ // return;
+ // }
+ //
+ // int attempt = this.getIntParam(req, "attempt", node.getAttempt());
+ // LogData data = executorManager.getExecutionJobLog(exFlow, jobId, offset,
+ // length, attempt);
+ // if (data == null) {
+ // ret.put("length", 0);
+ // ret.put("offset", offset);
+ // ret.put("data", "");
+ // }
+ // else {
+ // ret.put("length", data.getLength());
+ // ret.put("offset", data.getOffset());
+ // ret.put("data", data.getData());
+ // }
+ // } catch (ExecutorManagerException e) {
+ // throw new ServletException(e);
+ // }
+ //
+ // }
+
+ private void ajaxRestartFailed(HttpServletRequest req,
+ HttpServletResponse resp, HashMap<String, Object> ret, User user,
+ ExecutableFlow exFlow) throws ServletException {
+ Project project = getProjectAjaxByPermission(ret,
+ exFlow.getProjectId(), user, Type.EXECUTE);
if (project == null) {
return;
}
-
- if (exFlow.getStatus() == Status.FAILED || exFlow.getStatus() == Status.SUCCEEDED) {
+
+ if (exFlow.getStatus() == Status.FAILED
+ || exFlow.getStatus() == Status.SUCCEEDED) {
ret.put("error", "Flow has already finished. Please re-execute.");
return;
}
-
+
try {
executorManager.retryFailures(exFlow, user.getUserId());
} catch (ExecutorManagerException e) {
ret.put("error", e.getMessage());
}
}
-
+
/**
* Gets the logs through plain text stream to reduce memory overhead.
*
@@ -370,25 +395,28 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
* @param exFlow
* @throws ServletException
*/
- private void ajaxFetchExecFlowLogs(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException {
- Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.READ);
+ private void ajaxFetchExecFlowLogs(HttpServletRequest req,
+ HttpServletResponse resp, HashMap<String, Object> ret, User user,
+ ExecutableFlow exFlow) throws ServletException {
+ Project project = getProjectAjaxByPermission(ret,
+ exFlow.getProjectId(), user, Type.READ);
if (project == null) {
return;
}
-
+
int offset = this.getIntParam(req, "offset");
int length = this.getIntParam(req, "length");
-
+
resp.setCharacterEncoding("utf-8");
try {
- LogData data = executorManager.getExecutableFlowLog(exFlow, offset, length);
+ LogData data = executorManager.getExecutableFlowLog(exFlow, offset,
+ length);
if (data == null) {
ret.put("length", 0);
ret.put("offset", offset);
ret.put("data", "");
- }
- else {
+ } else {
ret.put("length", data.getLength());
ret.put("offset", data.getOffset());
ret.put("data", data.getData());
@@ -397,7 +425,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
throw new ServletException(e);
}
}
-
+
/**
* Gets the logs through ajax plain text stream to reduce memory overhead.
*
@@ -407,33 +435,38 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
* @param exFlow
* @throws ServletException
*/
- private void ajaxFetchJobLogs(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException {
- Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.READ);
+ private void ajaxFetchJobLogs(HttpServletRequest req,
+ HttpServletResponse resp, HashMap<String, Object> ret, User user,
+ ExecutableFlow exFlow) throws ServletException {
+ Project project = getProjectAjaxByPermission(ret,
+ exFlow.getProjectId(), user, Type.READ);
if (project == null) {
return;
}
-
+
int offset = this.getIntParam(req, "offset");
int length = this.getIntParam(req, "length");
-
+
String jobId = this.getParam(req, "jobId");
resp.setCharacterEncoding("utf-8");
try {
ExecutableNode node = exFlow.getExecutableNode(jobId);
if (node == null) {
- ret.put("error", "Job " + jobId + " doesn't exist in " + exFlow.getExecutionId());
+ ret.put("error",
+ "Job " + jobId + " doesn't exist in "
+ + exFlow.getExecutionId());
return;
}
-
+
int attempt = this.getIntParam(req, "attempt", node.getAttempt());
- LogData data = executorManager.getExecutionJobLog(exFlow, jobId, offset, length, attempt);
+ LogData data = executorManager.getExecutionJobLog(exFlow, jobId,
+ offset, length, attempt);
if (data == null) {
ret.put("length", 0);
ret.put("offset", offset);
ret.put("data", "");
- }
- else {
+ } else {
ret.put("length", data.getLength());
ret.put("offset", data.getOffset());
ret.put("data", data.getData());
@@ -444,7 +477,8 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
}
/**
- * Gets the job metadata through ajax plain text stream to reduce memory overhead.
+ * Gets the job metadata through ajax plain text stream to reduce memory
+ * overhead.
*
* @param req
* @param resp
@@ -452,33 +486,38 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
* @param exFlow
* @throws ServletException
*/
- private void ajaxFetchJobMetaData(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException {
- Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.READ);
+ private void ajaxFetchJobMetaData(HttpServletRequest req,
+ HttpServletResponse resp, HashMap<String, Object> ret, User user,
+ ExecutableFlow exFlow) throws ServletException {
+ Project project = getProjectAjaxByPermission(ret,
+ exFlow.getProjectId(), user, Type.READ);
if (project == null) {
return;
}
-
+
int offset = this.getIntParam(req, "offset");
int length = this.getIntParam(req, "length");
-
+
String jobId = this.getParam(req, "jobId");
resp.setCharacterEncoding("utf-8");
try {
ExecutableNode node = exFlow.getExecutableNode(jobId);
if (node == null) {
- ret.put("error", "Job " + jobId + " doesn't exist in " + exFlow.getExecutionId());
+ ret.put("error",
+ "Job " + jobId + " doesn't exist in "
+ + exFlow.getExecutionId());
return;
}
-
+
int attempt = this.getIntParam(req, "attempt", node.getAttempt());
- JobMetaData data = executorManager.getExecutionJobMetaData(exFlow, jobId, offset, length, attempt);
+ JobMetaData data = executorManager.getExecutionJobMetaData(exFlow,
+ jobId, offset, length, attempt);
if (data == null) {
ret.put("length", 0);
ret.put("offset", offset);
ret.put("data", "");
- }
- else {
+ } else {
ret.put("length", data.getLength());
ret.put("offset", data.getOffset());
ret.put("data", data.getData());
@@ -487,93 +526,105 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
throw new ServletException(e);
}
}
-
- private void ajaxFetchFlowInfo(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, String projectName, String flowId) throws ServletException {
- Project project = getProjectAjaxByPermission(ret, projectName, user, Type.READ);
+
+ private void ajaxFetchFlowInfo(HttpServletRequest req,
+ HttpServletResponse resp, HashMap<String, Object> ret, User user,
+ String projectName, String flowId) throws ServletException {
+ Project project = getProjectAjaxByPermission(ret, projectName, user,
+ Type.READ);
if (project == null) {
return;
}
-
+
Flow flow = project.getFlow(flowId);
if (flow == null) {
- ret.put("error", "Error loading flow. Flow " + flowId + " doesn't exist in " + projectName);
+ ret.put("error", "Error loading flow. Flow " + flowId
+ + " doesn't exist in " + projectName);
return;
}
-
+
ret.put("successEmails", flow.getSuccessEmails());
ret.put("failureEmails", flow.getFailureEmails());
-
+
Schedule sflow = null;
- for (Schedule sched: scheduleManager.getSchedules()) {
- if (sched.getProjectId() == project.getId() && sched.getFlowName().equals(flowId)) {
+ for (Schedule sched : scheduleManager.getSchedules()) {
+ if (sched.getProjectId() == project.getId()
+ && sched.getFlowName().equals(flowId)) {
sflow = sched;
break;
}
}
-
+
if (sflow != null) {
ret.put("scheduled", sflow.getNextExecTime());
}
}
- private void ajaxFetchExecutableFlowInfo(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exflow) throws ServletException {
- Project project = getProjectAjaxByPermission(ret, exflow.getProjectId(), user, Type.READ);
+ private void ajaxFetchExecutableFlowInfo(HttpServletRequest req,
+ HttpServletResponse resp, HashMap<String, Object> ret, User user,
+ ExecutableFlow exflow) throws ServletException {
+ Project project = getProjectAjaxByPermission(ret,
+ exflow.getProjectId(), user, Type.READ);
if (project == null) {
return;
}
-
+
Flow flow = project.getFlow(exflow.getFlowId());
if (flow == null) {
- ret.put("error", "Error loading flow. Flow " + exflow.getFlowId() + " doesn't exist in " + exflow.getProjectId());
+ ret.put("error", "Error loading flow. Flow " + exflow.getFlowId()
+ + " doesn't exist in " + exflow.getProjectId());
return;
}
-
+
ExecutionOptions options = exflow.getExecutionOptions();
-
+
ret.put("successEmails", options.getSuccessEmails());
ret.put("failureEmails", options.getFailureEmails());
ret.put("flowParam", options.getFlowParameters());
-
+
FailureAction action = options.getFailureAction();
String failureAction = null;
switch (action) {
- case FINISH_CURRENTLY_RUNNING:
- failureAction = "finishCurrent";
- break;
- case CANCEL_ALL:
- failureAction = "cancelImmediately";
- break;
- case FINISH_ALL_POSSIBLE:
- failureAction = "finishPossible";
- break;
+ case FINISH_CURRENTLY_RUNNING:
+ failureAction = "finishCurrent";
+ break;
+ case CANCEL_ALL:
+ failureAction = "cancelImmediately";
+ break;
+ case FINISH_ALL_POSSIBLE:
+ failureAction = "finishPossible";
+ break;
}
ret.put("failureAction", failureAction);
-
+
ret.put("notifyFailureFirst", options.getNotifyOnFirstFailure());
ret.put("notifyFailureLast", options.getNotifyOnLastFailure());
-
+
ret.put("failureEmailsOverride", options.isFailureEmailsOverridden());
ret.put("successEmailsOverride", options.isSuccessEmailsOverridden());
-
+
ret.put("concurrentOptions", options.getConcurrentOption());
ret.put("pipelineLevel", options.getPipelineLevel());
ret.put("pipelineExecution", options.getPipelineExecutionId());
ret.put("queueLevel", options.getQueueLevel());
-
- HashMap<String, String> nodeStatus = new HashMap<String,String>();
- for(ExecutableNode node : exflow.getExecutableNodes()) {
- nodeStatus.put(node.getJobId(), node.getStatus().toString());
+
+ HashMap<String, String> nodeStatus = new HashMap<String, String>();
+ for (ExecutableNode node : exflow.getExecutableNodes()) {
+ nodeStatus.put(node.getId(), node.getStatus().toString());
}
ret.put("nodeStatus", nodeStatus);
ret.put("disabled", options.getDisabledJobs());
}
-
- private void ajaxCancelFlow(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException{
- Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.EXECUTE);
+
+ private void ajaxCancelFlow(HttpServletRequest req,
+ HttpServletResponse resp, HashMap<String, Object> ret, User user,
+ ExecutableFlow exFlow) throws ServletException {
+ Project project = getProjectAjaxByPermission(ret,
+ exFlow.getProjectId(), user, Type.EXECUTE);
if (project == null) {
return;
}
-
+
try {
executorManager.cancelFlow(exFlow, user.getUserId());
} catch (ExecutorManagerException e) {
@@ -581,27 +632,37 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
}
}
- private void ajaxGetFlowRunning(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, String projectId, String flowId) throws ServletException{
- Project project = getProjectAjaxByPermission(ret, projectId, user, Type.EXECUTE);
+ private void ajaxGetFlowRunning(HttpServletRequest req,
+ HttpServletResponse resp, HashMap<String, Object> ret, User user,
+ String projectId, String flowId) throws ServletException {
+ Project project = getProjectAjaxByPermission(ret, projectId, user,
+ Type.EXECUTE);
if (project == null) {
return;
}
-
- List<Integer> refs = executorManager.getRunningFlows(project.getId(), flowId);
+
+ List<Integer> refs = executorManager.getRunningFlows(project.getId(),
+ flowId);
if (!refs.isEmpty()) {
ret.put("execIds", refs);
}
}
-
- private void ajaxRestartFlow(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException{
- Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.EXECUTE);
+
+ private void ajaxRestartFlow(HttpServletRequest req,
+ HttpServletResponse resp, HashMap<String, Object> ret, User user,
+ ExecutableFlow exFlow) throws ServletException {
+ Project project = getProjectAjaxByPermission(ret,
+ exFlow.getProjectId(), user, Type.EXECUTE);
if (project == null) {
return;
}
}
- private void ajaxPauseFlow(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException{
- Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.EXECUTE);
+ private void ajaxPauseFlow(HttpServletRequest req,
+ HttpServletResponse resp, HashMap<String, Object> ret, User user,
+ ExecutableFlow exFlow) throws ServletException {
+ Project project = getProjectAjaxByPermission(ret,
+ exFlow.getProjectId(), user, Type.EXECUTE);
if (project == null) {
return;
}
@@ -613,8 +674,11 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
}
}
- private void ajaxResumeFlow(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException{
- Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.EXECUTE);
+ private void ajaxResumeFlow(HttpServletRequest req,
+ HttpServletResponse resp, HashMap<String, Object> ret, User user,
+ ExecutableFlow exFlow) throws ServletException {
+ Project project = getProjectAjaxByPermission(ret,
+ exFlow.getProjectId(), user, Type.EXECUTE);
if (project == null) {
return;
}
@@ -625,12 +689,15 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
ret.put("resume", e.getMessage());
}
}
-
- private void ajaxFetchExecutableFlowUpdate(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException{
+
+ private void ajaxFetchExecutableFlowUpdate(HttpServletRequest req,
+ HttpServletResponse resp, HashMap<String, Object> ret, User user,
+ ExecutableFlow exFlow) throws ServletException {
Long lastUpdateTime = Long.parseLong(getParam(req, "lastUpdateTime"));
System.out.println("Fetching " + exFlow.getExecutionId());
-
- Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.READ);
+
+ Project project = getProjectAjaxByPermission(ret,
+ exFlow.getProjectId(), user, Type.READ);
if (project == null) {
return;
}
@@ -641,18 +708,18 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
if (node.getUpdateTime() <= lastUpdateTime) {
continue;
}
-
- HashMap<String, Object> nodeObj = new HashMap<String,Object>();
- nodeObj.put("id", node.getJobId());
+
+ HashMap<String, Object> nodeObj = new HashMap<String, Object>();
+ nodeObj.put("id", node.getId());
nodeObj.put("status", node.getStatus());
nodeObj.put("startTime", node.getStartTime());
nodeObj.put("endTime", node.getEndTime());
nodeObj.put("attempt", node.getAttempt());
-
+
if (node.getAttempt() > 0) {
nodeObj.put("pastAttempts", node.getAttemptObjects());
}
-
+
nodeList.add(nodeObj);
}
@@ -663,40 +730,42 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
ret.put("submitTime", exFlow.getSubmitTime());
ret.put("updateTime", exFlow.getUpdateTime());
}
-
- private void ajaxFetchExecutableFlow(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException {
+
+ private void ajaxFetchExecutableFlow(HttpServletRequest req,
+ HttpServletResponse resp, HashMap<String, Object> ret, User user,
+ ExecutableFlow exFlow) throws ServletException {
System.out.println("Fetching " + exFlow.getExecutionId());
- Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.READ);
+ Project project = getProjectAjaxByPermission(ret,
+ exFlow.getProjectId(), user, Type.READ);
if (project == null) {
return;
}
-
+
ArrayList<Map<String, Object>> nodeList = new ArrayList<Map<String, Object>>();
- ArrayList<Map<String, Object>> edgeList = new ArrayList<Map<String,Object>>();
+ ArrayList<Map<String, Object>> edgeList = new ArrayList<Map<String, Object>>();
for (ExecutableNode node : exFlow.getExecutableNodes()) {
- HashMap<String, Object> nodeObj = new HashMap<String,Object>();
- nodeObj.put("id", node.getJobId());
- nodeObj.put("level", node.getLevel());
+ HashMap<String, Object> nodeObj = new HashMap<String, Object>();
+ nodeObj.put("id", node.getId());
nodeObj.put("status", node.getStatus());
nodeObj.put("startTime", node.getStartTime());
nodeObj.put("endTime", node.getEndTime());
-
+
// Add past attempts
if (node.getPastAttemptList() != null) {
ArrayList<Object> pastAttempts = new ArrayList<Object>();
- for (ExecutableNode.Attempt attempt: node.getPastAttemptList()) {
+ for (ExecutionAttempt attempt : node.getPastAttemptList()) {
pastAttempts.add(attempt.toObject());
}
nodeObj.put("pastAttempts", pastAttempts);
}
-
+
nodeList.add(nodeObj);
-
+
// Add edges
- for (String out: node.getOutNodes()) {
- HashMap<String, Object> edgeObj = new HashMap<String,Object>();
- edgeObj.put("from", node.getJobId());
+ for (String out : node.getOutNodes()) {
+ HashMap<String, Object> edgeObj = new HashMap<String, Object>();
+ edgeObj.put("from", node.getId());
edgeObj.put("target", out);
edgeList.add(edgeObj);
}
@@ -710,46 +779,53 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
ret.put("submitTime", exFlow.getSubmitTime());
ret.put("submitUser", exFlow.getSubmitUser());
}
-
- private void ajaxAttemptExecuteFlow(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user) throws ServletException {
+
+ private void ajaxAttemptExecuteFlow(HttpServletRequest req,
+ HttpServletResponse resp, HashMap<String, Object> ret, User user)
+ throws ServletException {
String projectName = getParam(req, "project");
String flowId = getParam(req, "flow");
-
- Project project = getProjectAjaxByPermission(ret, projectName, user, Type.EXECUTE);
+
+ Project project = getProjectAjaxByPermission(ret, projectName, user,
+ Type.EXECUTE);
if (project == null) {
ret.put("error", "Project '" + projectName + "' doesn't exist.");
return;
}
-
- ret.put("flow", flowId);
+
+ ret.put("flow", flowId);
Flow flow = project.getFlow(flowId);
if (flow == null) {
- ret.put("error", "Flow '" + flowId + "' cannot be found in project " + project);
+ ret.put("error", "Flow '" + flowId
+ + "' cannot be found in project " + project);
return;
}
-
+
ajaxExecuteFlow(req, resp, ret, user);
}
-
- private void ajaxExecuteFlow(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user) throws ServletException {
+
+ private void ajaxExecuteFlow(HttpServletRequest req,
+ HttpServletResponse resp, HashMap<String, Object> ret, User user)
+ throws ServletException {
String projectName = getParam(req, "project");
String flowId = getParam(req, "flow");
-
- Project project = getProjectAjaxByPermission(ret, projectName, user, Type.EXECUTE);
+
+ Project project = getProjectAjaxByPermission(ret, projectName, user,
+ Type.EXECUTE);
if (project == null) {
ret.put("error", "Project '" + projectName + "' doesn't exist.");
return;
}
-
- ret.put("flow", flowId);
+
+ ret.put("flow", flowId);
Flow flow = project.getFlow(flowId);
if (flow == null) {
- ret.put("error", "Flow '" + flowId + "' cannot be found in project " + project);
+ ret.put("error", "Flow '" + flowId
+ + "' cannot be found in project " + project);
return;
}
-
- ExecutableFlow exflow = new ExecutableFlow(flow);
- exflow.setSubmitUser(user.getUserId());
+
+ ExecutableFlow exflow = new ExecutableFlow(project, flow);
exflow.addAllProxyUsers(project.getProxyUsers());
ExecutionOptions options = HttpRequestUtils.parseFlowOptions(req);
@@ -760,26 +836,26 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
if (!options.isSuccessEmailsOverridden()) {
options.setSuccessEmails(flow.getSuccessEmails());
}
-
+
try {
- String message = executorManager.submitExecutableFlow(exflow);
+ String message = executorManager.submitExecutableFlow(exflow, user.getUserId());
ret.put("message", message);
- }
- catch (ExecutorManagerException e) {
+ } catch (ExecutorManagerException e) {
e.printStackTrace();
- ret.put("error", "Error submitting flow " + exflow.getFlowId() + ". " + e.getMessage());
+ ret.put("error", "Error submitting flow " + exflow.getFlowId()
+ + ". " + e.getMessage());
}
ret.put("execid", exflow.getExecutionId());
}
-
+
public class ExecutorVelocityHelper {
public String getProjectName(int id) {
Project project = projectManager.getProject(id);
if (project == null) {
return String.valueOf(id);
}
-
+
return project.getName();
}
}
diff --git a/src/java/azkaban/webapp/session/SessionCache.java b/src/java/azkaban/webapp/session/SessionCache.java
index 09d0b36..a21248b 100644
--- a/src/java/azkaban/webapp/session/SessionCache.java
+++ b/src/java/azkaban/webapp/session/SessionCache.java
@@ -20,7 +20,6 @@ import azkaban.utils.Props;
import azkaban.utils.cache.Cache;
import azkaban.utils.cache.CacheManager;
import azkaban.utils.cache.Cache.EjectionPolicy;
-import azkaban.utils.cache.Element;
/**
diff --git a/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java b/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
index 2bdfc7a..aac1fb1 100644
--- a/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
+++ b/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
@@ -20,6 +20,7 @@ import azkaban.executor.ExecutorLoader;
import azkaban.executor.Status;
import azkaban.flow.Flow;
import azkaban.jobtype.JobTypeManager;
+import azkaban.project.Project;
import azkaban.project.ProjectLoader;
import azkaban.test.execapp.EventCollectorListener;
import azkaban.test.execapp.MockExecutorLoader;
@@ -136,15 +137,15 @@ public class LocalFlowWatcherTest {
Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
// check it's start time is after the first's children.
- ExecutableNode watchedNode = first.getExecutableNode(node.getJobId());
+ ExecutableNode watchedNode = first.getExecutableNode(node.getId());
if (watchedNode == null) {
continue;
}
Assert.assertEquals(watchedNode.getStatus(), Status.SUCCEEDED);
- System.out.println("Node " + node.getJobId() +
+ System.out.println("Node " + node.getId() +
" start: " + node.getStartTime() +
- " dependent on " + watchedNode.getJobId() +
+ " dependent on " + watchedNode.getId() +
" " + watchedNode.getEndTime() +
" diff: " + (node.getStartTime() - watchedNode.getEndTime()));
@@ -170,7 +171,7 @@ public class LocalFlowWatcherTest {
Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
// check it's start time is after the first's children.
- ExecutableNode watchedNode = first.getExecutableNode(node.getJobId());
+ ExecutableNode watchedNode = first.getExecutableNode(node.getId());
if (watchedNode == null) {
continue;
}
@@ -185,7 +186,7 @@ public class LocalFlowWatcherTest {
Assert.assertEquals(child.getStatus(), Status.SUCCEEDED);
long diff = node.getStartTime() - child.getEndTime();
minDiff = Math.min(minDiff, diff);
- System.out.println("Node " + node.getJobId() +
+ System.out.println("Node " + node.getId() +
" start: " + node.getStartTime() +
" dependent on " + watchedChild + " " + child.getEndTime() +
" diff: " + diff);
@@ -228,8 +229,9 @@ public class LocalFlowWatcherTest {
@SuppressWarnings("unchecked")
HashMap<String, Object> flowObj = (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
+ Project project = new Project(1, "test");
Flow flow = Flow.flowFromObject(flowObj);
- ExecutableFlow execFlow = new ExecutableFlow(flow);
+ ExecutableFlow execFlow = new ExecutableFlow(project, flow);
execFlow.setExecutionId(execId);
execFlow.setExecutionPath(workingDir.getPath());
return execFlow;
diff --git a/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java b/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
index edf520c..fdff895 100644
--- a/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
+++ b/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
@@ -20,6 +20,7 @@ import azkaban.executor.ExecutorLoader;
import azkaban.executor.Status;
import azkaban.flow.Flow;
import azkaban.jobtype.JobTypeManager;
+import azkaban.project.Project;
import azkaban.project.ProjectLoader;
import azkaban.test.execapp.EventCollectorListener;
import azkaban.test.execapp.MockExecutorLoader;
@@ -137,15 +138,15 @@ public class RemoteFlowWatcherTest {
Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
// check it's start time is after the first's children.
- ExecutableNode watchedNode = first.getExecutableNode(node.getJobId());
+ ExecutableNode watchedNode = first.getExecutableNode(node.getId());
if (watchedNode == null) {
continue;
}
Assert.assertEquals(watchedNode.getStatus(), Status.SUCCEEDED);
- System.out.println("Node " + node.getJobId() +
+ System.out.println("Node " + node.getId() +
" start: " + node.getStartTime() +
- " dependent on " + watchedNode.getJobId() +
+ " dependent on " + watchedNode.getId() +
" " + watchedNode.getEndTime() +
" diff: " + (node.getStartTime() - watchedNode.getEndTime()));
@@ -170,7 +171,7 @@ public class RemoteFlowWatcherTest {
Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
// check it's start time is after the first's children.
- ExecutableNode watchedNode = first.getExecutableNode(node.getJobId());
+ ExecutableNode watchedNode = first.getExecutableNode(node.getId());
if (watchedNode == null) {
continue;
}
@@ -185,7 +186,7 @@ public class RemoteFlowWatcherTest {
Assert.assertEquals(child.getStatus(), Status.SUCCEEDED);
long diff = node.getStartTime() - child.getEndTime();
minDiff = Math.min(minDiff, diff);
- System.out.println("Node " + node.getJobId() +
+ System.out.println("Node " + node.getId() +
" start: " + node.getStartTime() +
" dependent on " + watchedChild + " " + child.getEndTime() +
" diff: " + diff);
@@ -228,8 +229,9 @@ public class RemoteFlowWatcherTest {
@SuppressWarnings("unchecked")
HashMap<String, Object> flowObj = (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
+ Project project = new Project(1, "test");
Flow flow = Flow.flowFromObject(flowObj);
- ExecutableFlow execFlow = new ExecutableFlow(flow);
+ ExecutableFlow execFlow = new ExecutableFlow(project, flow);
execFlow.setExecutionId(execId);
execFlow.setExecutionPath(workingDir.getPath());
return execFlow;
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest.java b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
index da1ed27..18513a3 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
@@ -22,6 +22,7 @@ import azkaban.executor.Status;
import azkaban.flow.Flow;
import azkaban.jobtype.JobTypeManager;
+import azkaban.project.Project;
import azkaban.project.ProjectLoader;
import azkaban.test.executor.JavaJob;
import azkaban.utils.JSONUtils;
@@ -327,7 +328,7 @@ public class FlowRunnerTest {
ExecutableNode node = flow.getExecutableNode(name);
if (node.getStatus() != status) {
- Assert.fail("Status of job " + node.getJobId() + " is " + node.getStatus() + " not " + status + " as expected.");
+ Assert.fail("Status of job " + node.getId() + " is " + node.getStatus() + " not " + status + " as expected.");
}
}
@@ -346,8 +347,11 @@ public class FlowRunnerTest {
@SuppressWarnings("unchecked")
HashMap<String, Object> flowObj = (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
+ Project project = new Project(1, "myproject");
+ project.setVersion(2);
+
Flow flow = Flow.flowFromObject(flowObj);
- ExecutableFlow execFlow = new ExecutableFlow(flow);
+ ExecutableFlow execFlow = new ExecutableFlow(project, flow);
execFlow.setExecutionId(execId);
execFlow.setExecutionPath(workingDir.getPath());
return execFlow;
@@ -373,7 +377,7 @@ public class FlowRunnerTest {
//System.out.println("Node " + node.getJobId() + " start:" + startTime + " end:" + endTime + " previous:" + previousEndTime);
Assert.assertTrue("Checking start and end times", startTime > 0 && endTime >= startTime);
- Assert.assertTrue("Start time for " + node.getJobId() + " is " + startTime +" and less than " + previousEndTime, startTime >= previousEndTime);
+ Assert.assertTrue("Start time for " + node.getId() + " is " + startTime +" and less than " + previousEndTime, startTime >= previousEndTime);
for (String outNode : node.getOutNodes()) {
ExecutableNode childNode = flow.getExecutableNode(outNode);
diff --git a/unit/java/azkaban/test/execapp/JobRunnerTest.java b/unit/java/azkaban/test/execapp/JobRunnerTest.java
index 1f8edc2..0698483 100644
--- a/unit/java/azkaban/test/execapp/JobRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/JobRunnerTest.java
@@ -78,7 +78,7 @@ public class JobRunnerTest {
Assert.assertTrue(outputProps != null);
Assert.assertTrue(logFile.exists());
- Assert.assertTrue(loader.getNodeUpdateCount(node.getJobId()) == 3);
+ Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
Assert.assertTrue(eventCollector.checkOrdering());
try {
@@ -110,7 +110,7 @@ public class JobRunnerTest {
Assert.assertTrue(logFile.exists());
Assert.assertTrue(eventCollector.checkOrdering());
Assert.assertTrue(!runner.isCancelled());
- Assert.assertTrue(loader.getNodeUpdateCount(node.getJobId()) == 3);
+ Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
try {
eventCollector.checkEventExists(new Type[] {Type.JOB_STARTED, Type.JOB_STATUS_CHANGED, Type.JOB_FINISHED});
@@ -145,7 +145,7 @@ public class JobRunnerTest {
Assert.assertTrue(runner.getLogFilePath() == null);
Assert.assertTrue(eventCollector.checkOrdering());
- Assert.assertTrue(loader.getNodeUpdateCount(node.getJobId()) == null);
+ Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == null);
try {
eventCollector.checkEventExists(new Type[] {Type.JOB_STARTED, Type.JOB_FINISHED});
@@ -175,7 +175,7 @@ public class JobRunnerTest {
// Give it 10 ms to fail.
Assert.assertTrue( node.getEndTime() - node.getStartTime() < 10);
- Assert.assertTrue(loader.getNodeUpdateCount(node.getJobId()) == null);
+ Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == null);
// Log file and output files should not exist.
Props outputProps = runner.getOutputProps();
@@ -223,7 +223,7 @@ public class JobRunnerTest {
Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
// Give it 10 ms to fail.
Assert.assertTrue(node.getEndTime() - node.getStartTime() < 3000);
- Assert.assertTrue(loader.getNodeUpdateCount(node.getJobId()) == 3);
+ Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
// Log file and output files should not exist.
File logFile = new File(runner.getLogFilePath());
@@ -268,7 +268,7 @@ public class JobRunnerTest {
Assert.assertTrue(outputProps != null);
Assert.assertTrue(logFile.exists());
Assert.assertFalse(runner.isCancelled());
- Assert.assertTrue(loader.getNodeUpdateCount(node.getJobId()) == 3);
+ Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
Assert.assertTrue(eventCollector.checkOrdering());
try {
@@ -350,8 +350,8 @@ public class JobRunnerTest {
ExecutableFlow flow = new ExecutableFlow();
flow.setExecutionId(execId);
ExecutableNode node = new ExecutableNode();
- node.setJobId(name);
- node.setExecutableFlow(flow);
+ node.setId(name);
+ node.setParentFlow(flow);
Props props = createProps(time, fail);
HashSet<String> proxyUsers = new HashSet<String>();
diff --git a/unit/java/azkaban/test/execapp/MockExecutorLoader.java b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
index 2b81f82..05641ec 100644
--- a/unit/java/azkaban/test/execapp/MockExecutorLoader.java
+++ b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
@@ -65,7 +65,6 @@ public class MockExecutorLoader implements ExecutorLoader {
}
- @SuppressWarnings("unchecked")
@Override
public void updateExecutableFlow(ExecutableFlow flow) throws ExecutorManagerException {
ExecutableFlow toUpdate = flows.get(flow.getExecutionId());
@@ -76,24 +75,27 @@ public class MockExecutorLoader implements ExecutorLoader {
@Override
public void uploadExecutableNode(ExecutableNode node, Props inputParams) throws ExecutorManagerException {
- nodes.put(node.getJobId(), ExecutableNode.createNodeFromObject(node.toObject(), null));
- jobUpdateCount.put(node.getJobId(), 1);
+ ExecutableNode exNode = new ExecutableNode();
+ exNode.fillExecutableFromMapObject(node.toObject());
+
+ nodes.put(node.getId(), exNode);
+ jobUpdateCount.put(node.getId(), 1);
}
@Override
public void updateExecutableNode(ExecutableNode node) throws ExecutorManagerException {
- ExecutableNode foundNode = nodes.get(node.getJobId());
+ ExecutableNode foundNode = nodes.get(node.getId());
foundNode.setEndTime(node.getEndTime());
foundNode.setStartTime(node.getStartTime());
foundNode.setStatus(node.getStatus());
foundNode.setUpdateTime(node.getUpdateTime());
- Integer value = jobUpdateCount.get(node.getJobId());
+ Integer value = jobUpdateCount.get(node.getId());
if (value == null) {
throw new ExecutorManagerException("The node has not been uploaded");
}
else {
- jobUpdateCount.put(node.getJobId(), ++value);
+ jobUpdateCount.put(node.getId(), ++value);
}
flowUpdateCount++;
diff --git a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
index ba89a20..86995f5 100644
--- a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
+++ b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
@@ -245,7 +245,7 @@ public class JdbcExecutorLoaderTest {
Assert.assertEquals(flow.getProjectId(), info.getProjectId());
Assert.assertEquals(flow.getVersion(), info.getVersion());
Assert.assertEquals(flow.getFlowId(), info.getFlowId());
- Assert.assertEquals(oldNode.getJobId(), info.getJobId());
+ Assert.assertEquals(oldNode.getId(), info.getJobId());
Assert.assertEquals(oldNode.getStatus(), info.getStatus());
Assert.assertEquals(oldNode.getStartTime(), info.getStartTime());
Assert.assertEquals("endTime = " + oldNode.getEndTime() + " info endTime = " + info.getEndTime(), oldNode.getEndTime(), info.getEndTime());
@@ -409,7 +409,8 @@ public class JdbcExecutorLoaderTest {
HashMap<String, Object> flowObj = (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
Flow flow = Flow.flowFromObject(flowObj);
- ExecutableFlow execFlow = new ExecutableFlow(executionId, flow);
+ ExecutableFlow execFlow = new ExecutableFlow(flow);
+ execFlow.setExecutionId(executionId);
return execFlow;
}
diff --git a/unit/java/azkaban/test/utils/PropsUtilsTest.java b/unit/java/azkaban/test/utils/PropsUtilsTest.java
index e2c914f..2f1ba02 100644
--- a/unit/java/azkaban/test/utils/PropsUtilsTest.java
+++ b/unit/java/azkaban/test/utils/PropsUtilsTest.java
@@ -88,7 +88,7 @@ public class PropsUtilsTest {
private void failIfNotException(Props props) {
try {
- Props resolved = PropsUtils.resolveProps(props);
+ PropsUtils.resolveProps(props);
Assert.fail();
}
catch (UndefinedPropertyException e) {