azkaban-uncached
Changes
src/java/azkaban/executor/ExecutableNode.java 37(+16 -21)
src/java/azkaban/executor/ExecutionOptions.java 91(+53 -38)
unit/executions/embedded2/joba.job 2(+2 -0)
unit/executions/embedded2/joba1.job 1(+1 -0)
unit/executions/embedded2/jobb.job 4(+4 -0)
unit/executions/embedded2/jobc.job 3(+3 -0)
unit/executions/embedded2/jobd.job 3(+3 -0)
unit/executions/embedded2/jobe.job 2(+2 -0)
unit/executions/embedded2/jobf.job 2(+2 -0)
unit/java/azkaban/test/execapp/FlowRunnerTest2.java 185(+185 -0)
Details
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 648acef..9b3f199 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -354,7 +354,7 @@ public class FlowRunner extends EventHandler implements Runnable {
for(JobRunner activeRunner : activeJobRunners) {
activeRunner.cancel();
}
-
+
flow.killNode(System.currentTimeMillis());
} catch (Exception e) {
logger.error(e);
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index bc24043..a6d5b95 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -352,6 +352,13 @@ public class JobRunner extends EventHandler implements Runnable {
else {
logInfo("Starting job " + this.jobId + " at " + node.getStartTime());
}
+
+ // If it's an embedded flow, we'll add the nested flow info to the job conf
+ if (node.getExecutableFlow() != node.getParentFlow()) {
+ String subFlow = node.getExecutableFlow().getId() + "," + node.getPrintableId(",");
+ props.put(CommonJobProperties.NESTED_FLOW_PATH, subFlow);
+ }
+
props.put(CommonJobProperties.JOB_ATTEMPT, node.getAttempt());
props.put(CommonJobProperties.JOB_METADATA_FILE, createMetaDataFileName(executionId, this.jobId, node.getAttempt()));
node.setStatus(Status.RUNNING);
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index e9f81f7..9ee6aa2 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -45,7 +45,6 @@ public class ExecutableFlow extends ExecutableFlowBase {
private long submitTime = -1;
private String submitUser;
private String executionPath;
- private HashMap<String, FlowProps> flowProps = new HashMap<String, FlowProps>();
private HashSet<String> proxyUsers = new HashSet<String>();
private ExecutionOptions executionOptions;
@@ -58,10 +57,6 @@ public class ExecutableFlow extends ExecutableFlowBase {
this.setFlow(project, flow);
}
- public ExecutableFlow(Flow flow) {
- this.setFlow(null, flow);
- }
-
public ExecutableFlow() {
}
@@ -75,10 +70,6 @@ public class ExecutableFlow extends ExecutableFlowBase {
return this;
}
- public Collection<FlowProps> getFlowProps() {
- return flowProps.values();
- }
-
public void addAllProxyUsers(Collection<String> proxyUsers) {
this.proxyUsers.addAll(proxyUsers);
}
diff --git a/src/java/azkaban/executor/ExecutableFlowBase.java b/src/java/azkaban/executor/ExecutableFlowBase.java
index 34bff5d..d89a412 100644
--- a/src/java/azkaban/executor/ExecutableFlowBase.java
+++ b/src/java/azkaban/executor/ExecutableFlowBase.java
@@ -16,6 +16,7 @@
package azkaban.executor;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -74,12 +75,17 @@ public class ExecutableFlowBase extends ExecutableNode {
return -1;
}
+ public Collection<FlowProps> getFlowProps() {
+ return flowProps.values();
+ }
+
public String getFlowId() {
return flowId;
}
protected void setFlow(Project project, Flow flow) {
this.flowId = flow.getId();
+ flowProps.putAll(flow.getAllFlowProps());
for (Node node: flow.getNodes()) {
String id = node.getId();
@@ -100,6 +106,9 @@ public class ExecutableFlowBase extends ExecutableNode {
ExecutableNode sourceNode = executableNodes.get(edge.getSourceId());
ExecutableNode targetNode = executableNodes.get(edge.getTargetId());
+ if (sourceNode == null) {
+ System.out.println("Source node " + edge.getSourceId() + " doesn't exist");
+ }
sourceNode.addOutNode(edge.getTargetId());
targetNode.addInNode(edge.getSourceId());
}
@@ -250,16 +259,18 @@ public class ExecutableFlowBase extends ExecutableNode {
super.applyUpdateObject(updateData);
List<Map<String,Object>> updatedNodes = (List<Map<String,Object>>)updateData.get(NODES_PARAM);
- for (Map<String,Object> node: updatedNodes) {
-
- String id = (String)node.get(ID_PARAM);
- if (id == null) {
- // Legacy case
- id = (String)node.get("jobId");
+ if (updatedNodes != null) {
+ for (Map<String,Object> node: updatedNodes) {
+
+ String id = (String)node.get(ID_PARAM);
+ if (id == null) {
+ // Legacy case
+ id = (String)node.get("jobId");
+ }
+
+ ExecutableNode exNode = executableNodes.get(id);
+ exNode.applyUpdateObject(node);
}
-
- ExecutableNode exNode = executableNodes.get(id);
- exNode.applyUpdateObject(node);
}
}
@@ -314,7 +325,7 @@ public class ExecutableFlowBase extends ExecutableNode {
nodeloop:
for (ExecutableNode node: executableNodes.values()) {
- if(Status.isStatusFinished(node.getStatus())) {
+ if(Status.isStatusFinished(node.getStatus()) || Status.isStatusRunning(node.getStatus())) {
continue;
}
src/java/azkaban/executor/ExecutableNode.java 37(+16 -21)
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index e0f6522..c0333d0 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -1,6 +1,7 @@
package azkaban.executor;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -38,8 +39,8 @@ public class ExecutableNode {
private String jobSource;
// Path to top level props file
private String propsSource;
- private Set<String> inNodes = null;
- private Set<String> outNodes = null;
+ private Set<String> inNodes = new HashSet<String>();
+ private Set<String> outNodes = new HashSet<String>();
private Props inputProps;
private Props outputProps;
@@ -140,16 +141,10 @@ public class ExecutableNode {
}
public void addOutNode(String exNode) {
- if (outNodes == null) {
- outNodes = new HashSet<String>();
- }
outNodes.add(exNode);
}
public void addInNode(String exNode) {
- if (inNodes == null) {
- inNodes = new HashSet<String>();
- }
inNodes.add(exNode);
}
@@ -268,21 +263,21 @@ public class ExecutableNode {
objMap.put(TYPE_PARAM, type);
objMap.put(ATTEMPT_PARAM, attempt);
- if (inNodes != null) {
+ if (inNodes != null && !inNodes.isEmpty()) {
objMap.put(INNODES_PARAM, inNodes);
}
- if (outNodes != null) {
+ if (outNodes != null && !outNodes.isEmpty()) {
objMap.put(OUTNODES_PARAM, outNodes);
}
-// if (hasPropsSource()) {
-// objMap.put(PROPS_SOURCE_PARAM, this.propsSource);
-// }
+ if (hasPropsSource()) {
+ objMap.put(PROPS_SOURCE_PARAM, this.propsSource);
+ }
if (hasJobSource()) {
objMap.put(JOB_SOURCE_PARAM, this.jobSource);
}
- if (outputProps != null) {
+ if (outputProps != null && outputProps.size() > 0) {
objMap.put(OUTPUT_PROPS_PARAM, PropsUtils.toStringMap(outputProps, true));
}
@@ -307,17 +302,17 @@ public class ExecutableNode {
if (objMap.containsKey(INNODES_PARAM)) {
this.inNodes = new HashSet<String>();
- this.inNodes.addAll((List<String>)objMap.get(INNODES_PARAM));
+ this.inNodes.addAll((Collection<String>)objMap.get(INNODES_PARAM));
}
if (objMap.containsKey(OUTNODES_PARAM)) {
this.outNodes = new HashSet<String>();
- this.outNodes.addAll((List<String>)objMap.get(OUTNODES_PARAM));
+ this.outNodes.addAll((Collection<String>)objMap.get(OUTNODES_PARAM));
+ }
+
+ if (objMap.containsKey(PROPS_SOURCE_PARAM)) {
+ this.propsSource = (String)objMap.get(PROPS_SOURCE_PARAM);
}
-//
-// if (objMap.containsKey(PROPS_SOURCE_PARAM)) {
-// this.propsSource = (String)objMap.get(PROPS_SOURCE_PARAM);
-// }
if (objMap.containsKey(JOB_SOURCE_PARAM)) {
this.jobSource = (String)objMap.get(JOB_SOURCE_PARAM);
@@ -327,7 +322,7 @@ public class ExecutableNode {
this.outputProps = new Props(null, (Map<String,String>)objMap.get(OUTPUT_PROPS_PARAM));
}
- List<Object> pastAttempts = (List<Object>)objMap.get(PASTATTEMPTS_PARAM);
+ Collection<Object> pastAttempts = (Collection<Object>)objMap.get(PASTATTEMPTS_PARAM);
if (pastAttempts!=null) {
ArrayList<ExecutionAttempt> attempts = new ArrayList<ExecutionAttempt>();
for (Object attemptObj: pastAttempts) {
src/java/azkaban/executor/ExecutionOptions.java 91(+53 -38)
diff --git a/src/java/azkaban/executor/ExecutionOptions.java b/src/java/azkaban/executor/ExecutionOptions.java
index a91a6d0..cedc3f0 100644
--- a/src/java/azkaban/executor/ExecutionOptions.java
+++ b/src/java/azkaban/executor/ExecutionOptions.java
@@ -16,6 +16,20 @@ public class ExecutionOptions {
public static final String CONCURRENT_OPTION_PIPELINE="pipeline";
public static final String CONCURRENT_OPTION_IGNORE="ignore";
+ private static final String FLOW_PARAMETERS = "flowParameters";
+ private static final String NOTIFY_ON_FIRST_FAILURE = "notifyOnFirstFailure";
+ private static final String NOTIFY_ON_LAST_FAILURE = "notifyOnLastFailure";
+ private static final String SUCCESS_EMAILS = "successEmails";
+ private static final String FAILURE_EMAILS = "failureEmails";
+ private static final String FAILURE_ACTION = "failureAction";
+ private static final String PIPELINE_LEVEL = "pipelineLevel";
+ private static final String PIPELINE_EXECID = "pipelineExecId";
+ private static final String QUEUE_LEVEL = "queueLevel";
+ private static final String CONCURRENT_OPTION = "concurrentOption";
+ private static final String DISABLE = "disabled";
+ private static final String FAILURE_EMAILS_OVERRIDE = "failureEmailsOverride";
+ private static final String SUCCESS_EMAILS_OVERRIDE = "successEmailsOverride";
+
private boolean notifyOnFirstFailure = true;
private boolean notifyOnLastFailure = false;
private boolean failureEmailsOverride = false;
@@ -39,8 +53,8 @@ public class ExecutionOptions {
private Set<String> initiallyDisabledJobs = new HashSet<String>();
- public void setFlowParameters(Map<String,String> flowParam) {
- flowParameters.get(flowParam);
+ public void addAllFlowParameters(Map<String,String> flowParam) {
+ flowParameters.putAll(flowParam);
}
public Map<String,String> getFlowParameters() {
@@ -142,19 +156,19 @@ public class ExecutionOptions {
public Map<String,Object> toObject() {
HashMap<String,Object> flowOptionObj = new HashMap<String,Object>();
- flowOptionObj.put("flowParameters", this.flowParameters);
- flowOptionObj.put("notifyOnFirstFailure", this.notifyOnFirstFailure);
- flowOptionObj.put("notifyOnLastFailure", this.notifyOnLastFailure);
- flowOptionObj.put("successEmails", successEmails);
- flowOptionObj.put("failureEmails", failureEmails);
- flowOptionObj.put("failureAction", failureAction.toString());
- flowOptionObj.put("pipelineLevel", pipelineLevel);
- flowOptionObj.put("pipelineExecId", pipelineExecId);
- flowOptionObj.put("queueLevel", queueLevel);
- flowOptionObj.put("concurrentOption", concurrentOption);
- flowOptionObj.put("disabled", initiallyDisabledJobs);
- flowOptionObj.put("failureEmailsOverride", failureEmailsOverride);
- flowOptionObj.put("successEmailsOverride", successEmailsOverride);
+ flowOptionObj.put(FLOW_PARAMETERS, this.flowParameters);
+ flowOptionObj.put(NOTIFY_ON_FIRST_FAILURE, this.notifyOnFirstFailure);
+ flowOptionObj.put(NOTIFY_ON_LAST_FAILURE, this.notifyOnLastFailure);
+ flowOptionObj.put(SUCCESS_EMAILS, successEmails);
+ flowOptionObj.put(FAILURE_EMAILS, failureEmails);
+ flowOptionObj.put(FAILURE_ACTION, failureAction.toString());
+ flowOptionObj.put(PIPELINE_LEVEL, pipelineLevel);
+ flowOptionObj.put(PIPELINE_EXECID, pipelineExecId);
+ flowOptionObj.put(QUEUE_LEVEL, queueLevel);
+ flowOptionObj.put(CONCURRENT_OPTION, concurrentOption);
+ flowOptionObj.put(DISABLE, initiallyDisabledJobs);
+ flowOptionObj.put(FAILURE_EMAILS_OVERRIDE, failureEmailsOverride);
+ flowOptionObj.put(SUCCESS_EMAILS_OVERRIDE, successEmailsOverride);
return flowOptionObj;
}
@@ -167,46 +181,47 @@ public class ExecutionOptions {
Map<String,Object> optionsMap = (Map<String,Object>)obj;
ExecutionOptions options = new ExecutionOptions();
- if (optionsMap.containsKey("flowParameters")) {
- options.flowParameters = new HashMap<String, String>((Map<String,String>)optionsMap.get("flowParameters"));
+ if (optionsMap.containsKey(FLOW_PARAMETERS)) {
+ options.flowParameters = new HashMap<String, String>();
+ options.flowParameters.putAll((Map<String,String>)optionsMap.get(FLOW_PARAMETERS));
}
// Failure notification
- if (optionsMap.containsKey("notifyOnFirstFailure")) {
- options.notifyOnFirstFailure = (Boolean)optionsMap.get("notifyOnFirstFailure");
+ if (optionsMap.containsKey(NOTIFY_ON_FIRST_FAILURE)) {
+ options.notifyOnFirstFailure = (Boolean)optionsMap.get(NOTIFY_ON_FIRST_FAILURE);
}
- if (optionsMap.containsKey("notifyOnLastFailure")) {
- options.notifyOnLastFailure = (Boolean)optionsMap.get("notifyOnLastFailure");
+ if (optionsMap.containsKey(NOTIFY_ON_LAST_FAILURE)) {
+ options.notifyOnLastFailure = (Boolean)optionsMap.get(NOTIFY_ON_LAST_FAILURE);
}
- if (optionsMap.containsKey("concurrentOption")) {
- options.concurrentOption = (String)optionsMap.get("concurrentOption");
+ if (optionsMap.containsKey(CONCURRENT_OPTION)) {
+ options.concurrentOption = (String)optionsMap.get(CONCURRENT_OPTION);
}
- if (optionsMap.containsKey("disabled")) {
- options.initiallyDisabledJobs = new HashSet<String>((List<String>)optionsMap.get("disabled"));
+ if (optionsMap.containsKey(DISABLE)) {
+ options.initiallyDisabledJobs = new HashSet<String>((Collection<String>)optionsMap.get(DISABLE));
}
// Failure action
- if (optionsMap.containsKey("failureAction")) {
- options.failureAction = FailureAction.valueOf((String)optionsMap.get("failureAction"));
+ if (optionsMap.containsKey(FAILURE_ACTION)) {
+ options.failureAction = FailureAction.valueOf((String)optionsMap.get(FAILURE_ACTION));
}
- options.pipelineLevel = (Integer)optionsMap.get("pipelineLevel");
- options.pipelineExecId = (Integer)optionsMap.get("pipelineExecId");
- options.queueLevel = (Integer)optionsMap.get("queueLevel");
+ options.pipelineLevel = (Integer)optionsMap.get(PIPELINE_LEVEL);
+ options.pipelineExecId = (Integer)optionsMap.get(PIPELINE_EXECID);
+ options.queueLevel = (Integer)optionsMap.get(QUEUE_LEVEL);
// Success emails
- if (optionsMap.containsKey("successEmails")) {
- options.setSuccessEmails((List<String>)optionsMap.get("successEmails"));
+ if (optionsMap.containsKey(SUCCESS_EMAILS)) {
+ options.setSuccessEmails((List<String>)optionsMap.get(SUCCESS_EMAILS));
}
// Failure emails
- if (optionsMap.containsKey("failureEmails")) {
- options.setFailureEmails((List<String>)optionsMap.get("failureEmails"));
+ if (optionsMap.containsKey(FAILURE_EMAILS)) {
+ options.setFailureEmails((List<String>)optionsMap.get(FAILURE_EMAILS));
}
- if (optionsMap.containsKey("successEmailsOverride")) {
- options.setSuccessEmailsOverridden((Boolean)optionsMap.get("successEmailsOverride"));
+ if (optionsMap.containsKey(SUCCESS_EMAILS_OVERRIDE)) {
+ options.setSuccessEmailsOverridden((Boolean)optionsMap.get(SUCCESS_EMAILS_OVERRIDE));
}
- if (optionsMap.containsKey("failureEmailsOverride")) {
- options.setFailureEmailsOverridden((Boolean)optionsMap.get("failureEmailsOverride"));
+ if (optionsMap.containsKey(FAILURE_EMAILS_OVERRIDE)) {
+ options.setFailureEmailsOverridden((Boolean)optionsMap.get(FAILURE_EMAILS_OVERRIDE));
}
return options;
diff --git a/src/java/azkaban/executor/Status.java b/src/java/azkaban/executor/Status.java
index f796378..4e82044 100644
--- a/src/java/azkaban/executor/Status.java
+++ b/src/java/azkaban/executor/Status.java
@@ -61,6 +61,7 @@ public enum Status {
switch (status) {
case RUNNING:
case FAILED_FINISHING:
+ case QUEUED:
return true;
default:
return false;
diff --git a/src/java/azkaban/flow/CommonJobProperties.java b/src/java/azkaban/flow/CommonJobProperties.java
index 7a8ffc4..1744af5 100644
--- a/src/java/azkaban/flow/CommonJobProperties.java
+++ b/src/java/azkaban/flow/CommonJobProperties.java
@@ -67,6 +67,11 @@ public class CommonJobProperties {
public static final String FLOW_ID = "azkaban.flow.flowid";
/**
+ * The nested flow id path
+ */
+ public static final String NESTED_FLOW_PATH = "azkaban.flow.nested.path";
+
+ /**
* The execution id. This should be unique per flow, but may not be due to
* restarts.
*/
diff --git a/src/java/azkaban/jobExecutor/AbstractProcessJob.java b/src/java/azkaban/jobExecutor/AbstractProcessJob.java
index 694965d..1a6b945 100644
--- a/src/java/azkaban/jobExecutor/AbstractProcessJob.java
+++ b/src/java/azkaban/jobExecutor/AbstractProcessJob.java
@@ -52,7 +52,7 @@ public abstract class AbstractProcessJob extends AbstractJob {
protected String _cwd;
- private volatile Props generatedPropeties;
+ private volatile Props generatedProperties;
protected AbstractProcessJob(String jobid, final Props sysProps, final Props jobProps, final Logger log) {
super(jobid, log);
@@ -83,7 +83,7 @@ public abstract class AbstractProcessJob extends AbstractJob {
@Override
public Props getJobGeneratedProperties() {
- return generatedPropeties;
+ return generatedProperties;
}
/**
@@ -187,7 +187,7 @@ public abstract class AbstractProcessJob extends AbstractJob {
}
public void generateProperties(final File outputFile) {
- generatedPropeties = loadOutputFileProps(outputFile);
+ generatedProperties = loadOutputFileProps(outputFile);
}
}
diff --git a/src/java/azkaban/project/JdbcProjectLoader.java b/src/java/azkaban/project/JdbcProjectLoader.java
index 7825694..b6db057 100644
--- a/src/java/azkaban/project/JdbcProjectLoader.java
+++ b/src/java/azkaban/project/JdbcProjectLoader.java
@@ -77,6 +77,7 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements ProjectLoad
for (Project project: projects) {
List<Triple<String, Boolean, Permission>> permissions = fetchPermissionsForProject(connection, project);
+
for (Triple<String, Boolean, Permission> entry: permissions) {
if(entry.getSecond()) {
project.setGroupPermission(entry.getFirst(), entry.getThird());
@@ -86,7 +87,8 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements ProjectLoad
}
}
}
- } catch (SQLException e) {
+ }
+ catch (SQLException e) {
throw new ProjectManagerException("Error retrieving all projects", e);
}
finally {
@@ -673,7 +675,8 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements ProjectLoad
}
catch (IOException e) {
throw new ProjectManagerException("Flow Upload failed.", e);
- } catch (SQLException e) {
+ }
+ catch (SQLException e) {
throw new ProjectManagerException("Flow Upload failed commit.", e);
}
finally {
diff --git a/src/java/azkaban/webapp/servlet/HttpRequestUtils.java b/src/java/azkaban/webapp/servlet/HttpRequestUtils.java
index 4e71930..9ec3ea1 100644
--- a/src/java/azkaban/webapp/servlet/HttpRequestUtils.java
+++ b/src/java/azkaban/webapp/servlet/HttpRequestUtils.java
@@ -74,7 +74,7 @@ public class HttpRequestUtils {
}
Map<String, String> flowParamGroup = getParamGroup(req, "flowOverride");
- execOptions.setFlowParameters(flowParamGroup);
+ execOptions.addAllFlowParameters(flowParamGroup);
if (hasParam(req, "disabled")) {
String disabled = getParam(req, "disabled");
diff --git a/unit/executions/embedded/innerFlow.job b/unit/executions/embedded/innerFlow.job
index da71d64..e9b3b89 100644
--- a/unit/executions/embedded/innerFlow.job
+++ b/unit/executions/embedded/innerFlow.job
@@ -1,5 +1,4 @@
type=javaprocess
-java.class=azkaban.test.executor.SleepJavaJob
seconds=1
fail=false
dependencies=innerJobB,innerJobC
\ No newline at end of file
diff --git a/unit/executions/embedded2/innerFlow.job b/unit/executions/embedded2/innerFlow.job
new file mode 100644
index 0000000..dfa0e9d
--- /dev/null
+++ b/unit/executions/embedded2/innerFlow.job
@@ -0,0 +1,2 @@
+type=test
+dependencies=innerJobB,innerJobC
\ No newline at end of file
diff --git a/unit/executions/embedded2/innerJobA.job b/unit/executions/embedded2/innerJobA.job
new file mode 100644
index 0000000..35ebd72
--- /dev/null
+++ b/unit/executions/embedded2/innerJobA.job
@@ -0,0 +1,2 @@
+type=test
+
diff --git a/unit/executions/embedded2/innerJobB.job b/unit/executions/embedded2/innerJobB.job
new file mode 100644
index 0000000..dca1223
--- /dev/null
+++ b/unit/executions/embedded2/innerJobB.job
@@ -0,0 +1,2 @@
+type=test
+dependencies=innerJobA
diff --git a/unit/executions/embedded2/innerJobC.job b/unit/executions/embedded2/innerJobC.job
new file mode 100644
index 0000000..dca1223
--- /dev/null
+++ b/unit/executions/embedded2/innerJobC.job
@@ -0,0 +1,2 @@
+type=test
+dependencies=innerJobA
unit/executions/embedded2/joba.job 2(+2 -0)
diff --git a/unit/executions/embedded2/joba.job b/unit/executions/embedded2/joba.job
new file mode 100644
index 0000000..80ad69e
--- /dev/null
+++ b/unit/executions/embedded2/joba.job
@@ -0,0 +1,2 @@
+type=test
+param1=joba.1
\ No newline at end of file
unit/executions/embedded2/joba1.job 1(+1 -0)
diff --git a/unit/executions/embedded2/joba1.job b/unit/executions/embedded2/joba1.job
new file mode 100644
index 0000000..98fd5f5
--- /dev/null
+++ b/unit/executions/embedded2/joba1.job
@@ -0,0 +1 @@
+type=test
unit/executions/embedded2/jobb.job 4(+4 -0)
diff --git a/unit/executions/embedded2/jobb.job b/unit/executions/embedded2/jobb.job
new file mode 100644
index 0000000..efbaa95
--- /dev/null
+++ b/unit/executions/embedded2/jobb.job
@@ -0,0 +1,4 @@
+type=flow
+flow.name=innerFlow
+dependencies=joba
+param
\ No newline at end of file
unit/executions/embedded2/jobc.job 3(+3 -0)
diff --git a/unit/executions/embedded2/jobc.job b/unit/executions/embedded2/jobc.job
new file mode 100644
index 0000000..8844d8c
--- /dev/null
+++ b/unit/executions/embedded2/jobc.job
@@ -0,0 +1,3 @@
+type=flow
+flow.name=innerFlow
+dependencies=joba
unit/executions/embedded2/jobd.job 3(+3 -0)
diff --git a/unit/executions/embedded2/jobd.job b/unit/executions/embedded2/jobd.job
new file mode 100644
index 0000000..8844d8c
--- /dev/null
+++ b/unit/executions/embedded2/jobd.job
@@ -0,0 +1,3 @@
+type=flow
+flow.name=innerFlow
+dependencies=joba
unit/executions/embedded2/jobe.job 2(+2 -0)
diff --git a/unit/executions/embedded2/jobe.job b/unit/executions/embedded2/jobe.job
new file mode 100644
index 0000000..331a81e
--- /dev/null
+++ b/unit/executions/embedded2/jobe.job
@@ -0,0 +1,2 @@
+type=test
+dependencies=jobb,jobc,jobd
unit/executions/embedded2/jobf.job 2(+2 -0)
diff --git a/unit/executions/embedded2/jobf.job b/unit/executions/embedded2/jobf.job
new file mode 100644
index 0000000..b1b00ce
--- /dev/null
+++ b/unit/executions/embedded2/jobf.job
@@ -0,0 +1,2 @@
+type=test
+dependencies=jobe,joba1
diff --git a/unit/executions/embedded2/test1.properties b/unit/executions/embedded2/test1.properties
new file mode 100644
index 0000000..120fc25
--- /dev/null
+++ b/unit/executions/embedded2/test1.properties
@@ -0,0 +1,4 @@
+param1=test1.1
+param2=test1.2
+param3=test1.3
+param4=test1.4
\ No newline at end of file
diff --git a/unit/executions/embedded2/test2.properties b/unit/executions/embedded2/test2.properties
new file mode 100644
index 0000000..7df7744
--- /dev/null
+++ b/unit/executions/embedded2/test2.properties
@@ -0,0 +1,4 @@
+param5=test2.5
+param6=test2.6
+param7=test2.7
+param8=test2.8
\ No newline at end of file
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest.java b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
index 18513a3..7634a56 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
@@ -24,6 +24,7 @@ import azkaban.flow.Flow;
import azkaban.jobtype.JobTypeManager;
import azkaban.project.Project;
import azkaban.project.ProjectLoader;
+import azkaban.test.executor.InteractiveTestJob;
import azkaban.test.executor.JavaJob;
import azkaban.utils.JSONUtils;
@@ -45,7 +46,10 @@ public class FlowRunnerTest {
workingDir.mkdirs();
jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
jobtypeManager.registerJobType("java", JavaJob.class);
+ jobtypeManager.registerJobType("test", InteractiveTestJob.class);
fakeProjectLoader = new MockProjectLoader(workingDir);
+
+ InteractiveTestJob.clearTestJobs();
}
@After
@@ -190,7 +194,7 @@ public class FlowRunnerTest {
Assert.assertTrue(runner.isCancelled());
- Assert.assertTrue("Expected flow " + Status.FAILED + " instead " + exFlow.getStatus(), exFlow.getStatus() == Status.FAILED);
+ Assert.assertTrue("Expected flow " + Status.KILLED + " instead " + exFlow.getStatus(), exFlow.getStatus() == Status.KILLED);
testStatus(exFlow, "job1", Status.SUCCEEDED);
testStatus(exFlow, "job2d", Status.FAILED);
unit/java/azkaban/test/execapp/FlowRunnerTest2.java 185(+185 -0)
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
new file mode 100644
index 0000000..def90ca
--- /dev/null
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
@@ -0,0 +1,185 @@
+package azkaban.test.execapp;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.execapp.FlowRunner;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableFlowBase;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutorLoader;
+import azkaban.executor.Status;
+import azkaban.flow.Flow;
+import azkaban.jobtype.JobTypeManager;
+import azkaban.project.Project;
+import azkaban.project.ProjectLoader;
+import azkaban.project.ProjectManagerException;
+import azkaban.test.executor.InteractiveTestJob;
+import azkaban.test.executor.JavaJob;
+import azkaban.utils.DirectoryFlowLoader;
+import azkaban.utils.Props;
+
+public class FlowRunnerTest2 {
+ private File workingDir;
+ private JobTypeManager jobtypeManager;
+ private ProjectLoader fakeProjectLoader;
+ private ExecutorLoader fakeExecutorLoader;
+ private Logger logger = Logger.getLogger(FlowRunnerTest2.class);
+ private Project project;
+ private Map<String, Flow> flowMap;
+ private static int id=101;
+
+ public FlowRunnerTest2() {
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ System.out.println("Create temp dir");
+ workingDir = new File("_AzkabanTestDir_" + System.currentTimeMillis());
+ if (workingDir.exists()) {
+ FileUtils.deleteDirectory(workingDir);
+ }
+ workingDir.mkdirs();
+ jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
+ jobtypeManager.registerJobType("java", JavaJob.class);
+ jobtypeManager.registerJobType("test", InteractiveTestJob.class);
+ fakeProjectLoader = new MockProjectLoader(workingDir);
+ fakeExecutorLoader = new MockExecutorLoader();
+ project = new Project(1, "testProject");
+
+ File dir = new File("unit/executions/embedded2");
+ prepareProject(dir);
+
+ InteractiveTestJob.clearTestJobs();
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ System.out.println("Teardown temp dir");
+ if (workingDir != null) {
+ FileUtils.deleteDirectory(workingDir);
+ workingDir = null;
+ }
+ }
+
+ @Test
+ public void testBasicRun() throws Exception {
+ EventCollectorListener eventCollector = new EventCollectorListener();
+ FlowRunner runner = createFlowRunner(eventCollector, "jobf");
+
+ Map<String, Status> expectedStateMap = new HashMap<String, Status>();
+ Map<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+
+ // 1. STEP ONE: START FLOW
+ ExecutableFlow flow = runner.getExecutableFlow();
+ createExpectedStateMap(flow, expectedStateMap, nodeMap);
+ runFlowRunnerInThread(runner);
+ pause(1000);
+
+ // After it starts up, only joba should be running
+ expectedStateMap.put("joba", Status.RUNNING);
+ expectedStateMap.put("joba1", Status.RUNNING);
+
+ compareStates(expectedStateMap, nodeMap);
+ ExecutableNode node = nodeMap.get("joba");
+ Props props = node.getInputProps();
+ Assert.assertEquals("joba.1", props.get("param1"));
+ Assert.assertEquals("test1.2", props.get("param2"));
+ Assert.assertEquals("test1.3", props.get("param3"));
+ Assert.assertEquals("override.4", props.get("param4"));
+ Assert.assertEquals("test2.5", props.get("param5"));
+ Assert.assertEquals("test2.6", props.get("param6"));
+ Assert.assertEquals("test2.7", props.get("param7"));
+ Assert.assertEquals("test2.8", props.get("param8"));
+
+ // Make joba successful
+
+
+ }
+
+ private Thread runFlowRunnerInThread(FlowRunner runner) {
+ Thread thread = new Thread(runner);
+ thread.start();
+ return thread;
+ }
+
+ private void pause(long millisec) {
+ synchronized(this) {
+ try {
+ wait(millisec);
+ }
+ catch (InterruptedException e) {
+ }
+ }
+ }
+
+ private void createExpectedStateMap(ExecutableFlowBase flow, Map<String, Status> expectedStateMap, Map<String, ExecutableNode> nodeMap) {
+ for (ExecutableNode node: flow.getExecutableNodes()) {
+ expectedStateMap.put(node.getPrintableId(), node.getStatus());
+ nodeMap.put(node.getPrintableId(), node);
+
+ if (node instanceof ExecutableFlowBase) {
+ createExpectedStateMap((ExecutableFlowBase)node, expectedStateMap, nodeMap);
+ }
+ }
+ }
+
+ private void compareStates(Map<String, Status> expectedStateMap, Map<String, ExecutableNode> nodeMap) {
+ for (String printedId: expectedStateMap.keySet()) {
+ Status expectedStatus = expectedStateMap.get(printedId);
+ ExecutableNode node = nodeMap.get(printedId);
+
+ if (expectedStatus != node.getStatus()) {
+ Assert.fail("Expected values do not match for " + printedId + ". Expected " + expectedStatus + ", instead received " + node.getStatus());
+ }
+ }
+ }
+
+ private void prepareProject(File directory) throws ProjectManagerException, IOException {
+ DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
+ loader.loadProjectFlow(directory);
+ if (!loader.getErrors().isEmpty()) {
+ for (String error: loader.getErrors()) {
+ System.out.println(error);
+ }
+
+ throw new RuntimeException("Errors found in setup");
+ }
+
+ flowMap = loader.getFlowMap();
+ project.setFlows(flowMap);
+ FileUtils.copyDirectory(directory, workingDir);
+ }
+
+ private FlowRunner createFlowRunner(EventCollectorListener eventCollector, String flowName) throws Exception {
+ Flow flow = flowMap.get(flowName);
+
+ int exId = id++;
+ ExecutableFlow exFlow = new ExecutableFlow(project, flow);
+ exFlow.setExecutionPath(workingDir.getPath());
+ exFlow.setExecutionId(exId);
+
+ Map<String, String> flowParam = new HashMap<String, String>();
+ flowParam.put("param4", "override.4");
+ flowParam.put("param10", "override.10");
+ flowParam.put("param11", "override.11");
+ exFlow.getExecutionOptions().addAllFlowParameters(flowParam);
+ fakeExecutorLoader.uploadExecutableFlow(exFlow);
+
+ FlowRunner runner = new FlowRunner(fakeExecutorLoader.fetchExecutableFlow(exId), fakeExecutorLoader, fakeProjectLoader, jobtypeManager);
+
+ runner.addListener(eventCollector);
+
+ return runner;
+ }
+}
diff --git a/unit/java/azkaban/test/executor/ExecutableFlowTest.java b/unit/java/azkaban/test/executor/ExecutableFlowTest.java
index 6b6ed94..4592720 100644
--- a/unit/java/azkaban/test/executor/ExecutableFlowTest.java
+++ b/unit/java/azkaban/test/executor/ExecutableFlowTest.java
@@ -119,7 +119,7 @@ public class ExecutableFlowTest {
HashMap<String, String> flowProps = new HashMap<String,String>();
flowProps.put("la", "fa");
- options.setFlowParameters(flowProps);
+ options.addAllFlowParameters(flowProps);
exFlow.setExecutionOptions(options);
Object obj = exFlow.toObject();
diff --git a/unit/java/azkaban/test/executor/InteractiveTestJob.java b/unit/java/azkaban/test/executor/InteractiveTestJob.java
new file mode 100644
index 0000000..3141304
--- /dev/null
+++ b/unit/java/azkaban/test/executor/InteractiveTestJob.java
@@ -0,0 +1,89 @@
+package azkaban.test.executor;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+
+import azkaban.flow.CommonJobProperties;
+import azkaban.jobExecutor.AbstractProcessJob;
+import azkaban.utils.Props;
+
+public class InteractiveTestJob extends AbstractProcessJob {
+ private static ConcurrentHashMap<String, InteractiveTestJob> testJobs = new ConcurrentHashMap<String, InteractiveTestJob>();
+ private Props generatedProperties = new Props();
+ private boolean isWaiting = true;
+ private boolean succeed = true;
+
+ public static InteractiveTestJob getTestJob(String name) {
+ return testJobs.get(name);
+ }
+
+ public static void clearTestJobs() {
+ testJobs.clear();
+ }
+
+ public InteractiveTestJob(String jobId, Props sysProps, Props jobProps, Logger log) {
+ super(jobId, sysProps, jobProps, log);
+ }
+
+ @Override
+ public void run() throws Exception {
+ String nestedFlowPath = this.getJobProps().get(CommonJobProperties.NESTED_FLOW_PATH);
+ String id = nestedFlowPath == null ? this.getId() : nestedFlowPath + "," + this.getId();
+ testJobs.put(id, this);
+
+ while(isWaiting) {
+ synchronized(this) {
+ try {
+ wait(30000);
+ } catch (InterruptedException e) {
+ }
+
+ if (!isWaiting) {
+ if (!succeed) {
+ throw new RuntimeException("Forced failure of " + getId());
+ }
+ else {
+ info("Job " + getId() + " succeeded.");
+ }
+ }
+ }
+ }
+ }
+
+ public void failJob() {
+ synchronized(this) {
+ succeed = false;
+ isWaiting = false;
+ this.notify();
+ }
+ }
+
+ public void succeedJob() {
+ synchronized(this) {
+ succeed = true;
+ isWaiting = false;
+ this.notify();
+ }
+ }
+
+ public void succeedJob(Props generatedProperties) {
+ synchronized(this) {
+ this.generatedProperties = generatedProperties;
+ succeed = true;
+ isWaiting = false;
+ this.notify();
+ }
+ }
+
+ @Override
+ public Props getJobGeneratedProperties() {
+ return generatedProperties;
+ }
+
+ @Override
+ public void cancel() throws InterruptedException {
+ info("Killing job");
+ failJob();
+ }
+}
diff --git a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
index 86995f5..a51ea2a 100644
--- a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
+++ b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
@@ -32,6 +32,7 @@ import azkaban.executor.Status;
import azkaban.flow.Flow;
import azkaban.database.DataSourceUtils;
+import azkaban.project.Project;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
@@ -409,7 +410,11 @@ public class JdbcExecutorLoaderTest {
HashMap<String, Object> flowObj = (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
Flow flow = Flow.flowFromObject(flowObj);
- ExecutableFlow execFlow = new ExecutableFlow(flow);
+ Project project = new Project(1, "flow");
+ HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
+ flowMap.put(flow.getId(), flow);
+ project.setFlows(flowMap);
+ ExecutableFlow execFlow = new ExecutableFlow(project, flow);
execFlow.setExecutionId(executionId);
return execFlow;
@@ -421,7 +426,11 @@ public class JdbcExecutorLoaderTest {
HashMap<String, Object> flowObj = (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
Flow flow = Flow.flowFromObject(flowObj);
- ExecutableFlow execFlow = new ExecutableFlow(flow);
+ Project project = new Project(1, "flow");
+ HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
+ flowMap.put(flow.getId(), flow);
+ project.setFlows(flowMap);
+ ExecutableFlow execFlow = new ExecutableFlow(project, flow);
return execFlow;
}