azkaban-developers

Build Quartz Job Factory to allow Quartz jobs easily inject

10/20/2017 8:41:29 PM

Details

diff --git a/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzJobDescription.java b/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzJobDescription.java
index 83953d4..12ea3f1 100644
--- a/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzJobDescription.java
+++ b/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzJobDescription.java
@@ -17,6 +17,7 @@
 package azkaban.scheduler;
 
 import java.io.Serializable;
+import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -44,6 +45,21 @@ public class QuartzJobDescription<T extends AbstractQuartzJob> {
     this.contextMap = contextMap;
   }
 
+  public QuartzJobDescription(final Class<T> jobClass,
+      final String groupName) {
+
+    /**
+     * This check is necessary for raw type. Please see test
+     * {@link QuartzJobDescriptionTest#testCreateQuartzJobDescription2}
+     */
+    if (jobClass.getSuperclass() != AbstractQuartzJob.class) {
+      throw new ClassCastException("jobClass must extend AbstractQuartzJob class");
+    }
+    this.jobClass = jobClass;
+    this.groupName = groupName;
+    this.contextMap = new HashMap<String, String>();
+  }
+
   public Class<? extends AbstractQuartzJob> getJobClass() {
     return this.jobClass;
   }
diff --git a/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java b/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java
index 28f917d..1991485 100644
--- a/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java
+++ b/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java
@@ -16,6 +16,7 @@
 
 package azkaban.scheduler;
 
+import static azkaban.ServiceProvider.SERVICE_PROVIDER;
 import static java.util.Objects.requireNonNull;
 
 import azkaban.Constants.ConfigurationKeys;
