azkaban-aplcache

Integrate flow trigger service with az web server (#1633)

2/7/2018 9:06:13 PM

Details

diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 337b0d7..f7ca8bc 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -89,6 +89,7 @@ public class Constants {
   // The flow exec id for a flow trigger instance which hasn't started a flow yet
   public static final int UNASSIGNED_EXEC_ID = -1;
 
+
   public static class ConfigurationKeys {
 
     // Configures Azkaban Flow Version in project YAML file
@@ -191,7 +192,13 @@ public class Constants {
     // enable Quartz Scheduler if true.
     public static final String ENABLE_QUARTZ = "azkaban.server.schedule.enable_quartz";
 
+    // enable Flow trigger if true.
+    public static final String ENABLE_FLOW_TRIGGER = "azkaban.server.flowtrigger.enabled";
+
     public static final String CUSTOM_CREDENTIAL_NAME = "azkaban.security.credential";
+
+    // dir to keep dependency plugins
+    public static final String DEPENDENCY_PLUGIN_DIR = "azkaban.dependency.plugin.dir";
   }
 
   public static class FlowProperties {
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
index ea0ce1b..8225b93 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
@@ -18,6 +18,7 @@ package azkaban.flowtrigger;
 
 import azkaban.Constants;
 import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
+import azkaban.flowtrigger.plugin.FlowTriggerDependencyPluginException;
 import azkaban.flowtrigger.plugin.FlowTriggerDependencyPluginManager;
 import azkaban.project.FlowTrigger;
 import azkaban.project.FlowTriggerDependency;
@@ -97,6 +98,11 @@ public class FlowTriggerService {
     this.flowTriggerInstanceLoader = flowTriggerInstanceLoader;
   }
 
+  public void start() throws FlowTriggerDependencyPluginException {
+    this.triggerPluginManager.loadAllPlugins(); // plugins are loaded before recovery runs
+    this.recoverIncompleteTriggerInstances(); // resume incomplete trigger instances from db
+  }
+
   private DependencyInstanceContext createDepContext(final FlowTriggerDependency dep, final long
       starttimeInMills) throws Exception {
     final DependencyCheck dependencyCheck = this.triggerPluginManager
@@ -241,7 +247,7 @@ public class FlowTriggerService {
   /**
    * Resume executions of all incomplete trigger instances by recovering the state from db.
    */
-  public void recoverIncompleteTriggerInstances() {
+  private void recoverIncompleteTriggerInstances() {
     final Collection<TriggerInstance> unfinishedTriggerInstances = this.flowTriggerInstanceLoader
         .getIncompleteTriggerInstances();
     //todo chengren311: what if flow trigger is not found?
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/FlowTriggerDependencyPluginManager.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/FlowTriggerDependencyPluginManager.java
index c524f27..ddb860e 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/FlowTriggerDependencyPluginManager.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/FlowTriggerDependencyPluginManager.java
@@ -60,7 +60,6 @@ public class FlowTriggerDependencyPluginManager {
       FlowTriggerDependencyPluginException {
     this.dependencyTypeMap = new ConcurrentHashMap<>();
     this.pluginDir = pluginDir;
-    this.loadAllDependencyPlugins();
   }
 
   private Map<String, String> readConfig(final File file) throws
@@ -159,7 +158,14 @@ public class FlowTriggerDependencyPluginManager {
     }
   }
 
-  private void loadAllDependencyPlugins() throws FlowTriggerDependencyPluginException {
+  /**
+   * Initialize all dependency plugins.
+   * todo chengren311: Current design aborts loadAllPlugins if any of the plugins fails to be
+   * initialized.
+   * However, this might not be the optimal design. Suppose we have two dependency plugin types,
+   * MySQL and Kafka: if MySQL is down, then the Kafka dependency type will also be unavailable.
+   */
+  public void loadAllPlugins() throws FlowTriggerDependencyPluginException {
     final File pluginDir = new File(this.pluginDir);
     for (final File dir : pluginDir.listFiles()) {
       loadDependencyPlugin(dir);
diff --git a/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java b/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java
index 1991485..b4e573e 100644
--- a/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java
+++ b/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java
@@ -124,7 +124,8 @@ public class QuartzScheduler {
    * duplicate quartz schedules, we design the naming convention depending on use cases: <ul>
    * <li>User flow schedule: we use {@link org.quartz.JobKey#JobKey} to represent the identity of a
    * flow's schedule. The format follows "$projectID_$flowName" to guarantee no duplicates.
-   * <li>Quartz schedule for AZ internal use: the groupName should start with letters, rather than
+   * <li>Quartz schedule for AZ internal use: the groupName should start with letters,
+   * rather than
    * number, which is the first case.</ul>
    */
   public void registerJob(final String cronExpression, final QuartzJobDescription jobDescription)
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
index 50f26ad..3cfb8af 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -24,6 +24,7 @@ import azkaban.Constants;
 import azkaban.Constants.ConfigurationKeys;
 import azkaban.database.AzkabanDatabaseSetup;
 import azkaban.executor.ExecutorManager;
+import azkaban.flowtrigger.FlowTriggerService;
 import azkaban.jmx.JmxExecutorManager;
 import azkaban.jmx.JmxJettyServer;
 import azkaban.jmx.JmxTriggerManager;
@@ -142,7 +143,7 @@ public class AzkabanWebServer extends AzkabanServer {
   private final SessionCache sessionCache;
   private final List<ObjectName> registeredMBeans = new ArrayList<>();
   private final QuartzScheduler quartzScheduler;
-
+  private final FlowTriggerService flowTriggerService;
   private Map<String, TriggerPlugin> triggerPlugins;
   private MBeanServer mbeanServer;
 
@@ -158,6 +159,7 @@ public class AzkabanWebServer extends AzkabanServer {
       final ScheduleManager scheduleManager,
       final VelocityEngine velocityEngine,
       final QuartzScheduler quartzScheduler,
+      final FlowTriggerService flowTriggerService,
       final StatusService statusService) {
     this.props = requireNonNull(props, "props is null.");
     this.server = requireNonNull(server, "server is null.");
@@ -170,6 +172,7 @@ public class AzkabanWebServer extends AzkabanServer {
     this.scheduleManager = requireNonNull(scheduleManager, "scheduleManager is null.");
     this.velocityEngine = requireNonNull(velocityEngine, "velocityEngine is null.");
     this.quartzScheduler = requireNonNull(quartzScheduler, "quartzScheduler is null.");
+    this.flowTriggerService = requireNonNull(flowTriggerService, "flowTriggerService is null.");
     this.statusService = statusService;
 
     loadBuiltinCheckersAndActions();
@@ -239,6 +242,14 @@ public class AzkabanWebServer extends AzkabanServer {
         }
 
         try {
+          if (webServer.flowTriggerService != null) {
+            webServer.flowTriggerService.shutdown();
+          }
+        } catch (final Exception e) {
+          logger.error(("Exception while shutting down flow trigger service."), e);
+        }
+
+        try {
           logTopMemoryConsumers();
         } catch (final Exception e) {
           logger.error(("Exception when logging top memory consumers"), e);
@@ -442,6 +453,10 @@ public class AzkabanWebServer extends AzkabanServer {
     ve.addProperty("jar.resource.loader.path", jarResourcePath);
   }
 
+  public FlowTriggerService getFlowTriggerService() {
+    return this.flowTriggerService; // non-null: the constructor applies requireNonNull
+  }
+
   private void validateDatabaseVersion()
       throws IOException, SQLException {
     final boolean checkDB = this.props
@@ -526,6 +541,14 @@ public class AzkabanWebServer extends AzkabanServer {
       this.quartzScheduler.start();
     }
 
+    if (this.props.getBoolean(ConfigurationKeys.ENABLE_FLOW_TRIGGER, false)) {
+      // flow trigger service throws exception when any dependency plugin fails to be initialized
+      // (e.g. if it's a kafka dependency and kafka is down). In this case if azkaban admin still
+      // wishes to start azkaban web server, she can disable flow trigger in the az config file and
+      // restart web server so that regular scheduled flows are not affected.
+      this.flowTriggerService.start();
+    }
+
     try {
       this.server.start();
       logger.info("Server started");
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServerModule.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServerModule.java
index 2297905..f84e541 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServerModule.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServerModule.java
@@ -17,16 +17,21 @@
 
 package azkaban.webapp;
 
+import azkaban.Constants.ConfigurationKeys;
+import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
+import azkaban.flowtrigger.database.JdbcFlowTriggerInstanceLoaderImpl;
+import azkaban.flowtrigger.plugin.FlowTriggerDependencyPluginException;
+import azkaban.flowtrigger.plugin.FlowTriggerDependencyPluginManager;
 import azkaban.scheduler.ScheduleLoader;
 import azkaban.scheduler.TriggerBasedScheduleLoader;
 import azkaban.user.UserManager;
 import azkaban.user.XmlUserManager;
 import azkaban.utils.Props;
 import com.google.inject.AbstractModule;
-import javax.inject.Inject;
 import com.google.inject.Provides;
-import javax.inject.Singleton;
 import java.lang.reflect.Constructor;
+import javax.inject.Inject;
+import javax.inject.Singleton;
 import org.apache.log4j.Logger;
 import org.apache.velocity.app.VelocityEngine;
 import org.apache.velocity.runtime.log.Log4JLogChute;
@@ -45,10 +50,26 @@ public class AzkabanWebServerModule extends AbstractModule {
   private static final String USER_MANAGER_CLASS_PARAM = "user.manager.class";
   private static final String VELOCITY_DEV_MODE_PARAM = "velocity.dev.mode";
 
+  @Provides
+  @Singleton
+  public FlowTriggerDependencyPluginManager getDependencyPluginManager(final Props props)
+      throws FlowTriggerDependencyPluginException {
+    //todo chengren311: requireNonNull is disabled for now in beta since the dependency plugin
+    // dir is not required. Add it back once the flow trigger feature is enabled in production.
+    String dependencyPluginDir;
+    try {
+      dependencyPluginDir = props.getString(ConfigurationKeys.DEPENDENCY_PLUGIN_DIR);
+    } catch (final Exception ex) {
+      dependencyPluginDir = null; // NOTE(review): null likely NPEs in loadAllPlugins(); confirm
+    }
+    return new FlowTriggerDependencyPluginManager(dependencyPluginDir);
+  }
+
   @Override
   protected void configure() {
     bind(Server.class).toProvider(WebServerProvider.class);
     bind(ScheduleLoader.class).to(TriggerBasedScheduleLoader.class);
+    bind(FlowTriggerInstanceLoader.class).to(JdbcFlowTriggerInstanceLoaderImpl.class);
   }
 
   @Inject
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerDependencyPluginManagerTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerDependencyPluginManagerTest.java
index abf30d7..a265c34 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerDependencyPluginManagerTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerDependencyPluginManagerTest.java
@@ -34,6 +34,7 @@ public class FlowTriggerDependencyPluginManagerTest {
     final URL url = FlowTriggerDependencyPluginManagerTest.class.getClassLoader()
         .getResource(pluginDir);
     pluginManager = new FlowTriggerDependencyPluginManager(url.getPath());
+    pluginManager.loadAllPlugins();
   }
 
 
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerServiceTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerServiceTest.java
index f0195a7..99ff8f6 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerServiceTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerServiceTest.java
@@ -74,6 +74,7 @@ public class FlowTriggerServiceTest {
 
     flowTriggerService = new FlowTriggerService(pluginManager,
         triggerInstProcessor, depInstProcessor, flowTriggerInstanceLoader);
+    flowTriggerService.start();
   }
 
   @Before
@@ -206,7 +207,7 @@ public class FlowTriggerServiceTest {
     Thread.sleep(Duration.ofSeconds(1).toMillis());
     flowTriggerService.shutdown();
     setup();
-    flowTriggerService.recoverIncompleteTriggerInstances();
+    flowTriggerService.start();
     Thread.sleep(Duration.ofSeconds(5).toMillis());
     final Collection<TriggerInstance> triggerInstances = flowTriggerService.getRecentlyFinished();
     assertThat(triggerInstances).hasSize(30);