azkaban-aplcache

Wiring get calls via StorageManager (#1019) This changes

4/26/2017 2:16:14 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 3a65a58..a8a4eaf 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -16,12 +16,12 @@
 
 package azkaban.executor;
 
+import com.google.inject.Inject;
 import java.io.BufferedInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
-import java.lang.annotation.Inherited;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -58,6 +58,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
 
   private EncodingType defaultEncodingType = EncodingType.GZIP;
 
+  @Inject
   public JdbcExecutorLoader(Props props) {
     super(props);
   }
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
index 806f7b0..7a78873 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
@@ -515,22 +515,6 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
     }
   }
 
-
-  @Override
-  public ProjectFileHandler getUploadedFile(Project project, int version)
-      throws ProjectManagerException {
-    logger.info("Retrieving to " + project.getName() + " version:" + version);
-    Connection connection = getConnection();
-    ProjectFileHandler handler = null;
-    try {
-      handler = getUploadedFile(connection, project.getId(), version);
-    } finally {
-      DbUtils.closeQuietly(connection);
-    }
-
-    return handler;
-  }
-
   @Override
   public ProjectFileHandler getUploadedFile(int projectId, int version)
       throws ProjectManagerException {
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
index c7af71c..0372773 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
@@ -139,14 +139,6 @@ public interface ProjectLoader {
    *
    * @return
    */
-  ProjectFileHandler getUploadedFile(Project project, int version)
-      throws ProjectManagerException;
-
-  /**
-   * Get file that's uploaded.
-   *
-   * @return
-   */
   ProjectFileHandler getUploadedFile(int projectId, int version)
       throws ProjectManagerException;
 
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
index 6e77d78..da5c351 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
@@ -413,7 +413,7 @@ public class ProjectManager {
     if (version == -1) {
       version = projectLoader.getLatestProjectVersion(project);
     }
-    return projectLoader.getUploadedFile(project, version);
+    return storageManager.getProjectFile(project.getId(), version);
   }
 
   public Map<String, ValidationReport> uploadProject(Project project,
diff --git a/azkaban-common/src/main/java/azkaban/storage/DatabaseStorage.java b/azkaban-common/src/main/java/azkaban/storage/DatabaseStorage.java
index 3149d9c..972ffe1 100644
--- a/azkaban-common/src/main/java/azkaban/storage/DatabaseStorage.java
+++ b/azkaban-common/src/main/java/azkaban/storage/DatabaseStorage.java
@@ -17,6 +17,7 @@
 
 package azkaban.storage;
 
+import azkaban.project.ProjectFileHandler;
 import azkaban.project.ProjectLoader;
 import azkaban.spi.Storage;
 import azkaban.spi.StorageMetadata;
@@ -33,6 +34,8 @@ import javax.inject.Inject;
  * behavior of Azkaban.
  */
 public class DatabaseStorage implements Storage {
+  public static final String PROJECT_ID = "projectId";
+  public static final String VERSION = "version";
 
   private final ProjectLoader projectLoader;
 
@@ -44,7 +47,11 @@ public class DatabaseStorage implements Storage {
 
   @Override
   public InputStream get(URI key) {
-    throw new UnsupportedOperationException();
+    throw new UnsupportedOperationException("Not implemented yet. Use get(projectId, version) instead");
+  }
+
+  public ProjectFileHandler get(int projectId, int version) {
+    return projectLoader.getUploadedFile(projectId, version);
   }
 
   @Override
diff --git a/azkaban-common/src/main/java/azkaban/storage/StorageManager.java b/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
index d09e5f5..a04f189 100644
--- a/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
+++ b/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
@@ -18,18 +18,12 @@
 package azkaban.storage;
 
 import azkaban.project.Project;
-import azkaban.project.ProjectLoader;
-import azkaban.project.ProjectManagerException;
+import azkaban.project.ProjectFileHandler;
 import azkaban.spi.Storage;
-import azkaban.spi.StorageException;
 import azkaban.spi.StorageMetadata;
 import azkaban.user.User;
 import com.google.inject.Inject;
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.InputStream;
-import java.net.URI;
 import org.apache.log4j.Logger;
 
 
@@ -71,4 +65,20 @@ public class StorageManager {
         metadata, localFile.getName(), localFile.length()));
     storage.put(metadata, localFile);
   }
+
+  /**
+   * Fetch project file from storage.
+   *
+   * @param projectId required project ID
+   * @param version version to be fetched
+   * @return Handler object containing hooks to fetched project file
+   */
+  public ProjectFileHandler getProjectFile(final int projectId, final int version) {
+    log.info(String.format("Fetching project file. project ID: %d version: %d", projectId, version));
+    // TODO spyne: remove huge hack ! There should not be any special handling for Database Storage.
+    if (storage instanceof DatabaseStorage) {
+      return ((DatabaseStorage) storage).get(projectId, version);
+    }
+    throw new UnsupportedOperationException("Operation currently unsupported for other types.");
+  }
 }
diff --git a/azkaban-common/src/test/java/azkaban/project/JdbcProjectLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/JdbcProjectLoaderTest.java
index fb0543b..f78456d 100644
--- a/azkaban-common/src/test/java/azkaban/project/JdbcProjectLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/JdbcProjectLoaderTest.java
@@ -541,7 +541,7 @@ public class JdbcProjectLoaderTest {
 
     loader.uploadProjectFile(project.getId(), 1, testFile, user.getUserId());
 
-    ProjectFileHandler handler = loader.getUploadedFile(project, 1);
+    ProjectFileHandler handler = loader.getUploadedFile(project.getId(), 1);
     Assert.assertEquals(handler.getProjectId(), project.getId());
     Assert.assertEquals(handler.getFileName(), "testjob.zip");
     Assert.assertEquals(handler.getVersion(), 1);
diff --git a/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java b/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
index 1e1d37d..062d880 100644
--- a/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
+++ b/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
@@ -123,13 +123,6 @@ public class MockProjectLoader implements ProjectLoader {
   }
 
   @Override
-  public ProjectFileHandler getUploadedFile(Project project, int version)
-      throws ProjectManagerException {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-  @Override
   public ProjectFileHandler getUploadedFile(int projectId, int version)
       throws ProjectManagerException {
     // TODO Auto-generated method stub
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
index c8803f9..3f9772b 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
@@ -17,6 +17,8 @@
 
 package azkaban.execapp;
 
+import azkaban.executor.ExecutorLoader;
+import azkaban.executor.JdbcExecutorLoader;
 import com.google.inject.AbstractModule;
 import com.google.inject.Scopes;
 
@@ -29,6 +31,7 @@ import com.google.inject.Scopes;
 public class AzkabanExecServerModule extends AbstractModule {
   @Override
   protected void configure() {
+    bind(ExecutorLoader.class).to(JdbcExecutorLoader.class).in(Scopes.SINGLETON);
     bind(AzkabanExecutorServer.class).in(Scopes.SINGLETON);
   }
 }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 54068a9..7cd3cb6 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -95,7 +95,6 @@ public class AzkabanExecutorServer {
   private static AzkabanExecutorServer app;
 
   private final ExecutorLoader executionLoader;
-  private final ProjectLoader projectLoader;
   private final FlowRunnerManager runnerManager;
   private final Props props;
   private final Server server;
@@ -103,19 +102,15 @@ public class AzkabanExecutorServer {
   private final ArrayList<ObjectName> registeredMBeans = new ArrayList<ObjectName>();
   private MBeanServer mbeanServer;
 
-  /**
-   * Constructor
-   *
-   * @throws Exception
-   */
   @Inject
-  public AzkabanExecutorServer(Props props) throws Exception {
+  public AzkabanExecutorServer(Props props,
+      ExecutorLoader executionLoader,
+      FlowRunnerManager runnerManager) throws Exception {
     this.props = props;
-    server = createJettyServer(props);
+    this.executionLoader = executionLoader;
+    this.runnerManager = runnerManager;
 
-    executionLoader = new JdbcExecutorLoader(props);
-    projectLoader = new JdbcProjectLoader(props);
-    runnerManager = new FlowRunnerManager(props, executionLoader, projectLoader, getClass().getClassLoader());
+    server = createJettyServer(props);
 
     JmxJobMBeanManager.getInstance().initialize(props);
 
@@ -314,11 +309,6 @@ public class AzkabanExecutorServer {
     }
   }
 
-
-  public ProjectLoader getProjectLoader() {
-    return projectLoader;
-  }
-
   public ExecutorLoader getExecutorLoader() {
     return executionLoader;
   }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
index e2b137c..c8a4039 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
@@ -19,8 +19,8 @@ package azkaban.execapp;
 
 import azkaban.executor.ExecutableFlow;
 import azkaban.project.ProjectFileHandler;
-import azkaban.project.ProjectLoader;
 import azkaban.project.ProjectManagerException;
+import azkaban.storage.StorageManager;
 import azkaban.utils.FileIOUtils;
 import azkaban.utils.Pair;
 import azkaban.utils.Utils;
@@ -41,19 +41,17 @@ import static java.util.Objects.*;
 public class FlowPreparer {
   private static final Logger log = Logger.getLogger(FlowPreparer.class);
 
-  // TODO move to config class
+  // TODO spyne: move to config class
   private final File executionsDir;
-  // TODO move to config class
+  // TODO spyne: move to config class
   private final File projectsDir;
 
   private final Map<Pair<Integer, Integer>, ProjectVersion> installedProjects;
-  private final ProjectLoader projectLoader;
+  private final StorageManager storageManager;
 
-  public FlowPreparer(ProjectLoader projectLoader,
-      File executionsDir,
-      File projectsDir,
+  public FlowPreparer(StorageManager storageManager, File executionsDir, File projectsDir,
       Map<Pair<Integer, Integer>, ProjectVersion> installedProjects) {
-    this.projectLoader = projectLoader;
+    this.storageManager = storageManager;
     this.executionsDir = executionsDir;
     this.projectsDir = projectsDir;
     this.installedProjects = installedProjects;
@@ -116,12 +114,12 @@ public class FlowPreparer {
 
     File tempDir = new File(projectsDir, "_temp." + projectDir + "." + System.currentTimeMillis());
 
-    // TODO Why mkdirs? This path should be already set up.
+    // TODO spyne: Why mkdirs? This path should be already set up.
     tempDir.mkdirs();
 
     ProjectFileHandler projectFileHandler = null;
     try {
-      projectFileHandler = requireNonNull(projectLoader.getUploadedFile(projectId, version));
+      projectFileHandler = requireNonNull(storageManager.getProjectFile(projectId, version));
       checkState("zip".equals(projectFileHandler.getFileType()));
 
       log.info("Downloading zip file.");
@@ -152,7 +150,7 @@ public class FlowPreparer {
     File execDir = new File(executionsDir, String.valueOf(execId));
     flow.setExecutionPath(execDir.getPath());
 
-    // TODO Why mkdirs? This path should be already set up.
+    // TODO spyne: Why mkdirs? This path should be already set up.
     execDir.mkdirs();
     return execDir;
   }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 4cd61cb..9d0336a 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -18,6 +18,8 @@ package azkaban.execapp;
 
 import azkaban.Constants;
 import azkaban.executor.Status;
+import azkaban.storage.StorageManager;
+import com.google.inject.Inject;
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
@@ -145,11 +147,13 @@ public class FlowRunnerManager implements EventListener,
   // whether the current executor is active
   private volatile boolean isExecutorActive = false;
 
-  public FlowRunnerManager(Props props, ExecutorLoader executorLoader,
-      ProjectLoader projectLoader, ClassLoader parentClassLoader) throws IOException {
+  @Inject
+  public FlowRunnerManager(Props props,
+      ExecutorLoader executorLoader,
+      ProjectLoader projectLoader,
+      StorageManager storageManager) throws IOException {
     azkabanProps = props;
 
-    // JobWrappingFactory.init(props, getClass().getClassLoader());
     executionDirRetention = props.getLong("execution.dir.retention", executionDirRetention);
     logger.info("Execution dir retention set to " + executionDirRetention + " ms");
 
@@ -170,7 +174,7 @@ public class FlowRunnerManager implements EventListener,
     executorService = createExecutorService(numThreads);
 
     // Create a flow preparer
-    flowPreparer = new FlowPreparer(projectLoader, executionDirectory, projectDirectory, installedProjects);
+    flowPreparer = new FlowPreparer(storageManager, executionDirectory, projectDirectory, installedProjects);
 
     this.executorLoader = executorLoader;
     this.projectLoader = projectLoader;
@@ -193,7 +197,7 @@ public class FlowRunnerManager implements EventListener,
         new JobTypeManager(props.getString(
             AzkabanExecutorServer.JOBTYPE_PLUGIN_DIR,
             JobTypeManager.DEFAULT_JOBTYPEPLUGINDIR), globalProps,
-            parentClassLoader);
+            getClass().getClassLoader());
   }
 
   private TrackingThreadPool createExecutorService(int nThreads) {
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
index 495d496..07797d6 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
@@ -19,7 +19,7 @@ package azkaban.execapp;
 
 import azkaban.executor.ExecutableFlow;
 import azkaban.project.ProjectFileHandler;
-import azkaban.project.ProjectLoader;
+import azkaban.storage.StorageManager;
 import azkaban.utils.Pair;
 import java.io.File;
 import java.util.HashMap;
@@ -49,7 +49,6 @@ public class FlowPreparerTest {
     executionsDir.mkdirs();
     projectsDir.mkdirs();
 
-
     ClassLoader classLoader = getClass().getClassLoader();
     File file = new File(classLoader.getResource(SAMPLE_FLOW_01 + ".zip").getFile());
 
@@ -57,10 +56,10 @@ public class FlowPreparerTest {
     when(projectFileHandler.getFileType()).thenReturn("zip");
     when(projectFileHandler.getLocalFile()).thenReturn(file);
 
-    ProjectLoader projectLoader = mock(ProjectLoader.class);
-    when(projectLoader.getUploadedFile(12, 34)).thenReturn(projectFileHandler);
+    StorageManager storageManager = mock(StorageManager.class);
+    when(storageManager.getProjectFile(12, 34)).thenReturn(projectFileHandler);
 
-    instance = new FlowPreparer(projectLoader, executionsDir, projectsDir, installedProjects);
+    instance = new FlowPreparer(storageManager, executionsDir, projectsDir, installedProjects);
   }
 
   @After
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
index 23cfd00..5c6e356 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -1207,9 +1207,9 @@ public class AzkabanWebServer extends AzkabanServer {
     executorManager.shutdown();
     try {
       server.stop();
-    } catch (Throwable t) {
+    } catch (Exception e) {
       // Catch all while closing server
-      logger.error(t);
+      logger.error(e);
     }
     server.destroy();
   }