azkaban-memoizeit
Changes
.classpath 1(+1 -0)
lib/h2-1.3.170.jar 0(+0 -0)
src/java/azkaban/project/JdbcProjectLoader.java 122(+46 -76)
src/java/azkaban/sla/JdbcSLALoader.java 57(+7 -50)
src/java/azkaban/utils/db/AbstractJdbcLoader.java 102(+102 -0)
src/java/azkaban/utils/db/h2/projects.sql 15(+15 -0)
src/java/azkaban/utils/db/h2/schedules.sql 16(+16 -0)
src/java/azkaban/utils/db/H2TableSetup.java 178(+178 -0)
src/java/azkaban/utils/db/TableData.java 26(+26 -0)
src/java/azkaban/webapp/AzkabanWebServer.java 58(+34 -24)
src/sql/create_settings.sql 8(+8 -0)
unit/project/testfailure/myjob1.job 4(+4 -0)
unit/project/testfailure/myjob2.job 4(+4 -0)
unit/project/testfailure/myjob3.job 5(+5 -0)
unit/project/testfailure/test.jar 0(+0 -0)
Details
.classpath 1(+1 -0)
diff --git a/.classpath b/.classpath
index 3265399..230b137 100644
--- a/.classpath
+++ b/.classpath
@@ -31,5 +31,6 @@
<classpathentry kind="lib" path="lib/commons-dbcp-1.4.jar"/>
<classpathentry kind="lib" path="extlib/mysql-connector-java-5.1.16-bin.jar"/>
<classpathentry kind="lib" path="lib/commons-pool-1.6.jar"/>
+ <classpathentry kind="lib" path="lib/h2-1.3.170.jar"/>
<classpathentry kind="output" path="dist/classes"/>
</classpath>
lib/h2-1.3.170.jar 0(+0 -0)
diff --git a/lib/h2-1.3.170.jar b/lib/h2-1.3.170.jar
new file mode 100644
index 0000000..ad61ab8
Binary files /dev/null and b/lib/h2-1.3.170.jar differ
diff --git a/src/java/azkaban/execapp/AzkabanExecutorServer.java b/src/java/azkaban/execapp/AzkabanExecutorServer.java
index 4f5fd02..d4dffef 100644
--- a/src/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/src/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -296,7 +296,7 @@ public class AzkabanExecutorServer {
logger.info("Registering MBeans...");
mbeanServer = ManagementFactory.getPlatformMBeanServer();
- registerMbean("jetty", new JmxJettyServer(server));
+ registerMbean("executorJetty", new JmxJettyServer(server));
registerMbean("flowRunnerManager", new JmxFlowRunnerManager(runnerManager));
}
diff --git a/src/java/azkaban/execapp/event/FlowWatcher.java b/src/java/azkaban/execapp/event/FlowWatcher.java
index 4a13a4b..8ca895e 100644
--- a/src/java/azkaban/execapp/event/FlowWatcher.java
+++ b/src/java/azkaban/execapp/event/FlowWatcher.java
@@ -81,6 +81,7 @@ public abstract class FlowWatcher {
cancelWatch = true;
for(BlockingStatus status : map.values()) {
+ logger.info("Unblocking " + status.getJobId());
status.changeStatus(Status.KILLED);
status.unblock();
}
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 005e720..fb9caf6 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -725,7 +725,10 @@ public class FlowRunner extends EventHandler implements Runnable {
public synchronized void handleEvent(Event event) {
JobRunner runner = (JobRunner)event.getRunner();
- if (event.getType() == Type.JOB_FINISHED) {
+ if (event.getType() == Type.JOB_STATUS_CHANGED) {
+ updateFlow();
+ }
+ else if (event.getType() == Type.JOB_FINISHED) {
synchronized(mainSyncObj) {
ExecutableNode node = runner.getNode();
activeJobRunners.remove(node.getJobId());
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 5f8cade..9096b2f 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -231,6 +231,8 @@ public class JobRunner extends EventHandler implements Runnable {
break;
}
}
+ writeStatus();
+ fireEvent(Event.create(this, Type.JOB_STATUS_CHANGED));
}
if (watcher.isWatchCancelled()) {
logger.info("Job was cancelled while waiting on pipeline. Quiting.");
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index dbcc02b..e67a7e2 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -15,8 +15,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import javax.sql.DataSource;
-
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
@@ -24,7 +22,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
-import azkaban.utils.DataSourceUtils;
+import azkaban.utils.db.AbstractJdbcLoader;
import azkaban.utils.FileIOUtils;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.GZIPUtils;
@@ -33,53 +31,13 @@ import azkaban.utils.Pair;
import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
-public class JdbcExecutorLoader implements ExecutorLoader {
+public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLoader {
private static final Logger logger = Logger.getLogger(JdbcExecutorLoader.class);
-
- /**
- * Used for when we store text data. Plain uses UTF8 encoding.
- */
- public static enum EncodingType {
- PLAIN(1), GZIP(2);
-
- private int numVal;
-
- EncodingType(int numVal) {
- this.numVal = numVal;
- }
-
- public int getNumVal() {
- return numVal;
- }
- public static EncodingType fromInteger(int x) {
- switch (x) {
- case 1:
- return PLAIN;
- case 2:
- return GZIP;
- default:
- return PLAIN;
- }
- }
- }
-
- private DataSource dataSource;
private EncodingType defaultEncodingType = EncodingType.GZIP;
public JdbcExecutorLoader(Props props) {
- String databaseType = props.getString("database.type");
-
- if (databaseType.equals("mysql")) {
- int port = props.getInt("mysql.port");
- String host = props.getString("mysql.host");
- String database = props.getString("mysql.database");
- String user = props.getString("mysql.user");
- String password = props.getString("mysql.password");
- int numConnections = props.getInt("mysql.numconnections");
-
- dataSource = DataSourceUtils.getMySQLDataSource(host, port, database, user, password, numConnections);
- }
+ super(props);
}
public EncodingType getDefaultEncodingType() {
@@ -168,7 +126,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
@Override
public ExecutableFlow fetchExecutableFlow(int id) throws ExecutorManagerException {
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
FetchExecutableFlows flowHandler = new FetchExecutableFlows();
try {
@@ -181,7 +139,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
@Override
public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows() throws ExecutorManagerException {
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
FetchActiveExecutableFlows flowHandler = new FetchActiveExecutableFlows();
try {
@@ -194,7 +152,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
@Override
public int fetchNumExecutableFlows() throws ExecutorManagerException {
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
IntHandler intHandler = new IntHandler();
try {
@@ -207,7 +165,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
@Override
public int fetchNumExecutableFlows(int projectId, String flowId) throws ExecutorManagerException {
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
IntHandler intHandler = new IntHandler();
try {
@@ -220,7 +178,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
@Override
public int fetchNumExecutableNodes(int projectId, String jobId) throws ExecutorManagerException {
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
IntHandler intHandler = new IntHandler();
try {
@@ -233,7 +191,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
@Override
public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId, int skip, int num) throws ExecutorManagerException {
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
FetchExecutableFlows flowHandler = new FetchExecutableFlows();
try {
@@ -246,7 +204,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
@Override
public List<ExecutableFlow> fetchFlowHistory(int skip, int num) throws ExecutorManagerException {
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
FetchExecutableFlows flowHandler = new FetchExecutableFlows();
@@ -338,7 +296,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
params.add(num);
}
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
FetchExecutableFlows flowHandler = new FetchExecutableFlows();
try {
@@ -352,7 +310,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
@Override
public void addActiveExecutableReference(ExecutionReference reference) throws ExecutorManagerException {
final String INSERT = "INSERT INTO active_executing_flows (exec_id, host, port, update_time) values (?,?,?,?)";
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
try {
runner.update(INSERT, reference.getExecId(), reference.getHost(), reference.getPort(), reference.getUpdateTime());
@@ -365,7 +323,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
public void removeActiveExecutableReference(int execid) throws ExecutorManagerException {
final String DELETE = "DELETE FROM active_executing_flows WHERE exec_id=?";
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
try {
runner.update(DELETE, execid);
} catch (SQLException e) {
@@ -377,7 +335,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
public boolean updateExecutableReference(int execId, long updateTime) throws ExecutorManagerException {
final String DELETE = "UPDATE active_executing_flows set update_time=? WHERE exec_id=?";
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
int updateNum = 0;
try {
updateNum = runner.update(DELETE, updateTime, execId);
@@ -404,7 +362,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
}
ExecutableFlow flow = node.getFlow();
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
try {
runner.update(
INSERT_EXECUTION_NODE,
@@ -439,7 +397,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
}
}
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
try {
runner.update(
UPSERT_EXECUTION_NODE,
@@ -457,7 +415,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
@Override
public List<ExecutableJobInfo> fetchJobInfoAttempts(int execId, String jobId) throws ExecutorManagerException {
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
try {
List<ExecutableJobInfo> info = runner.query(FetchExecutableJobHandler.FETCH_EXECUTABLE_NODE_ATTEMPTS, new FetchExecutableJobHandler(), execId, jobId);
@@ -473,7 +431,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
@Override
public ExecutableJobInfo fetchJobInfo(int execId, String jobId, int attempts) throws ExecutorManagerException {
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
try {
List<ExecutableJobInfo> info = runner.query(FetchExecutableJobHandler.FETCH_EXECUTABLE_NODE, new FetchExecutableJobHandler(), execId, jobId);
@@ -489,7 +447,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
@Override
public Props fetchExecutionJobInputProps(int execId, String jobId) throws ExecutorManagerException {
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
try {
Pair<Props, Props> props = runner.query(FetchExecutableJobPropsHandler.FETCH_INPUT_PARAM_EXECUTABLE_NODE, new FetchExecutableJobPropsHandler(), execId, jobId);
return props.getFirst();
@@ -501,7 +459,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
@Override
public Props fetchExecutionJobOutputProps(int execId, String jobId) throws ExecutorManagerException {
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
try {
Pair<Props, Props> props = runner.query(FetchExecutableJobPropsHandler.FETCH_OUTPUT_PARAM_EXECUTABLE_NODE, new FetchExecutableJobPropsHandler(), execId, jobId);
return props.getFirst();
@@ -513,7 +471,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
@Override
public Pair<Props, Props> fetchExecutionJobProps(int execId, String jobId) throws ExecutorManagerException {
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
try {
Pair<Props, Props> props = runner.query(FetchExecutableJobPropsHandler.FETCH_INPUT_OUTPUT_PARAM_EXECUTABLE_NODE, new FetchExecutableJobPropsHandler(), execId, jobId);
return props;
@@ -525,7 +483,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
@Override
public List<ExecutableJobInfo> fetchJobHistory(int projectId, String jobId, int skip, int size) throws ExecutorManagerException {
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
try {
List<ExecutableJobInfo> info = runner.query(FetchExecutableJobHandler.FETCH_PROJECT_EXECUTABLE_NODE, new FetchExecutableJobHandler(), projectId, jobId, skip, size);
@@ -541,7 +499,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
@Override
public LogData fetchLogs(int execId, String name, int attempt, int startByte, int length) throws ExecutorManagerException {
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
FetchLogsHandler handler = new FetchLogsHandler(startByte, length + startByte);
@@ -638,8 +596,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
private Connection getConnection() throws ExecutorManagerException {
Connection connection = null;
try {
- connection = dataSource.getConnection();
- connection.setAutoCommit(false);
+ connection = super.getDBConnection(false);
} catch (Exception e) {
DbUtils.closeQuietly(connection);
throw new ExecutorManagerException("Error getting DB connection.", e);
@@ -919,7 +876,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
public int removeExecutionLogsByTime(long millis) throws ExecutorManagerException {
final String DELETE_BY_TIME = "DELETE FROM execution_logs WHERE upload_time < ?";
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
int updateNum = 0;
try {
updateNum = runner.update(DELETE_BY_TIME, millis);
diff --git a/src/java/azkaban/executor/RemoteExecutorConnector.java b/src/java/azkaban/executor/RemoteExecutorConnector.java
new file mode 100644
index 0000000..6bbfe34
--- /dev/null
+++ b/src/java/azkaban/executor/RemoteExecutorConnector.java
@@ -0,0 +1,5 @@
+package azkaban.executor;
+
+public class RemoteExecutorConnector {
+
+}
\ No newline at end of file
src/java/azkaban/project/JdbcProjectLoader.java 122(+46 -76)
diff --git a/src/java/azkaban/project/JdbcProjectLoader.java b/src/java/azkaban/project/JdbcProjectLoader.java
index 18d8cf3..972b531 100644
--- a/src/java/azkaban/project/JdbcProjectLoader.java
+++ b/src/java/azkaban/project/JdbcProjectLoader.java
@@ -17,8 +17,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import javax.sql.DataSource;
-
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
@@ -29,7 +27,7 @@ import azkaban.flow.Flow;
import azkaban.project.ProjectLogEvent.EventType;
import azkaban.user.Permission;
import azkaban.user.User;
-import azkaban.utils.DataSourceUtils;
+import azkaban.utils.db.AbstractJdbcLoader;
import azkaban.utils.GZIPUtils;
import azkaban.utils.JSONUtils;
import azkaban.utils.Md5Hasher;
@@ -38,60 +36,20 @@ import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
import azkaban.utils.Triple;
-public class JdbcProjectLoader implements ProjectLoader {
-
- /**
- * Used for when we store text data. Plain uses UTF8 encoding.
- */
- public static enum EncodingType {
- PLAIN(1), GZIP(2);
-
- private int numVal;
-
- EncodingType(int numVal) {
- this.numVal = numVal;
- }
-
- public int getNumVal() {
- return numVal;
- }
-
- public static EncodingType fromInteger(int x) {
- switch (x) {
- case 1:
- return PLAIN;
- case 2:
- return GZIP;
- default:
- return PLAIN;
- }
- }
- }
-
+public class JdbcProjectLoader extends AbstractJdbcLoader implements ProjectLoader {
private static final Logger logger = Logger.getLogger(JdbcProjectLoader.class);
+
private static final int CHUCK_SIZE = 1024*1024*10;
private File tempDir;
-
- private DataSource dataSource;
+
private EncodingType defaultEncodingType = EncodingType.GZIP;
public JdbcProjectLoader(Props props) {
+ super(props);
tempDir = new File(props.getString("project.temp.dir", "temp"));
if (!tempDir.exists()) {
tempDir.mkdirs();
}
- String databaseType = props.getString("database.type");
-
- if (databaseType.equals("mysql")) {
- int port = props.getInt("mysql.port");
- String host = props.getString("mysql.host");
- String database = props.getString("mysql.database");
- String user = props.getString("mysql.user");
- String password = props.getString("mysql.password");
- int numConnections = props.getInt("mysql.numconnections");
-
- dataSource = DataSourceUtils.getMySQLDataSource(host, port, database, user, password, numConnections);
- }
}
@Override
@@ -240,7 +198,7 @@ public class JdbcProjectLoader implements ProjectLoader {
// Insert project
try {
long time = System.currentTimeMillis();
- int i = runner.update(connection, INSERT_PROJECT, name, true, time, time, null, creator.getUserId(), description, defaultEncodingType.numVal, null);
+ int i = runner.update(connection, INSERT_PROJECT, name, true, time, time, null, creator.getUserId(), description, defaultEncodingType.getNumVal(), null);
if (i == 0) {
throw new ProjectManagerException("No projects have been inserted.");
}
@@ -468,7 +426,7 @@ public class JdbcProjectLoader implements ProjectLoader {
@Override
public void changeProjectVersion(Project project, int version, String user) throws ProjectManagerException {
long timestamp = System.currentTimeMillis();
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
try {
final String UPDATE_PROJECT_VERSION = "UPDATE projects SET version=?,modified_time=?,last_modified_by=? WHERE id=?";
@@ -484,18 +442,32 @@ public class JdbcProjectLoader implements ProjectLoader {
@Override
public void updatePermission(Project project, String name, Permission perm, boolean isGroup) throws ProjectManagerException {
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
- long updateTime = System.currentTimeMillis();
- final String INSERT_PROJECT_PERMISSION =
- "INSERT INTO project_permissions (project_id, modified_time, name, permissions, isGroup) values (?,?,?,?,?)" +
- "ON DUPLICATE KEY UPDATE modified_time = VALUES(modified_time), permissions = VALUES(permissions)";
-
- try {
- runner.update(INSERT_PROJECT_PERMISSION, project.getId(), updateTime, name, perm.toFlags(), isGroup);
- } catch (SQLException e) {
- logger.error(e);
- throw new ProjectManagerException("Error updating project " + project.getName() + " permissions for " + name, e);
+ if (this.allowsOnDuplicateKey()) {
+ long updateTime = System.currentTimeMillis();
+ final String INSERT_PROJECT_PERMISSION =
+ "INSERT INTO project_permissions (project_id, modified_time, name, permissions, isGroup) values (?,?,?,?,?)" +
+ "ON DUPLICATE KEY UPDATE modified_time = VALUES(modified_time), permissions = VALUES(permissions)";
+
+ try {
+ runner.update(INSERT_PROJECT_PERMISSION, project.getId(), updateTime, name, perm.toFlags(), isGroup);
+ } catch (SQLException e) {
+ logger.error(e);
+ throw new ProjectManagerException("Error updating project " + project.getName() + " permissions for " + name, e);
+ }
+ }
+ else {
+ long updateTime = System.currentTimeMillis();
+ final String MERGE_PROJECT_PERMISSION =
+ "MERGE INTO project_permissions (project_id, modified_time, name, permissions, isGroup) KEY (project_id, name) values (?,?,?,?,?)";
+
+ try {
+ runner.update(MERGE_PROJECT_PERMISSION, project.getId(), updateTime, name, perm.toFlags(), isGroup);
+ } catch (SQLException e) {
+ logger.error(e);
+ throw new ProjectManagerException("Error updating project " + project.getName() + " permissions for " + name, e);
+ }
}
if (isGroup) {
@@ -542,7 +514,7 @@ public class JdbcProjectLoader implements ProjectLoader {
}
try {
- runner.update(connection, UPDATE_PROJECT_SETTINGS, encType.numVal, data, project.getId());
+ runner.update(connection, UPDATE_PROJECT_SETTINGS, encType.getNumVal(), data, project.getId());
connection.commit();
} catch (SQLException e) {
throw new ProjectManagerException("Error updating project " + project.getName() + " version " + project.getVersion(), e);
@@ -551,7 +523,7 @@ public class JdbcProjectLoader implements ProjectLoader {
@Override
public void removePermission(Project project, String name, boolean isGroup) throws ProjectManagerException {
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
final String DELETE_PROJECT_PERMISSION = "DELETE FROM project_permissions WHERE project_id=? AND name=? AND isGroup=?";
try {
@@ -572,7 +544,7 @@ public class JdbcProjectLoader implements ProjectLoader {
@Override
public List<Triple<String, Boolean, Permission>> getProjectPermissions(int projectId) throws ProjectManagerException {
ProjectPermissionsResultHandler permHander = new ProjectPermissionsResultHandler();
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
List<Triple<String, Boolean,Permission>> permissions = null;
try {
permissions = runner.query(ProjectPermissionsResultHandler.SELECT_PROJECT_PERMISSION, permHander, projectId);
@@ -586,7 +558,7 @@ public class JdbcProjectLoader implements ProjectLoader {
@Override
public void removeProject(Project project, String user) throws ProjectManagerException {
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
long updateTime = System.currentTimeMillis();
final String UPDATE_INACTIVE_PROJECT = "UPDATE projects SET active=false,modified_time=?,last_modified_by=? WHERE id=?";
@@ -600,7 +572,7 @@ public class JdbcProjectLoader implements ProjectLoader {
@Override
public boolean postEvent(Project project, EventType type, String user, String message) {
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
final String INSERT_PROJECT_EVENTS =
"INSERT INTO project_events (project_id, event_type, event_time, username, message) values (?,?,?,?,?)";
@@ -623,7 +595,7 @@ public class JdbcProjectLoader implements ProjectLoader {
* @throws ProjectManagerException
*/
public List<ProjectLogEvent> getProjectEvents(Project project, int num, int skip) throws ProjectManagerException {
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
ProjectLogsResultHandler logHandler = new ProjectLogsResultHandler();
List<ProjectLogEvent> events = null;
@@ -638,7 +610,7 @@ public class JdbcProjectLoader implements ProjectLoader {
@Override
public void updateDescription(Project project, String description, String user) throws ProjectManagerException {
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
final String UPDATE_PROJECT_DESCRIPTION =
"UPDATE projects SET description=?,modified_time=?,last_modified_by=? WHERE id=?";
@@ -656,7 +628,7 @@ public class JdbcProjectLoader implements ProjectLoader {
@Override
public int getLatestProjectVersion(Project project) throws ProjectManagerException {
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
IntHander handler = new IntHander();
try {
@@ -739,7 +711,7 @@ public class JdbcProjectLoader implements ProjectLoader {
@Override
public Flow fetchFlow(Project project, String flowId) throws ProjectManagerException {
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
ProjectFlowsResultHandler handler = new ProjectFlowsResultHandler();
try {
@@ -757,7 +729,7 @@ public class JdbcProjectLoader implements ProjectLoader {
@Override
public List<Flow> fetchAllProjectFlows(Project project) throws ProjectManagerException {
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
ProjectFlowsResultHandler handler = new ProjectFlowsResultHandler();
List<Flow> flows = null;
@@ -867,7 +839,7 @@ public class JdbcProjectLoader implements ProjectLoader {
@Override
public Props fetchProjectProperty(int projectId, int projectVer, String propsName) throws ProjectManagerException {
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
ProjectPropertiesResultsHandler handler = new ProjectPropertiesResultsHandler();
try {
@@ -886,7 +858,7 @@ public class JdbcProjectLoader implements ProjectLoader {
@Override
public Props fetchProjectProperty(Project project, String propsName) throws ProjectManagerException {
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
ProjectPropertiesResultsHandler handler = new ProjectPropertiesResultsHandler();
try {
@@ -964,7 +936,7 @@ public class JdbcProjectLoader implements ProjectLoader {
@Override
public Map<String,Props> fetchProjectProperties(int projectId, int version) throws ProjectManagerException {
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
ProjectPropertiesResultsHandler handler = new ProjectPropertiesResultsHandler();
try {
@@ -1278,8 +1250,7 @@ public class JdbcProjectLoader implements ProjectLoader {
private Connection getConnection() throws ProjectManagerException {
Connection connection = null;
try {
- connection = dataSource.getConnection();
- connection.setAutoCommit(false);
+ connection = super.getDBConnection(false);
} catch (Exception e) {
DbUtils.closeQuietly(connection);
throw new ProjectManagerException("Error getting DB connection.", e);
@@ -1287,6 +1258,5 @@ public class JdbcProjectLoader implements ProjectLoader {
return connection;
}
-
}
\ No newline at end of file
diff --git a/src/java/azkaban/scheduler/JdbcScheduleLoader.java b/src/java/azkaban/scheduler/JdbcScheduleLoader.java
index 5227a90..4b70fbc 100644
--- a/src/java/azkaban/scheduler/JdbcScheduleLoader.java
+++ b/src/java/azkaban/scheduler/JdbcScheduleLoader.java
@@ -16,7 +16,6 @@
package azkaban.scheduler;
-
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
@@ -25,8 +24,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import javax.sql.DataSource;
-
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
@@ -35,42 +32,15 @@ import org.apache.log4j.Logger;
import org.joda.time.DateTimeZone;
import org.joda.time.ReadablePeriod;
-import azkaban.utils.DataSourceUtils;
+import azkaban.utils.db.AbstractJdbcLoader;
import azkaban.utils.GZIPUtils;
import azkaban.utils.JSONUtils;
import azkaban.utils.Props;
-public class JdbcScheduleLoader implements ScheduleLoader {
-
+public class JdbcScheduleLoader extends AbstractJdbcLoader implements ScheduleLoader {
private static Logger logger = Logger.getLogger(JdbcScheduleLoader.class);
-
- public static enum EncodingType {
- PLAIN(1), GZIP(2);
-
- private int numVal;
- EncodingType(int numVal) {
- this.numVal = numVal;
- }
-
- public int getNumVal() {
- return numVal;
- }
-
- public static EncodingType fromInteger(int x) {
- switch (x) {
- case 1:
- return PLAIN;
- case 2:
- return GZIP;
- default:
- return PLAIN;
- }
- }
- }
-
- private DataSource dataSource;
private EncodingType defaultEncodingType = EncodingType.GZIP;
private static final String scheduleTableName = "schedules";
@@ -90,18 +60,6 @@ public class JdbcScheduleLoader implements ScheduleLoader {
private static String UPDATE_NEXT_EXEC_TIME =
"UPDATE " + scheduleTableName + " SET next_exec_time=? WHERE project_id=? AND flow_name=?";
- private Connection getConnection() throws ScheduleManagerException {
- Connection connection = null;
- try {
- connection = dataSource.getConnection();
- } catch (Exception e) {
- DbUtils.closeQuietly(connection);
- throw new ScheduleManagerException("Error getting DB connection.", e);
- }
-
- return connection;
- }
-
public EncodingType getDefaultEncodingType() {
return defaultEncodingType;
}
@@ -111,18 +69,7 @@ public class JdbcScheduleLoader implements ScheduleLoader {
}
public JdbcScheduleLoader(Props props) {
- String databaseType = props.getString("database.type");
-
- if (databaseType.equals("mysql")) {
- int port = props.getInt("mysql.port");
- String host = props.getString("mysql.host");
- String database = props.getString("mysql.database");
- String user = props.getString("mysql.user");
- String password = props.getString("mysql.password");
- int numConnections = props.getInt("mysql.numconnections");
-
- dataSource = DataSourceUtils.getMySQLDataSource(host, port, database, user, password, numConnections);
- }
+ super(props);
}
@Override
@@ -178,8 +125,7 @@ public class JdbcScheduleLoader implements ScheduleLoader {
public void removeSchedule(Schedule s) throws ScheduleManagerException {
logger.info("Removing schedule " + s.getScheduleName() + " from db.");
- QueryRunner runner = new QueryRunner(dataSource);
-
+ QueryRunner runner = createQueryRunner();
try {
int removes = runner.update(REMOVE_SCHEDULE_BY_KEY, s.getProjectId(), s.getFlowName());
if (removes == 0) {
@@ -214,7 +160,7 @@ public class JdbcScheduleLoader implements ScheduleLoader {
throw new ScheduleManagerException("Error encoding the schedule options. " + s.getScheduleName());
}
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
try {
int inserts = runner.update(
INSERT_SCHEDULE,
@@ -281,7 +227,7 @@ public class JdbcScheduleLoader implements ScheduleLoader {
throw new ScheduleManagerException("Error encoding the schedule options " + s.getScheduleName());
}
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
try {
int updates = runner.update(
@@ -362,4 +308,16 @@ public class JdbcScheduleLoader implements ScheduleLoader {
}
}
+
+ private Connection getConnection() throws ScheduleManagerException {
+ Connection connection = null;
+ try {
+ connection = super.getDBConnection(false);
+ } catch (Exception e) {
+ DbUtils.closeQuietly(connection);
+ throw new ScheduleManagerException("Error getting DB connection.", e);
+ }
+
+ return connection;
+ }
}
\ No newline at end of file
src/java/azkaban/sla/JdbcSLALoader.java 57(+7 -50)
diff --git a/src/java/azkaban/sla/JdbcSLALoader.java b/src/java/azkaban/sla/JdbcSLALoader.java
index 0ab7b3b..ffa4221 100644
--- a/src/java/azkaban/sla/JdbcSLALoader.java
+++ b/src/java/azkaban/sla/JdbcSLALoader.java
@@ -8,8 +8,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import javax.sql.DataSource;
-
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
@@ -17,44 +15,13 @@ import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import azkaban.sla.SLA.SlaRule;
-import azkaban.utils.DataSourceUtils;
+import azkaban.utils.db.AbstractJdbcLoader;
import azkaban.utils.GZIPUtils;
import azkaban.utils.JSONUtils;
import azkaban.utils.Props;
-public class JdbcSLALoader implements SLALoader {
-
+public class JdbcSLALoader extends AbstractJdbcLoader implements SLALoader {
private static final Logger logger = Logger.getLogger(JdbcSLALoader.class);
-
- /**
- * Used for when we store text data. Plain uses UTF8 encoding.
- */
- public static enum EncodingType {
- PLAIN(1), GZIP(2);
-
- private int numVal;
-
- EncodingType(int numVal) {
- this.numVal = numVal;
- }
-
- public int getNumVal() {
- return numVal;
- }
-
- public static EncodingType fromInteger(int x) {
- switch (x) {
- case 1:
- return PLAIN;
- case 2:
- return GZIP;
- default:
- return PLAIN;
- }
- }
- }
-
- private DataSource dataSource;
private EncodingType defaultEncodingType = EncodingType.GZIP;
private static String slaTblName = "active_sla";
@@ -73,28 +40,18 @@ public class JdbcSLALoader implements SLALoader {
"DELETE FROM " + slaTblName + " WHERE exec_id=? AND job_name=? AND check_time=? AND rule=?";
public JdbcSLALoader(Props props) {
- String databaseType = props.getString("database.type");
-
- if (databaseType.equals("mysql")) {
- int port = props.getInt("mysql.port");
- String host = props.getString("mysql.host");
- String database = props.getString("mysql.database");
- String user = props.getString("mysql.user");
- String password = props.getString("mysql.password");
- int numConnections = props.getInt("mysql.numconnections");
-
- dataSource = DataSourceUtils.getMySQLDataSource(host, port, database, user, password, numConnections);
- }
+ super(props);
}
private Connection getConnection() throws SLAManagerException {
Connection connection = null;
try {
- connection = dataSource.getConnection();
+ connection = super.getDBConnection(false);
} catch (Exception e) {
DbUtils.closeQuietly(connection);
throw new SLAManagerException("Error getting DB connection.", e);
}
+
return connection;
}
@@ -166,7 +123,7 @@ public class JdbcSLALoader implements SLALoader {
logger.info("Removing SLA " + s.toString() + " from db.");
- QueryRunner runner = new QueryRunner(dataSource);
+ QueryRunner runner = createQueryRunner();
try {
int removes = runner.update(REMOVE_SLA, s.getExecId(), s.getJobName(), s.getCheckTime().getMillis(), s.getRule().getNumVal());
@@ -322,5 +279,5 @@ public class JdbcSLALoader implements SLALoader {
}
}
-
+
}
src/java/azkaban/utils/db/AbstractJdbcLoader.java 102(+102 -0)
diff --git a/src/java/azkaban/utils/db/AbstractJdbcLoader.java b/src/java/azkaban/utils/db/AbstractJdbcLoader.java
new file mode 100644
index 0000000..fabfbac
--- /dev/null
+++ b/src/java/azkaban/utils/db/AbstractJdbcLoader.java
@@ -0,0 +1,102 @@
+package azkaban.utils.db;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import javax.sql.DataSource;
+
+import org.apache.commons.dbutils.DbUtils;
+import org.apache.commons.dbutils.QueryRunner;
+
+import azkaban.utils.Props;
+import azkaban.utils.db.H2TableSetup;
+
+public abstract class AbstractJdbcLoader {
+ private boolean allowsOnDuplicateKey = false;
+
+ /**
+ * Used for when we store text data. Plain uses UTF8 encoding.
+ */
+ public static enum EncodingType {
+ PLAIN(1), GZIP(2);
+
+ private int numVal;
+
+ EncodingType(int numVal) {
+ this.numVal = numVal;
+ }
+
+ public int getNumVal() {
+ return numVal;
+ }
+
+ public static EncodingType fromInteger(int x) {
+ switch (x) {
+ case 1:
+ return PLAIN;
+ case 2:
+ return GZIP;
+ default:
+ return PLAIN;
+ }
+ }
+ }
+
+ private DataSource dataSource;
+
+ public static void setupTables(Props props) throws SQLException, IOException {
+ String databaseType = props.getString("database.type");
+
+ if (databaseType.equals("h2")) {
+ String path = props.getString("h2.path");
+ DataSource dataSource = DataSourceUtils.getH2DataSource(path);
+ H2TableSetup tableSetup = new H2TableSetup(dataSource);
+
+ tableSetup.createProjectTables();
+ tableSetup.createExecutionTables();
+ tableSetup.createOtherTables();
+ }
+ }
+
+ public AbstractJdbcLoader(Props props) {
+ String databaseType = props.getString("database.type");
+
+ if (databaseType.equals("mysql")) {
+ int port = props.getInt("mysql.port");
+ String host = props.getString("mysql.host");
+ String database = props.getString("mysql.database");
+ String user = props.getString("mysql.user");
+ String password = props.getString("mysql.password");
+ int numConnections = props.getInt("mysql.numconnections");
+
+ allowsOnDuplicateKey = true;
+ dataSource = DataSourceUtils.getMySQLDataSource(host, port, database, user, password, numConnections);
+ }
+ else if (databaseType.equals("h2")) {
+ String path = props.getString("h2.path");
+ dataSource = DataSourceUtils.getH2DataSource(path);
+ }
+ }
+
+ protected Connection getDBConnection(boolean autoCommit) throws IOException {
+ Connection connection = null;
+ try {
+ connection = dataSource.getConnection();
+ connection.setAutoCommit(autoCommit);
+ } catch (Exception e) {
+ DbUtils.closeQuietly(connection);
+ throw new IOException("Error getting DB connection.", e);
+ }
+
+ return connection;
+ }
+
+ protected QueryRunner createQueryRunner() {
+ return new QueryRunner(dataSource);
+ }
+
+ protected boolean allowsOnDuplicateKey() {
+ return allowsOnDuplicateKey;
+ }
+}
\ No newline at end of file
diff --git a/src/java/azkaban/utils/db/h2/active_executing_flows.sql b/src/java/azkaban/utils/db/h2/active_executing_flows.sql
new file mode 100644
index 0000000..960ebc5
--- /dev/null
+++ b/src/java/azkaban/utils/db/h2/active_executing_flows.sql
@@ -0,0 +1,7 @@
+CREATE TABLE active_executing_flows (
+ exec_id INT,
+ host VARCHAR(255),
+ port INT,
+ update_time BIGINT,
+ PRIMARY KEY (exec_id)
+);
\ No newline at end of file
diff --git a/src/java/azkaban/utils/db/h2/active_sla.sql b/src/java/azkaban/utils/db/h2/active_sla.sql
new file mode 100644
index 0000000..1bbc1b8
--- /dev/null
+++ b/src/java/azkaban/utils/db/h2/active_sla.sql
@@ -0,0 +1,9 @@
+CREATE TABLE active_sla (
+ exec_id INT NOT NULL,
+ job_name VARCHAR(128) NOT NULL,
+ check_time BIGINT NOT NULL,
+ rule TINYINT NOT NULL,
+ enc_type TINYINT,
+ options LONGBLOB NOT NULL,
+ primary key(exec_id, job_name)
+);
\ No newline at end of file
diff --git a/src/java/azkaban/utils/db/h2/execution_flows.sql b/src/java/azkaban/utils/db/h2/execution_flows.sql
new file mode 100644
index 0000000..12bfa46
--- /dev/null
+++ b/src/java/azkaban/utils/db/h2/execution_flows.sql
@@ -0,0 +1,20 @@
+CREATE TABLE execution_flows (
+ exec_id INT NOT NULL AUTO_INCREMENT,
+ project_id INT NOT NULL,
+ version INT NOT NULL,
+ flow_id VARCHAR(128) NOT NULL,
+ status TINYINT,
+ submit_user VARCHAR(64),
+ submit_time BIGINT,
+ update_time BIGINT,
+ start_time BIGINT,
+ end_time BIGINT,
+ enc_type TINYINT,
+ flow_data LONGBLOB,
+ PRIMARY KEY (exec_id)
+);
+
+CREATE INDEX ex_flows_start_time ON execution_flows(start_time);
+CREATE INDEX ex_flows_end_time ON execution_flows(end_time);
+CREATE INDEX ex_flows_time_range ON execution_flows(start_time, end_time);
+CREATE INDEX ex_flows_flows ON execution_flows(project_id, flow_id);
\ No newline at end of file
diff --git a/src/java/azkaban/utils/db/h2/execution_jobs.sql b/src/java/azkaban/utils/db/h2/execution_jobs.sql
new file mode 100644
index 0000000..a62d3a9
--- /dev/null
+++ b/src/java/azkaban/utils/db/h2/execution_jobs.sql
@@ -0,0 +1,19 @@
+CREATE TABLE execution_jobs (
+ exec_id INT NOT NULL,
+ project_id INT NOT NULL,
+ version INT NOT NULL,
+ flow_id VARCHAR(128) NOT NULL,
+ job_id VARCHAR(128) NOT NULL,
+ attempt INT,
+ start_time BIGINT,
+ end_time BIGINT,
+ status TINYINT,
+ input_params LONGBLOB,
+ output_params LONGBLOB,
+ attachments LONGBLOB,
+ PRIMARY KEY (exec_id, job_id, attempt)
+);
+
+CREATE INDEX exec_job ON execution_jobs(exec_id, job_id);
+CREATE INDEX exec_id ON execution_jobs(exec_id);
+CREATE INDEX ex_job_id ON execution_jobs(project_id, job_id);
diff --git a/src/java/azkaban/utils/db/h2/execution_logs.sql b/src/java/azkaban/utils/db/h2/execution_logs.sql
new file mode 100644
index 0000000..0aa6a36
--- /dev/null
+++ b/src/java/azkaban/utils/db/h2/execution_logs.sql
@@ -0,0 +1,14 @@
+CREATE TABLE execution_logs (
+ exec_id INT NOT NULL,
+ name VARCHAR(128),
+ attempt INT,
+ enc_type TINYINT,
+ start_byte INT,
+ end_byte INT,
+ log LONGBLOB,
+ upload_time BIGINT,
+ PRIMARY KEY (exec_id, name, attempt, start_byte)
+);
+
+CREATE INDEX ex_log_attempt ON execution_logs(exec_id, name, attempt);
+CREATE INDEX ex_log_index ON execution_logs(exec_id, name);
\ No newline at end of file
diff --git a/src/java/azkaban/utils/db/h2/project_events.sql b/src/java/azkaban/utils/db/h2/project_events.sql
new file mode 100644
index 0000000..dd24d5f
--- /dev/null
+++ b/src/java/azkaban/utils/db/h2/project_events.sql
@@ -0,0 +1,9 @@
+CREATE TABLE project_events (
+ project_id INT NOT NULL,
+ event_type TINYINT NOT NULL,
+ event_time BIGINT NOT NULL,
+ username VARCHAR(64),
+ message VARCHAR(512)
+);
+
+CREATE INDEX log ON project_events(project_id, event_time);
diff --git a/src/java/azkaban/utils/db/h2/project_files.sql b/src/java/azkaban/utils/db/h2/project_files.sql
new file mode 100644
index 0000000..7dc3737
--- /dev/null
+++ b/src/java/azkaban/utils/db/h2/project_files.sql
@@ -0,0 +1,10 @@
+CREATE TABLE project_files (
+ project_id INT NOT NULL,
+ version INT not NULL,
+ chunk INT,
+ size INT,
+ file LONGBLOB,
+ PRIMARY KEY (project_id, version, chunk)
+);
+
+CREATE INDEX file_version ON project_files(project_id, version);
\ No newline at end of file
diff --git a/src/java/azkaban/utils/db/h2/project_flows.sql b/src/java/azkaban/utils/db/h2/project_flows.sql
new file mode 100644
index 0000000..bbd524a
--- /dev/null
+++ b/src/java/azkaban/utils/db/h2/project_flows.sql
@@ -0,0 +1,11 @@
+CREATE TABLE project_flows (
+ project_id INT NOT NULL,
+ version INT NOT NULL,
+ flow_id VARCHAR(128),
+ modified_time BIGINT NOT NULL,
+ encoding_type TINYINT,
+ json BLOB,
+ PRIMARY KEY (project_id, version, flow_id)
+);
+
+CREATE INDEX flow_index ON project_flows(project_id, version);
\ No newline at end of file
diff --git a/src/java/azkaban/utils/db/h2/project_permissions.sql b/src/java/azkaban/utils/db/h2/project_permissions.sql
new file mode 100644
index 0000000..f49d0b8
--- /dev/null
+++ b/src/java/azkaban/utils/db/h2/project_permissions.sql
@@ -0,0 +1,10 @@
+CREATE TABLE project_permissions (
+ project_id VARCHAR(64) NOT NULL,
+ modified_time BIGINT NOT NULL,
+ name VARCHAR(64) NOT NULL,
+ permissions INT NOT NULL,
+ isGroup BOOLEAN NOT NULL,
+ PRIMARY KEY (project_id, name)
+);
+
+CREATE INDEX permission_index ON project_permissions(project_id);
\ No newline at end of file
diff --git a/src/java/azkaban/utils/db/h2/project_properties.sql b/src/java/azkaban/utils/db/h2/project_properties.sql
new file mode 100644
index 0000000..1c602bb
--- /dev/null
+++ b/src/java/azkaban/utils/db/h2/project_properties.sql
@@ -0,0 +1,11 @@
+CREATE TABLE project_properties (
+ project_id INT NOT NULL,
+ version INT NOT NULL,
+ name VARCHAR(128),
+ modified_time BIGINT NOT NULL,
+ encoding_type TINYINT,
+ property BLOB,
+ PRIMARY KEY (project_id, version, name)
+);
+
+CREATE INDEX properties_index ON project_properties(project_id, version);
\ No newline at end of file
diff --git a/src/java/azkaban/utils/db/h2/project_versions.sql b/src/java/azkaban/utils/db/h2/project_versions.sql
new file mode 100644
index 0000000..1c764a2
--- /dev/null
+++ b/src/java/azkaban/utils/db/h2/project_versions.sql
@@ -0,0 +1,13 @@
+CREATE TABLE project_versions (
+ project_id INT NOT NULL,
+ version INT not NULL,
+ upload_time BIGINT NOT NULL,
+ uploader VARCHAR(64) NOT NULL,
+ file_type VARCHAR(16),
+ file_name VARCHAR(128),
+ md5 BINARY(16),
+ num_chunks INT,
+ PRIMARY KEY (project_id, version)
+);
+
+CREATE INDEX version_index ON project_versions(project_id);
\ No newline at end of file
src/java/azkaban/utils/db/h2/projects.sql 15(+15 -0)
diff --git a/src/java/azkaban/utils/db/h2/projects.sql b/src/java/azkaban/utils/db/h2/projects.sql
new file mode 100644
index 0000000..475615a
--- /dev/null
+++ b/src/java/azkaban/utils/db/h2/projects.sql
@@ -0,0 +1,15 @@
+CREATE TABLE projects (
+ id INT NOT NULL PRIMARY KEY AUTO_INCREMENT,
+ name VARCHAR(64) NOT NULL,
+ active BOOLEAN,
+ modified_time BIGINT NOT NULL,
+ create_time BIGINT NOT NULL,
+ version INT,
+ last_modified_by VARCHAR(64) NOT NULL,
+ description VARCHAR(255),
+ enc_type TINYINT,
+ settings_blob LONGBLOB,
+ UNIQUE INDEX project_id (id)
+);
+
+CREATE INDEX project_name ON projects(name);
\ No newline at end of file
src/java/azkaban/utils/db/h2/schedules.sql 16(+16 -0)
diff --git a/src/java/azkaban/utils/db/h2/schedules.sql b/src/java/azkaban/utils/db/h2/schedules.sql
new file mode 100644
index 0000000..1924ecd
--- /dev/null
+++ b/src/java/azkaban/utils/db/h2/schedules.sql
@@ -0,0 +1,16 @@
+CREATE TABLE schedules (
+ project_id INT NOT NULL,
+ project_name VARCHAR(128) NOT NULL,
+ flow_name VARCHAR(128) NOT NULL,
+ status VARCHAR(16),
+ first_sched_time BIGINT,
+ timezone VARCHAR(64),
+ period VARCHAR(16),
+ last_modify_time BIGINT,
+ next_exec_time BIGINT,
+ submit_time BIGINT,
+ submit_user VARCHAR(128),
+ enc_type TINYINT,
+ schedule_options LONGBLOB,
+ primary key(project_id, flow_name)
+);
src/java/azkaban/utils/db/H2TableSetup.java 178(+178 -0)
diff --git a/src/java/azkaban/utils/db/H2TableSetup.java b/src/java/azkaban/utils/db/H2TableSetup.java
new file mode 100644
index 0000000..5c576f2
--- /dev/null
+++ b/src/java/azkaban/utils/db/H2TableSetup.java
@@ -0,0 +1,178 @@
+package azkaban.utils.db;
+
+import java.io.IOException;
+import java.net.URL;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.sql.DataSource;
+
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.io.IOUtils;
+import org.apache.log4j.Logger;
+
+public class H2TableSetup {
+ private static final Logger logger = Logger.getLogger(H2TableSetup.class);
+
+ private DataSource dataSource;
+ private Map<String, TableData> tables = new HashMap<String,TableData>();
+
+ public H2TableSetup(DataSource source) throws SQLException {
+ this.dataSource = source;
+ setupTables();
+ }
+
+ public void setupTables() throws SQLException {
+ Connection conn = dataSource.getConnection();
+ ResultSet rs = conn.getMetaData().getTables(conn.getCatalog(), null, null, new String[]{"TABLE"});
+
+ while(rs.next()) {
+ TableData data = new TableData(rs.getString("TABLE_NAME").toLowerCase());
+ data.setSchema(rs.getString("TABLE_SCHEMA"));
+ tables.put(data.getName(), data);
+ logger.debug(data.toString());
+ }
+
+ conn.close();
+ }
+
+ public void createExecutionTables() throws SQLException, IOException {
+ Connection conn = null;
+ try {
+ conn = dataSource.getConnection();
+
+ if (!tables.containsKey("execution_flows")) {
+ logger.info("Creating execution_flows table.");
+ URL url = this.getClass().getClassLoader().getResource("azkaban/utils/db/h2/execution_flows.sql");
+ String query = IOUtils.toString(url.openStream());
+ QueryRunner runner = new QueryRunner();
+ runner.update(conn, query);
+ conn.commit();
+ }
+ if (!tables.containsKey("execution_jobs")) {
+ logger.info("Creating execution_jobs table.");
+ URL url = this.getClass().getClassLoader().getResource("azkaban/utils/db/h2/execution_jobs.sql");
+ String query = IOUtils.toString(url.openStream());
+ QueryRunner runner = new QueryRunner();
+ runner.update(conn, query);
+ conn.commit();
+ }
+ if (!tables.containsKey("execution_logs")) {
+ logger.info("Creating execution_logs table.");
+ URL url = this.getClass().getClassLoader().getResource("azkaban/utils/db/h2/execution_logs.sql");
+ String query = IOUtils.toString(url.openStream());
+ QueryRunner runner = new QueryRunner();
+ runner.update(conn, query);
+ conn.commit();
+ }
+ if (!tables.containsKey("active_executing_flows")) {
+ logger.info("Creating active_executing_flows table.");
+ URL url = this.getClass().getClassLoader().getResource("azkaban/utils/db/h2/active_executing_flows.sql");
+ String query = IOUtils.toString(url.openStream());
+ QueryRunner runner = new QueryRunner();
+ runner.update(conn, query);
+ conn.commit();
+ }
+ }
+ finally {
+ if (conn != null) { conn.close(); }
+ }
+ }
+
+ public void createOtherTables() throws SQLException, IOException {
+ Connection conn = null;
+ try {
+ conn = dataSource.getConnection();
+
+ if (!tables.containsKey("active_sla")) {
+ logger.info("Creating active_sla table.");
+ URL url = this.getClass().getClassLoader().getResource("azkaban/utils/db/h2/active_sla.sql");
+ String query = IOUtils.toString(url.openStream());
+ QueryRunner runner = new QueryRunner();
+ runner.update(conn, query);
+ conn.commit();
+ }
+ if (!tables.containsKey("schedules")) {
+ logger.info("Creating schedules table.");
+ URL url = this.getClass().getClassLoader().getResource("azkaban/utils/db/h2/schedules.sql");
+ String query = IOUtils.toString(url.openStream());
+ QueryRunner runner = new QueryRunner();
+ runner.update(conn, query);
+ conn.commit();
+ }
+ }
+ finally {
+ if (conn != null) { conn.close(); }
+ }
+ }
+
+ public void createProjectTables() throws SQLException, IOException {
+ Connection conn = null;
+ try {
+ conn = dataSource.getConnection();
+
+ if (!tables.containsKey("projects")) {
+ logger.info("Creating projects table.");
+ URL url = this.getClass().getClassLoader().getResource("azkaban/utils/db/h2/projects.sql");
+ String query = IOUtils.toString(url.openStream());
+ QueryRunner runner = new QueryRunner();
+ runner.update(conn, query);
+ conn.commit();
+ }
+ if (!tables.containsKey("project_versions")) {
+ logger.info("Creating project_versions table.");
+ URL url = this.getClass().getClassLoader().getResource("azkaban/utils/db/h2/project_versions.sql");
+ String query = IOUtils.toString(url.openStream());
+ QueryRunner runner = new QueryRunner();
+ runner.update(conn, query);
+ conn.commit();
+ }
+ if (!tables.containsKey("project_events")) {
+ logger.info("Creating project_events table.");
+ URL url = this.getClass().getClassLoader().getResource("azkaban/utils/db/h2/project_events.sql");
+ String query = IOUtils.toString(url.openStream());
+ QueryRunner runner = new QueryRunner();
+ runner.update(conn, query);
+ conn.commit();
+ }
+ if (!tables.containsKey("project_properties")) {
+ logger.info("Creating project_properties table.");
+ URL url = this.getClass().getClassLoader().getResource("azkaban/utils/db/h2/project_properties.sql");
+ String query = IOUtils.toString(url.openStream());
+ QueryRunner runner = new QueryRunner();
+ runner.update(conn, query);
+ conn.commit();
+ }
+ if (!tables.containsKey("project_files")) {
+ logger.info("Creating project_files table.");
+ URL url = this.getClass().getClassLoader().getResource("azkaban/utils/db/h2/project_files.sql");
+ String query = IOUtils.toString(url.openStream());
+ QueryRunner runner = new QueryRunner();
+ runner.update(conn, query);
+ conn.commit();
+ }
+ if (!tables.containsKey("project_permissions")) {
+ logger.info("Creating project_permissions table.");
+ URL url = this.getClass().getClassLoader().getResource("azkaban/utils/db/h2/project_permissions.sql");
+ String query = IOUtils.toString(url.openStream());
+ QueryRunner runner = new QueryRunner();
+ runner.update(conn, query);
+ conn.commit();
+ }
+ if (!tables.containsKey("project_flows")) {
+ logger.info("Creating project_flows table.");
+ URL url = this.getClass().getClassLoader().getResource("azkaban/utils/db/h2/project_flows.sql");
+ String query = IOUtils.toString(url.openStream());
+ QueryRunner runner = new QueryRunner();
+ runner.update(conn, query);
+ conn.commit();
+ }
+ }
+ finally {
+ if (conn != null) { conn.close(); }
+ }
+ }
+}
src/java/azkaban/utils/db/TableData.java 26(+26 -0)
diff --git a/src/java/azkaban/utils/db/TableData.java b/src/java/azkaban/utils/db/TableData.java
new file mode 100644
index 0000000..c957744
--- /dev/null
+++ b/src/java/azkaban/utils/db/TableData.java
@@ -0,0 +1,26 @@
+package azkaban.utils.db;
+
+public class TableData {
+ private String name;
+ private String schema;
+
+ public TableData(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setSchema(String schema) {
+ this.schema = schema;
+ }
+
+ public String getSchema() {
+ return this.schema;
+ }
+
+ public String toString() {
+ return name + ": " + schema;
+ }
+}
diff --git a/src/java/azkaban/utils/FileIOUtils.java b/src/java/azkaban/utils/FileIOUtils.java
index 7934a19..6f1c0c5 100644
--- a/src/java/azkaban/utils/FileIOUtils.java
+++ b/src/java/azkaban/utils/FileIOUtils.java
@@ -197,13 +197,14 @@ public class FileIOUtils {
return new LogData(fileOffset + utf8Range.getFirst(), utf8Range.getSecond(), outputString);
}
-
+
public static JobMetaData readUtf8MetaDataFile(File file, int fileOffset, int length) throws IOException {
byte[] buffer = new byte[length];
FileInputStream fileStream = new FileInputStream(file);
long skipped = fileStream.skip(fileOffset);
if (skipped < fileOffset) {
+ fileStream.close();
return new JobMetaData(fileOffset, 0, "");
}
diff --git a/src/java/azkaban/webapp/AzkabanSingleServer.java b/src/java/azkaban/webapp/AzkabanSingleServer.java
new file mode 100644
index 0000000..45bef7b
--- /dev/null
+++ b/src/java/azkaban/webapp/AzkabanSingleServer.java
@@ -0,0 +1,18 @@
+package azkaban.webapp;
+
+
+import org.apache.log4j.Logger;
+
+import azkaban.execapp.AzkabanExecutorServer;
+
+public class AzkabanSingleServer {
+ private static final Logger logger = Logger.getLogger(AzkabanSingleServer.class);
+ public static void main(String[] args) throws Exception {
+ logger.info("Starting Azkaban Server");
+
+ AzkabanWebServer.main(args);
+ logger.info("Azkaban Web Server started...");
+ AzkabanExecutorServer.main(args);
+ logger.info("Azkaban Exec Server started...");
+ }
+}
\ No newline at end of file
src/java/azkaban/webapp/AzkabanWebServer.java 58(+34 -24)
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index e3fe50f..c8f6244 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -43,6 +43,7 @@ import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
import org.apache.velocity.runtime.resource.loader.JarResourceLoader;
import org.joda.time.DateTimeZone;
import org.mortbay.jetty.Server;
+import org.mortbay.jetty.bio.SocketConnector;
import org.mortbay.jetty.security.SslSocketConnector;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.DefaultServlet;
@@ -66,6 +67,7 @@ import azkaban.sla.SLAManager;
import azkaban.sla.SLAManagerException;
import azkaban.user.UserManager;
import azkaban.user.XmlUserManager;
+import azkaban.utils.db.AbstractJdbcLoader;
import azkaban.utils.FileIOUtils;
import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
@@ -120,7 +122,7 @@ public class AzkabanWebServer implements AzkabanServer {
private static AzkabanWebServer app;
private static final String DEFAULT_TIMEZONE_ID = "default.timezone.id";
- // private static final int DEFAULT_PORT_NUMBER = 8081;
+ private static final int DEFAULT_PORT_NUMBER = 8081;
private static final int DEFAULT_SSL_PORT_NUMBER = 8443;
private static final int DEFAULT_THREAD_NUMBER = 20;
private static final String VELOCITY_DEV_MODE_PARAM = "velocity.dev.mode";
@@ -202,7 +204,6 @@ public class AzkabanWebServer implements AzkabanServer {
logger.error("Could not instantiate UserManager "+ userManagerClass.getName());
throw new RuntimeException(e);
}
-
}
else {
manager = new XmlUserManager(props);
@@ -210,7 +211,7 @@ public class AzkabanWebServer implements AzkabanServer {
return manager;
}
-
+
private ProjectManager loadProjectManager(Props props) {
logger.info("Loading JDBC for project management");
@@ -396,25 +397,36 @@ public class AzkabanWebServer implements AzkabanServer {
return;
}
- // int portNumber =
- // azkabanSettings.getInt("jetty.port",DEFAULT_PORT_NUMBER);
- int sslPortNumber = azkabanSettings.getInt("jetty.ssl.port",
- DEFAULT_SSL_PORT_NUMBER);
- int maxThreads = azkabanSettings.getInt("jetty.maxThreads",
- DEFAULT_THREAD_NUMBER);
-
- logger.info("Setting up Jetty Server with port:" + sslPortNumber + " and numThreads:" + maxThreads);
- final Server server = new Server();
- SslSocketConnector secureConnector = new SslSocketConnector();
- secureConnector.setPort(sslPortNumber);
- secureConnector.setKeystore(azkabanSettings.getString("jetty.keystore"));
- secureConnector.setPassword(azkabanSettings.getString("jetty.password"));
- secureConnector.setKeyPassword(azkabanSettings.getString("jetty.keypassword"));
- secureConnector.setTruststore(azkabanSettings.getString("jetty.truststore"));
- secureConnector.setTrustPassword(azkabanSettings.getString("jetty.trustpassword"));
- secureConnector.setHeaderBufferSize(MAX_HEADER_BUFFER_SIZE);
+ AbstractJdbcLoader.setupTables(azkabanSettings);
- server.addConnector(secureConnector);
+ int maxThreads = azkabanSettings.getInt("jetty.maxThreads", DEFAULT_THREAD_NUMBER);
+ int port;
+ boolean usingSSL = false;
+ final Server server = new Server();
+ if (azkabanSettings.getBoolean("jetty.use.ssl", true)) {
+ int sslPortNumber = azkabanSettings.getInt("jetty.ssl.port", DEFAULT_SSL_PORT_NUMBER);
+ port = sslPortNumber;
+ usingSSL = true;
+ logger.info("Setting up Jetty Https Server with port:" + sslPortNumber + " and numThreads:" + maxThreads);
+
+ SslSocketConnector secureConnector = new SslSocketConnector();
+ secureConnector.setPort(sslPortNumber);
+ secureConnector.setKeystore(azkabanSettings.getString("jetty.keystore"));
+ secureConnector.setPassword(azkabanSettings.getString("jetty.password"));
+ secureConnector.setKeyPassword(azkabanSettings.getString("jetty.keypassword"));
+ secureConnector.setTruststore(azkabanSettings.getString("jetty.truststore"));
+ secureConnector.setTrustPassword(azkabanSettings.getString("jetty.trustpassword"));
+ secureConnector.setHeaderBufferSize(MAX_HEADER_BUFFER_SIZE);
+
+ server.addConnector(secureConnector);
+ }
+ else {
+ port = azkabanSettings.getInt("jetty.port", DEFAULT_PORT_NUMBER);
+ SocketConnector connector = new SocketConnector();
+ connector.setPort(port);
+ connector.setHeaderBufferSize(MAX_HEADER_BUFFER_SIZE);
+ server.addConnector(connector);
+ }
app = new AzkabanWebServer(server, azkabanSettings);
QueuedThreadPool httpThreadPool = new QueuedThreadPool(maxThreads);
@@ -445,8 +457,6 @@ public class AzkabanWebServer implements AzkabanServer {
String viewerPluginDir = azkabanSettings.getString("viewer.plugin.dir", "plugins/viewer");
app.setViewerPlugins(loadViewerPlugins(root, viewerPluginDir, app.getVelocityEngine()));
- //root.addServlet(new ServletHolder(new HdfsBrowserServlet()), "/hdfs/*");
-
root.setAttribute(AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY, app);
try {
server.start();
@@ -471,7 +481,7 @@ public class AzkabanWebServer implements AzkabanServer {
logger.info("kk thx bye.");
}
});
- logger.info("Server running on port " + sslPortNumber + ".");
+ logger.info("Server running on " + (usingSSL ? "ssl" : "") + " port " + port + ".");
}
private static List<ViewerPlugin> loadViewerPlugins(Context root, String pluginPath, VelocityEngine ve) {
diff --git a/src/java/azkaban/webapp/servlet/admin/InitialSetupServlet.java b/src/java/azkaban/webapp/servlet/admin/InitialSetupServlet.java
index 744b4f7..c087f86 100644
--- a/src/java/azkaban/webapp/servlet/admin/InitialSetupServlet.java
+++ b/src/java/azkaban/webapp/servlet/admin/InitialSetupServlet.java
@@ -39,7 +39,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
-import azkaban.utils.DataSourceUtils;
+import azkaban.utils.db.DataSourceUtils;
import azkaban.utils.Props;
import azkaban.utils.Utils;
import azkaban.webapp.AzkabanAdminServer;
src/sql/create_settings.sql 8(+8 -0)
diff --git a/src/sql/create_settings.sql b/src/sql/create_settings.sql
new file mode 100644
index 0000000..b8a73a7
--- /dev/null
+++ b/src/sql/create_settings.sql
@@ -0,0 +1,8 @@
+CREATE TABLE properties (
+ name VARCHAR(64) NOT NULL,
+ modified_time BIGINT NOT NULL,
+ property BLOB,
+ PRIMARY KEY (name)
+) ENGINE=InnoDB;
+
+INSERT INTO properties (name, modified_time, property) VALUES ('version', UNIX_TIMESTAMP(), '2.1')
\ No newline at end of file
diff --git a/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java b/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
index f1717b2..7ead267 100644
--- a/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
+++ b/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
@@ -22,7 +22,7 @@ import azkaban.sla.SLA.SlaAction;
import azkaban.sla.SLA.SlaRule;
import azkaban.sla.SLA.SlaSetting;
import azkaban.sla.SlaOptions;
-import azkaban.utils.DataSourceUtils;
+import azkaban.utils.db.DataSourceUtils;
import azkaban.utils.Props;
public class JdbcScheduleLoaderTest {
diff --git a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
index 903491a..3603608 100644
--- a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
+++ b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
@@ -31,7 +31,7 @@ import azkaban.executor.JdbcExecutorLoader;
import azkaban.executor.Status;
import azkaban.flow.Flow;
-import azkaban.utils.DataSourceUtils;
+import azkaban.utils.db.DataSourceUtils;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
diff --git a/unit/java/azkaban/test/project/JdbcProjectLoaderTest.java b/unit/java/azkaban/test/project/JdbcProjectLoaderTest.java
index acb21a3..008a812 100644
--- a/unit/java/azkaban/test/project/JdbcProjectLoaderTest.java
+++ b/unit/java/azkaban/test/project/JdbcProjectLoaderTest.java
@@ -22,7 +22,6 @@ import azkaban.flow.Edge;
import azkaban.flow.Flow;
import azkaban.flow.Node;
import azkaban.project.JdbcProjectLoader;
-import azkaban.project.JdbcProjectLoader.EncodingType;
import azkaban.project.Project;
import azkaban.project.ProjectFileHandler;
import azkaban.project.ProjectLoader;
@@ -31,7 +30,7 @@ import azkaban.project.ProjectLogEvent.EventType;
import azkaban.project.ProjectManagerException;
import azkaban.user.Permission;
import azkaban.user.User;
-import azkaban.utils.DataSourceUtils;
+import azkaban.utils.db.DataSourceUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
@@ -307,7 +306,7 @@ public class JdbcProjectLoaderTest {
@Test
public void testFlowUpload() throws ProjectManagerException {
ProjectLoader loader = createLoader();
- ((JdbcProjectLoader)loader).setDefaultEncodingType(EncodingType.GZIP);
+ ((JdbcProjectLoader)loader).setDefaultEncodingType(JdbcProjectLoader.EncodingType.GZIP);
String projectName = "mytestFlowUpload1";
String projectDescription = "This is my new project";
User user = new User("testUser");
@@ -340,7 +339,7 @@ public class JdbcProjectLoaderTest {
@Test
public void testFlowUploadPlain() throws ProjectManagerException {
ProjectLoader loader = createLoader();
- ((JdbcProjectLoader)loader).setDefaultEncodingType(EncodingType.PLAIN);
+ ((JdbcProjectLoader)loader).setDefaultEncodingType(JdbcProjectLoader.EncodingType.PLAIN);
String projectName = "mytestFlowUpload2";
String projectDescription = "This is my new project";
User user = new User("testUser");
@@ -376,7 +375,7 @@ public class JdbcProjectLoaderTest {
@Test
public void testProjectProperties() throws ProjectManagerException {
ProjectLoader loader = createLoader();
- ((JdbcProjectLoader)loader).setDefaultEncodingType(EncodingType.PLAIN);
+ ((JdbcProjectLoader)loader).setDefaultEncodingType(JdbcProjectLoader.EncodingType.PLAIN);
String projectName = "testProjectProperties";
String projectDescription = "This is my new project";
User user = new User("testUser");
diff --git a/unit/java/azkaban/test/sla/JdbcSLALoaderTest.java b/unit/java/azkaban/test/sla/JdbcSLALoaderTest.java
index 02b7855..23473f7 100644
--- a/unit/java/azkaban/test/sla/JdbcSLALoaderTest.java
+++ b/unit/java/azkaban/test/sla/JdbcSLALoaderTest.java
@@ -22,7 +22,7 @@ import azkaban.sla.SLA;
import azkaban.sla.SLA.SlaAction;
import azkaban.sla.SLA.SlaRule;
import azkaban.sla.SLALoader;
-import azkaban.utils.DataSourceUtils;
+import azkaban.utils.db.DataSourceUtils;
import azkaban.utils.Props;
diff --git a/unit/java/azkaban/test/utils/db/H2TableSetupTest.java b/unit/java/azkaban/test/utils/db/H2TableSetupTest.java
new file mode 100644
index 0000000..f914fd1
--- /dev/null
+++ b/unit/java/azkaban/test/utils/db/H2TableSetupTest.java
@@ -0,0 +1,41 @@
+package azkaban.test.utils.db;
+
+import java.io.File;
+import java.io.IOException;
+import javax.sql.DataSource;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import azkaban.utils.db.DataSourceUtils;
+import azkaban.utils.db.H2TableSetup;
+
+public class H2TableSetupTest {
+ private static DataSource datasource;
+
+ @BeforeClass
+ public static void setupDB() throws IOException {
+ File dbDir = new File("h2dbtest");
+ if (dbDir.exists()) {
+ FileUtils.deleteDirectory(dbDir);
+ }
+
+ dbDir.mkdir();
+ datasource = DataSourceUtils.getH2DataSource("h2dbtest/h2db");
+ }
+
+ @AfterClass
+ public static void teardownDB() {
+ }
+
+ @Test
+ public void queryTables() throws Exception {
+ H2TableSetup setup = new H2TableSetup(datasource);
+ setup.createProjectTables();
+ setup.createExecutionTables();
+ setup.createOtherTables();
+ setup.setupTables();
+ }
+}
diff --git a/unit/project/testfailure/failflow.job b/unit/project/testfailure/failflow.job
new file mode 100644
index 0000000..a65b7c4
--- /dev/null
+++ b/unit/project/testfailure/failflow.job
@@ -0,0 +1,5 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=5
+fail=false
+dependencies=myjob2-fail30,myjob2
unit/project/testfailure/myjob1.job 4(+4 -0)
diff --git a/unit/project/testfailure/myjob1.job b/unit/project/testfailure/myjob1.job
new file mode 100644
index 0000000..660e2a9
--- /dev/null
+++ b/unit/project/testfailure/myjob1.job
@@ -0,0 +1,4 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=10
+fail=false
unit/project/testfailure/myjob2.job 4(+4 -0)
diff --git a/unit/project/testfailure/myjob2.job b/unit/project/testfailure/myjob2.job
new file mode 100644
index 0000000..f5a05ed
--- /dev/null
+++ b/unit/project/testfailure/myjob2.job
@@ -0,0 +1,4 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=30
+fail=false
diff --git a/unit/project/testfailure/myjob2-fail30.job b/unit/project/testfailure/myjob2-fail30.job
new file mode 100644
index 0000000..03540bb
--- /dev/null
+++ b/unit/project/testfailure/myjob2-fail30.job
@@ -0,0 +1,6 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=10
+fail=true
+passRetry=2
+dependencies=myjob3
unit/project/testfailure/myjob3.job 5(+5 -0)
diff --git a/unit/project/testfailure/myjob3.job b/unit/project/testfailure/myjob3.job
new file mode 100644
index 0000000..a1d7ded
--- /dev/null
+++ b/unit/project/testfailure/myjob3.job
@@ -0,0 +1,5 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=5
+dependencies=myjob1
+fail=false
unit/project/testfailure/test.jar 0(+0 -0)
diff --git a/unit/project/testfailure/test.jar b/unit/project/testfailure/test.jar
new file mode 100644
index 0000000..4f1955a
Binary files /dev/null and b/unit/project/testfailure/test.jar differ
diff --git a/unit/project/testfailure/testfailure.zip b/unit/project/testfailure/testfailure.zip
new file mode 100644
index 0000000..dc0b640
Binary files /dev/null and b/unit/project/testfailure/testfailure.zip differ