Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/AlerterHolder.java b/azkaban-common/src/main/java/azkaban/executor/AlerterHolder.java
index 95c2347..70b6f83 100644
--- a/azkaban-common/src/main/java/azkaban/executor/AlerterHolder.java
+++ b/azkaban-common/src/main/java/azkaban/executor/AlerterHolder.java
@@ -22,6 +22,7 @@ import azkaban.utils.Emailer;
import azkaban.utils.FileIOUtils;
import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
+import azkaban.metrics.CommonMetrics;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.io.File;
@@ -44,19 +45,18 @@ public class AlerterHolder {
private Map<String, Alerter> alerters;
@Inject
- public AlerterHolder(final Props props) {
+ public AlerterHolder(final Props props, final Emailer mailAlerter) {
try {
- this.alerters = loadAlerters(props);
+ this.alerters = loadAlerters(props, mailAlerter);
} catch (final Exception ex) {
logger.error(ex);
this.alerters = new HashMap<>();
}
}
- private Map<String, Alerter> loadAlerters(final Props props) {
+ private Map<String, Alerter> loadAlerters(final Props props, final Emailer mailAlerter) {
final Map<String, Alerter> allAlerters = new HashMap<>();
// load built-in alerters
- final Emailer mailAlerter = new Emailer(props);
allAlerters.put("email", mailAlerter);
// load all plugin alerters
final String pluginDir = props.getString("alerter.plugin.dir", "plugins/alerter");
diff --git a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
index 8e46206..7a62928 100644
--- a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
+++ b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
@@ -36,6 +36,8 @@ public class CommonMetrics {
private Meter flowFailMeter;
private Meter dispatchFailMeter;
private Meter dispatchSuccessMeter;
+ private Meter sendEmailFailMeter;
+ private Meter sendEmailSuccessMeter;
@Inject
public CommonMetrics(final MetricsManager metricsManager) {
@@ -48,6 +50,8 @@ public class CommonMetrics {
this.flowFailMeter = this.metricsManager.addMeter("flow-fail-meter");
this.dispatchFailMeter = this.metricsManager.addMeter("dispatch-fail-meter");
this.dispatchSuccessMeter = this.metricsManager.addMeter("dispatch-success-meter");
+ this.sendEmailFailMeter = this.metricsManager.addMeter("send-email-fail-meter");
+ this.sendEmailSuccessMeter = this.metricsManager.addMeter("send-email-success-meter");
this.metricsManager.addGauge("OOM-waiting-job-count", this.OOMWaitingJobCount::get);
this.metricsManager.addGauge("dbConnectionTime", this.dbConnectionTime::get);
}
@@ -88,6 +92,20 @@ public class CommonMetrics {
this.dispatchSuccessMeter.mark();
}
+ /**
+ * Mark sendEmailFailMeter when an email fails to be sent out.
+ */
+ public void markSendEmailFail() {
+ this.sendEmailFailMeter.mark();
+ }
+
+ /**
+ * Mark sendEmailSuccessMeter when an email is sent out successfully.
+ */
+ public void markSendEmailSuccess() {
+ this.sendEmailSuccessMeter.mark();
+ }
+
public void setDBConnectionTime(final long milliseconds) {
this.dbConnectionTime.set(milliseconds);
}
diff --git a/azkaban-common/src/main/java/azkaban/utils/Emailer.java b/azkaban-common/src/main/java/azkaban/utils/Emailer.java
index f7a863f..f99825b 100644
--- a/azkaban-common/src/main/java/azkaban/utils/Emailer.java
+++ b/azkaban-common/src/main/java/azkaban/utils/Emailer.java
@@ -16,6 +16,8 @@
package azkaban.utils;
+import static java.util.Objects.requireNonNull;
+
import azkaban.alert.Alerter;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
@@ -23,17 +25,22 @@ import azkaban.executor.ExecutionOptions;
import azkaban.executor.Status;
import azkaban.executor.mail.DefaultMailCreator;
import azkaban.executor.mail.MailCreator;
+import azkaban.metrics.CommonMetrics;
import azkaban.sla.SlaOption;
import java.util.ArrayList;
import java.util.List;
+import javax.inject.Inject;
+import javax.inject.Singleton;
import javax.mail.MessagingException;
import org.apache.log4j.Logger;
+@Singleton
public class Emailer extends AbstractMailer implements Alerter {
private static final String HTTPS = "https";
private static final String HTTP = "http";
private static final Logger logger = Logger.getLogger(Emailer.class);
+ private final CommonMetrics commonMetrics;
private final String scheme;
private final String clientHostname;
private final String clientPortNumber;
@@ -46,8 +53,10 @@ public class Emailer extends AbstractMailer implements Alerter {
private final String tls;
private boolean testMode = false;
- public Emailer(final Props props) {
+ @Inject
+ public Emailer(final Props props, final CommonMetrics commonMetrics) {
super(props);
+ this.commonMetrics = requireNonNull(commonMetrics, "commonMetrics is null.");
this.azkabanName = props.getString("azkaban.name", "azkaban");
this.mailHost = props.getString("mail.host", "localhost");
this.mailPort = props.getInt("mail.port", DEFAULT_SMTP_PORT);
@@ -101,8 +110,10 @@ public class Emailer extends AbstractMailer implements Alerter {
if (!this.testMode) {
try {
message.sendEmail();
+ this.commonMetrics.markSendEmailSuccess();
} catch (final MessagingException e) {
logger.error("Failed to send SLA email message" + slaMessage, e);
+ this.commonMetrics.markSendEmailFail();
}
}
}
@@ -130,9 +141,11 @@ public class Emailer extends AbstractMailer implements Alerter {
if (mailCreated && !this.testMode) {
try {
message.sendEmail();
+ this.commonMetrics.markSendEmailSuccess();
} catch (final MessagingException e) {
logger.error(
"Failed to send first error email message for execution " + flow.getExecutionId(), e);
+ this.commonMetrics.markSendEmailFail();
}
}
}
@@ -158,9 +171,11 @@ public class Emailer extends AbstractMailer implements Alerter {
if (mailCreated && !this.testMode) {
try {
message.sendEmail();
+ this.commonMetrics.markSendEmailSuccess();
} catch (final MessagingException e) {
logger
.error("Failed to send error email message for execution " + flow.getExecutionId(), e);
+ this.commonMetrics.markSendEmailFail();
}
}
}
@@ -186,9 +201,11 @@ public class Emailer extends AbstractMailer implements Alerter {
if (mailCreated && !this.testMode) {
try {
message.sendEmail();
+ this.commonMetrics.markSendEmailSuccess();
} catch (final MessagingException e) {
logger.error("Failed to send success email message for execution " + flow.getExecutionId(),
e);
+ this.commonMetrics.markSendEmailFail();
}
}
}
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 4bff2c8..8152f4c 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -24,6 +24,8 @@ import static org.mockito.Mockito.when;
import azkaban.metrics.CommonMetrics;
import azkaban.metrics.MetricsManager;
import azkaban.user.User;
+import azkaban.utils.AbstractMailerTest;
+import azkaban.utils.Emailer;
import azkaban.utils.Pair;
import azkaban.utils.Props;
import azkaban.utils.TestUtils;
@@ -38,6 +40,7 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
@@ -47,12 +50,21 @@ import org.junit.Test;
public class ExecutorManagerTest {
private final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows = new HashMap<>();
+ private final CommonMetrics commonMetrics = new CommonMetrics(
+ new MetricsManager(new MetricRegistry()));
private ExecutorManager manager;
private ExecutorLoader loader;
private Props props;
private User user;
private ExecutableFlow flow1;
private ExecutableFlow flow2;
+ private AlerterHolder alertHolder;
+
+ @Before
+ public void setup() {
+ this.props = AbstractMailerTest.createMailProperties();
+ this.alertHolder = new AlerterHolder(this.props, new Emailer(this.props, this.commonMetrics));
+ }
/* Helper method to create a ExecutorManager Instance */
private ExecutorManager createMultiExecutorManagerInstance()
@@ -66,14 +78,11 @@ public class ExecutorManagerTest {
*/
private ExecutorManager createMultiExecutorManagerInstance(
final ExecutorLoader loader) throws ExecutorManagerException {
- final Props props = new Props();
- props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
- props.put(ExecutorManager.AZKABAN_QUEUEPROCESSING_ENABLED, "false");
-
+ this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+ this.props.put(ExecutorManager.AZKABAN_QUEUEPROCESSING_ENABLED, "false");
loader.addExecutor("localhost", 12345);
loader.addExecutor("localhost", 12346);
- return new ExecutorManager(props, loader, new AlerterHolder(props),
- new CommonMetrics(new MetricsManager(new MetricRegistry())));
+ return new ExecutorManager(this.props, loader, this.alertHolder, this.commonMetrics);
}
/*
@@ -82,12 +91,10 @@ public class ExecutorManagerTest {
*/
@Test(expected = ExecutorManagerException.class)
public void testNoExecutorScenario() throws ExecutorManagerException {
- final Props props = new Props();
- props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+ this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
final ExecutorLoader loader = new MockExecutorLoader();
@SuppressWarnings("unused") final ExecutorManager manager =
- new ExecutorManager(props, loader, new AlerterHolder(props),
- new CommonMetrics(new MetricsManager(new MetricRegistry())));
+ new ExecutorManager(this.props, loader, this.alertHolder, this.commonMetrics);
}
/*
@@ -95,13 +102,10 @@ public class ExecutorManagerTest {
*/
@Test
public void testLocalExecutorScenario() throws ExecutorManagerException {
- final Props props = new Props();
- props.put("executor.port", 12345);
-
+ this.props.put("executor.port", 12345);
final ExecutorLoader loader = new MockExecutorLoader();
final ExecutorManager manager =
- new ExecutorManager(props, loader, new AlerterHolder(props),
- new CommonMetrics(new MetricsManager(new MetricRegistry())));
+ new ExecutorManager(this.props, loader, this.alertHolder, this.commonMetrics);
final Set<Executor> activeExecutors =
new HashSet(manager.getAllActiveExecutors());
@@ -118,15 +122,13 @@ public class ExecutorManagerTest {
*/
@Test
public void testMultipleExecutorScenario() throws ExecutorManagerException {
- final Props props = new Props();
- props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+ this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
final ExecutorLoader loader = new MockExecutorLoader();
final Executor executor1 = loader.addExecutor("localhost", 12345);
final Executor executor2 = loader.addExecutor("localhost", 12346);
final ExecutorManager manager =
- new ExecutorManager(props, loader, new AlerterHolder(props),
- new CommonMetrics(new MetricsManager(new MetricRegistry())));
+ new ExecutorManager(this.props, loader, this.alertHolder, this.commonMetrics);
final Set<Executor> activeExecutors =
new HashSet(manager.getAllActiveExecutors());
Assert.assertArrayEquals(activeExecutors.toArray(), new Executor[] {
@@ -138,14 +140,11 @@ public class ExecutorManagerTest {
*/
@Test
public void testSetupExecutorsSucess() throws ExecutorManagerException {
- final Props props = new Props();
- props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+ this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
final ExecutorLoader loader = new MockExecutorLoader();
final Executor executor1 = loader.addExecutor("localhost", 12345);
-
final ExecutorManager manager =
- new ExecutorManager(props, loader, new AlerterHolder(props),
- new CommonMetrics(new MetricsManager(new MetricRegistry())));
+ new ExecutorManager(this.props, loader, this.alertHolder, this.commonMetrics);
Assert.assertArrayEquals(manager.getAllActiveExecutors().toArray(),
new Executor[] { executor1 });
@@ -166,14 +165,11 @@ public class ExecutorManagerTest {
*/
@Test(expected = ExecutorManagerException.class)
public void testSetupExecutorsException() throws ExecutorManagerException {
- final Props props = new Props();
- props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+ this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
final ExecutorLoader loader = new MockExecutorLoader();
final Executor executor1 = loader.addExecutor("localhost", 12345);
-
final ExecutorManager manager =
- new ExecutorManager(props, loader, new AlerterHolder(props),
- new CommonMetrics(new MetricsManager(new MetricRegistry())));
+ new ExecutorManager(this.props, loader, this.alertHolder, this.commonMetrics);
final Set<Executor> activeExecutors =
new HashSet(manager.getAllActiveExecutors());
Assert.assertArrayEquals(activeExecutors.toArray(),
@@ -333,7 +329,6 @@ public class ExecutorManagerTest {
throws ExecutorManagerException, IOException {
this.loader = mock(ExecutorLoader.class);
this.user = TestUtils.getTestUser();
- this.props = new Props();
this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
//To test runningFlows, AZKABAN_QUEUEPROCESSING_ENABLED should be set to true
//so that flows will be dispatched to executors.
@@ -346,8 +341,8 @@ public class ExecutorManagerTest {
executors.add(executor2);
when(this.loader.fetchActiveExecutors()).thenReturn(executors);
- this.manager = new ExecutorManager(this.props, this.loader, new AlerterHolder(this.props),
- new CommonMetrics(new MetricsManager(new MetricRegistry())));
+ this.manager = new ExecutorManager(this.props, this.loader, this.alertHolder,
+ this.commonMetrics);
this.flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
this.flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
diff --git a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
index d44a739..5d3dc22 100644
--- a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
+++ b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
@@ -16,6 +16,7 @@
package azkaban.trigger;
+import azkaban.utils.AbstractMailerTest;
import azkaban.executor.AlerterHolder;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManager;
@@ -24,6 +25,7 @@ import azkaban.executor.MockExecutorLoader;
import azkaban.metrics.CommonMetrics;
import azkaban.metrics.MetricsManager;
import azkaban.trigger.builtin.CreateTriggerAction;
+import azkaban.utils.Emailer;
import azkaban.utils.Props;
import com.codahale.metrics.MetricRegistry;
import java.util.ArrayList;
@@ -44,12 +46,14 @@ public class TriggerManagerDeadlockTest {
@Before
public void setup() throws ExecutorManagerException, TriggerManagerException {
this.loader = new MockTriggerLoader();
- final Props props = new Props();
+ final Props props = AbstractMailerTest.createMailProperties();
props.put("trigger.scan.interval", 1000);
props.put("executor.port", 12321);
this.execLoader = new MockExecutorLoader();
+ final CommonMetrics commonMetrics = new CommonMetrics(new MetricsManager(new MetricRegistry()));
final ExecutorManager executorManager = new ExecutorManager(props, this.execLoader,
- new AlerterHolder(props), new CommonMetrics(new MetricsManager(new MetricRegistry())));
+ new AlerterHolder(props, new Emailer(props, commonMetrics)),
+ commonMetrics);
this.triggerManager = new TriggerManager(props, this.loader, executorManager);
}
diff --git a/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java b/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
index f0e3563..8a05bc0 100644
--- a/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
@@ -17,9 +17,12 @@ package azkaban.utils;
import azkaban.executor.ExecutableFlow;
import azkaban.flow.Flow;
+import azkaban.metrics.CommonMetrics;
+import azkaban.metrics.MetricsManager;
import azkaban.project.DirectoryFlowLoader;
import azkaban.project.Project;
import azkaban.test.executions.TestExecutions;
+import com.codahale.metrics.MetricRegistry;
import java.util.ArrayList;
import java.util.List;
import org.apache.log4j.Logger;
@@ -74,14 +77,16 @@ public class EmailerTest {
Assert.assertNotNull(flow);
final ExecutableFlow exFlow = new ExecutableFlow(this.project, flow);
- final Emailer emailer = new Emailer(this.props);
+ final CommonMetrics commonMetrics = new CommonMetrics(new MetricsManager(new MetricRegistry()));
+ final Emailer emailer = new Emailer(this.props, commonMetrics);
emailer.sendErrorEmail(exFlow);
}
@Test
public void testCreateEmailMessage() {
- final Emailer emailer = new Emailer(this.props);
+ final CommonMetrics commonMetrics = new CommonMetrics(new MetricsManager(new MetricRegistry()));
+ final Emailer emailer = new Emailer(this.props, commonMetrics);
final EmailMessage em = emailer
.createEmailMessage("subject", "text/html", this.receiveAddrList);
assert em.getMailPort() == this.mailPort;
diff --git a/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java b/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
index a4ef1a0..b8347dc 100644
--- a/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
@@ -28,6 +28,7 @@ import azkaban.AzkabanCommonModule;
import azkaban.database.AzkabanDatabaseSetup;
import azkaban.database.AzkabanDatabaseUpdater;
import azkaban.db.DatabaseOperator;
+import azkaban.executor.AlerterHolder;
import azkaban.executor.Executor;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManager;
@@ -36,6 +37,7 @@ import azkaban.project.ProjectManager;
import azkaban.spi.Storage;
import azkaban.trigger.TriggerLoader;
import azkaban.trigger.TriggerManager;
+import azkaban.utils.Emailer;
import azkaban.utils.Props;
import com.google.inject.Guice;
import com.google.inject.Injector;
@@ -129,6 +131,8 @@ public class AzkabanWebServerTest {
assertSingleton(DatabaseOperator.class, injector);
assertSingleton(TriggerLoader.class, injector);
assertSingleton(TriggerManager.class, injector);
+ assertSingleton(AlerterHolder.class, injector);
+ assertSingleton(Emailer.class, injector);
SERVICE_PROVIDER.unsetInjector();
}