package azkaban.utils;
/*
* Copyright 2012 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
import java.util.ArrayList;
import java.util.List;
import javax.mail.MessagingException;
import org.apache.log4j.Logger;
import azkaban.alert.Alerter;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.Status;
import azkaban.sla.SlaOption;
import azkaban.utils.AbstractMailer;
import azkaban.executor.mail.DefaultMailCreator;
import azkaban.executor.mail.MailCreator;
import azkaban.utils.EmailMessage;
import azkaban.utils.Props;
public class Emailer extends AbstractMailer implements Alerter {
private static Logger logger = Logger.getLogger(Emailer.class);
private boolean testMode = false;
private String clientHostname;
private String clientPortNumber;
private String mailHost;
private String mailUser;
private String mailPassword;
private String mailSender;
private String azkabanName;
public Emailer(Props props) {
super(props);
this.azkabanName = props.getString("azkaban.name", "azkaban");
this.mailHost = props.getString("mail.host", "localhost");
this.mailUser = props.getString("mail.user", "");
this.mailPassword = props.getString("mail.password", "");
this.mailSender = props.getString("mail.sender", "");
int mailTimeout = props.getInt("mail.timeout.millis", 10000);
EmailMessage.setTimeout(mailTimeout);
int connectionTimeout = props.getInt("mail.connection.timeout.millis", 10000);
EmailMessage.setConnectionTimeout(connectionTimeout);
this.clientHostname = props.getString("jetty.hostname", "localhost");
if (props.getBoolean("jetty.use.ssl", true)) {
this.clientPortNumber = props.getString("jetty.ssl.port");
} else {
this.clientPortNumber = props.getString("jetty.port");
}
testMode = props.getBoolean("test.mode", false);
}
@SuppressWarnings("unchecked")
private void sendSlaAlertEmail(SlaOption slaOption, String slaMessage) {
String subject = "Sla Violation Alert on " + getAzkabanName();
String body = slaMessage;
List<String> emailList = (List<String>) slaOption.getInfo().get(SlaOption.INFO_EMAIL_LIST);
if (emailList != null && !emailList.isEmpty()) {
EmailMessage message = super.createEmailMessage(
subject,
"text/html",
emailList);
message.setBody(body);
if (!testMode) {
try {
message.sendEmail();
} catch (MessagingException e) {
logger.error("Email message send failed" , e);
}
}
}
}
public void sendFirstErrorMessage(ExecutableFlow flow) {
EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
message.setFromAddress(mailSender);
ExecutionOptions option = flow.getExecutionOptions();
MailCreator mailCreator = DefaultMailCreator.getCreator(option.getMailCreator());
logger.debug("ExecutorMailer using mail creator:" + mailCreator.getClass().getCanonicalName());
boolean mailCreated = mailCreator.createFirstErrorMessage(flow, message, azkabanName, clientHostname, clientPortNumber);
if (mailCreated && !testMode) {
try {
message.sendEmail();
} catch (MessagingException e) {
logger.error("Email message send failed", e);
}
}
}
public void sendErrorEmail(ExecutableFlow flow, String... extraReasons) {
EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
message.setFromAddress(mailSender);
ExecutionOptions option = flow.getExecutionOptions();
MailCreator mailCreator = DefaultMailCreator.getCreator(option.getMailCreator());
logger.debug("ExecutorMailer using mail creator:" + mailCreator.getClass().getCanonicalName());
boolean mailCreated = mailCreator.createErrorEmail(flow, message, azkabanName, clientHostname, clientPortNumber, extraReasons);
if (mailCreated && !testMode) {
try {
message.sendEmail();
} catch (MessagingException e) {
logger.error("Email message send failed", e);
}
}
}
public void sendSuccessEmail(ExecutableFlow flow) {
EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
message.setFromAddress(mailSender);
ExecutionOptions option = flow.getExecutionOptions();
MailCreator mailCreator = DefaultMailCreator.getCreator(option.getMailCreator());
logger.debug("ExecutorMailer using mail creator:" + mailCreator.getClass().getCanonicalName());
boolean mailCreated = mailCreator.createSuccessEmail(flow, message, azkabanName, clientHostname, clientPortNumber);
if (mailCreated && !testMode) {
try {
message.sendEmail();
} catch (MessagingException e) {
logger.error("Email message send failed", e);
}
}
}
public static List<String> findFailedJobs(ExecutableFlow flow) {
ArrayList<String> failedJobs = new ArrayList<String>();
for (ExecutableNode node : flow.getExecutableNodes()) {
if (node.getStatus() == Status.FAILED) {
failedJobs.add(node.getId());
}
}
return failedJobs;
}
@Override
public void alertOnSuccess(ExecutableFlow exflow) throws Exception {
sendSuccessEmail(exflow);
}
@Override
public void alertOnError(ExecutableFlow exflow, String ... extraReasons) throws Exception {
sendErrorEmail(exflow, extraReasons);
}
@Override
public void alertOnFirstError(ExecutableFlow exflow) throws Exception {
sendFirstErrorMessage(exflow);
}
@Override
public void alertOnSla(SlaOption slaOption, String slaMessage)
throws Exception {
sendSlaAlertEmail(slaOption, slaMessage);
}
}