azkaban-memoizeit

Changes

bin/azkaban-executor-shutdown.sh 10(+0 -10)

build.properties 2(+1 -1)

build.xml 120(+109 -11)

src/java/azkaban/utils/db/DataSourceUtils.java 56(+0 -56)

src/java/azkaban/utils/db/H2TableSetup.java 178(+0 -178)

src/java/azkaban/utils/db/TableData.java 26(+0 -26)

src/java/azkaban/webapp/AzkabanAdminServer.java 252(+0 -252)

src/java/azkaban/webapp/servlet/admin/InitialSetupServlet.java 278(+0 -278)

src/java/azkaban/webapp/servlet/admin/velocity/setup-db.vm 89(+0 -89)

src/java/azkaban/webapp/servlet/admin/velocity/setup-initial.vm 57(+0 -57)

src/java/azkaban/webapp/servlet/admin/velocity/setup-usermanager.vm 70(+0 -70)

src/java/azkaban/webapp/servlet/admin/velocity/title.vm 20(+0 -20)

src/sql/create_active_flow.sql 7(+0 -7)

src/sql/create_all.sql 170(+0 -170)

src/sql/create_project_files.sql 9(+0 -9)

src/sql/create_project_flow_table.sql 11(+0 -11)

src/sql/create_project_permissions.sql 9(+0 -9)

src/sql/create_project_properties.sql 10(+0 -10)

src/sql/create_project_table.sql 14(+0 -14)

src/sql/create_project_version_table.sql 12(+0 -12)

src/sql/create_schedule_table.sql 17(+0 -17)

src/sql/create_settings.sql 8(+0 -8)

src/sql/create_sla_table.sql 9(+0 -9)

src/sql/update_2.0_to_2.1.sql 33(+0 -33)

unit/java/azkaban/test/utils/db/H2TableSetupTest.java 41(+0 -41)

Details

build.properties 2(+1 -1)

diff --git a/build.properties b/build.properties
index a72a058..aa6322d 100644
--- a/build.properties
+++ b/build.properties
@@ -1,3 +1,3 @@
 name=azkaban
-version=2.1
+version=2.2
 spec.file=azkaban.spec

build.xml 120(+109 -11)

diff --git a/build.xml b/build.xml
index 244b7e4..86db40b 100644
--- a/build.xml
+++ b/build.xml
@@ -8,9 +8,14 @@
 	<property name="dist.packages.dir" value="${basedir}/dist/packages" />
 	<property name="dist.web.package.dir" value="${dist.packages.dir}/azkaban-web-server" />
 	<property name="dist.exec.package.dir" value="${dist.packages.dir}/azkaban-exec-server" />
+	<property name="dist.solo.package.dir" value="${dist.packages.dir}/azkaban-solo-server" />
 	<property name="dist.sql.package.dir" value="${dist.packages.dir}/sql" />	
 
 	<property name="conf.dir" value="${basedir}/conf" />
+	<property name="web.package.dir" value="${basedir}/src/package/webserver" />
+	<property name="exec.package.dir" value="${basedir}/src/package/execserver" />
+	<property name="solo.package.dir" value="${basedir}/src/package/soloserver" />
+	
 	<property name="lib.dir" value="${basedir}/lib" />
 	<property name="bin.dir" value="${basedir}/bin" />
 	<property name="java.src.dir" value="${basedir}/src/java" />
@@ -67,24 +72,60 @@
 		</jar>
 	</target>
 	
+	<target name="create-update-script" description="Prepare the creation of the Azkaban Scripts">
+		<!-- Generic update table scripts -->
+		<concat destfile="${dist.sql.package.dir}/update-all-sql-${updateVersion}.sql" fixlastline="yes">
+			<fileset dir="${sql.src.dir}" >
+				<include name="update.*.${updateVersion}.sql"/>
+			</fileset>	
+		</concat>
+	</target>
+	
+	<target name="create-update-script-2.1" description="Prepare the creation of the Azkaban Scripts">
+		<!-- 2.1 added the active_sla table -->
+		<concat destfile="${dist.sql.package.dir}/update-all-sql-2.1.sql" fixlastline="yes">
+			<fileset dir="${sql.src.dir}" >
+				<include name="create.active_sla.sql"/>
+				<include name="update.*.2.1.sql"/>
+			</fileset>	
+		</concat>
+	</target>
+	
+	<target name="create-update-script-2.2" description="Prepare the creation of the Azkaban Scripts">
+		<!-- 2.2 added the properties table -->
+		<concat destfile="${dist.sql.package.dir}/update-all-sql-2.2.sql" fixlastline="yes">
+			<fileset dir="${sql.src.dir}" >
+				<include name="create.properties.sql"/>
+				<include name="update.*.2.2.sql"/>
+			</fileset>	
+		</concat>
+	</target>
+	
 	<target name="package-sql-scripts" description="Creates a package of sql">
 		<delete dir="${dist.sql.package.dir}" />
 		<mkdir dir="${dist.sql.package.dir}" />
 
 		<concat destfile="${dist.sql.package.dir}/create-all-sql-${version}.sql" fixlastline="yes">
 			<fileset dir="${sql.src.dir}" >
-				<exclude name="update*.sql"/>
+				<exclude name="update.*.sql"/>
+				<exclude name="database.properties"/>
 			</fileset>	
 		</concat>
 		
+		<!-- Collect various update scripts. -->
+		<!-- Not sure how to do this better yet. -->
+		<antcall target="create-update-script-2.1"></antcall>
+		<antcall target="create-update-script-2.2"></antcall>
+		<!-- End script collection-->
+		
 		<copy todir="${dist.sql.package.dir}" >
 			<fileset dir="${sql.src.dir}" />
 		</copy>
+		<echo file="${dist.sql.package.dir}/database.properties" append="true">version=${version}</echo>
 
-        <tar destfile="${dist.sql.package.dir}/${name}-sql-script-${version}.tar.gz" compression="gzip" longfile="gnu">
-        	<tarfileset dir="${dist.sql.package.dir}" prefix="azkaban-${version}" filemode="755" />
+		<tar destfile="${dist.sql.package.dir}/${name}-sql-script-${version}.tar.gz" compression="gzip" longfile="gnu">
+			<tarfileset dir="${dist.sql.package.dir}" prefix="azkaban-${version}" filemode="755" />
 		</tar>
-
 	</target>
 
 	<target name="package-web-server" depends="jars" description="Creates a package for the webserver">
@@ -107,7 +148,7 @@
 		
 		<!-- Copy bin files for web server only-->
 		<copy todir="${dist.web.package.dir}/bin" >
-			<fileset dir="${bin.dir}" includes="**/azkaban-web*"/>
+			<fileset dir="${web.package.dir}/bin"/>
 		</copy>
 		
 		<!-- Copy web files -->
@@ -117,12 +158,16 @@
 		
 		<!-- Copy conf create table scripts -->
 		<copy todir="${dist.web.package.dir}/conf" >
-			<fileset dir="${conf.dir}" />
+			<fileset dir="${web.package.dir}/conf" />
 		</copy>
 		
 		<!-- Tarball it -->
 		<tar destfile="${dist.web.package.dir}/${name}-web-server-${version}.tar.gz" compression="gzip" longfile="gnu">
-			<tarfileset dir="${dist.web.package.dir}" prefix="azkaban-${version}" filemode="755" />
+			<tarfileset dir="${dist.web.package.dir}" prefix="azkaban-${version}" filemode="755" includes="bin/*" />
+			
+			<tarfileset dir="${dist.web.package.dir}" prefix="azkaban-${version}" includes="**">
+				<exclude name="bin/*"/>
+			</tarfileset>
 		</tar>
 	</target>
 	
@@ -145,20 +190,73 @@
 		
 		<!-- Copy bin files for exec server only-->
 		<copy todir="${dist.exec.package.dir}/bin" >
-			<fileset dir="${bin.dir}" includes="**/azkaban-executor*"/>
+			<fileset dir="${exec.package.dir}/bin"/>
 		</copy>
 
 		<!-- Copy conf files -->
 		<copy todir="${dist.exec.package.dir}/conf" >
-			<fileset dir="${conf.dir}" />
+			<fileset dir="${exec.package.dir}/conf" />
 		</copy>
 		
 		<!-- Tarball it -->
 		<tar destfile="${dist.exec.package.dir}/${name}-executor-server-${version}.tar.gz" compression="gzip" longfile="gnu">
-			<tarfileset dir="${dist.exec.package.dir}" prefix="azkaban-${version}" filemode="755" />
+			<tarfileset dir="${dist.exec.package.dir}" prefix="azkaban-${version}" filemode="755" includes="bin/*" />
+			
+			<tarfileset dir="${dist.exec.package.dir}" prefix="azkaban-${version}" includes="**">
+				<exclude name="bin/*"/>
+			</tarfileset>
+		</tar>
+	</target>
+	
+	<target name="package-solo-server" depends="jars" description="Creates a package for the solo server">
+		<delete dir="${dist.solo.package.dir}" />
+		<mkdir dir="${dist.solo.package.dir}" />
+		<mkdir dir="${dist.solo.package.dir}/conf" />
+		<mkdir dir="${dist.solo.package.dir}/bin" />
+		<mkdir dir="${dist.solo.package.dir}/lib" />
+		<mkdir dir="${dist.solo.package.dir}/plugins" />
+		<mkdir dir="${dist.solo.package.dir}/extlib" />
+		<mkdir dir="${dist.solo.package.dir}/sql" />
+				
+		<!-- Copy Azkaban jars and libs-->
+		<copy file="${azkaban.jar}" todir="${dist.solo.package.dir}/lib" />
+		<copy todir="${dist.solo.package.dir}/lib" >
+			<fileset dir="${lib.dir}" >
+				<exclude name="hadoop-core*.jar"/>
+			</fileset>
+		</copy>
+		
+		<!-- Copy bin files for solo server only-->
+		<copy todir="${dist.solo.package.dir}/bin" >
+			<fileset dir="${solo.package.dir}/bin"/>
+		</copy>
+
+		<!-- Copy conf files -->
+		<copy todir="${dist.solo.package.dir}/conf" >
+			<fileset dir="${solo.package.dir}/conf" />
+		</copy>
+		
+		<!-- Copy web files -->
+		<copy todir="${dist.solo.package.dir}/web" >
+			<fileset dir="${web.src.dir}" />
+		</copy>
+		
+		<!-- Copy sql files -->
+		<copy todir="${dist.solo.package.dir}/sql" >
+			<fileset dir="${sql.src.dir}" />
+		</copy>
+		<echo file="${dist.solo.package.dir}/sql/database.properties" append="true">version=${version}</echo>
+			
+		<!-- Tarball it -->
+		<tar destfile="${dist.solo.package.dir}/${name}-solo-server-${version}.tar.gz" compression="gzip" longfile="gnu">
+			<tarfileset dir="${dist.solo.package.dir}" prefix="azkaban-${version}" filemode="755" includes="bin/*" />
+			
+			<tarfileset dir="${dist.solo.package.dir}" prefix="azkaban-${version}" includes="**">
+				<exclude name="bin/*"/>
+			</tarfileset>
 		</tar>
 	</target>
 	
-	<target name="package-all" depends="package-exec-server, package-web-server, package-sql-scripts" description="Create all packages">
+	<target name="package-all" depends="package-exec-server, package-web-server, package-solo-server, package-sql-scripts" description="Create all packages">
 	</target>
 </project>
diff --git a/src/java/azkaban/database/AzkabanDatabaseSetup.java b/src/java/azkaban/database/AzkabanDatabaseSetup.java
new file mode 100644
index 0000000..c42ad33
--- /dev/null
+++ b/src/java/azkaban/database/AzkabanDatabaseSetup.java
@@ -0,0 +1,367 @@
+package azkaban.database;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.dbutils.DbUtils;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.commons.io.IOUtils;
+import org.apache.log4j.Logger;
+
+import azkaban.database.DataSourceUtils.PropertyType;
+import azkaban.utils.FileIOUtils;
+import azkaban.utils.Props;
+
+public class AzkabanDatabaseSetup {
+	private static final Logger logger = Logger.getLogger(AzkabanDatabaseSetup.class);
+	public static final String DATABASE_CHECK_VERSION = "database.check.version";
+	public static final String DATABASE_AUTO_UPDATE_TABLES = "database.auto.update.tables";
+	public static final String DATABASE_SQL_SCRIPT_DIR = "database.sql.scripts.dir";
+	
+	private static final String DEFAULT_SCRIPT_PATH = "sql";
+	private static final String CREATE_SCRIPT_PREFIX = "create.";
+	private static final String UPDATE_SCRIPT_PREFIX = "update.";
+	private static final String SQL_SCRIPT_SUFFIX = ".sql";
+	
+	private static String FETCH_PROPERTY_BY_TYPE = "SELECT name, value FROM properties WHERE type=?";
+	private static final String INSERT_DB_PROPERTY = "INSERT INTO properties (name, type, value, modified_time) values (?,?,?,?)";
+	private static final String UPDATE_DB_PROPERTY = "UPDATE properties SET value=?,modified_time=? WHERE name=? AND type=?";
+	
+	private AzkabanDataSource dataSource;
+	private Map<String, String> tables;
+	private Map<String, String> installedVersions;
+	private Set<String> missingTables;
+	private Map<String, List<String>> upgradeList;
+	private Props dbProps;
+	private String version;
+	private boolean needsUpdating;
+	
+	private String scriptPath = null;
+	
+	public AzkabanDatabaseSetup(Props props) {
+		this(DataSourceUtils.getDataSource(props));
+		this.scriptPath = props.getString(DATABASE_SQL_SCRIPT_DIR, DEFAULT_SCRIPT_PATH);
+	}
+	
+	public AzkabanDatabaseSetup(AzkabanDataSource ds) {
+		this.dataSource = ds;
+		if (scriptPath == null) {
+			scriptPath = DEFAULT_SCRIPT_PATH;
+		}
+	}
+	
+	public void loadTableInfo() throws IOException, SQLException {
+		tables = new HashMap<String, String>();
+		installedVersions = new HashMap<String, String>();
+		missingTables = new HashSet<String>();
+		upgradeList = new HashMap<String, List<String>>();
+		
+		dbProps = loadDBProps();
+		version = dbProps.getString("version");
+		
+		loadInstalledTables();
+		loadTableVersion();
+		findMissingTables();
+		findOutOfDateTables();
+
+		needsUpdating = !upgradeList.isEmpty() || !missingTables.isEmpty();
+	}
+
+	public boolean needsUpdating() {
+		if (version == null) {
+			throw new RuntimeException("Uninitialized. Call loadTableInfo first.");
+		}
+		
+		return needsUpdating;
+	}
+	
+	public void printUpgradePlan() {
+		if (!tables.isEmpty()) {
+			logger.info("The following are installed tables");
+			for (Map.Entry<String, String> installedTable: tables.entrySet()) {
+				logger.info(" " + installedTable.getKey() + " version:" + installedTable.getValue());
+			}
+		}
+		else {
+			logger.info("No installed tables found.");
+		}
+		
+		if (!missingTables.isEmpty()) {
+			logger.info("The following are missing tables that need to be installed");
+			for (String table: missingTables) {
+				logger.info(" " + table);
+			}
+		}
+		else {
+			logger.info("There are no missing tables.");
+		}
+		
+		if (!upgradeList.isEmpty()) {
+			logger.info("The following tables need to be updated.");
+			for (Map.Entry<String, List<String>> upgradeTable: upgradeList.entrySet()) {
+				String tableInfo = " " +  upgradeTable.getKey() + " versions:";
+				for (String upVersion: upgradeTable.getValue()) {
+					tableInfo += upVersion + ",";
+				}
+				
+				logger.info(tableInfo);
+			}
+		}
+		else {
+			logger.info("No tables need to be updated.");
+		}
+	}
+	
+	public void updateDatabase(boolean createTable, boolean updateTable) throws SQLException, IOException {
+		// Call needsUpdating() here because it performs the uninitialized check.
+		if (!needsUpdating()) {
+			logger.info("Nothing to be done.");
+			return;
+		}
+		
+		if (createTable && !missingTables.isEmpty()) {
+			createNewTables();
+		}
+		if (updateTable && !upgradeList.isEmpty()) {
+			updateTables();
+		}
+	}
+	
+	private Props loadDBProps() throws IOException {
+		File dbPropsFile = new File(this.scriptPath, "database.properties");
+		
+		if (!dbPropsFile.exists()) {
+			throw new IOException("Cannot find 'database.properties' file in " + dbPropsFile.getPath());
+		}
+		
+		Props props = new Props(null, dbPropsFile);
+		return props;
+	}
+	
+	private void loadTableVersion() throws SQLException {
+		logger.info("Searching for table versions in the properties table");
+		if (tables.containsKey("properties")) {
+			// Load version from settings
+			QueryRunner runner = new QueryRunner(dataSource);
+			Map<String,String> map = runner.query(FETCH_PROPERTY_BY_TYPE, new PropertiesHandler(), PropertyType.DB.getNumVal());
+			for (String key: map.keySet()) {
+				String value = map.get(key);
+				if (key.endsWith(".version")) {
+					String tableName = key.substring(0, key.length() - ".version".length());
+					installedVersions.put(tableName, value);
+					if (tables.containsKey(tableName)) {
+						tables.put(tableName, value);
+					}
+				}
+			}
+		}
+		else {
+			logger.info("Properties table doesn't exist.");
+		}
+	}
+	
+	private void loadInstalledTables() throws SQLException {
+		logger.info("Searching for installed tables");
+		Connection conn = null;
+		try {
+			conn = dataSource.getConnection();
+			ResultSet rs = conn.getMetaData().getTables(conn.getCatalog(), null, null, new String[]{"TABLE"});
+	
+			while(rs.next()) {
+				tables.put(rs.getString("TABLE_NAME").toLowerCase(), "2.1");
+			}
+		}
+		finally {
+			DbUtils.commitAndCloseQuietly(conn);
+		}
+	}
+	
+	private void findMissingTables() {
+		File directory = new File(scriptPath);
+		File[] createScripts = directory.listFiles(new FileIOUtils.PrefixSuffixFileFilter(CREATE_SCRIPT_PREFIX, SQL_SCRIPT_SUFFIX));
+		
+		for (File script: createScripts) {
+			String name = script.getName();
+			String[] nameSplit = name.split("\\.");
+			String tableName = nameSplit[1];
+			
+			if (!tables.containsKey(tableName)) {
+				missingTables.add(tableName);
+			}
+		}
+	}
+	
+	private void findOutOfDateTables() {
+		for (String key : tables.keySet()) {
+			String version = tables.get(key);
+			
+			List<String> upgradeVersions = findOutOfDateTable(key, version);
+			if (upgradeVersions != null && !upgradeVersions.isEmpty()) {
+				upgradeList.put(key, upgradeVersions);
+			}
+		}
+	}
+	
+	private List<String> findOutOfDateTable(String table, String version) {
+		File directory = new File(scriptPath);
+		ArrayList<String> versions = new ArrayList<String>();
+		
+		File[] createScripts = directory.listFiles(new FileIOUtils.PrefixSuffixFileFilter(UPDATE_SCRIPT_PREFIX + table, SQL_SCRIPT_SUFFIX));
+		if (createScripts.length == 0) {
+			return null;
+		}
+		
+		String updateFileNameVersion = UPDATE_SCRIPT_PREFIX + table + "." + version;
+		for (File file: createScripts) {
+			String fileName = file.getName();
+			if (fileName.compareTo(updateFileNameVersion) > 0) {
+				String[] split = fileName.split("\\.");
+				String versionNum = "";
+				
+				for (int i = 2; i < split.length - 1; ++i) {
+					try {
+						Integer.parseInt(split[i]);
+						versionNum += split[i] + ".";
+					}
+					catch (NumberFormatException e) {
+						break;
+					}
+				}
+				if (versionNum.endsWith(".")) {
+					versionNum = versionNum.substring(0, versionNum.length() - 1);
+					
+					if (versionNum.compareTo(version) > 0) {
+						versions.add(versionNum);
+					}
+				}
+			}
+		}
+		
+		Collections.sort(versions);
+		return versions;
+	}
+	
+	private void createNewTables() throws SQLException, IOException {
+		Connection conn = dataSource.getConnection();
+		conn.setAutoCommit(false);
+		try {
+			// Make sure that properties table is created first.
+			if (missingTables.contains("properties")) {
+				runTableScripts(conn, "properties", version, dataSource.getDBType(), false);
+			}
+			for (String table: missingTables) {
+				if (!table.equals("properties")) {
+					runTableScripts(conn, table, version, dataSource.getDBType(), false);
+				}
+			}
+		}
+		finally {
+			conn.close();
+		}
+	}
+	
+	private void updateTables() throws SQLException, IOException {
+		Connection conn = dataSource.getConnection();
+		conn.setAutoCommit(false);
+		try {
+			// Make sure that properties table is created first.
+			if (upgradeList.containsKey("properties")) {
+				for (String version: upgradeList.get("properties")) {
+					runTableScripts(conn, "properties", version, dataSource.getDBType(), true);
+				}
+			}
+			for (String table: upgradeList.keySet()) {
+				if (!table.equals("properties")) {
+					for (String version: upgradeList.get(table)) {
+						runTableScripts(conn, table, version, dataSource.getDBType(), true);
+					}
+				}
+			}
+		}
+		finally {
+			conn.close();
+		}
+	}
+	
+	private void runTableScripts(Connection conn, String table, String version, String dbType, boolean update) throws IOException, SQLException {
+		String scriptName = "";
+		if (update) {
+			scriptName = "update." + table + "." + version;
+			logger.info("Update table " + table + " to version " + version);
+		}
+		else {
+			scriptName = "create." + table;
+			logger.info("Creating new table " + table + " version " + version);
+		}
+		
+		String dbSpecificScript = scriptName + "." + dbType + ".sql";
+		
+		File script = new File(scriptPath, dbSpecificScript);
+		if (!script.exists()) {
+			String dbScript = scriptName + ".sql"; 
+			script = new File(scriptPath, dbScript);
+			
+			if (!script.exists()) {
+				throw new IOException("Creation files do not exist for table " + table);
+			}
+		}
+
+		BufferedInputStream buff = null;
+		try {
+			buff = new BufferedInputStream(new FileInputStream(script));
+			String queryStr = IOUtils.toString(buff);
+			
+			String[] splitQuery = queryStr.split(";\\s*\n");
+			
+			QueryRunner runner = new QueryRunner();
+			
+			for (String query: splitQuery) {
+				runner.update(conn, query);
+			}
+			
+			// If it's properties, then we want to commit the table before we update it
+			if (table.equals("properties")) {
+				conn.commit();
+			}
+			
+			String propertyName = table + ".version";
+			if (!installedVersions.containsKey(table)) {
+				runner.update(conn, INSERT_DB_PROPERTY, propertyName, DataSourceUtils.PropertyType.DB.getNumVal(), version, System.currentTimeMillis());
+			}
+			else {
+				runner.update(conn, UPDATE_DB_PROPERTY, version, System.currentTimeMillis(), propertyName, DataSourceUtils.PropertyType.DB.getNumVal());
+			}
+			conn.commit();
+		} 
+		finally {
+			IOUtils.closeQuietly(buff);
+		}
+	}
+	
+	public static class PropertiesHandler implements ResultSetHandler<Map<String, String>> {
+		@Override
+		public Map<String, String> handle(ResultSet rs) throws SQLException {
+			Map<String, String> results = new HashMap<String, String>();
+			while(rs.next()) {
+				String key = rs.getString(1);
+				String value = rs.getString(2);
+				results.put(key, value);
+			}
+			
+			return results;
+		}
+	}
+}
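
The new AzkabanDatabaseSetup drives table creation and upgrades from the create.<table>.sql and update.<table>.<version>.sql scripts plus the generated database.properties. A minimal usage sketch, assuming an azkaban.properties that carries the database connection settings (the conf path below is illustrative, not part of this change):

import java.io.File;

import azkaban.database.AzkabanDatabaseSetup;
import azkaban.utils.Props;

public class DatabaseSetupExample {
	public static void main(String[] args) throws Exception {
		// Illustrative location; any Props with database.type and, optionally,
		// database.sql.scripts.dir works the same way.
		Props props = new Props(null, new File("conf/azkaban.properties"));

		AzkabanDatabaseSetup setup = new AzkabanDatabaseSetup(props);
		setup.loadTableInfo();                // scan DB metadata and the properties table

		if (setup.needsUpdating()) {
			setup.printUpgradePlan();         // log missing and out-of-date tables
			setup.updateDatabase(true, true); // create missing tables, run upgrade scripts
		}
	}
}
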
diff --git a/src/java/azkaban/database/AzkabanDatabaseUpdater.java b/src/java/azkaban/database/AzkabanDatabaseUpdater.java
new file mode 100644
index 0000000..1a37170
--- /dev/null
+++ b/src/java/azkaban/database/AzkabanDatabaseUpdater.java
@@ -0,0 +1,82 @@
+package azkaban.database;
+
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Arrays;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+
+import org.apache.log4j.Logger;
+
+import azkaban.database.AzkabanDatabaseSetup;
+import azkaban.utils.Props;
+import azkaban.webapp.AzkabanServer;
+
+public class AzkabanDatabaseUpdater {
+	private static final Logger logger = Logger.getLogger(AzkabanDatabaseUpdater.class);
+
+	public static void main(String[] args) throws Exception {
+		OptionParser parser = new OptionParser();
+
+		OptionSpec<String> scriptDirectory = parser
+				.acceptsAll(Arrays.asList("s", "script"), "Directory of update scripts.")
+				.withRequiredArg()
+				.describedAs("script").ofType(String.class);
+		
+		OptionSpec<Void> updateOption = 
+				parser.acceptsAll(Arrays.asList("u", "update"), "Will update if necessary");
+		
+		Props props = AzkabanServer.loadProps(args, parser);
+		
+		if (props == null) {
+			logger.error("Properties not found. Need it to connect to the db.");
+			logger.error("Exiting...");
+			return;
+		}
+		
+		OptionSet options = parser.parse(args);
+		boolean updateDB = false;
+		if (options.has(updateOption)) {
+			updateDB = true;
+		}
+		else {
+			logger.info("Running DatabaseUpdater in test mode");
+		}
+		
+		String scriptDir = "sql";
+		if (options.has(scriptDirectory)) {
+			scriptDir = options.valueOf(scriptDirectory);
+		}
+		
+		runDatabaseUpdater(props, scriptDir, updateDB);
+	}
+	
+	public static void runDatabaseUpdater(Props props, String sqlDir, boolean updateDB) throws IOException, SQLException {
+		logger.info("Use scripting directory " + sqlDir);
+		
+		if (updateDB) {
+			logger.info("Will auto update any changes.");
+		}
+		else {
+			logger.info("Running DatabaseUpdater in test mode. Use -u to update");
+		}
+		
+		AzkabanDatabaseSetup setup = new AzkabanDatabaseSetup(props);
+		setup.loadTableInfo();
+		if (!setup.needsUpdating()) {
+			logger.info("Everything looks up to date.");
+			return;
+		}
+
+		logger.info("Need to update the db.");
+		setup.printUpgradePlan();
+		
+		if (updateDB) {
+			logger.info("Updating DB");
+			setup.updateDatabase(true,true);
+		}
+	}
+}
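
AzkabanDatabaseUpdater wraps the same logic in a small CLI: -s/--script points at the script directory and -u/--update applies changes, otherwise it only reports. It can also be driven programmatically; a hedged sketch of a dry run, with an illustrative properties path:

import java.io.File;

import azkaban.database.AzkabanDatabaseUpdater;
import azkaban.utils.Props;

public class DatabaseDryRun {
	public static void main(String[] args) throws Exception {
		// Illustrative path; the file must provide the database connection settings.
		Props props = new Props(null, new File("conf/azkaban.properties"));

		// updateDB=false matches the CLI's test mode: print the plan, change nothing.
		AzkabanDatabaseUpdater.runDatabaseUpdater(props, "sql", false);
	}
}
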
diff --git a/src/java/azkaban/database/AzkabanDataSource.java b/src/java/azkaban/database/AzkabanDataSource.java
new file mode 100644
index 0000000..077e6cd
--- /dev/null
+++ b/src/java/azkaban/database/AzkabanDataSource.java
@@ -0,0 +1,9 @@
+package azkaban.database;
+
+import org.apache.commons.dbcp.BasicDataSource;
+
+public abstract class AzkabanDataSource extends BasicDataSource {
+	public abstract boolean allowsOnDuplicateKey();
+	
+	public abstract String getDBType();
+}
\ No newline at end of file
diff --git a/src/java/azkaban/database/DataSourceUtils.java b/src/java/azkaban/database/DataSourceUtils.java
new file mode 100644
index 0000000..9c4b74a
--- /dev/null
+++ b/src/java/azkaban/database/DataSourceUtils.java
@@ -0,0 +1,159 @@
+
+package azkaban.database;
+
+import java.sql.SQLException;
+
+import javax.sql.DataSource;
+
+import org.apache.commons.dbutils.QueryRunner;
+
+import azkaban.utils.Props;
+
+public class DataSourceUtils {
+	
+	/**
+	 * Property types
+	 */
+	public static enum PropertyType {
+		DB(1);
+
+		private int numVal;
+
+		PropertyType(int numVal) {
+			this.numVal = numVal;
+		}
+
+		public int getNumVal() {
+			return numVal;
+		}
+
+		public static PropertyType fromInteger(int x) {
+			switch (x) {
+			case 1:
+				return DB;
+			default:
+				return DB;
+			}
+		}
+	}
+	
+	/**
+	 * Create Datasource from parameters in the properties
+	 * 
+	 * @param props
+	 * @return
+	 */
+	public static AzkabanDataSource getDataSource(Props props) {
+		String databaseType = props.getString("database.type");
+		
+		AzkabanDataSource dataSource = null;
+		if (databaseType.equals("mysql")) {
+			int port = props.getInt("mysql.port");
+			String host = props.getString("mysql.host");
+			String database = props.getString("mysql.database");
+			String user = props.getString("mysql.user");
+			String password = props.getString("mysql.password");
+			int numConnections = props.getInt("mysql.numconnections");
+
+			dataSource = getMySQLDataSource(host, port, database, user, password, numConnections);
+		}
+		else if (databaseType.equals("h2")) {
+			String path = props.getString("h2.path");
+			dataSource = getH2DataSource(path);
+		}
+		
+		return dataSource;
+	}
+	
+	/**
+	 * Create a MySQL DataSource
+	 * 
+	 * @param host
+	 * @param port
+	 * @param dbName
+	 * @param user
+	 * @param password
+	 * @param numConnections
+	 * @return
+	 */
+	public static AzkabanDataSource getMySQLDataSource(String host, Integer port, String dbName, String user, String password, Integer numConnections) {
+		return new MySQLBasicDataSource(host, port, dbName, user, password, numConnections);
+	}
+
+	/**
+	 * Create H2 DataSource
+	 * @param file
+	 * @return
+	 */
+	public static AzkabanDataSource getH2DataSource(String file) {
+		return new EmbeddedH2BasicDataSource(file);
+	}
+	
+	/**
+	 * Hidden datasource
+	 */
+	private DataSourceUtils() {
+	}
+	
+	/**
+	 * MySQL data source based on AzkabanDataSource
+	 *
+	 */
+	public static class MySQLBasicDataSource extends AzkabanDataSource {
+		private MySQLBasicDataSource(String host, int port, String dbName, String user, String password, int numConnections) {
+			super();
+			
+			String url = "jdbc:mysql://" + (host + ":" + port + "/" + dbName);
+			setDriverClassName("com.mysql.jdbc.Driver");
+			setUsername(user);
+			setPassword(password);
+			setUrl(url);
+			setMaxActive(numConnections);
+			setValidationQuery("/* ping */ select 1");
+			setTestOnBorrow(true);
+		}
+
+		@Override
+		public boolean allowsOnDuplicateKey() {
+			return true;
+		}
+
+		@Override
+		public String getDBType() {
+			return "mysql";
+		}
+	}
+	
+	/**
+	 * H2 Datasource
+	 *
+	 */
+	public static class EmbeddedH2BasicDataSource extends AzkabanDataSource {
+		private EmbeddedH2BasicDataSource(String filePath) {
+			super();
+			String url = "jdbc:h2:file:" + filePath;
+			setDriverClassName("org.h2.Driver");
+			setUrl(url);
+		}
+
+		@Override
+		public boolean allowsOnDuplicateKey() {
+			return false;
+		}
+		
+		@Override
+		public String getDBType() {
+			return "h2";
+		}
+	}
+	
+	public static void testConnection(DataSource ds) throws SQLException {
+		QueryRunner runner = new QueryRunner(ds);
+		runner.update("SHOW TABLES");
+	}
+	
+	public static void testMySQLConnection(String host, Integer port, String dbName, String user, String password, Integer numConnections) throws SQLException {
+		DataSource ds = new MySQLBasicDataSource(host, port, dbName, user, password, numConnections);
+		testConnection(ds);
+	}
+}
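
DataSourceUtils picks MySQL or embedded H2 based on the database.type property. A short sketch of the H2 path, assuming Props exposes the usual parent-only constructor and put(String, String) setters (the file location is illustrative):

import azkaban.database.AzkabanDataSource;
import azkaban.database.DataSourceUtils;
import azkaban.utils.Props;

public class H2DataSourceExample {
	public static void main(String[] args) throws Exception {
		Props props = new Props(null);                 // assumed parent-only constructor
		props.put("database.type", "h2");
		props.put("h2.path", "data/azkaban_h2_db");    // illustrative file path

		AzkabanDataSource ds = DataSourceUtils.getDataSource(props);
		System.out.println(ds.getDBType());            // "h2"
		System.out.println(ds.allowsOnDuplicateKey()); // false for H2

		// Issues "SHOW TABLES" against the pool as a connectivity check.
		DataSourceUtils.testConnection(ds);
	}
}
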
diff --git a/src/java/azkaban/execapp/AzkabanExecutorServer.java b/src/java/azkaban/execapp/AzkabanExecutorServer.java
index d4dffef..fb67970 100644
--- a/src/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/src/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -21,7 +21,6 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.TimeZone;
 
@@ -44,12 +43,9 @@ import azkaban.project.JdbcProjectLoader;
 import azkaban.project.ProjectLoader;
 import azkaban.utils.Props;
 import azkaban.utils.Utils;
+import azkaban.webapp.AzkabanServer;
 import azkaban.webapp.servlet.AzkabanServletContextListener;
 
-import joptsimple.OptionParser;
-import joptsimple.OptionSet;
-import joptsimple.OptionSpec;
-
 public class AzkabanExecutorServer {
 	private static final Logger logger = Logger.getLogger(AzkabanExecutorServer.class);
 	private static final int MAX_FORM_CONTENT_SIZE = 10*1024*1024;
@@ -166,35 +162,8 @@ public class AzkabanExecutorServer {
 	 * @throws IOException
 	 */
 	public static void main(String[] args) throws Exception {
-		OptionParser parser = new OptionParser();
-
-		OptionSpec<String> configDirectory = parser
-				.acceptsAll(Arrays.asList("c", "conf"),
-						"The conf directory for Azkaban.").withRequiredArg()
-				.describedAs("conf").ofType(String.class);
-
 		logger.error("Starting Jetty Azkaban Executor...");
-
-		// Grabbing the azkaban settings from the conf directory.
-		Props azkabanSettings = null;
-		OptionSet options = parser.parse(args);
-		if (options.has(configDirectory)) {
-			String path = options.valueOf(configDirectory);
-			logger.info("Loading azkaban settings file from " + path);
-			File confDir = new File(path);
-			if (!confDir.exists()) {
-				logger.error("Conf directory " + path + " doesn't exist.");
-			}
-			else if (!confDir.isDirectory()) {
-				logger.error("Conf directory " + path + " isn't a directory.");
-			}
-			else {
-				azkabanSettings = loadAzkabanConfigurationFromDirectory(confDir);
-			}
-		} else {
-			logger.info("Conf parameter not set, attempting to get value from AZKABAN_HOME env.");
-			azkabanSettings = loadConfigurationFromAzkabanHome();
-		}
+		Props azkabanSettings = AzkabanServer.loadProps(args);
 
 		if (azkabanSettings == null) {
 			logger.error("Azkaban Properties not loaded.");
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 883a19f..b449654 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -34,6 +34,7 @@ import azkaban.utils.JSONUtils;
 public class ExecutableFlow {
 	private int executionId = -1;
 	private String flowId;
+	private int scheduleId = -1;
 	private int projectId;
 	private int version;
 
@@ -57,6 +58,7 @@ public class ExecutableFlow {
 	
 	public ExecutableFlow(Flow flow) {
 		this.projectId = flow.getProjectId();
+		this.scheduleId = -1;
 		this.flowId = flow.getId();
 		this.version = flow.getVersion();
 		this.setFlow(flow);
@@ -64,6 +66,7 @@ public class ExecutableFlow {
 	
 	public ExecutableFlow(int executionId, Flow flow) {
 		this.projectId = flow.getProjectId();
+		this.scheduleId = -1;
 		this.flowId = flow.getId();
 		this.version = flow.getVersion();
 		this.executionId = executionId;
@@ -208,6 +211,14 @@ public class ExecutableFlow {
 		this.projectId = projectId;
 	}
 
+	public int getScheduleId() {
+		return scheduleId;
+	}
+
+	public void setScheduleId(int scheduleId) {
+		this.scheduleId = scheduleId;
+	}
+
 	public String getExecutionPath() {
 		return executionPath;
 	}
@@ -255,6 +266,9 @@ public class ExecutableFlow {
 		flowObj.put("executionPath", executionPath);
 		flowObj.put("flowId", flowId);
 		flowObj.put("projectId", projectId);
+		if(scheduleId >= 0) {
+			flowObj.put("scheduleId", scheduleId);
+		}
 		flowObj.put("submitTime", submitTime);
 		flowObj.put("startTime", startTime);
 		flowObj.put("endTime", endTime);
@@ -370,6 +384,9 @@ public class ExecutableFlow {
 		exFlow.executionPath = (String)flowObj.get("executionPath");
 		exFlow.flowId = (String)flowObj.get("flowId");
 		exFlow.projectId = (Integer)flowObj.get("projectId");
+		if (flowObj.containsKey("scheduleId")) {
+			exFlow.scheduleId = (Integer)flowObj.get("scheduleId");
+		}
 		exFlow.submitTime = JSONUtils.getLongFromObject(flowObj.get("submitTime"));
 		exFlow.startTime = JSONUtils.getLongFromObject(flowObj.get("startTime"));
 		exFlow.endTime = JSONUtils.getLongFromObject(flowObj.get("endTime"));
diff --git a/src/java/azkaban/executor/ExecutorMailer.java b/src/java/azkaban/executor/ExecutorMailer.java
index a49b4fb..c6c6f41 100644
--- a/src/java/azkaban/executor/ExecutorMailer.java
+++ b/src/java/azkaban/executor/ExecutorMailer.java
@@ -8,33 +8,19 @@ import javax.mail.MessagingException;
 import org.apache.log4j.Logger;
 
 import azkaban.executor.ExecutionOptions.FailureAction;
+import azkaban.utils.AbstractMailer;
 import azkaban.utils.EmailMessage;
 import azkaban.utils.Props;
 import azkaban.utils.Utils;
 
-public class ExecutorMailer {
+public class ExecutorMailer extends AbstractMailer {
 	private static Logger logger = Logger.getLogger(ExecutorMailer.class);
 	
 	private boolean testMode = false;
-	private String clientHostname;
-	private String clientPortNumber;
-	
-	private String mailHost;
-	private String mailUser;
-	private String mailPassword;
-	private String mailSender;
-	private String azkabanName;
 	
 	public ExecutorMailer(Props props) {
-		this.azkabanName = props.getString("azkaban.name", "azkaban");
-		this.mailHost = props.getString("mail.host", "localhost");
-		this.mailUser = props.getString("mail.user", "");
-		this.mailPassword = props.getString("mail.password", "");
-		this.mailSender = props.getString("mail.sender", "");
-		
-		this.clientHostname = props.getString("jetty.hostname", "localhost");
-		this.clientPortNumber = Utils.nonNull(props.getString("jetty.ssl.port"));
-		
+		super(props);
+
 		testMode = props.getBoolean("test.mode", false);
 	}
 	
@@ -44,13 +30,12 @@ public class ExecutorMailer {
 		int execId = flow.getExecutionId();
 		
 		if (emailList != null && !emailList.isEmpty()) {
-			EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
-			message.setFromAddress(mailSender);
-			message.addAllToAddress(emailList);
-			message.setMimeType("text/html");
-			message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
+			EmailMessage message = super.createEmailMessage(
+					"Flow '" + flow.getFlowId() + "' has failed on " + getAzkabanName(), 
+					"text/html", 
+					emailList);
 			
-			message.println("<h2 style=\"color:#FF0000\"> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has encountered a failure on " + azkabanName + "</h2>");
+			message.println("<h2 style=\"color:#FF0000\"> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has encountered a failure on " + getAzkabanName() + "</h2>");
 			
 			if (option.getFailureAction() == FailureAction.CANCEL_ALL) {
 				message.println("This flow is set to cancel all currently running jobs.");
@@ -68,7 +53,7 @@ public class ExecutorMailer {
 			message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
 			message.println("</table>");
 			message.println("");
-			String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
+			String executionUrl = super.getReferenceURL() + "executor?" + "execid=" + execId;
 			message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
 			
 			message.println("");
@@ -98,20 +83,20 @@ public class ExecutorMailer {
 		int execId = flow.getExecutionId();
 		
 		if (emailList != null && !emailList.isEmpty()) {
-			EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
-			message.setFromAddress(mailSender);
-			message.addAllToAddress(emailList);
-			message.setMimeType("text/html");
-			message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
+			EmailMessage message = super.createEmailMessage(
+					"Flow '" + flow.getFlowId() + "' has failed on " + getAzkabanName(), 
+					"text/html", 
+					emailList);
 			
-			message.println("<h2 style=\"color:#FF0000\"> Execution '" + execId + "' of flow '" + flow.getFlowId() + "' has failed on " + azkabanName + "</h2>");
+			message.println("<h2 style=\"color:#FF0000\"> Execution '" + execId + "' of flow '" + flow.getFlowId() + "' has failed on " + getAzkabanName() + "</h2>");
 			message.println("<table>");
 			message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
 			message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
 			message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
 			message.println("</table>");
 			message.println("");
-			String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
+			
+			String executionUrl = super.getReferenceURL() + "executor?" + "execid=" + execId;
 			message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
 			
 			message.println("");
@@ -127,8 +112,6 @@ public class ExecutorMailer {
 			
 			message.println("</ul>");
 			
-			
-			
 			if (!testMode) {
 				try {
 					message.sendEmail();
@@ -146,20 +129,19 @@ public class ExecutorMailer {
 		int execId = flow.getExecutionId();
 		
 		if (emailList != null && !emailList.isEmpty()) {
-			EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
-			message.setFromAddress(mailSender);
-			message.addAllToAddress(emailList);
-			message.setMimeType("text/html");
-			message.setSubject("Flow '" + flow.getFlowId() + "' has succeeded on " + azkabanName);
+			EmailMessage message = super.createEmailMessage(
+					"Flow '" + flow.getFlowId() + "' has succeeded on " + getAzkabanName(), 
+					"text/html", 
+					emailList);
 			
-			message.println("<h2> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has succeeded on " + azkabanName + "</h2>");
+			message.println("<h2> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has succeeded on " + getAzkabanName() + "</h2>");
 			message.println("<table>");
 			message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
 			message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
 			message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
 			message.println("</table>");
 			message.println("");
-			String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
+			String executionUrl = super.getReferenceURL() + "executor?" + "execid=" + execId;
 			message.println("<a href=\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
 			
 			if (!testMode) {
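
Both mailers now delegate the shared mail.* and jetty.* property handling to an AbstractMailer base class that is not part of this diff. Judging only from the calls made here (createEmailMessage, getAzkabanName, getReferenceURL) and the fields the old code used, it presumably looks roughly like the sketch below; treat the bodies as assumptions, not the committed implementation:

package azkaban.utils;

import java.util.List;

// Hypothetical reconstruction of the base class used by ExecutorMailer and SlaMailer.
public abstract class AbstractMailer {
	private final String azkabanName;
	private final String mailHost;
	private final String mailUser;
	private final String mailPassword;
	private final String mailSender;
	private final String clientHostname;
	private final String clientPortNumber;

	protected AbstractMailer(Props props) {
		this.azkabanName = props.getString("azkaban.name", "azkaban");
		this.mailHost = props.getString("mail.host", "localhost");
		this.mailUser = props.getString("mail.user", "");
		this.mailPassword = props.getString("mail.password", "");
		this.mailSender = props.getString("mail.sender", "");
		this.clientHostname = props.getString("jetty.hostname", "localhost");
		this.clientPortNumber = props.getString("jetty.ssl.port");
	}

	protected EmailMessage createEmailMessage(String subject, String mimetype, List<String> emailList) {
		EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
		message.setFromAddress(mailSender);
		message.addAllToAddress(emailList);
		message.setMimeType(mimetype);
		message.setSubject(subject);
		return message;
	}

	protected String getAzkabanName() {
		return azkabanName;
	}

	protected String getReferenceURL() {
		// Callers append "executor?execid=..." directly, so the URL ends with "/".
		return "https://" + clientHostname + ":" + clientPortNumber + "/";
	}
}
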
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index e67a7e2..33a9f76 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -22,7 +22,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
 
-import azkaban.utils.db.AbstractJdbcLoader;
+import azkaban.database.AbstractJdbcLoader;
 import azkaban.utils.FileIOUtils;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.GZIPUtils;
diff --git a/src/java/azkaban/jobtype/JobTypeManager.java b/src/java/azkaban/jobtype/JobTypeManager.java
index 0526c4a..1a0b25d 100644
--- a/src/java/azkaban/jobtype/JobTypeManager.java
+++ b/src/java/azkaban/jobtype/JobTypeManager.java
@@ -66,13 +66,16 @@ public class JobTypeManager
 		loadDefaultTypes();
 		
 		if(jobtypePluginDir != null) {
+			File pluginDir = new File(jobtypePluginDir);
+			if (pluginDir.exists()) {
 			logger.info("job type plugin directory set. Loading extra job types.");
-			try {
-				loadPluginJobTypes();
-			}
-			catch (Exception e) {
-				logger.info("Plugin jobtypes failed to load. " + e.getCause());
-				throw new JobTypeManagerException(e);
+				try {
+					loadPluginJobTypes();
+				}
+				catch (Exception e) {
+					logger.info("Plugin jobtypes failed to load. " + e.getCause());
+					throw new JobTypeManagerException(e);
+				}
 			}
 		}
 		
diff --git a/src/java/azkaban/project/JdbcProjectLoader.java b/src/java/azkaban/project/JdbcProjectLoader.java
index 972b531..7825694 100644
--- a/src/java/azkaban/project/JdbcProjectLoader.java
+++ b/src/java/azkaban/project/JdbcProjectLoader.java
@@ -27,7 +27,7 @@ import azkaban.flow.Flow;
 import azkaban.project.ProjectLogEvent.EventType;
 import azkaban.user.Permission;
 import azkaban.user.User;
-import azkaban.utils.db.AbstractJdbcLoader;
+import azkaban.database.AbstractJdbcLoader;
 import azkaban.utils.GZIPUtils;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Md5Hasher;
diff --git a/src/java/azkaban/scheduler/JdbcScheduleLoader.java b/src/java/azkaban/scheduler/JdbcScheduleLoader.java
index 4b70fbc..809aeef 100644
--- a/src/java/azkaban/scheduler/JdbcScheduleLoader.java
+++ b/src/java/azkaban/scheduler/JdbcScheduleLoader.java
@@ -32,7 +32,7 @@ import org.apache.log4j.Logger;
 import org.joda.time.DateTimeZone;
 import org.joda.time.ReadablePeriod;
 
-import azkaban.utils.db.AbstractJdbcLoader;
+import azkaban.database.AbstractJdbcLoader;
 import azkaban.utils.GZIPUtils;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Props;
@@ -46,19 +46,19 @@ public class JdbcScheduleLoader extends AbstractJdbcLoader implements ScheduleLo
 	private static final String scheduleTableName = "schedules";
 
 	private static String SELECT_ALL_SCHEDULES =
-			"SELECT project_id, project_name, flow_name, status, first_sched_time, timezone, period, last_modify_time, next_exec_time, submit_time, submit_user, enc_type, schedule_options FROM " + scheduleTableName;
+			"SELECT schedule_id, project_id, project_name, flow_name, status, first_sched_time, timezone, period, last_modify_time, next_exec_time, submit_time, submit_user, enc_type, schedule_options FROM " + scheduleTableName;
 	
 	private static String INSERT_SCHEDULE = 
 			"INSERT INTO " + scheduleTableName + " ( project_id, project_name, flow_name, status, first_sched_time, timezone, period, last_modify_time, next_exec_time, submit_time, submit_user, enc_type, schedule_options) values (?,?,?,?,?,?,?,?,?,?,?,?,?)";
 	
 	private static String REMOVE_SCHEDULE_BY_KEY = 
-			"DELETE FROM " + scheduleTableName + " WHERE project_id=? AND flow_name=?";
+			"DELETE FROM " + scheduleTableName + " WHERE schedule_id=?";
 	
 	private static String UPDATE_SCHEDULE_BY_KEY = 
-			"UPDATE " + scheduleTableName + " SET status=?, first_sched_time=?, timezone=?, period=?, last_modify_time=?, next_exec_time=?, submit_time=?, submit_user=?, enc_type=?, schedule_options=? WHERE project_id=? AND flow_name=?";
+			"UPDATE " + scheduleTableName + " SET status=?, first_sched_time=?, timezone=?, period=?, last_modify_time=?, next_exec_time=?, submit_time=?, submit_user=?, enc_type=?, schedule_options=? WHERE schedule_id=?";
 	
 	private static String UPDATE_NEXT_EXEC_TIME = 
-			"UPDATE " + scheduleTableName + " SET next_exec_time=? WHERE project_id=? AND flow_name=?";
+			"UPDATE " + scheduleTableName + " SET next_exec_time=? WHERE schedule_id=?";
 
 	public EncodingType getDefaultEncodingType() {
 		return defaultEncodingType;
@@ -127,7 +127,7 @@ public class JdbcScheduleLoader extends AbstractJdbcLoader implements ScheduleLo
 
 		QueryRunner runner = createQueryRunner();
 		try {
-			int removes =  runner.update(REMOVE_SCHEDULE_BY_KEY, s.getProjectId(), s.getFlowName());
+			int removes =  runner.update(REMOVE_SCHEDULE_BY_KEY, s.getScheduleId());
 			if (removes == 0) {
 				throw new ScheduleManagerException("No schedule has been removed.");
 			}
@@ -177,6 +177,15 @@ public class JdbcScheduleLoader extends AbstractJdbcLoader implements ScheduleLo
 					s.getSubmitUser(),
 					encType.getNumVal(),
 					data);
+			
+			long id = runner.query(LastInsertID.LAST_INSERT_ID, new LastInsertID());
+
+			if (id == -1l) {
+				throw new ScheduleManagerException("Schedule id was not properly created.");
+			}
+			logger.info("Schedule " + s.getScheduleIdentityPair() + " assigned id " + id);
+			s.setScheduleId((int)id);
+			
 			if (inserts == 0) {
 				throw new ScheduleManagerException("No schedule has been inserted.");
 			}
@@ -194,7 +203,7 @@ public class JdbcScheduleLoader extends AbstractJdbcLoader implements ScheduleLo
 		QueryRunner runner = new QueryRunner();
 		try {
 			
-			runner.update(connection, UPDATE_NEXT_EXEC_TIME, s.getNextExecTime(), s.getProjectId(), s.getFlowName()); 
+			runner.update(connection, UPDATE_NEXT_EXEC_TIME, s.getNextExecTime(), s.getScheduleId()); 
 		} catch (SQLException e) {
 			e.printStackTrace();
 			logger.error(UPDATE_NEXT_EXEC_TIME + " failed.", e);
@@ -242,8 +251,7 @@ public class JdbcScheduleLoader extends AbstractJdbcLoader implements ScheduleLo
 					s.getSubmitUser(), 	
 					encType.getNumVal(),
 					data,
-					s.getProjectId(), 
-					s.getFlowName());
+					s.getScheduleId());
 			if (updates == 0) {
 				throw new ScheduleManagerException("No schedule has been updated.");
 			}
@@ -253,6 +261,22 @@ public class JdbcScheduleLoader extends AbstractJdbcLoader implements ScheduleLo
 		}
 	}
 
+	
+	private static class LastInsertID implements ResultSetHandler<Long> {
+		private static String LAST_INSERT_ID = "SELECT LAST_INSERT_ID()";
+		
+		@Override
+		public Long handle(ResultSet rs) throws SQLException {
+			if (!rs.next()) {
+				return -1l;
+			}
+
+			long id = rs.getLong(1);
+			return id;
+		}
+		
+	}
+	
 	public class ScheduleResultHandler implements ResultSetHandler<List<Schedule>> {
 		@Override
 		public List<Schedule> handle(ResultSet rs) throws SQLException {
@@ -262,19 +286,20 @@ public class JdbcScheduleLoader extends AbstractJdbcLoader implements ScheduleLo
 			
 			ArrayList<Schedule> schedules = new ArrayList<Schedule>();
 			do {
-				int projectId = rs.getInt(1);
-				String projectName = rs.getString(2);
-				String flowName = rs.getString(3);
-				String status = rs.getString(4);
-				long firstSchedTime = rs.getLong(5);
-				DateTimeZone timezone = DateTimeZone.forID(rs.getString(6));
-				ReadablePeriod period = Schedule.parsePeriodString(rs.getString(7));
-				long lastModifyTime = rs.getLong(8);
-				long nextExecTime = rs.getLong(9);
-				long submitTime = rs.getLong(10);
-				String submitUser = rs.getString(11);
-				int encodingType = rs.getInt(12);
-				byte[] data = rs.getBytes(13);
+				int scheduleId = rs.getInt(1);
+				int projectId = rs.getInt(2);
+				String projectName = rs.getString(3);
+				String flowName = rs.getString(4);
+				String status = rs.getString(5);
+				long firstSchedTime = rs.getLong(6);
+				DateTimeZone timezone = DateTimeZone.forID(rs.getString(7));
+				ReadablePeriod period = Schedule.parsePeriodString(rs.getString(8));
+				long lastModifyTime = rs.getLong(9);
+				long nextExecTime = rs.getLong(10);
+				long submitTime = rs.getLong(11);
+				String submitUser = rs.getString(12);
+				int encodingType = rs.getInt(13);
+				byte[] data = rs.getBytes(14);
 				
 				Object optsObj = null;
 				if (data != null) {
@@ -296,7 +321,7 @@ public class JdbcScheduleLoader extends AbstractJdbcLoader implements ScheduleLo
 					}
 				}
 				
-				Schedule s = new Schedule(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser);
+				Schedule s = new Schedule(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser);
 				if (optsObj != null) {
 					s.createAndSetScheduleOptions(optsObj);
 				}
diff --git a/src/java/azkaban/scheduler/Schedule.java b/src/java/azkaban/scheduler/Schedule.java
index b27cc96..f11c0fe 100644
--- a/src/java/azkaban/scheduler/Schedule.java
+++ b/src/java/azkaban/scheduler/Schedule.java
@@ -40,7 +40,8 @@ public class Schedule{
 //	private long flowGuid;
 	
 //	private String scheduleId;
-	
+
+	private int scheduleId;
 	private int projectId;
 	private String projectName;
 	private String flowName;
@@ -57,6 +58,7 @@ public class Schedule{
 	private SlaOptions slaOptions;
 	
 	public Schedule(
+						int scheduleId,
 						int projectId,
 						String projectName,
 						String flowName,
@@ -69,22 +71,26 @@ public class Schedule{
 						long submitTime,
 						String submitUser
 						) {
-		this.projectId = projectId;
-		this.projectName = projectName;
-		this.flowName = flowName;
-		this.firstSchedTime = firstSchedTime;
-		this.timezone = timezone;
-		this.lastModifyTime = lastModifyTime;
-		this.period = period;
-		this.nextExecTime = nextExecTime;
-		this.submitUser = submitUser;
-		this.status = status;
-		this.submitTime = submitTime;
-		this.executionOptions = null;
-		this.slaOptions = null;
+
+		this(scheduleId, 
+				projectId, 
+				projectName, 
+				flowName, 
+				status,
+				firstSchedTime, 
+				timezone,
+				period,
+				lastModifyTime,
+				nextExecTime,
+				submitTime,
+				submitUser,
+				null,
+				null
+				);
 	}
 
 	public Schedule(
+						int scheduleId,
 						int projectId,
 						String projectName,
 						String flowName,
@@ -99,22 +105,24 @@ public class Schedule{
 						ExecutionOptions executionOptions,
 						SlaOptions slaOptions
 			) {
-		this.projectId = projectId;
-		this.projectName = projectName;
-		this.flowName = flowName;
-		this.firstSchedTime = firstSchedTime;
-		this.timezone = DateTimeZone.forID(timezoneId);
-		this.lastModifyTime = lastModifyTime;
-		this.period = parsePeriodString(period);
-		this.nextExecTime = nextExecTime;
-		this.submitUser = submitUser;
-		this.status = status;
-		this.submitTime = submitTime;
-		this.executionOptions = executionOptions;
-		this.slaOptions = slaOptions;
+		this(scheduleId, projectId, 
+				projectName, 
+				flowName, 
+				status,
+				firstSchedTime, 
+				DateTimeZone.forID(timezoneId),
+				parsePeriodString(period),
+				lastModifyTime,
+				nextExecTime,
+				submitTime,
+				submitUser,
+				executionOptions,
+				slaOptions
+				);
 	}
 
 	public Schedule(
+						int scheduleId,
 						int projectId,
 						String projectName,
 						String flowName,
@@ -129,6 +137,7 @@ public class Schedule{
 						ExecutionOptions executionOptions,
 						SlaOptions slaOptions
 						) {
+		this.scheduleId = scheduleId;
 		this.projectId = projectId;
 		this.projectName = projectName;
 		this.flowName = flowName;
@@ -169,9 +178,17 @@ public class Schedule{
 				new DateTime(firstSchedTime).toDateTimeISO() + " with recurring period of " + (period == null ? "non-recurring" : createPeriodString(period));
 	}
 	
-	public Pair<Integer, String> getScheduleId() {
+	public Pair<Integer, String> getScheduleIdentityPair() {
 		return new Pair<Integer, String>(getProjectId(), getFlowName());
 	}
+
+	public void setScheduleId(int scheduleId) {
+		this.scheduleId = scheduleId;
+	}
+	
+	public int getScheduleId() {
+		return scheduleId;
+	}
 	
 	public int getProjectId() {
 		return projectId;
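
With the schedule id now a first-class field, all three Schedule constructors take it as the leading argument and the two convenience constructors delegate to the full fourteen-argument one. A hedged construction example; the status string, period format, and the id of -1 for a not-yet-persisted schedule are placeholders inferred from the surrounding code:

import org.joda.time.DateTimeZone;

import azkaban.scheduler.Schedule;

public class ScheduleExample {
	public static void main(String[] args) {
		long now = System.currentTimeMillis();

		// -1 marks a schedule that has not been stored yet; JdbcScheduleLoader
		// replaces it via setScheduleId once the row is inserted.
		Schedule s = new Schedule(
				-1,                               // scheduleId
				1,                                // projectId
				"demo-project",                   // projectName (illustrative)
				"demo-flow",                      // flowName (illustrative)
				"READY",                          // status (placeholder value)
				now,                              // firstSchedTime
				DateTimeZone.getDefault(),        // timezone
				Schedule.parsePeriodString("1d"), // period (assumed "1d" = daily)
				now,                              // lastModifyTime
				now,                              // nextExecTime
				now,                              // submitTime
				"admin");                         // submitUser (illustrative)

		System.out.println(s.getScheduleId());    // -1 until persisted
		System.out.println(s.getProjectId() + "/" + s.getFlowName());
	}
}
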
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 5688f66..00c4f46 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -19,9 +19,11 @@ package azkaban.scheduler;
 import java.lang.Thread.State;
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -36,15 +38,13 @@ import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutorManager;
 import azkaban.executor.ExecutorManagerException;
-
 import azkaban.flow.Flow;
 import azkaban.project.Project;
 import azkaban.project.ProjectManager;
-
 import azkaban.sla.SLA.SlaAction;
 import azkaban.sla.SLA.SlaRule;
-import azkaban.sla.SLAManager;
 import azkaban.sla.SLA.SlaSetting;
+import azkaban.sla.SLAManager;
 import azkaban.sla.SlaOptions;
 import azkaban.utils.Pair;
 
@@ -60,7 +60,8 @@ public class ScheduleManager {
 	private final DateTimeFormatter _dateFormat = DateTimeFormat.forPattern("MM-dd-yyyy HH:mm:ss:SSS");
 	private ScheduleLoader loader;
 
-	private Map<Pair<Integer, String>, Schedule> scheduleIDMap = new LinkedHashMap<Pair<Integer, String>, Schedule>();
+	private Map<Pair<Integer, String>, Set<Schedule>> scheduleIdentityPairMap = new LinkedHashMap<Pair<Integer, String>, Set<Schedule>>();
+	private Map<Integer, Schedule> scheduleIDMap = new LinkedHashMap<Integer, Schedule>();
 	private final ScheduleRunner runner;
 	private final ExecutorManager executorManager;
 	private final ProjectManager projectManager;
@@ -126,20 +127,48 @@ public class ScheduleManager {
 	 * @param id
 	 * @return
 	*/
-	public Schedule getSchedule(int projectId, String flowId) {
-		return scheduleIDMap.get(new Pair<Integer,String>(projectId, flowId));
+	public Set<Schedule> getSchedules(int projectId, String flowId) {
+		return scheduleIdentityPairMap.get(new Pair<Integer,String>(projectId, flowId));
+	}
+
+	/**
+	 * Returns the scheduled flow for the scheduleId
+	 * 
+	 * @param id
+	 * @return
+	*/
+	public Schedule getSchedule(int scheduleId) {
+		return scheduleIDMap.get(scheduleId);
 	}
 
+
 	/**
 	 * Removes the flow from the schedule if it exists.
 	 * 
 	 * @param id
 	 */
-	public synchronized void removeSchedule(int projectId, String flowId) {
-		Pair<Integer,String> scheduleId = new Pair<Integer,String>(projectId, flowId);
-		
-		Schedule sched = scheduleIDMap.get(scheduleId);
-		scheduleIDMap.remove(scheduleId);
+	public synchronized void removeSchedules(int projectId, String flowId) {
+		Set<Schedule> schedules = getSchedules(projectId, flowId);
+		for(Schedule sched : schedules) {
+			removeSchedule(sched);
+		}
+	}
+	/**
+	 * Removes the flow from the schedule if it exists.
+	 * 
+	 * @param id
+	 */
+	public synchronized void removeSchedule(Schedule sched) {
+
+		Pair<Integer,String> identityPairMap = sched.getScheduleIdentityPair();
+		Set<Schedule> schedules = scheduleIdentityPairMap.get(identityPairMap);
+		if(schedules != null) {
+			schedules.remove(sched);
+			if(schedules.size() == 0) {
+				scheduleIdentityPairMap.remove(identityPairMap);
+			}
+		}
+		scheduleIDMap.remove(sched.getScheduleId());
 		
 		runner.removeRunnerSchedule(sched);
 		try {
@@ -173,6 +202,7 @@ public class ScheduleManager {
 	// }
 
 	public Schedule scheduleFlow(
+			final int scheduleId,
 			final int projectId,
 			final String projectName,
 			final String flowName,
@@ -185,10 +215,11 @@ public class ScheduleManager {
 			final long submitTime,
 			final String submitUser
 			) {
-		return scheduleFlow(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, null, null);
+		return scheduleFlow(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, null, null);
 	}
 	
 	public Schedule scheduleFlow(
+			final int scheduleId,
 			final int projectId,
 			final String projectName,
 			final String flowName,
@@ -203,7 +234,7 @@ public class ScheduleManager {
 			ExecutionOptions execOptions,
 			SlaOptions slaOptions
 			) {
-		Schedule sched = new Schedule(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, execOptions, slaOptions);
+		Schedule sched = new Schedule(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, execOptions, slaOptions);
 		logger.info("Scheduling flow '" + sched.getScheduleName() + "' for "
 				+ _dateFormat.print(firstSchedTime) + " with a period of "
 				+ period == null ? "(non-recurring)" : period);
@@ -225,6 +256,12 @@ public class ScheduleManager {
 		s.updateTime();
 		this.runner.addRunnerSchedule(s);
 		scheduleIDMap.put(s.getScheduleId(), s);
+		Set<Schedule> schedules = scheduleIdentityPairMap.get(s.getScheduleIdentityPair());
+		if(schedules == null) {
+			schedules = new HashSet<Schedule>();
+			scheduleIdentityPairMap.put(s.getScheduleIdentityPair(), schedules);
+		}
+		schedules.add(s);
 	}
 
 	/**
@@ -233,7 +270,7 @@ public class ScheduleManager {
 	 * @param flow
 	 */
 	public synchronized void insertSchedule(Schedule s) {
-		boolean exist = scheduleIDMap.containsKey(s.getScheduleId());
+		boolean exist = scheduleIdentityPairMap.containsKey(s.getScheduleIdentityPair());
 		if(s.updateTime()) {
 			internalSchedule(s);
 			try {
@@ -370,6 +407,7 @@ public class ScheduleManager {
 
 									// Create ExecutableFlow
 									ExecutableFlow exflow = new ExecutableFlow(flow);
+									exflow.setScheduleId(runningSched.getScheduleId());
 									exflow.setSubmitUser(runningSched.getSubmitUser());
 									exflow.addAllProxyUsers(project.getProxyUsers());
 									
@@ -441,7 +479,7 @@ public class ScheduleManager {
 									loader.updateSchedule(runningSched);
 								}
 								else {
-									removeSchedule(runningSched.getProjectId(), runningSched.getFlowName());
+									removeSchedule(runningSched);
 								}								
 							} else {
 								// wait until flow run
diff --git a/src/java/azkaban/sla/JdbcSLALoader.java b/src/java/azkaban/sla/JdbcSLALoader.java
index ffa4221..367dec3 100644
--- a/src/java/azkaban/sla/JdbcSLALoader.java
+++ b/src/java/azkaban/sla/JdbcSLALoader.java
@@ -15,7 +15,7 @@ import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
 
 import azkaban.sla.SLA.SlaRule;
-import azkaban.utils.db.AbstractJdbcLoader;
+import azkaban.database.AbstractJdbcLoader;
 import azkaban.utils.GZIPUtils;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Props;
diff --git a/src/java/azkaban/sla/SlaMailer.java b/src/java/azkaban/sla/SlaMailer.java
index 8916e71..086c234 100644
--- a/src/java/azkaban/sla/SlaMailer.java
+++ b/src/java/azkaban/sla/SlaMailer.java
@@ -7,35 +7,18 @@ import javax.mail.MessagingException;
 import org.apache.log4j.Logger;
 
 import azkaban.sla.SLA;
+import azkaban.utils.AbstractMailer;
 import azkaban.utils.EmailMessage;
 import azkaban.utils.Props;
-import azkaban.utils.Utils;
 
-public class SlaMailer {
+public class SlaMailer extends AbstractMailer {
 	private static Logger logger = Logger.getLogger(SlaMailer.class);
 	
 	private boolean testMode = false;
-	@SuppressWarnings("unused")
-	private String clientHostname;
-	@SuppressWarnings("unused")
-	private String clientPortNumber;
-	
-	private String mailHost;
-	private String mailUser;
-	private String mailPassword;
-	private String mailSender;
-	private String azkabanName;
 	
 	public SlaMailer(Props props) {
-		this.azkabanName = props.getString("azkaban.name", "azkaban");
-		this.mailHost = props.getString("mail.host", "localhost");
-		this.mailUser = props.getString("mail.user", "");
-		this.mailPassword = props.getString("mail.password", "");
-		this.mailSender = props.getString("mail.sender", "");
-		
-		this.clientHostname = props.getString("jetty.hostname", "localhost");
-		this.clientPortNumber = Utils.nonNull(props.getString("jetty.ssl.port"));
-		
+		super(props);
+
 		testMode = props.getBoolean("test.mode", false);
 	}
 	
@@ -43,12 +26,8 @@ public class SlaMailer {
 		List<String> emailList = s.getEmails();
 		
 		if (emailList != null && !emailList.isEmpty()) {
-			EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
-			message.setFromAddress(mailSender);
-			message.addAllToAddress(emailList);
-			message.setMimeType("text/html");
-			message.setSubject("SLA violation on " + azkabanName);
-			
+			EmailMessage message = super.createEmailMessage("SLA violation on " + getAzkabanName(), "text/html", emailList);
+
 //			message.println("<h2 style=\"color:#FF0000\"> Execution '" + s.getExecId() + "' of flow '" + flow.getFlowId() + "' failed to meet SLA on " + azkabanName + "</h2>");
 //			message.println("<table>");
 //			message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
@@ -56,7 +35,7 @@ public class SlaMailer {
 //			message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
 //			message.println("</table>");
 //			message.println("");
-//			String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
+//			String executionUrl = super.getReferenceURL() + "executor?" + "execid=" + execId;
 //			message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
 //			
 //			message.println("");
diff --git a/src/java/azkaban/utils/AbstractMailer.java b/src/java/azkaban/utils/AbstractMailer.java
new file mode 100644
index 0000000..85f589c
--- /dev/null
+++ b/src/java/azkaban/utils/AbstractMailer.java
@@ -0,0 +1,70 @@
+package azkaban.utils;
+
+import java.util.Collection;
+
+public class AbstractMailer {
+	private String clientHostname;
+	private int clientPort;
+	private boolean usesSSL;
+	
+	private String mailHost;
+	private String mailUser;
+	private String mailPassword;
+	private String mailSender;
+	private String azkabanName;
+	
+	private String referenceURL;
+	
+	public AbstractMailer(Props props) {
+		this.azkabanName = props.getString("azkaban.name", "azkaban");
+		this.mailHost = props.getString("mail.host", "localhost");
+		this.mailUser = props.getString("mail.user", "");
+		this.mailPassword = props.getString("mail.password", "");
+		this.mailSender = props.getString("mail.sender", "");
+
+		this.clientHostname = props.get("server.hostname");
+		this.clientPort = props.getInt("server.port");
+		this.usesSSL = props.getBoolean("server.useSSL");
+		
+		if (usesSSL) {
+			referenceURL = "https://" + clientHostname + (clientPort==443 ? "/" : ":" + clientPort + "/");
+		}
+		else  {
+			referenceURL = "http://" + clientHostname + (clientPort==80 ? "/" : ":" + clientPort + "/");
+		}
+	}
+	
+	public String getReferenceURL() {
+		return referenceURL;
+	}
+	
+	protected EmailMessage createEmailMessage(String subject, String mimetype, Collection<String> emailList) {
+		EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
+		message.setFromAddress(mailSender);
+		message.addAllToAddress(emailList);
+		message.setMimeType(mimetype);
+		message.setSubject(subject);
+		
+		return message;
+	}
+	
+	public String getAzkabanName() {
+		return azkabanName;
+	}
+	
+	public String getMailHost() {
+		return mailHost;
+	}
+	
+	public String getMailUser() {
+		return mailUser;
+	}
+	
+	public String getMailPassword() {
+		return mailPassword;
+	}
+	
+	public String getMailSender() {
+		return mailSender;
+	}
+}
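
For context, AbstractMailer is meant to own the connection and addressing details so that concrete mailers (such as SlaMailer above) only build the message body. A hedged sketch of a hypothetical subclass follows; FlowMailer is not part of this change, and it assumes EmailMessage exposes the println and send methods used elsewhere in the codebase:

import java.util.Collection;

import javax.mail.MessagingException;

import azkaban.utils.AbstractMailer;
import azkaban.utils.EmailMessage;
import azkaban.utils.Props;

// Hypothetical subclass, for illustration only.
public class FlowMailer extends AbstractMailer {
	public FlowMailer(Props props) {
		super(props);
	}

	public void sendFlowNotification(Collection<String> emails) throws MessagingException {
		// Subject, mime type, and recipients are handled by the base class.
		EmailMessage message = createEmailMessage(
				"Flow notification on " + getAzkabanName(), "text/html", emails);
		message.println("<a href=\"" + getReferenceURL() + "executor\">Executions</a>");
		message.sendEmail();  // assumed send method, mirroring how SlaMailer dispatches mail
	}
}
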
diff --git a/src/java/azkaban/utils/FileIOUtils.java b/src/java/azkaban/utils/FileIOUtils.java
index 6f1c0c5..d1b275f 100644
--- a/src/java/azkaban/utils/FileIOUtils.java
+++ b/src/java/azkaban/utils/FileIOUtils.java
@@ -3,6 +3,7 @@ package azkaban.utils;
 import java.io.BufferedInputStream;
 import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileFilter;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -20,6 +21,32 @@ import org.apache.commons.io.IOUtils;
  * Runs a few unix commands. Created this so that I can move to JNI in the future.
  */
 public class FileIOUtils {
+	
+	public static class PrefixSuffixFileFilter implements FileFilter {
+		private String prefix;
+		private String suffix;
+		
+		public PrefixSuffixFileFilter(String prefix, String suffix) {
+			this.prefix = prefix;
+			this.suffix = suffix;
+		}
+
+		@Override
+		public boolean accept(File pathname) {			
+			if (!pathname.isFile() || pathname.isHidden()) {
+				return false;
+			}
+
+			String name = pathname.getName();
+			int length = name.length();
+			if (suffix.length() > length || prefix.length() > length ) {
+				return false;
+			}
+			
+			return name.startsWith(prefix) && name.endsWith(suffix);
+		}
+	}
+	
 	public static String getSourcePathFromClass(Class<?> containedClass) {
 		File file = new File(containedClass.getProtectionDomain().getCodeSource().getLocation().getPath());
 
diff --git a/src/java/azkaban/webapp/AzkabanServer.java b/src/java/azkaban/webapp/AzkabanServer.java
index b88c9a8..5da23ee 100644
--- a/src/java/azkaban/webapp/AzkabanServer.java
+++ b/src/java/azkaban/webapp/AzkabanServer.java
@@ -1,17 +1,122 @@
 package azkaban.webapp;
 
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Arrays;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+
+import org.apache.log4j.Logger;
 import org.apache.velocity.app.VelocityEngine;
 
 import azkaban.user.UserManager;
 import azkaban.utils.Props;
 import azkaban.webapp.session.SessionCache;
 
-public interface AzkabanServer {
-	public Props getServerProps();
+
+public abstract class AzkabanServer {
+	private static final Logger logger = Logger.getLogger(AzkabanServer.class);
+	public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
+	public static final String AZKABAN_PRIVATE_PROPERTIES_FILE = "azkaban.private.properties";
+	public static final String DEFAULT_CONF_PATH = "conf";
+	
+	public static Props loadProps(String[] args) {
+		return loadProps(args, new OptionParser());
+	}
+	
+	public static Props loadProps(String[] args, OptionParser parser) {
+		OptionSpec<String> configDirectory = parser
+				.acceptsAll(Arrays.asList("c", "conf"), "The conf directory for Azkaban.")
+				.withRequiredArg()
+				.describedAs("conf").ofType(String.class);
+
+		// Grabbing the azkaban settings from the conf directory.
+		Props azkabanSettings = null;
+		OptionSet options = parser.parse(args);
+		
+		if (options.has(configDirectory)) {
+			String path = options.valueOf(configDirectory);
+			logger.info("Loading azkaban settings file from " + path);
+			File dir = new File(path);
+			if (!dir.exists()) {
+				logger.error("Conf directory " + path + " doesn't exist.");
+			}
+			else if (!dir.isDirectory()) {
+				logger.error("Conf directory " + path + " isn't a directory.");
+			}
+			else {
+				azkabanSettings = loadAzkabanConfigurationFromDirectory(dir);
+			}
+		} 
+		else {
+			logger.info("Conf parameter not set, attempting to get value from AZKABAN_HOME env.");
+			azkabanSettings = loadConfigurationFromAzkabanHome();
+		}
+		
+		return azkabanSettings;
+	}
+
+	private static Props loadAzkabanConfigurationFromDirectory(File dir) {
+		File azkabanPrivatePropsFile = new File(dir, AZKABAN_PRIVATE_PROPERTIES_FILE);
+		File azkabanPropsFile = new File(dir, AZKABAN_PROPERTIES_FILE);
+		
+		Props props = null;
+		try {
+			// This is purely optional
+			if (azkabanPrivatePropsFile.exists() && azkabanPrivatePropsFile.isFile()) {
+				logger.info("Loading azkaban private properties file" );
+				props = new Props(null, azkabanPrivatePropsFile);
+			}
+
+			if (azkabanPropsFile.exists() && azkabanPropsFile.isFile()) {
+				logger.info("Loading azkaban properties file" );
+				props = new Props(props, azkabanPropsFile);
+			}
+		} catch (FileNotFoundException e) {
+			logger.error("File not found. Could not load azkaban config file", e);
+		} catch (IOException e) {
+			logger.error("File found, but error reading. Could not load azkaban config file", e);
+		}
+		
+		return props;
+	}
 	
-	public VelocityEngine getVelocityEngine();
+	/**
+	 * Loads the Azkaban property file from the AZKABAN_HOME conf directory
+	 * 
+	 * @return
+	 */
+	private static Props loadConfigurationFromAzkabanHome() {
+		String azkabanHome = System.getenv("AZKABAN_HOME");
+
+		if (azkabanHome == null) {
+			logger.error("AZKABAN_HOME not set. Will try default.");
+			return null;
+		}
+
+		if (!new File(azkabanHome).isDirectory() || !new File(azkabanHome).canRead()) {
+			logger.error(azkabanHome + " is not a readable directory.");
+			return null;
+		}
+
+		File confPath = new File(azkabanHome, DEFAULT_CONF_PATH);
+		if (!confPath.exists() || !confPath.isDirectory()
+				|| !confPath.canRead()) {
+			logger.error(azkabanHome + " does not contain a readable conf directory.");
+			return null;
+		}
 
-	public SessionCache getSessionCache();
+		return loadAzkabanConfigurationFromDirectory(confPath);
+	}
+	
+	public abstract Props getServerProps();
+	
+	public abstract SessionCache getSessionCache();
+	
+	public abstract VelocityEngine getVelocityEngine();
 	
-	public UserManager getUserManager();
+	public abstract UserManager getUserManager();
 }
\ No newline at end of file
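
The shared loadProps helper above layers azkaban.private.properties beneath azkaban.properties (so the public file wins on duplicate keys) and falls back to $AZKABAN_HOME/conf when no -c/--conf flag is given. A minimal sketch of a server entry point using it; the class name is illustrative and not part of the change:

import azkaban.utils.Props;
import azkaban.webapp.AzkabanServer;

public class ExampleServerMain {
	public static void main(String[] args) {
		// Resolved from -c/--conf <dir>, otherwise from $AZKABAN_HOME/conf.
		Props props = AzkabanServer.loadProps(args);
		if (props == null) {
			System.err.println("azkaban.properties not found; exiting.");
			return;
		}
		System.out.println("Loaded Azkaban config for: " + props.getString("azkaban.name", "azkaban"));
	}
}
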
diff --git a/src/java/azkaban/webapp/AzkabanSingleServer.java b/src/java/azkaban/webapp/AzkabanSingleServer.java
index 45bef7b..b08139d 100644
--- a/src/java/azkaban/webapp/AzkabanSingleServer.java
+++ b/src/java/azkaban/webapp/AzkabanSingleServer.java
@@ -3,13 +3,31 @@ package azkaban.webapp;
 
 import org.apache.log4j.Logger;
 
+import azkaban.database.AzkabanDatabaseSetup;
+import azkaban.database.AzkabanDatabaseUpdater;
 import azkaban.execapp.AzkabanExecutorServer;
+import azkaban.utils.Props;
 
 public class AzkabanSingleServer {
 	private static final Logger logger = Logger.getLogger(AzkabanWebServer.class);
 	public static void main(String[] args) throws Exception {
 		logger.info("Starting Azkaban Server");
 		
+		Props props = AzkabanServer.loadProps(args);
+		if (props == null) {
+			logger.error("Properties not found. Need it to connect to the db.");
+			logger.error("Exiting...");
+			return;
+		}
+
+		boolean checkversion = props.getBoolean(AzkabanDatabaseSetup.DATABASE_CHECK_VERSION, true);
+
+		if (checkversion) {
+			boolean updateDB = props.getBoolean(AzkabanDatabaseSetup.DATABASE_AUTO_UPDATE_TABLES, true);
+			String scriptDir = props.getString(AzkabanDatabaseSetup.DATABASE_SQL_SCRIPT_DIR, "sql");
+			AzkabanDatabaseUpdater.runDatabaseUpdater(props, scriptDir, updateDB);
+		}
+		
 		AzkabanWebServer.main(args);
 		logger.info("Azkaban Web Server started...");
 		AzkabanExecutorServer.main(args);
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index c8f6244..edbf237 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -25,7 +25,6 @@ import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -50,7 +49,7 @@ import org.mortbay.jetty.servlet.DefaultServlet;
 import org.mortbay.jetty.servlet.ServletHolder;
 import org.mortbay.thread.QueuedThreadPool;
 
-
+import azkaban.database.AzkabanDatabaseSetup;
 import azkaban.executor.ExecutorManager;
 import azkaban.executor.JdbcExecutorLoader;
 import azkaban.jmx.JmxExecutorManager;
@@ -67,7 +66,6 @@ import azkaban.sla.SLAManager;
 import azkaban.sla.SLAManagerException;
 import azkaban.user.UserManager;
 import azkaban.user.XmlUserManager;
-import azkaban.utils.db.AbstractJdbcLoader;
 import azkaban.utils.FileIOUtils;
 import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
@@ -84,10 +82,6 @@ import azkaban.webapp.servlet.ProjectManagerServlet;
 import azkaban.webapp.servlet.ViewerPlugin;
 import azkaban.webapp.session.SessionCache;
 
-import joptsimple.OptionParser;
-import joptsimple.OptionSet;
-import joptsimple.OptionSpec;
-
 /**
  * The Azkaban Jetty server class
  * 
@@ -108,14 +102,13 @@ import joptsimple.OptionSpec;
  * keystore password jetty.truststore - Jetty truststore jetty.trustpassword -
  * Jetty truststore password
  */
-public class AzkabanWebServer implements AzkabanServer {
+public class AzkabanWebServer extends AzkabanServer {
 	private static final Logger logger = Logger.getLogger(AzkabanWebServer.class);
 	
 	public static final String AZKABAN_HOME = "AZKABAN_HOME";
 	public static final String DEFAULT_CONF_PATH = "conf";
 	public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
 	public static final String AZKABAN_PRIVATE_PROPERTIES_FILE = "azkaban.private.properties";
-	public static final String JDO_PROPERTIES_FILE = "jdo.properties";
 
 	private static final int MAX_FORM_CONTENT_SIZE = 10*1024*1024;
 	private static final int MAX_HEADER_BUFFER_SIZE = 10*1024*1024;
@@ -130,7 +123,7 @@ public class AzkabanWebServer implements AzkabanServer {
 	private static final String DEFAULT_STATIC_DIR = "";
 
 	private final VelocityEngine velocityEngine;
-
+	
 	private final Server server;
 	private UserManager userManager;
 	private ProjectManager projectManager;
@@ -360,36 +353,8 @@ public class AzkabanWebServer implements AzkabanServer {
 	 * @param args
 	 */
 	public static void main(String[] args) throws Exception {
-		OptionParser parser = new OptionParser();
-
-		OptionSpec<String> configDirectory = parser
-				.acceptsAll(Arrays.asList("c", "conf"), "The conf directory for Azkaban.")
-				.withRequiredArg()
-				.describedAs("conf").ofType(String.class);
-
-		logger.error("Starting Jetty Azkaban...");
-
-		// Grabbing the azkaban settings from the conf directory.
-		Props azkabanSettings = null;
-		OptionSet options = parser.parse(args);
-		if (options.has(configDirectory)) {
-			String path = options.valueOf(configDirectory);
-			logger.info("Loading azkaban settings file from " + path);
-			File dir = new File(path);
-			if (!dir.exists()) {
-				logger.error("Conf directory " + path + " doesn't exist.");
-			}
-			else if (!dir.isDirectory()) {
-				logger.error("Conf directory " + path + " isn't a directory.");
-			}
-			else {
-				azkabanSettings = loadAzkabanConfigurationFromDirectory(dir);
-			}
-		} 
-		else {
-			logger.info("Conf parameter not set, attempting to get value from AZKABAN_HOME env.");
-			azkabanSettings = loadConfigurationFromAzkabanHome();
-		}
+		logger.error("Starting Jetty Azkaban Web Server...");
+		Props azkabanSettings = AzkabanServer.loadProps(args);
 
 		if (azkabanSettings == null) {
 			logger.error("Azkaban Properties not loaded.");
@@ -397,16 +362,15 @@ public class AzkabanWebServer implements AzkabanServer {
 			return;
 		}
 
-		AbstractJdbcLoader.setupTables(azkabanSettings);
-		
 		int maxThreads = azkabanSettings.getInt("jetty.maxThreads", DEFAULT_THREAD_NUMBER);
+
+		boolean ssl;
 		int port;
-		boolean usingSSL = false;
 		final Server server = new Server();
 		if (azkabanSettings.getBoolean("jetty.use.ssl", true)) {
 			int sslPortNumber = azkabanSettings.getInt("jetty.ssl.port", DEFAULT_SSL_PORT_NUMBER);
 			port = sslPortNumber;
-			usingSSL = true;
+			ssl = true;
 			logger.info("Setting up Jetty Https Server with port:" + sslPortNumber + " and numThreads:" + maxThreads);
 			
 			SslSocketConnector secureConnector = new SslSocketConnector();
@@ -421,14 +385,34 @@ public class AzkabanWebServer implements AzkabanServer {
 			server.addConnector(secureConnector);
 		}
 		else {
+			ssl = false;
 			port = azkabanSettings.getInt("jetty.port", DEFAULT_PORT_NUMBER);
 			SocketConnector connector = new SocketConnector();
 			connector.setPort(port);
 			connector.setHeaderBufferSize(MAX_HEADER_BUFFER_SIZE);
 			server.addConnector(connector);
 		}
+		
+		String hostname = azkabanSettings.getString("jetty.hostname", "localhost");
+		azkabanSettings.put("server.hostname", hostname);
+		azkabanSettings.put("server.port", port);
+		azkabanSettings.put("server.useSSL", String.valueOf(ssl));
+
 		app = new AzkabanWebServer(server, azkabanSettings);
 		
+		boolean checkDB = azkabanSettings.getBoolean(AzkabanDatabaseSetup.DATABASE_CHECK_VERSION, false);
+		if (checkDB) {
+			AzkabanDatabaseSetup setup = new AzkabanDatabaseSetup(azkabanSettings);
+			setup.loadTableInfo();
+			if(setup.needsUpdating()) {
+				logger.error("Database is out of date.");
+				setup.printUpgradePlan();
+				
+				logger.error("Exiting with error.");
+				System.exit(-1);
+			}
+		}
+		
 		QueuedThreadPool httpThreadPool = new QueuedThreadPool(maxThreads);
 		server.setThreadPool(httpThreadPool);
 
@@ -481,7 +465,7 @@ public class AzkabanWebServer implements AzkabanServer {
 				logger.info("kk thx bye.");
 			}
 		});
-		logger.info("Server running on " + (usingSSL ? "ssl" : "") + " port " + port + ".");
+		logger.info("Server running on " + (ssl ? "ssl" : "") + " port " + port + ".");
 	}
 
 	private static List<ViewerPlugin> loadViewerPlugins(Context root, String pluginPath, VelocityEngine ve) {
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index ea19307..a542a82 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -566,19 +566,6 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		}
 		ret.put("nodeStatus", nodeStatus);
 		ret.put("disabled", options.getDisabledJobs());
-		
-		Schedule sflow = scheduleManager.getSchedule(project.getId(), exflow.getFlowId());
-		
-		for (Schedule sched: scheduleManager.getSchedules()) {
-			if (sched.getProjectId() == project.getId() && sched.getFlowName().equals(exflow.getFlowId())) {
-				sflow = sched;
-				break;
-			}
-		}
-		
-		if (sflow != null) {
-			ret.put("scheduled", sflow.getNextExecTime());
-		}
 	}
 	
 	private void ajaxCancelFlow(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException{
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index 764374f..50afa9b 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -109,16 +109,16 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 	private void ajaxSetSla(HttpServletRequest req, HashMap<String, Object> ret, User user) {
 		try {
 			
-			int projectId = getIntParam(req, "projectId");
-			String flowName = getParam(req, "flowName");
+			int scheduleId = getIntParam(req, "scheduleId");
 			
-			Project project = projectManager.getProject(projectId);
+			Schedule sched = scheduleManager.getSchedule(scheduleId);
+			
+			Project project = projectManager.getProject(sched.getProjectId());
 			if(!hasPermission(project, user, Permission.Type.SCHEDULE)) {
 				ret.put("error", "User " + user + " does not have permission to set SLA for this flow.");
 				return;
 			}
 			
-			Schedule sched = scheduleManager.getSchedule(projectId, flowName);
 			
 			SlaOptions slaOptions= new SlaOptions();
 			
@@ -155,7 +155,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 			scheduleManager.insertSchedule(sched);
 
 			if(slaOptions != null) {
-				projectManager.postProjectEvent(project, EventType.SLA, user.getUserId(), "SLA for flow " + flowName + " has been added/changed.");
+				projectManager.postProjectEvent(project, EventType.SLA, user.getUserId(), "SLA for flow " + sched.getFlowName() + " has been added/changed.");
 			}
 			
 		} catch (ServletException e) {
@@ -204,26 +204,24 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 	}
 
 	private void ajaxSlaInfo(HttpServletRequest req, HashMap<String, Object> ret, User user) {
-		int projId;
-		String flowName;
+		int scheduleId;
 		try {
-			projId = getIntParam(req, "projId");
-			flowName = getParam(req, "flowName");
+			scheduleId = getIntParam(req, "scheduleId");
+			
+			Schedule sched = scheduleManager.getSchedule(scheduleId);
 			
-			Project project = getProjectAjaxByPermission(ret, projId, user, Type.READ);
+			Project project = getProjectAjaxByPermission(ret, sched.getProjectId(), user, Type.READ);
 			if (project == null) {
-				ret.put("error", "Error loading project. Project " + projId + " doesn't exist");
+				ret.put("error", "Error loading project. Project " + sched.getProjectId() + " doesn't exist");
 				return;
 			}
 			
-			Flow flow = project.getFlow(flowName);
+			Flow flow = project.getFlow(sched.getFlowName());
 			if (flow == null) {
-				ret.put("error", "Error loading flow. Flow " + flowName + " doesn't exist in " + projId);
+				ret.put("error", "Error loading flow. Flow " + sched.getFlowName() + " doesn't exist in " + sched.getProjectId());
 				return;
 			}
 			
-			Schedule sched = scheduleManager.getSchedule(projId, flowName);
-			
 			SlaOptions slaOptions = sched.getSlaOptions();
 			ExecutionOptions flowOptions = sched.getExecutionOptions();
 			
@@ -345,73 +343,17 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 	}
 
 	private void ajaxLoadFlows(HttpServletRequest req, HashMap<String, Object> ret, User user) throws ServletException {
-		// Very long day...
-//		long day = getLongParam(req, "day");
-//		boolean loadPrevious = getIntParam(req, "loadPrev") != 0;
-
+		
 		List<Schedule> schedules = scheduleManager.getSchedules();
 		// See if anything is scheduled
 		if (schedules.size() <= 0)
 			return;
-//
-//		// Since size is larger than 0, there's at least one element.
-//		DateTime date = new DateTime(day);
-//		// Get only the day component while stripping the time component. This
-//		// gives us 12:00:00AM of that day
-//		DateTime start = date.withTime(0, 0, 0, 0);
-//		// Next day
-//		DateTime end = start.plusDays(1);
-//		// Get microseconds
-//		long startTime = start.getMillis();
-//		long endTime = end.getMillis();
 
 		List<HashMap<String, String>> output = new ArrayList<HashMap<String, String>>();
 		ret.put("items", output);
 
 		for (Schedule schedule : schedules) {
 			writeScheduleData(output, schedule);
-//			long length = 2*3600*1000; //TODO: This is temporary
-//			long firstTime = schedule.getFirstSchedTime();
-//			long period = 0;
-//
-//			if (schedule.getPeriod() != null) {
-//				period = start.plus(schedule.getPeriod()).getMillis() - startTime;
-//
-//				// Shift time until we're past the start time
-//				if (period > 0) {
-//					// Calculate next execution time efficiently
-//					long periods = (startTime - firstTime) / period;
-//					// Take into account items that ends in the date specified, but does not start on that date
-//					if(loadPrevious)
-//					{
-//						periods = (startTime - firstTime - length) / period;
-//					}
-//					if(periods < 0){
-//						periods = 0;
-//					}
-//					firstTime += period * periods;
-//					// Increment in case we haven't arrived yet. This will apply
-//					// to most of the cases
-//					while ((loadPrevious && firstTime < startTime) || (!loadPrevious && firstTime + length < startTime)) {
-//						firstTime += period;
-//					}
-//				}
-//			}
-//
-//			// Bad or no period
-//			if (period <= 0) {
-//				// Single instance case
-//				if (firstTime >= startTime && firstTime < endTime) {
-//					writeScheduleData(output, schedule, firstTime, length, startTime, endTime);
-//				}
-//			}
-//			else {
-//				// Repetitive schedule, firstTime is assumed to be after startTime
-//				while (firstTime < endTime) {
-//					writeScheduleData(output, schedule, firstTime, length, startTime, endTime);
-//					firstTime += period;
-//				}
-//			}
 		}
 	}
 
@@ -433,32 +375,34 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 	}
 
 	private void ajaxRemoveSched(HttpServletRequest req, Map<String, Object> ret, User user) throws ServletException{
-		int projectId = getIntParam(req, "projectId");
-		String flowName = getParam(req, "flowName");
-		Schedule sched = scheduleManager.getSchedule(projectId, flowName);
-
-//		int projectId = sched.getProjectId();
+		int scheduleId = getIntParam(req, "scheduleId");
+		Schedule sched = scheduleManager.getSchedule(scheduleId);
+		if(sched == null) {
+			ret.put("message", "Schedule with ID " + scheduleId + " does not exist");
+			ret.put("status", "error");
+			return;
+		}
 
-		Project project = projectManager.getProject(projectId);
+		Project project = projectManager.getProject(sched.getProjectId());
 		
 		if (project == null) {
-			ret.put("message", "Project " + projectId + " does not exist");
+			ret.put("message", "Project " + sched.getProjectId() + " does not exist");
 			ret.put("status", "error");
 			return;
 		}
 		
 		if(!hasPermission(project, user, Type.SCHEDULE)) {
 			ret.put("status", "error");
-			ret.put("message", "Permission denied. Cannot remove schedule " + projectId + "."  + flowName);
+			ret.put("message", "Permission denied. Cannot remove schedule with id " + scheduleId);
 			return;
 		}
 
-		scheduleManager.removeSchedule(projectId, flowName);
+		scheduleManager.removeSchedule(sched);
 		logger.info("User '" + user.getUserId() + " has removed schedule " + sched.getScheduleName());
 		projectManager.postProjectEvent(project, EventType.SCHEDULE, user.getUserId(), "Schedule " + sched.toString() + " has been removed.");
 		
 		ret.put("status", "success");
-		ret.put("message", "flow " + flowName + " removed from Schedules.");
+		ret.put("message", "flow " + sched.getFlowName() + " removed from Schedules.");
 		return;
 	}
 
@@ -509,7 +453,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 			ret.put("error", e.getMessage());
 		}
 		
-		Schedule sched = scheduleManager.getSchedule(projectId, flowName);
+		// Schedule sched = scheduleManager.getSchedule(projectId, flowName);
 		ExecutionOptions flowOptions = null;
 		try {
 			flowOptions = HttpRequestUtils.parseFlowOptions(req);
@@ -518,12 +462,8 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 			ret.put("error", e.getMessage());
 		}
 		SlaOptions slaOptions = null;
-		if(sched != null) {
-			if(sched.getSlaOptions() != null) {
-				slaOptions = sched.getSlaOptions();
-			}
-		}
-		Schedule schedule = scheduleManager.scheduleFlow(projectId, projectName, flowName, "ready", firstSchedTime.getMillis(), firstSchedTime.getZone(), thePeriod, DateTime.now().getMillis(), firstSchedTime.getMillis(), firstSchedTime.getMillis(), user.getUserId(), flowOptions, slaOptions);
+		
+		Schedule schedule = scheduleManager.scheduleFlow(-1, projectId, projectName, flowName, "ready", firstSchedTime.getMillis(), firstSchedTime.getZone(), thePeriod, DateTime.now().getMillis(), firstSchedTime.getMillis(), firstSchedTime.getMillis(), user.getUserId(), flowOptions, slaOptions);
 		logger.info("User '" + user.getUserId() + "' has scheduled " + "[" + projectName + flowName +  " (" + projectId +")" + "].");
 		projectManager.postProjectEvent(project, EventType.SCHEDULE, user.getUserId(), "Schedule " + schedule.toString() + " has been added.");
 
diff --git a/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
index f65bdfb..4e2fbd8 100644
--- a/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
@@ -69,6 +69,7 @@
 					<thead>
 						<tr>
 							<!--th class="execid">Execution Id</th-->
+							<th>ID</th>
 							<th>Flow</th>
 							<th>Project</th>
 							<th>Submitted By</th>
@@ -84,6 +85,7 @@
 #foreach($sched in $schedules)
 						<tr class="row" >
 
+							<td>${sched.scheduleId}</td>
 							<td class="tb-name">
 								<a href="${context}/manager?project=${sched.projectName}&flow=${sched.flowName}">${sched.flowName}</a>
 							</td>
@@ -95,8 +97,8 @@
 							<td>$utils.formatDateTime(${sched.nextExecTime})</td>
 							<td>$utils.formatPeriod(${sched.period})</td>
 							<td>#if(${sched.slaOptions}) true #else false #end</td>
-							<td><button id="removeSchedBtn" onclick="removeSched(${sched.projectId}, '${sched.flowName}')" >Remove Schedule</button></td>
-							<td><button id="addSlaBtn" onclick="slaView.initFromSched(${sched.projectId}, '${sched.flowName}')" >Set SLA</button></td>
+							<td><button id="removeSchedBtn" onclick="removeSched(${sched.scheduleId})" >Remove Schedule</button></td>
+							<td><button id="addSlaBtn" onclick="slaView.initFromSched(${sched.scheduleId}, '${sched.flowName}')" >Set SLA</button></td>
 						</tr>
 #end
 #else
diff --git a/src/package/execserver/bin/azkaban-executor-shutdown.sh b/src/package/execserver/bin/azkaban-executor-shutdown.sh
new file mode 100755
index 0000000..76fd200
--- /dev/null
+++ b/src/package/execserver/bin/azkaban-executor-shutdown.sh
@@ -0,0 +1,11 @@
+#!/bin/bash
+azkaban_dir=$(dirname $0)/..
+
+executorport=`cat $azkaban_dir/conf/azkaban.properties | grep executor.port | cut -d = -f 2`
+echo "Shutting down current running AzkabanExecutorServer at port $executorport"
+
+proc=`cat $azkaban_dir/currentpid`
+
+kill $proc
+
+cat /dev/null > $azkaban_dir/currentpid
diff --git a/src/package/execserver/conf/azkaban.private.properties b/src/package/execserver/conf/azkaban.private.properties
new file mode 100644
index 0000000..cce1792
--- /dev/null
+++ b/src/package/execserver/conf/azkaban.private.properties
@@ -0,0 +1 @@
+# Optional properties that are hidden from executions
\ No newline at end of file
diff --git a/src/package/execserver/conf/azkaban.properties b/src/package/execserver/conf/azkaban.properties
new file mode 100644
index 0000000..f42ea60
--- /dev/null
+++ b/src/package/execserver/conf/azkaban.properties
@@ -0,0 +1,19 @@
+#Azkaban
+default.timezone.id=America/Los_Angeles
+
+#Loader for projects
+executor.global.properties=conf/global.properties
+azkaban.project.dir=projects
+
+database.type=mysql
+mysql.port=3306
+mysql.host=localhost
+mysql.database=azkaban2
+mysql.user=azkaban
+mysql.password=azkaban
+mysql.numconnections=100
+
+# Azkaban Executor settings
+executor.maxThreads=50
+executor.port=12321
+executor.flow.threads=30
\ No newline at end of file
diff --git a/src/package/soloserver/bin/azkaban-solo-shutdown.sh b/src/package/soloserver/bin/azkaban-solo-shutdown.sh
new file mode 100755
index 0000000..e97bc32
--- /dev/null
+++ b/src/package/soloserver/bin/azkaban-solo-shutdown.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+azkaban_dir=$(dirname $0)/..
+
+proc=`cat $azkaban_dir/currentpid`
+echo "killing AzkabanSingleServer"
+kill $proc
+
+cat /dev/null > $azkaban_dir/currentpid
diff --git a/src/package/soloserver/conf/azkaban.private.properties b/src/package/soloserver/conf/azkaban.private.properties
new file mode 100644
index 0000000..cce1792
--- /dev/null
+++ b/src/package/soloserver/conf/azkaban.private.properties
@@ -0,0 +1 @@
+# Optional properties that are hidden from executions
\ No newline at end of file
diff --git a/src/package/soloserver/conf/azkaban.properties b/src/package/soloserver/conf/azkaban.properties
new file mode 100644
index 0000000..ac36e26
--- /dev/null
+++ b/src/package/soloserver/conf/azkaban.properties
@@ -0,0 +1,42 @@
+#Azkaban Personalization Settings
+azkaban.name=Local
+azkaban.label=My Local Azkaban
+azkaban.color=#FF3601
+web.resource.dir=web/
+default.timezone.id=America/Los_Angeles
+
+#Azkaban UserManager class
+user.manager.class=azkaban.user.XmlUserManager
+user.manager.xml.file=conf/azkaban-users.xml
+
+#Loader for projects
+executor.global.properties=conf/global.properties
+azkaban.project.dir=projects
+
+database.sql.scripts.dir=sql
+database.check.version=true
+database.auto.update.tables=true
+database.type=h2
+h2.path=data/azkaban
+h2.create.tables=true
+
+# Velocity dev mode
+velocity.dev.mode=false
+
+# Azkaban Jetty server properties. Ignored in tomcat
+jetty.use.ssl=false
+jetty.maxThreads=25
+jetty.port=8081
+
+# Azkaban Executor settings
+executor.maxThreads=50
+executor.port=12321
+executor.flow.threads=30
+
+# mail settings
+mail.sender=
+mail.host=
+job.failure.email=
+job.success.email=
+
+lockdown.create.projects=false
\ No newline at end of file
diff --git a/src/package/soloserver/conf/global.properties b/src/package/soloserver/conf/global.properties
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/package/soloserver/conf/global.properties
diff --git a/src/package/webserver/bin/azkaban-web-start.sh b/src/package/webserver/bin/azkaban-web-start.sh
new file mode 100755
index 0000000..bdd19e0
--- /dev/null
+++ b/src/package/webserver/bin/azkaban-web-start.sh
@@ -0,0 +1,36 @@
+azkaban_dir=$(dirname $0)/..
+
+if [[ -z "$tmpdir" ]]; then
+tmpdir=temp
+fi
+
+for file in $azkaban_dir/lib/*.jar;
+do
+  CLASSPATH=$CLASSPATH:$file
+done
+
+for file in $azkaban_dir/extlib/*.jar;
+do
+  CLASSPATH=$CLASSPATH:$file
+done
+
+for file in $azkaban_dir/plugins/*/*.jar;
+do
+  CLASSPATH=$CLASSPATH:$file
+done
+
+echo $azkaban_dir;
+echo $CLASSPATH;
+
+executorport=`cat $azkaban_dir/conf/azkaban.properties | grep executor.port | cut -d = -f 2`
+serverpath=`pwd`
+
+if [ -z "$AZKABAN_OPTS" ]; then
+  AZKABAN_OPTS=-Xmx3G
+fi
+AZKABAN_OPTS="$AZKABAN_OPTS -server -Dcom.sun.management.jmxremote -Djava.io.tmpdir=$tmpdir -Dexecutorport=$executorport -Dserverpath=$serverpath"
+
+java $AZKABAN_OPTS -cp $CLASSPATH azkaban.webapp.AzkabanWebServer -conf $azkaban_dir/conf $@ &
+
+echo $! > currentpid
+
diff --git a/src/package/webserver/conf/azkaban-users.xml b/src/package/webserver/conf/azkaban-users.xml
new file mode 100644
index 0000000..b30da41
--- /dev/null
+++ b/src/package/webserver/conf/azkaban-users.xml
@@ -0,0 +1,5 @@
+<azkaban-users>
+	<user username="azkaban" password="azkaban" roles="admin" groups="azkaban" />
+	
+	<role name="admin" permissions="ADMIN" />
+</azkaban-users>
diff --git a/src/sql/create.properties.sql b/src/sql/create.properties.sql
new file mode 100644
index 0000000..aaa37ec
--- /dev/null
+++ b/src/sql/create.properties.sql
@@ -0,0 +1,7 @@
+CREATE TABLE properties (
+	name VARCHAR(64) NOT NULL,
+	type INT NOT NULL,
+	modified_time BIGINT NOT NULL,
+	value VARCHAR(256),
+	PRIMARY KEY (name, type)
+);
\ No newline at end of file
diff --git a/src/sql/database.properties b/src/sql/database.properties
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/src/sql/database.properties
@@ -0,0 +1 @@
+
diff --git a/src/sql/update.execution_jobs.2.1.sql b/src/sql/update.execution_jobs.2.1.sql
new file mode 100644
index 0000000..b7313ad
--- /dev/null
+++ b/src/sql/update.execution_jobs.2.1.sql
@@ -0,0 +1,4 @@
+ALTER TABLE execution_jobs ADD COLUMN attempt INT DEFAULT 0;
+ALTER TABLE execution_jobs DROP PRIMARY KEY;
+ALTER TABLE execution_jobs ADD PRIMARY KEY(exec_id, job_id, attempt);
+ALTER TABLE execution_jobs ADD INDEX exec_job (exec_id, job_id);
diff --git a/src/sql/update.execution_logs.2.1.sql b/src/sql/update.execution_logs.2.1.sql
new file mode 100644
index 0000000..5c2dc0b
--- /dev/null
+++ b/src/sql/update.execution_logs.2.1.sql
@@ -0,0 +1,7 @@
+ALTER TABLE execution_logs ADD COLUMN attempt INT DEFAULT 0;
+ALTER TABLE execution_logs ADD COLUMN upload_time BIGINT DEFAULT 1420099200000;
+UPDATE execution_logs SET upload_time=(UNIX_TIMESTAMP()*1000) WHERE upload_time=1420099200000;
+
+ALTER TABLE execution_logs DROP PRIMARY KEY;
+ALTER TABLE execution_logs ADD PRIMARY KEY(exec_id, name, attempt, start_byte);
+ALTER TABLE execution_logs ADD INDEX ex_log_attempt (exec_id, name, attempt);
diff --git a/src/sql/update.project_events.2.1.sql b/src/sql/update.project_events.2.1.sql
new file mode 100644
index 0000000..14d7554
--- /dev/null
+++ b/src/sql/update.project_events.2.1.sql
@@ -0,0 +1 @@
+ALTER TABLE project_events MODIFY COLUMN message VARCHAR(512);
diff --git a/src/sql/update.projects.2.1.sql b/src/sql/update.projects.2.1.sql
new file mode 100644
index 0000000..a2a6e48
--- /dev/null
+++ b/src/sql/update.projects.2.1.sql
@@ -0,0 +1,2 @@
+ALTER TABLE projects ADD COLUMN enc_type TINYINT;
+ALTER TABLE projects ADD COLUMN settings_blob LONGBLOB;
diff --git a/src/sql/update.schedules.2.1.sql b/src/sql/update.schedules.2.1.sql
new file mode 100644
index 0000000..fbf38ce
--- /dev/null
+++ b/src/sql/update.schedules.2.1.sql
@@ -0,0 +1,2 @@
+ALTER TABLE schedules ADD COLUMN enc_type TINYINT;
+ALTER TABLE schedules ADD COLUMN schedule_options LONGBLOB;
diff --git a/src/sql/update.schedules.2.2.sql b/src/sql/update.schedules.2.2.sql
new file mode 100644
index 0000000..98849db
--- /dev/null
+++ b/src/sql/update.schedules.2.2.sql
@@ -0,0 +1,3 @@
+ALTER TABLE schedules DROP PRIMARY KEY;
+ALTER TABLE schedules ADD COLUMN schedule_id INT PRIMARY KEY NOT NULL AUTO_INCREMENT;
+ALTER TABLE schedules ADD INDEX project_id (project_id, flow_name);
diff --git a/src/web/js/azkaban.scheduled.view.js b/src/web/js/azkaban.scheduled.view.js
index a6f1e83..f507dc1 100644
--- a/src/web/js/azkaban.scheduled.view.js
+++ b/src/web/js/azkaban.scheduled.view.js
@@ -1,12 +1,12 @@
 $.namespace('azkaban');
 
 
-function removeSched(projectId, flowName) {
+function removeSched(scheduleId) {
 	var scheduleURL = contextURL + "/schedule"
 	var redirectURL = contextURL + "/schedule"
 	$.post(
 			scheduleURL,
-			{"action":"removeSched", "projectId":projectId, "flowName":flowName},
+			{"action":"removeSched", "scheduleId":scheduleId},
 			function(data) {
 				if (data.error) {
 //                 alert(data.error)
@@ -21,12 +21,12 @@ function removeSched(projectId, flowName) {
 	)
 }
 
-function removeSla(projectId, flowName) {
+function removeSla(scheduleId) {
 	var scheduleURL = contextURL + "/schedule"
 	var redirectURL = contextURL + "/schedule"
 	$.post(
 			scheduleURL,
-			{"action":"removeSla", "projectId":projectId, "flowName":flowName},
+			{"action":"removeSla", "scheduleId":scheduleId},
 			function(data) {
 				if (data.error) {
 //                 alert(data.error)
@@ -68,9 +68,8 @@ azkaban.ChangeSlaView = Backbone.View.extend({
 		}
 		
 	},
-	initFromSched: function(projId, flowName) {
-		this.projectId = projId;
-		this.flowName = flowName;
+	initFromSched: function(scheduleId, flowName) {
+		this.scheduleId = scheduleId;
 		
 		var scheduleURL = contextURL + "/schedule"
 		this.scheduleURL = scheduleURL;
@@ -83,7 +82,7 @@ azkaban.ChangeSlaView = Backbone.View.extend({
 		var ruleBoxOptions = ["SUCCESS", "FINISH"];
 		this.ruleBoxOptions = ruleBoxOptions;
 		
-		var fetchScheduleData = {"projId": this.projectId, "ajax":"slaInfo", "flowName":this.flowName};
+		var fetchScheduleData = {"scheduleId": this.scheduleId, "ajax":"slaInfo"};
 		
 		$.get(
 				this.scheduleURL,
@@ -194,7 +193,7 @@ azkaban.ChangeSlaView = Backbone.View.extend({
 		var redirectURL = this.scheduleURL;
 		$.post(
 				scheduleURL,
-				{"action":"removeSla", "projectId":this.projectId, "flowName":this.flowName},
+				{"action":"removeSla", "scheduleId":this.scheduleId},
 				function(data) {
 				if (data.error) {
 						$('#errorMsg').text(data.error)
@@ -225,8 +224,7 @@ azkaban.ChangeSlaView = Backbone.View.extend({
 		}
 
 		var slaData = {
-			projectId: this.projectId,
-			flowName: this.flowName,
+			scheduleId: this.scheduleId,
 			ajax: "setSla",			
 			slaEmails: slaEmails,
 			settings: settings
diff --git a/unit/conf/dbtesth2/azkaban.properties b/unit/conf/dbtesth2/azkaban.properties
new file mode 100644
index 0000000..63ed5a9
--- /dev/null
+++ b/unit/conf/dbtesth2/azkaban.properties
@@ -0,0 +1,3 @@
+database.sql.scripts.dir=unit/sql
+database.type=h2
+h2.path=h2dbtest/h2db
diff --git a/unit/conf/dbtestmysql/azkaban.properties b/unit/conf/dbtestmysql/azkaban.properties
new file mode 100644
index 0000000..e974067
--- /dev/null
+++ b/unit/conf/dbtestmysql/azkaban.properties
@@ -0,0 +1,8 @@
+database.type=mysql
+mysql.port=3306
+mysql.host=localhost
+mysql.database=azkabanunittest
+mysql.user=root
+database.sql.scripts.dir=unit/sql
+mysql.password=
+mysql.numconnections=10
\ No newline at end of file
diff --git a/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java b/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
index 7ead267..5fc7235 100644
--- a/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
+++ b/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
@@ -22,7 +22,7 @@ import azkaban.sla.SLA.SlaAction;
 import azkaban.sla.SLA.SlaRule;
 import azkaban.sla.SLA.SlaSetting;
 import azkaban.sla.SlaOptions;
-import azkaban.utils.db.DataSourceUtils;
+import azkaban.database.DataSourceUtils;
 import azkaban.utils.Props;
 
 public class JdbcScheduleLoaderTest {
@@ -127,12 +127,12 @@ public class JdbcScheduleLoaderTest {
 		slaOptions.setSlaEmails(emails);
 		slaOptions.setSettings(slaSets);
 		
-		Schedule s1 = new Schedule(1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
-		Schedule s2 = new Schedule(1, "proj1", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "ccc", flowOptions, slaOptions);
-		Schedule s3 = new Schedule(2, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
-		Schedule s4 = new Schedule(3, "proj2", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
-		Schedule s5 = new Schedule(3, "proj2", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
-		Schedule s6 = new Schedule(3, "proj2", "flow3", "error", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+		Schedule s1 = new Schedule(-1, 1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+		Schedule s2 = new Schedule(-1, 1, "proj1", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "ccc", flowOptions, slaOptions);
+		Schedule s3 = new Schedule(-1, 2, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+		Schedule s4 = new Schedule(-1, 3, "proj2", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+		Schedule s5 = new Schedule(-1, 3, "proj2", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+		Schedule s6 = new Schedule(-1, 3, "proj2", "flow3", "error", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
 		
 		loader.insertSchedule(s1);
 		loader.insertSchedule(s2);
@@ -194,14 +194,14 @@ public class JdbcScheduleLoaderTest {
 		
 		System.out.println("the flow options are " + flowOptions);
 		System.out.println("the sla options are " + slaOptions);
-		Schedule s1 = new Schedule(1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+		Schedule s1 = new Schedule(-1, 1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
 
 		loader.insertSchedule(s1);
 		
 		emails.add("email3");
 		slaOptions.setSlaEmails(emails);
 		
-		Schedule s2 = new Schedule(1, "proj1", "flow1", "ready", 11112, "America/Los_Angeles", "2M", 22223, 33334, 44445, "cyu", flowOptions, slaOptions);
+		Schedule s2 = new Schedule(-1, 1, "proj1", "flow1", "ready", 11112, "America/Los_Angeles", "2M", 22223, 33334, 44445, "cyu", flowOptions, slaOptions);
 
 		loader.updateSchedule(s2);
 		
@@ -253,7 +253,7 @@ public class JdbcScheduleLoaderTest {
 			slaOptions.setSlaEmails(emails);
 			slaOptions.setSettings(slaSets);
 			
-			Schedule s = new Schedule(i+1, "proj"+(i+1), "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+			Schedule s = new Schedule(-1, i+1, "proj"+(i+1), "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
 			schedules.add(s);
 			try {
 				loader.insertSchedule(s);
diff --git a/unit/java/azkaban/test/database/AzkabanDatabaseSetupTest.java b/unit/java/azkaban/test/database/AzkabanDatabaseSetupTest.java
new file mode 100644
index 0000000..23d548c
--- /dev/null
+++ b/unit/java/azkaban/test/database/AzkabanDatabaseSetupTest.java
@@ -0,0 +1,129 @@
+package azkaban.test.database;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.SQLException;
+
+import javax.sql.DataSource;
+
+import junit.framework.Assert;
+
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.io.FileUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import azkaban.database.AzkabanDatabaseSetup;
+import azkaban.database.DataSourceUtils;
+import azkaban.utils.Props;
+
+public class AzkabanDatabaseSetupTest {
+	@BeforeClass
+	public static void setupDB() throws IOException, SQLException {
+		File dbDir = new File("h2dbtest");
+		if (dbDir.exists()) {
+			FileUtils.deleteDirectory(dbDir);
+		}
+		
+		dbDir.mkdir();
+		
+		clearUnitTestDB();
+	}
+	
+	@AfterClass
+	public static void teardownDB() {
+	}
+	
+	@Test
+	public void testH2Query() throws Exception {
+		Props h2Props = getH2Props();
+		AzkabanDatabaseSetup setup = new AzkabanDatabaseSetup(h2Props);
+		
+		// First time will create the tables
+		setup.loadTableInfo();
+		setup.printUpgradePlan();
+		setup.updateDatabase(true, true);
+		Assert.assertTrue(setup.needsUpdating());
+		
+		// Second time will update some tables. This is only for testing purposes; in practice we
+		// wouldn't set things up this way.
+		setup.loadTableInfo();
+		setup.printUpgradePlan();
+		setup.updateDatabase(true, true);
+		Assert.assertTrue(setup.needsUpdating());
+		
+		// Nothing to be done
+		setup.loadTableInfo();
+		setup.printUpgradePlan();
+		Assert.assertFalse(setup.needsUpdating());
+	}
+	
+	@Test
+	public void testMySQLQuery() throws Exception {
+		Props mysqlProps = getMySQLProps();
+		AzkabanDatabaseSetup setup = new AzkabanDatabaseSetup(mysqlProps);
+		
+		// First time will create the tables
+		setup.loadTableInfo();
+		setup.printUpgradePlan();
+		setup.updateDatabase(true, true);
+		Assert.assertTrue(setup.needsUpdating());
+		
+		// Second time will update some tables. This is only for testing purposes; in practice we
+		// wouldn't set things up this way.
+		setup.loadTableInfo();
+		setup.printUpgradePlan();
+		setup.updateDatabase(true, true);
+		Assert.assertTrue(setup.needsUpdating());
+		
+		// Nothing to be done
+		setup.loadTableInfo();
+		setup.printUpgradePlan();
+		Assert.assertFalse(setup.needsUpdating());
+	}
+	
+	private static Props getH2Props() {
+		Props props = new Props();
+		props.put("database.type", "h2");
+		props.put("h2.path", "h2dbtest/h2db");
+		props.put("database.sql.scripts.dir", "unit/sql");
+		
+		return props;
+	}
+	
+	private static Props getMySQLProps() {
+		Props props = new Props();
+
+		props.put("database.type", "mysql");
+		props.put("mysql.port", "3306");
+		props.put("mysql.host", "localhost");
+		props.put("mysql.database", "azkabanunittest");
+		props.put("mysql.user", "root");
+		props.put("database.sql.scripts.dir", "unit/sql");
+		props.put("mysql.password", "");
+		props.put("mysql.numconnections", 10);
+		
+		return props;
+	}
+	
+	private static void clearUnitTestDB() throws SQLException {
+		Props props = new Props();
+
+		props.put("database.type", "mysql");
+		props.put("mysql.host", "localhost");
+		props.put("mysql.port", "3306");
+		props.put("mysql.database", "");
+		props.put("mysql.user", "root");
+		props.put("mysql.password", "");
+		props.put("mysql.numconnections", 10);
+		
+		DataSource datasource = DataSourceUtils.getDataSource(props);
+		QueryRunner runner = new QueryRunner(datasource);
+		try {
+			runner.update("drop database azkabanunittest");
+		} catch (SQLException e) {
+		}
+		runner.update("create database azkabanunittest");
+	}
+}
\ No newline at end of file
diff --git a/unit/java/azkaban/test/database/AzkabanDatabaseUpdaterTest.java b/unit/java/azkaban/test/database/AzkabanDatabaseUpdaterTest.java
new file mode 100644
index 0000000..9948db3
--- /dev/null
+++ b/unit/java/azkaban/test/database/AzkabanDatabaseUpdaterTest.java
@@ -0,0 +1,93 @@
+package azkaban.test.database;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.SQLException;
+
+import javax.sql.DataSource;
+
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.io.FileUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import azkaban.database.AzkabanDatabaseUpdater;
+import azkaban.database.DataSourceUtils;
+import azkaban.utils.Props;
+
+public class AzkabanDatabaseUpdaterTest {
+	@BeforeClass
+	public static void setupDB() throws IOException, SQLException {
+		File dbDir = new File("h2dbtest");
+		if (dbDir.exists()) {
+			FileUtils.deleteDirectory(dbDir);
+		}
+		
+		dbDir.mkdir();
+		
+		clearUnitTestDB();
+	}
+	
+	@AfterClass
+	public static void teardownDB() {
+	}
+	
+	@Test
+	public void testMySQLAutoCreate() throws Exception {
+		String confDir = "unit/conf/dbtestmysql";
+		System.out.println("1.***Now testing check");
+		AzkabanDatabaseUpdater.main(new String[]{"-c",confDir});
+		
+		System.out.println("2.***Now testing update");
+		AzkabanDatabaseUpdater.main(new String[]{ "-u", "-c",confDir});
+		
+		System.out.println("3.***Now testing check again");
+		AzkabanDatabaseUpdater.main(new String[]{ "-c",confDir});
+		
+		System.out.println("4.***Now testing update again");
+		AzkabanDatabaseUpdater.main(new String[]{ "-c",confDir, "-u"});
+		
+		System.out.println("5.***Now testing check again");
+		AzkabanDatabaseUpdater.main(new String[]{ "-c",confDir});
+	}
+	
+	@Test
+	public void testH2AutoCreate() throws Exception {
+		String confDir = "unit/conf/dbtesth2";
+		System.out.println("1.***Now testing check");
+		AzkabanDatabaseUpdater.main(new String[]{"-c",confDir});
+		
+		System.out.println("2.***Now testing update");
+		AzkabanDatabaseUpdater.main(new String[]{ "-u", "-c",confDir});
+		
+		System.out.println("3.***Now testing check again");
+		AzkabanDatabaseUpdater.main(new String[]{ "-c",confDir});
+		
+		System.out.println("4.***Now testing update again");
+		AzkabanDatabaseUpdater.main(new String[]{ "-c",confDir, "-u"});
+		
+		System.out.println("5.***Now testing check again");
+		AzkabanDatabaseUpdater.main(new String[]{ "-c",confDir});
+	}
+	
+	private static void clearUnitTestDB() throws SQLException {
+		Props props = new Props();
+
+		props.put("database.type", "mysql");
+		props.put("mysql.host", "localhost");
+		props.put("mysql.port", "3306");
+		props.put("mysql.database", "");
+		props.put("mysql.user", "root");
+		props.put("mysql.password", "");
+		props.put("mysql.numconnections", 10);
+		
+		DataSource datasource = DataSourceUtils.getDataSource(props);
+		QueryRunner runner = new QueryRunner(datasource);
+		try {
+			runner.update("drop database azkabanunittest");
+		} catch (SQLException e) {
+		}
+		runner.update("create database azkabanunittest");
+	}
+}
diff --git a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
index 3603608..ba89a20 100644
--- a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
+++ b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
@@ -31,7 +31,7 @@ import azkaban.executor.JdbcExecutorLoader;
 import azkaban.executor.Status;
 import azkaban.flow.Flow;
 
-import azkaban.utils.db.DataSourceUtils;
+import azkaban.database.DataSourceUtils;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Pair;
diff --git a/unit/java/azkaban/test/project/JdbcProjectLoaderTest.java b/unit/java/azkaban/test/project/JdbcProjectLoaderTest.java
index 008a812..85f874c 100644
--- a/unit/java/azkaban/test/project/JdbcProjectLoaderTest.java
+++ b/unit/java/azkaban/test/project/JdbcProjectLoaderTest.java
@@ -30,7 +30,7 @@ import azkaban.project.ProjectLogEvent.EventType;
 import azkaban.project.ProjectManagerException;
 import azkaban.user.Permission;
 import azkaban.user.User;
-import azkaban.utils.db.DataSourceUtils;
+import azkaban.database.DataSourceUtils;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
diff --git a/unit/java/azkaban/test/sla/JdbcSLALoaderTest.java b/unit/java/azkaban/test/sla/JdbcSLALoaderTest.java
index 23473f7..373bfac 100644
--- a/unit/java/azkaban/test/sla/JdbcSLALoaderTest.java
+++ b/unit/java/azkaban/test/sla/JdbcSLALoaderTest.java
@@ -22,11 +22,9 @@ import azkaban.sla.SLA;
 import azkaban.sla.SLA.SlaAction;
 import azkaban.sla.SLA.SlaRule;
 import azkaban.sla.SLALoader;
-import azkaban.utils.db.DataSourceUtils;
+import azkaban.database.DataSourceUtils;
 import azkaban.utils.Props;
 
-
-
 public class JdbcSLALoaderTest {
 	private static boolean testDBExists;
 	//@TODO remove this and turn into local host.
diff --git a/unit/sql/create.properties.sql b/unit/sql/create.properties.sql
new file mode 100644
index 0000000..aaa37ec
--- /dev/null
+++ b/unit/sql/create.properties.sql
@@ -0,0 +1,7 @@
+CREATE TABLE properties (
+	name VARCHAR(64) NOT NULL,
+	type INT NOT NULL,
+	modified_time BIGINT NOT NULL,
+	value VARCHAR(256),
+	PRIMARY KEY (name, type)
+);
\ No newline at end of file
diff --git a/unit/sql/database.properties b/unit/sql/database.properties
new file mode 100644
index 0000000..7b323f0
--- /dev/null
+++ b/unit/sql/database.properties
@@ -0,0 +1 @@
+version=2.2
diff --git a/unit/sql/update.execution_jobs.2.1.sql b/unit/sql/update.execution_jobs.2.1.sql
new file mode 100644
index 0000000..b7313ad
--- /dev/null
+++ b/unit/sql/update.execution_jobs.2.1.sql
@@ -0,0 +1,4 @@
+ALTER TABLE execution_jobs ADD COLUMN attempt INT DEFAULT 0;
+ALTER TABLE execution_jobs DROP PRIMARY KEY;
+ALTER TABLE execution_jobs ADD PRIMARY KEY(exec_id, job_id, attempt);
+ALTER TABLE execution_jobs ADD INDEX exec_job (exec_id, job_id);
diff --git a/unit/sql/update.execution_jobs.2.3.sql b/unit/sql/update.execution_jobs.2.3.sql
new file mode 100644
index 0000000..8c43495
--- /dev/null
+++ b/unit/sql/update.execution_jobs.2.3.sql
@@ -0,0 +1 @@
+INSERT INTO properties (name,value,modified_time,type) VALUES ('test4', 'value1', 0, 99);
\ No newline at end of file
diff --git a/unit/sql/update.execution_logs.2.1.sql b/unit/sql/update.execution_logs.2.1.sql
new file mode 100644
index 0000000..5c2dc0b
--- /dev/null
+++ b/unit/sql/update.execution_logs.2.1.sql
@@ -0,0 +1,7 @@
+ALTER TABLE execution_logs ADD COLUMN attempt INT DEFAULT 0;
+ALTER TABLE execution_logs ADD COLUMN upload_time BIGINT DEFAULT 1420099200000;
+UPDATE execution_logs SET upload_time=(UNIX_TIMESTAMP()*1000) WHERE upload_time=1420099200000;
+
+ALTER TABLE execution_logs DROP PRIMARY KEY;
+ALTER TABLE execution_logs ADD PRIMARY KEY(exec_id, name, attempt, start_byte);
+ALTER TABLE execution_logs ADD INDEX ex_log_attempt (exec_id, name, attempt);
diff --git a/unit/sql/update.execution_logs.2.4.sql b/unit/sql/update.execution_logs.2.4.sql
new file mode 100644
index 0000000..f0a7aae
--- /dev/null
+++ b/unit/sql/update.execution_logs.2.4.sql
@@ -0,0 +1 @@
+INSERT INTO properties (name,value,modified_time,type) VALUES ('test', 'value1', 0, 99);
\ No newline at end of file
diff --git a/unit/sql/update.execution_logs.2.7.sql b/unit/sql/update.execution_logs.2.7.sql
new file mode 100644
index 0000000..9974cc7
--- /dev/null
+++ b/unit/sql/update.execution_logs.2.7.sql
@@ -0,0 +1 @@
+INSERT INTO properties (name,value,modified_time,type) VALUES ('test1','value1', 0, 99);
\ No newline at end of file
diff --git a/unit/sql/update.project_events.2.1.sql b/unit/sql/update.project_events.2.1.sql
new file mode 100644
index 0000000..14d7554
--- /dev/null
+++ b/unit/sql/update.project_events.2.1.sql
@@ -0,0 +1 @@
+ALTER TABLE project_events MODIFY COLUMN message VARCHAR(512);