azkaban-aplcache
Changes
azkaban-db/src/main/sql/create.quartz-tables-all.sql 165(+165 -0)
azkaban-web-server/build.gradle 2(+2 -0)
Details
diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 754aa3a..7fd177e 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -154,6 +154,9 @@ public class Constants {
* implies save latest 3 versions saved in storage.
**/
public static final String AZKABAN_STORAGE_ARTIFACT_MAX_RETENTION = "azkaban.storage.artifact.max.retention";
+
+ // enable Quartz Scheduler if true.
+ public static final String ENABLE_QUARTZ= "azkaban.server.schedule.enable_quartz";
}
public static class FlowProperties {
diff --git a/azkaban-common/src/test/java/azkaban/test/Utils.java b/azkaban-common/src/test/java/azkaban/test/Utils.java
index dc915dc..b8d425c 100644
--- a/azkaban-common/src/test/java/azkaban/test/Utils.java
+++ b/azkaban-common/src/test/java/azkaban/test/Utils.java
@@ -37,4 +37,5 @@ public class Utils {
return new DatabaseOperator(new QueryRunner(dataSource));
}
+
}
azkaban-db/src/main/sql/create.quartz-tables-all.sql 165(+165 -0)
diff --git a/azkaban-db/src/main/sql/create.quartz-tables-all.sql b/azkaban-db/src/main/sql/create.quartz-tables-all.sql
new file mode 100644
index 0000000..109fe6d
--- /dev/null
+++ b/azkaban-db/src/main/sql/create.quartz-tables-all.sql
@@ -0,0 +1,165 @@
+-- This file collects all quartz table create statement required for quartz 2.2.1
+--
+-- We are using Quartz 2.2.1 tables, the original place of which can be found at
+-- https://github.com/quartz-scheduler/quartz/blob/quartz-2.2.1/distribution/src/main/assembly/root/docs/dbTables/tables_mysql.sql
+
+
+DROP TABLE IF EXISTS QRTZ_FIRED_TRIGGERS;
+DROP TABLE IF EXISTS QRTZ_PAUSED_TRIGGER_GRPS;
+DROP TABLE IF EXISTS QRTZ_SCHEDULER_STATE;
+DROP TABLE IF EXISTS QRTZ_LOCKS;
+DROP TABLE IF EXISTS QRTZ_SIMPLE_TRIGGERS;
+DROP TABLE IF EXISTS QRTZ_SIMPROP_TRIGGERS;
+DROP TABLE IF EXISTS QRTZ_CRON_TRIGGERS;
+DROP TABLE IF EXISTS QRTZ_BLOB_TRIGGERS;
+DROP TABLE IF EXISTS QRTZ_TRIGGERS;
+DROP TABLE IF EXISTS QRTZ_JOB_DETAILS;
+DROP TABLE IF EXISTS QRTZ_CALENDARS;
+
+
+CREATE TABLE QRTZ_JOB_DETAILS
+ (
+ SCHED_NAME VARCHAR(120) NOT NULL,
+ JOB_NAME VARCHAR(200) NOT NULL,
+ JOB_GROUP VARCHAR(200) NOT NULL,
+ DESCRIPTION VARCHAR(250) NULL,
+ JOB_CLASS_NAME VARCHAR(250) NOT NULL,
+ IS_DURABLE VARCHAR(1) NOT NULL,
+ IS_NONCONCURRENT VARCHAR(1) NOT NULL,
+ IS_UPDATE_DATA VARCHAR(1) NOT NULL,
+ REQUESTS_RECOVERY VARCHAR(1) NOT NULL,
+ JOB_DATA BLOB NULL,
+ PRIMARY KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
+);
+
+CREATE TABLE QRTZ_TRIGGERS
+ (
+ SCHED_NAME VARCHAR(120) NOT NULL,
+ TRIGGER_NAME VARCHAR(200) NOT NULL,
+ TRIGGER_GROUP VARCHAR(200) NOT NULL,
+ JOB_NAME VARCHAR(200) NOT NULL,
+ JOB_GROUP VARCHAR(200) NOT NULL,
+ DESCRIPTION VARCHAR(250) NULL,
+ NEXT_FIRE_TIME BIGINT(13) NULL,
+ PREV_FIRE_TIME BIGINT(13) NULL,
+ PRIORITY INTEGER NULL,
+ TRIGGER_STATE VARCHAR(16) NOT NULL,
+ TRIGGER_TYPE VARCHAR(8) NOT NULL,
+ START_TIME BIGINT(13) NOT NULL,
+ END_TIME BIGINT(13) NULL,
+ CALENDAR_NAME VARCHAR(200) NULL,
+ MISFIRE_INSTR SMALLINT(2) NULL,
+ JOB_DATA BLOB NULL,
+ PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
+ FOREIGN KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
+ REFERENCES QRTZ_JOB_DETAILS(SCHED_NAME,JOB_NAME,JOB_GROUP)
+);
+
+CREATE TABLE QRTZ_SIMPLE_TRIGGERS
+ (
+ SCHED_NAME VARCHAR(120) NOT NULL,
+ TRIGGER_NAME VARCHAR(200) NOT NULL,
+ TRIGGER_GROUP VARCHAR(200) NOT NULL,
+ REPEAT_COUNT BIGINT(7) NOT NULL,
+ REPEAT_INTERVAL BIGINT(12) NOT NULL,
+ TIMES_TRIGGERED BIGINT(10) NOT NULL,
+ PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
+ FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
+ REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
+);
+
+CREATE TABLE QRTZ_CRON_TRIGGERS
+ (
+ SCHED_NAME VARCHAR(120) NOT NULL,
+ TRIGGER_NAME VARCHAR(200) NOT NULL,
+ TRIGGER_GROUP VARCHAR(200) NOT NULL,
+ CRON_EXPRESSION VARCHAR(200) NOT NULL,
+ TIME_ZONE_ID VARCHAR(80),
+ PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
+ FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
+ REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
+);
+
+CREATE TABLE QRTZ_SIMPROP_TRIGGERS
+ (
+ SCHED_NAME VARCHAR(120) NOT NULL,
+ TRIGGER_NAME VARCHAR(200) NOT NULL,
+ TRIGGER_GROUP VARCHAR(200) NOT NULL,
+ STR_PROP_1 VARCHAR(512) NULL,
+ STR_PROP_2 VARCHAR(512) NULL,
+ STR_PROP_3 VARCHAR(512) NULL,
+ INT_PROP_1 INT NULL,
+ INT_PROP_2 INT NULL,
+ LONG_PROP_1 BIGINT NULL,
+ LONG_PROP_2 BIGINT NULL,
+ DEC_PROP_1 NUMERIC(13,4) NULL,
+ DEC_PROP_2 NUMERIC(13,4) NULL,
+ BOOL_PROP_1 VARCHAR(1) NULL,
+ BOOL_PROP_2 VARCHAR(1) NULL,
+ PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
+ FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
+ REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
+);
+
+CREATE TABLE QRTZ_BLOB_TRIGGERS
+ (
+ SCHED_NAME VARCHAR(120) NOT NULL,
+ TRIGGER_NAME VARCHAR(200) NOT NULL,
+ TRIGGER_GROUP VARCHAR(200) NOT NULL,
+ BLOB_DATA BLOB NULL,
+ PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
+ FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
+ REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
+);
+
+CREATE TABLE QRTZ_CALENDARS
+ (
+ SCHED_NAME VARCHAR(120) NOT NULL,
+ CALENDAR_NAME VARCHAR(200) NOT NULL,
+ CALENDAR BLOB NOT NULL,
+ PRIMARY KEY (SCHED_NAME,CALENDAR_NAME)
+);
+
+CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS
+ (
+ SCHED_NAME VARCHAR(120) NOT NULL,
+ TRIGGER_GROUP VARCHAR(200) NOT NULL,
+ PRIMARY KEY (SCHED_NAME,TRIGGER_GROUP)
+);
+
+CREATE TABLE QRTZ_FIRED_TRIGGERS
+ (
+ SCHED_NAME VARCHAR(120) NOT NULL,
+ ENTRY_ID VARCHAR(95) NOT NULL,
+ TRIGGER_NAME VARCHAR(200) NOT NULL,
+ TRIGGER_GROUP VARCHAR(200) NOT NULL,
+ INSTANCE_NAME VARCHAR(200) NOT NULL,
+ FIRED_TIME BIGINT(13) NOT NULL,
+ SCHED_TIME BIGINT(13) NOT NULL,
+ PRIORITY INTEGER NOT NULL,
+ STATE VARCHAR(16) NOT NULL,
+ JOB_NAME VARCHAR(200) NULL,
+ JOB_GROUP VARCHAR(200) NULL,
+ IS_NONCONCURRENT VARCHAR(1) NULL,
+ REQUESTS_RECOVERY VARCHAR(1) NULL,
+ PRIMARY KEY (SCHED_NAME,ENTRY_ID)
+);
+
+CREATE TABLE QRTZ_SCHEDULER_STATE
+ (
+ SCHED_NAME VARCHAR(120) NOT NULL,
+ INSTANCE_NAME VARCHAR(200) NOT NULL,
+ LAST_CHECKIN_TIME BIGINT(13) NOT NULL,
+ CHECKIN_INTERVAL BIGINT(13) NOT NULL,
+ PRIMARY KEY (SCHED_NAME,INSTANCE_NAME)
+);
+
+CREATE TABLE QRTZ_LOCKS
+ (
+ SCHED_NAME VARCHAR(120) NOT NULL,
+ LOCK_NAME VARCHAR(40) NOT NULL,
+ PRIMARY KEY (SCHED_NAME,LOCK_NAME)
+);
+
+
+commit;
diff --git a/azkaban-db/src/test/java/azkaban/db/AzDBTestUtility.java b/azkaban-db/src/test/java/azkaban/db/AzDBTestUtility.java
index c6bc817..a875c6a 100644
--- a/azkaban-db/src/test/java/azkaban/db/AzDBTestUtility.java
+++ b/azkaban-db/src/test/java/azkaban/db/AzDBTestUtility.java
@@ -16,8 +16,22 @@
*/
package azkaban.db;
+import java.io.File;
+import org.apache.commons.dbutils.QueryRunner;
+
public class AzDBTestUtility {
+ public static DatabaseOperator initQuartzDB() throws Exception {
+ final AzkabanDataSource dataSource = new EmbeddedH2BasicDataSource();
+
+ final String sqlScriptsDir = new File("../azkaban-web-server/src/test/resources/")
+ .getCanonicalPath();
+
+ final DatabaseSetup setup = new DatabaseSetup(dataSource, sqlScriptsDir);
+ setup.updateDatabase();
+ return new DatabaseOperator(new QueryRunner(dataSource));
+ }
+
public static class EmbeddedH2BasicDataSource extends AzkabanDataSource {
public EmbeddedH2BasicDataSource() {
diff --git a/azkaban-solo-server/src/test/java/azkaban/soloserver/AzkabanSingleServerTest.java b/azkaban-solo-server/src/test/java/azkaban/soloserver/AzkabanSingleServerTest.java
index c25cbe8..0ddcc85 100644
--- a/azkaban-solo-server/src/test/java/azkaban/soloserver/AzkabanSingleServerTest.java
+++ b/azkaban-solo-server/src/test/java/azkaban/soloserver/AzkabanSingleServerTest.java
@@ -89,6 +89,10 @@ public class AzkabanSingleServerTest {
props.put("user.manager.xml.file", new File(confPath, "azkaban-users.xml").getPath());
props.put("executor.port", "12321");
+ // Quartz settings
+ props.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
+ props.put("org.quartz.threadPool.threadCount", "10");
+
final String sqlScriptsDir = getSqlScriptsDir();
assertTrue(new File(sqlScriptsDir).isDirectory());
props.put(AzkabanDatabaseSetup.DATABASE_SQL_SCRIPT_DIR, sqlScriptsDir);
azkaban-web-server/build.gradle 2(+2 -0)
diff --git a/azkaban-web-server/build.gradle b/azkaban-web-server/build.gradle
index a22fad2..6a7417a 100644
--- a/azkaban-web-server/build.gradle
+++ b/azkaban-web-server/build.gradle
@@ -63,6 +63,8 @@ dependencies {
generateRestli deps.restliGenerator
generateRestli deps.restliTools
+ testCompile project(':test')
+ testCompile project(path: ':azkaban-db', configuration: 'testOutput')
testRuntime deps.h2
}
diff --git a/azkaban-web-server/src/main/java/azkaban/scheduler/AbstractQuartzJob.java b/azkaban-web-server/src/main/java/azkaban/scheduler/AbstractQuartzJob.java
new file mode 100644
index 0000000..ba89096
--- /dev/null
+++ b/azkaban-web-server/src/main/java/azkaban/scheduler/AbstractQuartzJob.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.scheduler;
+
+import java.io.Serializable;
+import org.quartz.Job;
+import org.quartz.JobExecutionContext;
+
+public abstract class AbstractQuartzJob implements Job {
+
+ /**
+ * Cast the object to the original one with the type. The object must extend Serializable.
+ */
+ protected static <T extends Serializable> T asT(final Object service, final Class<T> type) {
+ return type.cast(service);
+ }
+
+ @Override
+ public abstract void execute(JobExecutionContext context);
+
+ protected Object getKey(final JobExecutionContext context, final String key) {
+ return context.getMergedJobDataMap().get(key);
+ }
+}
diff --git a/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzJobDescription.java b/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzJobDescription.java
new file mode 100644
index 0000000..83953d4
--- /dev/null
+++ b/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzJobDescription.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.scheduler;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Manage one quartz job's variables. Every AZ Quartz Job should come with a QuartzJobDescription.
+ */
+public class QuartzJobDescription<T extends AbstractQuartzJob> {
+
+ private final String groupName;
+ private final Class<T> jobClass;
+ private final Map<String, ? extends Serializable> contextMap;
+
+ public QuartzJobDescription(final Class<T> jobClass,
+ final String groupName,
+ final Map<String, ? extends Serializable> contextMap) {
+
+ /**
+ * This check is necessary for raw type. Please see test
+ * {@link QuartzJobDescriptionTest#testCreateQuartzJobDescription2}
+ */
+ if (jobClass.getSuperclass() != AbstractQuartzJob.class) {
+ throw new ClassCastException("jobClass must extend AbstractQuartzJob class");
+ }
+ this.jobClass = jobClass;
+ this.groupName = groupName;
+ this.contextMap = contextMap;
+ }
+
+ public Class<? extends AbstractQuartzJob> getJobClass() {
+ return this.jobClass;
+ }
+
+ public Map<String, ? extends Serializable> getContextMap() {
+ return this.contextMap;
+ }
+
+ @Override
+ public String toString() {
+ return "QuartzJobDescription{" +
+ "jobClass=" + this.jobClass +
+ ", groupName='" + this.groupName + '\'' +
+ ", contextMap=" + this.contextMap +
+ '}';
+ }
+
+ public String getGroupName() {
+ return this.groupName;
+ }
+}
diff --git a/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java b/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java
new file mode 100644
index 0000000..179b1f6
--- /dev/null
+++ b/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.scheduler;
+
+import static java.util.Objects.requireNonNull;
+
+import azkaban.utils.Props;
+import java.util.Set;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.quartz.CronExpression;
+import org.quartz.CronScheduleBuilder;
+import org.quartz.JobBuilder;
+import org.quartz.JobDetail;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.quartz.TriggerBuilder;
+import org.quartz.impl.StdSchedulerFactory;
+import org.quartz.impl.matchers.GroupMatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages Quartz schedules. Azkaban regards QuartzJob and QuartzTrigger as an one-to-one
+ * mapping.
+ */
+@Singleton
+public class QuartzScheduler {
+
+ //Unless specified, all Quartz jobs's identities comes with the default job name.
+ private static final String DEFAULT_JOB_NAME = "job1";
+ private static final Logger logger = LoggerFactory.getLogger(QuartzScheduler.class);
+ private Scheduler scheduler = null;
+
+ @Inject
+ public QuartzScheduler(final Props azProps) throws SchedulerException{
+ final StdSchedulerFactory schedulerFactory =
+ new StdSchedulerFactory(azProps.toProperties());
+ this.scheduler = schedulerFactory.getScheduler();
+ }
+
+ public void start() {
+ try {
+ this.scheduler.start();
+ } catch (final SchedulerException e) {
+ logger.error("Error starting Quartz scheduler: ", e);
+ }
+ logger.info("Quartz Scheduler started.");
+ }
+
+ public void cleanup() {
+ logger.info("Cleaning up schedules in scheduler");
+ try {
+ this.scheduler.clear();
+ } catch (final SchedulerException e) {
+ logger.error("Exception clearing scheduler: ", e);
+ }
+ }
+
+ public void pause() {
+ logger.info("pausing all schedules in Quartz");
+ try {
+ this.scheduler.pauseAll();
+ } catch (final SchedulerException e) {
+ logger.error("Exception pausing scheduler: ", e);
+ }
+ }
+
+ public void resume() {
+ logger.info("resuming all schedules in Quartz");
+ try {
+ this.scheduler.resumeAll();
+ } catch (final SchedulerException e) {
+ logger.error("Exception resuming scheduler: ", e);
+ }
+ }
+
+ public void shutdown() {
+ logger.info("Shutting down scheduler");
+ try {
+ this.scheduler.shutdown();
+ } catch (final SchedulerException e) {
+ logger.error("Exception shutting down scheduler: ", e);
+ }
+ }
+
+ public void unregisterJob(final String groupName) throws SchedulerException {
+ if(!ifJobExist(groupName)) {
+ logger.warn("can not find job with " + groupName + " in quartz.");
+ } else {
+ this.scheduler.deleteJob(new JobKey(DEFAULT_JOB_NAME, groupName));
+ }
+ }
+
+ /**
+ * Only cron schedule register is supported.
+ *
+ * @param cronExpression the cron schedule for this job
+ * @param jobDescription Regarding QuartzJobDescription#groupName, in order to guarantee no
+ * duplicate quartz schedules, we design the naming convention depending on use cases:
+ * <ul>
+ * <li>User flow schedule: we use {@link org.quartz.JobKey#JobKey} to represent the identity
+ * of a flow's schedule. The format follows "$projectID_$flowName" to guarantee no duplicates.
+ * </li>
+ * <li>Quartz schedule for AZ internal use: the groupName should start with letters, rather
+ * than number, which is the first case. </li>
+ * <ul>
+ */
+ public void registerJob(final String cronExpression, final QuartzJobDescription jobDescription)
+ throws SchedulerException {
+
+ requireNonNull(jobDescription, "jobDescription is null");
+
+ // Not allowed to register duplicate job name.
+ if(ifJobExist(jobDescription.getGroupName())) {
+ throw new SchedulerException("can not register existing job " + jobDescription.getGroupName());
+ }
+
+ if (!CronExpression.isValidExpression(cronExpression)) {
+ throw new SchedulerException("The cron expression string <" + cronExpression + "> is not valid.");
+ }
+
+ // TODO kunkun-tang: we will modify this when we start supporting multi schedules per flow.
+ final JobDetail job = JobBuilder.newJob(jobDescription.getJobClass())
+ .withIdentity(DEFAULT_JOB_NAME, jobDescription.getGroupName()).build();
+
+ // Add external dependencies to Job Data Map.
+ job.getJobDataMap().putAll(jobDescription.getContextMap());
+
+ // TODO kunkun-tang: Need management code to deal with different misfire policy
+ final Trigger trigger = TriggerBuilder
+ .newTrigger()
+ .withSchedule(
+ CronScheduleBuilder.cronSchedule(cronExpression)
+ .withMisfireHandlingInstructionFireAndProceed()
+// .withMisfireHandlingInstructionDoNothing()
+// .withMisfireHandlingInstructionIgnoreMisfires()
+ )
+ .build();
+
+ this.scheduler.scheduleJob(job, trigger);
+ logger.info("Quartz Schedule with jobDetail " + job.getDescription() + " is registered.");
+ }
+
+
+ public boolean ifJobExist(final String groupName) throws SchedulerException {
+ final Set<JobKey> jobKeySet = this.scheduler.getJobKeys(GroupMatcher.jobGroupEquals(groupName));
+ return jobKeySet != null && jobKeySet.size() > 0;
+ }
+
+ public Scheduler getScheduler() {
+ return this.scheduler;
+ }
+}
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
index e0a921e..cb73909 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -21,6 +21,7 @@ import static java.util.Objects.requireNonNull;
import azkaban.AzkabanCommonModule;
import azkaban.Constants;
+import azkaban.Constants.ConfigurationKeys;
import azkaban.database.AzkabanDatabaseSetup;
import azkaban.executor.ExecutorManager;
import azkaban.jmx.JmxExecutorManager;
@@ -28,6 +29,7 @@ import azkaban.jmx.JmxJettyServer;
import azkaban.jmx.JmxTriggerManager;
import azkaban.metrics.MetricsManager;
import azkaban.project.ProjectManager;
+import azkaban.scheduler.QuartzScheduler;
import azkaban.scheduler.ScheduleManager;
import azkaban.server.AzkabanServer;
import azkaban.server.session.SessionCache;
@@ -139,6 +141,7 @@ public class AzkabanWebServer extends AzkabanServer {
private final Props props;
private final SessionCache sessionCache;
private final List<ObjectName> registeredMBeans = new ArrayList<>();
+ private final QuartzScheduler quartzScheduler;
private Map<String, TriggerPlugin> triggerPlugins;
private MBeanServer mbeanServer;
@@ -154,6 +157,7 @@ public class AzkabanWebServer extends AzkabanServer {
final UserManager userManager,
final ScheduleManager scheduleManager,
final VelocityEngine velocityEngine,
+ final QuartzScheduler quartzScheduler,
final StatusService statusService) {
this.props = requireNonNull(props, "props is null.");
this.server = requireNonNull(server, "server is null.");
@@ -165,6 +169,7 @@ public class AzkabanWebServer extends AzkabanServer {
this.userManager = requireNonNull(userManager, "userManager is null.");
this.scheduleManager = requireNonNull(scheduleManager, "scheduleManager is null.");
this.velocityEngine = requireNonNull(velocityEngine, "velocityEngine is null.");
+ this.quartzScheduler = requireNonNull(quartzScheduler, "quartzScheduler is null.");
this.statusService = statusService;
loadBuiltinCheckersAndActions();
@@ -184,7 +189,6 @@ public class AzkabanWebServer extends AzkabanServer {
DateTimeZone.setDefault(DateTimeZone.forID(timezone));
logger.info("Setting timezone to " + timezone);
}
-
configureMBeanServer();
}
@@ -227,9 +231,15 @@ public class AzkabanWebServer extends AzkabanServer {
@Override
public void run() {
try {
+ webServer.quartzScheduler.shutdown();
+ } catch (final Exception e) {
+ logger.error(("Exception while shutting down quartz scheduler."), e);
+ }
+
+ try {
logTopMemoryConsumers();
} catch (final Exception e) {
- logger.info(("Exception when logging top memory consumers"), e);
+ logger.error(("Exception when logging top memory consumers"), e);
}
logger.info("Shutting down http server...");
@@ -509,6 +519,12 @@ public class AzkabanWebServer extends AzkabanServer {
if (this.props.getBoolean(Constants.ConfigurationKeys.IS_METRICS_ENABLED, false)) {
startWebMetrics();
}
+
+ if(this.props.containsKey(ConfigurationKeys.ENABLE_QUARTZ) && this.props.getBoolean(ConfigurationKeys
+ .ENABLE_QUARTZ)) {
+ this.quartzScheduler.start();
+ }
+
try {
this.server.start();
logger.info("Server started");
diff --git a/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzJobDescriptionTest.java b/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzJobDescriptionTest.java
new file mode 100644
index 0000000..dbc6df6
--- /dev/null
+++ b/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzJobDescriptionTest.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.scheduler;
+
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Test;
+
+public class QuartzJobDescriptionTest {
+
+ @Test
+ public void testCreateQuartzJobDescription() throws Exception{
+ final SampleService sampleService = new SampleService("first field", "second field");
+ final Map<String, SampleService> contextMap = new HashMap<>();
+ contextMap.put(SampleQuartzJob.DELEGATE_CLASS_NAME, sampleService);
+ assertThatCode(() -> {
+ new QuartzJobDescription<>(SampleQuartzJob.class,
+ "SampleService",
+ contextMap);
+ }).doesNotThrowAnyException();
+ }
+
+
+ @Test
+ public void testCreateQuartzJobDescriptionRawType1() throws Exception{
+ final SampleService sampleService = new SampleService("first field", "second field");
+ final Map<String, SampleService> contextMap = new HashMap<>();
+ contextMap.put(SampleQuartzJob.DELEGATE_CLASS_NAME, sampleService);
+ assertThatCode(() -> {new QuartzJobDescription(SampleQuartzJob.class, "SampleService",
+ contextMap);
+ }).doesNotThrowAnyException();
+ }
+
+ @Test
+ public void testCreateQuartzJobDescriptionRawType2() throws Exception{
+ final SampleService sampleService = new SampleService("first field", "second field");
+ final Map<String, SampleService> contextMap = new HashMap<>();
+ contextMap.put(SampleQuartzJob.DELEGATE_CLASS_NAME, sampleService);
+ assertThatThrownBy(
+ () -> new QuartzJobDescription(SampleService.class, "SampleService",
+ contextMap))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining("jobClass must extend AbstractQuartzJob class");
+ }
+}
diff --git a/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzSchedulerTest.java b/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzSchedulerTest.java
new file mode 100644
index 0000000..0e97f61
--- /dev/null
+++ b/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzSchedulerTest.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.scheduler;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import azkaban.db.AzDBTestUtility;
+import azkaban.db.DatabaseOperator;
+import azkaban.test.TestUtils;
+import azkaban.utils.Props;
+import java.io.File;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.quartz.SchedulerException;
+
+/**
+ * Use H2-in-mem database to directly test Quartz.
+ */
+public class QuartzSchedulerTest {
+
+ private static DatabaseOperator dbOperator;
+ private static QuartzScheduler scheduler;
+
+ @BeforeClass
+ public static void setUpQuartz() throws Exception {
+ dbOperator = AzDBTestUtility.initQuartzDB();
+ final String quartzPropsPath=
+ new File("../azkaban-web-server/src/test/resources/quartz.test.properties")
+ .getCanonicalPath();
+ final Props quartzProps = new Props(null, quartzPropsPath);
+ scheduler = new QuartzScheduler(quartzProps);
+ scheduler.start();
+ }
+
+ @AfterClass
+ public static void destroyQuartz() {
+ try {
+ scheduler.shutdown();
+ dbOperator.update("DROP ALL OBJECTS");
+ dbOperator.update("SHUTDOWN");
+ } catch (final SQLException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Before
+ public void init() {
+ SampleQuartzJob.COUNT_EXECUTION = 0;
+ }
+
+ @After
+ public void cleanup() {
+ scheduler.cleanup();
+ }
+
+ @Test
+ public void testCreateScheduleAndRun() throws Exception{
+ scheduler.registerJob("* * * * * ?", createJobDescription());
+ assertThat(scheduler.ifJobExist("SampleService")).isEqualTo(true);
+ TestUtils.await().untilAsserted(() -> assertThat(SampleQuartzJob.COUNT_EXECUTION)
+ .isNotNull().isGreaterThan(1));
+ }
+
+ @Test
+ public void testNotAllowDuplicateJobRegister() throws Exception{
+ scheduler.registerJob("* * * * * ?", createJobDescription());
+ assertThatThrownBy(
+ () -> scheduler.registerJob("0 5 * * * ?", createJobDescription()))
+ .isInstanceOf(SchedulerException.class)
+ .hasMessageContaining("can not register existing job");
+ }
+
+ @Test
+ public void testInvalidCron() throws Exception{
+ assertThatThrownBy(
+ () -> scheduler.registerJob("0 5 * * * *", createJobDescription()))
+ .isInstanceOf(SchedulerException.class)
+ .hasMessageContaining("The cron expression string");
+ }
+
+ @Test
+ public void testUnregisterSchedule() throws Exception{
+ scheduler.registerJob("* * * * * ?", createJobDescription());
+ assertThat(scheduler.ifJobExist("SampleService")).isEqualTo(true);
+ scheduler.unregisterJob("SampleService");
+ assertThat(scheduler.ifJobExist("SampleService")).isEqualTo(false);
+ }
+
+ @Test
+ public void testPauseAndResume() throws Exception{
+ scheduler.registerJob("* * * * * ?", createJobDescription());
+ scheduler.pause();
+ final int count = SampleQuartzJob.COUNT_EXECUTION;
+ Thread.sleep(1500);
+ assertThat(SampleQuartzJob.COUNT_EXECUTION).isEqualTo(count);
+ scheduler.resume();
+ Thread.sleep(1200);
+ assertThat(SampleQuartzJob.COUNT_EXECUTION).isGreaterThan(count);
+ }
+
+ private QuartzJobDescription createJobDescription() {
+ final SampleService sampleService = new SampleService("first field", "second field");
+ final Map<String, SampleService> contextMap = new HashMap<>();
+ contextMap.put(SampleQuartzJob.DELEGATE_CLASS_NAME, sampleService);
+
+ return new QuartzJobDescription<>(SampleQuartzJob.class, "SampleService",
+ contextMap);
+ }
+}
diff --git a/azkaban-web-server/src/test/java/azkaban/scheduler/SampleQuartzJob.java b/azkaban-web-server/src/test/java/azkaban/scheduler/SampleQuartzJob.java
new file mode 100644
index 0000000..3439651
--- /dev/null
+++ b/azkaban-web-server/src/test/java/azkaban/scheduler/SampleQuartzJob.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.scheduler;
+
+import java.io.Serializable;
+import org.quartz.JobExecutionContext;
+
+public class SampleQuartzJob extends AbstractQuartzJob{
+
+ public static final String DELEGATE_CLASS_NAME = "SampleService";
+ public static int COUNT_EXECUTION = 0;
+
+ public SampleQuartzJob() {
+ }
+
+ @Override
+ public void execute(final JobExecutionContext context) {
+ final SampleService service = asT(getKey(context, DELEGATE_CLASS_NAME), SampleService.class);
+ COUNT_EXECUTION ++ ;
+ service.run();
+ }
+}
+
+class SampleService implements Serializable{
+
+ private final String field1;
+ private final String field2;
+
+ SampleService(final String field1, final String field2) {
+ this.field1 = field1;
+ this.field2 = field2;
+ }
+
+ void run() {
+ System.out.println("field1: " + this.field1 + "==== field2: " + this.field2);
+ }
+
+ @Override
+ public String toString() {
+ return "field1: " + this.field1 + ", field2: " + this.field2;
+ }
+}
diff --git a/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java b/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
index 3a73439..c7ba19a 100644
--- a/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
@@ -42,6 +42,7 @@ import azkaban.executor.ExecutorManager;
import azkaban.executor.FetchActiveFlowDao;
import azkaban.project.ProjectLoader;
import azkaban.project.ProjectManager;
+import azkaban.scheduler.QuartzScheduler;
import azkaban.spi.Storage;
import azkaban.trigger.TriggerLoader;
import azkaban.trigger.TriggerManager;
@@ -98,6 +99,9 @@ public class AzkabanWebServerTest {
props.put("jetty.use.ssl", "false");
props.put("user.manager.xml.file", getUserManagerXmlFile());
+ // Quartz settings
+ props.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
+ props.put("org.quartz.threadPool.threadCount", "10");
AzkabanDatabaseUpdater.runDatabaseUpdater(props, sqlScriptsDir, true);
}
@@ -151,6 +155,8 @@ public class AzkabanWebServerTest {
assertSingleton(AzkabanWebServer.class, injector);
assertSingleton(H2FileDataSource.class, injector);
+ assertSingleton(QuartzScheduler.class, injector);
+
SERVICE_PROVIDER.unsetInjector();
}
}
diff --git a/azkaban-web-server/src/test/resources/create.quartz-all-tables.sql b/azkaban-web-server/src/test/resources/create.quartz-all-tables.sql
new file mode 100644
index 0000000..2afecb7
--- /dev/null
+++ b/azkaban-web-server/src/test/resources/create.quartz-all-tables.sql
@@ -0,0 +1,165 @@
+-- All necessary quartz table create statement collection for unit test purpose
+--
+-- We are using H2-in-memory DB. However, H2 version (1.3.155) is unable to perform automatic
+-- conversion from boolean into VARCHAR(1) like MySQL can do.
+-- So we replace all varchar(1) to boolean.
+
+DROP TABLE IF EXISTS QRTZ_FIRED_TRIGGERS;
+DROP TABLE IF EXISTS QRTZ_PAUSED_TRIGGER_GRPS;
+DROP TABLE IF EXISTS QRTZ_SCHEDULER_STATE;
+DROP TABLE IF EXISTS QRTZ_LOCKS;
+DROP TABLE IF EXISTS QRTZ_SIMPLE_TRIGGERS;
+DROP TABLE IF EXISTS QRTZ_SIMPROP_TRIGGERS;
+DROP TABLE IF EXISTS QRTZ_CRON_TRIGGERS;
+DROP TABLE IF EXISTS QRTZ_BLOB_TRIGGERS;
+DROP TABLE IF EXISTS QRTZ_TRIGGERS;
+DROP TABLE IF EXISTS QRTZ_JOB_DETAILS;
+DROP TABLE IF EXISTS QRTZ_CALENDARS;
+
+
+CREATE TABLE QRTZ_JOB_DETAILS
+ (
+ SCHED_NAME VARCHAR(120) NOT NULL,
+ JOB_NAME VARCHAR(200) NOT NULL,
+ JOB_GROUP VARCHAR(200) NOT NULL,
+ DESCRIPTION VARCHAR(250) NULL,
+ JOB_CLASS_NAME VARCHAR(250) NOT NULL,
+ IS_DURABLE BOOLEAN NOT NULL,
+ IS_NONCONCURRENT BOOLEAN NOT NULL,
+ IS_UPDATE_DATA BOOLEAN NOT NULL,
+ REQUESTS_RECOVERY BOOLEAN NOT NULL,
+ JOB_DATA BLOB NULL,
+ PRIMARY KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
+);
+
+CREATE TABLE QRTZ_TRIGGERS
+ (
+ SCHED_NAME VARCHAR(120) NOT NULL,
+ TRIGGER_NAME VARCHAR(200) NOT NULL,
+ TRIGGER_GROUP VARCHAR(200) NOT NULL,
+ JOB_NAME VARCHAR(200) NOT NULL,
+ JOB_GROUP VARCHAR(200) NOT NULL,
+ DESCRIPTION VARCHAR(250) NULL,
+ NEXT_FIRE_TIME BIGINT(13) NULL,
+ PREV_FIRE_TIME BIGINT(13) NULL,
+ PRIORITY INTEGER NULL,
+ TRIGGER_STATE VARCHAR(16) NOT NULL,
+ TRIGGER_TYPE VARCHAR(8) NOT NULL,
+ START_TIME BIGINT(13) NOT NULL,
+ END_TIME BIGINT(13) NULL,
+ CALENDAR_NAME VARCHAR(200) NULL,
+ MISFIRE_INSTR SMALLINT(2) NULL,
+ JOB_DATA BLOB NULL,
+ PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
+ FOREIGN KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
+ REFERENCES QRTZ_JOB_DETAILS(SCHED_NAME,JOB_NAME,JOB_GROUP)
+);
+
+CREATE TABLE QRTZ_SIMPLE_TRIGGERS
+ (
+ SCHED_NAME VARCHAR(120) NOT NULL,
+ TRIGGER_NAME VARCHAR(200) NOT NULL,
+ TRIGGER_GROUP VARCHAR(200) NOT NULL,
+ REPEAT_COUNT BIGINT(7) NOT NULL,
+ REPEAT_INTERVAL BIGINT(12) NOT NULL,
+ TIMES_TRIGGERED BIGINT(10) NOT NULL,
+ PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
+ FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
+ REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
+);
+
+CREATE TABLE QRTZ_CRON_TRIGGERS
+ (
+ SCHED_NAME VARCHAR(120) NOT NULL,
+ TRIGGER_NAME VARCHAR(200) NOT NULL,
+ TRIGGER_GROUP VARCHAR(200) NOT NULL,
+ CRON_EXPRESSION VARCHAR(200) NOT NULL,
+ TIME_ZONE_ID VARCHAR(80),
+ PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
+ FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
+ REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
+);
+
+CREATE TABLE QRTZ_SIMPROP_TRIGGERS
+ (
+ SCHED_NAME VARCHAR(120) NOT NULL,
+ TRIGGER_NAME VARCHAR(200) NOT NULL,
+ TRIGGER_GROUP VARCHAR(200) NOT NULL,
+ STR_PROP_1 VARCHAR(512) NULL,
+ STR_PROP_2 VARCHAR(512) NULL,
+ STR_PROP_3 VARCHAR(512) NULL,
+ INT_PROP_1 INT NULL,
+ INT_PROP_2 INT NULL,
+ LONG_PROP_1 BIGINT NULL,
+ LONG_PROP_2 BIGINT NULL,
+ DEC_PROP_1 NUMERIC(13,4) NULL,
+ DEC_PROP_2 NUMERIC(13,4) NULL,
+ BOOL_PROP_1 BOOLEAN NULL,
+ BOOL_PROP_2 BOOLEAN NULL,
+ PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
+ FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
+ REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
+);
+
+CREATE TABLE QRTZ_BLOB_TRIGGERS
+ (
+ SCHED_NAME VARCHAR(120) NOT NULL,
+ TRIGGER_NAME VARCHAR(200) NOT NULL,
+ TRIGGER_GROUP VARCHAR(200) NOT NULL,
+ BLOB_DATA BLOB NULL,
+ PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
+ FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
+ REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
+);
+
+CREATE TABLE QRTZ_CALENDARS
+ (
+ SCHED_NAME VARCHAR(120) NOT NULL,
+ CALENDAR_NAME VARCHAR(200) NOT NULL,
+ CALENDAR BLOB NOT NULL,
+ PRIMARY KEY (SCHED_NAME,CALENDAR_NAME)
+);
+
+CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS
+ (
+ SCHED_NAME VARCHAR(120) NOT NULL,
+ TRIGGER_GROUP VARCHAR(200) NOT NULL,
+ PRIMARY KEY (SCHED_NAME,TRIGGER_GROUP)
+);
+
+CREATE TABLE QRTZ_FIRED_TRIGGERS
+ (
+ SCHED_NAME VARCHAR(120) NOT NULL,
+ ENTRY_ID VARCHAR(95) NOT NULL,
+ TRIGGER_NAME VARCHAR(200) NOT NULL,
+ TRIGGER_GROUP VARCHAR(200) NOT NULL,
+ INSTANCE_NAME VARCHAR(200) NOT NULL,
+ FIRED_TIME BIGINT(13) NOT NULL,
+ SCHED_TIME BIGINT(13) NOT NULL,
+ PRIORITY INTEGER NOT NULL,
+ STATE VARCHAR(16) NOT NULL,
+ JOB_NAME VARCHAR(200) NULL,
+ JOB_GROUP VARCHAR(200) NULL,
+ IS_NONCONCURRENT BOOLEAN NULL,
+ REQUESTS_RECOVERY BOOLEAN NULL,
+ PRIMARY KEY (SCHED_NAME,ENTRY_ID)
+);
+
+CREATE TABLE QRTZ_SCHEDULER_STATE
+ (
+ SCHED_NAME VARCHAR(120) NOT NULL,
+ INSTANCE_NAME VARCHAR(200) NOT NULL,
+ LAST_CHECKIN_TIME BIGINT(13) NOT NULL,
+ CHECKIN_INTERVAL BIGINT(13) NOT NULL,
+ PRIMARY KEY (SCHED_NAME,INSTANCE_NAME)
+);
+
+CREATE TABLE QRTZ_LOCKS
+ (
+ SCHED_NAME VARCHAR(120) NOT NULL,
+ LOCK_NAME VARCHAR(40) NOT NULL,
+ PRIMARY KEY (SCHED_NAME,LOCK_NAME)
+);
+
+
+commit;
diff --git a/azkaban-web-server/src/test/resources/quartz.test.properties b/azkaban-web-server/src/test/resources/quartz.test.properties
new file mode 100644
index 0000000..259bb64
--- /dev/null
+++ b/azkaban-web-server/src/test/resources/quartz.test.properties
@@ -0,0 +1,17 @@
+# Quartz settings
+org.quartz.jdbcStore=true
+
+org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
+org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
+org.quartz.jobStore.tablePrefix=qrtz_
+
+org.quartz.jobStore.misfireThreshold=1000
+org.quartz.jobStore.isClustered=false
+org.quartz.jobStore.dataSource=quartzDS
+
+org.quartz.dataSource.quartzDS.driver=org.h2.Driver
+org.quartz.dataSource.quartzDS.URL=jdbc:h2:mem:test
+org.quartz.dataSource.quartzDS.maxConnections = 20
+
+org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
+org.quartz.threadPool.threadCount = 10