azkaban-aplcache

add past executions for failed flow in failure email (#1873) This

7/30/2018 3:37:14 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
index 7d2e256..acb189b 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
@@ -16,8 +16,8 @@
 
 package azkaban.executor;
 
-import azkaban.db.EncodingType;
 import azkaban.db.DatabaseOperator;
+import azkaban.db.EncodingType;
 import azkaban.db.SQLTransaction;
 import azkaban.utils.GZIPUtils;
 import azkaban.utils.JSONUtils;
@@ -108,9 +108,24 @@ public class ExecutionFlowDao {
     }
   }
 
+  /**
+   * fetch flow execution history with specified {@code projectId}, {@code flowId} and flow start
+   * time >= {@code startTime}
+   *
+   * @return the list of flows meeting the specified criteria
+   */
+  public List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId, final
+  long startTime) throws ExecutorManagerException {
+    try {
+      return this.dbOperator.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_BY_START_TIME,
+          new FetchExecutableFlows(), projectId, flowId, startTime);
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Error fetching historic flows", e);
+    }
+  }
+
   List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
-      final int skip, final int num,
-      final Status status)
+      final int skip, final int num, final Status status)
       throws ExecutorManagerException {
     try {
       return this.dbOperator.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_BY_STATUS,
@@ -267,6 +282,9 @@ public class ExecutionFlowDao {
   public static class FetchExecutableFlows implements
       ResultSetHandler<List<ExecutableFlow>> {
 
+    static String FETCH_EXECUTABLE_FLOW_BY_START_TIME =
+        "SELECT ef.exec_id, ef.enc_type, ef.flow_data FROM execution_flows ef WHERE project_id=? "
+            + "AND flow_id=? AND start_time >= ? ORDER BY start_time DESC";
     static String FETCH_BASE_EXECUTABLE_FLOW_QUERY =
         "SELECT ef.exec_id, ef.enc_type, ef.flow_data FROM execution_flows ef";
     static String FETCH_EXECUTABLE_FLOW =
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index 7657588..87ff41a 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -52,6 +52,9 @@ public interface ExecutorLoader {
       String flowContains, String userNameContains, int status, long startData,
       long endData, int skip, int num) throws ExecutorManagerException;
 
+  List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
+      final long startTime) throws ExecutorManagerException;
+
   /**
    * <pre>
    * Fetch all executors from executors table
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index eca1744..683cc65 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -20,12 +20,12 @@ import azkaban.executor.ExecutorLogEvent.EventType;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
-import javax.inject.Inject;
-import javax.inject.Singleton;
 import java.io.File;
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
+import javax.inject.Inject;
+import javax.inject.Singleton;
 
 @Singleton
 public class JdbcExecutorLoader implements ExecutorLoader {
@@ -126,6 +126,12 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 
   @Override
   public List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
+      final long startTime) throws ExecutorManagerException {
+    return this.executionFlowDao.fetchFlowHistory(projectId, flowId, startTime);
+  }
+
+  @Override
+  public List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
       final int skip, final int num, final Status status) throws ExecutorManagerException {
     return this.executionFlowDao.fetchFlowHistory(projectId, flowId, skip, num, status);
   }
diff --git a/azkaban-common/src/main/java/azkaban/executor/mail/DefaultMailCreator.java b/azkaban-common/src/main/java/azkaban/executor/mail/DefaultMailCreator.java
index afb90b9..a6483a9 100644
--- a/azkaban-common/src/main/java/azkaban/executor/mail/DefaultMailCreator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/mail/DefaultMailCreator.java
@@ -137,9 +137,9 @@ public class DefaultMailCreator implements MailCreator {
   }
 
   @Override
-  public boolean createErrorEmail(final ExecutableFlow flow, final EmailMessage message,
-      final String azkabanName, final String scheme, final String clientHostname,
-      final String clientPortNumber, final String... vars) {
+  public boolean createErrorEmail(final ExecutableFlow flow, final List<ExecutableFlow>
+      pastExecutions, final EmailMessage message, final String azkabanName, final String scheme,
+      final String clientHostname, final String clientPortNumber, final String... vars) {
 
     final ExecutionOptions option = flow.getExecutionOptions();
 
@@ -185,6 +185,30 @@ public class DefaultMailCreator implements MailCreator {
       }
 
       message.println("</ul>");
+
+      message.println("");
+
+      int failedCount = 0;
+      for (final ExecutableFlow executableFlow : pastExecutions) {
+        if (executableFlow.getStatus().equals(Status.FAILED)) {
+          failedCount++;
+        }
+      }
+
+      message.println(String.format("<h3>Executions from past 72 hours (%s out %s) failed</h3>",
+          failedCount, pastExecutions.size()));
+      for (final ExecutableFlow executableFlow : pastExecutions) {
+        message.println("<table>");
+        message.println(
+            "<tr><td>Execution Id</td><td>" + (executableFlow.getExecutionId()) + "</td></tr>");
+        message.println("<tr><td>Start Time</td><td>"
+            + convertMSToString(executableFlow.getStartTime()) + "</td></tr>");
+        message.println("<tr><td>End Time</td><td>"
+            + convertMSToString(executableFlow.getEndTime()) + "</td></tr>");
+        message.println("<tr><td>Status</td><td>" + flow.getStatus() + "</td></tr>");
+        message.println("</table>");
+      }
+
       return true;
     }
     return false;
diff --git a/azkaban-common/src/main/java/azkaban/executor/mail/MailCreator.java b/azkaban-common/src/main/java/azkaban/executor/mail/MailCreator.java
index 8df7407..934e2c5 100644
--- a/azkaban-common/src/main/java/azkaban/executor/mail/MailCreator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/mail/MailCreator.java
@@ -18,6 +18,7 @@ package azkaban.executor.mail;
 
 import azkaban.executor.ExecutableFlow;
 import azkaban.utils.EmailMessage;
+import java.util.List;
 
 public interface MailCreator {
 
@@ -25,8 +26,8 @@ public interface MailCreator {
       EmailMessage message, String azkabanName, String scheme,
       String clientHostname, String clientPortNumber, String... vars);
 
-  public boolean createErrorEmail(ExecutableFlow flow, EmailMessage message,
-      String azkabanName, String scheme, String clientHostname,
+  public boolean createErrorEmail(ExecutableFlow flow, List<ExecutableFlow> pastExecutions,
+      EmailMessage message, String azkabanName, String scheme, String clientHostname,
       String clientPortNumber, String... vars);
 
   public boolean createSuccessEmail(ExecutableFlow flow, EmailMessage message,
diff --git a/azkaban-common/src/main/java/azkaban/utils/Emailer.java b/azkaban-common/src/main/java/azkaban/utils/Emailer.java
index 2c07308..41ac9fa 100644
--- a/azkaban-common/src/main/java/azkaban/utils/Emailer.java
+++ b/azkaban-common/src/main/java/azkaban/utils/Emailer.java
@@ -22,10 +22,14 @@ import azkaban.Constants;
 import azkaban.Constants.ConfigurationKeys;
 import azkaban.alert.Alerter;
 import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutorLoader;
+import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.mail.DefaultMailCreator;
 import azkaban.executor.mail.MailCreator;
 import azkaban.metrics.CommonMetrics;
 import azkaban.sla.SlaOption;
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.List;
 import javax.inject.Inject;
 import javax.inject.Singleton;
@@ -43,11 +47,13 @@ public class Emailer extends AbstractMailer implements Alerter {
   private final String clientHostname;
   private final String clientPortNumber;
   private final String azkabanName;
+  private final ExecutorLoader executorLoader;
 
   @Inject
   public Emailer(final Props props, final CommonMetrics commonMetrics,
-      final EmailMessageCreator messageCreator) {
+      final EmailMessageCreator messageCreator, final ExecutorLoader executorLoader) {
     super(props, messageCreator);
+    this.executorLoader = requireNonNull(executorLoader, "executorLoader is null.");
     this.commonMetrics = requireNonNull(commonMetrics, "commonMetrics is null.");
     this.azkabanName = props.getString("azkaban.name", "azkaban");
 
@@ -114,8 +120,20 @@ public class Emailer extends AbstractMailer implements Alerter {
   public void alertOnError(final ExecutableFlow flow, final String... extraReasons) {
     final EmailMessage message = this.messageCreator.createMessage();
     final MailCreator mailCreator = getMailCreator(flow);
-    final boolean mailCreated = mailCreator.createErrorEmail(flow, message, this.azkabanName,
-        this.scheme, this.clientHostname, this.clientPortNumber, extraReasons);
+    List<ExecutableFlow> last72hoursExecutions = new ArrayList<>();
+
+    if (flow.getStartTime() > 0) {
+      final long startTime = flow.getStartTime() - Duration.ofHours(72).toMillis();
+      try {
+        last72hoursExecutions = this.executorLoader.fetchFlowHistory(flow.getProjectId(), flow
+            .getFlowId(), startTime);
+      } catch (final ExecutorManagerException e) {
+        logger.error("unable to fetch past executions", e);
+      }
+    }
+
+    final boolean mailCreated = mailCreator.createErrorEmail(flow, last72hoursExecutions, message,
+        this.azkabanName, this.scheme, this.clientHostname, this.clientPortNumber, extraReasons);
     sendEmail(message, mailCreated, "error email message for execution " + flow.getExecutionId());
   }
 
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
index 31ff91c..5819fbf 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
@@ -30,7 +30,11 @@ import azkaban.utils.Props;
 import azkaban.utils.TestUtils;
 import java.io.File;
 import java.sql.SQLException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -114,7 +118,6 @@ public class ExecutionFlowDaoTest {
     assertTwoFlowSame(flow, fetchFlow);
   }
 
-
   @Test
   public void testUpdateExecutableFlow() throws Exception {
     final ExecutableFlow flow = createTestFlow();
@@ -150,6 +153,48 @@ public class ExecutionFlowDaoTest {
   }
 
   @Test
+  public void fetchFlowHistoryWithStartTime() throws Exception {
+    class DateUtil {
+
+      private long dateStrToLong(final String dateStr) throws ParseException {
+        final SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        final Date d = f.parse(dateStr);
+        final long milliseconds = d.getTime();
+        return milliseconds;
+      }
+    }
+
+    final DateUtil dateUtil = new DateUtil();
+    final ExecutableFlow flow1 = createTestFlow();
+    flow1.setStartTime(dateUtil.dateStrToLong("2018-09-01 10:00:00"));
+    this.executionFlowDao.uploadExecutableFlow(flow1);
+
+    final ExecutableFlow flow2 = createTestFlow();
+    flow2.setStartTime(dateUtil.dateStrToLong("2018-09-01 09:00:00"));
+    this.executionFlowDao.uploadExecutableFlow(flow2);
+
+    final ExecutableFlow flow3 = createTestFlow();
+    flow3.setStartTime(dateUtil.dateStrToLong("2018-09-01 09:00:00"));
+    this.executionFlowDao.uploadExecutableFlow(flow3);
+
+    final ExecutableFlow flow4 = createTestFlow();
+    flow4.setStartTime(dateUtil.dateStrToLong("2018-09-01 08:00:00"));
+    this.executionFlowDao.uploadExecutableFlow(flow4);
+
+    final List<ExecutableFlow> flowList = this.executionFlowDao.fetchFlowHistory
+        (flow1.getProjectId(), flow1.getFlowId(), dateUtil.dateStrToLong("2018-09-01 09:00:00"));
+    final List<ExecutableFlow> expected = new ArrayList<>();
+    expected.add(flow1);
+    expected.add(flow2);
+    expected.add(flow3);
+
+    assertThat(flowList).hasSize(3);
+    for (int i = 0; i < flowList.size(); i++) {
+      assertTwoFlowSame(flowList.get(i), expected.get(i));
+    }
+  }
+
+  @Test
   public void testAdvancedFilter() throws Exception {
     createTestProject();
     final ExecutableFlow flow = createTestFlow();
@@ -364,7 +409,7 @@ public class ExecutionFlowDaoTest {
     assertThat(flow1.getStatus()).isEqualTo(flow2.getStatus());
     assertThat(flow1.getEndTime()).isEqualTo(flow2.getEndTime());
     assertThat(flow1.getStartTime()).isEqualTo(flow2.getStartTime());
-    assertThat(flow1.getSubmitTime()).isEqualTo(flow2.getStartTime());
+    assertThat(flow1.getSubmitTime()).isEqualTo(flow2.getSubmitTime());
     assertThat(flow1.getFlowId()).isEqualTo(flow2.getFlowId());
     assertThat(flow1.getProjectId()).isEqualTo(flow2.getProjectId());
     assertThat(flow1.getVersion()).isEqualTo(flow2.getVersion());
diff --git a/azkaban-common/src/test/java/azkaban/executor/mail/DefaultMailCreatorTest.java b/azkaban-common/src/test/java/azkaban/executor/mail/DefaultMailCreatorTest.java
index b590f4c..80577d0 100644
--- a/azkaban-common/src/test/java/azkaban/executor/mail/DefaultMailCreatorTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/mail/DefaultMailCreatorTest.java
@@ -15,6 +15,8 @@ import azkaban.utils.EmailMessage;
 import azkaban.utils.EmailMessageCreator;
 import azkaban.utils.TestUtils;
 import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.TimeZone;
 import org.joda.time.DateTimeUtils;
 import org.junit.After;
@@ -93,9 +95,25 @@ public class DefaultMailCreatorTest {
     setJobStatus(Status.FAILED);
     this.executableFlow.setEndTime(END_TIME_MILLIS);
     this.executableFlow.setStatus(Status.FAILED);
+    final List<ExecutableFlow> executableFlows = new ArrayList<>();
+
+    final ExecutableFlow executableFlow1 = new ExecutableFlow(this.project, this.flow);
+    executableFlow1.setExecutionId(1);
+    executableFlow1.setStartTime(START_TIME_MILLIS);
+    executableFlow1.setEndTime(END_TIME_MILLIS);
+    executableFlow1.setStatus(Status.FAILED);
+    executableFlows.add(executableFlow1);
+
+    final ExecutableFlow executableFlow2 = new ExecutableFlow(this.project, this.flow);
+    executableFlow2.setExecutionId(2);
+    executableFlow2.setStartTime(START_TIME_MILLIS);
+    executableFlow2.setEndTime(END_TIME_MILLIS);
+    executableFlow2.setStatus(Status.SUCCEEDED);
+    executableFlows.add(executableFlow2);
+
     assertTrue(this.mailCreator.createErrorEmail(
-        this.executableFlow, this.message, this.azkabanName, this.scheme, this.clientHostname,
-        this.clientPortNumber));
+        this.executableFlow, executableFlows, this.message, this.azkabanName, this.scheme, this
+            .clientHostname, this.clientPortNumber));
     assertEquals("Flow 'mail-creator-test' has failed on unit-tests", this.message.getSubject());
     assertThat(TestUtils.readResource("errorEmail.html", this))
         .isEqualToIgnoringWhitespace(this.message.getBody());
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index 0bf798d..3973d0b 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -204,6 +204,12 @@ public class MockExecutorLoader implements ExecutorLoader {
   }
 
   @Override
+  public List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
+      final long startTime) throws ExecutorManagerException {
+    return new ArrayList<>();
+  }
+
+  @Override
   public List<ExecutableJobInfo> fetchJobHistory(final int projectId, final String jobId,
       final int skip, final int size) throws ExecutorManagerException {
     // TODO Auto-generated method stub
diff --git a/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java b/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
index f7fef9a..c165eb3 100644
--- a/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
@@ -24,6 +24,8 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutorLoader;
+import azkaban.executor.MockExecutorLoader;
 import azkaban.flow.Flow;
 import azkaban.metrics.CommonMetrics;
 import azkaban.metrics.MetricsManager;
@@ -48,6 +50,7 @@ public class EmailerTest {
   private Props props;
   private EmailMessageCreator messageCreator;
   private EmailMessage message;
+  private ExecutorLoader executorLoader;
 
   public static EmailMessageCreator mockMessageCreator(final EmailMessage message) {
     final EmailMessageCreator mock = mock(EmailMessageCreator.class);
@@ -85,6 +88,7 @@ public class EmailerTest {
     this.messageCreator = mockMessageCreator(this.message);
     this.receiveAddrList.add(this.receiveAddr);
     this.project = new Project(11, "myTestProject");
+    this.executorLoader = new MockExecutorLoader();
 
     this.props = createMailProperties();
     final DirectoryFlowLoader loader = new DirectoryFlowLoader(this.props);
@@ -102,7 +106,8 @@ public class EmailerTest {
 
     final ExecutableFlow exFlow = new ExecutableFlow(this.project, flow);
     final CommonMetrics commonMetrics = new CommonMetrics(new MetricsManager(new MetricRegistry()));
-    final Emailer emailer = new Emailer(this.props, commonMetrics, this.messageCreator);
+    final Emailer emailer = new Emailer(this.props, commonMetrics, this.messageCreator,
+        this.executorLoader);
     emailer.alertOnError(exFlow);
     verify(this.message).addAllToAddress(this.receiveAddrList);
     verify(this.message).setSubject("Flow 'jobe' has failed on azkaban");
@@ -113,14 +118,16 @@ public class EmailerTest {
   @Test
   public void testGetAzkabanURL() {
     final CommonMetrics commonMetrics = new CommonMetrics(new MetricsManager(new MetricRegistry()));
-    final Emailer emailer = new Emailer(this.props, commonMetrics, this.messageCreator);
+    final Emailer emailer = new Emailer(this.props, commonMetrics, this.messageCreator,
+        this.executorLoader);
     assertThat(emailer.getAzkabanURL()).isEqualTo("http://localhost:8786");
   }
 
   @Test
   public void testCreateEmailMessage() {
     final CommonMetrics commonMetrics = new CommonMetrics(new MetricsManager(new MetricRegistry()));
-    final Emailer emailer = new Emailer(this.props, commonMetrics, this.messageCreator);
+    final Emailer emailer = new Emailer(this.props, commonMetrics, this.messageCreator,
+        this.executorLoader);
     final EmailMessage em = emailer
         .createEmailMessage("subject", "text/html", this.receiveAddrList);
     verify(this.messageCreator).createMessage();
@@ -138,7 +145,8 @@ public class EmailerTest {
 
     final ExecutableFlow exFlow = new ExecutableFlow(this.project, flow);
     final CommonMetrics commonMetrics = mock(CommonMetrics.class);
-    final Emailer emailer = new Emailer(this.props, commonMetrics, this.messageCreator);
+    final Emailer emailer = new Emailer(this.props, commonMetrics, this.messageCreator,
+        this.executorLoader);
     emailer.alertOnError(exFlow);
     verify(commonMetrics, never()).markSendEmailFail();
   }
diff --git a/azkaban-common/src/test/resources/azkaban/executor/mail/errorEmail.html b/azkaban-common/src/test/resources/azkaban/executor/mail/errorEmail.html
index 8593525..9321eea 100644
--- a/azkaban-common/src/test/resources/azkaban/executor/mail/errorEmail.html
+++ b/azkaban-common/src/test/resources/azkaban/executor/mail/errorEmail.html
@@ -22,4 +22,40 @@
 <ul>
   <li><a href="http://localhost:8081/executor?execid=-1&job=test-job">Failed job 'test-job' Link</a>
   </li>
-</ul>
+</ul><h3>Executions from past 72 hours (1 out 2) failed</h3>
+<table>
+  <tr>
+    <td>Execution Id</td>
+    <td>1</td>
+  </tr>
+  <tr>
+    <td>Start Time</td>
+    <td>2016/07/17 11:54:11 EEST</td>
+  </tr>
+  <tr>
+    <td>End Time</td>
+    <td>2016/07/17 11:54:16 EEST</td>
+  </tr>
+  <tr>
+    <td>Status</td>
+    <td>FAILED</td>
+  </tr>
+</table>
+<table>
+  <tr>
+    <td>Execution Id</td>
+    <td>2</td>
+  </tr>
+  <tr>
+    <td>Start Time</td>
+    <td>2016/07/17 11:54:11 EEST</td>
+  </tr>
+  <tr>
+    <td>End Time</td>
+    <td>2016/07/17 11:54:16 EEST</td>
+  </tr>
+  <tr>
+    <td>Status</td>
+    <td>FAILED</td>
+  </tr>
+</table>
\ No newline at end of file
diff --git a/azkaban-common/src/test/resources/azkaban/utils/errorEmail2.html b/azkaban-common/src/test/resources/azkaban/utils/errorEmail2.html
index 7378aa6..df367a0 100644
--- a/azkaban-common/src/test/resources/azkaban/utils/errorEmail2.html
+++ b/azkaban-common/src/test/resources/azkaban/utils/errorEmail2.html
@@ -18,4 +18,4 @@
     <td>READY</td>
   </tr>
 </table><a href="http://localhost:8786/executor?execid=-1">jobe Execution Link</a><h3>Reason</h3>
-<ul></ul>
+<ul></ul><h3>Executions from past 72 hours (0 out 0) failed</h3>
\ No newline at end of file
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/TriggerInstanceProcessorTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/TriggerInstanceProcessorTest.java
index 6d52ca9..2990b24 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/TriggerInstanceProcessorTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/TriggerInstanceProcessorTest.java
@@ -23,7 +23,9 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManager;
+import azkaban.executor.MockExecutorLoader;
 import azkaban.flow.Flow;
 import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
 import azkaban.metrics.CommonMetrics;
@@ -66,6 +68,7 @@ public class TriggerInstanceProcessorTest {
   private CountDownLatch sendEmailLatch;
   private CountDownLatch submitFlowLatch;
   private CountDownLatch updateExecIDLatch;
+  private ExecutorLoader executorLoader;
 
   private static TriggerInstance createTriggerInstance() throws ParseException {
     final FlowTrigger flowTrigger = new FlowTrigger(
@@ -103,10 +106,11 @@ public class TriggerInstanceProcessorTest {
     this.messageCreator = EmailerTest.mockMessageCreator(this.message);
     this.triggerInstLoader = mock(FlowTriggerInstanceLoader.class);
     this.executorManager = mock(ExecutorManager.class);
+    this.executorLoader = new MockExecutorLoader();
     when(this.executorManager.submitExecutableFlow(any(), anyString())).thenReturn("return");
     final CommonMetrics commonMetrics = new CommonMetrics(new MetricsManager(new MetricRegistry()));
     this.emailer = Mockito.spy(new Emailer(EmailerTest.createMailProperties(), commonMetrics,
-        this.messageCreator));
+        this.messageCreator, this.executorLoader));
 
     this.sendEmailLatch = new CountDownLatch(1);
     doAnswer(invocation -> {
diff --git a/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalMailCreator.java b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalMailCreator.java
index e3ba282..72e6708 100644
--- a/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalMailCreator.java
+++ b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalMailCreator.java
@@ -51,16 +51,16 @@ import azkaban.utils.EmailMessage;
 import azkaban.webapp.AzkabanWebServer;
 
 public class ReportalMailCreator implements MailCreator {
+  public static final String REPORTAL_MAIL_CREATOR = "ReportalMailCreator";
+  public static final int NUM_PREVIEW_ROWS = 50;
+  //Attachment that equal or larger than 10MB will be skipped in the email
+  public static final long MAX_ATTACHMENT_SIZE = 10 * 1024 * 1024L;
   public static AzkabanWebServer azkaban = null;
   public static HadoopSecurityManager hadoopSecurityManager = null;
   public static String outputLocation = "";
   public static String outputFileSystem = "";
   public static String reportalStorageUser = "";
   public static File reportalMailTempDirectory;
-  public static final String REPORTAL_MAIL_CREATOR = "ReportalMailCreator";
-  public static final int NUM_PREVIEW_ROWS = 50;
-  //Attachment that equal or larger than 10MB will be skipped in the email
-  public static final long MAX_ATTACHMENT_SIZE = 10 * 1024 * 1024L;
 
   static {
     DefaultMailCreator.registerCreator(REPORTAL_MAIL_CREATOR,
@@ -80,8 +80,8 @@ public class ReportalMailCreator implements MailCreator {
   }
 
   @Override
-  public boolean createErrorEmail(ExecutableFlow flow, EmailMessage message,
-      String azkabanName, String scheme, String clientHostname,
+  public boolean createErrorEmail(ExecutableFlow flow, List<ExecutableFlow>
+      pastExecutions, EmailMessage message, String azkabanName, String scheme, String clientHostname,
       String clientPortNumber, String... vars) {
 
     ExecutionOptions option = flow.getExecutionOptions();