azkaban-memoizeit

Changes

.classpath 1(+1 -0)

Details

.classpath 1(+1 -0)

diff --git a/.classpath b/.classpath
index 3265399..230b137 100644
--- a/.classpath
+++ b/.classpath
@@ -31,5 +31,6 @@
 	<classpathentry kind="lib" path="lib/commons-dbcp-1.4.jar"/>
 	<classpathentry kind="lib" path="extlib/mysql-connector-java-5.1.16-bin.jar"/>
 	<classpathentry kind="lib" path="lib/commons-pool-1.6.jar"/>
+	<classpathentry kind="lib" path="lib/h2-1.3.170.jar"/>
 	<classpathentry kind="output" path="dist/classes"/>
 </classpath>
diff --git a/lib/h2-1.3.170.jar b/lib/h2-1.3.170.jar
new file mode 100644
index 0000000..ad61ab8
Binary files /dev/null and b/lib/h2-1.3.170.jar differ
diff --git a/src/java/azkaban/execapp/AzkabanExecutorServer.java b/src/java/azkaban/execapp/AzkabanExecutorServer.java
index 4f5fd02..d4dffef 100644
--- a/src/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/src/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -296,7 +296,7 @@ public class AzkabanExecutorServer {
 		logger.info("Registering MBeans...");
 		mbeanServer = ManagementFactory.getPlatformMBeanServer();
 
-		registerMbean("jetty", new JmxJettyServer(server));
+		registerMbean("executorJetty", new JmxJettyServer(server));
 		registerMbean("flowRunnerManager", new JmxFlowRunnerManager(runnerManager));
 	}
 	
