Details
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
index a2e496e..ee26a14 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
@@ -217,7 +217,7 @@ class AzkabanProjectLoader {
final int newFlowVersion = this.projectLoader
.getLatestFlowVersion(project.getId(), newProjectVersion, file.getName()) + 1;
this.projectLoader
- .uploadFlowFile(project.getId(), newProjectVersion, newFlowVersion, file);
+ .uploadFlowFile(project.getId(), newProjectVersion, file, newFlowVersion);
}
} else {
throw new ProjectManagerException("Invalid type of flow loader.");
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
index ca3e1d7..8631aba 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
@@ -26,7 +26,6 @@ import azkaban.project.validator.ValidationReport;
import azkaban.utils.Props;
import com.google.common.collect.ImmutableList;
import java.io.File;
-import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -110,8 +109,8 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
final AzkabanFlow azkabanFlow = (AzkabanFlow) loader.toAzkabanNode(nodeBean);
final Flow flow = convertAzkabanFlowToFlow(azkabanFlow, azkabanFlow.getName(), file);
this.flowMap.put(flow.getId(), flow);
- } catch (final FileNotFoundException e) {
- logger.error("Error loading flow yaml files", e);
+ } catch (final Exception e) {
+ logger.error("Error loading flow yaml file. ", e);
}
}
}
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java b/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
index d16c7e4..8c719d0 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
@@ -22,7 +22,6 @@ import azkaban.project.validator.ValidationReport;
import azkaban.utils.Props;
import java.io.File;
import java.io.FileFilter;
-import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -59,8 +58,8 @@ public class FlowLoaderUtils {
logger.error("Error getting props for " + path);
}
}
- } catch (final FileNotFoundException e) {
- logger.error("Failed to get props, error loading flow YAML file " + flowFile);
+ } catch (final Exception e) {
+ logger.error("Failed to get props, error loading flow YAML file. ", e);
}
return null;
}
@@ -95,8 +94,8 @@ public class FlowLoaderUtils {
try {
final NodeBean nodeBean = loader.load(flowFile);
return loader.toFlowTrigger(nodeBean.getTrigger());
- } catch (final FileNotFoundException e) {
- logger.error("Failed to get flow trigger, error loading flow YAML file " + flowFile);
+ } catch (final Exception e) {
+ logger.error("Failed to get flow trigger, error loading flow YAML file. ", e);
}
return null;
}
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
index 2ca61bd..eaa607a 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
@@ -974,8 +974,8 @@ public class JdbcProjectImpl implements ProjectLoader {
}
@Override
- public void uploadFlowFile(final int projectId, final int projectVersion, final int flowVersion,
- final File flowFile) throws ProjectManagerException {
+ public void uploadFlowFile(final int projectId, final int projectVersion, final File flowFile,
+ final int flowVersion) throws ProjectManagerException {
logger.info(String
.format(
"Uploading flow file %s, version %d for project %d, version %d, file length is [%d bytes]",
@@ -1015,31 +1015,37 @@ public class JdbcProjectImpl implements ProjectLoader {
@Override
public File getUploadedFlowFile(final int projectId, final int projectVersion,
- final int flowVersion, final String flowName) throws ProjectManagerException {
+ final String flowFileName, final int flowVersion, final File tempDir)
+ throws ProjectManagerException, IOException {
final FlowFileResultHandler handler = new FlowFileResultHandler();
final List<byte[]> data;
- // Todo jamiesjc: delete the flow file after used.
- final File file = new File(this.tempDir, flowName);
+ // A separate temp directory is used for each flow file so that multiple threads do not
+ // overwrite the same file concurrently. The flow file name will be interpreted as the flow
+ // name when parsing the yaml flow file, so it has to be specific.
+ final File file = new File(tempDir, flowFileName);
try (final FileOutputStream output = new FileOutputStream(file);
final BufferedOutputStream bufferedStream = new BufferedOutputStream(output)) {
try {
data = this.dbOperator
.query(FlowFileResultHandler.SELECT_FLOW_FILE, handler,
- projectId, projectVersion, flowName, flowVersion);
+ projectId, projectVersion, flowFileName, flowVersion);
} catch (final SQLException e) {
- logger.error(e);
- throw new ProjectManagerException("Failed to query uploaded flow file " + flowName + ".",
- e);
+ throw new ProjectManagerException(
+ "Failed to query uploaded flow file for project " + projectId + " version "
+ + projectVersion + ", flow file " + flowFileName + " version " + flowVersion, e);
}
- try {
- bufferedStream.write(data.get(0));
- } catch (final IOException e) {
- throw new ProjectManagerException("Error writing flow file" + flowName, e);
+ if (data == null || data.isEmpty()) {
+ throw new ProjectManagerException(
+ "No flow file could be found in DB table for project " + projectId + " version " +
+ projectVersion + ", flow file " + flowFileName + " version " + flowVersion);
}
+ bufferedStream.write(data.get(0));
} catch (final IOException e) {
- throw new ProjectManagerException("Error creating output stream for flow file" + flowName, e);
+ throw new ProjectManagerException(
+ "Error writing to output stream for project " + projectId + " version " + projectVersion
+ + ", flow file " + flowFileName + " version " + flowVersion, e);
}
return file;
}
diff --git a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
index 945fff3..2b303d3 100644
--- a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
@@ -23,7 +23,6 @@ import azkaban.Constants;
import com.google.common.io.Files;
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileNotFoundException;
import java.time.Duration;
import java.util.HashSet;
import java.util.Set;
@@ -35,11 +34,15 @@ import org.yaml.snakeyaml.Yaml;
*/
public class NodeBeanLoader {
- public NodeBean load(final File flowFile) throws FileNotFoundException {
- checkArgument(flowFile.exists());
+ public NodeBean load(final File flowFile) throws Exception {
+ checkArgument(flowFile != null && flowFile.exists());
checkArgument(flowFile.getName().endsWith(Constants.FLOW_FILE_SUFFIX));
final NodeBean nodeBean = new Yaml().loadAs(new FileInputStream(flowFile), NodeBean.class);
+ if (nodeBean == null) {
+ throw new ProjectManagerException(
+ "Failed to load flow file " + flowFile.getName() + ". Node bean is null .");
+ }
nodeBean.setName(getFlowName(flowFile));
nodeBean.setType(Constants.FLOW_NODE_TYPE);
return nodeBean;
@@ -95,7 +98,7 @@ public class NodeBeanLoader {
}
public String getFlowName(final File flowFile) {
- checkArgument(flowFile.exists());
+ checkArgument(flowFile != null && flowFile.exists());
checkArgument(flowFile.getName().endsWith(Constants.FLOW_FILE_SUFFIX));
return Files.getNameWithoutExtension(flowFile.getName());
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
index 5af9ef3..41b5ebd 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
@@ -23,6 +23,7 @@ import azkaban.user.User;
import azkaban.utils.Props;
import azkaban.utils.Triple;
import java.io.File;
+import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -204,14 +205,15 @@ public interface ProjectLoader {
/**
* Uploads flow file.
*/
- void uploadFlowFile(int projectId, int projectVersion, int flowVersion, File flowFile)
+ void uploadFlowFile(int projectId, int projectVersion, File flowFile, int flowVersion)
throws ProjectManagerException;
/**
* Gets flow file that's uploaded.
*/
- File getUploadedFlowFile(int projectId, int projectVersion, int flowVersion, String flowName)
- throws ProjectManagerException;
+ File getUploadedFlowFile(int projectId, int projectVersion, String flowFileName, int
+ flowVersion, final File tempDir)
+ throws ProjectManagerException, IOException;
/**
* Gets the latest flow version.
diff --git a/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java
index ad91a23..fe36a47 100644
--- a/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java
@@ -113,7 +113,7 @@ public class AzkabanProjectLoaderTest {
verify(this.storageManager)
.uploadProject(this.project, this.VERSION + 1, projectZipFile, uploader);
verify(this.projectLoader)
- .uploadFlowFile(eq(this.ID), eq(this.VERSION + 1), eq(flowVersion + 1), any(File.class));
+ .uploadFlowFile(eq(this.ID), eq(this.VERSION + 1), any(File.class), eq(flowVersion + 1));
}
diff --git a/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java b/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
index c4d3e00..dd86418 100644
--- a/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
@@ -27,6 +27,7 @@ import azkaban.user.User;
import azkaban.utils.Md5Hasher;
import azkaban.utils.Props;
import azkaban.utils.Triple;
+import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
import java.sql.SQLException;
@@ -357,21 +358,22 @@ public class JdbcProjectImplTest {
@Test
public void testUploadFlowFile() throws Exception {
final File testYamlFile = ExecutionsTestUtil.getFlowFile(BASIC_FLOW_YAML_DIR, BASIC_FLOW_FILE);
- this.loader.uploadFlowFile(PROJECT_ID, PROJECT_VERSION, FLOW_VERSION, testYamlFile);
-
+ this.loader.uploadFlowFile(PROJECT_ID, PROJECT_VERSION, testYamlFile, FLOW_VERSION);
+ final File tempDir = Files.createTempDir();
+ tempDir.deleteOnExit();
final File file = this.loader
- .getUploadedFlowFile(PROJECT_ID, PROJECT_VERSION, FLOW_VERSION, BASIC_FLOW_FILE);
+ .getUploadedFlowFile(PROJECT_ID, PROJECT_VERSION, BASIC_FLOW_FILE, FLOW_VERSION, tempDir);
assertThat(file.getName()).isEqualTo(BASIC_FLOW_FILE);
- FileUtils.contentEquals(testYamlFile, file);
+ assertThat(FileUtils.contentEquals(testYamlFile, file)).isTrue();
}
@Test
public void testDuplicateUploadFlowFileException() throws Exception {
final File testYamlFile = ExecutionsTestUtil.getFlowFile(BASIC_FLOW_YAML_DIR, BASIC_FLOW_FILE);
- this.loader.uploadFlowFile(PROJECT_ID, PROJECT_VERSION, FLOW_VERSION, testYamlFile);
+ this.loader.uploadFlowFile(PROJECT_ID, PROJECT_VERSION, testYamlFile, FLOW_VERSION);
assertThatThrownBy(
- () -> this.loader.uploadFlowFile(PROJECT_ID, PROJECT_VERSION, FLOW_VERSION, testYamlFile))
+ () -> this.loader.uploadFlowFile(PROJECT_ID, PROJECT_VERSION, testYamlFile, FLOW_VERSION))
.isInstanceOf(ProjectManagerException.class)
.hasMessageContaining(
"Error uploading flow file " + BASIC_FLOW_FILE + ", version " + FLOW_VERSION + ".");
@@ -382,7 +384,7 @@ public class JdbcProjectImplTest {
final File testYamlFile = ExecutionsTestUtil.getFlowFile(LARGE_FLOW_YAML_DIR, LARGE_FLOW_FILE);
assertThatThrownBy(
- () -> this.loader.uploadFlowFile(PROJECT_ID, PROJECT_VERSION, FLOW_VERSION, testYamlFile))
+ () -> this.loader.uploadFlowFile(PROJECT_ID, PROJECT_VERSION, testYamlFile, FLOW_VERSION))
.isInstanceOf(ProjectManagerException.class)
.hasMessageContaining(
"Flow file length exceeds 10 MB limit.");
@@ -395,7 +397,7 @@ public class JdbcProjectImplTest {
assertThat(
this.loader.getLatestFlowVersion(PROJECT_ID, PROJECT_VERSION, testYamlFile.getName()))
.isEqualTo(0);
- this.loader.uploadFlowFile(PROJECT_ID, PROJECT_VERSION, FLOW_VERSION, testYamlFile);
+ this.loader.uploadFlowFile(PROJECT_ID, PROJECT_VERSION, testYamlFile, FLOW_VERSION);
assertThat(
this.loader.getLatestFlowVersion(PROJECT_ID, PROJECT_VERSION, testYamlFile.getName()))
.isEqualTo(FLOW_VERSION);
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index 8b81297..15383cc 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -52,6 +52,7 @@ import azkaban.utils.Props;
import azkaban.utils.SwapQueue;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
+import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -66,6 +67,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
+import org.apache.commons.io.FileUtils;
import org.apache.log4j.Appender;
import org.apache.log4j.FileAppender;
import org.apache.log4j.Layout;
@@ -267,7 +269,7 @@ public class FlowRunner extends EventHandler implements Runnable {
Props commonFlowProps = FlowUtils.addCommonFlowProperties(null, this.flow);
if (this.isAzkabanFlowVersion20) {
- final Props flowProps = loadFlowPropsFromYamlFile();
+ final Props flowProps = loadPropsFromYamlFile(this.flow.getId());
if (flowProps != null) {
flowProps.setParent(commonFlowProps);
commonFlowProps = flowProps;
@@ -723,7 +725,9 @@ public class FlowRunner extends EventHandler implements Runnable {
private Props loadJobProps(final ExecutableNode node) throws IOException {
Props props = null;
if (this.isAzkabanFlowVersion20) {
- props = loadJobPropsFromYamlFile(node);
+ final String jobPath =
+ node.getParentFlow().getFlowId() + Constants.PATH_DELIMITER + node.getId();
+ props = loadPropsFromYamlFile(jobPath);
if (props == null) {
this.logger.info("Job props loaded from yaml file is empty for job " + node.getId());
return props;
@@ -768,46 +772,41 @@ public class FlowRunner extends EventHandler implements Runnable {
return props;
}
- private Props loadFlowPropsFromYamlFile() {
- final File flowFile = getFlowFile();
- final Props flowProps = FlowLoaderUtils.getPropsFromYamlFile(this.flow.getId(), flowFile);
- if (flowFile != null && flowFile.exists()) {
- flowFile.delete();
- }
- return flowProps;
- }
-
- private Props loadJobPropsFromYamlFile(final ExecutableNode node) {
- final File flowFile = getFlowFile();
- final String jobPath =
- node.getParentFlow().getFlowId() + Constants.PATH_DELIMITER + node.getId();
- final Props props = FlowLoaderUtils.getPropsFromYamlFile(jobPath, flowFile);
- if (flowFile != null && flowFile.exists()) {
- flowFile.delete();
+ private Props loadPropsFromYamlFile(final String path) {
+ File tempDir = null;
+ Props props = null;
+ try {
+ tempDir = Files.createTempDir();
+ props = FlowLoaderUtils.getPropsFromYamlFile(path, getFlowFile(tempDir));
+ } catch (final Exception e) {
+ this.logger.error("Failed to get props from flow file. " + e);
+ } finally {
+ if (tempDir != null && tempDir.exists()) {
+ try {
+ FileUtils.deleteDirectory(tempDir);
+ } catch (final IOException e) {
+ this.logger.error("Failed to delete temp directory." + e);
+ tempDir.deleteOnExit();
+ }
+ }
}
return props;
}
- private File getFlowFile() {
- File flowFile = null;
- String source = null;
- try {
- final List<FlowProps> flowPropsList = ImmutableList.copyOf(this.flow.getFlowProps());
- // There should be exact one source(file name) for each flow file.
- if (!flowPropsList.isEmpty()) {
- source = flowPropsList.get(0).getSource();
- } else {
- this.logger.error("Failed to get flow file source, flow props is empty.");
- return null;
- }
- final int flowVersion = this.projectLoader
- .getLatestFlowVersion(this.flow.getProjectId(), this.flow.getVersion(), source);
- flowFile = this.projectLoader
- .getUploadedFlowFile(this.flow.getProjectId(), this.flow.getVersion(), flowVersion,
- source);
- } catch (final ProjectManagerException e) {
- this.logger.error("Failed to get flow file " + source, e);
+ private File getFlowFile(final File tempDir) throws Exception {
+ final List<FlowProps> flowPropsList = ImmutableList.copyOf(this.flow.getFlowProps());
+ // There should be exactly one source (file name) for each flow file.
+ if (flowPropsList.isEmpty() || flowPropsList.get(0) == null) {
+ throw new ProjectManagerException(
+ "Failed to get flow file source. Flow props is empty for " + this.flow.getId());
}
+ final String source = flowPropsList.get(0).getSource();
+ final int flowVersion = this.projectLoader
+ .getLatestFlowVersion(this.flow.getProjectId(), this.flow.getVersion(), source);
+ final File flowFile = this.projectLoader
+ .getUploadedFlowFile(this.flow.getProjectId(), this.flow.getVersion(), source,
+ flowVersion, tempDir);
+
return flowFile;
}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
index 5dd2fd4..3691030 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
@@ -16,7 +16,8 @@
package azkaban.execapp;
-import static org.mockito.Mockito.doAnswer;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
import azkaban.executor.ExecutableFlow;
@@ -30,7 +31,6 @@ import azkaban.utils.Props;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
-import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -77,12 +77,10 @@ public class FlowRunnerPropertyResolutionTest extends FlowRunnerTestBase {
.thenReturn(true);
when(this.testUtil.getProjectLoader()
.getLatestFlowVersion(project.getId(), project.getVersion(), FLOW_YAML_FILE)).thenReturn(1);
- doAnswer(invocation -> {
- final File flowFile = this.temporaryFolder.newFile(FLOW_YAML_FILE);
- FileUtils.copyFile(ExecutionsTestUtil.getFlowFile(FLOW_YAML_DIR, FLOW_YAML_FILE), flowFile);
- return flowFile;
- }).when(this.testUtil.getProjectLoader())
- .getUploadedFlowFile(project.getId(), project.getVersion(), 1, FLOW_YAML_FILE);
+ when(this.testUtil.getProjectLoader()
+ .getUploadedFlowFile(eq(project.getId()), eq(project.getVersion()), eq(FLOW_YAML_FILE),
+ eq(1), any(File.class)))
+ .thenReturn(ExecutionsTestUtil.getFlowFile(FLOW_YAML_DIR, FLOW_YAML_FILE));
assertProperties(true);
}