azkaban-aplcache

Address comments in PR# 1544 (#1574) * Address comments. *

12/12/2017 7:46:50 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
index a2e496e..ee26a14 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
@@ -217,7 +217,7 @@ class AzkabanProjectLoader {
           final int newFlowVersion = this.projectLoader
               .getLatestFlowVersion(project.getId(), newProjectVersion, file.getName()) + 1;
           this.projectLoader
-              .uploadFlowFile(project.getId(), newProjectVersion, newFlowVersion, file);
+              .uploadFlowFile(project.getId(), newProjectVersion, file, newFlowVersion);
         }
       } else {
         throw new ProjectManagerException("Invalid type of flow loader.");
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
index ca3e1d7..8631aba 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
@@ -26,7 +26,6 @@ import azkaban.project.validator.ValidationReport;
 import azkaban.utils.Props;
 import com.google.common.collect.ImmutableList;
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -110,8 +109,8 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
         final AzkabanFlow azkabanFlow = (AzkabanFlow) loader.toAzkabanNode(nodeBean);
         final Flow flow = convertAzkabanFlowToFlow(azkabanFlow, azkabanFlow.getName(), file);
         this.flowMap.put(flow.getId(), flow);
-      } catch (final FileNotFoundException e) {
-        logger.error("Error loading flow yaml files", e);
+      } catch (final Exception e) {
+        logger.error("Error loading flow yaml file. ", e);
       }
     }
   }
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java b/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
index d16c7e4..8c719d0 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
@@ -22,7 +22,6 @@ import azkaban.project.validator.ValidationReport;
 import azkaban.utils.Props;
 import java.io.File;
 import java.io.FileFilter;
-import java.io.FileNotFoundException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -59,8 +58,8 @@ public class FlowLoaderUtils {
           logger.error("Error getting props for " + path);
         }
       }
-    } catch (final FileNotFoundException e) {
-      logger.error("Failed to get props, error loading flow YAML file " + flowFile);
+    } catch (final Exception e) {
+      logger.error("Failed to get props, error loading flow YAML file. ", e);
     }
     return null;
   }
@@ -95,8 +94,8 @@ public class FlowLoaderUtils {
     try {
       final NodeBean nodeBean = loader.load(flowFile);
       return loader.toFlowTrigger(nodeBean.getTrigger());
-    } catch (final FileNotFoundException e) {
-      logger.error("Failed to get flow trigger, error loading flow YAML file " + flowFile);
+    } catch (final Exception e) {
+      logger.error("Failed to get flow trigger, error loading flow YAML file. ", e);
     }
     return null;
   }
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
index 2ca61bd..eaa607a 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
@@ -974,8 +974,8 @@ public class JdbcProjectImpl implements ProjectLoader {
   }
 
   @Override