diff --git a/src/java/azkaban/execapp/event/FlowWatcher.java b/src/java/azkaban/execapp/event/FlowWatcher.java
index 4a13a4b..8ca895e 100644
--- a/src/java/azkaban/execapp/event/FlowWatcher.java
+++ b/src/java/azkaban/execapp/event/FlowWatcher.java
@@ -81,6 +81,7 @@ public abstract class FlowWatcher {
 		cancelWatch = true;
 		
 		for(BlockingStatus status : map.values()) {
+			logger.info("Unblocking " + status.getJobId());
 			status.changeStatus(Status.KILLED);
 			status.unblock();
 		}
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 005e720..fb9caf6 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -725,7 +725,10 @@ public class FlowRunner extends EventHandler implements Runnable {
 		public synchronized void handleEvent(Event event) {
 			JobRunner runner = (JobRunner)event.getRunner();
 			
-			if (event.getType() == Type.JOB_FINISHED) {
+			if (event.getType() == Type.JOB_STATUS_CHANGED) {
+				updateFlow();
+			}
+			else if (event.getType() == Type.JOB_FINISHED) {
 				synchronized(mainSyncObj) {
 					ExecutableNode node = runner.getNode();
 					activeJobRunners.remove(node.getJobId());
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 5f8cade..9096b2f 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -231,6 +231,8 @@ public class JobRunner extends EventHandler implements Runnable {
 							break;
 						}
 					}
+					writeStatus();	
+					fireEvent(Event.create(this, Type.JOB_STATUS_CHANGED));
 				}
 				if (watcher.isWatchCancelled()) {
 					logger.info("Job was cancelled while waiting on pipeline. Quiting.");
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index dbcc02b..e67a7e2 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -15,8 +15,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import javax.sql.DataSource;
-
 import org.apache.commons.dbutils.DbUtils;
 import org.apache.commons.dbutils.QueryRunner;
 import org.apache.commons.dbutils.ResultSetHandler;
@@ -24,7 +22,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
 
-import azkaban.utils.DataSourceUtils;
+import azkaban.utils.db.AbstractJdbcLoader;
 import azkaban.utils.FileIOUtils;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.GZIPUtils;
@@ -33,53 +31,13 @@ import azkaban.utils.Pair;
 import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
 
-public class JdbcExecutorLoader implements ExecutorLoader {
+public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLoader {
 	private static final Logger logger = Logger.getLogger(JdbcExecutorLoader.class);
-	
-	/**
-	 * Used for when we store text data. Plain uses UTF8 encoding.
-	 */
-	public static enum EncodingType {
-		PLAIN(1), GZIP(2);
-
-		private int numVal;
-
-		EncodingType(int numVal) {
-			this.numVal = numVal;
-		}
-
-		public int getNumVal() {
-			return numVal;
-		}
 
-		public static EncodingType fromInteger(int x) {
-			switch (x) {
-			case 1:
-				return PLAIN;
-			case 2:
-				return GZIP;
-			default:
-				return PLAIN;
-			}
-		}
-	}
-	
-	private DataSource dataSource;
 	private EncodingType defaultEncodingType = EncodingType.GZIP;
 	
 	public JdbcExecutorLoader(Props props) {
-		String databaseType = props.getString("database.type");
-
-		if (databaseType.equals("mysql")) {
-			int port = props.getInt("mysql.port");
-			String host = props.getString("mysql.host");
-			String database = props.getString("mysql.database");
-			String user = props.getString("mysql.user");
-			String password = props.getString("mysql.password");
-			int numConnections = props.getInt("mysql.numconnections");
-			
-			dataSource = DataSourceUtils.getMySQLDataSource(host, port, database, user, password, numConnections);
-		}
+		super(props);
 	}
 
 	public EncodingType getDefaultEncodingType() {
@@ -168,7 +126,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 	
 	@Override
 	public ExecutableFlow fetchExecutableFlow(int id) throws ExecutorManagerException {
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 		FetchExecutableFlows flowHandler = new FetchExecutableFlows();
 
 		try {
@@ -181,7 +139,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 	
 	@Override
 	public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows() throws ExecutorManagerException {
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 		FetchActiveExecutableFlows flowHandler = new FetchActiveExecutableFlows();
 
 		try {
@@ -194,7 +152,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 	
 	@Override
 	public int fetchNumExecutableFlows() throws ExecutorManagerException {
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 		
 		IntHandler intHandler = new IntHandler();
 		try {
@@ -207,7 +165,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 	
 	@Override
 	public int fetchNumExecutableFlows(int projectId, String flowId) throws ExecutorManagerException {
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 		
 		IntHandler intHandler = new IntHandler();
 		try {
@@ -220,7 +178,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 	
 	@Override
 	public int fetchNumExecutableNodes(int projectId, String jobId) throws ExecutorManagerException {
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 		
 		IntHandler intHandler = new IntHandler();
 		try {
@@ -233,7 +191,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 	
 	@Override
 	public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId, int skip, int num) throws ExecutorManagerException {
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 		FetchExecutableFlows flowHandler = new FetchExecutableFlows();
 
 		try {
@@ -246,7 +204,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 	
 	@Override
 	public List<ExecutableFlow> fetchFlowHistory(int skip, int num) throws ExecutorManagerException {
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 
 		FetchExecutableFlows flowHandler = new FetchExecutableFlows();
 		
@@ -338,7 +296,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 			params.add(num);
 		}
 		
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 		FetchExecutableFlows flowHandler = new FetchExecutableFlows();
 
 		try {
@@ -352,7 +310,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 	@Override
 	public void addActiveExecutableReference(ExecutionReference reference) throws ExecutorManagerException {
 		final String INSERT = "INSERT INTO active_executing_flows (exec_id, host, port, update_time) values (?,?,?,?)";
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 		
 		try {
 			runner.update(INSERT, reference.getExecId(), reference.getHost(), reference.getPort(), reference.getUpdateTime());
@@ -365,7 +323,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 	public void removeActiveExecutableReference(int execid) throws ExecutorManagerException {
 		final String DELETE = "DELETE FROM active_executing_flows WHERE exec_id=?";
 		
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 		try {
 			runner.update(DELETE, execid);
 		} catch (SQLException e) {
@@ -377,7 +335,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 	public boolean updateExecutableReference(int execId, long updateTime) throws ExecutorManagerException {
 		final String DELETE = "UPDATE active_executing_flows set update_time=? WHERE exec_id=?";
 		
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 		int updateNum = 0;
 		try {
 			updateNum = runner.update(DELETE, updateTime, execId);
@@ -404,7 +362,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 		}
 		
 		ExecutableFlow flow = node.getFlow();
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 		try {
 			runner.update(
 					INSERT_EXECUTION_NODE, 
@@ -439,7 +397,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 			}
 		}
 		
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 		try {
 			runner.update(
 					UPSERT_EXECUTION_NODE, 
@@ -457,7 +415,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 	
 	@Override
 	public List<ExecutableJobInfo> fetchJobInfoAttempts(int execId, String jobId) throws ExecutorManagerException {
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 		
 		try {
 			List<ExecutableJobInfo> info = runner.query(FetchExecutableJobHandler.FETCH_EXECUTABLE_NODE_ATTEMPTS, new FetchExecutableJobHandler(), execId, jobId);
@@ -473,7 +431,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 	
 	@Override
 	public ExecutableJobInfo fetchJobInfo(int execId, String jobId, int attempts) throws ExecutorManagerException {
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 		
 		try {
 			List<ExecutableJobInfo> info = runner.query(FetchExecutableJobHandler.FETCH_EXECUTABLE_NODE, new FetchExecutableJobHandler(), execId, jobId);
@@ -489,7 +447,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 	
 	@Override
 	public Props fetchExecutionJobInputProps(int execId, String jobId) throws ExecutorManagerException {
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 		try {
 			Pair<Props, Props> props = runner.query(FetchExecutableJobPropsHandler.FETCH_INPUT_PARAM_EXECUTABLE_NODE, new FetchExecutableJobPropsHandler(), execId, jobId);
 			return props.getFirst();
@@ -501,7 +459,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 	
 	@Override
 	public Props fetchExecutionJobOutputProps(int execId, String jobId) throws ExecutorManagerException {
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 		try {
 			Pair<Props, Props> props = runner.query(FetchExecutableJobPropsHandler.FETCH_OUTPUT_PARAM_EXECUTABLE_NODE, new FetchExecutableJobPropsHandler(), execId, jobId);
 			return props.getFirst();
@@ -513,7 +471,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 	
 	@Override
 	public Pair<Props, Props> fetchExecutionJobProps(int execId, String jobId) throws ExecutorManagerException {
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 		try {
 			Pair<Props, Props> props = runner.query(FetchExecutableJobPropsHandler.FETCH_INPUT_OUTPUT_PARAM_EXECUTABLE_NODE, new FetchExecutableJobPropsHandler(), execId, jobId);
 			return props;
@@ -525,7 +483,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 	
 	@Override
 	public List<ExecutableJobInfo> fetchJobHistory(int projectId, String jobId, int skip, int size) throws ExecutorManagerException {
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 		
 		try {
 			List<ExecutableJobInfo> info = runner.query(FetchExecutableJobHandler.FETCH_PROJECT_EXECUTABLE_NODE, new FetchExecutableJobHandler(), projectId, jobId, skip, size);
@@ -541,7 +499,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 	
 	@Override
 	public LogData fetchLogs(int execId, String name, int attempt, int startByte, int length) throws ExecutorManagerException {
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 
 		FetchLogsHandler handler = new FetchLogsHandler(startByte, length + startByte);
 		
@@ -638,8 +596,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 	private Connection getConnection() throws ExecutorManagerException {
 		Connection connection = null;
 		try {
-			connection = dataSource.getConnection();
-			connection.setAutoCommit(false);
+			connection = super.getDBConnection(false);
 		} catch (Exception e) {
 			DbUtils.closeQuietly(connection);
 			throw new ExecutorManagerException("Error getting DB connection.", e);
@@ -919,7 +876,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 	public int removeExecutionLogsByTime(long millis) throws ExecutorManagerException {
 		final String DELETE_BY_TIME = "DELETE FROM execution_logs WHERE upload_time < ?";
 		
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 		int updateNum = 0;
 		try {
 			updateNum = runner.update(DELETE_BY_TIME, millis);
diff --git a/src/java/azkaban/executor/RemoteExecutorConnector.java b/src/java/azkaban/executor/RemoteExecutorConnector.java
new file mode 100644
index 0000000..6bbfe34
--- /dev/null
+++ b/src/java/azkaban/executor/RemoteExecutorConnector.java
@@ -0,0 +1,5 @@
+package azkaban.executor;
+
+public class RemoteExecutorConnector {
+	
+}
\ No newline at end of file
diff --git a/src/java/azkaban/project/JdbcProjectLoader.java b/src/java/azkaban/project/JdbcProjectLoader.java
index 18d8cf3..972b531 100644
--- a/src/java/azkaban/project/JdbcProjectLoader.java
+++ b/src/java/azkaban/project/JdbcProjectLoader.java
@@ -17,8 +17,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import javax.sql.DataSource;
-
 import org.apache.commons.dbutils.DbUtils;
 import org.apache.commons.dbutils.QueryRunner;
 import org.apache.commons.dbutils.ResultSetHandler;
@@ -29,7 +27,7 @@ import azkaban.flow.Flow;
 import azkaban.project.ProjectLogEvent.EventType;
 import azkaban.user.Permission;
 import azkaban.user.User;
-import azkaban.utils.DataSourceUtils;
+import azkaban.utils.db.AbstractJdbcLoader;
 import azkaban.utils.GZIPUtils;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Md5Hasher;
@@ -38,60 +36,20 @@ import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
 import azkaban.utils.Triple;
 
-public class JdbcProjectLoader implements ProjectLoader {
-	
-	/**
-	 * Used for when we store text data. Plain uses UTF8 encoding.
-	 */
-	public static enum EncodingType {
-		PLAIN(1), GZIP(2);
-
-		private int numVal;
-
-		EncodingType(int numVal) {
-			this.numVal = numVal;
-		}
-
-		public int getNumVal() {
-			return numVal;
-		}
-
-		public static EncodingType fromInteger(int x) {
-			switch (x) {
-			case 1:
-				return PLAIN;
-			case 2:
-				return GZIP;
-			default:
-				return PLAIN;
-			}
-		}
-	}
-	
+public class JdbcProjectLoader extends AbstractJdbcLoader implements ProjectLoader {
 	private static final Logger logger = Logger.getLogger(JdbcProjectLoader.class);
+
 	private static final int CHUCK_SIZE = 1024*1024*10;
 	private File tempDir;
-	
-	private DataSource dataSource;
+
 	private EncodingType defaultEncodingType = EncodingType.GZIP;
 	
 	public JdbcProjectLoader(Props props) {
+		super(props);
 		tempDir = new File(props.getString("project.temp.dir", "temp"));
 		if (!tempDir.exists()) {
 			tempDir.mkdirs();
 		}
-		String databaseType = props.getString("database.type");
-	
-		if (databaseType.equals("mysql")) {
-			int port = props.getInt("mysql.port");
-			String host = props.getString("mysql.host");
-			String database = props.getString("mysql.database");
-			String user = props.getString("mysql.user");
-			String password = props.getString("mysql.password");
-			int numConnections = props.getInt("mysql.numconnections");
-			
-			dataSource = DataSourceUtils.getMySQLDataSource(host, port, database, user, password, numConnections);
-		}
 	}
 	
 	@Override
@@ -240,7 +198,7 @@ public class JdbcProjectLoader implements ProjectLoader {
 		// Insert project
 		try {
 			long time = System.currentTimeMillis();
-			int i = runner.update(connection, INSERT_PROJECT, name, true, time, time, null, creator.getUserId(), description, defaultEncodingType.numVal, null);
+			int i = runner.update(connection, INSERT_PROJECT, name, true, time, time, null, creator.getUserId(), description, defaultEncodingType.getNumVal(), null);
 			if (i == 0) {
 				throw new ProjectManagerException("No projects have been inserted.");
 			}
@@ -468,7 +426,7 @@ public class JdbcProjectLoader implements ProjectLoader {
 	@Override
 	public void changeProjectVersion(Project project, int version, String user) throws ProjectManagerException {
 		long timestamp = System.currentTimeMillis();
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 		try {
 			final String UPDATE_PROJECT_VERSION = "UPDATE projects SET version=?,modified_time=?,last_modified_by=? WHERE id=?";
 			
@@ -484,18 +442,32 @@ public class JdbcProjectLoader implements ProjectLoader {
 	
 	@Override
 	public void updatePermission(Project project, String name, Permission perm, boolean isGroup) throws ProjectManagerException {
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 		
-		long updateTime = System.currentTimeMillis();
-		final String INSERT_PROJECT_PERMISSION = 
-				"INSERT INTO project_permissions (project_id, modified_time, name, permissions, isGroup) values (?,?,?,?,?)" +
-				"ON DUPLICATE KEY UPDATE modified_time = VALUES(modified_time), permissions = VALUES(permissions)";
-
-		try {
-			runner.update(INSERT_PROJECT_PERMISSION, project.getId(), updateTime, name, perm.toFlags(), isGroup);
-		} catch (SQLException e) {
-			logger.error(e);
-			throw new ProjectManagerException("Error updating project " + project.getName() + " permissions for " + name, e);
+		if (this.allowsOnDuplicateKey()) {
+			long updateTime = System.currentTimeMillis();
+			final String INSERT_PROJECT_PERMISSION = 
+					"INSERT INTO project_permissions (project_id, modified_time, name, permissions, isGroup) values (?,?,?,?,?)" +
+					"ON DUPLICATE KEY UPDATE modified_time = VALUES(modified_time), permissions = VALUES(permissions)";
+	
+			try {
+				runner.update(INSERT_PROJECT_PERMISSION, project.getId(), updateTime, name, perm.toFlags(), isGroup);
+			} catch (SQLException e) {
+				logger.error(e);
+				throw new ProjectManagerException("Error updating project " + project.getName() + " permissions for " + name, e);
+			}
+		}
+		else {
+			long updateTime = System.currentTimeMillis();
+			final String MERGE_PROJECT_PERMISSION = 
+					"MERGE INTO project_permissions (project_id, modified_time, name, permissions, isGroup) KEY (project_id, name) values (?,?,?,?,?)";
+		
+			try {
+				runner.update(MERGE_PROJECT_PERMISSION, project.getId(), updateTime, name, perm.toFlags(), isGroup);
+			} catch (SQLException e) {
+				logger.error(e);
+				throw new ProjectManagerException("Error updating project " + project.getName() + " permissions for " + name, e);
+			}
 		}
 		
 		if (isGroup) {
@@ -542,7 +514,7 @@ public class JdbcProjectLoader implements ProjectLoader {
 		}
 
 		try {
-			runner.update(connection, UPDATE_PROJECT_SETTINGS, encType.numVal, data, project.getId());
+			runner.update(connection, UPDATE_PROJECT_SETTINGS, encType.getNumVal(), data, project.getId());
 			connection.commit();
 		} catch (SQLException e) {
 			throw new ProjectManagerException("Error updating project " + project.getName() + " version " + project.getVersion(), e);
@@ -551,7 +523,7 @@ public class JdbcProjectLoader implements ProjectLoader {
 	
 	@Override
 	public void removePermission(Project project, String name, boolean isGroup) throws ProjectManagerException {
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 		final String DELETE_PROJECT_PERMISSION = "DELETE FROM project_permissions WHERE project_id=? AND name=? AND isGroup=?";
 		
 		try {
@@ -572,7 +544,7 @@ public class JdbcProjectLoader implements ProjectLoader {
 	@Override
 	public List<Triple<String, Boolean, Permission>> getProjectPermissions(int projectId) throws ProjectManagerException {
 		ProjectPermissionsResultHandler permHander = new ProjectPermissionsResultHandler();
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 		List<Triple<String, Boolean,Permission>> permissions = null;
 		try {
 			permissions = runner.query(ProjectPermissionsResultHandler.SELECT_PROJECT_PERMISSION, permHander, projectId);
@@ -586,7 +558,7 @@ public class JdbcProjectLoader implements ProjectLoader {
 	
 	@Override
 	public void removeProject(Project project, String user) throws ProjectManagerException {
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 		
 		long updateTime = System.currentTimeMillis();
 		final String UPDATE_INACTIVE_PROJECT = "UPDATE projects SET active=false,modified_time=?,last_modified_by=? WHERE id=?";
@@ -600,7 +572,7 @@ public class JdbcProjectLoader implements ProjectLoader {
 
 	@Override
 	public boolean postEvent(Project project, EventType type, String user, String message) {
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 		
 		final String INSERT_PROJECT_EVENTS = 
 				"INSERT INTO project_events (project_id, event_type, event_time, username, message) values (?,?,?,?,?)";
@@ -623,7 +595,7 @@ public class JdbcProjectLoader implements ProjectLoader {
 	 * @throws ProjectManagerException
 	 */
 	public List<ProjectLogEvent> getProjectEvents(Project project, int num, int skip) throws ProjectManagerException {
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 		
 		ProjectLogsResultHandler logHandler = new ProjectLogsResultHandler();
 		List<ProjectLogEvent> events = null;
@@ -638,7 +610,7 @@ public class JdbcProjectLoader implements ProjectLoader {
 
 	@Override
 	public void updateDescription(Project project, String description, String user) throws ProjectManagerException {
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 		
 		final String UPDATE_PROJECT_DESCRIPTION =
 				"UPDATE projects SET description=?,modified_time=?,last_modified_by=? WHERE id=?";
@@ -656,7 +628,7 @@ public class JdbcProjectLoader implements ProjectLoader {
 
 	@Override
 	public int getLatestProjectVersion(Project project) throws ProjectManagerException {
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 		
 		IntHander handler = new IntHander();
 		try {
@@ -739,7 +711,7 @@ public class JdbcProjectLoader implements ProjectLoader {
 	
 	@Override
 	public Flow fetchFlow(Project project, String flowId) throws ProjectManagerException {
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 		ProjectFlowsResultHandler handler = new ProjectFlowsResultHandler();
 		
 		try {
@@ -757,7 +729,7 @@ public class JdbcProjectLoader implements ProjectLoader {
 	
 	@Override
 	public List<Flow> fetchAllProjectFlows(Project project) throws ProjectManagerException {
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 		ProjectFlowsResultHandler handler = new ProjectFlowsResultHandler();
 		
 		List<Flow> flows = null;
@@ -867,7 +839,7 @@ public class JdbcProjectLoader implements ProjectLoader {
 
 	@Override
 	public Props fetchProjectProperty(int projectId, int projectVer, String propsName) throws ProjectManagerException {
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 		
 		ProjectPropertiesResultsHandler handler = new ProjectPropertiesResultsHandler();
 		try {
@@ -886,7 +858,7 @@ public class JdbcProjectLoader implements ProjectLoader {
 	
 	@Override
 	public Props fetchProjectProperty(Project project, String propsName) throws ProjectManagerException {
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 		
 		ProjectPropertiesResultsHandler handler = new ProjectPropertiesResultsHandler();
 		try {
@@ -964,7 +936,7 @@ public class JdbcProjectLoader implements ProjectLoader {
 	
 	@Override
 	public Map<String,Props> fetchProjectProperties(int projectId, int version) throws ProjectManagerException {
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 		
 		ProjectPropertiesResultsHandler handler = new ProjectPropertiesResultsHandler();
 		try {
@@ -1278,8 +1250,7 @@ public class JdbcProjectLoader implements ProjectLoader {
 	private Connection getConnection() throws ProjectManagerException {
 		Connection connection = null;
 		try {
-			connection = dataSource.getConnection();
-			connection.setAutoCommit(false);
+			connection = super.getDBConnection(false);
 		} catch (Exception e) {
 			DbUtils.closeQuietly(connection);
 			throw new ProjectManagerException("Error getting DB connection.", e);
@@ -1287,6 +1258,5 @@ public class JdbcProjectLoader implements ProjectLoader {
 		
 		return connection;
 	}
-
 }
 	
\ No newline at end of file
diff --git a/src/java/azkaban/scheduler/JdbcScheduleLoader.java b/src/java/azkaban/scheduler/JdbcScheduleLoader.java
index 5227a90..4b70fbc 100644
--- a/src/java/azkaban/scheduler/JdbcScheduleLoader.java
+++ b/src/java/azkaban/scheduler/JdbcScheduleLoader.java
@@ -16,7 +16,6 @@
 
 package azkaban.scheduler;
 
-
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.ResultSet;
@@ -25,8 +24,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import javax.sql.DataSource;
-
 import org.apache.commons.dbutils.DbUtils;
 import org.apache.commons.dbutils.QueryRunner;
 import org.apache.commons.dbutils.ResultSetHandler;
@@ -35,42 +32,15 @@ import org.apache.log4j.Logger;
 import org.joda.time.DateTimeZone;
 import org.joda.time.ReadablePeriod;
 
-import azkaban.utils.DataSourceUtils;
+import azkaban.utils.db.AbstractJdbcLoader;
 import azkaban.utils.GZIPUtils;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Props;
 
 
-public class JdbcScheduleLoader implements ScheduleLoader {
-
+public class JdbcScheduleLoader extends AbstractJdbcLoader implements ScheduleLoader {
 	private static Logger logger = Logger.getLogger(JdbcScheduleLoader.class);
-	
-	public static enum EncodingType {
-		PLAIN(1), GZIP(2);
-
-		private int numVal;
 
-		EncodingType(int numVal) {
-			this.numVal = numVal;
-		}
-
-		public int getNumVal() {
-			return numVal;
-		}
-
-		public static EncodingType fromInteger(int x) {
-			switch (x) {
-			case 1:
-				return PLAIN;
-			case 2:
-				return GZIP;
-			default:
-				return PLAIN;
-			}
-		}
-	}
-	
-	private DataSource dataSource;
 	private EncodingType defaultEncodingType = EncodingType.GZIP;
 	
 	private static final String scheduleTableName = "schedules";
@@ -90,18 +60,6 @@ public class JdbcScheduleLoader implements ScheduleLoader {
 	private static String UPDATE_NEXT_EXEC_TIME = 
 			"UPDATE " + scheduleTableName + " SET next_exec_time=? WHERE project_id=? AND flow_name=?";
 
-	private Connection getConnection() throws ScheduleManagerException {
-		Connection connection = null;
-		try {
-			connection = dataSource.getConnection();
-		} catch (Exception e) {
-			DbUtils.closeQuietly(connection);
-			throw new ScheduleManagerException("Error getting DB connection.", e);
-		}
-		
-		return connection;
-	}
-	
 	public EncodingType getDefaultEncodingType() {
 		return defaultEncodingType;
 	}
@@ -111,18 +69,7 @@ public class JdbcScheduleLoader implements ScheduleLoader {
 	}
 	
 	public JdbcScheduleLoader(Props props) {
-		String databaseType = props.getString("database.type");
-		
-		if (databaseType.equals("mysql")) {
-			int port = props.getInt("mysql.port");
-			String host = props.getString("mysql.host");
-			String database = props.getString("mysql.database");
-			String user = props.getString("mysql.user");
-			String password = props.getString("mysql.password");
-			int numConnections = props.getInt("mysql.numconnections");
-			
-			dataSource = DataSourceUtils.getMySQLDataSource(host, port, database, user, password, numConnections);
-		}
+		super(props);
 	}
 
 	@Override
@@ -178,8 +125,7 @@ public class JdbcScheduleLoader implements ScheduleLoader {
 	public void removeSchedule(Schedule s) throws ScheduleManagerException {		
 		logger.info("Removing schedule " + s.getScheduleName() + " from db.");
 
-		QueryRunner runner = new QueryRunner(dataSource);
-	
+		QueryRunner runner = createQueryRunner();
 		try {
 			int removes =  runner.update(REMOVE_SCHEDULE_BY_KEY, s.getProjectId(), s.getFlowName());
 			if (removes == 0) {
@@ -214,7 +160,7 @@ public class JdbcScheduleLoader implements ScheduleLoader {
 			throw new ScheduleManagerException("Error encoding the schedule options. " + s.getScheduleName());
 		}
 		
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 		try {
 			int inserts =  runner.update( 
 					INSERT_SCHEDULE, 
@@ -281,7 +227,7 @@ public class JdbcScheduleLoader implements ScheduleLoader {
 			throw new ScheduleManagerException("Error encoding the schedule options " + s.getScheduleName());
 		}
 
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 	
 		try {
 			int updates =  runner.update( 
@@ -362,4 +308,16 @@ public class JdbcScheduleLoader implements ScheduleLoader {
 		}
 		
 	}
+	
+	private Connection getConnection() throws ScheduleManagerException {
+		Connection connection = null;
+		try {
+			connection = super.getDBConnection(false);
+		} catch (Exception e) {
+			DbUtils.closeQuietly(connection);
+			throw new ScheduleManagerException("Error getting DB connection.", e);
+		}
+		
+		return connection;
+	}
 }
\ No newline at end of file
diff --git a/src/java/azkaban/sla/JdbcSLALoader.java b/src/java/azkaban/sla/JdbcSLALoader.java
index 0ab7b3b..ffa4221 100644
--- a/src/java/azkaban/sla/JdbcSLALoader.java
+++ b/src/java/azkaban/sla/JdbcSLALoader.java
@@ -8,8 +8,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import javax.sql.DataSource;
-
 import org.apache.commons.dbutils.DbUtils;
 import org.apache.commons.dbutils.QueryRunner;
 import org.apache.commons.dbutils.ResultSetHandler;
@@ -17,44 +15,13 @@ import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
 
 import azkaban.sla.SLA.SlaRule;
-import azkaban.utils.DataSourceUtils;
+import azkaban.utils.db.AbstractJdbcLoader;
 import azkaban.utils.GZIPUtils;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Props;
 
-public class JdbcSLALoader implements SLALoader {
-
+public class JdbcSLALoader extends AbstractJdbcLoader implements SLALoader {
 	private static final Logger logger = Logger.getLogger(JdbcSLALoader.class);
-	
-	/**
-	 * Used for when we store text data. Plain uses UTF8 encoding.
-	 */
-	public static enum EncodingType {
-		PLAIN(1), GZIP(2);
-
-		private int numVal;
-
-		EncodingType(int numVal) {
-			this.numVal = numVal;
-		}
-
-		public int getNumVal() {
-			return numVal;
-		}
-
-		public static EncodingType fromInteger(int x) {
-			switch (x) {
-			case 1:
-				return PLAIN;
-			case 2:
-				return GZIP;
-			default:
-				return PLAIN;
-			}
-		}
-	}
-	
-	private DataSource dataSource;
 	private EncodingType defaultEncodingType = EncodingType.GZIP;
 	
 	private static String slaTblName = "active_sla";
@@ -73,28 +40,18 @@ public class JdbcSLALoader implements SLALoader {
 			"DELETE FROM " + slaTblName + " WHERE exec_id=? AND job_name=? AND check_time=? AND rule=?";
 	
 	public JdbcSLALoader(Props props) {
-		String databaseType = props.getString("database.type");
-
-		if (databaseType.equals("mysql")) {
-			int port = props.getInt("mysql.port");
-			String host = props.getString("mysql.host");
-			String database = props.getString("mysql.database");
-			String user = props.getString("mysql.user");
-			String password = props.getString("mysql.password");
-			int numConnections = props.getInt("mysql.numconnections");
-			
-			dataSource = DataSourceUtils.getMySQLDataSource(host, port, database, user, password, numConnections);
-		}
+		super(props);
 	}
 
 	private Connection getConnection() throws SLAManagerException {
 		Connection connection = null;
 		try {
-			connection = dataSource.getConnection();
+			connection = super.getDBConnection(false);
 		} catch (Exception e) {
 			DbUtils.closeQuietly(connection);
 			throw new SLAManagerException("Error getting DB connection.", e);
 		}
+		
 		return connection;
 	}
 	
@@ -166,7 +123,7 @@ public class JdbcSLALoader implements SLALoader {
 
 		logger.info("Removing SLA " + s.toString() + " from db.");
 
-		QueryRunner runner = new QueryRunner(dataSource);
+		QueryRunner runner = createQueryRunner();
 	
 		try {
 			int removes =  runner.update(REMOVE_SLA, s.getExecId(), s.getJobName(), s.getCheckTime().getMillis(), s.getRule().getNumVal());
@@ -322,5 +279,5 @@ public class JdbcSLALoader implements SLALoader {
 		}
 		
 	}
-	
+
 }
diff --git a/src/java/azkaban/utils/db/AbstractJdbcLoader.java b/src/java/azkaban/utils/db/AbstractJdbcLoader.java
new file mode 100644
index 0000000..fabfbac
--- /dev/null
+++ b/src/java/azkaban/utils/db/AbstractJdbcLoader.java
@@ -0,0 +1,102 @@
+package azkaban.utils.db;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import javax.sql.DataSource;
+
+import org.apache.commons.dbutils.DbUtils;
+import org.apache.commons.dbutils.QueryRunner;
+
+import azkaban.utils.Props;
+import azkaban.utils.db.H2TableSetup;
+
+public abstract class AbstractJdbcLoader {
+	private boolean allowsOnDuplicateKey = false;
+	
+	/**
+	 * Used for when we store text data. Plain uses UTF8 encoding.
+	 */
+	public static enum EncodingType {
+		PLAIN(1), GZIP(2);
+
+		private int numVal;
+
+		EncodingType(int numVal) {
+			this.numVal = numVal;
+		}
+
+		public int getNumVal() {
+			return numVal;
+		}
+
+		public static EncodingType fromInteger(int x) {
+			switch (x) {
+			case 1:
+				return PLAIN;
+			case 2:
+				return GZIP;
+			default:
+				return PLAIN;
+			}
+		}
+	}
+	
+	private DataSource dataSource;
+	
+	public static void setupTables(Props props) throws SQLException, IOException {
+		String databaseType = props.getString("database.type");
+		
+		if (databaseType.equals("h2")) {
+			String path = props.getString("h2.path");
+			DataSource dataSource = DataSourceUtils.getH2DataSource(path);
+			H2TableSetup tableSetup = new H2TableSetup(dataSource);
+			
+			tableSetup.createProjectTables();
+			tableSetup.createExecutionTables();
+			tableSetup.createOtherTables();
+		}
+	}
+	
+	public AbstractJdbcLoader(Props props) {
+		String databaseType = props.getString("database.type");
+		
+		if (databaseType.equals("mysql")) {
+			int port = props.getInt("mysql.port");
+			String host = props.getString("mysql.host");
+			String database = props.getString("mysql.database");
+			String user = props.getString("mysql.user");
+			String password = props.getString("mysql.password");
+			int numConnections = props.getInt("mysql.numconnections");
+			
+			allowsOnDuplicateKey = true;
+			dataSource = DataSourceUtils.getMySQLDataSource(host, port, database, user, password, numConnections);
+		}
+		else if (databaseType.equals("h2")) {
+			String path = props.getString("h2.path");
+			dataSource = DataSourceUtils.getH2DataSource(path);
+		}
+	}
+	
+	protected Connection getDBConnection(boolean autoCommit) throws IOException {
+		Connection connection = null;
+		try {
+			connection = dataSource.getConnection();
+			connection.setAutoCommit(autoCommit);
+		} catch (Exception e) {
+			DbUtils.closeQuietly(connection);
+			throw new IOException("Error getting DB connection.", e);
+		}
+		
+		return connection;
+	}
+
+	protected QueryRunner createQueryRunner()  {
+		return new QueryRunner(dataSource);
+	}
+	
+	protected boolean allowsOnDuplicateKey() {
+		return allowsOnDuplicateKey;
+	}
+}
\ No newline at end of file
diff --git a/src/java/azkaban/utils/db/h2/active_executing_flows.sql b/src/java/azkaban/utils/db/h2/active_executing_flows.sql
new file mode 100644
index 0000000..960ebc5
--- /dev/null
+++ b/src/java/azkaban/utils/db/h2/active_executing_flows.sql
@@ -0,0 +1,7 @@
+CREATE TABLE active_executing_flows (
+	exec_id INT,
+	host VARCHAR(255),
+	port INT,
+	update_time BIGINT,
+	PRIMARY KEY (exec_id)
+);
\ No newline at end of file
diff --git a/src/java/azkaban/utils/db/h2/active_sla.sql b/src/java/azkaban/utils/db/h2/active_sla.sql
new file mode 100644
index 0000000..1bbc1b8
--- /dev/null
+++ b/src/java/azkaban/utils/db/h2/active_sla.sql
@@ -0,0 +1,9 @@
+CREATE TABLE active_sla (
+	exec_id INT NOT NULL,
+	job_name VARCHAR(128) NOT NULL,
+	check_time BIGINT NOT NULL,
+	rule TINYINT NOT NULL,
+	enc_type TINYINT,
+	options LONGBLOB NOT NULL,
+	primary key(exec_id, job_name)
+);
\ No newline at end of file
diff --git a/src/java/azkaban/utils/db/h2/execution_flows.sql b/src/java/azkaban/utils/db/h2/execution_flows.sql
new file mode 100644
index 0000000..12bfa46
--- /dev/null
+++ b/src/java/azkaban/utils/db/h2/execution_flows.sql
@@ -0,0 +1,20 @@
+CREATE TABLE execution_flows (
+	exec_id INT NOT NULL AUTO_INCREMENT,
+	project_id INT NOT NULL,
+	version INT NOT NULL,
+	flow_id VARCHAR(128) NOT NULL,
+	status TINYINT,
+	submit_user VARCHAR(64),
+	submit_time BIGINT,
+	update_time BIGINT,
+	start_time BIGINT,
+	end_time BIGINT,
+	enc_type TINYINT,
+	flow_data LONGBLOB,
+	PRIMARY KEY (exec_id)
+);
+
+CREATE INDEX ex_flows_start_time ON execution_flows(start_time);
+CREATE INDEX ex_flows_end_time ON execution_flows(end_time);
+CREATE INDEX ex_flows_time_range ON execution_flows(start_time, end_time);
+CREATE INDEX ex_flows_flows ON execution_flows(project_id, flow_id);
\ No newline at end of file
diff --git a/src/java/azkaban/utils/db/h2/execution_jobs.sql b/src/java/azkaban/utils/db/h2/execution_jobs.sql
new file mode 100644
index 0000000..a62d3a9
--- /dev/null
+++ b/src/java/azkaban/utils/db/h2/execution_jobs.sql
@@ -0,0 +1,19 @@
+CREATE TABLE execution_jobs (
+	exec_id INT NOT NULL,
+	project_id INT NOT NULL,
+	version INT NOT NULL,
+	flow_id VARCHAR(128) NOT NULL,
+	job_id VARCHAR(128) NOT NULL,
+	attempt INT,
+	start_time BIGINT,
+	end_time BIGINT,
+	status TINYINT,
+	input_params LONGBLOB,
+	output_params LONGBLOB,
+	attachments LONGBLOB,
+	PRIMARY KEY (exec_id, job_id, attempt)
+);
+
+CREATE INDEX exec_job ON execution_jobs(exec_id, job_id);
+CREATE INDEX exec_id ON execution_jobs(exec_id);
+CREATE INDEX ex_job_id ON execution_jobs(project_id, job_id);
diff --git a/src/java/azkaban/utils/db/h2/execution_logs.sql b/src/java/azkaban/utils/db/h2/execution_logs.sql
new file mode 100644
index 0000000..0aa6a36
--- /dev/null
+++ b/src/java/azkaban/utils/db/h2/execution_logs.sql
@@ -0,0 +1,14 @@
+CREATE TABLE execution_logs (
+	exec_id INT NOT NULL,
+	name VARCHAR(128),
+	attempt INT,
+	enc_type TINYINT,
+	start_byte INT,
+	end_byte INT,
+	log LONGBLOB,
+	upload_time BIGINT,
+	PRIMARY KEY (exec_id, name, attempt, start_byte)
+);
+
+CREATE INDEX ex_log_attempt ON execution_logs(exec_id, name, attempt);
+CREATE INDEX ex_log_index ON execution_logs(exec_id, name);
\ No newline at end of file
diff --git a/src/java/azkaban/utils/db/h2/project_events.sql b/src/java/azkaban/utils/db/h2/project_events.sql
new file mode 100644
index 0000000..dd24d5f
--- /dev/null
+++ b/src/java/azkaban/utils/db/h2/project_events.sql
@@ -0,0 +1,9 @@
+CREATE TABLE project_events (
+	project_id INT NOT NULL,
+	event_type TINYINT NOT NULL,
+	event_time BIGINT NOT NULL,
+	username VARCHAR(64),
+	message VARCHAR(512)
+);
+
+CREATE INDEX log ON project_events(project_id, event_time);
diff --git a/src/java/azkaban/utils/db/h2/project_files.sql b/src/java/azkaban/utils/db/h2/project_files.sql
new file mode 100644
index 0000000..7dc3737
--- /dev/null
+++ b/src/java/azkaban/utils/db/h2/project_files.sql
@@ -0,0 +1,10 @@
+CREATE TABLE project_files (
+	project_id INT NOT NULL,
+	version INT not NULL,
+	chunk INT,
+	size INT,
+	file LONGBLOB,
+	PRIMARY KEY (project_id, version, chunk)
+);
+
+CREATE INDEX file_version ON project_files(project_id, version);
\ No newline at end of file
diff --git a/src/java/azkaban/utils/db/h2/project_flows.sql b/src/java/azkaban/utils/db/h2/project_flows.sql
new file mode 100644
index 0000000..bbd524a
--- /dev/null
+++ b/src/java/azkaban/utils/db/h2/project_flows.sql
@@ -0,0 +1,11 @@
+CREATE TABLE project_flows (
+	project_id INT NOT NULL,
+	version INT NOT NULL,
+	flow_id VARCHAR(128),
+	modified_time BIGINT NOT NULL,
+	encoding_type TINYINT,
+	json BLOB,
+	PRIMARY KEY (project_id, version, flow_id)
+);
+
+CREATE INDEX flow_index ON project_flows(project_id, version);
\ No newline at end of file
diff --git a/src/java/azkaban/utils/db/h2/project_permissions.sql b/src/java/azkaban/utils/db/h2/project_permissions.sql
new file mode 100644
index 0000000..f49d0b8
--- /dev/null
+++ b/src/java/azkaban/utils/db/h2/project_permissions.sql
@@ -0,0 +1,10 @@
+CREATE TABLE project_permissions (
+	project_id VARCHAR(64) NOT NULL,
+	modified_time BIGINT NOT NULL,
+	name VARCHAR(64) NOT NULL,
+	permissions INT NOT NULL,
+	isGroup BOOLEAN NOT NULL,
+	PRIMARY KEY (project_id, name)
+);
+
+CREATE INDEX permission_index ON project_permissions(project_id);
\ No newline at end of file
diff --git a/src/java/azkaban/utils/db/h2/project_properties.sql b/src/java/azkaban/utils/db/h2/project_properties.sql
new file mode 100644
index 0000000..1c602bb
--- /dev/null
+++ b/src/java/azkaban/utils/db/h2/project_properties.sql
@@ -0,0 +1,11 @@
+CREATE TABLE project_properties (
+	project_id INT NOT NULL,
+	version INT NOT NULL,
+	name VARCHAR(128),
+	modified_time BIGINT NOT NULL,
+	encoding_type TINYINT,
+	property BLOB,
+	PRIMARY KEY (project_id, version, name)
+);
+
+CREATE INDEX properties_index ON project_properties(project_id, version);
\ No newline at end of file
diff --git a/src/java/azkaban/utils/db/h2/project_versions.sql b/src/java/azkaban/utils/db/h2/project_versions.sql
new file mode 100644
index 0000000..1c764a2
--- /dev/null
+++ b/src/java/azkaban/utils/db/h2/project_versions.sql
@@ -0,0 +1,13 @@
+CREATE TABLE project_versions (
+	project_id INT NOT NULL,
+	version INT not NULL,
+	upload_time BIGINT NOT NULL,
+	uploader VARCHAR(64) NOT NULL,
+	file_type VARCHAR(16),
+	file_name VARCHAR(128),
+	md5 BINARY(16),
+	num_chunks INT,
+	PRIMARY KEY (project_id, version)
+);
+
+CREATE INDEX version_index ON project_versions(project_id);
\ No newline at end of file
diff --git a/src/java/azkaban/utils/db/h2/projects.sql b/src/java/azkaban/utils/db/h2/projects.sql
new file mode 100644
index 0000000..475615a
--- /dev/null
+++ b/src/java/azkaban/utils/db/h2/projects.sql
@@ -0,0 +1,15 @@
+CREATE TABLE projects (
+	id INT NOT NULL PRIMARY KEY AUTO_INCREMENT,
+	name VARCHAR(64) NOT NULL,
+	active BOOLEAN,
+	modified_time BIGINT NOT NULL,
+	create_time BIGINT NOT NULL,
+	version INT,
+	last_modified_by VARCHAR(64) NOT NULL,
+	description VARCHAR(255),
+	enc_type TINYINT,
+	settings_blob LONGBLOB,
+	UNIQUE INDEX project_id (id)
+);
+
+CREATE INDEX project_name ON projects(name);
\ No newline at end of file
diff --git a/src/java/azkaban/utils/db/h2/schedules.sql b/src/java/azkaban/utils/db/h2/schedules.sql
new file mode 100644
index 0000000..1924ecd
--- /dev/null
+++ b/src/java/azkaban/utils/db/h2/schedules.sql
@@ -0,0 +1,16 @@
+CREATE TABLE schedules (
+	project_id INT NOT NULL,
+	project_name VARCHAR(128) NOT NULL,
+	flow_name VARCHAR(128) NOT NULL,
+	status VARCHAR(16),
+	first_sched_time BIGINT,
+	timezone VARCHAR(64),
+	period VARCHAR(16),
+	last_modify_time BIGINT,
+	next_exec_time BIGINT,
+	submit_time BIGINT,
+	submit_user VARCHAR(128),
+	enc_type TINYINT,
+	schedule_options LONGBLOB,
+	primary key(project_id, flow_name)
+);
diff --git a/src/java/azkaban/utils/db/H2TableSetup.java b/src/java/azkaban/utils/db/H2TableSetup.java
new file mode 100644
index 0000000..5c576f2
--- /dev/null
+++ b/src/java/azkaban/utils/db/H2TableSetup.java
@@ -0,0 +1,178 @@
+package azkaban.utils.db;
+
+import java.io.IOException;
+import java.net.URL;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.sql.DataSource;
+
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.io.IOUtils;
+import org.apache.log4j.Logger;
+
+public class H2TableSetup {
+	private static final Logger logger = Logger.getLogger(H2TableSetup.class);
+	
+	private DataSource dataSource;
+	private Map<String, TableData> tables = new HashMap<String,TableData>();
+	
+	public H2TableSetup(DataSource source) throws SQLException {
+		this.dataSource = source;
+		setupTables();
+	}
+	
+	public void setupTables() throws SQLException {
+		Connection conn = dataSource.getConnection();
+		ResultSet rs = conn.getMetaData().getTables(conn.getCatalog(), null, null, new String[]{"TABLE"});
+		
+		while(rs.next()) {
+			TableData data = new TableData(rs.getString("TABLE_NAME").toLowerCase());
+			data.setSchema(rs.getString("TABLE_SCHEMA"));
+			tables.put(data.getName(), data);
+			System.out.println(data.toString());
+		}
+		
+		conn.close();
+	}
+	
+	public void createExecutionTables() throws SQLException, IOException {
+		Connection conn = null;
+		try {
+			conn = dataSource.getConnection();
+
+			if (!tables.containsKey("execution_flows")) {
+				logger.info("Creating execution_flows table.");
+				URL url = this.getClass().getClassLoader().getResource("azkaban/utils/db/h2/execution_flows.sql");
+				String query = IOUtils.toString(url.openStream());
+				QueryRunner runner = new QueryRunner();
+				runner.update(conn, query);
+				conn.commit();
+			}
+			if (!tables.containsKey("execution_jobs")) {
+				logger.info("Creating execution_jobs table.");
+				URL url = this.getClass().getClassLoader().getResource("azkaban/utils/db/h2/execution_jobs.sql");
+				String query = IOUtils.toString(url.openStream());
+				QueryRunner runner = new QueryRunner();
+				runner.update(conn, query);
+				conn.commit();
+			}
+			if (!tables.containsKey("execution_logs")) {
+				logger.info("Creating execution_logs table.");
+				URL url = this.getClass().getClassLoader().getResource("azkaban/utils/db/h2/execution_logs.sql");
+				String query = IOUtils.toString(url.openStream());
+				QueryRunner runner = new QueryRunner();
+				runner.update(conn, query);
+				conn.commit();
+			}
+			if (!tables.containsKey("active_executing_flows")) {
+				logger.info("Creating active_executing_flows table.");
+				URL url = this.getClass().getClassLoader().getResource("azkaban/utils/db/h2/active_executing_flows.sql");
+				String query = IOUtils.toString(url.openStream());
+				QueryRunner runner = new QueryRunner();
+				runner.update(conn, query);
+				conn.commit();
+			}
+		}
+		finally {
+			conn.close();
+		}
+	}
+	
+	public void createOtherTables() throws SQLException, IOException {
+		Connection conn = null;
+		try {
+			conn = dataSource.getConnection();
+
+			if (!tables.containsKey("active_sla")) {
+				logger.info("Creating active_sla table.");
+				URL url = this.getClass().getClassLoader().getResource("azkaban/utils/db/h2/active_sla.sql");
+				String query = IOUtils.toString(url.openStream());
+				QueryRunner runner = new QueryRunner();
+				runner.update(conn, query);
+				conn.commit();
+			}
+			if (!tables.containsKey("schedules")) {
+				logger.info("Creating schedules table.");
+				URL url = this.getClass().getClassLoader().getResource("azkaban/utils/db/h2/schedules.sql");
+				String query = IOUtils.toString(url.openStream());
+				QueryRunner runner = new QueryRunner();
+				runner.update(conn, query);
+				conn.commit();
+			}
+		}
+		finally {
+			conn.close();
+		}
+	}
+	
+	public void createProjectTables() throws SQLException, IOException {
+		Connection conn = null;
+		try {
+			conn = dataSource.getConnection();
+
+			if (!tables.containsKey("projects")) {
+				logger.info("Creating projects table.");
+				URL url = this.getClass().getClassLoader().getResource("azkaban/utils/db/h2/projects.sql");
+				String query = IOUtils.toString(url.openStream());
+				QueryRunner runner = new QueryRunner();
+				runner.update(conn, query);
+				conn.commit();
+			}
+			if (!tables.containsKey("project_versions")) {
+				logger.info("Creating project_versions table.");
+				URL url = this.getClass().getClassLoader().getResource("azkaban/utils/db/h2/project_versions.sql");
+				String query = IOUtils.toString(url.openStream());
+				QueryRunner runner = new QueryRunner();
+				runner.update(conn, query);
+				conn.commit();
+			}
+			if (!tables.containsKey("project_events")) {
+				logger.info("Creating project_events table.");
+				URL url = this.getClass().getClassLoader().getResource("azkaban/utils/db/h2/project_events.sql");
+				String query = IOUtils.toString(url.openStream());
+				QueryRunner runner = new QueryRunner();
+				runner.update(conn, query);
+				conn.commit();
+			}
+			if (!tables.containsKey("project_properties")) {
+				logger.info("Creating project_properties table.");
+				URL url = this.getClass().getClassLoader().getResource("azkaban/utils/db/h2/project_properties.sql");
+				String query = IOUtils.toString(url.openStream());
+				QueryRunner runner = new QueryRunner();
+				runner.update(conn, query);
+				conn.commit();
+			}
+			if (!tables.containsKey("project_files")) {
+				logger.info("Creating project_files table.");
+				URL url = this.getClass().getClassLoader().getResource("azkaban/utils/db/h2/project_files.sql");
+				String query = IOUtils.toString(url.openStream());
+				QueryRunner runner = new QueryRunner();
+				runner.update(conn, query);
+				conn.commit();
+			}
+			if (!tables.containsKey("project_permissions")) {
+				logger.info("Creating project_permissions table.");
+				URL url = this.getClass().getClassLoader().getResource("azkaban/utils/db/h2/project_permissions.sql");
+				String query = IOUtils.toString(url.openStream());
+				QueryRunner runner = new QueryRunner();
+				runner.update(conn, query);
+				conn.commit();
+			}
+			if (!tables.containsKey("project_flows")) {
+				logger.info("Creating project_flows table.");
+				URL url = this.getClass().getClassLoader().getResource("azkaban/utils/db/h2/project_flows.sql");
+				String query = IOUtils.toString(url.openStream());
+				QueryRunner runner = new QueryRunner();
+				runner.update(conn, query);
+				conn.commit();
+			}
+		}
+		finally {
+			conn.close();
+		}
+	}
+}
diff --git a/src/java/azkaban/utils/db/TableData.java b/src/java/azkaban/utils/db/TableData.java
new file mode 100644
index 0000000..c957744
--- /dev/null
+++ b/src/java/azkaban/utils/db/TableData.java
@@ -0,0 +1,26 @@
+package azkaban.utils.db;
+
+public class TableData {
+	private String name;
+	private String schema;
+	
+	public TableData(String name) {
+		this.name = name;
+	}
+
+	public String getName() {
+		return name;
+	}
+	
+	public void setSchema(String schema) {
+		this.schema = schema;
+	}
+	
+	public String getSchema() {
+		return this.schema;
+	}
+
+	public String toString() {
+		return name + ": " + schema;
+	}
+}
diff --git a/src/java/azkaban/utils/FileIOUtils.java b/src/java/azkaban/utils/FileIOUtils.java
index 7934a19..6f1c0c5 100644
--- a/src/java/azkaban/utils/FileIOUtils.java
+++ b/src/java/azkaban/utils/FileIOUtils.java
@@ -197,13 +197,14 @@ public class FileIOUtils {
 		
 		return new LogData(fileOffset + utf8Range.getFirst(), utf8Range.getSecond(), outputString);
 	}
-	
+
 	public static JobMetaData readUtf8MetaDataFile(File file, int fileOffset, int length) throws IOException {
 		byte[] buffer = new byte[length];
 		FileInputStream fileStream = new FileInputStream(file);
 		
 		long skipped = fileStream.skip(fileOffset);
 		if (skipped < fileOffset) {
+			fileStream.close();
 			return new JobMetaData(fileOffset, 0, "");
 		}
 		
diff --git a/src/java/azkaban/webapp/AzkabanSingleServer.java b/src/java/azkaban/webapp/AzkabanSingleServer.java
new file mode 100644
index 0000000..45bef7b
--- /dev/null
+++ b/src/java/azkaban/webapp/AzkabanSingleServer.java
@@ -0,0 +1,18 @@
+package azkaban.webapp;
+
+
+import org.apache.log4j.Logger;
+
+import azkaban.execapp.AzkabanExecutorServer;
+
+public class AzkabanSingleServer {
+	private static final Logger logger = Logger.getLogger(AzkabanWebServer.class);
+	public static void main(String[] args) throws Exception {
+		logger.info("Starting Azkaban Server");
+		
+		AzkabanWebServer.main(args);
+		logger.info("Azkaban Web Server started...");
+		AzkabanExecutorServer.main(args);
+		logger.info("Azkaban Exec Server started...");
+	}
+}
\ No newline at end of file
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index e3fe50f..c8f6244 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -43,6 +43,7 @@ import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
 import org.apache.velocity.runtime.resource.loader.JarResourceLoader;
 import org.joda.time.DateTimeZone;
 import org.mortbay.jetty.Server;
+import org.mortbay.jetty.bio.SocketConnector;
 import org.mortbay.jetty.security.SslSocketConnector;
 import org.mortbay.jetty.servlet.Context;
 import org.mortbay.jetty.servlet.DefaultServlet;
@@ -66,6 +67,7 @@ import azkaban.sla.SLAManager;
 import azkaban.sla.SLAManagerException;
 import azkaban.user.UserManager;
 import azkaban.user.XmlUserManager;
+import azkaban.utils.db.AbstractJdbcLoader;
 import azkaban.utils.FileIOUtils;
 import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
@@ -120,7 +122,7 @@ public class AzkabanWebServer implements AzkabanServer {
 	private static AzkabanWebServer app;
 
 	private static final String DEFAULT_TIMEZONE_ID = "default.timezone.id";
-	// private static final int DEFAULT_PORT_NUMBER = 8081;
+	private static final int DEFAULT_PORT_NUMBER = 8081;
 	private static final int DEFAULT_SSL_PORT_NUMBER = 8443;
 	private static final int DEFAULT_THREAD_NUMBER = 20;
 	private static final String VELOCITY_DEV_MODE_PARAM = "velocity.dev.mode";
@@ -202,7 +204,6 @@ public class AzkabanWebServer implements AzkabanServer {
 				logger.error("Could not instantiate UserManager "+ userManagerClass.getName());
 				throw new RuntimeException(e);
 			}
-
 		} 
 		else {
 			manager = new XmlUserManager(props);
@@ -210,7 +211,7 @@ public class AzkabanWebServer implements AzkabanServer {
 
 		return manager;
 	}
-
+	
 	private ProjectManager loadProjectManager(Props props) {
 		logger.info("Loading JDBC for project management");
 
@@ -396,25 +397,36 @@ public class AzkabanWebServer implements AzkabanServer {
 			return;
 		}
 
-		// int portNumber =
-		// azkabanSettings.getInt("jetty.port",DEFAULT_PORT_NUMBER);
-		int sslPortNumber = azkabanSettings.getInt("jetty.ssl.port",
-				DEFAULT_SSL_PORT_NUMBER);
-		int maxThreads = azkabanSettings.getInt("jetty.maxThreads",
-				DEFAULT_THREAD_NUMBER);
-
-		logger.info("Setting up Jetty Server with port:" + sslPortNumber + " and numThreads:" + maxThreads);
-		final Server server = new Server();
-		SslSocketConnector secureConnector = new SslSocketConnector();
-		secureConnector.setPort(sslPortNumber);
-		secureConnector.setKeystore(azkabanSettings.getString("jetty.keystore"));
-		secureConnector.setPassword(azkabanSettings.getString("jetty.password"));
-		secureConnector.setKeyPassword(azkabanSettings.getString("jetty.keypassword"));
-		secureConnector.setTruststore(azkabanSettings.getString("jetty.truststore"));
-		secureConnector.setTrustPassword(azkabanSettings.getString("jetty.trustpassword"));
-		secureConnector.setHeaderBufferSize(MAX_HEADER_BUFFER_SIZE);
+		AbstractJdbcLoader.setupTables(azkabanSettings);
 		
-		server.addConnector(secureConnector);
+		int maxThreads = azkabanSettings.getInt("jetty.maxThreads", DEFAULT_THREAD_NUMBER);
+		int port;
+		boolean usingSSL = false;
+		final Server server = new Server();
+		if (azkabanSettings.getBoolean("jetty.use.ssl", true)) {
+			int sslPortNumber = azkabanSettings.getInt("jetty.ssl.port", DEFAULT_SSL_PORT_NUMBER);
+			port = sslPortNumber;
+			usingSSL = true;
+			logger.info("Setting up Jetty Https Server with port:" + sslPortNumber + " and numThreads:" + maxThreads);
+			
+			SslSocketConnector secureConnector = new SslSocketConnector();
+			secureConnector.setPort(sslPortNumber);
+			secureConnector.setKeystore(azkabanSettings.getString("jetty.keystore"));
+			secureConnector.setPassword(azkabanSettings.getString("jetty.password"));
+			secureConnector.setKeyPassword(azkabanSettings.getString("jetty.keypassword"));
+			secureConnector.setTruststore(azkabanSettings.getString("jetty.truststore"));
+			secureConnector.setTrustPassword(azkabanSettings.getString("jetty.trustpassword"));
+			secureConnector.setHeaderBufferSize(MAX_HEADER_BUFFER_SIZE);
+			
+			server.addConnector(secureConnector);
+		}
+		else {
+			port = azkabanSettings.getInt("jetty.port", DEFAULT_PORT_NUMBER);
+			SocketConnector connector = new SocketConnector();
+			connector.setPort(port);
+			connector.setHeaderBufferSize(MAX_HEADER_BUFFER_SIZE);
+			server.addConnector(connector);
+		}
 		app = new AzkabanWebServer(server, azkabanSettings);
 		
 		QueuedThreadPool httpThreadPool = new QueuedThreadPool(maxThreads);
@@ -445,8 +457,6 @@ public class AzkabanWebServer implements AzkabanServer {
 		String viewerPluginDir = azkabanSettings.getString("viewer.plugin.dir", "plugins/viewer");
 		app.setViewerPlugins(loadViewerPlugins(root, viewerPluginDir, app.getVelocityEngine()));
 
-		//root.addServlet(new ServletHolder(new HdfsBrowserServlet()), "/hdfs/*");
-		
 		root.setAttribute(AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY, app);
 		try {
 			server.start();
@@ -471,7 +481,7 @@ public class AzkabanWebServer implements AzkabanServer {
 				logger.info("kk thx bye.");
 			}
 		});
-		logger.info("Server running on port " + sslPortNumber + ".");
+		logger.info("Server running on " + (usingSSL ? "ssl" : "") + " port " + port + ".");
 	}
 
 	private static List<ViewerPlugin> loadViewerPlugins(Context root, String pluginPath, VelocityEngine ve) {
diff --git a/src/java/azkaban/webapp/servlet/admin/InitialSetupServlet.java b/src/java/azkaban/webapp/servlet/admin/InitialSetupServlet.java
index 744b4f7..c087f86 100644
--- a/src/java/azkaban/webapp/servlet/admin/InitialSetupServlet.java
+++ b/src/java/azkaban/webapp/servlet/admin/InitialSetupServlet.java
@@ -39,7 +39,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.log4j.Logger;
 
-import azkaban.utils.DataSourceUtils;
+import azkaban.utils.db.DataSourceUtils;
 import azkaban.utils.Props;
 import azkaban.utils.Utils;
 import azkaban.webapp.AzkabanAdminServer;
diff --git a/src/sql/create_settings.sql b/src/sql/create_settings.sql
new file mode 100644
index 0000000..b8a73a7
--- /dev/null
+++ b/src/sql/create_settings.sql
@@ -0,0 +1,8 @@
+CREATE TABLE properties (
+	name VARCHAR(64) NOT NULL,
+	modified_time BIGINT NOT NULL,
+	property BLOB,
+	PRIMARY KEY (name)
+) ENGINE=InnoDB;
+
+INSERT INTO properties (name, modified_time, property) VALUES ( UNIX_TIMESTAMP(), "2.1")
\ No newline at end of file
diff --git a/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java b/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
index f1717b2..7ead267 100644
--- a/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
+++ b/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
@@ -22,7 +22,7 @@ import azkaban.sla.SLA.SlaAction;
 import azkaban.sla.SLA.SlaRule;
 import azkaban.sla.SLA.SlaSetting;
 import azkaban.sla.SlaOptions;
-import azkaban.utils.DataSourceUtils;
+import azkaban.utils.db.DataSourceUtils;
 import azkaban.utils.Props;
 
 public class JdbcScheduleLoaderTest {
diff --git a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
index 903491a..3603608 100644
--- a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
+++ b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
@@ -31,7 +31,7 @@ import azkaban.executor.JdbcExecutorLoader;
 import azkaban.executor.Status;
 import azkaban.flow.Flow;
 
-import azkaban.utils.DataSourceUtils;
+import azkaban.utils.db.DataSourceUtils;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Pair;
diff --git a/unit/java/azkaban/test/project/JdbcProjectLoaderTest.java b/unit/java/azkaban/test/project/JdbcProjectLoaderTest.java
index acb21a3..008a812 100644
--- a/unit/java/azkaban/test/project/JdbcProjectLoaderTest.java
+++ b/unit/java/azkaban/test/project/JdbcProjectLoaderTest.java
@@ -22,7 +22,6 @@ import azkaban.flow.Edge;
 import azkaban.flow.Flow;
 import azkaban.flow.Node;
 import azkaban.project.JdbcProjectLoader;
-import azkaban.project.JdbcProjectLoader.EncodingType;
 import azkaban.project.Project;
 import azkaban.project.ProjectFileHandler;
 import azkaban.project.ProjectLoader;
@@ -31,7 +30,7 @@ import azkaban.project.ProjectLogEvent.EventType;
 import azkaban.project.ProjectManagerException;
 import azkaban.user.Permission;
 import azkaban.user.User;
-import azkaban.utils.DataSourceUtils;
+import azkaban.utils.db.DataSourceUtils;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
@@ -307,7 +306,7 @@ public class JdbcProjectLoaderTest {
 	@Test
 	public void testFlowUpload() throws ProjectManagerException {
 		ProjectLoader loader = createLoader();
-		((JdbcProjectLoader)loader).setDefaultEncodingType(EncodingType.GZIP);
+		((JdbcProjectLoader)loader).setDefaultEncodingType(JdbcProjectLoader.EncodingType.GZIP);
 		String projectName = "mytestFlowUpload1";
 		String projectDescription = "This is my new project";
 		User user = new User("testUser");
@@ -340,7 +339,7 @@ public class JdbcProjectLoaderTest {
 	@Test
 	public void testFlowUploadPlain() throws ProjectManagerException {
 		ProjectLoader loader = createLoader();
-		((JdbcProjectLoader)loader).setDefaultEncodingType(EncodingType.PLAIN);
+		((JdbcProjectLoader)loader).setDefaultEncodingType(JdbcProjectLoader.EncodingType.PLAIN);
 		String projectName = "mytestFlowUpload2";
 		String projectDescription = "This is my new project";
 		User user = new User("testUser");
@@ -376,7 +375,7 @@ public class JdbcProjectLoaderTest {
 	@Test
 	public void testProjectProperties() throws ProjectManagerException {
 		ProjectLoader loader = createLoader();
-		((JdbcProjectLoader)loader).setDefaultEncodingType(EncodingType.PLAIN);
+		((JdbcProjectLoader)loader).setDefaultEncodingType(JdbcProjectLoader.EncodingType.PLAIN);
 		String projectName = "testProjectProperties";
 		String projectDescription = "This is my new project";
 		User user = new User("testUser");
diff --git a/unit/java/azkaban/test/sla/JdbcSLALoaderTest.java b/unit/java/azkaban/test/sla/JdbcSLALoaderTest.java
index 02b7855..23473f7 100644
--- a/unit/java/azkaban/test/sla/JdbcSLALoaderTest.java
+++ b/unit/java/azkaban/test/sla/JdbcSLALoaderTest.java
@@ -22,7 +22,7 @@ import azkaban.sla.SLA;
 import azkaban.sla.SLA.SlaAction;
 import azkaban.sla.SLA.SlaRule;
 import azkaban.sla.SLALoader;
-import azkaban.utils.DataSourceUtils;
+import azkaban.utils.db.DataSourceUtils;
 import azkaban.utils.Props;
 
 
diff --git a/unit/java/azkaban/test/utils/db/H2TableSetupTest.java b/unit/java/azkaban/test/utils/db/H2TableSetupTest.java
new file mode 100644
index 0000000..f914fd1
--- /dev/null
+++ b/unit/java/azkaban/test/utils/db/H2TableSetupTest.java
@@ -0,0 +1,41 @@
+package azkaban.test.utils.db;
+
+import java.io.File;
+import java.io.IOException;
+import javax.sql.DataSource;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import azkaban.utils.db.DataSourceUtils;
+import azkaban.utils.db.H2TableSetup;
+
+public class H2TableSetupTest {
+	private static DataSource datasource;
+	
+	@BeforeClass
+	public static void setupDB() throws IOException {
+		File dbDir = new File("h2dbtest");
+		if (dbDir.exists()) {
+			FileUtils.deleteDirectory(dbDir);
+		}
+		
+		dbDir.mkdir();
+		datasource = DataSourceUtils.getH2DataSource("h2dbtest/h2db");
+	}
+	
+	@AfterClass
+	public static void teardownDB() {
+	}
+	
+	@Test
+	public void queryTables() throws Exception {
+		H2TableSetup setup = new H2TableSetup(datasource);
+		setup.createProjectTables();
+		setup.createExecutionTables();
+		setup.createOtherTables();
+		setup.setupTables();
+	}
+}
diff --git a/unit/project/testfailure/failflow.job b/unit/project/testfailure/failflow.job
new file mode 100644
index 0000000..a65b7c4
--- /dev/null
+++ b/unit/project/testfailure/failflow.job
@@ -0,0 +1,5 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=5
+fail=false
+dependencies=myjob2-fail30,myjob2
diff --git a/unit/project/testfailure/myjob1.job b/unit/project/testfailure/myjob1.job
new file mode 100644
index 0000000..660e2a9
--- /dev/null
+++ b/unit/project/testfailure/myjob1.job
@@ -0,0 +1,4 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=10
+fail=false
diff --git a/unit/project/testfailure/myjob2.job b/unit/project/testfailure/myjob2.job
new file mode 100644
index 0000000..f5a05ed
--- /dev/null
+++ b/unit/project/testfailure/myjob2.job
@@ -0,0 +1,4 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=30
+fail=false
diff --git a/unit/project/testfailure/myjob2-fail30.job b/unit/project/testfailure/myjob2-fail30.job
new file mode 100644
index 0000000..03540bb
--- /dev/null
+++ b/unit/project/testfailure/myjob2-fail30.job
@@ -0,0 +1,6 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=10
+fail=true
+passRetry=2
+dependencies=myjob3
diff --git a/unit/project/testfailure/myjob3.job b/unit/project/testfailure/myjob3.job
new file mode 100644
index 0000000..a1d7ded
--- /dev/null
+++ b/unit/project/testfailure/myjob3.job
@@ -0,0 +1,5 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=5
+dependencies=myjob1
+fail=false
diff --git a/unit/project/testfailure/test.jar b/unit/project/testfailure/test.jar
new file mode 100644
index 0000000..4f1955a
Binary files /dev/null and b/unit/project/testfailure/test.jar differ
diff --git a/unit/project/testfailure/testfailure.zip b/unit/project/testfailure/testfailure.zip
new file mode 100644
index 0000000..dc0b640
Binary files /dev/null and b/unit/project/testfailure/testfailure.zip differ