azkaban-aplcache

Add metrics for sending email successs/failure. (#1287) *

7/27/2017 8:26:53 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/AlerterHolder.java b/azkaban-common/src/main/java/azkaban/executor/AlerterHolder.java
index 95c2347..70b6f83 100644
--- a/azkaban-common/src/main/java/azkaban/executor/AlerterHolder.java
+++ b/azkaban-common/src/main/java/azkaban/executor/AlerterHolder.java
@@ -22,6 +22,7 @@ import azkaban.utils.Emailer;
 import azkaban.utils.FileIOUtils;
 import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
+import azkaban.metrics.CommonMetrics;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import java.io.File;
@@ -44,19 +45,18 @@ public class AlerterHolder {
   private Map<String, Alerter> alerters;
 
   @Inject
-  public AlerterHolder(final Props props) {
+  public AlerterHolder(final Props props, final Emailer mailAlerter) {
     try {
-      this.alerters = loadAlerters(props);
+      this.alerters = loadAlerters(props, mailAlerter);
     } catch (final Exception ex) {
       logger.error(ex);
       this.alerters = new HashMap<>();
     }
   }
 
-  private Map<String, Alerter> loadAlerters(final Props props) {
+  private Map<String, Alerter> loadAlerters(final Props props, final Emailer mailAlerter) {
     final Map<String, Alerter> allAlerters = new HashMap<>();
     // load built-in alerters
-    final Emailer mailAlerter = new Emailer(props);
     allAlerters.put("email", mailAlerter);
     // load all plugin alerters
     final String pluginDir = props.getString("alerter.plugin.dir", "plugins/alerter");
diff --git a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
index 8e46206..7a62928 100644
--- a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
+++ b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
@@ -36,6 +36,8 @@ public class CommonMetrics {
   private Meter flowFailMeter;
   private Meter dispatchFailMeter;
   private Meter dispatchSuccessMeter;
+  private Meter sendEmailFailMeter;
+  private Meter sendEmailSuccessMeter;
 
   @Inject
   public CommonMetrics(final MetricsManager metricsManager) {
@@ -48,6 +50,8 @@ public class CommonMetrics {
     this.flowFailMeter = this.metricsManager.addMeter("flow-fail-meter");
     this.dispatchFailMeter = this.metricsManager.addMeter("dispatch-fail-meter");
     this.dispatchSuccessMeter = this.metricsManager.addMeter("dispatch-success-meter");
+    this.sendEmailFailMeter = this.metricsManager.addMeter("send-email-fail-meter");
+    this.sendEmailSuccessMeter = this.metricsManager.addMeter("send-email-success-meter");
     this.metricsManager.addGauge("OOM-waiting-job-count", this.OOMWaitingJobCount::get);
     this.metricsManager.addGauge("dbConnectionTime", this.dbConnectionTime::get);
   }
@@ -88,6 +92,20 @@ public class CommonMetrics {
     this.dispatchSuccessMeter.mark();
   }
 
+  /**
+   * Mark sendEmailFailMeter when an email fails to be sent out.
+   */
+  public void markSendEmailFail() {
+    this.sendEmailFailMeter.mark();
+  }
+
+  /**
+   * Mark sendEmailSuccessMeter when an email is sent out successfully.
+   */
+  public void markSendEmailSuccess() {
+    this.sendEmailSuccessMeter.mark();
+  }
+
   public void setDBConnectionTime(final long milliseconds) {
     this.dbConnectionTime.set(milliseconds);
   }
diff --git a/azkaban-common/src/main/java/azkaban/utils/Emailer.java b/azkaban-common/src/main/java/azkaban/utils/Emailer.java
index f7a863f..f99825b 100644
--- a/azkaban-common/src/main/java/azkaban/utils/Emailer.java
+++ b/azkaban-common/src/main/java/azkaban/utils/Emailer.java
@@ -16,6 +16,8 @@
 
 package azkaban.utils;
 
+import static java.util.Objects.requireNonNull;
+
 import azkaban.alert.Alerter;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableNode;
@@ -23,17 +25,22 @@ import azkaban.executor.ExecutionOptions;
 import azkaban.executor.Status;
 import azkaban.executor.mail.DefaultMailCreator;
 import azkaban.executor.mail.MailCreator;
+import azkaban.metrics.CommonMetrics;
 import azkaban.sla.SlaOption;
 import java.util.ArrayList;
 import java.util.List;
+import javax.inject.Inject;
+import javax.inject.Singleton;
 import javax.mail.MessagingException;
 import org.apache.log4j.Logger;
 
+@Singleton
 public class Emailer extends AbstractMailer implements Alerter {
 
   private static final String HTTPS = "https";
   private static final String HTTP = "http";
   private static final Logger logger = Logger.getLogger(Emailer.class);
+  private final CommonMetrics commonMetrics;
   private final String scheme;
   private final String clientHostname;
   private final String clientPortNumber;
@@ -46,8 +53,10 @@ public class Emailer extends AbstractMailer implements Alerter {
   private final String tls;
   private boolean testMode = false;
 
-  public Emailer(final Props props) {
+  @Inject
+  public Emailer(final Props props, final CommonMetrics commonMetrics) {
     super(props);
+    this.commonMetrics = requireNonNull(commonMetrics, "commonMetrics is null.");
     this.azkabanName = props.getString("azkaban.name", "azkaban");
     this.mailHost = props.getString("mail.host", "localhost");
     this.mailPort = props.getInt("mail.port", DEFAULT_SMTP_PORT);
@@ -101,8 +110,10 @@ public class Emailer extends AbstractMailer implements Alerter {
       if (!this.testMode) {
         try {
           message.sendEmail();
+          this.commonMetrics.markSendEmailSuccess();
         } catch (final MessagingException e) {
           logger.error("Failed to send SLA email message" + slaMessage, e);
+          this.commonMetrics.markSendEmailFail();
         }
       }
     }
@@ -130,9 +141,11 @@ public class Emailer extends AbstractMailer implements Alerter {
     if (mailCreated && !this.testMode) {
       try {
         message.sendEmail();
+        this.commonMetrics.markSendEmailSuccess();
       } catch (final MessagingException e) {
         logger.error(
             "Failed to send first error email message for execution " + flow.getExecutionId(), e);
+        this.commonMetrics.markSendEmailFail();
       }
     }
   }
@@ -158,9 +171,11 @@ public class Emailer extends AbstractMailer implements Alerter {
     if (mailCreated && !this.testMode) {
       try {
         message.sendEmail();
+        this.commonMetrics.markSendEmailSuccess();
       } catch (final MessagingException e) {
         logger
             .error("Failed to send error email message for execution " + flow.getExecutionId(), e);
+        this.commonMetrics.markSendEmailFail();
       }
     }
   }
@@ -186,9 +201,11 @@ public class Emailer extends AbstractMailer implements Alerter {
     if (mailCreated && !this.testMode) {
       try {
         message.sendEmail();
+        this.commonMetrics.markSendEmailSuccess();
       } catch (final MessagingException e) {
         logger.error("Failed to send success email message for execution " + flow.getExecutionId(),
             e);
+        this.commonMetrics.markSendEmailFail();
       }
     }
   }
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 4bff2c8..8152f4c 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -24,6 +24,8 @@ import static org.mockito.Mockito.when;
 import azkaban.metrics.CommonMetrics;
 import azkaban.metrics.MetricsManager;
 import azkaban.user.User;
+import azkaban.utils.AbstractMailerTest;
+import azkaban.utils.Emailer;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
 import azkaban.utils.TestUtils;
@@ -38,6 +40,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -47,12 +50,21 @@ import org.junit.Test;
 public class ExecutorManagerTest {
 
   private final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows = new HashMap<>();
+  private final CommonMetrics commonMetrics = new CommonMetrics(
+      new MetricsManager(new MetricRegistry()));
   private ExecutorManager manager;
   private ExecutorLoader loader;
   private Props props;
   private User user;
   private ExecutableFlow flow1;
   private ExecutableFlow flow2;
+  private AlerterHolder alertHolder;
+
+  @Before
+  public void setup() {
+    this.props = AbstractMailerTest.createMailProperties();
+    this.alertHolder = new AlerterHolder(this.props, new Emailer(this.props, this.commonMetrics));
+  }
 
   /* Helper method to create a ExecutorManager Instance */
   private ExecutorManager createMultiExecutorManagerInstance()
@@ -66,14 +78,11 @@ public class ExecutorManagerTest {
    */
   private ExecutorManager createMultiExecutorManagerInstance(
       final ExecutorLoader loader) throws ExecutorManagerException {
-    final Props props = new Props();
-    props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
-    props.put(ExecutorManager.AZKABAN_QUEUEPROCESSING_ENABLED, "false");
-
+    this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+    this.props.put(ExecutorManager.AZKABAN_QUEUEPROCESSING_ENABLED, "false");
     loader.addExecutor("localhost", 12345);
     loader.addExecutor("localhost", 12346);
-    return new ExecutorManager(props, loader, new AlerterHolder(props),
-        new CommonMetrics(new MetricsManager(new MetricRegistry())));
+    return new ExecutorManager(this.props, loader, this.alertHolder, this.commonMetrics);
   }
 
   /*
@@ -82,12 +91,10 @@ public class ExecutorManagerTest {
    */
   @Test(expected = ExecutorManagerException.class)
   public void testNoExecutorScenario() throws ExecutorManagerException {
-    final Props props = new Props();
-    props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+    this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
     final ExecutorLoader loader = new MockExecutorLoader();
     @SuppressWarnings("unused") final ExecutorManager manager =
-      new ExecutorManager(props, loader, new AlerterHolder(props),
-          new CommonMetrics(new MetricsManager(new MetricRegistry())));
+        new ExecutorManager(this.props, loader, this.alertHolder, this.commonMetrics);
   }
 
   /*
@@ -95,13 +102,10 @@ public class ExecutorManagerTest {
    */
   @Test
   public void testLocalExecutorScenario() throws ExecutorManagerException {
-    final Props props = new Props();
-    props.put("executor.port", 12345);
-
+    this.props.put("executor.port", 12345);
     final ExecutorLoader loader = new MockExecutorLoader();
     final ExecutorManager manager =
-      new ExecutorManager(props, loader, new AlerterHolder(props),
-          new CommonMetrics(new MetricsManager(new MetricRegistry())));
+        new ExecutorManager(this.props, loader, this.alertHolder, this.commonMetrics);
     final Set<Executor> activeExecutors =
       new HashSet(manager.getAllActiveExecutors());
 
@@ -118,15 +122,13 @@ public class ExecutorManagerTest {
    */
   @Test
   public void testMultipleExecutorScenario() throws ExecutorManagerException {
-    final Props props = new Props();
-    props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+    this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
     final ExecutorLoader loader = new MockExecutorLoader();
     final Executor executor1 = loader.addExecutor("localhost", 12345);
     final Executor executor2 = loader.addExecutor("localhost", 12346);
 
     final ExecutorManager manager =
-      new ExecutorManager(props, loader, new AlerterHolder(props),
-          new CommonMetrics(new MetricsManager(new MetricRegistry())));
+        new ExecutorManager(this.props, loader, this.alertHolder, this.commonMetrics);
     final Set<Executor> activeExecutors =
       new HashSet(manager.getAllActiveExecutors());
     Assert.assertArrayEquals(activeExecutors.toArray(), new Executor[] {
@@ -138,14 +140,11 @@ public class ExecutorManagerTest {
    */
   @Test
   public void testSetupExecutorsSucess() throws ExecutorManagerException {
-    final Props props = new Props();
-    props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+    this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
     final ExecutorLoader loader = new MockExecutorLoader();
     final Executor executor1 = loader.addExecutor("localhost", 12345);
-
     final ExecutorManager manager =
-      new ExecutorManager(props, loader, new AlerterHolder(props),
-          new CommonMetrics(new MetricsManager(new MetricRegistry())));
+        new ExecutorManager(this.props, loader, this.alertHolder, this.commonMetrics);
     Assert.assertArrayEquals(manager.getAllActiveExecutors().toArray(),
       new Executor[] { executor1 });
 
@@ -166,14 +165,11 @@ public class ExecutorManagerTest {
    */
   @Test(expected = ExecutorManagerException.class)
   public void testSetupExecutorsException() throws ExecutorManagerException {
-    final Props props = new Props();
-    props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+    this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
     final ExecutorLoader loader = new MockExecutorLoader();
     final Executor executor1 = loader.addExecutor("localhost", 12345);
-
     final ExecutorManager manager =
-      new ExecutorManager(props, loader, new AlerterHolder(props),
-          new CommonMetrics(new MetricsManager(new MetricRegistry())));
+        new ExecutorManager(this.props, loader, this.alertHolder, this.commonMetrics);
     final Set<Executor> activeExecutors =
       new HashSet(manager.getAllActiveExecutors());
     Assert.assertArrayEquals(activeExecutors.toArray(),
@@ -333,7 +329,6 @@ public class ExecutorManagerTest {
       throws ExecutorManagerException, IOException {
     this.loader = mock(ExecutorLoader.class);
     this.user = TestUtils.getTestUser();
-    this.props = new Props();
     this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
     //To test runningFlows, AZKABAN_QUEUEPROCESSING_ENABLED should be set to true
     //so that flows will be dispatched to executors.
@@ -346,8 +341,8 @@ public class ExecutorManagerTest {
     executors.add(executor2);
 
     when(this.loader.fetchActiveExecutors()).thenReturn(executors);
-    this.manager = new ExecutorManager(this.props, this.loader, new AlerterHolder(this.props),
-        new CommonMetrics(new MetricsManager(new MetricRegistry())));
+    this.manager = new ExecutorManager(this.props, this.loader, this.alertHolder,
+        this.commonMetrics);
 
     this.flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
     this.flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
diff --git a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
index d44a739..5d3dc22 100644
--- a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
+++ b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
@@ -16,6 +16,7 @@
 
 package azkaban.trigger;
 
+import azkaban.utils.AbstractMailerTest;
 import azkaban.executor.AlerterHolder;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManager;
@@ -24,6 +25,7 @@ import azkaban.executor.MockExecutorLoader;
 import azkaban.metrics.CommonMetrics;
 import azkaban.metrics.MetricsManager;
 import azkaban.trigger.builtin.CreateTriggerAction;
+import azkaban.utils.Emailer;
 import azkaban.utils.Props;
 import com.codahale.metrics.MetricRegistry;
 import java.util.ArrayList;
@@ -44,12 +46,14 @@ public class TriggerManagerDeadlockTest {
   @Before
   public void setup() throws ExecutorManagerException, TriggerManagerException {
     this.loader = new MockTriggerLoader();
-    final Props props = new Props();
+    final Props props = AbstractMailerTest.createMailProperties();
     props.put("trigger.scan.interval", 1000);
     props.put("executor.port", 12321);
     this.execLoader = new MockExecutorLoader();
+    final CommonMetrics commonMetrics = new CommonMetrics(new MetricsManager(new MetricRegistry()));
     final ExecutorManager executorManager = new ExecutorManager(props, this.execLoader,
-        new AlerterHolder(props), new CommonMetrics(new MetricsManager(new MetricRegistry())));
+        new AlerterHolder(props, new Emailer(props, commonMetrics)),
+        commonMetrics);
     this.triggerManager = new TriggerManager(props, this.loader, executorManager);
   }
 
diff --git a/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java b/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
index f0e3563..8a05bc0 100644
--- a/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
@@ -17,9 +17,12 @@ package azkaban.utils;
 
 import azkaban.executor.ExecutableFlow;
 import azkaban.flow.Flow;
+import azkaban.metrics.CommonMetrics;
+import azkaban.metrics.MetricsManager;
 import azkaban.project.DirectoryFlowLoader;
 import azkaban.project.Project;
 import azkaban.test.executions.TestExecutions;
+import com.codahale.metrics.MetricRegistry;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.log4j.Logger;
@@ -74,14 +77,16 @@ public class EmailerTest {
     Assert.assertNotNull(flow);
 
     final ExecutableFlow exFlow = new ExecutableFlow(this.project, flow);
-    final Emailer emailer = new Emailer(this.props);
+    final CommonMetrics commonMetrics = new CommonMetrics(new MetricsManager(new MetricRegistry()));
+    final Emailer emailer = new Emailer(this.props, commonMetrics);
     emailer.sendErrorEmail(exFlow);
 
   }
 
   @Test
   public void testCreateEmailMessage() {
-    final Emailer emailer = new Emailer(this.props);
+    final CommonMetrics commonMetrics = new CommonMetrics(new MetricsManager(new MetricRegistry()));
+    final Emailer emailer = new Emailer(this.props, commonMetrics);
     final EmailMessage em = emailer
         .createEmailMessage("subject", "text/html", this.receiveAddrList);
     assert em.getMailPort() == this.mailPort;
diff --git a/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java b/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
index a4ef1a0..b8347dc 100644
--- a/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
@@ -28,6 +28,7 @@ import azkaban.AzkabanCommonModule;
 import azkaban.database.AzkabanDatabaseSetup;
 import azkaban.database.AzkabanDatabaseUpdater;
 import azkaban.db.DatabaseOperator;
+import azkaban.executor.AlerterHolder;
 import azkaban.executor.Executor;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManager;
@@ -36,6 +37,7 @@ import azkaban.project.ProjectManager;
 import azkaban.spi.Storage;
 import azkaban.trigger.TriggerLoader;
 import azkaban.trigger.TriggerManager;
+import azkaban.utils.Emailer;
 import azkaban.utils.Props;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -129,6 +131,8 @@ public class AzkabanWebServerTest {
     assertSingleton(DatabaseOperator.class, injector);
     assertSingleton(TriggerLoader.class, injector);
     assertSingleton(TriggerManager.class, injector);
+    assertSingleton(AlerterHolder.class, injector);
+    assertSingleton(Emailer.class, injector);
 
     SERVICE_PROVIDER.unsetInjector();
   }