Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
index 7d2e256..acb189b 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
@@ -16,8 +16,8 @@
package azkaban.executor;
-import azkaban.db.EncodingType;
import azkaban.db.DatabaseOperator;
+import azkaban.db.EncodingType;
import azkaban.db.SQLTransaction;
import azkaban.utils.GZIPUtils;
import azkaban.utils.JSONUtils;
@@ -108,9 +108,24 @@ public class ExecutionFlowDao {
}
}
+ /**
+ * fetch flow execution history with specified {@code projectId}, {@code flowId} and flow start
+ * time >= {@code startTime}
+ *
+ * @return the list of flows meeting the specified criteria
+ */
+ public List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId, final
+ long startTime) throws ExecutorManagerException {
+ try {
+ return this.dbOperator.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_BY_START_TIME,
+ new FetchExecutableFlows(), projectId, flowId, startTime);
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error fetching historic flows", e);
+ }
+ }
+
List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
- final int skip, final int num,
- final Status status)
+ final int skip, final int num, final Status status)
throws ExecutorManagerException {
try {
return this.dbOperator.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_BY_STATUS,
@@ -267,6 +282,9 @@ public class ExecutionFlowDao {
public static class FetchExecutableFlows implements
ResultSetHandler<List<ExecutableFlow>> {
+ static String FETCH_EXECUTABLE_FLOW_BY_START_TIME =
+ "SELECT ef.exec_id, ef.enc_type, ef.flow_data FROM execution_flows ef WHERE project_id=? "
+ + "AND flow_id=? AND start_time >= ? ORDER BY start_time DESC";
static String FETCH_BASE_EXECUTABLE_FLOW_QUERY =
"SELECT ef.exec_id, ef.enc_type, ef.flow_data FROM execution_flows ef";
static String FETCH_EXECUTABLE_FLOW =
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index 7657588..87ff41a 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -52,6 +52,9 @@ public interface ExecutorLoader {
String flowContains, String userNameContains, int status, long startData,
long endData, int skip, int num) throws ExecutorManagerException;
+ List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
+ final long startTime) throws ExecutorManagerException;
+
/**
* <pre>
* Fetch all executors from executors table
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index eca1744..683cc65 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -20,12 +20,12 @@ import azkaban.executor.ExecutorLogEvent.EventType;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.Pair;
import azkaban.utils.Props;
-import javax.inject.Inject;
-import javax.inject.Singleton;
import java.io.File;
import java.time.Duration;
import java.util.List;
import java.util.Map;
+import javax.inject.Inject;
+import javax.inject.Singleton;
@Singleton
public class JdbcExecutorLoader implements ExecutorLoader {
@@ -126,6 +126,12 @@ public class JdbcExecutorLoader implements ExecutorLoader {
@Override
public List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
+ final long startTime) throws ExecutorManagerException {
+ return this.executionFlowDao.fetchFlowHistory(projectId, flowId, startTime);
+ }
+
+ @Override
+ public List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
final int skip, final int num, final Status status) throws ExecutorManagerException {
return this.executionFlowDao.fetchFlowHistory(projectId, flowId, skip, num, status);
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/mail/DefaultMailCreator.java b/azkaban-common/src/main/java/azkaban/executor/mail/DefaultMailCreator.java
index afb90b9..a6483a9 100644
--- a/azkaban-common/src/main/java/azkaban/executor/mail/DefaultMailCreator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/mail/DefaultMailCreator.java
@@ -137,9 +137,9 @@ public class DefaultMailCreator implements MailCreator {
}
@Override
- public boolean createErrorEmail(final ExecutableFlow flow, final EmailMessage message,
- final String azkabanName, final String scheme, final String clientHostname,
- final String clientPortNumber, final String... vars) {
+ public boolean createErrorEmail(final ExecutableFlow flow, final List<ExecutableFlow>
+ pastExecutions, final EmailMessage message, final String azkabanName, final String scheme,
+ final String clientHostname, final String clientPortNumber, final String... vars) {
final ExecutionOptions option = flow.getExecutionOptions();
@@ -185,6 +185,30 @@ public class DefaultMailCreator implements MailCreator {
}
message.println("</ul>");
+
+ message.println("");
+
+ int failedCount = 0;
+ for (final ExecutableFlow executableFlow : pastExecutions) {
+ if (executableFlow.getStatus().equals(Status.FAILED)) {
+ failedCount++;
+ }
+ }
+
+ message.println(String.format("<h3>Executions from past 72 hours (%s out %s) failed</h3>",
+ failedCount, pastExecutions.size()));
+ for (final ExecutableFlow executableFlow : pastExecutions) {
+ message.println("<table>");
+ message.println(
+ "<tr><td>Execution Id</td><td>" + (executableFlow.getExecutionId()) + "</td></tr>");
+ message.println("<tr><td>Start Time</td><td>"
+ + convertMSToString(executableFlow.getStartTime()) + "</td></tr>");
+ message.println("<tr><td>End Time</td><td>"
+ + convertMSToString(executableFlow.getEndTime()) + "</td></tr>");
+ message.println("<tr><td>Status</td><td>" + flow.getStatus() + "</td></tr>");
+ message.println("</table>");
+ }
+
return true;
}
return false;
diff --git a/azkaban-common/src/main/java/azkaban/executor/mail/MailCreator.java b/azkaban-common/src/main/java/azkaban/executor/mail/MailCreator.java
index 8df7407..934e2c5 100644
--- a/azkaban-common/src/main/java/azkaban/executor/mail/MailCreator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/mail/MailCreator.java
@@ -18,6 +18,7 @@ package azkaban.executor.mail;
import azkaban.executor.ExecutableFlow;
import azkaban.utils.EmailMessage;
+import java.util.List;
public interface MailCreator {
@@ -25,8 +26,8 @@ public interface MailCreator {
EmailMessage message, String azkabanName, String scheme,
String clientHostname, String clientPortNumber, String... vars);
- public boolean createErrorEmail(ExecutableFlow flow, EmailMessage message,
- String azkabanName, String scheme, String clientHostname,
+ public boolean createErrorEmail(ExecutableFlow flow, List<ExecutableFlow> pastExecutions,
+ EmailMessage message, String azkabanName, String scheme, String clientHostname,
String clientPortNumber, String... vars);
public boolean createSuccessEmail(ExecutableFlow flow, EmailMessage message,
diff --git a/azkaban-common/src/main/java/azkaban/utils/Emailer.java b/azkaban-common/src/main/java/azkaban/utils/Emailer.java
index 2c07308..41ac9fa 100644
--- a/azkaban-common/src/main/java/azkaban/utils/Emailer.java
+++ b/azkaban-common/src/main/java/azkaban/utils/Emailer.java
@@ -22,10 +22,14 @@ import azkaban.Constants;
import azkaban.Constants.ConfigurationKeys;
import azkaban.alert.Alerter;
import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutorLoader;
+import azkaban.executor.ExecutorManagerException;
import azkaban.executor.mail.DefaultMailCreator;
import azkaban.executor.mail.MailCreator;
import azkaban.metrics.CommonMetrics;
import azkaban.sla.SlaOption;
+import java.time.Duration;
+import java.util.ArrayList;
import java.util.List;
import javax.inject.Inject;
import javax.inject.Singleton;
@@ -43,11 +47,13 @@ public class Emailer extends AbstractMailer implements Alerter {
private final String clientHostname;
private final String clientPortNumber;
private final String azkabanName;
+ private final ExecutorLoader executorLoader;
@Inject
public Emailer(final Props props, final CommonMetrics commonMetrics,
- final EmailMessageCreator messageCreator) {
+ final EmailMessageCreator messageCreator, final ExecutorLoader executorLoader) {
super(props, messageCreator);
+ this.executorLoader = requireNonNull(executorLoader, "executorLoader is null.");
this.commonMetrics = requireNonNull(commonMetrics, "commonMetrics is null.");
this.azkabanName = props.getString("azkaban.name", "azkaban");
@@ -114,8 +120,20 @@ public class Emailer extends AbstractMailer implements Alerter {
public void alertOnError(final ExecutableFlow flow, final String... extraReasons) {
final EmailMessage message = this.messageCreator.createMessage();
final MailCreator mailCreator = getMailCreator(flow);
- final boolean mailCreated = mailCreator.createErrorEmail(flow, message, this.azkabanName,
- this.scheme, this.clientHostname, this.clientPortNumber, extraReasons);
+ List<ExecutableFlow> last72hoursExecutions = new ArrayList<>();
+
+ if (flow.getStartTime() > 0) {
+ final long startTime = flow.getStartTime() - Duration.ofHours(72).toMillis();
+ try {
+ last72hoursExecutions = this.executorLoader.fetchFlowHistory(flow.getProjectId(), flow
+ .getFlowId(), startTime);
+ } catch (final ExecutorManagerException e) {
+ logger.error("unable to fetch past executions", e);
+ }
+ }
+
+ final boolean mailCreated = mailCreator.createErrorEmail(flow, last72hoursExecutions, message,
+ this.azkabanName, this.scheme, this.clientHostname, this.clientPortNumber, extraReasons);
sendEmail(message, mailCreated, "error email message for execution " + flow.getExecutionId());
}
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
index 31ff91c..5819fbf 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
@@ -30,7 +30,11 @@ import azkaban.utils.Props;
import azkaban.utils.TestUtils;
import java.io.File;
import java.sql.SQLException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -114,7 +118,6 @@ public class ExecutionFlowDaoTest {
assertTwoFlowSame(flow, fetchFlow);
}
-
@Test
public void testUpdateExecutableFlow() throws Exception {
final ExecutableFlow flow = createTestFlow();
@@ -150,6 +153,48 @@ public class ExecutionFlowDaoTest {
}
@Test
+ public void fetchFlowHistoryWithStartTime() throws Exception {
+ class DateUtil {
+
+ private long dateStrToLong(final String dateStr) throws ParseException {
+ final SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ final Date d = f.parse(dateStr);
+ final long milliseconds = d.getTime();
+ return milliseconds;
+ }
+ }
+
+ final DateUtil dateUtil = new DateUtil();
+ final ExecutableFlow flow1 = createTestFlow();
+ flow1.setStartTime(dateUtil.dateStrToLong("2018-09-01 10:00:00"));
+ this.executionFlowDao.uploadExecutableFlow(flow1);
+
+ final ExecutableFlow flow2 = createTestFlow();
+ flow2.setStartTime(dateUtil.dateStrToLong("2018-09-01 09:00:00"));
+ this.executionFlowDao.uploadExecutableFlow(flow2);
+
+ final ExecutableFlow flow3 = createTestFlow();
+ flow3.setStartTime(dateUtil.dateStrToLong("2018-09-01 09:00:00"));
+ this.executionFlowDao.uploadExecutableFlow(flow3);
+
+ final ExecutableFlow flow4 = createTestFlow();
+ flow4.setStartTime(dateUtil.dateStrToLong("2018-09-01 08:00:00"));
+ this.executionFlowDao.uploadExecutableFlow(flow4);
+
+ final List<ExecutableFlow> flowList = this.executionFlowDao.fetchFlowHistory
+ (flow1.getProjectId(), flow1.getFlowId(), dateUtil.dateStrToLong("2018-09-01 09:00:00"));
+ final List<ExecutableFlow> expected = new ArrayList<>();
+ expected.add(flow1);
+ expected.add(flow2);
+ expected.add(flow3);
+
+ assertThat(flowList).hasSize(3);
+ for (int i = 0; i < flowList.size(); i++) {
+ assertTwoFlowSame(flowList.get(i), expected.get(i));
+ }
+ }
+
+ @Test
public void testAdvancedFilter() throws Exception {
createTestProject();
final ExecutableFlow flow = createTestFlow();
@@ -364,7 +409,7 @@ public class ExecutionFlowDaoTest {
assertThat(flow1.getStatus()).isEqualTo(flow2.getStatus());
assertThat(flow1.getEndTime()).isEqualTo(flow2.getEndTime());
assertThat(flow1.getStartTime()).isEqualTo(flow2.getStartTime());
- assertThat(flow1.getSubmitTime()).isEqualTo(flow2.getStartTime());
+ assertThat(flow1.getSubmitTime()).isEqualTo(flow2.getSubmitTime());
assertThat(flow1.getFlowId()).isEqualTo(flow2.getFlowId());
assertThat(flow1.getProjectId()).isEqualTo(flow2.getProjectId());
assertThat(flow1.getVersion()).isEqualTo(flow2.getVersion());
diff --git a/azkaban-common/src/test/java/azkaban/executor/mail/DefaultMailCreatorTest.java b/azkaban-common/src/test/java/azkaban/executor/mail/DefaultMailCreatorTest.java
index b590f4c..80577d0 100644
--- a/azkaban-common/src/test/java/azkaban/executor/mail/DefaultMailCreatorTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/mail/DefaultMailCreatorTest.java
@@ -15,6 +15,8 @@ import azkaban.utils.EmailMessage;
import azkaban.utils.EmailMessageCreator;
import azkaban.utils.TestUtils;
import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.List;
import java.util.TimeZone;
import org.joda.time.DateTimeUtils;
import org.junit.After;
@@ -93,9 +95,25 @@ public class DefaultMailCreatorTest {
setJobStatus(Status.FAILED);
this.executableFlow.setEndTime(END_TIME_MILLIS);
this.executableFlow.setStatus(Status.FAILED);
+ final List<ExecutableFlow> executableFlows = new ArrayList<>();
+
+ final ExecutableFlow executableFlow1 = new ExecutableFlow(this.project, this.flow);
+ executableFlow1.setExecutionId(1);
+ executableFlow1.setStartTime(START_TIME_MILLIS);
+ executableFlow1.setEndTime(END_TIME_MILLIS);
+ executableFlow1.setStatus(Status.FAILED);
+ executableFlows.add(executableFlow1);
+
+ final ExecutableFlow executableFlow2 = new ExecutableFlow(this.project, this.flow);
+ executableFlow2.setExecutionId(2);
+ executableFlow2.setStartTime(START_TIME_MILLIS);
+ executableFlow2.setEndTime(END_TIME_MILLIS);
+ executableFlow2.setStatus(Status.SUCCEEDED);
+ executableFlows.add(executableFlow2);
+
assertTrue(this.mailCreator.createErrorEmail(
- this.executableFlow, this.message, this.azkabanName, this.scheme, this.clientHostname,
- this.clientPortNumber));
+ this.executableFlow, executableFlows, this.message, this.azkabanName, this.scheme, this
+ .clientHostname, this.clientPortNumber));
assertEquals("Flow 'mail-creator-test' has failed on unit-tests", this.message.getSubject());
assertThat(TestUtils.readResource("errorEmail.html", this))
.isEqualToIgnoringWhitespace(this.message.getBody());
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index 0bf798d..3973d0b 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -204,6 +204,12 @@ public class MockExecutorLoader implements ExecutorLoader {
}
@Override
+ public List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
+ final long startTime) throws ExecutorManagerException {
+ return new ArrayList<>();
+ }
+
+ @Override
public List<ExecutableJobInfo> fetchJobHistory(final int projectId, final String jobId,
final int skip, final int size) throws ExecutorManagerException {
// TODO Auto-generated method stub
diff --git a/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java b/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
index f7fef9a..c165eb3 100644
--- a/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
@@ -24,6 +24,8 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutorLoader;
+import azkaban.executor.MockExecutorLoader;
import azkaban.flow.Flow;
import azkaban.metrics.CommonMetrics;
import azkaban.metrics.MetricsManager;
@@ -48,6 +50,7 @@ public class EmailerTest {
private Props props;
private EmailMessageCreator messageCreator;
private EmailMessage message;
+ private ExecutorLoader executorLoader;
public static EmailMessageCreator mockMessageCreator(final EmailMessage message) {
final EmailMessageCreator mock = mock(EmailMessageCreator.class);
@@ -85,6 +88,7 @@ public class EmailerTest {
this.messageCreator = mockMessageCreator(this.message);
this.receiveAddrList.add(this.receiveAddr);
this.project = new Project(11, "myTestProject");
+ this.executorLoader = new MockExecutorLoader();
this.props = createMailProperties();
final DirectoryFlowLoader loader = new DirectoryFlowLoader(this.props);
@@ -102,7 +106,8 @@ public class EmailerTest {
final ExecutableFlow exFlow = new ExecutableFlow(this.project, flow);
final CommonMetrics commonMetrics = new CommonMetrics(new MetricsManager(new MetricRegistry()));
- final Emailer emailer = new Emailer(this.props, commonMetrics, this.messageCreator);
+ final Emailer emailer = new Emailer(this.props, commonMetrics, this.messageCreator,
+ this.executorLoader);
emailer.alertOnError(exFlow);
verify(this.message).addAllToAddress(this.receiveAddrList);
verify(this.message).setSubject("Flow 'jobe' has failed on azkaban");
@@ -113,14 +118,16 @@ public class EmailerTest {
@Test
public void testGetAzkabanURL() {
final CommonMetrics commonMetrics = new CommonMetrics(new MetricsManager(new MetricRegistry()));
- final Emailer emailer = new Emailer(this.props, commonMetrics, this.messageCreator);
+ final Emailer emailer = new Emailer(this.props, commonMetrics, this.messageCreator,
+ this.executorLoader);
assertThat(emailer.getAzkabanURL()).isEqualTo("http://localhost:8786");
}
@Test
public void testCreateEmailMessage() {
final CommonMetrics commonMetrics = new CommonMetrics(new MetricsManager(new MetricRegistry()));
- final Emailer emailer = new Emailer(this.props, commonMetrics, this.messageCreator);
+ final Emailer emailer = new Emailer(this.props, commonMetrics, this.messageCreator,
+ this.executorLoader);
final EmailMessage em = emailer
.createEmailMessage("subject", "text/html", this.receiveAddrList);
verify(this.messageCreator).createMessage();
@@ -138,7 +145,8 @@ public class EmailerTest {
final ExecutableFlow exFlow = new ExecutableFlow(this.project, flow);
final CommonMetrics commonMetrics = mock(CommonMetrics.class);
- final Emailer emailer = new Emailer(this.props, commonMetrics, this.messageCreator);
+ final Emailer emailer = new Emailer(this.props, commonMetrics, this.messageCreator,
+ this.executorLoader);
emailer.alertOnError(exFlow);
verify(commonMetrics, never()).markSendEmailFail();
}
diff --git a/azkaban-common/src/test/resources/azkaban/executor/mail/errorEmail.html b/azkaban-common/src/test/resources/azkaban/executor/mail/errorEmail.html
index 8593525..9321eea 100644
--- a/azkaban-common/src/test/resources/azkaban/executor/mail/errorEmail.html
+++ b/azkaban-common/src/test/resources/azkaban/executor/mail/errorEmail.html
@@ -22,4 +22,40 @@
<ul>
<li><a href="http://localhost:8081/executor?execid=-1&job=test-job">Failed job 'test-job' Link</a>
</li>
-</ul>
+</ul><h3>Executions from past 72 hours (1 out 2) failed</h3>
+<table>
+ <tr>
+ <td>Execution Id</td>
+ <td>1</td>
+ </tr>
+ <tr>
+ <td>Start Time</td>
+ <td>2016/07/17 11:54:11 EEST</td>
+ </tr>
+ <tr>
+ <td>End Time</td>
+ <td>2016/07/17 11:54:16 EEST</td>
+ </tr>
+ <tr>
+ <td>Status</td>
+ <td>FAILED</td>
+ </tr>
+</table>
+<table>
+ <tr>
+ <td>Execution Id</td>
+ <td>2</td>
+ </tr>
+ <tr>
+ <td>Start Time</td>
+ <td>2016/07/17 11:54:11 EEST</td>
+ </tr>
+ <tr>
+ <td>End Time</td>
+ <td>2016/07/17 11:54:16 EEST</td>
+ </tr>
+ <tr>
+ <td>Status</td>
+ <td>FAILED</td>
+ </tr>
+</table>
\ No newline at end of file
diff --git a/azkaban-common/src/test/resources/azkaban/utils/errorEmail2.html b/azkaban-common/src/test/resources/azkaban/utils/errorEmail2.html
index 7378aa6..df367a0 100644
--- a/azkaban-common/src/test/resources/azkaban/utils/errorEmail2.html
+++ b/azkaban-common/src/test/resources/azkaban/utils/errorEmail2.html
@@ -18,4 +18,4 @@
<td>READY</td>
</tr>
</table><a href="http://localhost:8786/executor?execid=-1">jobe Execution Link</a><h3>Reason</h3>
-<ul></ul>
+<ul></ul><h3>Executions from past 72 hours (0 out 0) failed</h3>
\ No newline at end of file
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/TriggerInstanceProcessorTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/TriggerInstanceProcessorTest.java
index 6d52ca9..2990b24 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/TriggerInstanceProcessorTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/TriggerInstanceProcessorTest.java
@@ -23,7 +23,9 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManager;
+import azkaban.executor.MockExecutorLoader;
import azkaban.flow.Flow;
import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
import azkaban.metrics.CommonMetrics;
@@ -66,6 +68,7 @@ public class TriggerInstanceProcessorTest {
private CountDownLatch sendEmailLatch;
private CountDownLatch submitFlowLatch;
private CountDownLatch updateExecIDLatch;
+ private ExecutorLoader executorLoader;
private static TriggerInstance createTriggerInstance() throws ParseException {
final FlowTrigger flowTrigger = new FlowTrigger(
@@ -103,10 +106,11 @@ public class TriggerInstanceProcessorTest {
this.messageCreator = EmailerTest.mockMessageCreator(this.message);
this.triggerInstLoader = mock(FlowTriggerInstanceLoader.class);
this.executorManager = mock(ExecutorManager.class);
+ this.executorLoader = new MockExecutorLoader();
when(this.executorManager.submitExecutableFlow(any(), anyString())).thenReturn("return");
final CommonMetrics commonMetrics = new CommonMetrics(new MetricsManager(new MetricRegistry()));
this.emailer = Mockito.spy(new Emailer(EmailerTest.createMailProperties(), commonMetrics,
- this.messageCreator));
+ this.messageCreator, this.executorLoader));
this.sendEmailLatch = new CountDownLatch(1);
doAnswer(invocation -> {
diff --git a/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalMailCreator.java b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalMailCreator.java
index e3ba282..72e6708 100644
--- a/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalMailCreator.java
+++ b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalMailCreator.java
@@ -51,16 +51,16 @@ import azkaban.utils.EmailMessage;
import azkaban.webapp.AzkabanWebServer;
public class ReportalMailCreator implements MailCreator {
+ public static final String REPORTAL_MAIL_CREATOR = "ReportalMailCreator";
+ public static final int NUM_PREVIEW_ROWS = 50;
+ //Attachment that equal or larger than 10MB will be skipped in the email
+ public static final long MAX_ATTACHMENT_SIZE = 10 * 1024 * 1024L;
public static AzkabanWebServer azkaban = null;
public static HadoopSecurityManager hadoopSecurityManager = null;
public static String outputLocation = "";
public static String outputFileSystem = "";
public static String reportalStorageUser = "";
public static File reportalMailTempDirectory;
- public static final String REPORTAL_MAIL_CREATOR = "ReportalMailCreator";
- public static final int NUM_PREVIEW_ROWS = 50;
- //Attachment that equal or larger than 10MB will be skipped in the email
- public static final long MAX_ATTACHMENT_SIZE = 10 * 1024 * 1024L;
static {
DefaultMailCreator.registerCreator(REPORTAL_MAIL_CREATOR,
@@ -80,8 +80,8 @@ public class ReportalMailCreator implements MailCreator {
}
@Override
- public boolean createErrorEmail(ExecutableFlow flow, EmailMessage message,
- String azkabanName, String scheme, String clientHostname,
+ public boolean createErrorEmail(ExecutableFlow flow, List<ExecutableFlow>
+ pastExecutions, EmailMessage message, String azkabanName, String scheme, String clientHostname,
String clientPortNumber, String... vars) {
ExecutionOptions option = flow.getExecutionOptions();