@@ -56,6 +57,10 @@ public class QuartzScheduler {
     final StdSchedulerFactory schedulerFactory =
         new StdSchedulerFactory(azProps.toProperties());
     this.scheduler = schedulerFactory.getScheduler();
+
+    // Currently Quartz only support internal job schedules. When we migrate to User Production
+    // flows, we need to construct a Guice-Free JobFactory for use.
+    this.scheduler.setJobFactory(SERVICE_PROVIDER.getInstance(SchedulerJobFactory.class));
   }
 
   public void start() {
diff --git a/azkaban-web-server/src/main/java/azkaban/scheduler/SchedulerJobFactory.java b/azkaban-web-server/src/main/java/azkaban/scheduler/SchedulerJobFactory.java
new file mode 100644
index 0000000..9247f28
--- /dev/null
+++ b/azkaban-web-server/src/main/java/azkaban/scheduler/SchedulerJobFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.scheduler;
+
+import com.google.inject.Injector;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.quartz.Job;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.spi.JobFactory;
+import org.quartz.spi.TriggerFiredBundle;
+
+/**
+ * Produce Guice-able Job in this custom defined Job Factory.
+ *
+ * In order to allow Quaratz jobs easily inject dependency, we create this factory. Every Quartz job
+ * will be constructed by newJob method.
+ */
+@Singleton
+public class SchedulerJobFactory implements JobFactory {
+
+  private final Injector injector;
+
+  @Inject
+  public SchedulerJobFactory(final Injector injector) {
+    this.injector = injector;
+  }
+
+  @Override
+  public Job newJob(final TriggerFiredBundle bundle, final Scheduler scheduler)
+      throws SchedulerException {
+    return (Job) this.injector.getInstance(bundle.getJobDetail()
+        .getJobClass());
+  }
+}
diff --git a/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzJobDescriptionTest.java b/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzJobDescriptionTest.java
index dbc6df6..2c94d2d 100644
--- a/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzJobDescriptionTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzJobDescriptionTest.java
@@ -27,9 +27,7 @@ public class QuartzJobDescriptionTest {
 
   @Test
   public void testCreateQuartzJobDescription() throws Exception{
-    final SampleService sampleService = new SampleService("first field", "second field");
     final Map<String, SampleService> contextMap = new HashMap<>();
-    contextMap.put(SampleQuartzJob.DELEGATE_CLASS_NAME, sampleService);
     assertThatCode(() -> {
           new QuartzJobDescription<>(SampleQuartzJob.class,
           "SampleService",
@@ -37,25 +35,10 @@ public class QuartzJobDescriptionTest {
     }).doesNotThrowAnyException();
   }
 
-
   @Test
-  public void testCreateQuartzJobDescriptionRawType1() throws Exception{
-    final SampleService sampleService = new SampleService("first field", "second field");
-    final Map<String, SampleService> contextMap = new HashMap<>();
-    contextMap.put(SampleQuartzJob.DELEGATE_CLASS_NAME, sampleService);
-    assertThatCode(() -> {new QuartzJobDescription(SampleQuartzJob.class, "SampleService",
-        contextMap);
-    }).doesNotThrowAnyException();
-  }
-
-  @Test
-  public void testCreateQuartzJobDescriptionRawType2() throws Exception{
-    final SampleService sampleService = new SampleService("first field", "second field");
-    final Map<String, SampleService> contextMap = new HashMap<>();
-    contextMap.put(SampleQuartzJob.DELEGATE_CLASS_NAME, sampleService);
+  public void testCreateQuartzJobDescriptionRawType2() throws Exception {
     assertThatThrownBy(
-        () -> new QuartzJobDescription(SampleService.class, "SampleService",
-            contextMap))
+        () -> new QuartzJobDescription(SampleService.class, "SampleService"))
         .isInstanceOf(RuntimeException.class)
         .hasMessageContaining("jobClass must extend AbstractQuartzJob class");
   }
diff --git a/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzSchedulerTest.java b/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzSchedulerTest.java
index aae8ddf..776554d 100644
--- a/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzSchedulerTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzSchedulerTest.java
@@ -16,18 +16,21 @@
 
 package azkaban.scheduler;
 
+import static azkaban.ServiceProvider.SERVICE_PROVIDER;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
+import azkaban.AzkabanCommonModule;
 import azkaban.Constants.ConfigurationKeys;
 import azkaban.db.AzDBTestUtility;
 import azkaban.db.DatabaseOperator;
 import azkaban.test.TestUtils;
 import azkaban.utils.Props;
+import azkaban.webapp.AzkabanWebServerModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
 import java.io.File;
 import java.sql.SQLException;
-import java.util.HashMap;
-import java.util.Map;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -50,9 +53,20 @@ public class QuartzSchedulerTest {
     final String quartzPropsPath =
         new File("../azkaban-web-server/src/test/resources/quartz.test.properties")
             .getCanonicalPath();
-    final Props quartzProps = new Props(null, quartzPropsPath);
-    quartzProps.put(ConfigurationKeys.ENABLE_QUARTZ, "true");
-    scheduler = new QuartzScheduler(quartzProps);
+    final Props props = new Props(null, quartzPropsPath);
+    props.put(ConfigurationKeys.ENABLE_QUARTZ, "true");
+
+    props.put("database.type", "h2");
+    props.put("h2.path", "./h2");
+    final Injector injector = Guice.createInjector(
+        new AzkabanCommonModule(props),
+        new AzkabanWebServerModule()
+    );
+
+    SERVICE_PROVIDER.unsetInjector();
+    SERVICE_PROVIDER.setInjector(injector);
+
+    scheduler = new QuartzScheduler(props);
     scheduler.start();
   }
 
@@ -124,11 +138,6 @@ public class QuartzSchedulerTest {
   }
 
   private QuartzJobDescription createJobDescription() {
-    final SampleService sampleService = new SampleService("first field", "second field");
-    final Map<String, SampleService> contextMap = new HashMap<>();
-    contextMap.put(SampleQuartzJob.DELEGATE_CLASS_NAME, sampleService);
-
-    return new QuartzJobDescription<>(SampleQuartzJob.class, "SampleService",
-        contextMap);
+    return new QuartzJobDescription<>(SampleQuartzJob.class, "SampleService");
   }
 }
diff --git a/azkaban-web-server/src/test/java/azkaban/scheduler/SampleQuartzJob.java b/azkaban-web-server/src/test/java/azkaban/scheduler/SampleQuartzJob.java
index 3439651..fa4e8ce 100644
--- a/azkaban-web-server/src/test/java/azkaban/scheduler/SampleQuartzJob.java
+++ b/azkaban-web-server/src/test/java/azkaban/scheduler/SampleQuartzJob.java
@@ -16,41 +16,48 @@
 
 package azkaban.scheduler;
 
+import azkaban.utils.Props;
 import java.io.Serializable;
+import javax.inject.Inject;
+import javax.inject.Singleton;
 import org.quartz.JobExecutionContext;
 
 public class SampleQuartzJob extends AbstractQuartzJob{
 
   public static final String DELEGATE_CLASS_NAME = "SampleService";
   public static int COUNT_EXECUTION = 0;
+  private final SampleService sampleService;
 
-  public SampleQuartzJob() {
+  @Inject
+  public SampleQuartzJob(final SampleService sampleService) {
+    this.sampleService = sampleService;
   }
 
   @Override
   public void execute(final JobExecutionContext context) {
-    final SampleService service = asT(getKey(context, DELEGATE_CLASS_NAME), SampleService.class);
+//    final SampleService service = asT(getKey(context, DELEGATE_CLASS_NAME), SampleService.class);
     COUNT_EXECUTION ++ ;
-    service.run();
+    this.sampleService.run();
   }
 }
 
+@Singleton
 class SampleService implements Serializable{
 
-  private final String field1;
-  private final String field2;
 
-  SampleService(final String field1, final String field2) {
-    this.field1 = field1;
-    this.field2 = field2;
+  private final Props props;
+
+  @Inject
+  SampleService(final Props props) {
+    this.props = props;
   }
 
   void run() {
-    System.out.println("field1: " + this.field1 + "==== field2: " + this.field2);
+    System.out.println(this.props.toString());
   }
 
   @Override
   public String toString() {
-    return "field1: " + this.field1 + ", field2: " + this.field2;
+    return this.props.toString();
   }
 }