Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 3a65a58..a8a4eaf 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -16,12 +16,12 @@
package azkaban.executor;
+import com.google.inject.Inject;
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
-import java.lang.annotation.Inherited;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -58,6 +58,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
private EncodingType defaultEncodingType = EncodingType.GZIP;
+ @Inject
public JdbcExecutorLoader(Props props) {
super(props);
}
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
index 806f7b0..7a78873 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
@@ -515,22 +515,6 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
}
}
-
- @Override
- public ProjectFileHandler getUploadedFile(Project project, int version)
- throws ProjectManagerException {
- logger.info("Retrieving to " + project.getName() + " version:" + version);
- Connection connection = getConnection();
- ProjectFileHandler handler = null;
- try {
- handler = getUploadedFile(connection, project.getId(), version);
- } finally {
- DbUtils.closeQuietly(connection);
- }
-
- return handler;
- }
-
@Override
public ProjectFileHandler getUploadedFile(int projectId, int version)
throws ProjectManagerException {
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
index c7af71c..0372773 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
@@ -139,14 +139,6 @@ public interface ProjectLoader {
*
* @return
*/
- ProjectFileHandler getUploadedFile(Project project, int version)
- throws ProjectManagerException;
-
- /**
- * Get file that's uploaded.
- *
- * @return
- */
ProjectFileHandler getUploadedFile(int projectId, int version)
throws ProjectManagerException;
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
index 6e77d78..da5c351 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
@@ -413,7 +413,7 @@ public class ProjectManager {
if (version == -1) {
version = projectLoader.getLatestProjectVersion(project);
}
- return projectLoader.getUploadedFile(project, version);
+ return storageManager.getProjectFile(project.getId(), version);
}
public Map<String, ValidationReport> uploadProject(Project project,
diff --git a/azkaban-common/src/main/java/azkaban/storage/DatabaseStorage.java b/azkaban-common/src/main/java/azkaban/storage/DatabaseStorage.java
index 3149d9c..972ffe1 100644
--- a/azkaban-common/src/main/java/azkaban/storage/DatabaseStorage.java
+++ b/azkaban-common/src/main/java/azkaban/storage/DatabaseStorage.java
@@ -17,6 +17,7 @@
package azkaban.storage;
+import azkaban.project.ProjectFileHandler;
import azkaban.project.ProjectLoader;
import azkaban.spi.Storage;
import azkaban.spi.StorageMetadata;
@@ -33,6 +34,8 @@ import javax.inject.Inject;
* behavior of Azkaban.
*/
public class DatabaseStorage implements Storage {
+ public static final String PROJECT_ID = "projectId";
+ public static final String VERSION = "version";
private final ProjectLoader projectLoader;
@@ -44,7 +47,11 @@ public class DatabaseStorage implements Storage {
@Override
public InputStream get(URI key) {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException("Not implemented yet. Use get(projectId, version) instead");
+ }
+
+ public ProjectFileHandler get(int projectId, int version) {
+ return projectLoader.getUploadedFile(projectId, version);
}
@Override
diff --git a/azkaban-common/src/main/java/azkaban/storage/StorageManager.java b/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
index d09e5f5..a04f189 100644
--- a/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
+++ b/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
@@ -18,18 +18,12 @@
package azkaban.storage;
import azkaban.project.Project;
-import azkaban.project.ProjectLoader;
-import azkaban.project.ProjectManagerException;
+import azkaban.project.ProjectFileHandler;
import azkaban.spi.Storage;
-import azkaban.spi.StorageException;
import azkaban.spi.StorageMetadata;
import azkaban.user.User;
import com.google.inject.Inject;
import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.InputStream;
-import java.net.URI;
import org.apache.log4j.Logger;
@@ -71,4 +65,20 @@ public class StorageManager {
metadata, localFile.getName(), localFile.length()));
storage.put(metadata, localFile);
}
+
+ /**
+ * Fetch project file from storage.
+ *
+ * @param projectId required project ID
+ * @param version version to be fetched
+ * @return Handler object containing hooks to fetched project file
+ */
+ public ProjectFileHandler getProjectFile(final int projectId, final int version) {
+ log.info(String.format("Fetching project file. project ID: %d version: %d", projectId, version));
+ // TODO spyne: remove huge hack ! There should not be any special handling for Database Storage.
+ if (storage instanceof DatabaseStorage) {
+ return ((DatabaseStorage) storage).get(projectId, version);
+ }
+ throw new UnsupportedOperationException("Operation currently unsupported for other types.");
+ }
}
diff --git a/azkaban-common/src/test/java/azkaban/project/JdbcProjectLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/JdbcProjectLoaderTest.java
index fb0543b..f78456d 100644
--- a/azkaban-common/src/test/java/azkaban/project/JdbcProjectLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/JdbcProjectLoaderTest.java
@@ -541,7 +541,7 @@ public class JdbcProjectLoaderTest {
loader.uploadProjectFile(project.getId(), 1, testFile, user.getUserId());
- ProjectFileHandler handler = loader.getUploadedFile(project, 1);
+ ProjectFileHandler handler = loader.getUploadedFile(project.getId(), 1);
Assert.assertEquals(handler.getProjectId(), project.getId());
Assert.assertEquals(handler.getFileName(), "testjob.zip");
Assert.assertEquals(handler.getVersion(), 1);
diff --git a/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java b/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
index 1e1d37d..062d880 100644
--- a/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
+++ b/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
@@ -123,13 +123,6 @@ public class MockProjectLoader implements ProjectLoader {
}
@Override
- public ProjectFileHandler getUploadedFile(Project project, int version)
- throws ProjectManagerException {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
public ProjectFileHandler getUploadedFile(int projectId, int version)
throws ProjectManagerException {
// TODO Auto-generated method stub
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
index c8803f9..3f9772b 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
@@ -17,6 +17,8 @@
package azkaban.execapp;
+import azkaban.executor.ExecutorLoader;
+import azkaban.executor.JdbcExecutorLoader;
import com.google.inject.AbstractModule;
import com.google.inject.Scopes;
@@ -29,6 +31,7 @@ import com.google.inject.Scopes;
public class AzkabanExecServerModule extends AbstractModule {
@Override
protected void configure() {
+ bind(ExecutorLoader.class).to(JdbcExecutorLoader.class).in(Scopes.SINGLETON);
bind(AzkabanExecutorServer.class).in(Scopes.SINGLETON);
}
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 54068a9..7cd3cb6 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -95,7 +95,6 @@ public class AzkabanExecutorServer {
private static AzkabanExecutorServer app;
private final ExecutorLoader executionLoader;
- private final ProjectLoader projectLoader;
private final FlowRunnerManager runnerManager;
private final Props props;
private final Server server;
@@ -103,19 +102,15 @@ public class AzkabanExecutorServer {
private final ArrayList<ObjectName> registeredMBeans = new ArrayList<ObjectName>();
private MBeanServer mbeanServer;
- /**
- * Constructor
- *
- * @throws Exception
- */
@Inject
- public AzkabanExecutorServer(Props props) throws Exception {
+ public AzkabanExecutorServer(Props props,
+ ExecutorLoader executionLoader,
+ FlowRunnerManager runnerManager) throws Exception {
this.props = props;
- server = createJettyServer(props);
+ this.executionLoader = executionLoader;
+ this.runnerManager = runnerManager;
- executionLoader = new JdbcExecutorLoader(props);
- projectLoader = new JdbcProjectLoader(props);
- runnerManager = new FlowRunnerManager(props, executionLoader, projectLoader, getClass().getClassLoader());
+ server = createJettyServer(props);
JmxJobMBeanManager.getInstance().initialize(props);
@@ -314,11 +309,6 @@ public class AzkabanExecutorServer {
}
}
-
- public ProjectLoader getProjectLoader() {
- return projectLoader;
- }
-
public ExecutorLoader getExecutorLoader() {
return executionLoader;
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
index e2b137c..c8a4039 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
@@ -19,8 +19,8 @@ package azkaban.execapp;
import azkaban.executor.ExecutableFlow;
import azkaban.project.ProjectFileHandler;
-import azkaban.project.ProjectLoader;
import azkaban.project.ProjectManagerException;
+import azkaban.storage.StorageManager;
import azkaban.utils.FileIOUtils;
import azkaban.utils.Pair;
import azkaban.utils.Utils;
@@ -41,19 +41,17 @@ import static java.util.Objects.*;
public class FlowPreparer {
private static final Logger log = Logger.getLogger(FlowPreparer.class);
- // TODO move to config class
+ // TODO spyne: move to config class
private final File executionsDir;
- // TODO move to config class
+ // TODO spyne: move to config class
private final File projectsDir;
private final Map<Pair<Integer, Integer>, ProjectVersion> installedProjects;
- private final ProjectLoader projectLoader;
+ private final StorageManager storageManager;
- public FlowPreparer(ProjectLoader projectLoader,
- File executionsDir,
- File projectsDir,
+ public FlowPreparer(StorageManager storageManager, File executionsDir, File projectsDir,
Map<Pair<Integer, Integer>, ProjectVersion> installedProjects) {
- this.projectLoader = projectLoader;
+ this.storageManager = storageManager;
this.executionsDir = executionsDir;
this.projectsDir = projectsDir;
this.installedProjects = installedProjects;
@@ -116,12 +114,12 @@ public class FlowPreparer {
File tempDir = new File(projectsDir, "_temp." + projectDir + "." + System.currentTimeMillis());
- // TODO Why mkdirs? This path should be already set up.
+ // TODO spyne: Why mkdirs? This path should be already set up.
tempDir.mkdirs();
ProjectFileHandler projectFileHandler = null;
try {
- projectFileHandler = requireNonNull(projectLoader.getUploadedFile(projectId, version));
+ projectFileHandler = requireNonNull(storageManager.getProjectFile(projectId, version));
checkState("zip".equals(projectFileHandler.getFileType()));
log.info("Downloading zip file.");
@@ -152,7 +150,7 @@ public class FlowPreparer {
File execDir = new File(executionsDir, String.valueOf(execId));
flow.setExecutionPath(execDir.getPath());
- // TODO Why mkdirs? This path should be already set up.
+ // TODO spyne: Why mkdirs? This path should be already set up.
execDir.mkdirs();
return execDir;
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 4cd61cb..9d0336a 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -18,6 +18,8 @@ package azkaban.execapp;
import azkaban.Constants;
import azkaban.executor.Status;
+import azkaban.storage.StorageManager;
+import com.google.inject.Inject;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
@@ -145,11 +147,13 @@ public class FlowRunnerManager implements EventListener,
// whether the current executor is active
private volatile boolean isExecutorActive = false;
- public FlowRunnerManager(Props props, ExecutorLoader executorLoader,
- ProjectLoader projectLoader, ClassLoader parentClassLoader) throws IOException {
+ @Inject
+ public FlowRunnerManager(Props props,
+ ExecutorLoader executorLoader,
+ ProjectLoader projectLoader,
+ StorageManager storageManager) throws IOException {
azkabanProps = props;
- // JobWrappingFactory.init(props, getClass().getClassLoader());
executionDirRetention = props.getLong("execution.dir.retention", executionDirRetention);
logger.info("Execution dir retention set to " + executionDirRetention + " ms");
@@ -170,7 +174,7 @@ public class FlowRunnerManager implements EventListener,
executorService = createExecutorService(numThreads);
// Create a flow preparer
- flowPreparer = new FlowPreparer(projectLoader, executionDirectory, projectDirectory, installedProjects);
+ flowPreparer = new FlowPreparer(storageManager, executionDirectory, projectDirectory, installedProjects);
this.executorLoader = executorLoader;
this.projectLoader = projectLoader;
@@ -193,7 +197,7 @@ public class FlowRunnerManager implements EventListener,
new JobTypeManager(props.getString(
AzkabanExecutorServer.JOBTYPE_PLUGIN_DIR,
JobTypeManager.DEFAULT_JOBTYPEPLUGINDIR), globalProps,
- parentClassLoader);
+ getClass().getClassLoader());
}
private TrackingThreadPool createExecutorService(int nThreads) {
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
index 495d496..07797d6 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
@@ -19,7 +19,7 @@ package azkaban.execapp;
import azkaban.executor.ExecutableFlow;
import azkaban.project.ProjectFileHandler;
-import azkaban.project.ProjectLoader;
+import azkaban.storage.StorageManager;
import azkaban.utils.Pair;
import java.io.File;
import java.util.HashMap;
@@ -49,7 +49,6 @@ public class FlowPreparerTest {
executionsDir.mkdirs();
projectsDir.mkdirs();
-
ClassLoader classLoader = getClass().getClassLoader();
File file = new File(classLoader.getResource(SAMPLE_FLOW_01 + ".zip").getFile());
@@ -57,10 +56,10 @@ public class FlowPreparerTest {
when(projectFileHandler.getFileType()).thenReturn("zip");
when(projectFileHandler.getLocalFile()).thenReturn(file);
- ProjectLoader projectLoader = mock(ProjectLoader.class);
- when(projectLoader.getUploadedFile(12, 34)).thenReturn(projectFileHandler);
+ StorageManager storageManager = mock(StorageManager.class);
+ when(storageManager.getProjectFile(12, 34)).thenReturn(projectFileHandler);
- instance = new FlowPreparer(projectLoader, executionsDir, projectsDir, installedProjects);
+ instance = new FlowPreparer(storageManager, executionsDir, projectsDir, installedProjects);
}
@After
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
index 23cfd00..5c6e356 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -1207,9 +1207,9 @@ public class AzkabanWebServer extends AzkabanServer {
executorManager.shutdown();
try {
server.stop();
- } catch (Throwable t) {
+ } catch (Exception e) {
// Catch all while closing server
- logger.error(t);
+ logger.error(e);
}
server.destroy();
}