azkaban-aplcache

Refactor uploading project (#1466) * Refactor uploading

9/21/2017 4:01:57 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
index 695e7b6..dd6cdea 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
@@ -31,24 +31,24 @@ import azkaban.storage.StorageManager;
 import azkaban.user.User;
 import azkaban.utils.Props;
 import azkaban.utils.Utils;
-import javax.inject.Inject;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.zip.ZipFile;
+import javax.inject.Inject;
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * This class handles the downloading and uploading of projects.
+ * Handles the downloading and uploading of projects.
  */
 class AzkabanProjectLoader {
 
   private static final Logger log = LoggerFactory.getLogger(AzkabanProjectLoader.class);
+  private static final String DIRECTORY_FLOW_REPORT_KEY = "Directory Flow";
 
   private final Props props;
 
@@ -79,10 +79,51 @@ class AzkabanProjectLoader {
       final File archive, final String fileType, final User uploader, final Props additionalProps)
       throws ProjectManagerException {
     log.info("Uploading files to " + project.getName());
+    final Map<String, ValidationReport> reports;
+
+    // Since props is an instance variable of ProjectManager, and each
+    // invocation to the uploadProject manager needs to pass a different
+    // value for the PROJECT_ARCHIVE_FILE_PATH key, it is necessary to
+    // create a new instance of Props to make sure these different values
+    // are isolated from each other.
+    final Props prop = new Props(this.props);
+    prop.putAll(additionalProps);
 
-    // Unzip.
     File file = null;
     try {
+      file = unzipProject(archive, fileType);
+
+      reports = validateProject(project, archive, file, prop);
+
+      // Todo jamiesjc: in Flow 2.0, we need to create new flowLoader class and
+      // call new method to load the project flows.
+      // Need to guicify it later so that we can mock flowLoader in the tests.
+      // Load the project flows.
+      final DirectoryFlowLoader directoryFlowLoader = new DirectoryFlowLoader(prop);
+      reports.put(DIRECTORY_FLOW_REPORT_KEY,
+          directoryFlowLoader.loadProject(project, file));
+    } finally {
+      cleanUpProjectTempDir(file);
+    }
+
+    // Check the validation report.
+    if (!isReportStatusValid(reports, project)) {
+      return reports;
+    }
+
+    // Upload the project to DB and storage.
+    persistProject(project, archive, uploader);
+
+    // Clean up project old installations after new project is uploaded successfully.
+    cleanUpProjectOldInstallations(project);
+
+    return reports;
+  }
+
+  private File unzipProject(final File archive, final String fileType)
+      throws ProjectManagerException {
+    final File file;
+    try {
       if (fileType == null) {
         throw new ProjectManagerException("Unknown file type for "
             + archive.getName());
@@ -95,14 +136,11 @@ class AzkabanProjectLoader {
     } catch (final IOException e) {
       throw new ProjectManagerException("Error unzipping file.", e);
     }
+    return file;
+  }
 
-    // Since props is an instance variable of ProjectManager, and each
-    // invocation to the uploadProject manager needs to pass a different
-    // value for the PROJECT_ARCHIVE_FILE_PATH key, it is necessary to
-    // create a new instance of Props to make sure these different values
-    // are isolated from each other.
-    final Props prop = new Props(this.props);
-    prop.putAll(additionalProps);
+  private Map<String, ValidationReport> validateProject(final Project project,
+      final File archive, final File file, final Props prop) {
     prop.put(ValidatorConfigs.PROJECT_ARCHIVE_FILE_PATH,
         archive.getAbsolutePath());
     // Basically, we want to make sure that for different invocations to the
@@ -125,7 +163,11 @@ class AzkabanProjectLoader {
     log.info("Validating project " + archive.getName()
         + " using the registered validators "
         + validatorManager.getValidatorsInfo().toString());
-    final Map<String, ValidationReport> reports = validatorManager.validate(project, file);
+    return validatorManager.validate(project, file);
+  }
+
+  private boolean isReportStatusValid(final Map<String, ValidationReport> reports,
+      final Project project) {
     ValidationStatus status = ValidationStatus.PASS;
     for (final Entry<String, ValidationReport> report : reports.entrySet()) {
       if (report.getValue().getStatus().compareTo(status) > 0) {
@@ -133,27 +175,17 @@ class AzkabanProjectLoader {
       }
     }
     if (status == ValidationStatus.ERROR) {
-      log.error("Error found in upload to " + project.getName()
-          + ". Cleaning up.");
-
-      try {
-        FileUtils.deleteDirectory(file);
-      } catch (final IOException e) {
-        file.deleteOnExit();
-        e.printStackTrace();
-      }
-
-      return reports;
+      log.error("Error found in uploading to " + project.getName());
+      return false;
     }
+    return true;
+  }
 
-    final DirectoryFlowLoader loader =
-        (DirectoryFlowLoader) validatorManager.getDefaultValidator();
-    final Map<String, Props> jobProps = loader.getJobProps();
-    final List<Props> propProps = loader.getProps();
-
+  private void persistProject(final Project project, final File archive, final User uploader)
+      throws ProjectManagerException{
     synchronized (project) {
       final int newVersion = this.projectLoader.getLatestProjectVersion(project) + 1;
-      final Map<String, Flow> flows = loader.getFlowMap();
+      final Map<String, Flow> flows = project.getFlowMap();
       for (final Flow flow : flows.values()) {
         flow.setProjectId(project.getId());
         flow.setVersion(newVersion);
@@ -166,24 +198,18 @@ class AzkabanProjectLoader {
       log.info("Changing project versions " + archive.getName());
       this.projectLoader.changeProjectVersion(project, newVersion,
           uploader.getUserId());
-      project.setFlows(flows);
       log.info("Uploading Job properties");
       this.projectLoader.uploadProjectProperties(project, new ArrayList<>(
-          jobProps.values()));
+          project.getJobPropsMap().values()));
       log.info("Uploading Props properties");
-      this.projectLoader.uploadProjectProperties(project, propProps);
-    }
-
-    log.info("Uploaded project files. Cleaning up temp files.");
-    this.projectLoader.postEvent(project, EventType.UPLOADED, uploader.getUserId(),
-        "Uploaded project files zip " + archive.getName());
-    try {
-      FileUtils.deleteDirectory(file);
-    } catch (final IOException e) {
-      file.deleteOnExit();
-      e.printStackTrace();
+      this.projectLoader.uploadProjectProperties(project, project.getPropsList());
+      this.projectLoader.postEvent(project, EventType.UPLOADED, uploader.getUserId(),
+          "Uploaded project files zip " + archive.getName());
     }
+  }
 
+  private void cleanUpProjectOldInstallations(final Project project)
+      throws ProjectManagerException{
     log.info("Cleaning up old install files older than "
         + (project.getVersion() - this.projectVersionRetention));
     this.projectLoader.cleanOlderProjectVersion(project.getId(),
@@ -191,8 +217,18 @@ class AzkabanProjectLoader {
 
     // Clean up storage
     this.storageManager.cleanupProjectArtifacts(project.getId());
+  }
 
-    return reports;
+  private void cleanUpProjectTempDir(final File file) {
+    log.info("Cleaning up temp files.");
+    try {
+      if (file != null) {
+        FileUtils.deleteDirectory(file);
+      }
+    } catch (final IOException e) {
+      log.error("Failed to delete temp directory", e);
+      file.deleteOnExit();
+    }
   }
 
   private File unzipFile(final File archiveFile) throws IOException {
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
index 6055fea..cc99174 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
@@ -23,9 +23,7 @@ import azkaban.flow.FlowProps;
 import azkaban.flow.Node;
 import azkaban.flow.SpecialJobTypes;
 import azkaban.jobcallback.JobCallbackValidator;
-import azkaban.project.validator.ProjectValidator;
 import azkaban.project.validator.ValidationReport;
-import azkaban.project.validator.XmlValidatorManager;
 import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
 import azkaban.utils.Utils;
@@ -42,7 +40,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.log4j.Logger;
 
-public class DirectoryFlowLoader implements ProjectValidator {
+public class DirectoryFlowLoader {
 
   public static final String JOB_MAX_XMS = "job.max.Xms";
   public static final String MAX_XMS_DEFAULT = "1G";
@@ -54,7 +52,7 @@ public class DirectoryFlowLoader implements ProjectValidator {
   private static final String XMS = "Xms";
   private static final String XMX = "Xmx";
 
-  private final Logger logger;
+  private static final Logger logger = Logger.getLogger(DirectoryFlowLoader.class);
   private final Props props;
   private HashSet<String> rootNodes;
   private HashMap<String, Flow> flowMap;
@@ -74,23 +72,12 @@ public class DirectoryFlowLoader implements ProjectValidator {
    * Creates a new DirectoryFlowLoader.
    *
    * @param props Properties to add.
-   * @param logger The Logger to use.
    */
-  public DirectoryFlowLoader(final Props props, final Logger logger) {
-    this.logger = logger;
+  public DirectoryFlowLoader(final Props props) {
     this.props = props;
   }
 
   /**
-   * Returns the flow map constructed from the loaded flows.
-   *
-   * @return Map of flow name to Flow.
-   */
-  public Map<String, Flow> getFlowMap() {
-    return this.flowMap;
-  }
-
-  /**
    * Returns errors caught when loading flows.
    *
    * @return Set of error strings.
@@ -100,24 +87,6 @@ public class DirectoryFlowLoader implements ProjectValidator {
   }
 
   /**
-   * Returns job properties.
-   *
-   * @return Map of job name to properties.
-   */
-  public Map<String, Props> getJobProps() {
-    return this.jobPropsMap;
-  }
-
-  /**
-   * Returns list of properties.
-   *
-   * @return List of Props.
-   */
-  public List<Props> getProps() {
-    return this.propsList;
-  }
-
-  /**
    * Loads all flows from the directory into the project.
    *
    * @param project The project to load flows to.
@@ -138,8 +107,6 @@ public class DirectoryFlowLoader implements ProjectValidator {
     // Load all the props files and create the Node objects
     loadProjectFromDir(baseDirectory.getPath(), baseDirectory, null);
 
-    jobPropertiesCheck(project);
-
     // Create edges and find missing dependencies
     resolveDependencies();
 
@@ -149,6 +116,10 @@ public class DirectoryFlowLoader implements ProjectValidator {
     // Resolve embedded flows
     resolveEmbeddedFlows();
 
+    project.setFlows(this.flowMap);
+    project.setPropsList(this.propsList);
+    project.setJobPropsMap(this.jobPropsMap);
+
   }
 
   private void loadProjectFromDir(final String base, final File dir, Props parent) {
@@ -409,7 +380,7 @@ public class DirectoryFlowLoader implements ProjectValidator {
     visited.remove(node.getId());
   }
 
-  private void jobPropertiesCheck(final Project project) {
+  private void checkJobProperties(final Project project) {
     // if project is in the memory check whitelist, then we don't need to check
     // its memory settings
     if (ProjectWhitelist.isProjectWhitelisted(project.getId(),
@@ -456,19 +427,9 @@ public class DirectoryFlowLoader implements ProjectValidator {
     return filePath.substring(basePath.length() + 1);
   }
 
-  @Override
-  public boolean initialize(final Props configuration) {
-    return true;
-  }
-
-  @Override
-  public String getValidatorName() {
-    return XmlValidatorManager.DEFAULT_VALIDATOR_KEY;
-  }
-
-  @Override
-  public ValidationReport validateProject(final Project project, final File projectDir) {
+  public ValidationReport loadProject(final Project project, final File projectDir) {
     loadProjectFlow(project, projectDir);
+    checkJobProperties(project);
     final ValidationReport report = new ValidationReport();
     report.addErrorMsgs(this.errors);
     return report;
diff --git a/azkaban-common/src/main/java/azkaban/project/Project.java b/azkaban-common/src/main/java/azkaban/project/Project.java
index 1ffdb75..c2f1911 100644
--- a/azkaban-common/src/main/java/azkaban/project/Project.java
+++ b/azkaban-common/src/main/java/azkaban/project/Project.java
@@ -21,6 +21,9 @@ import azkaban.user.Permission;
 import azkaban.user.Permission.Type;
 import azkaban.user.User;
 import azkaban.utils.Pair;
+import azkaban.utils.Props;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -46,7 +49,9 @@ public class Project {
   private long lastModifiedTimestamp;
   private String lastModifiedUser;
   private String source;
-  private Map<String, Flow> flows = null;
+  private Map<String, Flow> flows = new HashMap<>();
+  private Map<String, Props> jobPropsMap = new HashMap<>();
+  private List<Props> propsList = new ArrayList<>();
   private Map<String, Object> metadata = new HashMap<>();
 
   public Project(final int id, final String name) {
@@ -128,7 +133,7 @@ public class Project {
   }
 
   public void setFlows(final Map<String, Flow> flows) {
-    this.flows = flows;
+    this.flows = ImmutableMap.copyOf(flows);
   }
 
   public Permission getCollectivePermission(final User user) {
@@ -450,4 +455,20 @@ public class Project {
   public void setVersion(final int version) {
     this.version = version;
   }
+
+  public Map<String, Props> getJobPropsMap() {
+    return this.jobPropsMap;
+  }
+
+  public void setJobPropsMap(final Map<String, Props> jobPropsMap) {
+    this.jobPropsMap = ImmutableMap.copyOf(jobPropsMap);
+  }
+
+  public List<Props> getPropsList() {
+    return this.propsList;
+  }
+
+  public void setPropsList(final List<Props> propsList) {
+    this.propsList = ImmutableList.copyOf(propsList);
+  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/project/validator/ValidatorManager.java b/azkaban-common/src/main/java/azkaban/project/validator/ValidatorManager.java
index c34a841..93c89fb 100644
--- a/azkaban-common/src/main/java/azkaban/project/validator/ValidatorManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/validator/ValidatorManager.java
@@ -28,13 +28,6 @@ public interface ValidatorManager {
   Map<String, ValidationReport> validate(Project project, File projectDir);
 
   /**
-   * The ValidatorManager should have a default validator which checks for the most essential
-   * components of a project. The ValidatorManager should always load the default validator. This
-   * method returns the default validator of this ValidatorManager.
-   */
-  ProjectValidator getDefaultValidator();
-
-  /**
    * Returns a list of String containing the name of each registered validators.
    */
   List<String> getValidatorsInfo();
diff --git a/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java b/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java
index 1ad5bab..83f48a2 100644
--- a/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java
@@ -1,6 +1,5 @@
 package azkaban.project.validator;
 
-import azkaban.project.DirectoryFlowLoader;
 import azkaban.project.Project;
 import azkaban.utils.Props;
 import java.io.File;
@@ -42,11 +41,9 @@ import org.xml.sax.SAXException;
  */
 public class XmlValidatorManager implements ValidatorManager {
 
-  public static final String AZKABAN_VALIDATOR_TAG = "azkaban-validators";
   public static final String VALIDATOR_TAG = "validator";
   public static final String CLASSNAME_ATTR = "classname";
   public static final String ITEM_TAG = "property";
-  public static final String DEFAULT_VALIDATOR_KEY = "Directory Flow";
   private static final Logger logger = Logger.getLogger(XmlValidatorManager.class);
   private static final Map<String, Long> resourceTimestamps = new HashMap<>();
   private static ValidatorClassLoader validatorLoader;
@@ -58,6 +55,7 @@ public class XmlValidatorManager implements ValidatorManager {
    * validator ClassLoader. This enables creating instances of these validators in the
    * loadValidators() method.
    */
+  // Todo jamiesjc: guicify XmlValidatorManager class
   public XmlValidatorManager(final Props props) {
     this.validatorDirPath = props
         .getString(ValidatorConfigs.VALIDATOR_PLUGIN_DIR, ValidatorConfigs.DEFAULT_VALIDATOR_DIR);
@@ -119,9 +117,8 @@ public class XmlValidatorManager implements ValidatorManager {
 
   /**
    * Instances of the validators are created here rather than in the constructors. This is because
-   * some validators might need to maintain project-specific states, such as {@link
-   * DirectoryFlowLoader}. By instantiating the validators here, it ensures that the validator
-   * objects are project-specific, rather than global.
+   * some validators might need to maintain project-specific states. By instantiating the validators
+   * here, it ensures that the validator objects are project-specific, rather than global.
    *
    * {@inheritDoc}
    *
@@ -131,10 +128,6 @@ public class XmlValidatorManager implements ValidatorManager {
   @Override
   public void loadValidators(final Props props, final Logger log) {
     this.validators = new LinkedHashMap<>();
-    // Add the default validator
-    final DirectoryFlowLoader flowLoader = new DirectoryFlowLoader(props, log);
-    this.validators.put(flowLoader.getValidatorName(), flowLoader);
-
     if (!props.containsKey(ValidatorConfigs.XML_FILE_PARAM)) {
       logger.warn(
           "Azkaban properties file does not contain the key " + ValidatorConfigs.XML_FILE_PARAM);
@@ -237,11 +230,6 @@ public class XmlValidatorManager implements ValidatorManager {
   }
 
   @Override
-  public ProjectValidator getDefaultValidator() {
-    return this.validators.get(DEFAULT_VALIDATOR_KEY);
-  }
-
-  @Override
   public List<String> getValidatorsInfo() {
     final List<String> info = new ArrayList<>();
     for (final String key : this.validators.keySet()) {
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java
index cbca048..11bbcaa 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java
@@ -29,7 +29,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import org.apache.log4j.Logger;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -209,13 +208,11 @@ public class ExecutableFlowTest {
   public void setUp() throws Exception {
     this.project = new Project(11, "myTestProject");
 
-    final Logger logger = Logger.getLogger(this.getClass());
-    final DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props(), logger);
+    final DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props());
 
     loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir("embedded"));
     Assert.assertEquals(0, loader.getErrors().size());
 
-    this.project.setFlows(loader.getFlowMap());
     this.project.setVersion(123);
   }
 
diff --git a/azkaban-common/src/test/java/azkaban/project/DirectoryFlowLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/DirectoryFlowLoaderTest.java
index b3057f6..10eccac 100644
--- a/azkaban-common/src/test/java/azkaban/project/DirectoryFlowLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/DirectoryFlowLoaderTest.java
@@ -18,8 +18,6 @@ package azkaban.project;
 
 import azkaban.test.executions.ExecutionsTestUtil;
 import azkaban.utils.Props;
-import java.net.URISyntaxException;
-import org.apache.log4j.Logger;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -34,27 +32,30 @@ public class DirectoryFlowLoaderTest {
   }
 
   @Test
-  public void testDirectoryLoad() throws URISyntaxException {
-    final Logger logger = Logger.getLogger(this.getClass());
-    final DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props(), logger);
+  public void testDirectoryLoad() {
+    final DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props());
 
     loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir("exectest1"));
-    logger.info(loader.getFlowMap().size());
+    Assert.assertEquals(0, loader.getErrors().size());
+    Assert.assertEquals(5, this.project.getFlowMap().size());
+    Assert.assertEquals(2, this.project.getPropsList().size());
+    Assert.assertEquals(14, this.project.getJobPropsMap().size());
   }
 
   @Test
-  public void testLoadEmbeddedFlow() throws URISyntaxException {
-    final Logger logger = Logger.getLogger(this.getClass());
-    final DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props(), logger);
+  public void testLoadEmbeddedFlow() {
+    final DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props());
 
     loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir("embedded"));
     Assert.assertEquals(0, loader.getErrors().size());
+    Assert.assertEquals(2, this.project.getFlowMap().size());
+    Assert.assertEquals(0, this.project.getPropsList().size());
+    Assert.assertEquals(9, this.project.getJobPropsMap().size());
   }
 
   @Test
-  public void testRecursiveLoadEmbeddedFlow() throws URISyntaxException {
-    final Logger logger = Logger.getLogger(this.getClass());
-    final DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props(), logger);
+  public void testRecursiveLoadEmbeddedFlow() {
+    final DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props());
 
     loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir("embedded_bad"));
     for (final String error : loader.getErrors()) {
diff --git a/azkaban-common/src/test/java/azkaban/project/validator/XmlValidatorManagerTest.java b/azkaban-common/src/test/java/azkaban/project/validator/XmlValidatorManagerTest.java
index b5a3baa..5fea32f 100644
--- a/azkaban-common/src/test/java/azkaban/project/validator/XmlValidatorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/validator/XmlValidatorManagerTest.java
@@ -12,8 +12,7 @@ public class XmlValidatorManagerTest {
   private final Props baseProps = new Props();
 
   /**
-   * Test that if the validator directory does not exist, XmlValidatorManager should still load the
-   * default validator.
+   * Test that no validator directory exists when there is no xml configuration.
    */
   @Test
   public void testNoValidatorsDir() {
@@ -21,32 +20,8 @@ public class XmlValidatorManagerTest {
 
     final XmlValidatorManager manager = new XmlValidatorManager(props);
     assertEquals(
-        "XmlValidatorManager should contain only the default validator when no xml configuration "
-            + "file is present.", manager.getValidatorsInfo().size(), 1);
-    assertEquals(
-        "XmlValidatorManager should contain only the default validator when no xml configuration "
-            + "file is present.", manager.getValidatorsInfo().get(0),
-        XmlValidatorManager.DEFAULT_VALIDATOR_KEY);
-  }
-
-  /**
-   * Test that if the validator directory exists but the xml configuration file does not,
-   * XmlValidatorManager only loads the default validator.
-   */
-  @Test
-  public void testDefaultValidator() {
-    final Props props = new Props(this.baseProps);
-    final URL validatorUrl = Resources.getResource("project/testValidators");
-    props.put(ValidatorConfigs.VALIDATOR_PLUGIN_DIR, validatorUrl.getPath());
-
-    final XmlValidatorManager manager = new XmlValidatorManager(props);
-    assertEquals(
-        "XmlValidatorManager should contain only the default validator when no xml configuration "
-            + "file is present.", manager.getValidatorsInfo().size(), 1);
-    assertEquals(
-        "XmlValidatorManager should contain only the default validator when no xml configuration "
-            + "file is present.", manager.getValidatorsInfo().get(0),
-        XmlValidatorManager.DEFAULT_VALIDATOR_KEY);
+        "XmlValidatorManager should contain 0 validator when no xml configuration "
+            + "file is present.", manager.getValidatorsInfo().size(), 0);
   }
 
   /**
@@ -66,10 +41,9 @@ public class XmlValidatorManagerTest {
   }
 
   /**
-   * Test that if the xml config file is properly set, XmlValidatorManager loads both the default
-   * validator and the one specified in the xml file. The TestValidator class specified in the xml
-   * configuration file is located with the jar file inside test resource directory
-   * project/testValidators.
+   * Test that if the xml config file is properly set, XmlValidatorManager loads the validator
+   * specified in the xml file. The TestValidator class specified in the xml configuration file
+   * is located with the jar file inside test resource directory project/testValidators.
    */
   @Test
   public void testLoadValidators() {
@@ -80,11 +54,11 @@ public class XmlValidatorManagerTest {
     props.put(ValidatorConfigs.XML_FILE_PARAM, configUrl.getPath());
 
     final XmlValidatorManager manager = new XmlValidatorManager(props);
-    assertEquals("XmlValidatorManager should contain 2 validators.",
-        manager.getValidatorsInfo().size(), 2);
+    assertEquals("XmlValidatorManager should contain 1 validator.",
+        manager.getValidatorsInfo().size(), 1);
     assertEquals(
         "XmlValidatorManager should contain the validator specified in the xml configuration file.",
-        manager.getValidatorsInfo().get(1), "Test");
+        manager.getValidatorsInfo().get(0), "Test");
   }
 
 }
diff --git a/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java b/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
index fa8f430..46394f3 100644
--- a/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
@@ -25,7 +25,6 @@ import azkaban.test.executions.ExecutionsTestUtil;
 import com.codahale.metrics.MetricRegistry;
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.log4j.Logger;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -51,13 +50,11 @@ public class EmailerTest {
   public void setUp() throws Exception {
     this.receiveAddrList.add(this.receiveAddr);
     this.project = new Project(11, "myTestProject");
-    final Logger logger = Logger.getLogger(this.getClass());
 
     this.props = createMailProperties();
-    final DirectoryFlowLoader loader = new DirectoryFlowLoader(this.props, logger);
+    final DirectoryFlowLoader loader = new DirectoryFlowLoader(this.props);
     loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir("embedded"));
     Assert.assertEquals(0, loader.getErrors().size());
-    this.project.setFlows(loader.getFlowMap());
     this.project.setVersion(123);
   }
 
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
index 89f8b4b..24efe86 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
@@ -59,7 +59,6 @@ import org.junit.rules.TemporaryFolder;
 public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
 
   private static int id = 101;
-  private final Logger logger = Logger.getLogger(FlowRunnerTest2.class);
   private final AzkabanEventReporter azkabanEventReporter =
       EventReporterUtil.getTestAzkabanEventReporter();
   @Rule
@@ -89,7 +88,7 @@ public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
 
     final File dir = ExecutionsTestUtil.getFlowDir("embedded2");
     this.flowMap = FlowRunnerTestUtil
-        .prepareProject(this.project, dir, this.logger, this.workingDir);
+        .prepareProject(this.project, dir, this.workingDir);
 
     InteractiveTestJob.clearTestJobs();
   }
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
index d202193..cbfb3a8 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
@@ -36,7 +36,6 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.commons.io.FileUtils;
-import org.apache.log4j.Logger;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -61,7 +60,6 @@ import org.junit.Test;
 public class FlowRunnerPropertyResolutionTest {
 
   private static int id = 101;
-  private final Logger logger = Logger.getLogger(FlowRunnerTest2.class);
   private final AzkabanEventReporter azkabanEventReporter =
       EventReporterUtil.getTestAzkabanEventReporter();
   private File workingDir;
@@ -88,7 +86,7 @@ public class FlowRunnerPropertyResolutionTest {
 
     final File dir = new File("unit/executions/execpropstest");
     this.flowMap = FlowRunnerTestUtil
-        .prepareProject(this.project, dir, this.logger, this.workingDir);
+        .prepareProject(this.project, dir, this.workingDir);
 
     InteractiveTestJob.clearTestJobs();
   }
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
index 06b4b2c..6a25699 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
@@ -42,7 +42,6 @@ import azkaban.utils.Props;
 import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.log4j.Logger;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -97,7 +96,6 @@ import org.junit.rules.TemporaryFolder;
 public class FlowRunnerTest2 extends FlowRunnerTestBase {
 
   private static int id = 101;
-  private final Logger logger = Logger.getLogger(FlowRunnerTest2.class);
   private final AzkabanEventReporter azkabanEventReporter =
       EventReporterUtil.getTestAzkabanEventReporter();
   @Rule
@@ -124,7 +122,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
     JmxJobMBeanManager.getInstance().initialize(new Props());
 
     this.flowMap = FlowRunnerTestUtil
-        .prepareProject(this.project, ExecutionsTestUtil.getFlowDir("embedded2"), this.logger,
+        .prepareProject(this.project, ExecutionsTestUtil.getFlowDir("embedded2"),
             this.workingDir);
 
     InteractiveTestJob.clearTestJobs();
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
index ef0413d..465445d 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
@@ -27,7 +27,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Map;
 import org.apache.commons.io.FileUtils;
-import org.apache.log4j.Logger;
 
 public class FlowRunnerTestUtil {
 
@@ -37,16 +36,15 @@ public class FlowRunnerTestUtil {
    *
    * @param project project to initialize
    * @param sourceDir the source dir
-   * @param logger the logger
    * @param workingDir the working dir
    * @return the flow name to flow map
    * @throws ProjectManagerException the project manager exception
    * @throws IOException the io exception
    */
   public static Map<String, Flow> prepareProject(final Project project, final File sourceDir,
-      final Logger logger, final File workingDir)
+      final File workingDir)
       throws ProjectManagerException, IOException {
-    final DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props(), logger);
+    final DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props());
     loader.loadProjectFlow(project, sourceDir);
     if (!loader.getErrors().isEmpty()) {
       for (final String error : loader.getErrors()) {
@@ -57,8 +55,7 @@ public class FlowRunnerTestUtil {
           project.getName(), sourceDir));
     }
 
-    final Map<String, Flow> flowMap = loader.getFlowMap();
-    project.setFlows(flowMap);
+    final Map<String, Flow> flowMap = project.getFlowMap();
     FileUtils.copyDirectory(sourceDir, workingDir);
 
     return flowMap;