TriggerInstanceProcessor.java

171 lines | 6.892 kB Blame History Raw Download
/*
 * Copyright 2018 LinkedIn Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.flowtrigger;

import azkaban.Constants;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutorManager;
import azkaban.flow.Flow;
import azkaban.flow.FlowUtils;
import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
import azkaban.project.Project;
import azkaban.utils.EmailMessage;
import azkaban.utils.Emailer;
import azkaban.utils.Utils;
import com.google.common.base.Preconditions;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
@SuppressWarnings("FutureReturnValueIgnored")
public class TriggerInstanceProcessor {

  private static final Logger logger = LoggerFactory.getLogger(TriggerInstanceProcessor.class);
  private static final String FAILURE_EMAIL_SUBJECT = "flow trigger for flow '%s', project '%s' "
      + "has been cancelled on %s";
  private final static int THREAD_POOL_SIZE = 32;
  private final ExecutorManager executorManager;
  private final FlowTriggerInstanceLoader flowTriggerInstanceLoader;
  private final Emailer emailer;
  private final ExecutorService executorService;

  @Inject
  public TriggerInstanceProcessor(final ExecutorManager executorManager,
      final FlowTriggerInstanceLoader flowTriggerInstanceLoader,
      final Emailer emailer) {
    Preconditions.checkNotNull(executorManager);
    Preconditions.checkNotNull(flowTriggerInstanceLoader);
    Preconditions.checkNotNull(emailer);
    this.emailer = emailer;
    this.executorManager = executorManager;
    this.flowTriggerInstanceLoader = flowTriggerInstanceLoader;
    this.executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
  }

  private void executeFlowAndUpdateExecID(final TriggerInstance triggerInst) {
    try {
      final Project project = triggerInst.getProject();
      final Flow flow = FlowUtils.getFlow(project, triggerInst.getFlowId());
      final ExecutableFlow executableFlow = FlowUtils.createExecutableFlow(project, flow);
      // execute the flow with default execution option(concurrency option being "ignore
      // currently running")
      this.executorManager.submitExecutableFlow(executableFlow, triggerInst.getSubmitUser());
      triggerInst.setFlowExecId(executableFlow.getExecutionId());
    } catch (final Exception ex) {
      logger.error("exception when executing the associated flow and updating flow exec id for "
              + "trigger instance[id: {}]",
          triggerInst.getId(), ex);
      // if flow fails to be executed(e.g. running execution exceeds the allowed concurrent run
      // limit), set associated flow exec id to Constants.FAILED_EXEC_ID. Upon web server
      // restart, recovery process will skip those flows.
      triggerInst.setFlowExecId(Constants.FAILED_EXEC_ID);
    }

    this.flowTriggerInstanceLoader.updateAssociatedFlowExecId(triggerInst);
  }

  private String generateFailureEmailSubject(final TriggerInstance triggerInstance) {
    return String.format(FAILURE_EMAIL_SUBJECT, triggerInstance.getFlowId(), triggerInstance
        .getProjectName(), this.emailer.getAzkabanName());
  }

  private EmailMessage createFlowTriggerFailureEmailMessage(final TriggerInstance triggerInst) {
    final EmailMessage message = this.emailer.createEmailMessage(generateFailureEmailSubject
        (triggerInst), "text/html", triggerInst.getFailureEmails());
    final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    message.addAllToAddress(triggerInst.getFailureEmails());
    message.setMimeType("text/html");
    message.println("<table>");
    message.println("<tr><td>Start Time</td><td>");
    message.println("<tr><td>" + sdf.format(new Date(triggerInst.getStartTime())) + "</td><td>");

    message.println("<tr><td>End Time</td><td>");
    message.println("<tr><td>" + sdf.format(new Date(triggerInst.getEndTime())) + "</td><td>");
    message.println("<tr><td>Duration</td><td>"
        + Utils.formatDuration(triggerInst.getStartTime(), triggerInst.getEndTime())
        + "</td></tr>");
    message.println("<tr><td>Status</td><td>" + triggerInst.getStatus() + "</td></tr>");
    message.println("</table>");
    message.println("");
    final String executionUrl = this.emailer.getAzkabanURL() + "/executor?triggerinstanceid="
        + triggerInst.getId();

    message.println("<a href=\"" + executionUrl + "\">" + triggerInst.getFlowId()
        + " Flow Trigger Instance Link</a>");

    message.println("");
    message.println("<h3>Cancelled Dependencies</h3>");

    for (final DependencyInstance depInst : triggerInst.getDepInstances()) {
      if (depInst.getStatus() == Status.CANCELLED) {
        message.println("<table>");
        message.println("<tr><td>Dependency Name: " + depInst.getDepName() + "</td><td>");
        message
            .println("<tr><td>Cancellation Cause: " + depInst.getCancellationCause() + "</td><td>");
        message.println("</table>");
      }
    }

    return message;
  }

  private void sendFailureEmailIfConfigured(final TriggerInstance triggerInstance) {
    final List<String> failureEmails = triggerInstance.getFailureEmails();
    if (!failureEmails.isEmpty()) {
      final EmailMessage message = this.createFlowTriggerFailureEmailMessage(triggerInstance);
      this.emailer.sendEmail(message, true, "email message failure email for flow trigger "
          + triggerInstance.getId());
    }
  }

  /**
   * Process the case where status of trigger instance becomes success
   */
  public void processSucceed(final TriggerInstance triggerInst) {
    //todo chengren311: publishing Trigger events to Azkaban Project Events page
    executeFlowAndUpdateExecID(triggerInst);
  }

  /**
   * Process the case where status of trigger instance becomes cancelled
   */
  public void processTermination(final TriggerInstance triggerInst) {
    //sendFailureEmailIfConfigured takes 1/3 secs
    this.executorService.submit(() -> sendFailureEmailIfConfigured(triggerInst));
  }

  /**
   * Process the case where a new trigger instance is created
   */
  public void processNewInstance(final TriggerInstance triggerInst) {
    this.flowTriggerInstanceLoader.uploadTriggerInstance(triggerInst);
  }

  public void shutdown() {
    this.executorService.shutdown();
    this.executorService.shutdownNow();
  }
}