azkaban-memoizeit
Changes
build.properties 2(+1 -1)
build.xml 120(+109 -11)
src/java/azkaban/executor/ExecutorMailer.java 64(+23 -41)
src/java/azkaban/sla/SlaMailer.java 35(+7 -28)
src/java/azkaban/utils/AbstractMailer.java 70(+70 -0)
src/java/azkaban/webapp/AzkabanServer.java 115(+110 -5)
src/java/azkaban/webapp/AzkabanWebServer.java 71(+29 -42)
src/sql/create.properties.sql 2(+1 -1)
src/sql/create.schedules.sql 5(+4 -1)
src/sql/database.properties 2(+1 -1)
src/sql/update.schedules.2.2.sql 3(+3 -0)
Details
build.properties 2(+1 -1)
diff --git a/build.properties b/build.properties
index a72a058..aa6322d 100644
--- a/build.properties
+++ b/build.properties
@@ -1,3 +1,3 @@
name=azkaban
-version=2.1
+version=2.2
spec.file=azkaban.spec
build.xml 120(+109 -11)
diff --git a/build.xml b/build.xml
index 244b7e4..86db40b 100644
--- a/build.xml
+++ b/build.xml
@@ -8,9 +8,14 @@
<property name="dist.packages.dir" value="${basedir}/dist/packages" />
<property name="dist.web.package.dir" value="${dist.packages.dir}/azkaban-web-server" />
<property name="dist.exec.package.dir" value="${dist.packages.dir}/azkaban-exec-server" />
+ <property name="dist.solo.package.dir" value="${dist.packages.dir}/azkaban-solo-server" />
<property name="dist.sql.package.dir" value="${dist.packages.dir}/sql" />
<property name="conf.dir" value="${basedir}/conf" />
+ <property name="web.package.dir" value="${basedir}/src/package/webserver" />
+ <property name="exec.package.dir" value="${basedir}/src/package/execserver" />
+ <property name="solo.package.dir" value="${basedir}/src/package/soloserver" />
+
<property name="lib.dir" value="${basedir}/lib" />
<property name="bin.dir" value="${basedir}/bin" />
<property name="java.src.dir" value="${basedir}/src/java" />
@@ -67,24 +72,60 @@
</jar>
</target>
+ <target name="create-update-script" description="Prepare the creation of the Azkaban Scripts">
+ <!-- Generic update table scripts -->
+ <concat destfile="${dist.sql.package.dir}/update-all-sql-${updateVersion}.sql" fixlastline="yes">
+ <fileset dir="${sql.src.dir}" >
+ <include name="update.*.${updateVersion}.sql"/>
+ </fileset>
+ </concat>
+ </target>
+
+ <target name="create-update-script-2.1" description="Prepare the creation of the Azkaban Scripts">
+ <!-- 2.1 added the active_sla table -->
+ <concat destfile="${dist.sql.package.dir}/update-all-sql-2.1.sql" fixlastline="yes">
+ <fileset dir="${sql.src.dir}" >
+ <include name="create.active_sla.sql"/>
+ <include name="update.*.2.1.sql"/>
+ </fileset>
+ </concat>
+ </target>
+
+ <target name="create-update-script-2.2" description="Prepare the creation of the Azkaban Scripts">
+ <!-- 2.2 added the properties table -->
+ <concat destfile="${dist.sql.package.dir}/update-all-sql-2.2.sql" fixlastline="yes">
+ <fileset dir="${sql.src.dir}" >
+ <include name="create.properties.sql"/>
+ <include name="update.*.2.2.sql"/>
+ </fileset>
+ </concat>
+ </target>
+
<target name="package-sql-scripts" description="Creates a package of sql">
<delete dir="${dist.sql.package.dir}" />
<mkdir dir="${dist.sql.package.dir}" />
<concat destfile="${dist.sql.package.dir}/create-all-sql-${version}.sql" fixlastline="yes">
<fileset dir="${sql.src.dir}" >
- <exclude name="update*.sql"/>
+ <exclude name="update.*.sql"/>
+ <exclude name="database.properties"/>
</fileset>
</concat>
+ <!-- Collect various update scripts. -->
+ <!-- Not sure how to do this better yet. -->
+ <antcall target="create-update-script-2.1"></antcall>
+ <antcall target="create-update-script-2.2"></antcall>
+ <!-- End script collection-->
+
<copy todir="${dist.sql.package.dir}" >
<fileset dir="${sql.src.dir}" />
</copy>
+ <echo file="${dist.sql.package.dir}/database.properties" append="true">version=${version}</echo>
- <tar destfile="${dist.sql.package.dir}/${name}-sql-script-${version}.tar.gz" compression="gzip" longfile="gnu">
- <tarfileset dir="${dist.sql.package.dir}" prefix="azkaban-${version}" filemode="755" />
+ <tar destfile="${dist.sql.package.dir}/${name}-sql-script-${version}.tar.gz" compression="gzip" longfile="gnu">
+ <tarfileset dir="${dist.sql.package.dir}" prefix="azkaban-${version}" filemode="755" />
</tar>
-
</target>
<target name="package-web-server" depends="jars" description="Creates a package for the webserver">
@@ -107,7 +148,7 @@
<!-- Copy bin files for web server only-->
<copy todir="${dist.web.package.dir}/bin" >
- <fileset dir="${bin.dir}" includes="**/azkaban-web*"/>
+ <fileset dir="${web.package.dir}/bin"/>
</copy>
<!-- Copy web files -->
@@ -117,12 +158,16 @@
<!-- Copy conf create table scripts -->
<copy todir="${dist.web.package.dir}/conf" >
- <fileset dir="${conf.dir}" />
+ <fileset dir="${web.package.dir}/conf" />
</copy>
<!-- Tarball it -->
<tar destfile="${dist.web.package.dir}/${name}-web-server-${version}.tar.gz" compression="gzip" longfile="gnu">
- <tarfileset dir="${dist.web.package.dir}" prefix="azkaban-${version}" filemode="755" />
+ <tarfileset dir="${dist.web.package.dir}" prefix="azkaban-${version}" filemode="755" includes="bin/*" />
+
+ <tarfileset dir="${dist.web.package.dir}" prefix="azkaban-${version}" includes="**">
+ <exclude name="bin/*"/>
+ </tarfileset>
</tar>
</target>
@@ -145,20 +190,73 @@
<!-- Copy bin files for exec server only-->
<copy todir="${dist.exec.package.dir}/bin" >
- <fileset dir="${bin.dir}" includes="**/azkaban-executor*"/>
+ <fileset dir="${exec.package.dir}/bin"/>
</copy>
<!-- Copy conf files -->
<copy todir="${dist.exec.package.dir}/conf" >
- <fileset dir="${conf.dir}" />
+ <fileset dir="${exec.package.dir}/conf" />
</copy>
<!-- Tarball it -->
<tar destfile="${dist.exec.package.dir}/${name}-executor-server-${version}.tar.gz" compression="gzip" longfile="gnu">
- <tarfileset dir="${dist.exec.package.dir}" prefix="azkaban-${version}" filemode="755" />
+ <tarfileset dir="${dist.exec.package.dir}" prefix="azkaban-${version}" filemode="755" includes="bin/*" />
+
+ <tarfileset dir="${dist.exec.package.dir}" prefix="azkaban-${version}" includes="**">
+ <exclude name="bin/*"/>
+ </tarfileset>
+ </tar>
+ </target>
+
+ <target name="package-solo-server" depends="jars" description="Creates a package for the solo server">
+ <delete dir="${dist.solo.package.dir}" />
+ <mkdir dir="${dist.solo.package.dir}" />
+ <mkdir dir="${dist.solo.package.dir}/conf" />
+ <mkdir dir="${dist.solo.package.dir}/bin" />
+ <mkdir dir="${dist.solo.package.dir}/lib" />
+ <mkdir dir="${dist.solo.package.dir}/plugins" />
+ <mkdir dir="${dist.solo.package.dir}/extlib" />
+ <mkdir dir="${dist.solo.package.dir}/sql" />
+
+ <!-- Copy Azkaban jars and libs-->
+ <copy file="${azkaban.jar}" todir="${dist.solo.package.dir}/lib" />
+ <copy todir="${dist.solo.package.dir}/lib" >
+ <fileset dir="${lib.dir}" >
+ <exclude name="hadoop-core*.jar"/>
+ </fileset>
+ </copy>
+
+ <!-- Copy bin files for exec server only-->
+ <copy todir="${dist.solo.package.dir}/bin" >
+ <fileset dir="${solo.package.dir}/bin"/>
+ </copy>
+
+ <!-- Copy conf files -->
+ <copy todir="${dist.solo.package.dir}/conf" >
+ <fileset dir="${solo.package.dir}/conf" />
+ </copy>
+
+ <!-- Copy web files -->
+ <copy todir="${dist.solo.package.dir}/web" >
+ <fileset dir="${web.src.dir}" />
+ </copy>
+
+ <!-- Copy sql files -->
+ <copy todir="${dist.solo.package.dir}/sql" >
+ <fileset dir="${sql.src.dir}" />
+ </copy>
+ <echo file="${dist.solo.package.dir}/sql/database.properties" append="true">version=${version}</echo>
+
+ <!-- Tarball it -->
+ <tar destfile="${dist.solo.package.dir}/${name}-solo-server-${version}.tar.gz" compression="gzip" longfile="gnu">
+ <tarfileset dir="${dist.solo.package.dir}" prefix="azkaban-${version}" filemode="755" includes="bin/*" />
+
+ <tarfileset dir="${dist.solo.package.dir}" prefix="azkaban-${version}" includes="**">
+ <exclude name="bin/*"/>
+ </tarfileset>
</tar>
</target>
- <target name="package-all" depends="package-exec-server, package-web-server, package-sql-scripts" description="Create all packages">
+ <target name="package-all" depends="package-exec-server, package-web-server, package-solo-server, package-sql-scripts" description="Create all packages">
</target>
</project>
diff --git a/src/java/azkaban/database/AzkabanDatabaseSetup.java b/src/java/azkaban/database/AzkabanDatabaseSetup.java
index 24adcfe..c42ad33 100644
--- a/src/java/azkaban/database/AzkabanDatabaseSetup.java
+++ b/src/java/azkaban/database/AzkabanDatabaseSetup.java
@@ -15,6 +15,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
import org.apache.commons.io.IOUtils;
@@ -26,7 +27,10 @@ import azkaban.utils.Props;
public class AzkabanDatabaseSetup {
private static final Logger logger = Logger.getLogger(AzkabanDatabaseSetup.class);
- private static final String SCRIPT_PATH_PARAM = "sql.script.path";
+ public static final String DATABASE_CHECK_VERSION = "database.check.version";
+ public static final String DATABASE_AUTO_UPDATE_TABLES = "database.auto.update.tables";
+ public static final String DATABASE_SQL_SCRIPT_DIR = "database.sql.scripts.dir";
+
private static final String DEFAULT_SCRIPT_PATH = "sql";
private static final String CREATE_SCRIPT_PREFIX = "create.";
private static final String UPDATE_SCRIPT_PREFIX = "update.";
@@ -37,14 +41,19 @@ public class AzkabanDatabaseSetup {
private static final String UPDATE_DB_PROPERTY = "UPDATE properties SET value=?,modified_time=? WHERE name=? AND type=?";
private AzkabanDataSource dataSource;
- private Map<String, String> tables = new HashMap<String, String>();
- private Map<String, String> installedVersions = new HashMap<String, String>();
+ private Map<String, String> tables;
+ private Map<String, String> installedVersions;
+ private Set<String> missingTables;
+ private Map<String, List<String>> upgradeList;
+ private Props dbProps;
+ private String version;
+ private boolean needsUpdating;
private String scriptPath = null;
public AzkabanDatabaseSetup(Props props) {
this(DataSourceUtils.getDataSource(props));
- this.scriptPath = props.getString(SCRIPT_PATH_PARAM, DEFAULT_SCRIPT_PATH);
+ this.scriptPath = props.getString(DATABASE_SQL_SCRIPT_DIR, DEFAULT_SCRIPT_PATH);
}
public AzkabanDatabaseSetup(AzkabanDataSource ds) {
@@ -54,10 +63,32 @@ public class AzkabanDatabaseSetup {
}
}
- public void checkTableVersion(boolean autoCreate, boolean autoUpdate) throws IOException, SQLException {
+ public void loadTableInfo() throws IOException, SQLException {
+ tables = new HashMap<String, String>();
+ installedVersions = new HashMap<String, String>();
+ missingTables = new HashSet<String>();
+ upgradeList = new HashMap<String, List<String>>();
+
+ dbProps = loadDBProps();
+ version = dbProps.getString("version");
+
loadInstalledTables();
- // Loads from the table properties
loadTableVersion();
+ findMissingTables();
+ findOutOfDateTables();
+
+ needsUpdating = !upgradeList.isEmpty() || !missingTables.isEmpty();
+ }
+
+ public boolean needsUpdating() {
+ if (version == null) {
+ throw new RuntimeException("Uninitialized. Call loadTableInfo first.");
+ }
+
+ return needsUpdating;
+ }
+
+ public void printUpgradePlan() {
if (!tables.isEmpty()) {
logger.info("The following are installed tables");
for (Map.Entry<String, String> installedTable: tables.entrySet()) {
@@ -65,25 +96,19 @@ public class AzkabanDatabaseSetup {
}
}
else {
- logger.info("No Installed tables found.");
+ logger.info("No installed tables found.");
}
-
- Props dbProps = loadDBProps();
- String version = dbProps.getString("version");
- logger.info("The current version of Azkaban DB is " + version);
- Set<String> missingTables = findMissingTables();
if (!missingTables.isEmpty()) {
- logger.info("The following tables need to be created.");
+ logger.info("The following are missing tables that need to be installed");
for (String table: missingTables) {
logger.info(" " + table);
}
}
else {
- logger.info("No tables need to be created");
+ logger.info("There are no missing tables.");
}
- Map<String, List<String>> upgradeList = findOutOfDateTables();
if (!upgradeList.isEmpty()) {
logger.info("The following tables need to be updated.");
for (Map.Entry<String, List<String>> upgradeTable: upgradeList.entrySet()) {
@@ -98,12 +123,20 @@ public class AzkabanDatabaseSetup {
else {
logger.info("No tables need to be updated.");
}
+ }
+
+ public void updateDatabase(boolean createTable, boolean updateTable) throws SQLException, IOException {
+ // We call this because it has an unitialize check.
+ if (!needsUpdating()) {
+ logger.info("Nothing to be done.");
+ return;
+ }
- if (autoCreate && !missingTables.isEmpty()) {
- createNewTables(missingTables, version);
+ if (createTable && !missingTables.isEmpty()) {
+ createNewTables();
}
- if (autoUpdate && !upgradeList.isEmpty()) {
- updateTables(upgradeList);
+ if (updateTable && !upgradeList.isEmpty()) {
+ updateTables();
}
}
@@ -152,12 +185,11 @@ public class AzkabanDatabaseSetup {
}
}
finally {
- conn.close();
+ DbUtils.commitAndCloseQuietly(conn);
}
}
- private Set<String> findMissingTables() {
- HashSet<String> missingTables = new HashSet<String>();
+ private void findMissingTables() {
File directory = new File(scriptPath);
File[] createScripts = directory.listFiles(new FileIOUtils.PrefixSuffixFileFilter(CREATE_SCRIPT_PREFIX, SQL_SCRIPT_SUFFIX));
@@ -170,23 +202,17 @@ public class AzkabanDatabaseSetup {
missingTables.add(tableName);
}
}
-
- return missingTables;
}
- private Map<String, List<String>> findOutOfDateTables() {
- Map<String, List<String>> tablesToUpgrade = new HashMap<String, List<String>>();
-
+ private void findOutOfDateTables() {
for (String key : tables.keySet()) {
String version = tables.get(key);
List<String> upgradeVersions = findOutOfDateTable(key, version);
if (upgradeVersions != null && !upgradeVersions.isEmpty()) {
- tablesToUpgrade.put(key, upgradeVersions);
+ upgradeList.put(key, upgradeVersions);
}
}
-
- return tablesToUpgrade;
}
private List<String> findOutOfDateTable(String table, String version) {
@@ -228,7 +254,7 @@ public class AzkabanDatabaseSetup {
return versions;
}
- public void createNewTables(Set<String> missingTables, String version) throws SQLException, IOException {
+ private void createNewTables() throws SQLException, IOException {
Connection conn = dataSource.getConnection();
conn.setAutoCommit(false);
try {
@@ -247,19 +273,19 @@ public class AzkabanDatabaseSetup {
}
}
- public void updateTables(Map<String, List<String>> updateTables) throws SQLException, IOException {
+ private void updateTables() throws SQLException, IOException {
Connection conn = dataSource.getConnection();
conn.setAutoCommit(false);
try {
// Make sure that properties table is created first.
- if (updateTables.containsKey("properties")) {
- for (String version: updateTables.get("properties")) {
+ if (upgradeList.containsKey("properties")) {
+ for (String version: upgradeList.get("properties")) {
runTableScripts(conn, "properties", version, dataSource.getDBType(), true);
}
}
- for (String table: updateTables.keySet()) {
+ for (String table: upgradeList.keySet()) {
if (!table.equals("properties")) {
- for (String version: updateTables.get(table)) {
+ for (String version: upgradeList.get(table)) {
runTableScripts(conn, table, version, dataSource.getDBType(), true);
}
}
diff --git a/src/java/azkaban/database/AzkabanDatabaseUpdater.java b/src/java/azkaban/database/AzkabanDatabaseUpdater.java
new file mode 100644
index 0000000..1a37170
--- /dev/null
+++ b/src/java/azkaban/database/AzkabanDatabaseUpdater.java
@@ -0,0 +1,82 @@
+package azkaban.database;
+
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Arrays;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+
+import org.apache.log4j.Logger;
+
+import azkaban.database.AzkabanDatabaseSetup;
+import azkaban.utils.Props;
+import azkaban.webapp.AzkabanServer;
+
+public class AzkabanDatabaseUpdater {
+ private static final Logger logger = Logger.getLogger(AzkabanDatabaseUpdater.class);
+
+ public static void main(String[] args) throws Exception {
+ OptionParser parser = new OptionParser();
+
+ OptionSpec<String> scriptDirectory = parser
+ .acceptsAll(Arrays.asList("s", "script"), "Directory of update scripts.")
+ .withRequiredArg()
+ .describedAs("script").ofType(String.class);
+
+ OptionSpec<Void> updateOption =
+ parser.acceptsAll(Arrays.asList("u", "update"), "Will update if necessary");
+
+ Props props = AzkabanServer.loadProps(args, parser);
+
+ if (props == null) {
+ logger.error("Properties not found. Need it to connect to the db.");
+ logger.error("Exiting...");
+ return;
+ }
+
+ OptionSet options = parser.parse(args);
+ boolean updateDB = false;
+ if (options.has(updateOption)) {
+ updateDB = true;
+ }
+ else {
+ logger.info("Running DatabaseUpdater in test mode");
+ }
+
+ String scriptDir = "sql";
+ if (options.has(scriptDirectory)) {
+ scriptDir = options.valueOf(scriptDirectory);
+ }
+
+ runDatabaseUpdater(props, scriptDir, updateDB);
+ }
+
+ public static void runDatabaseUpdater(Props props, String sqlDir, boolean updateDB) throws IOException, SQLException {
+ logger.info("Use scripting directory " + sqlDir);
+
+ if (updateDB) {
+ logger.info("Will auto update any changes.");
+ }
+ else {
+ logger.info("Running DatabaseUpdater in test mode. Use -u to update");
+ }
+
+ AzkabanDatabaseSetup setup = new AzkabanDatabaseSetup(props);
+ setup.loadTableInfo();
+ if (!setup.needsUpdating()) {
+ logger.info("Everything looks up to date.");
+ return;
+ }
+
+ logger.info("Need to update the db.");
+ setup.printUpgradePlan();
+
+ if (updateDB) {
+ logger.info("Updating DB");
+ setup.updateDatabase(true,true);
+ }
+ }
+}
diff --git a/src/java/azkaban/execapp/AzkabanExecutorServer.java b/src/java/azkaban/execapp/AzkabanExecutorServer.java
index d4dffef..fb67970 100644
--- a/src/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/src/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -21,7 +21,6 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.TimeZone;
@@ -44,12 +43,9 @@ import azkaban.project.JdbcProjectLoader;
import azkaban.project.ProjectLoader;
import azkaban.utils.Props;
import azkaban.utils.Utils;
+import azkaban.webapp.AzkabanServer;
import azkaban.webapp.servlet.AzkabanServletContextListener;
-import joptsimple.OptionParser;
-import joptsimple.OptionSet;
-import joptsimple.OptionSpec;
-
public class AzkabanExecutorServer {
private static final Logger logger = Logger.getLogger(AzkabanExecutorServer.class);
private static final int MAX_FORM_CONTENT_SIZE = 10*1024*1024;
@@ -166,35 +162,8 @@ public class AzkabanExecutorServer {
* @throws IOException
*/
public static void main(String[] args) throws Exception {
- OptionParser parser = new OptionParser();
-
- OptionSpec<String> configDirectory = parser
- .acceptsAll(Arrays.asList("c", "conf"),
- "The conf directory for Azkaban.").withRequiredArg()
- .describedAs("conf").ofType(String.class);
-
logger.error("Starting Jetty Azkaban Executor...");
-
- // Grabbing the azkaban settings from the conf directory.
- Props azkabanSettings = null;
- OptionSet options = parser.parse(args);
- if (options.has(configDirectory)) {
- String path = options.valueOf(configDirectory);
- logger.info("Loading azkaban settings file from " + path);
- File confDir = new File(path);
- if (!confDir.exists()) {
- logger.error("Conf directory " + path + " doesn't exist.");
- }
- else if (!confDir.isDirectory()) {
- logger.error("Conf directory " + path + " isn't a directory.");
- }
- else {
- azkabanSettings = loadAzkabanConfigurationFromDirectory(confDir);
- }
- } else {
- logger.info("Conf parameter not set, attempting to get value from AZKABAN_HOME env.");
- azkabanSettings = loadConfigurationFromAzkabanHome();
- }
+ Props azkabanSettings = AzkabanServer.loadProps(args);
if (azkabanSettings == null) {
logger.error("Azkaban Properties not loaded.");
src/java/azkaban/executor/ExecutorMailer.java 64(+23 -41)
diff --git a/src/java/azkaban/executor/ExecutorMailer.java b/src/java/azkaban/executor/ExecutorMailer.java
index a49b4fb..c6c6f41 100644
--- a/src/java/azkaban/executor/ExecutorMailer.java
+++ b/src/java/azkaban/executor/ExecutorMailer.java
@@ -8,33 +8,19 @@ import javax.mail.MessagingException;
import org.apache.log4j.Logger;
import azkaban.executor.ExecutionOptions.FailureAction;
+import azkaban.utils.AbstractMailer;
import azkaban.utils.EmailMessage;
import azkaban.utils.Props;
import azkaban.utils.Utils;
-public class ExecutorMailer {
+public class ExecutorMailer extends AbstractMailer {
private static Logger logger = Logger.getLogger(ExecutorMailer.class);
private boolean testMode = false;
- private String clientHostname;
- private String clientPortNumber;
-
- private String mailHost;
- private String mailUser;
- private String mailPassword;
- private String mailSender;
- private String azkabanName;
public ExecutorMailer(Props props) {
- this.azkabanName = props.getString("azkaban.name", "azkaban");
- this.mailHost = props.getString("mail.host", "localhost");
- this.mailUser = props.getString("mail.user", "");
- this.mailPassword = props.getString("mail.password", "");
- this.mailSender = props.getString("mail.sender", "");
-
- this.clientHostname = props.getString("jetty.hostname", "localhost");
- this.clientPortNumber = Utils.nonNull(props.getString("jetty.ssl.port"));
-
+ super(props);
+
testMode = props.getBoolean("test.mode", false);
}
@@ -44,13 +30,12 @@ public class ExecutorMailer {
int execId = flow.getExecutionId();
if (emailList != null && !emailList.isEmpty()) {
- EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
- message.setFromAddress(mailSender);
- message.addAllToAddress(emailList);
- message.setMimeType("text/html");
- message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
+ EmailMessage message = super.createEmailMessage(
+ "Flow '" + flow.getFlowId() + "' has failed on " + getAzkabanName(),
+ "text/html",
+ emailList);
- message.println("<h2 style=\"color:#FF0000\"> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has encountered a failure on " + azkabanName + "</h2>");
+ message.println("<h2 style=\"color:#FF0000\"> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has encountered a failure on " + getAzkabanName() + "</h2>");
if (option.getFailureAction() == FailureAction.CANCEL_ALL) {
message.println("This flow is set to cancel all currently running jobs.");
@@ -68,7 +53,7 @@ public class ExecutorMailer {
message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
message.println("</table>");
message.println("");
- String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
+ String executionUrl = super.getReferenceURL() + "executor?" + "execid=" + execId;
message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
message.println("");
@@ -98,20 +83,20 @@ public class ExecutorMailer {
int execId = flow.getExecutionId();
if (emailList != null && !emailList.isEmpty()) {
- EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
- message.setFromAddress(mailSender);
- message.addAllToAddress(emailList);
- message.setMimeType("text/html");
- message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
+ EmailMessage message = super.createEmailMessage(
+ "Flow '" + flow.getFlowId() + "' has failed on " + getAzkabanName(),
+ "text/html",
+ emailList);
- message.println("<h2 style=\"color:#FF0000\"> Execution '" + execId + "' of flow '" + flow.getFlowId() + "' has failed on " + azkabanName + "</h2>");
+ message.println("<h2 style=\"color:#FF0000\"> Execution '" + execId + "' of flow '" + flow.getFlowId() + "' has failed on " + getAzkabanName() + "</h2>");
message.println("<table>");
message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
message.println("</table>");
message.println("");
- String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
+
+ String executionUrl = super.getReferenceURL() + "executor?" + "execid=" + execId;
message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
message.println("");
@@ -127,8 +112,6 @@ public class ExecutorMailer {
message.println("</ul>");
-
-
if (!testMode) {
try {
message.sendEmail();
@@ -146,20 +129,19 @@ public class ExecutorMailer {
int execId = flow.getExecutionId();
if (emailList != null && !emailList.isEmpty()) {
- EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
- message.setFromAddress(mailSender);
- message.addAllToAddress(emailList);
- message.setMimeType("text/html");
- message.setSubject("Flow '" + flow.getFlowId() + "' has succeeded on " + azkabanName);
+ EmailMessage message = super.createEmailMessage(
+ "Flow '" + flow.getFlowId() + "' has succeeded on " + getAzkabanName(),
+ "text/html",
+ emailList);
- message.println("<h2> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has succeeded on " + azkabanName + "</h2>");
+ message.println("<h2> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has succeeded on " + getAzkabanName() + "</h2>");
message.println("<table>");
message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
message.println("</table>");
message.println("");
- String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
+ String executionUrl = super.getReferenceURL() + "executor?" + "execid=" + execId;
message.println("<a href=\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
if (!testMode) {
diff --git a/src/java/azkaban/jobtype/JobTypeManager.java b/src/java/azkaban/jobtype/JobTypeManager.java
index 0526c4a..1a0b25d 100644
--- a/src/java/azkaban/jobtype/JobTypeManager.java
+++ b/src/java/azkaban/jobtype/JobTypeManager.java
@@ -66,13 +66,16 @@ public class JobTypeManager
loadDefaultTypes();
if(jobtypePluginDir != null) {
+ File pluginDir = new File(jobtypePluginDir);
+ if (pluginDir.exists()) {
logger.info("job type plugin directory set. Loading extra job types.");
- try {
- loadPluginJobTypes();
- }
- catch (Exception e) {
- logger.info("Plugin jobtypes failed to load. " + e.getCause());
- throw new JobTypeManagerException(e);
+ try {
+ loadPluginJobTypes();
+ }
+ catch (Exception e) {
+ logger.info("Plugin jobtypes failed to load. " + e.getCause());
+ throw new JobTypeManagerException(e);
+ }
}
}
src/java/azkaban/sla/SlaMailer.java 35(+7 -28)
diff --git a/src/java/azkaban/sla/SlaMailer.java b/src/java/azkaban/sla/SlaMailer.java
index 8916e71..086c234 100644
--- a/src/java/azkaban/sla/SlaMailer.java
+++ b/src/java/azkaban/sla/SlaMailer.java
@@ -7,35 +7,18 @@ import javax.mail.MessagingException;
import org.apache.log4j.Logger;
import azkaban.sla.SLA;
+import azkaban.utils.AbstractMailer;
import azkaban.utils.EmailMessage;
import azkaban.utils.Props;
-import azkaban.utils.Utils;
-public class SlaMailer {
+public class SlaMailer extends AbstractMailer {
private static Logger logger = Logger.getLogger(SlaMailer.class);
private boolean testMode = false;
- @SuppressWarnings("unused")
- private String clientHostname;
- @SuppressWarnings("unused")
- private String clientPortNumber;
-
- private String mailHost;
- private String mailUser;
- private String mailPassword;
- private String mailSender;
- private String azkabanName;
public SlaMailer(Props props) {
- this.azkabanName = props.getString("azkaban.name", "azkaban");
- this.mailHost = props.getString("mail.host", "localhost");
- this.mailUser = props.getString("mail.user", "");
- this.mailPassword = props.getString("mail.password", "");
- this.mailSender = props.getString("mail.sender", "");
-
- this.clientHostname = props.getString("jetty.hostname", "localhost");
- this.clientPortNumber = Utils.nonNull(props.getString("jetty.ssl.port"));
-
+ super(props);
+
testMode = props.getBoolean("test.mode", false);
}
@@ -43,12 +26,8 @@ public class SlaMailer {
List<String> emailList = s.getEmails();
if (emailList != null && !emailList.isEmpty()) {
- EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
- message.setFromAddress(mailSender);
- message.addAllToAddress(emailList);
- message.setMimeType("text/html");
- message.setSubject("SLA violation on " + azkabanName);
-
+ EmailMessage message = super.createEmailMessage("SLA violation on " + getAzkabanName(), "text/html", emailList);
+
// message.println("<h2 style=\"color:#FF0000\"> Execution '" + s.getExecId() + "' of flow '" + flow.getFlowId() + "' failed to meet SLA on " + azkabanName + "</h2>");
// message.println("<table>");
// message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
@@ -56,7 +35,7 @@ public class SlaMailer {
// message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
// message.println("</table>");
// message.println("");
-// String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
+// String executionUrl = super.getReferenceURL() + "executor?" + "execid=" + execId;
// message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
//
// message.println("");
src/java/azkaban/utils/AbstractMailer.java 70(+70 -0)
diff --git a/src/java/azkaban/utils/AbstractMailer.java b/src/java/azkaban/utils/AbstractMailer.java
new file mode 100644
index 0000000..85f589c
--- /dev/null
+++ b/src/java/azkaban/utils/AbstractMailer.java
@@ -0,0 +1,70 @@
+package azkaban.utils;
+
+import java.util.Collection;
+
+public class AbstractMailer {
+ private String clientHostname;
+ private int clientPort;
+ private boolean usesSSL;
+
+ private String mailHost;
+ private String mailUser;
+ private String mailPassword;
+ private String mailSender;
+ private String azkabanName;
+
+ private String referenceURL;
+
+ public AbstractMailer(Props props) {
+ this.azkabanName = props.getString("azkaban.name", "azkaban");
+ this.mailHost = props.getString("mail.host", "localhost");
+ this.mailUser = props.getString("mail.user", "");
+ this.mailPassword = props.getString("mail.password", "");
+ this.mailSender = props.getString("mail.sender", "");
+
+ this.clientHostname = props.get("server.hostname");
+ this.clientPort = props.getInt("server.port");
+ this.usesSSL = props.getBoolean("server.useSSL");
+
+ if (usesSSL) {
+ referenceURL = "https://" + clientHostname + (clientPort==443 ? "/" : ":" + clientPort + "/");
+ }
+ else {
+ referenceURL = "http://" + clientHostname + (clientPort==80 ? "/" : ":" + clientPort + "/");
+ }
+ }
+
+ public String getReferenceURL() {
+ return referenceURL;
+ }
+
+ protected EmailMessage createEmailMessage(String subject, String mimetype, Collection<String> emailList) {
+ EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
+ message.setFromAddress(mailSender);
+ message.addAllToAddress(emailList);
+ message.setMimeType(mimetype);
+ message.setSubject(subject);
+
+ return message;
+ }
+
+ public String getAzkabanName() {
+ return azkabanName;
+ }
+
+ public String getMailHost() {
+ return mailHost;
+ }
+
+ public String getMailUser() {
+ return mailUser;
+ }
+
+ public String getMailPassword() {
+ return mailPassword;
+ }
+
+ public String getMailSender() {
+ return mailSender;
+ }
+}
src/java/azkaban/webapp/AzkabanServer.java 115(+110 -5)
diff --git a/src/java/azkaban/webapp/AzkabanServer.java b/src/java/azkaban/webapp/AzkabanServer.java
index b88c9a8..5da23ee 100644
--- a/src/java/azkaban/webapp/AzkabanServer.java
+++ b/src/java/azkaban/webapp/AzkabanServer.java
@@ -1,17 +1,122 @@
package azkaban.webapp;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Arrays;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+
+import org.apache.log4j.Logger;
import org.apache.velocity.app.VelocityEngine;
import azkaban.user.UserManager;
import azkaban.utils.Props;
import azkaban.webapp.session.SessionCache;
-public interface AzkabanServer {
- public Props getServerProps();
+
+public abstract class AzkabanServer {
+ private static final Logger logger = Logger.getLogger(AzkabanServer.class);
+ public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
+ public static final String AZKABAN_PRIVATE_PROPERTIES_FILE = "azkaban.private.properties";
+ public static final String DEFAULT_CONF_PATH = "conf";
+
+ public static Props loadProps(String[] args) {
+ return loadProps(args, new OptionParser());
+ }
+
+ public static Props loadProps(String[] args, OptionParser parser) {;
+ OptionSpec<String> configDirectory = parser
+ .acceptsAll(Arrays.asList("c", "conf"), "The conf directory for Azkaban.")
+ .withRequiredArg()
+ .describedAs("conf").ofType(String.class);
+
+ // Grabbing the azkaban settings from the conf directory.
+ Props azkabanSettings = null;
+ OptionSet options = parser.parse(args);
+
+ if (options.has(configDirectory)) {
+ String path = options.valueOf(configDirectory);
+ logger.info("Loading azkaban settings file from " + path);
+ File dir = new File(path);
+ if (!dir.exists()) {
+ logger.error("Conf directory " + path + " doesn't exist.");
+ }
+ else if (!dir.isDirectory()) {
+ logger.error("Conf directory " + path + " isn't a directory.");
+ }
+ else {
+ azkabanSettings = loadAzkabanConfigurationFromDirectory(dir);
+ }
+ }
+ else {
+ logger.info("Conf parameter not set, attempting to get value from AZKABAN_HOME env.");
+ azkabanSettings = loadConfigurationFromAzkabanHome();
+ }
+
+ return azkabanSettings;
+ }
+
+ private static Props loadAzkabanConfigurationFromDirectory(File dir) {
+ File azkabanPrivatePropsFile = new File(dir, AZKABAN_PRIVATE_PROPERTIES_FILE);
+ File azkabanPropsFile = new File(dir, AZKABAN_PROPERTIES_FILE);
+
+ Props props = null;
+ try {
+ // This is purely optional
+ if (azkabanPrivatePropsFile.exists() && azkabanPrivatePropsFile.isFile()) {
+ logger.info("Loading azkaban private properties file" );
+ props = new Props(null, azkabanPrivatePropsFile);
+ }
+
+ if (azkabanPropsFile.exists() && azkabanPropsFile.isFile()) {
+ logger.info("Loading azkaban properties file" );
+ props = new Props(props, azkabanPropsFile);
+ }
+ } catch (FileNotFoundException e) {
+ logger.error("File not found. Could not load azkaban config file", e);
+ } catch (IOException e) {
+ logger.error("File found, but error reading. Could not load azkaban config file", e);
+ }
+
+ return props;
+ }
- public VelocityEngine getVelocityEngine();
+ /**
+ * Loads the Azkaban property file from the AZKABAN_HOME conf directory
+ *
+ * @return
+ */
+ private static Props loadConfigurationFromAzkabanHome() {
+ String azkabanHome = System.getenv("AZKABAN_HOME");
+
+ if (azkabanHome == null) {
+ logger.error("AZKABAN_HOME not set. Will try default.");
+ return null;
+ }
+
+ if (!new File(azkabanHome).isDirectory() || !new File(azkabanHome).canRead()) {
+ logger.error(azkabanHome + " is not a readable directory.");
+ return null;
+ }
+
+ File confPath = new File(azkabanHome, DEFAULT_CONF_PATH);
+ if (!confPath.exists() || !confPath.isDirectory()
+ || !confPath.canRead()) {
+ logger.error(azkabanHome + " does not contain a readable conf directory.");
+ return null;
+ }
- public SessionCache getSessionCache();
+ return loadAzkabanConfigurationFromDirectory(confPath);
+ }
+
+ public abstract Props getServerProps();
+
+ public abstract SessionCache getSessionCache();
+
+ public abstract VelocityEngine getVelocityEngine();
- public UserManager getUserManager();
+ public abstract UserManager getUserManager();
}
\ No newline at end of file
diff --git a/src/java/azkaban/webapp/AzkabanSingleServer.java b/src/java/azkaban/webapp/AzkabanSingleServer.java
index 45bef7b..b08139d 100644
--- a/src/java/azkaban/webapp/AzkabanSingleServer.java
+++ b/src/java/azkaban/webapp/AzkabanSingleServer.java
@@ -3,13 +3,31 @@ package azkaban.webapp;
import org.apache.log4j.Logger;
+import azkaban.database.AzkabanDatabaseSetup;
+import azkaban.database.AzkabanDatabaseUpdater;
import azkaban.execapp.AzkabanExecutorServer;
+import azkaban.utils.Props;
public class AzkabanSingleServer {
private static final Logger logger = Logger.getLogger(AzkabanWebServer.class);
public static void main(String[] args) throws Exception {
logger.info("Starting Azkaban Server");
+ Props props = AzkabanServer.loadProps(args);
+ if (props == null) {
+ logger.error("Properties not found. Need it to connect to the db.");
+ logger.error("Exiting...");
+ return;
+ }
+
+ boolean checkversion = props.getBoolean(AzkabanDatabaseSetup.DATABASE_CHECK_VERSION, true);
+
+ if (checkversion) {
+ boolean updateDB = props.getBoolean(AzkabanDatabaseSetup.DATABASE_AUTO_UPDATE_TABLES, true);
+ String scriptDir = props.getString(AzkabanDatabaseSetup.DATABASE_SQL_SCRIPT_DIR, "sql");
+ AzkabanDatabaseUpdater.runDatabaseUpdater(props, scriptDir, updateDB);
+ }
+
AzkabanWebServer.main(args);
logger.info("Azkaban Web Server started...");
AzkabanExecutorServer.main(args);
src/java/azkaban/webapp/AzkabanWebServer.java 71(+29 -42)
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index 24501c7..edbf237 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -25,7 +25,6 @@ import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
@@ -50,7 +49,7 @@ import org.mortbay.jetty.servlet.DefaultServlet;
import org.mortbay.jetty.servlet.ServletHolder;
import org.mortbay.thread.QueuedThreadPool;
-
+import azkaban.database.AzkabanDatabaseSetup;
import azkaban.executor.ExecutorManager;
import azkaban.executor.JdbcExecutorLoader;
import azkaban.jmx.JmxExecutorManager;
@@ -83,10 +82,6 @@ import azkaban.webapp.servlet.ProjectManagerServlet;
import azkaban.webapp.servlet.ViewerPlugin;
import azkaban.webapp.session.SessionCache;
-import joptsimple.OptionParser;
-import joptsimple.OptionSet;
-import joptsimple.OptionSpec;
-
/**
* The Azkaban Jetty server class
*
@@ -107,14 +102,13 @@ import joptsimple.OptionSpec;
* keystore password jetty.truststore - Jetty truststore jetty.trustpassword -
* Jetty truststore password
*/
-public class AzkabanWebServer implements AzkabanServer {
+public class AzkabanWebServer extends AzkabanServer {
private static final Logger logger = Logger.getLogger(AzkabanWebServer.class);
public static final String AZKABAN_HOME = "AZKABAN_HOME";
public static final String DEFAULT_CONF_PATH = "conf";
public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
public static final String AZKABAN_PRIVATE_PROPERTIES_FILE = "azkaban.private.properties";
- public static final String JDO_PROPERTIES_FILE = "jdo.properties";
private static final int MAX_FORM_CONTENT_SIZE = 10*1024*1024;
private static final int MAX_HEADER_BUFFER_SIZE = 10*1024*1024;
@@ -129,7 +123,7 @@ public class AzkabanWebServer implements AzkabanServer {
private static final String DEFAULT_STATIC_DIR = "";
private final VelocityEngine velocityEngine;
-
+
private final Server server;
private UserManager userManager;
private ProjectManager projectManager;
@@ -359,36 +353,8 @@ public class AzkabanWebServer implements AzkabanServer {
* @param args
*/
public static void main(String[] args) throws Exception {
- OptionParser parser = new OptionParser();
-
- OptionSpec<String> configDirectory = parser
- .acceptsAll(Arrays.asList("c", "conf"), "The conf directory for Azkaban.")
- .withRequiredArg()
- .describedAs("conf").ofType(String.class);
-
- logger.error("Starting Jetty Azkaban...");
-
- // Grabbing the azkaban settings from the conf directory.
- Props azkabanSettings = null;
- OptionSet options = parser.parse(args);
- if (options.has(configDirectory)) {
- String path = options.valueOf(configDirectory);
- logger.info("Loading azkaban settings file from " + path);
- File dir = new File(path);
- if (!dir.exists()) {
- logger.error("Conf directory " + path + " doesn't exist.");
- }
- else if (!dir.isDirectory()) {
- logger.error("Conf directory " + path + " isn't a directory.");
- }
- else {
- azkabanSettings = loadAzkabanConfigurationFromDirectory(dir);
- }
- }
- else {
- logger.info("Conf parameter not set, attempting to get value from AZKABAN_HOME env.");
- azkabanSettings = loadConfigurationFromAzkabanHome();
- }
+ logger.error("Starting Jetty Azkaban Executor...");
+ Props azkabanSettings = AzkabanServer.loadProps(args);
if (azkabanSettings == null) {
logger.error("Azkaban Properties not loaded.");
@@ -397,13 +363,14 @@ public class AzkabanWebServer implements AzkabanServer {
}
int maxThreads = azkabanSettings.getInt("jetty.maxThreads", DEFAULT_THREAD_NUMBER);
+
+ boolean ssl;
int port;
- boolean usingSSL = false;
final Server server = new Server();
if (azkabanSettings.getBoolean("jetty.use.ssl", true)) {
int sslPortNumber = azkabanSettings.getInt("jetty.ssl.port", DEFAULT_SSL_PORT_NUMBER);
port = sslPortNumber;
- usingSSL = true;
+ ssl = true;
logger.info("Setting up Jetty Https Server with port:" + sslPortNumber + " and numThreads:" + maxThreads);
SslSocketConnector secureConnector = new SslSocketConnector();
@@ -418,14 +385,34 @@ public class AzkabanWebServer implements AzkabanServer {
server.addConnector(secureConnector);
}
else {
+ ssl = false;
port = azkabanSettings.getInt("jetty.port", DEFAULT_PORT_NUMBER);
SocketConnector connector = new SocketConnector();
connector.setPort(port);
connector.setHeaderBufferSize(MAX_HEADER_BUFFER_SIZE);
server.addConnector(connector);
}
+
+ String hostname = azkabanSettings.getString("jetty.hostname", "localhost");
+ azkabanSettings.put("server.hostname", hostname);
+ azkabanSettings.put("server.port", port);
+ azkabanSettings.put("server.useSSL", String.valueOf(ssl));
+
app = new AzkabanWebServer(server, azkabanSettings);
+ boolean checkDB = azkabanSettings.getBoolean(AzkabanDatabaseSetup.DATABASE_CHECK_VERSION, false);
+ if (checkDB) {
+ AzkabanDatabaseSetup setup = new AzkabanDatabaseSetup(azkabanSettings);
+ setup.loadTableInfo();
+ if(setup.needsUpdating()) {
+ logger.error("Database is out of date.");
+ setup.printUpgradePlan();
+
+ logger.error("Exiting with error.");
+ System.exit(-1);
+ }
+ }
+
QueuedThreadPool httpThreadPool = new QueuedThreadPool(maxThreads);
server.setThreadPool(httpThreadPool);
@@ -478,7 +465,7 @@ public class AzkabanWebServer implements AzkabanServer {
logger.info("kk thx bye.");
}
});
- logger.info("Server running on " + (usingSSL ? "ssl" : "") + " port " + port + ".");
+ logger.info("Server running on " + (ssl ? "ssl" : "") + " port " + port + ".");
}
private static List<ViewerPlugin> loadViewerPlugins(Context root, String pluginPath, VelocityEngine ve) {
diff --git a/src/package/execserver/bin/azkaban-executor-shutdown.sh b/src/package/execserver/bin/azkaban-executor-shutdown.sh
old mode 100644
new mode 100755
index 21fea3c..76fd200
--- a/src/package/execserver/bin/azkaban-executor-shutdown.sh
+++ b/src/package/execserver/bin/azkaban-executor-shutdown.sh
@@ -1,10 +1,11 @@
#!/bin/bash
+azkaban_dir=$(dirname $0)/..
-executorport=`cat conf/azkaban.properties | grep executor.port | cut -d = -f 2`
+executorport=`cat $azkaban_dir/conf/azkaban.properties | grep executor.port | cut -d = -f 2`
echo "Shutting down current running AzkabanExecutorServer at port $executorport"
-proc=`cat currentpid`
+proc=`cat $azkaban_dir/currentpid`
kill $proc
-cat /dev/null > currentpid
+cat /dev/null > $azkaban_dir/currentpid
diff --git a/src/package/execserver/bin/azkaban-executor-start.sh b/src/package/execserver/bin/azkaban-executor-start.sh
old mode 100644
new mode 100755
index 56c7dc9..29f4241
--- a/src/package/execserver/bin/azkaban-executor-start.sh
+++ b/src/package/execserver/bin/azkaban-executor-start.sh
@@ -1,10 +1,7 @@
azkaban_dir=$(dirname $0)/..
-base_dir=$1
-tmpdir=
if [[ -z "$tmpdir" ]]; then
-echo "temp directory must be set!"
-exit
+tmpdir=temp
fi
for file in $azkaban_dir/lib/*.jar;
@@ -17,16 +14,15 @@ do
CLASSPATH=$CLASSPATH:$file
done
-for file in $base_dir/plugins/*/*.jar;
+for file in $azkaban_dir/plugins/*/*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
echo $azkaban_dir;
-echo $base_dir;
echo $CLASSPATH;
-executorport=`cat conf/azkaban.properties | grep executor.port | cut -d = -f 2`
+executorport=`cat $azkaban_dir/conf/azkaban.properties | grep executor.port | cut -d = -f 2`
echo "Starting AzkabanExecutorServer on port $executorport ..."
serverpath=`pwd`
@@ -35,7 +31,7 @@ if [ -z $AZKABAN_OPTS ]; then
fi
AZKABAN_OPTS=$AZKABAN_OPTS -server -Dcom.sun.management.jmxremote -Djava.io.tmpdir=$tmpdir -Dexecutorport=$executorport -Dserverpath=$serverpath
-java $AZKABAN_OPTS -cp $CLASSPATH azkaban.webapp.AzkabanExecutorServer -conf $base_dir/conf $@ &
+java $AZKABAN_OPTS -cp $CLASSPATH azkaban.webapp.AzkabanExecutorServer -conf $azkaban_dir/conf $@ &
echo $! > currentpid
diff --git a/src/package/soloserver/bin/azkaban-solo-shutdown.sh b/src/package/soloserver/bin/azkaban-solo-shutdown.sh
old mode 100644
new mode 100755
index 75efe2e..e97bc32
--- a/src/package/soloserver/bin/azkaban-solo-shutdown.sh
+++ b/src/package/soloserver/bin/azkaban-solo-shutdown.sh
@@ -1,6 +1,8 @@
#!/bin/bash
-proc=`cat currentpid`
+azkaban_dir=$(dirname $0)/..
+
+proc=`cat $azkaban_dir/currentpid`
echo "killing AzkabanSingleServer"
kill $proc
-cat /dev/null > currentpid
+cat /dev/null > $azkaban_dir/currentpid
diff --git a/src/package/soloserver/bin/azkaban-solo-start.sh b/src/package/soloserver/bin/azkaban-solo-start.sh
old mode 100644
new mode 100755
index 54704a9..4c0d983
--- a/src/package/soloserver/bin/azkaban-solo-start.sh
+++ b/src/package/soloserver/bin/azkaban-solo-start.sh
@@ -1,10 +1,7 @@
azkaban_dir=$(dirname $0)/..
-base_dir=$1
-tmpdir=
if [[ -z "$tmpdir" ]]; then
-echo "temp directory must be set!"
-exit
+tmpdir=temp
fi
for file in $azkaban_dir/lib/*.jar;
@@ -17,16 +14,15 @@ do
CLASSPATH=$CLASSPATH:$file
done
-for file in $base_dir/plugins/*/*.jar;
+for file in $azkaban_dir/plugins/*/*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
echo $azkaban_dir;
-echo $base_dir;
echo $CLASSPATH;
-executorport=`cat conf/azkaban.properties | grep executor.port | cut -d = -f 2`
+executorport=`cat $azkaban_dir/conf/azkaban.properties | grep executor.port | cut -d = -f 2`
serverpath=`pwd`
if [ -z $AZKABAN_OPTS ]; then
@@ -34,7 +30,7 @@ if [ -z $AZKABAN_OPTS ]; then
fi
AZKABAN_OPTS=$AZKABAN_OPTS -server -Dcom.sun.management.jmxremote -Djava.io.tmpdir=$tmpdir -Dexecutorport=$executorport -Dserverpath=$serverpath
-java $AZKABAN_OPTS -cp $CLASSPATH azkaban.webapp.AzkabanSingleServer -conf $base_dir/conf $@ &
+java $AZKABAN_OPTS -cp $CLASSPATH azkaban.webapp.AzkabanSingleServer -conf $azkaban_dir/conf $@ &
echo $! > currentpid
diff --git a/src/package/soloserver/conf/azkaban.properties b/src/package/soloserver/conf/azkaban.properties
index df46e5d..ac36e26 100644
--- a/src/package/soloserver/conf/azkaban.properties
+++ b/src/package/soloserver/conf/azkaban.properties
@@ -13,8 +13,9 @@ user.manager.xml.file=conf/azkaban-users.xml
executor.global.properties=conf/global.properties
azkaban.project.dir=projects
-#H2 DB setup
-database.auto.create.tables=true
+database.sql.scripts.dir=sql
+database.check.version=true
+database.auto.update.tables=true
database.type=h2
h2.path=data/azkaban
h2.create.tables=true
diff --git a/src/package/webserver/bin/azkaban-web-shutdown.sh b/src/package/webserver/bin/azkaban-web-shutdown.sh
old mode 100644
new mode 100755
index 662e4a6..4dde70a
--- a/src/package/webserver/bin/azkaban-web-shutdown.sh
+++ b/src/package/webserver/bin/azkaban-web-shutdown.sh
@@ -1,6 +1,8 @@
+azkaban_dir=$(dirname $0)/..
+
#!/bin/bash
-proc=`cat currentpid`
+proc=`cat $azkaban_dir/currentpid`
echo "killing AzkabanWebServer"
kill $proc
-cat /dev/null > currentpid
+cat /dev/null > $azkaban_dir/currentpid
diff --git a/src/package/webserver/bin/azkaban-web-start.sh b/src/package/webserver/bin/azkaban-web-start.sh
old mode 100644
new mode 100755
index 1df0f4f..bdd19e0
--- a/src/package/webserver/bin/azkaban-web-start.sh
+++ b/src/package/webserver/bin/azkaban-web-start.sh
@@ -1,10 +1,7 @@
azkaban_dir=$(dirname $0)/..
-base_dir=$1
-tmpdir=
if [[ -z "$tmpdir" ]]; then
-echo "temp directory must be set!"
-exit
+tmpdir=temp
fi
for file in $azkaban_dir/lib/*.jar;
@@ -17,16 +14,15 @@ do
CLASSPATH=$CLASSPATH:$file
done
-for file in $base_dir/plugins/*/*.jar;
+for file in $azkaban_dir/plugins/*/*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
echo $azkaban_dir;
-echo $base_dir;
echo $CLASSPATH;
-executorport=`cat conf/azkaban.properties | grep executor.port | cut -d = -f 2`
+executorport=`cat $azkaban_dir/conf/azkaban.properties | grep executor.port | cut -d = -f 2`
serverpath=`pwd`
if [ -z $AZKABAN_OPTS ]; then
@@ -34,7 +30,7 @@ if [ -z $AZKABAN_OPTS ]; then
fi
AZKABAN_OPTS=$AZKABAN_OPTS -server -Dcom.sun.management.jmxremote -Djava.io.tmpdir=$tmpdir -Dexecutorport=$executorport -Dserverpath=$serverpath
-java $AZKABAN_OPTS -cp $CLASSPATH azkaban.webapp.AzkabanWebServer -conf $base_dir/conf $@ &
+java $AZKABAN_OPTS -cp $CLASSPATH azkaban.webapp.AzkabanWebServer -conf $azkaban_dir/conf $@ &
echo $! > currentpid
src/sql/create.properties.sql 2(+1 -1)
diff --git a/src/sql/create.properties.sql b/src/sql/create.properties.sql
index 4a62c51..aaa37ec 100644
--- a/src/sql/create.properties.sql
+++ b/src/sql/create.properties.sql
@@ -3,5 +3,5 @@ CREATE TABLE properties (
type INT NOT NULL,
modified_time BIGINT NOT NULL,
value VARCHAR(256),
- PRIMARY KEY (id, type)
+ PRIMARY KEY (name, type)
);
\ No newline at end of file
src/sql/create.schedules.sql 5(+4 -1)
diff --git a/src/sql/create.schedules.sql b/src/sql/create.schedules.sql
index 1924ecd..b759188 100644
--- a/src/sql/create.schedules.sql
+++ b/src/sql/create.schedules.sql
@@ -1,4 +1,5 @@
CREATE TABLE schedules (
+ schedule_id INT NOT NULL AUTO_INCREMENT,
project_id INT NOT NULL,
project_name VARCHAR(128) NOT NULL,
flow_name VARCHAR(128) NOT NULL,
@@ -12,5 +13,7 @@ CREATE TABLE schedules (
submit_user VARCHAR(128),
enc_type TINYINT,
schedule_options LONGBLOB,
- primary key(project_id, flow_name)
+ PRIMARY KEY (schedule_id)
);
+
+CREATE INDEX sched_project_id ON schedules(project_id, flow_name);
src/sql/database.properties 2(+1 -1)
diff --git a/src/sql/database.properties b/src/sql/database.properties
index 7b323f0..8b13789 100644
--- a/src/sql/database.properties
+++ b/src/sql/database.properties
@@ -1 +1 @@
-version=2.2
+
src/sql/update.schedules.2.2.sql 3(+3 -0)
diff --git a/src/sql/update.schedules.2.2.sql b/src/sql/update.schedules.2.2.sql
new file mode 100644
index 0000000..98849db
--- /dev/null
+++ b/src/sql/update.schedules.2.2.sql
@@ -0,0 +1,3 @@
+ALTER TABLE schedules DROP PRIMARY KEY;
+ALTER TABLE schedules ADD COLUMN schedule_id INT PRIMARY KEY NOT NULL AUTO_INCREMENT;
+ALTER TABLE schedules ADD INDEX project_id (project_id, flow_name);
diff --git a/unit/conf/dbtesth2/azkaban.properties b/unit/conf/dbtesth2/azkaban.properties
new file mode 100644
index 0000000..63ed5a9
--- /dev/null
+++ b/unit/conf/dbtesth2/azkaban.properties
@@ -0,0 +1,3 @@
+database.sql.scripts.dir=unit/sql
+database.type=h2
+h2.path=h2dbtest/h2db
diff --git a/unit/conf/dbtestmysql/azkaban.properties b/unit/conf/dbtestmysql/azkaban.properties
new file mode 100644
index 0000000..e974067
--- /dev/null
+++ b/unit/conf/dbtestmysql/azkaban.properties
@@ -0,0 +1,8 @@
+database.type=mysql
+mysql.port=3306
+mysql.host=localhost
+mysql.database=azkabanunittest
+mysql.user=root
+database.sql.scripts.dir=unit/sql
+mysql.password=
+mysql.numconnections=10
\ No newline at end of file
diff --git a/unit/java/azkaban/test/database/AzkabanDatabaseSetupTest.java b/unit/java/azkaban/test/database/AzkabanDatabaseSetupTest.java
index 75ca74f..23d548c 100644
--- a/unit/java/azkaban/test/database/AzkabanDatabaseSetupTest.java
+++ b/unit/java/azkaban/test/database/AzkabanDatabaseSetupTest.java
@@ -6,6 +6,8 @@ import java.sql.SQLException;
import javax.sql.DataSource;
+import junit.framework.Assert;
+
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.io.FileUtils;
import org.junit.AfterClass;
@@ -37,25 +39,55 @@ public class AzkabanDatabaseSetupTest {
public void testH2Query() throws Exception {
Props h2Props = getH2Props();
AzkabanDatabaseSetup setup = new AzkabanDatabaseSetup(h2Props);
- setup.checkTableVersion(true, true);
- setup.checkTableVersion(true, true);
+ // First time will create the tables
+ setup.loadTableInfo();
+ setup.printUpgradePlan();
+ setup.updateDatabase(true, true);
+ Assert.assertTrue(setup.needsUpdating());
+
+ // Second time will update some tables. This is only for testing purpose and obviously we
+ // wouldn't set things up this way.
+ setup.loadTableInfo();
+ setup.printUpgradePlan();
+ setup.updateDatabase(true, true);
+ Assert.assertTrue(setup.needsUpdating());
+
+ // Nothing to be done
+ setup.loadTableInfo();
+ setup.printUpgradePlan();
+ Assert.assertFalse(setup.needsUpdating());
}
@Test
public void testMySQLQuery() throws Exception {
Props mysqlProps = getMySQLProps();
AzkabanDatabaseSetup setup = new AzkabanDatabaseSetup(mysqlProps);
- setup.checkTableVersion(true, true);
- setup.checkTableVersion(true, true);
+ // First time will create the tables
+ setup.loadTableInfo();
+ setup.printUpgradePlan();
+ setup.updateDatabase(true, true);
+ Assert.assertTrue(setup.needsUpdating());
+
+ // Second time will update some tables. This is only for testing purpose and obviously we
+ // wouldn't set things up this way.
+ setup.loadTableInfo();
+ setup.printUpgradePlan();
+ setup.updateDatabase(true, true);
+ Assert.assertTrue(setup.needsUpdating());
+
+ // Nothing to be done
+ setup.loadTableInfo();
+ setup.printUpgradePlan();
+ Assert.assertFalse(setup.needsUpdating());
}
private static Props getH2Props() {
Props props = new Props();
props.put("database.type", "h2");
props.put("h2.path", "h2dbtest/h2db");
- props.put("sql.script.path", "unit/sql");
+ props.put("database.sql.scripts.dir", "unit/sql");
return props;
}
@@ -68,7 +100,7 @@ public class AzkabanDatabaseSetupTest {
props.put("mysql.host", "localhost");
props.put("mysql.database", "azkabanunittest");
props.put("mysql.user", "root");
- props.put("sql.script.path", "unit/sql");
+ props.put("database.sql.scripts.dir", "unit/sql");
props.put("mysql.password", "");
props.put("mysql.numconnections", 10);
diff --git a/unit/java/azkaban/test/database/AzkabanDatabaseUpdaterTest.java b/unit/java/azkaban/test/database/AzkabanDatabaseUpdaterTest.java
new file mode 100644
index 0000000..9948db3
--- /dev/null
+++ b/unit/java/azkaban/test/database/AzkabanDatabaseUpdaterTest.java
@@ -0,0 +1,93 @@
+package azkaban.test.database;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.SQLException;
+
+import javax.sql.DataSource;
+
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.io.FileUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import azkaban.database.AzkabanDatabaseUpdater;
+import azkaban.database.DataSourceUtils;
+import azkaban.utils.Props;
+
+public class AzkabanDatabaseUpdaterTest {
+ @BeforeClass
+ public static void setupDB() throws IOException, SQLException {
+ File dbDir = new File("h2dbtest");
+ if (dbDir.exists()) {
+ FileUtils.deleteDirectory(dbDir);
+ }
+
+ dbDir.mkdir();
+
+ clearUnitTestDB();
+ }
+
+ @AfterClass
+ public static void teardownDB() {
+ }
+
+ @Test
+ public void testMySQLAutoCreate() throws Exception {
+ String confDir = "unit/conf/dbtestmysql";
+ System.out.println("1.***Now testing check");
+ AzkabanDatabaseUpdater.main(new String[]{"-c",confDir});
+
+ System.out.println("2.***Now testing update");
+ AzkabanDatabaseUpdater.main(new String[]{ "-u", "-c",confDir});
+
+ System.out.println("3.***Now testing check again");
+ AzkabanDatabaseUpdater.main(new String[]{ "-c",confDir});
+
+ System.out.println("4.***Now testing update again");
+ AzkabanDatabaseUpdater.main(new String[]{ "-c",confDir, "-u"});
+
+ System.out.println("5.***Now testing check again");
+ AzkabanDatabaseUpdater.main(new String[]{ "-c",confDir});
+ }
+
+ @Test
+ public void testH2AutoCreate() throws Exception {
+ String confDir = "unit/conf/dbtesth2";
+ System.out.println("1.***Now testing check");
+ AzkabanDatabaseUpdater.main(new String[]{"-c",confDir});
+
+ System.out.println("2.***Now testing update");
+ AzkabanDatabaseUpdater.main(new String[]{ "-u", "-c",confDir});
+
+ System.out.println("3.***Now testing check again");
+ AzkabanDatabaseUpdater.main(new String[]{ "-c",confDir});
+
+ System.out.println("4.***Now testing update again");
+ AzkabanDatabaseUpdater.main(new String[]{ "-c",confDir, "-u"});
+
+ System.out.println("5.***Now testing check again");
+ AzkabanDatabaseUpdater.main(new String[]{ "-c",confDir});
+ }
+
+ private static void clearUnitTestDB() throws SQLException {
+ Props props = new Props();
+
+ props.put("database.type", "mysql");
+ props.put("mysql.host", "localhost");
+ props.put("mysql.port", "3306");
+ props.put("mysql.database", "");
+ props.put("mysql.user", "root");
+ props.put("mysql.password", "");
+ props.put("mysql.numconnections", 10);
+
+ DataSource datasource = DataSourceUtils.getDataSource(props);
+ QueryRunner runner = new QueryRunner(datasource);
+ try {
+ runner.update("drop database azkabanunittest");
+ } catch (SQLException e) {
+ }
+ runner.update("create database azkabanunittest");
+ }
+}