azkaban-aplcache

flow trigger scheduler (#1631) This PR created a wrapper

2/16/2018 7:28:44 PM

Details

diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index f7ca8bc..6c600a7 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -189,12 +189,9 @@ public class Constants {
      **/
     public static final String AZKABAN_STORAGE_ARTIFACT_MAX_RETENTION = "azkaban.storage.artifact.max.retention";
 
-    // enable Quartz Scheduler if true.
+    // enable quartz scheduler and flow trigger if true.
     public static final String ENABLE_QUARTZ = "azkaban.server.schedule.enable_quartz";
 
-    // enable Flow trigger if true.
-    public static final String ENABLE_FLOW_TRIGGER = "azkaban.server.flowtrigger.enabled";
-
     public static final String CUSTOM_CREDENTIAL_NAME = "azkaban.security.credential";
 
     // dir to keep dependency plugins
diff --git a/az-core/src/main/java/azkaban/utils/Utils.java b/az-core/src/main/java/azkaban/utils/Utils.java
index dc82302..3f84e25 100644
--- a/az-core/src/main/java/azkaban/utils/Utils.java
+++ b/az-core/src/main/java/azkaban/utils/Utils.java
@@ -506,4 +506,5 @@ public class Utils {
     }
     return true;
   }
+
 }
diff --git a/azkaban-common/src/main/java/azkaban/project/CronSchedule.java b/azkaban-common/src/main/java/azkaban/project/CronSchedule.java
index 6e74471..454898c 100644
--- a/azkaban-common/src/main/java/azkaban/project/CronSchedule.java
+++ b/azkaban-common/src/main/java/azkaban/project/CronSchedule.java
@@ -17,6 +17,7 @@
 package azkaban.project;
 
 import com.google.common.base.Preconditions;
+import java.io.Serializable;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.builder.EqualsBuilder;
 import org.apache.commons.lang.builder.HashCodeBuilder;
@@ -26,7 +27,7 @@ import org.apache.commons.lang.builder.HashCodeBuilder;
  * It couldn't be changed once gets constructed.
  * It will be used to schedule a trigger.
  */