-  public void uploadFlowFile(final int projectId, final int projectVersion, final int flowVersion,
-      final File flowFile) throws ProjectManagerException {
+  public void uploadFlowFile(final int projectId, final int projectVersion, final File flowFile,
+      final int flowVersion) throws ProjectManagerException {
     logger.info(String
         .format(
             "Uploading flow file %s, version %d for project %d, version %d, file length is [%d bytes]",
@@ -1015,31 +1015,37 @@ public class JdbcProjectImpl implements ProjectLoader {
 
   @Override
   public File getUploadedFlowFile(final int projectId, final int projectVersion,
-      final int flowVersion, final String flowName) throws ProjectManagerException {
+      final String flowFileName, final int flowVersion, final File tempDir)
+      throws ProjectManagerException, IOException {
     final FlowFileResultHandler handler = new FlowFileResultHandler();
 
     final List<byte[]> data;
-    // Todo jamiesjc: delete the flow file after used.
-    final File file = new File(this.tempDir, flowName);
+    // Created separate temp directory for each flow file to avoid overwriting the same file by
+    // multiple threads concurrently. Flow file name will be interpret as the flow name when
+    // parsing the yaml flow file, so it has to be specific.
+    final File file = new File(tempDir, flowFileName);
     try (final FileOutputStream output = new FileOutputStream(file);
         final BufferedOutputStream bufferedStream = new BufferedOutputStream(output)) {
       try {
         data = this.dbOperator
             .query(FlowFileResultHandler.SELECT_FLOW_FILE, handler,
-                projectId, projectVersion, flowName, flowVersion);
+                projectId, projectVersion, flowFileName, flowVersion);
       } catch (final SQLException e) {
-        logger.error(e);
-        throw new ProjectManagerException("Failed to query uploaded flow file " + flowName + ".",
-            e);
+        throw new ProjectManagerException(
+            "Failed to query uploaded flow file for project " + projectId + " version "
+                + projectVersion + ", flow file " + flowFileName + " version " + flowVersion, e);
       }
 
-      try {
-        bufferedStream.write(data.get(0));
-      } catch (final IOException e) {
-        throw new ProjectManagerException("Error writing flow file" + flowName, e);
+      if (data == null || data.isEmpty()) {
+        throw new ProjectManagerException(
+            "No flow file could be found in DB table for project " + projectId + " version " +
+                projectVersion + ", flow file " + flowFileName + " version " + flowVersion);
       }
+      bufferedStream.write(data.get(0));
     } catch (final IOException e) {
-      throw new ProjectManagerException("Error creating output stream for flow file" + flowName, e);
+      throw new ProjectManagerException(
+          "Error writing to output stream for project " + projectId + " version " + projectVersion
+              + ", flow file " + flowFileName + " version " + flowVersion, e);
     }
     return file;
   }
diff --git a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
index 945fff3..2b303d3 100644
--- a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
@@ -23,7 +23,6 @@ import azkaban.Constants;
 import com.google.common.io.Files;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.time.Duration;
 import java.util.HashSet;
 import java.util.Set;
@@ -35,11 +34,15 @@ import org.yaml.snakeyaml.Yaml;
  */
 public class NodeBeanLoader {
 
-  public NodeBean load(final File flowFile) throws FileNotFoundException {
-    checkArgument(flowFile.exists());
+  public NodeBean load(final File flowFile) throws Exception {
+    checkArgument(flowFile != null && flowFile.exists());
     checkArgument(flowFile.getName().endsWith(Constants.FLOW_FILE_SUFFIX));
 
     final NodeBean nodeBean = new Yaml().loadAs(new FileInputStream(flowFile), NodeBean.class);
+    if (nodeBean == null) {
+      throw new ProjectManagerException(
+          "Failed to load flow file " + flowFile.getName() + ". Node bean is null .");
+    }
     nodeBean.setName(getFlowName(flowFile));
     nodeBean.setType(Constants.FLOW_NODE_TYPE);
     return nodeBean;
@@ -95,7 +98,7 @@ public class NodeBeanLoader {
   }
 
   public String getFlowName(final File flowFile) {
-    checkArgument(flowFile.exists());
+    checkArgument(flowFile != null && flowFile.exists());
     checkArgument(flowFile.getName().endsWith(Constants.FLOW_FILE_SUFFIX));
 
     return Files.getNameWithoutExtension(flowFile.getName());
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
index 5af9ef3..41b5ebd 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
@@ -23,6 +23,7 @@ import azkaban.user.User;
 import azkaban.utils.Props;
 import azkaban.utils.Triple;
 import java.io.File;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -204,14 +205,15 @@ public interface ProjectLoader {
   /**
    * Uploads flow file.
    */
-  void uploadFlowFile(int projectId, int projectVersion, int flowVersion, File flowFile)
+  void uploadFlowFile(int projectId, int projectVersion, File flowFile, int flowVersion)
       throws ProjectManagerException;
 
   /**
    * Gets flow file that's uploaded.
    */
-  File getUploadedFlowFile(int projectId, int projectVersion, int flowVersion, String flowName)
-      throws ProjectManagerException;
+  File getUploadedFlowFile(int projectId, int projectVersion, String flowFileName, int
+      flowVersion, final File tempDir)
+      throws ProjectManagerException, IOException;
 
   /**
    * Gets the latest flow version.
diff --git a/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java
index ad91a23..fe36a47 100644
--- a/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java
@@ -113,7 +113,7 @@ public class AzkabanProjectLoaderTest {
     verify(this.storageManager)
         .uploadProject(this.project, this.VERSION + 1, projectZipFile, uploader);
     verify(this.projectLoader)
-        .uploadFlowFile(eq(this.ID), eq(this.VERSION + 1), eq(flowVersion + 1), any(File.class));
+        .uploadFlowFile(eq(this.ID), eq(this.VERSION + 1), any(File.class), eq(flowVersion + 1));
 
   }
 
diff --git a/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java b/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
index c4d3e00..dd86418 100644
--- a/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
@@ -27,6 +27,7 @@ import azkaban.user.User;
 import azkaban.utils.Md5Hasher;
 import azkaban.utils.Props;
 import azkaban.utils.Triple;
+import com.google.common.io.Files;
 import java.io.File;
 import java.io.IOException;
 import java.sql.SQLException;
@@ -357,21 +358,22 @@ public class JdbcProjectImplTest {
   @Test
   public void testUploadFlowFile() throws Exception {
     final File testYamlFile = ExecutionsTestUtil.getFlowFile(BASIC_FLOW_YAML_DIR, BASIC_FLOW_FILE);
-    this.loader.uploadFlowFile(PROJECT_ID, PROJECT_VERSION, FLOW_VERSION, testYamlFile);
-
+    this.loader.uploadFlowFile(PROJECT_ID, PROJECT_VERSION, testYamlFile, FLOW_VERSION);
+    final File tempDir = Files.createTempDir();
+    tempDir.deleteOnExit();
     final File file = this.loader
-        .getUploadedFlowFile(PROJECT_ID, PROJECT_VERSION, FLOW_VERSION, BASIC_FLOW_FILE);
+        .getUploadedFlowFile(PROJECT_ID, PROJECT_VERSION, BASIC_FLOW_FILE, FLOW_VERSION, tempDir);
     assertThat(file.getName()).isEqualTo(BASIC_FLOW_FILE);
-    FileUtils.contentEquals(testYamlFile, file);
+    assertThat(FileUtils.contentEquals(testYamlFile, file)).isTrue();
   }
 
   @Test
   public void testDuplicateUploadFlowFileException() throws Exception {
     final File testYamlFile = ExecutionsTestUtil.getFlowFile(BASIC_FLOW_YAML_DIR, BASIC_FLOW_FILE);
-    this.loader.uploadFlowFile(PROJECT_ID, PROJECT_VERSION, FLOW_VERSION, testYamlFile);
+    this.loader.uploadFlowFile(PROJECT_ID, PROJECT_VERSION, testYamlFile, FLOW_VERSION);
 
     assertThatThrownBy(
-        () -> this.loader.uploadFlowFile(PROJECT_ID, PROJECT_VERSION, FLOW_VERSION, testYamlFile))
+        () -> this.loader.uploadFlowFile(PROJECT_ID, PROJECT_VERSION, testYamlFile, FLOW_VERSION))
         .isInstanceOf(ProjectManagerException.class)
         .hasMessageContaining(
             "Error uploading flow file " + BASIC_FLOW_FILE + ", version " + FLOW_VERSION + ".");
@@ -382,7 +384,7 @@ public class JdbcProjectImplTest {
     final File testYamlFile = ExecutionsTestUtil.getFlowFile(LARGE_FLOW_YAML_DIR, LARGE_FLOW_FILE);
 
     assertThatThrownBy(
-        () -> this.loader.uploadFlowFile(PROJECT_ID, PROJECT_VERSION, FLOW_VERSION, testYamlFile))
+        () -> this.loader.uploadFlowFile(PROJECT_ID, PROJECT_VERSION, testYamlFile, FLOW_VERSION))
         .isInstanceOf(ProjectManagerException.class)
         .hasMessageContaining(
             "Flow file length exceeds 10 MB limit.");
@@ -395,7 +397,7 @@ public class JdbcProjectImplTest {
     assertThat(
         this.loader.getLatestFlowVersion(PROJECT_ID, PROJECT_VERSION, testYamlFile.getName()))
         .isEqualTo(0);
-    this.loader.uploadFlowFile(PROJECT_ID, PROJECT_VERSION, FLOW_VERSION, testYamlFile);
+    this.loader.uploadFlowFile(PROJECT_ID, PROJECT_VERSION, testYamlFile, FLOW_VERSION);
     assertThat(
         this.loader.getLatestFlowVersion(PROJECT_ID, PROJECT_VERSION, testYamlFile.getName()))
         .isEqualTo(FLOW_VERSION);
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index 8b81297..15383cc 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -52,6 +52,7 @@ import azkaban.utils.Props;
 import azkaban.utils.SwapQueue;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.io.Files;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -66,6 +67,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
+import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Appender;
 import org.apache.log4j.FileAppender;
 import org.apache.log4j.Layout;
@@ -267,7 +269,7 @@ public class FlowRunner extends EventHandler implements Runnable {
     Props commonFlowProps = FlowUtils.addCommonFlowProperties(null, this.flow);
 
     if (this.isAzkabanFlowVersion20) {
-      final Props flowProps = loadFlowPropsFromYamlFile();
+      final Props flowProps = loadPropsFromYamlFile(this.flow.getId());
       if (flowProps != null) {
         flowProps.setParent(commonFlowProps);
         commonFlowProps = flowProps;
@@ -723,7 +725,9 @@ public class FlowRunner extends EventHandler implements Runnable {
   private Props loadJobProps(final ExecutableNode node) throws IOException {
     Props props = null;
     if (this.isAzkabanFlowVersion20) {
-      props = loadJobPropsFromYamlFile(node);
+      final String jobPath =
+          node.getParentFlow().getFlowId() + Constants.PATH_DELIMITER + node.getId();
+      props = loadPropsFromYamlFile(jobPath);
       if (props == null) {
         this.logger.info("Job props loaded from yaml file is empty for job " + node.getId());
         return props;
@@ -768,46 +772,41 @@ public class FlowRunner extends EventHandler implements Runnable {
     return props;
   }
 
-  private Props loadFlowPropsFromYamlFile() {
-    final File flowFile = getFlowFile();
-    final Props flowProps = FlowLoaderUtils.getPropsFromYamlFile(this.flow.getId(), flowFile);
-    if (flowFile != null && flowFile.exists()) {
-      flowFile.delete();
-    }
-    return flowProps;
-  }
-
-  private Props loadJobPropsFromYamlFile(final ExecutableNode node) {
-    final File flowFile = getFlowFile();
-    final String jobPath =
-        node.getParentFlow().getFlowId() + Constants.PATH_DELIMITER + node.getId();
-    final Props props = FlowLoaderUtils.getPropsFromYamlFile(jobPath, flowFile);
-    if (flowFile != null && flowFile.exists()) {
-      flowFile.delete();
+  private Props loadPropsFromYamlFile(final String path) {
+    File tempDir = null;
+    Props props = null;
+    try {
+      tempDir = Files.createTempDir();
+      props = FlowLoaderUtils.getPropsFromYamlFile(path, getFlowFile(tempDir));
+    } catch (final Exception e) {
+      this.logger.error("Failed to get props from flow file. " + e);
+    } finally {
+      if (tempDir != null && tempDir.exists()) {
+        try {
+          FileUtils.deleteDirectory(tempDir);
+        } catch (final IOException e) {
+          this.logger.error("Failed to delete temp directory." + e);
+          tempDir.deleteOnExit();
+        }
+      }
     }
     return props;
   }
 
-  private File getFlowFile() {
-    File flowFile = null;
-    String source = null;
-    try {
-      final List<FlowProps> flowPropsList = ImmutableList.copyOf(this.flow.getFlowProps());
-      // There should be exact one source(file name) for each flow file.
-      if (!flowPropsList.isEmpty()) {
-        source = flowPropsList.get(0).getSource();
-      } else {
-        this.logger.error("Failed to get flow file source, flow props is empty.");
-        return null;
-      }
-      final int flowVersion = this.projectLoader
-          .getLatestFlowVersion(this.flow.getProjectId(), this.flow.getVersion(), source);
-      flowFile = this.projectLoader
-          .getUploadedFlowFile(this.flow.getProjectId(), this.flow.getVersion(), flowVersion,
-              source);
-    } catch (final ProjectManagerException e) {
-      this.logger.error("Failed to get flow file " + source, e);
+  private File getFlowFile(final File tempDir) throws Exception {
+    final List<FlowProps> flowPropsList = ImmutableList.copyOf(this.flow.getFlowProps());
+    // There should be exact one source (file name) for each flow file.
+    if (flowPropsList.isEmpty() || flowPropsList.get(0) == null) {
+      throw new ProjectManagerException(
+          "Failed to get flow file source. Flow props is empty for " + this.flow.getId());
     }
+    final String source = flowPropsList.get(0).getSource();
+    final int flowVersion = this.projectLoader
+        .getLatestFlowVersion(this.flow.getProjectId(), this.flow.getVersion(), source);
+    final File flowFile = this.projectLoader
+        .getUploadedFlowFile(this.flow.getProjectId(), this.flow.getVersion(), source,
+            flowVersion, tempDir);
+
     return flowFile;
   }
 
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
index 5dd2fd4..3691030 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
@@ -16,7 +16,8 @@
 
 package azkaban.execapp;
 
-import static org.mockito.Mockito.doAnswer;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.when;
 
 import azkaban.executor.ExecutableFlow;
@@ -30,7 +31,6 @@ import azkaban.utils.Props;
 import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.commons.io.FileUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -77,12 +77,10 @@ public class FlowRunnerPropertyResolutionTest extends FlowRunnerTestBase {
         .thenReturn(true);
     when(this.testUtil.getProjectLoader()
         .getLatestFlowVersion(project.getId(), project.getVersion(), FLOW_YAML_FILE)).thenReturn(1);
-    doAnswer(invocation -> {
-      final File flowFile = this.temporaryFolder.newFile(FLOW_YAML_FILE);
-      FileUtils.copyFile(ExecutionsTestUtil.getFlowFile(FLOW_YAML_DIR, FLOW_YAML_FILE), flowFile);
-      return flowFile;
-    }).when(this.testUtil.getProjectLoader())
-        .getUploadedFlowFile(project.getId(), project.getVersion(), 1, FLOW_YAML_FILE);
+    when(this.testUtil.getProjectLoader()
+        .getUploadedFlowFile(eq(project.getId()), eq(project.getVersion()), eq(FLOW_YAML_FILE),
+            eq(1), any(File.class)))
+        .thenReturn(ExecutionsTestUtil.getFlowFile(FLOW_YAML_DIR, FLOW_YAML_FILE));
     assertProperties(true);
   }