azkaban-aplcache

Details

diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 7f0a42f..9ae3178 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -33,6 +33,7 @@ package azkaban;
 public class Constants {
 
   // Azkaban Flow Versions
+  public static final double DEFAULT_AZKABAN_FLOW_VERSION = 1.0;
   public static final double AZKABAN_FLOW_VERSION_2_0 = 2.0;
 
   // Flow 2.0 file suffix
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutableFlow.java b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlow.java
index 4b6598d..c29c418 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutableFlow.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlow.java
@@ -44,6 +44,7 @@ public class ExecutableFlow extends ExecutableFlowBase {
   public static final String LASTMODIFIEDTIME_PARAM = "lastModfiedTime";
   public static final String LASTMODIFIEDUSER_PARAM = "lastModifiedUser";
   public static final String SLAOPTIONS_PARAM = "slaOptions";
+  public static final String AZKABANFLOWVERSION_PARAM = "azkabanFlowVersion";
   private final HashSet<String> proxyUsers = new HashSet<>();
   private int executionId = -1;
   private int scheduleId = -1;
@@ -57,6 +58,7 @@ public class ExecutableFlow extends ExecutableFlowBase {
   private String executionPath;
   private ExecutionOptions executionOptions;
   private List<SlaOption> slaOptions = new ArrayList<>();
+  private double azkabanFlowVersion;
 
   public ExecutableFlow(final Project project, final Flow flow) {
     this.projectId = project.getId();
@@ -65,6 +67,7 @@ public class ExecutableFlow extends ExecutableFlowBase {
     this.scheduleId = -1;
     this.lastModifiedTimestamp = project.getLastModifiedTimestamp();
     this.lastModifiedUser = project.getLastModifiedUser();
+    setAzkabanFlowVersion(flow.getAzkabanFlowVersion());
     this.setFlow(project, flow);
   }
 
@@ -209,6 +212,14 @@ public class ExecutableFlow extends ExecutableFlowBase {
     this.submitTime = submitTime;
   }
 
+  public double getAzkabanFlowVersion() {
+    return this.azkabanFlowVersion;
+  }
+
+  public void setAzkabanFlowVersion(final double azkabanFlowVersion) {
+    this.azkabanFlowVersion = azkabanFlowVersion;
+  }
+
   @Override
   public Map<String, Object> toObject() {
     final HashMap<String, Object> flowObj = new HashMap<>();
@@ -227,9 +238,9 @@ public class ExecutableFlow extends ExecutableFlowBase {
     flowObj.put(VERSION_PARAM, this.version);
     flowObj.put(LASTMODIFIEDTIME_PARAM, this.lastModifiedTimestamp);
     flowObj.put(LASTMODIFIEDUSER_PARAM, this.lastModifiedUser);
+    flowObj.put(AZKABANFLOWVERSION_PARAM, this.azkabanFlowVersion);
 
     flowObj.put(EXECUTIONOPTIONS_PARAM, this.executionOptions.toObject());
-    flowObj.put(VERSION_PARAM, this.version);
 
     final ArrayList<String> proxyUserList = new ArrayList<>(this.proxyUsers);
     flowObj.put(PROXYUSERS_PARAM, proxyUserList);
@@ -260,6 +271,7 @@ public class ExecutableFlow extends ExecutableFlowBase {
     this.lastModifiedTimestamp = flowObj.getLong(LASTMODIFIEDTIME_PARAM);
     this.lastModifiedUser = flowObj.getString(LASTMODIFIEDUSER_PARAM);
     this.submitTime = flowObj.getLong(SUBMITTIME_PARAM);
+    this.azkabanFlowVersion = flowObj.getDouble(AZKABANFLOWVERSION_PARAM);
 
     if (flowObj.containsKey(EXECUTIONOPTIONS_PARAM)) {
       this.executionOptions =
diff --git a/azkaban-common/src/main/java/azkaban/flow/Flow.java b/azkaban-common/src/main/java/azkaban/flow/Flow.java
index ab43888..a346659 100644
--- a/azkaban-common/src/main/java/azkaban/flow/Flow.java
+++ b/azkaban-common/src/main/java/azkaban/flow/Flow.java
@@ -16,6 +16,7 @@
 
 package azkaban.flow;
 
+import azkaban.Constants;
 import azkaban.executor.mail.DefaultMailCreator;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -48,7 +49,7 @@ public class Flow {
 
   private boolean isLayedOut = false;
   private boolean isEmbeddedFlow = false;
-  private double azkabanFlowVersion;
+  private double azkabanFlowVersion = Constants.DEFAULT_AZKABAN_FLOW_VERSION;
 
   public Flow(final String id) {
     this.id = id;
diff --git a/azkaban-common/src/main/java/azkaban/utils/TypedMapWrapper.java b/azkaban-common/src/main/java/azkaban/utils/TypedMapWrapper.java
index b7a157d..3732595 100644
--- a/azkaban-common/src/main/java/azkaban/utils/TypedMapWrapper.java
+++ b/azkaban-common/src/main/java/azkaban/utils/TypedMapWrapper.java
@@ -91,6 +91,24 @@ public class TypedMapWrapper<K, V> {
     }
   }
 
+  public Double getDouble(final K key) {
+    return getDouble(key, -1.0d);
+  }
+
+  public Double getDouble(final K key, final Double defaultVal) {
+    final Object obj = this.map.get(key);
+    if (obj == null) {
+      return defaultVal;
+    }
+    if (obj instanceof Double) {
+      return (Double) obj;
+    } else if (obj instanceof String) {
+      return Double.valueOf((String) obj);
+    } else {
+      return defaultVal;
+    }
+  }
+
   public Collection<String> getStringCollection(final K key) {
     final Object obj = this.map.get(key);
     return (Collection<String>) obj;
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index efa641b..1e9bd11 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -105,8 +105,6 @@ public class FlowRunner extends EventHandler implements Runnable {
   // Thread safe swap queue for finishedExecutions.
   private final SwapQueue<ExecutableNode> finishedNodes;
   private final AzkabanEventReporter azkabanEventReporter;
-  // Flag to indicate whether to interpret the flow as the new Flow 2.0 definition.
-  private boolean isAzkabanFlowVersion20 = false;
   private Logger logger;
   private Appender flowAppender;
   private File logFile;
@@ -173,9 +171,6 @@ public class FlowRunner extends EventHandler implements Runnable {
     // where the uninitialized logger is used in flow preparing state
     createLogger(this.flow.getFlowId());
     this.azkabanEventReporter = azkabanEventReporter;
-
-    // Todo jamiesjc: Enable below check after DB change of project_flow_files table is rolled out.
-    // this.isAzkabanFlowVersion20 = checkAzkabanFlowVersion();
   }
 
   public FlowRunner setFlowWatcher(final FlowWatcher watcher) {
@@ -204,10 +199,6 @@ public class FlowRunner extends EventHandler implements Runnable {
     return this.execDir;
   }
 
-  public void setAzkabanFlowVersion20(final boolean azkabanFlowVersion20) {
-    this.isAzkabanFlowVersion20 = azkabanFlowVersion20;
-  }
-
   @Override
   public void run() {
     try {
@@ -220,7 +211,7 @@ public class FlowRunner extends EventHandler implements Runnable {
       this.logger.info("Updating initial flow directory.");
       updateFlow();
       this.logger.info("Fetching job and shared properties.");
-      if (!this.isAzkabanFlowVersion20) {
+      if (!FlowLoaderUtils.isAzkabanFlowVersion20(this.flow.getAzkabanFlowVersion())) {
         loadAllProperties();
       }
 
@@ -256,10 +247,6 @@ public class FlowRunner extends EventHandler implements Runnable {
     }
   }
 
-  private boolean checkAzkabanFlowVersion() {
-    return this.projectLoader.isFlowFileUploaded(this.flow.getProjectId(), this.flow.getVersion());
-  }
-
   private void setupFlowExecution() {
     final int projectId = this.flow.getProjectId();
     final int version = this.flow.getVersion();
@@ -268,7 +255,7 @@ public class FlowRunner extends EventHandler implements Runnable {
     // Add a bunch of common azkaban properties
     Props commonFlowProps = FlowUtils.addCommonFlowProperties(null, this.flow);
 
-    if (this.isAzkabanFlowVersion20) {
+    if (FlowLoaderUtils.isAzkabanFlowVersion20(this.flow.getAzkabanFlowVersion())) {
       final Props flowProps = loadPropsFromYamlFile(this.flow.getId());
       if (flowProps != null) {
         flowProps.setParent(commonFlowProps);
@@ -675,7 +662,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 
     Props props = null;
 
-    if (!this.isAzkabanFlowVersion20) {
+    if (!FlowLoaderUtils.isAzkabanFlowVersion20(this.flow.getAzkabanFlowVersion())) {
       // 1. Shared properties (i.e. *.properties) for the jobs only. This takes
       // the
       // least precedence
@@ -724,7 +711,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 
   private Props loadJobProps(final ExecutableNode node) throws IOException {
     Props props = null;
-    if (this.isAzkabanFlowVersion20) {
+    if (FlowLoaderUtils.isAzkabanFlowVersion20(this.flow.getAzkabanFlowVersion())) {
       final String jobPath =
           node.getParentFlow().getFlowId() + Constants.PATH_DELIMITER + node.getId();
       props = loadPropsFromYamlFile(jobPath);
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
index 3691030..a03d32b 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
@@ -93,8 +93,6 @@ public class FlowRunnerPropertyResolutionTest extends FlowRunnerTestBase {
     flowProps.put("props6", "flow6");
     flowProps.put("props5", "flow5");
     final FlowRunner runner = this.testUtil.createFromFlowMap(FLOW_NAME, flowProps);
-    // Todo jamiesjc: remove below line after project_flow_files DB change is rolled out.
-    runner.setAzkabanFlowVersion20(isAzkabanFlowVersion20);
     final Map<String, ExecutableNode> nodeMap = new HashMap<>();
     createNodeMap(runner.getExecutableFlow(), nodeMap);
     final ExecutableFlow flow = runner.getExecutableFlow();