azkaban-developers
Changes
azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/FakeDependencyInstanceContext1.java 4(+4 -0)
Details
diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index f7ca8bc..6c600a7 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -189,12 +189,9 @@ public class Constants {
**/
public static final String AZKABAN_STORAGE_ARTIFACT_MAX_RETENTION = "azkaban.storage.artifact.max.retention";
- // enable Quartz Scheduler if true.
+ // enable quartz scheduler and flow trigger if true.
public static final String ENABLE_QUARTZ = "azkaban.server.schedule.enable_quartz";
- // enable Flow trigger if true.
- public static final String ENABLE_FLOW_TRIGGER = "azkaban.server.flowtrigger.enabled";
-
public static final String CUSTOM_CREDENTIAL_NAME = "azkaban.security.credential";
// dir to keep dependency plugins
diff --git a/az-core/src/main/java/azkaban/utils/Utils.java b/az-core/src/main/java/azkaban/utils/Utils.java
index dc82302..3f84e25 100644
--- a/az-core/src/main/java/azkaban/utils/Utils.java
+++ b/az-core/src/main/java/azkaban/utils/Utils.java
@@ -506,4 +506,5 @@ public class Utils {
}
return true;
}
+
}
diff --git a/azkaban-common/src/main/java/azkaban/project/CronSchedule.java b/azkaban-common/src/main/java/azkaban/project/CronSchedule.java
index 6e74471..454898c 100644
--- a/azkaban-common/src/main/java/azkaban/project/CronSchedule.java
+++ b/azkaban-common/src/main/java/azkaban/project/CronSchedule.java
@@ -17,6 +17,7 @@
package azkaban.project;
import com.google.common.base.Preconditions;
+import java.io.Serializable;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
@@ -26,7 +27,7 @@ import org.apache.commons.lang.builder.HashCodeBuilder;
* It couldn't be changed once gets constructed.
* It will be used to schedule a trigger.
*/
-public class CronSchedule {
+public class CronSchedule implements Serializable {
private final String cronExpression;
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
index 89e3f6a..5debf39 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
@@ -32,6 +32,7 @@ import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
import com.google.common.io.Files;
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -57,6 +58,7 @@ public class ProjectManager {
private final ConcurrentHashMap<String, Project> projectsByName =
new ConcurrentHashMap<>();
+
@Inject
public ProjectManager(final AzkabanProjectLoader azkabanProjectLoader,
final ProjectLoader loader,
@@ -82,6 +84,32 @@ public class ProjectManager {
loadProjectWhiteList();
}
+ public boolean hasFlowTrigger(final Project project, final Flow flow)
+ throws IOException, ProjectManagerException {
+ final String flowFileName = flow.getId() + ".flow";
+ final int latestFlowVersion = this.projectLoader.getLatestFlowVersion(project.getId(), flow
+ .getVersion(), flowFileName);
+ if (latestFlowVersion > 0) {
+ final File tempDir = com.google.common.io.Files.createTempDir();
+ final File flowFile;
+ try {
+ flowFile = this.projectLoader
+ .getUploadedFlowFile(project.getId(), project.getVersion(),
+ flowFileName, latestFlowVersion, tempDir);
+
+ final FlowTrigger flowTrigger = FlowLoaderUtils.getFlowTriggerFromYamlFile(flowFile);
+ return flowTrigger != null;
+ } catch (final Exception ex) {
+ logger.error("error in getting flow file", ex);
+ throw ex;
+ } finally {
+ FlowLoaderUtils.cleanUpDir(tempDir);
+ }
+ } else {
+ return false;
+ }
+ }
+
private void loadAllProjects() {
final List<Project> projects;
try {
diff --git a/azkaban-common/src/main/java/azkaban/utils/WebUtils.java b/azkaban-common/src/main/java/azkaban/utils/WebUtils.java
index e365c98..9cdfa3a 100644
--- a/azkaban-common/src/main/java/azkaban/utils/WebUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/WebUtils.java
@@ -41,6 +41,10 @@ public class WebUtils {
return DateTimeFormat.forPattern(DATE_TIME_STRING).print(timeMS);
}
+ public long currentTimestamp() {
+ return System.currentTimeMillis();
+ }
+
public String formatDuration(final long startTime, final long endTime) {
if (startTime == -1) {
return "-";
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
index 8225b93..f901e34 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
@@ -247,7 +247,7 @@ public class FlowTriggerService {
/**
* Resume executions of all incomplete trigger instances by recovering the state from db.
*/
- private void recoverIncompleteTriggerInstances() {
+ public void recoverIncompleteTriggerInstances() {
final Collection<TriggerInstance> unfinishedTriggerInstances = this.flowTriggerInstanceLoader
.getIncompleteTriggerInstances();
//todo chengren311: what if flow trigger is not found?
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerQuartzJob.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerQuartzJob.java
new file mode 100644
index 0000000..e0eab9f
--- /dev/null
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerQuartzJob.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger.quartz;
+
+import azkaban.flow.FlowUtils;
+import azkaban.flowtrigger.FlowTriggerService;
+import azkaban.project.FlowTrigger;
+import azkaban.project.Project;
+import azkaban.scheduler.AbstractQuartzJob;
+import javax.inject.Inject;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
+
+
+public class FlowTriggerQuartzJob extends AbstractQuartzJob {
+
+ public static final String SUBMIT_USER = "SUBMIT_USER";
+ public static final String PROJECT = "PROJECT";
+ public static final String FLOW_TRIGGER = "FLOW_TRIGGER";
+ public static final String FLOW_ID = "FLOW_ID";
+ public static final String FLOW_VERSION = "FLOW_VERSION";
+
+ private final FlowTriggerService triggerService;
+
+ @Inject
+ public FlowTriggerQuartzJob(final FlowTriggerService service) {
+ this.triggerService = service;
+ }
+
+ @Override
+ public void execute(final JobExecutionContext context) {
+ final JobDataMap data = context.getMergedJobDataMap();
+ final String projectJson = data.getString(PROJECT);
+ final Project project = FlowUtils.toProject(projectJson);
+
+ final String flowId = data.getString(FLOW_ID);
+ final int flowVersion = data.getInt(FLOW_VERSION);
+ final String submitUser = data.getString(SUBMIT_USER);
+ final FlowTrigger flowTrigger = (FlowTrigger) data.get(FLOW_TRIGGER);
+ this.triggerService.startTrigger(flowTrigger, flowId, flowVersion, submitUser, project);
+ }
+}
+
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java
new file mode 100644
index 0000000..c794213
--- /dev/null
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java
@@ -0,0 +1,214 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger.quartz;
+
+import static java.util.Objects.requireNonNull;
+
+import azkaban.flow.Flow;
+import azkaban.flow.FlowUtils;
+import azkaban.project.FlowLoaderUtils;
+import azkaban.project.FlowTrigger;
+import azkaban.project.Project;
+import azkaban.project.ProjectLoader;
+import azkaban.project.ProjectManagerException;
+import azkaban.scheduler.QuartzJobDescription;
+import azkaban.scheduler.QuartzScheduler;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Files;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.quartz.JobDataMap;
+import org.quartz.JobDetail;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Singleton
+public class FlowTriggerScheduler {
+
+ private static final Logger logger = LoggerFactory.getLogger(FlowTriggerScheduler.class);
+ private final ProjectLoader projectLoader;
+ private final QuartzScheduler scheduler;
+
+ @Inject
+ public FlowTriggerScheduler(final ProjectLoader projectLoader, final QuartzScheduler scheduler) {
+ this.projectLoader = requireNonNull(projectLoader);
+ this.scheduler = requireNonNull(scheduler);
+ }
+
+ /**
+ * Schedule flows containing flow triggers
+ */
+ public void scheduleAll(final Project project, final String submitUser)
+ throws SchedulerException, ProjectManagerException {
+ //todo chengren311: schedule on uploading via CRT
+
+ for (final Flow flow : project.getFlows()) {
+ //todo chengren311: we should validate embedded flow shouldn't have flow trigger defined.
+ if (flow.isEmbeddedFlow()) {
+ // skip scheduling embedded flow since embedded flow are not allowed to have flow trigger
+ continue;
+ }
+ final String flowFileName = flow.getId() + ".flow";
+ final int latestFlowVersion = this.projectLoader
+ .getLatestFlowVersion(flow.getProjectId(), flow
+ .getVersion(), flowFileName);
+ if (latestFlowVersion > 0) {
+ final File tempDir = Files.createTempDir();
+ final File flowFile;
+ try {
+ flowFile = this.projectLoader
+ .getUploadedFlowFile(project.getId(), project.getVersion(),
+ flowFileName, latestFlowVersion, tempDir);
+
+ final FlowTrigger flowTrigger = FlowLoaderUtils.getFlowTriggerFromYamlFile(flowFile);
+
+ if (flowTrigger != null) {
+ final String projectJson = FlowUtils.toJson(project);
+ final Map<String, Object> contextMap = ImmutableMap
+ .of(FlowTriggerQuartzJob.SUBMIT_USER, submitUser,
+ FlowTriggerQuartzJob.FLOW_TRIGGER, flowTrigger,
+ FlowTriggerQuartzJob.FLOW_ID, flow.getId(),
+ FlowTriggerQuartzJob.FLOW_VERSION, latestFlowVersion,
+ FlowTriggerQuartzJob.PROJECT, projectJson);
+ logger.info("scheduling flow " + flow.getProjectId() + "." + flow.getId());
+ this.scheduler
+ .registerJob(flowTrigger.getSchedule().getCronExpression(), new QuartzJobDescription
+ (FlowTriggerQuartzJob.class, generateGroupName(flow), contextMap));
+ }
+ } catch (final Exception ex) {
+ logger.error("error in getting flow file", ex);
+ } finally {
+ FlowLoaderUtils.cleanUpDir(tempDir);
+ }
+ }
+ }
+ }
+
+ /**
+ * Retrieve the list of scheduled flow triggers from quartz database
+ */
+ public List<ScheduledFlowTrigger> getScheduledFlowTriggerJobs() {
+ final Scheduler quartzScheduler = this.scheduler.getScheduler();
+ try {
+ final List<String> groupNames = quartzScheduler.getJobGroupNames();
+
+ final List<ScheduledFlowTrigger> flowTriggerJobDetails = new ArrayList<>();
+ for (final String groupName : groupNames) {
+ final JobKey jobKey = new JobKey(QuartzScheduler.DEFAULT_JOB_NAME, groupName);
+ ScheduledFlowTrigger scheduledFlowTrigger = null;
+ try {
+ final JobDetail job = quartzScheduler.getJobDetail(jobKey);
+ final JobDataMap jobDataMap = job.getJobDataMap();
+
+ final String flowId = jobDataMap.getString(FlowTriggerQuartzJob.FLOW_ID);
+
+ final String projectJson = jobDataMap.getString(FlowTriggerQuartzJob.PROJECT);
+ final Project project = FlowUtils.toProject(projectJson);
+ final FlowTrigger flowTrigger = (FlowTrigger) jobDataMap
+ .get(FlowTriggerQuartzJob.FLOW_TRIGGER);
+ final String submitUser = jobDataMap.getString(FlowTriggerQuartzJob.SUBMIT_USER);
+ final List<? extends Trigger> quartzTriggers = quartzScheduler.getTriggersOfJob(jobKey);
+ scheduledFlowTrigger = new ScheduledFlowTrigger(
+ project.getName(),
+ flowId, flowTrigger, submitUser, quartzTriggers.isEmpty() ? null
+ : quartzTriggers.get(0));
+ } catch (final Exception ex) {
+ logger.error(String.format("unable to get flow trigger by job key %s", jobKey), ex);
+ scheduledFlowTrigger = null;
+ }
+
+ flowTriggerJobDetails.add(scheduledFlowTrigger);
+ }
+ return flowTriggerJobDetails;
+ } catch (final SchedulerException ex) {
+ logger.error("unable to get scheduled flow triggers", ex);
+ return new ArrayList<>();
+ }
+ }
+
+ /**
+ * Unschedule all possible flows in a project
+ */
+ public void unscheduleAll(final Project project) throws SchedulerException {
+ for (final Flow flow : project.getFlows()) {
+ logger.info("unscheduling flow" + flow.getProjectId() + "." + flow.getId() + " if it has "
+ + " schedule");
+ if (!flow.isEmbeddedFlow()) {
+ this.scheduler.unregisterJob(generateGroupName(flow));
+ }
+ }
+ }
+
+ private String generateGroupName(final Flow flow) {
+ return String.valueOf(flow.getProjectId()) + "." + flow.getId();
+ }
+
+ public void start() {
+ this.scheduler.start();
+ }
+
+ public void shutdown() {
+ this.scheduler.shutdown();
+ }
+
+ public static class ScheduledFlowTrigger {
+
+ private final String projectName;
+ private final String flowId;
+ private final FlowTrigger flowTrigger;
+ private final Trigger quartzTrigger;
+ private final String submitUser;
+
+ public ScheduledFlowTrigger(final String projectName, final String flowId,
+ final FlowTrigger flowTrigger, final String submitUser,
+ final Trigger quartzTrigger) {
+ this.projectName = projectName;
+ this.flowId = flowId;
+ this.flowTrigger = flowTrigger;
+ this.submitUser = submitUser;
+ this.quartzTrigger = quartzTrigger;
+ }
+
+ public String getProjectName() {
+ return this.projectName;
+ }
+
+ public String getFlowId() {
+ return this.flowId;
+ }
+
+ public FlowTrigger getFlowTrigger() {
+ return this.flowTrigger;
+ }
+
+ public Trigger getQuartzTrigger() {
+ return this.quartzTrigger;
+ }
+
+ public String getSubmitUser() {
+ return this.submitUser;
+ }
+ }
+}
diff --git a/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java b/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java
index b4e573e..ddcacc6 100644
--- a/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java
+++ b/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java
@@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory;
public class QuartzScheduler {
//Unless specified, all Quartz jobs's identities comes with the default job name.
- private static final String DEFAULT_JOB_NAME = "job1";
+ public static final String DEFAULT_JOB_NAME = "job1";
private static final Logger logger = LoggerFactory.getLogger(QuartzScheduler.class);
private Scheduler scheduler = null;
@@ -108,7 +108,11 @@ public class QuartzScheduler {
}
}
- public void unregisterJob(final String groupName) throws SchedulerException {
+ /**
+ * Unregister a job given the groupname. Since unregister might be called when
+ * concurrently removing projects, so synchronized is added to ensure thread safety.
+ */
+ public synchronized void unregisterJob(final String groupName) throws SchedulerException {
if (!ifJobExist(groupName)) {
logger.warn("can not find job with " + groupName + " in quartz.");
} else {
@@ -117,18 +121,20 @@ public class QuartzScheduler {
}
/**
- * Only cron schedule register is supported.
+ * Only cron schedule register is supported. Since register might be called when
+ * concurrently uploading projects, so synchronized is added to ensure thread safety.
*
* @param cronExpression the cron schedule for this job
* @param jobDescription Regarding QuartzJobDescription#groupName, in order to guarantee no
* duplicate quartz schedules, we design the naming convention depending on use cases: <ul>
- * <li>User flow schedule: we use {@link org.quartz.JobKey#JobKey} to represent the identity of a
- * flow's schedule. The format follows "$projectID_$flowName" to guarantee no duplicates.
- * <li>Quartz schedule for AZ internal use: the groupName should start with letters,
- * rather than
+ * <li>User flow schedule: we use {@link JobKey#JobKey} to represent the identity of a
+ * flow's schedule. The format follows "$projectID.$flowName" to guarantee no duplicates.
+ * <li>Quartz schedule for AZ internal use: the groupName should start with letters, rather
+ * than
* number, which is the first case.</ul>
*/
- public void registerJob(final String cronExpression, final QuartzJobDescription jobDescription)
+ public synchronized void registerJob(final String cronExpression, final QuartzJobDescription
+ jobDescription)
throws SchedulerException {
requireNonNull(jobDescription, "jobDescription is null");
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
index f1cfc51..75a033c 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -25,12 +25,12 @@ import azkaban.Constants.ConfigurationKeys;
import azkaban.database.AzkabanDatabaseSetup;
import azkaban.executor.ExecutorManager;
import azkaban.flowtrigger.FlowTriggerService;
+import azkaban.flowtrigger.quartz.FlowTriggerScheduler;
import azkaban.jmx.JmxExecutorManager;
import azkaban.jmx.JmxJettyServer;
import azkaban.jmx.JmxTriggerManager;
import azkaban.metrics.MetricsManager;
import azkaban.project.ProjectManager;
-import azkaban.scheduler.QuartzScheduler;
import azkaban.scheduler.ScheduleManager;
import azkaban.server.AzkabanServer;
import azkaban.server.session.SessionCache;
@@ -143,7 +143,7 @@ public class AzkabanWebServer extends AzkabanServer {
private final Props props;
private final SessionCache sessionCache;
private final List<ObjectName> registeredMBeans = new ArrayList<>();
- private final QuartzScheduler quartzScheduler;
+ private final FlowTriggerScheduler scheduler;
private final FlowTriggerService flowTriggerService;
private Map<String, TriggerPlugin> triggerPlugins;
private MBeanServer mbeanServer;
@@ -159,7 +159,7 @@ public class AzkabanWebServer extends AzkabanServer {
final UserManager userManager,
final ScheduleManager scheduleManager,
final VelocityEngine velocityEngine,
- final QuartzScheduler quartzScheduler,
+ final FlowTriggerScheduler scheduler,
final FlowTriggerService flowTriggerService,
final StatusService statusService) {
this.props = requireNonNull(props, "props is null.");
@@ -172,9 +172,9 @@ public class AzkabanWebServer extends AzkabanServer {
this.userManager = requireNonNull(userManager, "userManager is null.");
this.scheduleManager = requireNonNull(scheduleManager, "scheduleManager is null.");
this.velocityEngine = requireNonNull(velocityEngine, "velocityEngine is null.");
- this.quartzScheduler = requireNonNull(quartzScheduler, "quartzScheduler is null.");
- this.flowTriggerService = requireNonNull(flowTriggerService, "flowTriggerService is null.");
this.statusService = statusService;
+ this.scheduler = requireNonNull(scheduler, "scheduler is null.");
+ this.flowTriggerService = requireNonNull(flowTriggerService, "flow trigger service is null");
loadBuiltinCheckersAndActions();
@@ -234,12 +234,9 @@ public class AzkabanWebServer extends AzkabanServer {
@Override
public void run() {
- try {
- if (webServer.quartzScheduler != null) {
- webServer.quartzScheduler.shutdown();
- }
- } catch (final Exception e) {
- logger.error(("Exception while shutting down quartz scheduler."), e);
+ if (webServer.scheduler != null) {
+ logger.info("Shutting down flow trigger scheduler...");
+ webServer.scheduler.shutdown();
}
try {
@@ -251,17 +248,16 @@ public class AzkabanWebServer extends AzkabanServer {
}
try {
+ logger.info("Logging top memory consumers...");
logTopMemoryConsumers();
- } catch (final Exception e) {
- logger.error(("Exception when logging top memory consumers"), e);
- }
- logger.info("Shutting down http server...");
- try {
+ logger.info("Shutting down http server...");
webServer.close();
+
} catch (final Exception e) {
- logger.error("Error while shutting down http server.", e);
+ logger.error(("Exception while shutting down web server."), e);
}
+
logger.info("kk thx bye.");
}
@@ -458,6 +454,10 @@ public class AzkabanWebServer extends AzkabanServer {
return this.flowTriggerService;
}
+ public FlowTriggerScheduler getScheduler() {
+ return this.scheduler;
+ }
+
private void validateDatabaseVersion()
throws IOException, SQLException {
final boolean checkDB = this.props
@@ -540,14 +540,7 @@ public class AzkabanWebServer extends AzkabanServer {
}
if (this.props.getBoolean(ConfigurationKeys.ENABLE_QUARTZ, false)) {
- this.quartzScheduler.start();
- }
-
- if (this.props.getBoolean(ConfigurationKeys.ENABLE_FLOW_TRIGGER, false)) {
- // flow trigger service throws exception when any dependency plugin fails to be initialized
- // (e.x if it's kafka dependency and kafka is down). In this case if azkaban admin still
- // wishes to start azkaban web server, she can disable flow trigger in the az config file and
- // restart web server so that regular scheduled flows are not affected.
+ this.scheduler.start();
this.flowTriggerService.start();
}
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServerModule.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServerModule.java
index f84e541..039e864 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServerModule.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServerModule.java
@@ -22,6 +22,7 @@ import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
import azkaban.flowtrigger.database.JdbcFlowTriggerInstanceLoaderImpl;
import azkaban.flowtrigger.plugin.FlowTriggerDependencyPluginException;
import azkaban.flowtrigger.plugin.FlowTriggerDependencyPluginManager;
+import azkaban.flowtrigger.database.JdbcFlowTriggerInstanceLoaderImpl;
import azkaban.scheduler.ScheduleLoader;
import azkaban.scheduler.TriggerBasedScheduleLoader;
import azkaban.user.UserManager;
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index f7103ef..f7ef977 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -16,6 +16,7 @@
package azkaban.webapp.servlet;
+import azkaban.Constants.ConfigurationKeys;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableJobInfo;
import azkaban.executor.ExecutorManagerAdapter;
@@ -25,6 +26,7 @@ import azkaban.flow.Edge;
import azkaban.flow.Flow;
import azkaban.flow.FlowProps;
import azkaban.flow.Node;
+import azkaban.flowtrigger.quartz.FlowTriggerScheduler;
import azkaban.project.Project;
import azkaban.project.ProjectFileHandler;
import azkaban.project.ProjectLogEvent;
@@ -77,6 +79,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
+import org.quartz.SchedulerException;
public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
@@ -103,9 +106,11 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
private ExecutorManagerAdapter executorManager;
private ScheduleManager scheduleManager;
private UserManager userManager;
+ private FlowTriggerScheduler scheduler;
private int downloadBufferSize;
private boolean lockdownCreateProjects = false;
private boolean lockdownUploadProjects = false;
+ private boolean enableQuartz = false;
@Override
public void init(final ServletConfig config) throws ServletException {
@@ -116,8 +121,10 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
this.executorManager = server.getExecutorManager();
this.scheduleManager = server.getScheduleManager();
this.userManager = server.getUserManager();
+ this.scheduler = server.getScheduler();
this.lockdownCreateProjects =
server.getServerProps().getBoolean(LOCKDOWN_CREATE_PROJECTS_KEY, false);
+ this.enableQuartz = server.getServerProps().getBoolean(ConfigurationKeys.ENABLE_QUARTZ, false);
if (this.lockdownCreateProjects) {
logger.info("Creation of projects is locked down");
}
@@ -587,6 +594,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
}
private void removeAssociatedSchedules(final Project project) throws ServletException {
+ // remove regular schedules
try {
for (final Schedule schedule : this.scheduleManager.getSchedules()) {
if (schedule.getProjectId() == project.getId()) {
@@ -597,6 +605,15 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
} catch (final ScheduleManagerException e) {
throw new ServletException(e);
}
+
+ // remove flow trigger schedules
+ try {
+ if (this.enableQuartz) {
+ this.scheduler.unscheduleAll(project);
+ }
+ } catch (final SchedulerException e) {
+ throw new ServletException(e);
+ }
}
private void handleRemoveProject(final HttpServletRequest req,
@@ -1499,6 +1516,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
Flow flow = null;
try {
project = this.projectManager.getProject(projectName);
+
if (project == null) {
page.add("errorMsg", "Project " + projectName + " not found.");
page.render();
@@ -1650,6 +1668,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
"Upload: reference of project " + projectName + " is " + System.identityHashCode(project));
final String autoFix = (String) multipart.get("fix");
+
final Props props = new Props();
if (autoFix != null && autoFix.equals("off")) {
props.put(ValidatorConfigs.CUSTOM_AUTO_FIX_FLAG_PARAM, "false");
@@ -1701,9 +1720,19 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
IOUtils.copy(item.getInputStream(), out);
out.close();
+ //unscheduleall/scheduleall should only work with flow which has defined flow trigger
+ //unschedule all flows within the old project
+ if (this.enableQuartz) {
+ this.scheduler.unscheduleAll(project);
+ }
final Map<String, ValidationReport> reports =
this.projectManager.uploadProject(project, archiveFile, type, user,
props);
+
+ if (this.enableQuartz) {
+ //schedule the new project
+ this.scheduler.scheduleAll(project, user.getUserId());
+ }
final StringBuffer errorMsgs = new StringBuffer();
final StringBuffer warnMsgs = new StringBuffer();
for (final Entry<String, ValidationReport> reportEntry : reports.entrySet()) {
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
index fd532ee..7208607 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -61,6 +61,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
private ScheduleManager scheduleManager;
private UserManager userManager;
+
@Override
public void init(final ServletConfig config) throws ServletException {
super.init(config);
@@ -90,7 +91,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
ajaxSlaInfo(req, ret, session.getUser());
} else if (ajaxName.equals("setSla")) {
ajaxSetSla(req, ret, session.getUser());
- // alias loadFlow is preserved for backward compatibility
+ // alias loadFlow is preserved for backward compatibility
} else if (ajaxName.equals("fetchSchedules") || ajaxName.equals("loadFlow")) {
ajaxFetchSchedules(ret);
} else if (ajaxName.equals("scheduleFlow")) {
@@ -475,6 +476,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
return;
}
+ @Deprecated
private void ajaxScheduleFlow(final HttpServletRequest req,
final HashMap<String, Object> ret, final User user) throws ServletException {
final String projectName = getParam(req, "projectName");
@@ -591,6 +593,25 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
return;
}
+ final boolean hasFlowTrigger;
+ try {
+ hasFlowTrigger = this.projectManager.hasFlowTrigger(project, flow);
+ } catch (final Exception ex) {
+ logger.error(ex);
+ ret.put("status", "error");
+ ret.put("message", String.format("Error looking for flow trigger of flow: %s.%s ",
+ projectName, flowName));
+ return;
+ }
+
+ if (hasFlowTrigger) {
+ ret.put("status", "error");
+ ret.put("message", String.format("<font color=\"red\"> Error: Flow %s.%s is already "
+ + "associated with flow trigger, so schedule has to be defined in flow trigger config </font>",
+ projectName, flowName));
+ return;
+ }
+
final DateTimeZone timezone = DateTimeZone.getDefault();
final DateTime firstSchedTime = getPresentTimeByTimezone(timezone);
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/FakeDependencyInstanceContext1.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/FakeDependencyInstanceContext1.java
index 67893ca..a9fd2f8 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/FakeDependencyInstanceContext1.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/FakeDependencyInstanceContext1.java
@@ -24,12 +24,16 @@ import azkaban.flowtrigger.DependencyInstanceRuntimeProps;
@SuppressWarnings("FutureReturnValueIgnored")
public class FakeDependencyInstanceContext1 implements DependencyInstanceContext {
+ private final DependencyInstanceCallback callback;
+
public FakeDependencyInstanceContext1(final DependencyInstanceConfig config,
final DependencyInstanceRuntimeProps runtimeProps,
final DependencyInstanceCallback callback) {
+ this.callback = callback;
}
@Override
public void cancel() {
+ this.callback.onCancel(this);
}
}
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/FakeDependencyInstanceContext2.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/FakeDependencyInstanceContext2.java
index a6842ef..3699233 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/FakeDependencyInstanceContext2.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/FakeDependencyInstanceContext2.java
@@ -24,12 +24,16 @@ import azkaban.flowtrigger.DependencyInstanceRuntimeProps;
@SuppressWarnings("FutureReturnValueIgnored")
public class FakeDependencyInstanceContext2 implements DependencyInstanceContext {
+ private final DependencyInstanceCallback callback;
+
public FakeDependencyInstanceContext2(final DependencyInstanceConfig config,
final DependencyInstanceRuntimeProps runtimeProps,
final DependencyInstanceCallback callback) {
+ this.callback = callback;
}
@Override
public void cancel() {
+ this.callback.onCancel(this);
}
}
diff --git a/azkaban-web-server/src/test/resources/az-flow-trigger-dependency-plugin.jar b/azkaban-web-server/src/test/resources/az-flow-trigger-dependency-plugin.jar
index 7204be4..6d4f3ee 100644
Binary files a/azkaban-web-server/src/test/resources/az-flow-trigger-dependency-plugin.jar and b/azkaban-web-server/src/test/resources/az-flow-trigger-dependency-plugin.jar differ