-public class CronSchedule {
+public class CronSchedule implements Serializable {
 
   private final String cronExpression;
 
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
index 89e3f6a..5debf39 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
@@ -32,6 +32,7 @@ import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
 import com.google.common.io.Files;
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -57,6 +58,7 @@ public class ProjectManager {
   private final ConcurrentHashMap<String, Project> projectsByName =
       new ConcurrentHashMap<>();
 
+
   @Inject
   public ProjectManager(final AzkabanProjectLoader azkabanProjectLoader,
       final ProjectLoader loader,
@@ -82,6 +84,32 @@ public class ProjectManager {
     loadProjectWhiteList();
   }
 
+  public boolean hasFlowTrigger(final Project project, final Flow flow)
+      throws IOException, ProjectManagerException {
+    final String flowFileName = flow.getId() + ".flow";
+    final int latestFlowVersion = this.projectLoader.getLatestFlowVersion(project.getId(), flow
+        .getVersion(), flowFileName);
+    if (latestFlowVersion > 0) {
+      final File tempDir = com.google.common.io.Files.createTempDir();
+      final File flowFile;
+      try {
+        flowFile = this.projectLoader
+            .getUploadedFlowFile(project.getId(), project.getVersion(),
+                flowFileName, latestFlowVersion, tempDir);
+
+        final FlowTrigger flowTrigger = FlowLoaderUtils.getFlowTriggerFromYamlFile(flowFile);
+        return flowTrigger != null;
+      } catch (final Exception ex) {
+        logger.error("error in getting flow file", ex);
+        throw ex;
+      } finally {
+        FlowLoaderUtils.cleanUpDir(tempDir);
+      }
+    } else {
+      return false;
+    }
+  }
+
   private void loadAllProjects() {
     final List<Project> projects;
     try {
diff --git a/azkaban-common/src/main/java/azkaban/utils/WebUtils.java b/azkaban-common/src/main/java/azkaban/utils/WebUtils.java
index e365c98..9cdfa3a 100644
--- a/azkaban-common/src/main/java/azkaban/utils/WebUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/WebUtils.java
@@ -41,6 +41,10 @@ public class WebUtils {
     return DateTimeFormat.forPattern(DATE_TIME_STRING).print(timeMS);
   }
 
+  public long currentTimestamp() {
+    return System.currentTimeMillis();
+  }
+
   public String formatDuration(final long startTime, final long endTime) {
     if (startTime == -1) {
       return "-";
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
index 8225b93..f901e34 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
@@ -247,7 +247,7 @@ public class FlowTriggerService {
   /**
    * Resume executions of all incomplete trigger instances by recovering the state from db.
    */
-  private void recoverIncompleteTriggerInstances() {
+  public void recoverIncompleteTriggerInstances() {
     final Collection<TriggerInstance> unfinishedTriggerInstances = this.flowTriggerInstanceLoader
         .getIncompleteTriggerInstances();
     //todo chengren311: what if flow trigger is not found?
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerQuartzJob.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerQuartzJob.java
new file mode 100644
index 0000000..e0eab9f
--- /dev/null
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerQuartzJob.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger.quartz;
+
+import azkaban.flow.FlowUtils;
+import azkaban.flowtrigger.FlowTriggerService;
+import azkaban.project.FlowTrigger;
+import azkaban.project.Project;
+import azkaban.scheduler.AbstractQuartzJob;
+import javax.inject.Inject;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
+
+
+public class FlowTriggerQuartzJob extends AbstractQuartzJob {
+
+  public static final String SUBMIT_USER = "SUBMIT_USER";
+  public static final String PROJECT = "PROJECT";
+  public static final String FLOW_TRIGGER = "FLOW_TRIGGER";
+  public static final String FLOW_ID = "FLOW_ID";
+  public static final String FLOW_VERSION = "FLOW_VERSION";
+
+  private final FlowTriggerService triggerService;
+
+  @Inject
+  public FlowTriggerQuartzJob(final FlowTriggerService service) {
+    this.triggerService = service;
+  }
+
+  @Override
+  public void execute(final JobExecutionContext context) {
+    final JobDataMap data = context.getMergedJobDataMap();
+    final String projectJson = data.getString(PROJECT);
+    final Project project = FlowUtils.toProject(projectJson);
+
+    final String flowId = data.getString(FLOW_ID);
+    final int flowVersion = data.getInt(FLOW_VERSION);
+    final String submitUser = data.getString(SUBMIT_USER);
+    final FlowTrigger flowTrigger = (FlowTrigger) data.get(FLOW_TRIGGER);
+    this.triggerService.startTrigger(flowTrigger, flowId, flowVersion, submitUser, project);
+  }
+}
+
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java
new file mode 100644
index 0000000..c794213
--- /dev/null
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java
@@ -0,0 +1,214 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger.quartz;
+
+import static java.util.Objects.requireNonNull;
+
+import azkaban.flow.Flow;
+import azkaban.flow.FlowUtils;
+import azkaban.project.FlowLoaderUtils;
+import azkaban.project.FlowTrigger;
+import azkaban.project.Project;
+import azkaban.project.ProjectLoader;
+import azkaban.project.ProjectManagerException;
+import azkaban.scheduler.QuartzJobDescription;
+import azkaban.scheduler.QuartzScheduler;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Files;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.quartz.JobDataMap;
+import org.quartz.JobDetail;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Singleton
+public class FlowTriggerScheduler {
+
+  private static final Logger logger = LoggerFactory.getLogger(FlowTriggerScheduler.class);
+  private final ProjectLoader projectLoader;
+  private final QuartzScheduler scheduler;
+
+  @Inject
+  public FlowTriggerScheduler(final ProjectLoader projectLoader, final QuartzScheduler scheduler) {
+    this.projectLoader = requireNonNull(projectLoader);
+    this.scheduler = requireNonNull(scheduler);
+  }
+
+  /**
+   * Schedule flows containing flow triggers
+   */
+  public void scheduleAll(final Project project, final String submitUser)
+      throws SchedulerException, ProjectManagerException {
+    //todo chengren311: schedule on uploading via CRT
+
+    for (final Flow flow : project.getFlows()) {
+      //todo chengren311: we should validate embedded flow shouldn't have flow trigger defined.
+      if (flow.isEmbeddedFlow()) {
+        // skip scheduling embedded flow since embedded flow are not allowed to have flow trigger
+        continue;
+      }
+      final String flowFileName = flow.getId() + ".flow";
+      final int latestFlowVersion = this.projectLoader
+          .getLatestFlowVersion(flow.getProjectId(), flow
+              .getVersion(), flowFileName);
+      if (latestFlowVersion > 0) {
+        final File tempDir = Files.createTempDir();
+        final File flowFile;
+        try {
+          flowFile = this.projectLoader
+              .getUploadedFlowFile(project.getId(), project.getVersion(),
+                  flowFileName, latestFlowVersion, tempDir);
+
+          final FlowTrigger flowTrigger = FlowLoaderUtils.getFlowTriggerFromYamlFile(flowFile);
+
+          if (flowTrigger != null) {
+            final String projectJson = FlowUtils.toJson(project);
+            final Map<String, Object> contextMap = ImmutableMap
+                .of(FlowTriggerQuartzJob.SUBMIT_USER, submitUser,
+                    FlowTriggerQuartzJob.FLOW_TRIGGER, flowTrigger,
+                    FlowTriggerQuartzJob.FLOW_ID, flow.getId(),
+                    FlowTriggerQuartzJob.FLOW_VERSION, latestFlowVersion,
+                    FlowTriggerQuartzJob.PROJECT, projectJson);
+            logger.info("scheduling flow " + flow.getProjectId() + "." + flow.getId());
+            this.scheduler
+                .registerJob(flowTrigger.getSchedule().getCronExpression(), new QuartzJobDescription
+                    (FlowTriggerQuartzJob.class, generateGroupName(flow), contextMap));
+          }
+        } catch (final Exception ex) {
+          logger.error("error in getting flow file", ex);
+        } finally {
+          FlowLoaderUtils.cleanUpDir(tempDir);
+        }
+      }
+    }
+  }
+
+  /**
+   * Retrieve the list of scheduled flow triggers from quartz database
+   */
+  public List<ScheduledFlowTrigger> getScheduledFlowTriggerJobs() {
+    final Scheduler quartzScheduler = this.scheduler.getScheduler();
+    try {
+      final List<String> groupNames = quartzScheduler.getJobGroupNames();
+
+      final List<ScheduledFlowTrigger> flowTriggerJobDetails = new ArrayList<>();
+      for (final String groupName : groupNames) {
+        final JobKey jobKey = new JobKey(QuartzScheduler.DEFAULT_JOB_NAME, groupName);
+        ScheduledFlowTrigger scheduledFlowTrigger = null;
+        try {
+          final JobDetail job = quartzScheduler.getJobDetail(jobKey);
+          final JobDataMap jobDataMap = job.getJobDataMap();
+
+          final String flowId = jobDataMap.getString(FlowTriggerQuartzJob.FLOW_ID);
+
+          final String projectJson = jobDataMap.getString(FlowTriggerQuartzJob.PROJECT);
+          final Project project = FlowUtils.toProject(projectJson);
+          final FlowTrigger flowTrigger = (FlowTrigger) jobDataMap
+              .get(FlowTriggerQuartzJob.FLOW_TRIGGER);
+          final String submitUser = jobDataMap.getString(FlowTriggerQuartzJob.SUBMIT_USER);
+          final List<? extends Trigger> quartzTriggers = quartzScheduler.getTriggersOfJob(jobKey);
+          scheduledFlowTrigger = new ScheduledFlowTrigger(
+              project.getName(),
+              flowId, flowTrigger, submitUser, quartzTriggers.isEmpty() ? null
+              : quartzTriggers.get(0));
+        } catch (final Exception ex) {
+          logger.error(String.format("unable to get flow trigger by job key %s", jobKey), ex);
+          scheduledFlowTrigger = null;
+        }
+
+        flowTriggerJobDetails.add(scheduledFlowTrigger);
+      }
+      return flowTriggerJobDetails;
+    } catch (final SchedulerException ex) {
+      logger.error("unable to get scheduled flow triggers", ex);
+      return new ArrayList<>();
+    }
+  }
+
+  /**
+   * Unschedule all possible flows in a project
+   */
+  public void unscheduleAll(final Project project) throws SchedulerException {
+    for (final Flow flow : project.getFlows()) {
+      logger.info("unscheduling flow" + flow.getProjectId() + "." + flow.getId() + " if it has "
+          + " schedule");
+      if (!flow.isEmbeddedFlow()) {
+        this.scheduler.unregisterJob(generateGroupName(flow));
+      }
+    }
+  }
+
+  private String generateGroupName(final Flow flow) {
+    return String.valueOf(flow.getProjectId()) + "." + flow.getId();
+  }
+
+  public void start() {
+    this.scheduler.start();
+  }
+
+  public void shutdown() {
+    this.scheduler.shutdown();
+  }
+
+  public static class ScheduledFlowTrigger {
+
+    private final String projectName;
+    private final String flowId;
+    private final FlowTrigger flowTrigger;
+    private final Trigger quartzTrigger;
+    private final String submitUser;
+
+    public ScheduledFlowTrigger(final String projectName, final String flowId,
+        final FlowTrigger flowTrigger, final String submitUser,
+        final Trigger quartzTrigger) {
+      this.projectName = projectName;
+      this.flowId = flowId;
+      this.flowTrigger = flowTrigger;
+      this.submitUser = submitUser;
+      this.quartzTrigger = quartzTrigger;
+    }
+
+    public String getProjectName() {
+      return this.projectName;
+    }
+
+    public String getFlowId() {
+      return this.flowId;
+    }
+
+    public FlowTrigger getFlowTrigger() {
+      return this.flowTrigger;
+    }
+
+    public Trigger getQuartzTrigger() {
+      return this.quartzTrigger;
+    }
+
+    public String getSubmitUser() {
+      return this.submitUser;
+    }
+  }
+}
diff --git a/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java b/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java
index b4e573e..ddcacc6 100644
--- a/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java
+++ b/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java
@@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory;
 public class QuartzScheduler {
 
   //Unless specified, all Quartz jobs's identities comes with the default job name.
-  private static final String DEFAULT_JOB_NAME = "job1";
+  public static final String DEFAULT_JOB_NAME = "job1";
   private static final Logger logger = LoggerFactory.getLogger(QuartzScheduler.class);
   private Scheduler scheduler = null;
 
@@ -108,7 +108,11 @@ public class QuartzScheduler {
     }
   }
 
-  public void unregisterJob(final String groupName) throws SchedulerException {
+  /**
+   * Unregister a job given the groupname. Since unregister might be called when
+   * concurrently removing projects, so synchronized is added to ensure thread safety.
+   */
+  public synchronized void unregisterJob(final String groupName) throws SchedulerException {
     if (!ifJobExist(groupName)) {
       logger.warn("can not find job with " + groupName + " in quartz.");
     } else {
@@ -117,18 +121,20 @@ public class QuartzScheduler {
   }
 
   /**
-   * Only cron schedule register is supported.
+   * Only cron schedule register is supported. Since register might be called when
+   * concurrently uploading projects, so synchronized is added to ensure thread safety.
    *
    * @param cronExpression the cron schedule for this job
    * @param jobDescription Regarding QuartzJobDescription#groupName, in order to guarantee no
    * duplicate quartz schedules, we design the naming convention depending on use cases: <ul>
-   * <li>User flow schedule: we use {@link org.quartz.JobKey#JobKey} to represent the identity of a
-   * flow's schedule. The format follows "$projectID_$flowName" to guarantee no duplicates.
-   * <li>Quartz schedule for AZ internal use: the groupName should start with letters,
-   * rather than
+   * <li>User flow schedule: we use {@link JobKey#JobKey} to represent the identity of a
+   * flow's schedule. The format follows "$projectID.$flowName" to guarantee no duplicates.
+   * <li>Quartz schedule for AZ internal use: the groupName should start with letters, rather
+   * than
    * number, which is the first case.</ul>
    */
-  public void registerJob(final String cronExpression, final QuartzJobDescription jobDescription)
+  public synchronized void registerJob(final String cronExpression, final QuartzJobDescription
+      jobDescription)
       throws SchedulerException {
 
     requireNonNull(jobDescription, "jobDescription is null");
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
index f1cfc51..75a033c 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -25,12 +25,12 @@ import azkaban.Constants.ConfigurationKeys;
 import azkaban.database.AzkabanDatabaseSetup;
 import azkaban.executor.ExecutorManager;
 import azkaban.flowtrigger.FlowTriggerService;
+import azkaban.flowtrigger.quartz.FlowTriggerScheduler;
 import azkaban.jmx.JmxExecutorManager;
 import azkaban.jmx.JmxJettyServer;
 import azkaban.jmx.JmxTriggerManager;
 import azkaban.metrics.MetricsManager;
 import azkaban.project.ProjectManager;
-import azkaban.scheduler.QuartzScheduler;
 import azkaban.scheduler.ScheduleManager;
 import azkaban.server.AzkabanServer;
 import azkaban.server.session.SessionCache;
@@ -143,7 +143,7 @@ public class AzkabanWebServer extends AzkabanServer {
   private final Props props;
   private final SessionCache sessionCache;
   private final List<ObjectName> registeredMBeans = new ArrayList<>();
-  private final QuartzScheduler quartzScheduler;
+  private final FlowTriggerScheduler scheduler;
   private final FlowTriggerService flowTriggerService;
   private Map<String, TriggerPlugin> triggerPlugins;
   private MBeanServer mbeanServer;
@@ -159,7 +159,7 @@ public class AzkabanWebServer extends AzkabanServer {
       final UserManager userManager,
       final ScheduleManager scheduleManager,
       final VelocityEngine velocityEngine,
-      final QuartzScheduler quartzScheduler,
+      final FlowTriggerScheduler scheduler,
       final FlowTriggerService flowTriggerService,
       final StatusService statusService) {
     this.props = requireNonNull(props, "props is null.");
@@ -172,9 +172,9 @@ public class AzkabanWebServer extends AzkabanServer {
     this.userManager = requireNonNull(userManager, "userManager is null.");
     this.scheduleManager = requireNonNull(scheduleManager, "scheduleManager is null.");
     this.velocityEngine = requireNonNull(velocityEngine, "velocityEngine is null.");
-    this.quartzScheduler = requireNonNull(quartzScheduler, "quartzScheduler is null.");
-    this.flowTriggerService = requireNonNull(flowTriggerService, "flowTriggerService is null.");
     this.statusService = statusService;
+    this.scheduler = requireNonNull(scheduler, "scheduler is null.");
+    this.flowTriggerService = requireNonNull(flowTriggerService, "flow trigger service is null");
 
     loadBuiltinCheckersAndActions();
 
@@ -234,12 +234,9 @@ public class AzkabanWebServer extends AzkabanServer {
 
       @Override
       public void run() {
-        try {
-          if (webServer.quartzScheduler != null) {
-            webServer.quartzScheduler.shutdown();
-          }
-        } catch (final Exception e) {
-          logger.error(("Exception while shutting down quartz scheduler."), e);
+        if (webServer.scheduler != null) {
+          logger.info("Shutting down flow trigger scheduler...");
+          webServer.scheduler.shutdown();
         }
 
         try {
@@ -251,17 +248,16 @@ public class AzkabanWebServer extends AzkabanServer {
         }
 
         try {
+          logger.info("Logging top memory consumers...");
           logTopMemoryConsumers();
-        } catch (final Exception e) {
-          logger.error(("Exception when logging top memory consumers"), e);
-        }
 
-        logger.info("Shutting down http server...");
-        try {
+          logger.info("Shutting down http server...");
           webServer.close();
+
         } catch (final Exception e) {
-          logger.error("Error while shutting down http server.", e);
+          logger.error(("Exception while shutting down web server."), e);
         }
+
         logger.info("kk thx bye.");
       }
 
@@ -458,6 +454,10 @@ public class AzkabanWebServer extends AzkabanServer {
     return this.flowTriggerService;
   }
 
+  public FlowTriggerScheduler getScheduler() {
+    return this.scheduler;
+  }
+
   private void validateDatabaseVersion()
       throws IOException, SQLException {
     final boolean checkDB = this.props
@@ -540,14 +540,7 @@ public class AzkabanWebServer extends AzkabanServer {
     }
 
     if (this.props.getBoolean(ConfigurationKeys.ENABLE_QUARTZ, false)) {
-      this.quartzScheduler.start();
-    }
-
-    if (this.props.getBoolean(ConfigurationKeys.ENABLE_FLOW_TRIGGER, false)) {
-      // flow trigger service throws exception when any dependency plugin fails to be initialized
-      // (e.x if it's kafka dependency and kafka is down). In this case if azkaban admin still
-      // wishes to start azkaban web server, she can disable flow trigger in the az config file and
-      // restart web server so that regular scheduled flows are not affected.
+      this.scheduler.start();
       this.flowTriggerService.start();
     }
 
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServerModule.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServerModule.java
index f84e541..039e864 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServerModule.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServerModule.java
@@ -22,6 +22,7 @@ import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
 import azkaban.flowtrigger.database.JdbcFlowTriggerInstanceLoaderImpl;
 import azkaban.flowtrigger.plugin.FlowTriggerDependencyPluginException;
 import azkaban.flowtrigger.plugin.FlowTriggerDependencyPluginManager;
+import azkaban.flowtrigger.database.JdbcFlowTriggerInstanceLoaderImpl;
 import azkaban.scheduler.ScheduleLoader;
 import azkaban.scheduler.TriggerBasedScheduleLoader;
 import azkaban.user.UserManager;
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index f7103ef..f7ef977 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -16,6 +16,7 @@
 
 package azkaban.webapp.servlet;
 
+import azkaban.Constants.ConfigurationKeys;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableJobInfo;
 import azkaban.executor.ExecutorManagerAdapter;
@@ -25,6 +26,7 @@ import azkaban.flow.Edge;
 import azkaban.flow.Flow;
 import azkaban.flow.FlowProps;
 import azkaban.flow.Node;
+import azkaban.flowtrigger.quartz.FlowTriggerScheduler;
 import azkaban.project.Project;
 import azkaban.project.ProjectFileHandler;
 import azkaban.project.ProjectLogEvent;
@@ -77,6 +79,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
+import org.quartz.SchedulerException;
 
 public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 
@@ -103,9 +106,11 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
   private ExecutorManagerAdapter executorManager;
   private ScheduleManager scheduleManager;
   private UserManager userManager;
+  private FlowTriggerScheduler scheduler;
   private int downloadBufferSize;
   private boolean lockdownCreateProjects = false;
   private boolean lockdownUploadProjects = false;
+  private boolean enableQuartz = false;
 
   @Override
   public void init(final ServletConfig config) throws ServletException {
@@ -116,8 +121,10 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
     this.executorManager = server.getExecutorManager();
     this.scheduleManager = server.getScheduleManager();
     this.userManager = server.getUserManager();
+    this.scheduler = server.getScheduler();
     this.lockdownCreateProjects =
         server.getServerProps().getBoolean(LOCKDOWN_CREATE_PROJECTS_KEY, false);
+    this.enableQuartz = server.getServerProps().getBoolean(ConfigurationKeys.ENABLE_QUARTZ, false);
     if (this.lockdownCreateProjects) {
       logger.info("Creation of projects is locked down");
     }
@@ -587,6 +594,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
   }
 
   private void removeAssociatedSchedules(final Project project) throws ServletException {
+    // remove regular schedules
     try {
       for (final Schedule schedule : this.scheduleManager.getSchedules()) {
         if (schedule.getProjectId() == project.getId()) {
@@ -597,6 +605,15 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
     } catch (final ScheduleManagerException e) {
       throw new ServletException(e);
     }
+
+    // remove flow trigger schedules
+    try {
+      if (this.enableQuartz) {
+        this.scheduler.unscheduleAll(project);
+      }
+    } catch (final SchedulerException e) {
+      throw new ServletException(e);
+    }
   }
 
   private void handleRemoveProject(final HttpServletRequest req,
@@ -1499,6 +1516,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
     Flow flow = null;
     try {
       project = this.projectManager.getProject(projectName);
+
       if (project == null) {
         page.add("errorMsg", "Project " + projectName + " not found.");
         page.render();
@@ -1650,6 +1668,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
         "Upload: reference of project " + projectName + " is " + System.identityHashCode(project));
 
     final String autoFix = (String) multipart.get("fix");
+
     final Props props = new Props();
     if (autoFix != null && autoFix.equals("off")) {
       props.put(ValidatorConfigs.CUSTOM_AUTO_FIX_FLAG_PARAM, "false");
@@ -1701,9 +1720,19 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
         IOUtils.copy(item.getInputStream(), out);
         out.close();
 
+        //unscheduleall/scheduleall should only work with flow which has defined flow trigger
+        //unschedule all flows within the old project
+        if (this.enableQuartz) {
+          this.scheduler.unscheduleAll(project);
+        }
         final Map<String, ValidationReport> reports =
             this.projectManager.uploadProject(project, archiveFile, type, user,
                 props);
+
+        if (this.enableQuartz) {
+          //schedule the new project
+          this.scheduler.scheduleAll(project, user.getUserId());
+        }
         final StringBuffer errorMsgs = new StringBuffer();
         final StringBuffer warnMsgs = new StringBuffer();
         for (final Entry<String, ValidationReport> reportEntry : reports.entrySet()) {
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
index fd532ee..7208607 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -61,6 +61,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
   private ScheduleManager scheduleManager;
   private UserManager userManager;
 
+
   @Override
   public void init(final ServletConfig config) throws ServletException {
     super.init(config);
@@ -90,7 +91,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
       ajaxSlaInfo(req, ret, session.getUser());
     } else if (ajaxName.equals("setSla")) {
       ajaxSetSla(req, ret, session.getUser());
-    // alias loadFlow is preserved for backward compatibility
+      // alias loadFlow is preserved for backward compatibility
     } else if (ajaxName.equals("fetchSchedules") || ajaxName.equals("loadFlow")) {
       ajaxFetchSchedules(ret);
     } else if (ajaxName.equals("scheduleFlow")) {
@@ -475,6 +476,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
     return;
   }
 
+  @Deprecated
   private void ajaxScheduleFlow(final HttpServletRequest req,
       final HashMap<String, Object> ret, final User user) throws ServletException {
     final String projectName = getParam(req, "projectName");
@@ -591,6 +593,25 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
       return;
     }
 
+    final boolean hasFlowTrigger;
+    try {
+      hasFlowTrigger = this.projectManager.hasFlowTrigger(project, flow);
+    } catch (final Exception ex) {
+      logger.error(ex);
+      ret.put("status", "error");
+      ret.put("message", String.format("Error looking for flow trigger of flow: %s.%s ",
+          projectName, flowName));
+      return;
+    }
+
+    if (hasFlowTrigger) {
+      ret.put("status", "error");
+      ret.put("message", String.format("<font color=\"red\"> Error: Flow %s.%s is already "
+              + "associated with flow trigger, so schedule has to be defined in flow trigger config </font>",
+          projectName, flowName));
+      return;
+    }
+
     final DateTimeZone timezone = DateTimeZone.getDefault();
     final DateTime firstSchedTime = getPresentTimeByTimezone(timezone);
 
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/FakeDependencyInstanceContext1.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/FakeDependencyInstanceContext1.java
index 67893ca..a9fd2f8 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/FakeDependencyInstanceContext1.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/FakeDependencyInstanceContext1.java
@@ -24,12 +24,16 @@ import azkaban.flowtrigger.DependencyInstanceRuntimeProps;
 @SuppressWarnings("FutureReturnValueIgnored")
 public class FakeDependencyInstanceContext1 implements DependencyInstanceContext {
 
+  private final DependencyInstanceCallback callback;
+
   public FakeDependencyInstanceContext1(final DependencyInstanceConfig config,
       final DependencyInstanceRuntimeProps runtimeProps,
       final DependencyInstanceCallback callback) {
+    this.callback = callback;
   }
 
   @Override
   public void cancel() {
+    this.callback.onCancel(this);
   }
 }
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/FakeDependencyInstanceContext2.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/FakeDependencyInstanceContext2.java
index a6842ef..3699233 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/FakeDependencyInstanceContext2.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/FakeDependencyInstanceContext2.java
@@ -24,12 +24,16 @@ import azkaban.flowtrigger.DependencyInstanceRuntimeProps;
 @SuppressWarnings("FutureReturnValueIgnored")
 public class FakeDependencyInstanceContext2 implements DependencyInstanceContext {
 
+  private final DependencyInstanceCallback callback;
+
   public FakeDependencyInstanceContext2(final DependencyInstanceConfig config,
       final DependencyInstanceRuntimeProps runtimeProps,
       final DependencyInstanceCallback callback) {
+    this.callback = callback;
   }
 
   @Override
   public void cancel() {
+    this.callback.onCancel(this);
   }
 }
diff --git a/azkaban-web-server/src/test/resources/az-flow-trigger-dependency-plugin.jar b/azkaban-web-server/src/test/resources/az-flow-trigger-dependency-plugin.jar
index 7204be4..6d4f3ee 100644
Binary files a/azkaban-web-server/src/test/resources/az-flow-trigger-dependency-plugin.jar and b/azkaban-web-server/src/test/resources/az-flow-trigger-dependency-plugin.jar differ