azkaban-aplcache
Changes
azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/FlowTriggerDependencyPluginManager.java 10(+8 -2)
Details
diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 337b0d7..f7ca8bc 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -89,6 +89,7 @@ public class Constants {
// The flow exec id for a flow trigger instance which hasn't started a flow yet
public static final int UNASSIGNED_EXEC_ID = -1;
+
public static class ConfigurationKeys {
// Configures Azkaban Flow Version in project YAML file
@@ -191,7 +192,13 @@ public class Constants {
// enable Quartz Scheduler if true.
public static final String ENABLE_QUARTZ = "azkaban.server.schedule.enable_quartz";
+ // enable Flow trigger if true.
+ public static final String ENABLE_FLOW_TRIGGER = "azkaban.server.flowtrigger.enabled";
+
public static final String CUSTOM_CREDENTIAL_NAME = "azkaban.security.credential";
+
+ // dir to keep dependency plugins
+ public static final String DEPENDENCY_PLUGIN_DIR = "azkaban.dependency.plugin.dir";
}
public static class FlowProperties {
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
index ea0ce1b..8225b93 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
@@ -18,6 +18,7 @@ package azkaban.flowtrigger;
import azkaban.Constants;
import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
+import azkaban.flowtrigger.plugin.FlowTriggerDependencyPluginException;
import azkaban.flowtrigger.plugin.FlowTriggerDependencyPluginManager;
import azkaban.project.FlowTrigger;
import azkaban.project.FlowTriggerDependency;
@@ -97,6 +98,11 @@ public class FlowTriggerService {
this.flowTriggerInstanceLoader = flowTriggerInstanceLoader;
}
+ public void start() throws FlowTriggerDependencyPluginException {
+ this.triggerPluginManager.loadAllPlugins();
+ this.recoverIncompleteTriggerInstances();
+ }
+
private DependencyInstanceContext createDepContext(final FlowTriggerDependency dep, final long
starttimeInMills) throws Exception {
final DependencyCheck dependencyCheck = this.triggerPluginManager
@@ -241,7 +247,7 @@ public class FlowTriggerService {
/**
* Resume executions of all incomplete trigger instances by recovering the state from db.
*/
- public void recoverIncompleteTriggerInstances() {
+ private void recoverIncompleteTriggerInstances() {
final Collection<TriggerInstance> unfinishedTriggerInstances = this.flowTriggerInstanceLoader
.getIncompleteTriggerInstances();
//todo chengren311: what if flow trigger is not found?
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/FlowTriggerDependencyPluginManager.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/FlowTriggerDependencyPluginManager.java
index c524f27..ddb860e 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/FlowTriggerDependencyPluginManager.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/plugin/FlowTriggerDependencyPluginManager.java
@@ -60,7 +60,6 @@ public class FlowTriggerDependencyPluginManager {
FlowTriggerDependencyPluginException {
this.dependencyTypeMap = new ConcurrentHashMap<>();
this.pluginDir = pluginDir;
- this.loadAllDependencyPlugins();
}
private Map<String, String> readConfig(final File file) throws
@@ -159,7 +158,14 @@ public class FlowTriggerDependencyPluginManager {
}
}
- private void loadAllDependencyPlugins() throws FlowTriggerDependencyPluginException {
+ /**
+ * Initialize all dependency plugins.
+ * todo chengren311: Current design aborts loadAllPlugins if any of the plugin fails to be
+ * initialized.
+ * However, this might not be the optimal design. Suppose we have two dependency plugin types
+ * - MySQL and Kafka, if MySQL is down, then kafka dependency type will also be unavailable.
+ */
+ public void loadAllPlugins() throws FlowTriggerDependencyPluginException {
final File pluginDir = new File(this.pluginDir);
for (final File dir : pluginDir.listFiles()) {
loadDependencyPlugin(dir);
diff --git a/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java b/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java
index 1991485..b4e573e 100644
--- a/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java
+++ b/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java
@@ -124,7 +124,8 @@ public class QuartzScheduler {
* duplicate quartz schedules, we design the naming convention depending on use cases: <ul>
* <li>User flow schedule: we use {@link org.quartz.JobKey#JobKey} to represent the identity of a
* flow's schedule. The format follows "$projectID_$flowName" to guarantee no duplicates.
- * <li>Quartz schedule for AZ internal use: the groupName should start with letters, rather than
+ * <li>Quartz schedule for AZ internal use: the groupName should start with letters,
+ * rather than
* number, which is the first case.</ul>
*/
public void registerJob(final String cronExpression, final QuartzJobDescription jobDescription)
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
index 50f26ad..3cfb8af 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -24,6 +24,7 @@ import azkaban.Constants;
import azkaban.Constants.ConfigurationKeys;
import azkaban.database.AzkabanDatabaseSetup;
import azkaban.executor.ExecutorManager;
+import azkaban.flowtrigger.FlowTriggerService;
import azkaban.jmx.JmxExecutorManager;
import azkaban.jmx.JmxJettyServer;
import azkaban.jmx.JmxTriggerManager;
@@ -142,7 +143,7 @@ public class AzkabanWebServer extends AzkabanServer {
private final SessionCache sessionCache;
private final List<ObjectName> registeredMBeans = new ArrayList<>();
private final QuartzScheduler quartzScheduler;
-
+ private final FlowTriggerService flowTriggerService;
private Map<String, TriggerPlugin> triggerPlugins;
private MBeanServer mbeanServer;
@@ -158,6 +159,7 @@ public class AzkabanWebServer extends AzkabanServer {
final ScheduleManager scheduleManager,
final VelocityEngine velocityEngine,
final QuartzScheduler quartzScheduler,
+ final FlowTriggerService flowTriggerService,
final StatusService statusService) {
this.props = requireNonNull(props, "props is null.");
this.server = requireNonNull(server, "server is null.");
@@ -170,6 +172,7 @@ public class AzkabanWebServer extends AzkabanServer {
this.scheduleManager = requireNonNull(scheduleManager, "scheduleManager is null.");
this.velocityEngine = requireNonNull(velocityEngine, "velocityEngine is null.");
this.quartzScheduler = requireNonNull(quartzScheduler, "quartzScheduler is null.");
+ this.flowTriggerService = requireNonNull(flowTriggerService, "flowTriggerService is null.");
this.statusService = statusService;
loadBuiltinCheckersAndActions();
@@ -239,6 +242,14 @@ public class AzkabanWebServer extends AzkabanServer {
}
try {
+ if (webServer.flowTriggerService != null) {
+ webServer.flowTriggerService.shutdown();
+ }
+ } catch (final Exception e) {
+ logger.error(("Exception while shutting down flow trigger service."), e);
+ }
+
+ try {
logTopMemoryConsumers();
} catch (final Exception e) {
logger.error(("Exception when logging top memory consumers"), e);
@@ -442,6 +453,10 @@ public class AzkabanWebServer extends AzkabanServer {
ve.addProperty("jar.resource.loader.path", jarResourcePath);
}
+ public FlowTriggerService getFlowTriggerService() {
+ return this.flowTriggerService;
+ }
+
private void validateDatabaseVersion()
throws IOException, SQLException {
final boolean checkDB = this.props
@@ -526,6 +541,14 @@ public class AzkabanWebServer extends AzkabanServer {
this.quartzScheduler.start();
}
+ if (this.props.getBoolean(ConfigurationKeys.ENABLE_FLOW_TRIGGER, false)) {
+ // flow trigger service throws exception when any dependency plugin fails to be initialized
+ // (e.x if it's kafka dependency and kafka is down). In this case if azkaban admin still
+ // wishes to start azkaban web server, she can disable flow trigger in the az config file and
+ // restart web server so that regular scheduled flows are not affected.
+ this.flowTriggerService.start();
+ }
+
try {
this.server.start();
logger.info("Server started");
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServerModule.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServerModule.java
index 2297905..f84e541 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServerModule.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServerModule.java
@@ -17,16 +17,21 @@
package azkaban.webapp;
+import azkaban.Constants.ConfigurationKeys;
+import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
+import azkaban.flowtrigger.database.JdbcFlowTriggerInstanceLoaderImpl;
+import azkaban.flowtrigger.plugin.FlowTriggerDependencyPluginException;
+import azkaban.flowtrigger.plugin.FlowTriggerDependencyPluginManager;
import azkaban.scheduler.ScheduleLoader;
import azkaban.scheduler.TriggerBasedScheduleLoader;
import azkaban.user.UserManager;
import azkaban.user.XmlUserManager;
import azkaban.utils.Props;
import com.google.inject.AbstractModule;
-import javax.inject.Inject;
import com.google.inject.Provides;
-import javax.inject.Singleton;
import java.lang.reflect.Constructor;
+import javax.inject.Inject;
+import javax.inject.Singleton;
import org.apache.log4j.Logger;
import org.apache.velocity.app.VelocityEngine;
import org.apache.velocity.runtime.log.Log4JLogChute;
@@ -45,10 +50,26 @@ public class AzkabanWebServerModule extends AbstractModule {
private static final String USER_MANAGER_CLASS_PARAM = "user.manager.class";
private static final String VELOCITY_DEV_MODE_PARAM = "velocity.dev.mode";
+ @Provides
+ @Singleton
+ public FlowTriggerDependencyPluginManager getDependencyPluginManager(final Props props)
+ throws FlowTriggerDependencyPluginException {
+ //todo chengren311: disable requireNonNull for now in beta since dependency plugin dir is not
+ // required. Add it back when flow trigger feature is enabled in production
+ String dependencyPluginDir;
+ try {
+ dependencyPluginDir = props.getString(ConfigurationKeys.DEPENDENCY_PLUGIN_DIR);
+ } catch (final Exception ex) {
+ dependencyPluginDir = null;
+ }
+ return new FlowTriggerDependencyPluginManager(dependencyPluginDir);
+ }
+
@Override
protected void configure() {
bind(Server.class).toProvider(WebServerProvider.class);
bind(ScheduleLoader.class).to(TriggerBasedScheduleLoader.class);
+ bind(FlowTriggerInstanceLoader.class).to(JdbcFlowTriggerInstanceLoaderImpl.class);
}
@Inject
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerDependencyPluginManagerTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerDependencyPluginManagerTest.java
index abf30d7..a265c34 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerDependencyPluginManagerTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerDependencyPluginManagerTest.java
@@ -34,6 +34,7 @@ public class FlowTriggerDependencyPluginManagerTest {
final URL url = FlowTriggerDependencyPluginManagerTest.class.getClassLoader()
.getResource(pluginDir);
pluginManager = new FlowTriggerDependencyPluginManager(url.getPath());
+ pluginManager.loadAllPlugins();
}
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerServiceTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerServiceTest.java
index f0195a7..99ff8f6 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerServiceTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerServiceTest.java
@@ -74,6 +74,7 @@ public class FlowTriggerServiceTest {
flowTriggerService = new FlowTriggerService(pluginManager,
triggerInstProcessor, depInstProcessor, flowTriggerInstanceLoader);
+ flowTriggerService.start();
}
@Before
@@ -206,7 +207,7 @@ public class FlowTriggerServiceTest {
Thread.sleep(Duration.ofSeconds(1).toMillis());
flowTriggerService.shutdown();
setup();
- flowTriggerService.recoverIncompleteTriggerInstances();
+ flowTriggerService.start();
Thread.sleep(Duration.ofSeconds(5).toMillis());
final Collection<TriggerInstance> triggerInstances = flowTriggerService.getRecentlyFinished();
assertThat(triggerInstances).hasSize(30);