Details
diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 7f0a42f..9ae3178 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -33,6 +33,7 @@ package azkaban;
public class Constants {
// Azkaban Flow Versions
+ public static final double DEFAULT_AZKABAN_FLOW_VERSION = 1.0;
public static final double AZKABAN_FLOW_VERSION_2_0 = 2.0;
// Flow 2.0 file suffix
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutableFlow.java b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlow.java
index 4b6598d..c29c418 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutableFlow.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlow.java
@@ -44,6 +44,7 @@ public class ExecutableFlow extends ExecutableFlowBase {
public static final String LASTMODIFIEDTIME_PARAM = "lastModfiedTime";
public static final String LASTMODIFIEDUSER_PARAM = "lastModifiedUser";
public static final String SLAOPTIONS_PARAM = "slaOptions";
+ public static final String AZKABANFLOWVERSION_PARAM = "azkabanFlowVersion";
private final HashSet<String> proxyUsers = new HashSet<>();
private int executionId = -1;
private int scheduleId = -1;
@@ -57,6 +58,7 @@ public class ExecutableFlow extends ExecutableFlowBase {
private String executionPath;
private ExecutionOptions executionOptions;
private List<SlaOption> slaOptions = new ArrayList<>();
+ private double azkabanFlowVersion;
public ExecutableFlow(final Project project, final Flow flow) {
this.projectId = project.getId();
@@ -65,6 +67,7 @@ public class ExecutableFlow extends ExecutableFlowBase {
this.scheduleId = -1;
this.lastModifiedTimestamp = project.getLastModifiedTimestamp();
this.lastModifiedUser = project.getLastModifiedUser();
+ setAzkabanFlowVersion(flow.getAzkabanFlowVersion());
this.setFlow(project, flow);
}
@@ -209,6 +212,14 @@ public class ExecutableFlow extends ExecutableFlowBase {
this.submitTime = submitTime;
}
+ public double getAzkabanFlowVersion() {
+ return this.azkabanFlowVersion;
+ }
+
+ public void setAzkabanFlowVersion(final double azkabanFlowVersion) {
+ this.azkabanFlowVersion = azkabanFlowVersion;
+ }
+
@Override
public Map<String, Object> toObject() {
final HashMap<String, Object> flowObj = new HashMap<>();
@@ -227,9 +238,9 @@ public class ExecutableFlow extends ExecutableFlowBase {
flowObj.put(VERSION_PARAM, this.version);
flowObj.put(LASTMODIFIEDTIME_PARAM, this.lastModifiedTimestamp);
flowObj.put(LASTMODIFIEDUSER_PARAM, this.lastModifiedUser);
+ flowObj.put(AZKABANFLOWVERSION_PARAM, this.azkabanFlowVersion);
flowObj.put(EXECUTIONOPTIONS_PARAM, this.executionOptions.toObject());
- flowObj.put(VERSION_PARAM, this.version);
final ArrayList<String> proxyUserList = new ArrayList<>(this.proxyUsers);
flowObj.put(PROXYUSERS_PARAM, proxyUserList);
@@ -260,6 +271,7 @@ public class ExecutableFlow extends ExecutableFlowBase {
this.lastModifiedTimestamp = flowObj.getLong(LASTMODIFIEDTIME_PARAM);
this.lastModifiedUser = flowObj.getString(LASTMODIFIEDUSER_PARAM);
this.submitTime = flowObj.getLong(SUBMITTIME_PARAM);
+ this.azkabanFlowVersion = flowObj.getDouble(AZKABANFLOWVERSION_PARAM);
if (flowObj.containsKey(EXECUTIONOPTIONS_PARAM)) {
this.executionOptions =
diff --git a/azkaban-common/src/main/java/azkaban/flow/Flow.java b/azkaban-common/src/main/java/azkaban/flow/Flow.java
index ab43888..a346659 100644
--- a/azkaban-common/src/main/java/azkaban/flow/Flow.java
+++ b/azkaban-common/src/main/java/azkaban/flow/Flow.java
@@ -16,6 +16,7 @@
package azkaban.flow;
+import azkaban.Constants;
import azkaban.executor.mail.DefaultMailCreator;
import java.util.ArrayList;
import java.util.Collection;
@@ -48,7 +49,7 @@ public class Flow {
private boolean isLayedOut = false;
private boolean isEmbeddedFlow = false;
- private double azkabanFlowVersion;
+ private double azkabanFlowVersion = Constants.DEFAULT_AZKABAN_FLOW_VERSION;
public Flow(final String id) {
this.id = id;
diff --git a/azkaban-common/src/main/java/azkaban/utils/TypedMapWrapper.java b/azkaban-common/src/main/java/azkaban/utils/TypedMapWrapper.java
index b7a157d..3732595 100644
--- a/azkaban-common/src/main/java/azkaban/utils/TypedMapWrapper.java
+++ b/azkaban-common/src/main/java/azkaban/utils/TypedMapWrapper.java
@@ -91,6 +91,24 @@ public class TypedMapWrapper<K, V> {
}
}
+ public Double getDouble(final K key) {
+ return getDouble(key, -1.0d);
+ }
+
+ public Double getDouble(final K key, final Double defaultVal) {
+ final Object obj = this.map.get(key);
+ if (obj == null) {
+ return defaultVal;
+ }
+ if (obj instanceof Double) {
+ return (Double) obj;
+ } else if (obj instanceof String) {
+ return Double.valueOf((String) obj);
+ } else {
+ return defaultVal;
+ }
+ }
+
public Collection<String> getStringCollection(final K key) {
final Object obj = this.map.get(key);
return (Collection<String>) obj;
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index efa641b..1e9bd11 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -105,8 +105,6 @@ public class FlowRunner extends EventHandler implements Runnable {
// Thread safe swap queue for finishedExecutions.
private final SwapQueue<ExecutableNode> finishedNodes;
private final AzkabanEventReporter azkabanEventReporter;
- // Flag to indicate whether to interpret the flow as the new Flow 2.0 definition.
- private boolean isAzkabanFlowVersion20 = false;
private Logger logger;
private Appender flowAppender;
private File logFile;
@@ -173,9 +171,6 @@ public class FlowRunner extends EventHandler implements Runnable {
// where the uninitialized logger is used in flow preparing state
createLogger(this.flow.getFlowId());
this.azkabanEventReporter = azkabanEventReporter;
-
- // Todo jamiesjc: Enable below check after DB change of project_flow_files table is rolled out.
- // this.isAzkabanFlowVersion20 = checkAzkabanFlowVersion();
}
public FlowRunner setFlowWatcher(final FlowWatcher watcher) {
@@ -204,10 +199,6 @@ public class FlowRunner extends EventHandler implements Runnable {
return this.execDir;
}
- public void setAzkabanFlowVersion20(final boolean azkabanFlowVersion20) {
- this.isAzkabanFlowVersion20 = azkabanFlowVersion20;
- }
-
@Override
public void run() {
try {
@@ -220,7 +211,7 @@ public class FlowRunner extends EventHandler implements Runnable {
this.logger.info("Updating initial flow directory.");
updateFlow();
this.logger.info("Fetching job and shared properties.");
- if (!this.isAzkabanFlowVersion20) {
+ if (!FlowLoaderUtils.isAzkabanFlowVersion20(this.flow.getAzkabanFlowVersion())) {
loadAllProperties();
}
@@ -256,10 +247,6 @@ public class FlowRunner extends EventHandler implements Runnable {
}
}
- private boolean checkAzkabanFlowVersion() {
- return this.projectLoader.isFlowFileUploaded(this.flow.getProjectId(), this.flow.getVersion());
- }
-
private void setupFlowExecution() {
final int projectId = this.flow.getProjectId();
final int version = this.flow.getVersion();
@@ -268,7 +255,7 @@ public class FlowRunner extends EventHandler implements Runnable {
// Add a bunch of common azkaban properties
Props commonFlowProps = FlowUtils.addCommonFlowProperties(null, this.flow);
- if (this.isAzkabanFlowVersion20) {
+ if (FlowLoaderUtils.isAzkabanFlowVersion20(this.flow.getAzkabanFlowVersion())) {
final Props flowProps = loadPropsFromYamlFile(this.flow.getId());
if (flowProps != null) {
flowProps.setParent(commonFlowProps);
@@ -675,7 +662,7 @@ public class FlowRunner extends EventHandler implements Runnable {
Props props = null;
- if (!this.isAzkabanFlowVersion20) {
+ if (!FlowLoaderUtils.isAzkabanFlowVersion20(this.flow.getAzkabanFlowVersion())) {
// 1. Shared properties (i.e. *.properties) for the jobs only. This takes
// the
// least precedence
@@ -724,7 +711,7 @@ public class FlowRunner extends EventHandler implements Runnable {
private Props loadJobProps(final ExecutableNode node) throws IOException {
Props props = null;
- if (this.isAzkabanFlowVersion20) {
+ if (FlowLoaderUtils.isAzkabanFlowVersion20(this.flow.getAzkabanFlowVersion())) {
final String jobPath =
node.getParentFlow().getFlowId() + Constants.PATH_DELIMITER + node.getId();
props = loadPropsFromYamlFile(jobPath);
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
index 3691030..a03d32b 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
@@ -93,8 +93,6 @@ public class FlowRunnerPropertyResolutionTest extends FlowRunnerTestBase {
flowProps.put("props6", "flow6");
flowProps.put("props5", "flow5");
final FlowRunner runner = this.testUtil.createFromFlowMap(FLOW_NAME, flowProps);
- // Todo jamiesjc: remove below line after project_flow_files DB change is rolled out.
- runner.setAzkabanFlowVersion20(isAzkabanFlowVersion20);
final Map<String, ExecutableNode> nodeMap = new HashMap<>();
createNodeMap(runner.getExecutableFlow(), nodeMap);
final ExecutableFlow flow = runner.getExecutableFlow();