Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index a69297f..3641e5e 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -52,6 +52,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -68,6 +69,7 @@ import java.util.regex.Pattern;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
@@ -921,7 +923,7 @@ public class ExecutorManager extends EventHandler implements
userId);
} else if (this.queuedFlows.hasExecution(exFlow.getExecutionId())) {
this.queuedFlows.dequeue(exFlow.getExecutionId());
- finalizeFlows(exFlow);
+ finalizeFlows(exFlow, "Cancelled before dispatching to executor", null);
} else {
throw new ExecutorManagerException("Execution "
+ exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
@@ -1151,7 +1153,7 @@ public class ExecutorManager extends EventHandler implements
// this logic is only implemented in multiExecutorMode but
// missed in single executor case.
this.commonMetrics.markDispatchFail();
- finalizeFlows(exflow);
+ finalizeFlows(exflow, "Dispatching failed", e);
throw e;
}
}
@@ -1227,10 +1229,12 @@ public class ExecutorManager extends EventHandler implements
this.executingManager.shutdown();
}
- private void finalizeFlows(final ExecutableFlow flow) {
+ private void finalizeFlows(final ExecutableFlow flow, final String reason,
+ final Throwable originalError) {
final int execId = flow.getExecutionId();
boolean alertUser = true;
+ final String[] extraReasons = getFinalizeFlowReasons(reason, originalError);
this.updaterStage = "finalizing flow " + execId;
// First we check if the execution in the datastore is complete
try {
@@ -1266,19 +1270,17 @@ public class ExecutorManager extends EventHandler implements
logger.error(e);
}
- // TODO append to the flow log that we forced killed this flow because the
- // target no longer had
- // the reference.
+ // TODO append to the flow log that we marked this flow as failed + the extraReasons
this.updaterStage = "finalizing flow " + execId + " alerting and emailing";
if (alertUser) {
final ExecutionOptions options = flow.getExecutionOptions();
// But we can definitely email them.
final Alerter mailAlerter = this.alerterHolder.get("email");
- if (flow.getStatus() == Status.FAILED || flow.getStatus() == Status.KILLED) {
+ if (flow.getStatus() != Status.SUCCEEDED) {
if (options.getFailureEmails() != null && !options.getFailureEmails().isEmpty()) {
try {
- mailAlerter.alertOnError(flow);
+ mailAlerter.alertOnError(flow, extraReasons);
} catch (final Exception e) {
logger.error(e);
}
@@ -1288,11 +1290,9 @@ public class ExecutorManager extends EventHandler implements
final Alerter alerter = this.alerterHolder.get(alertType);
if (alerter != null) {
try {
- alerter.alertOnError(flow);
+ alerter.alertOnError(flow, extraReasons);
} catch (final Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- logger.error("Failed to alert by " + alertType);
+ logger.error("Failed to alert by " + alertType, e);
}
} else {
logger.error("Alerter type " + alertType + " doesn't exist. Failed to alert.");
@@ -1314,9 +1314,7 @@ public class ExecutorManager extends EventHandler implements
try {
alerter.alertOnSuccess(flow);
} catch (final Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- logger.error("Failed to alert by " + alertType);
+ logger.error("Failed to alert by " + alertType, e);
}
} else {
logger.error("Alerter type " + alertType + " doesn't exist. Failed to alert.");
@@ -1327,6 +1325,15 @@ public class ExecutorManager extends EventHandler implements
}
+ private String[] getFinalizeFlowReasons(final String reason, final Throwable originalError) {
+ final List<String> reasons = new LinkedList<>();
+ reasons.add(reason);
+ if (originalError != null) {
+ reasons.add(ExceptionUtils.getStackTrace(originalError));
+ }
+ return reasons.toArray(new String[reasons.size()]);
+ }
+
private void failEverything(final ExecutableFlow exFlow) {
final long time = System.currentTimeMillis();
for (final ExecutableNode node : exFlow.getExecutableNodes()) {
@@ -1404,8 +1411,7 @@ public class ExecutorManager extends EventHandler implements
try {
mailAlerter.alertOnFirstError(flow);
} catch (final Exception e) {
- e.printStackTrace();
- logger.error("Failed to send first error email." + e.getMessage());
+ logger.error("Failed to send first error email." + e.getMessage(), e);
}
}
if (options.getFlowParameters().containsKey("alert.type")) {
@@ -1415,9 +1421,7 @@ public class ExecutorManager extends EventHandler implements
try {
alerter.alertOnFirstError(flow);
} catch (final Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- logger.error("Failed to alert by " + alertType);
+ logger.error("Failed to alert by " + alertType, e);
}
} else {
logger.error("Alerter type " + alertType
@@ -1662,7 +1666,7 @@ public class ExecutorManager extends EventHandler implements
// Kill error flows
for (final ExecutableFlow flow : finalizeFlows) {
- finalizeFlows(flow);
+ finalizeFlows(flow, "Not running on the assigned executor (any more)", null);
}
}
@@ -1875,7 +1879,7 @@ public class ExecutorManager extends EventHandler implements
if (giveUpReason != null) {
logger.error("Failed to dispatch queued execution " + exflow.getId() + " because "
+ giveUpReason);
- finalizeFlows(exflow);
+ finalizeFlows(exflow, "Failed to dispatch because " + giveUpReason, null);
// GIVE UP DISPATCHING - exit
return;
} else {
@@ -1925,7 +1929,7 @@ public class ExecutorManager extends EventHandler implements
"Executor %s responded with exception for exec: %d",
selectedExecutor, exflow.getExecutionId()), e);
logger.info(String.format(
- "Reached handleDispatchExceptionCase stage for exec %d with error count %d",
+ "Failed dispatch attempt for exec %d with error count %d",
exflow.getExecutionId(), reference.getNumErrors()));
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/mail/DefaultMailCreator.java b/azkaban-common/src/main/java/azkaban/executor/mail/DefaultMailCreator.java
index 1f52b24..2cdf1be 100644
--- a/azkaban-common/src/main/java/azkaban/executor/mail/DefaultMailCreator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/mail/DefaultMailCreator.java
@@ -79,7 +79,7 @@ public class DefaultMailCreator implements MailCreator {
@Override
public boolean createFirstErrorMessage(final ExecutableFlow flow,
final EmailMessage message, final String azkabanName, final String scheme,
- final String clientHostname, final String clientPortNumber, final String... vars) {
+ final String clientHostname, final String clientPortNumber) {
final ExecutionOptions option = flow.getExecutionOptions();
final List<String> emailList = option.getFailureEmails();
@@ -142,7 +142,7 @@ public class DefaultMailCreator implements MailCreator {
@Override
public boolean createErrorEmail(final ExecutableFlow flow, final List<ExecutableFlow>
pastExecutions, final EmailMessage message, final String azkabanName, final String scheme,
- final String clientHostname, final String clientPortNumber, final String... vars) {
+ final String clientHostname, final String clientPortNumber, final String... reasons) {
final ExecutionOptions option = flow.getExecutionOptions();
@@ -183,8 +183,8 @@ public class DefaultMailCreator implements MailCreator {
message.println("<li><a href=\"" + executionUrl + "&job=" + jobId
+ "\">Failed job '" + jobId + "' Link</a></li>");
}
- for (final String reasons : vars) {
- message.println("<li>" + reasons + "</li>");
+ for (final String reason : reasons) {
+ message.println("<li>" + reason + "</li>");
}
message.println("</ul>");
@@ -220,7 +220,7 @@ public class DefaultMailCreator implements MailCreator {
@Override
public boolean createSuccessEmail(final ExecutableFlow flow, final EmailMessage message,
final String azkabanName, final String scheme, final String clientHostname,
- final String clientPortNumber, final String... vars) {
+ final String clientPortNumber) {
final ExecutionOptions option = flow.getExecutionOptions();
final List<String> emailList = option.getSuccessEmails();
diff --git a/azkaban-common/src/main/java/azkaban/executor/mail/MailCreator.java b/azkaban-common/src/main/java/azkaban/executor/mail/MailCreator.java
index 991c64a..977d232 100644
--- a/azkaban-common/src/main/java/azkaban/executor/mail/MailCreator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/mail/MailCreator.java
@@ -24,19 +24,19 @@ import java.util.List;
public interface MailCreator {
- public boolean createFirstErrorMessage(ExecutableFlow flow,
+ boolean createFirstErrorMessage(ExecutableFlow flow,
EmailMessage message, String azkabanName, String scheme,
- String clientHostname, String clientPortNumber, String... vars);
+ String clientHostname, String clientPortNumber);
- public boolean createErrorEmail(ExecutableFlow flow, List<ExecutableFlow> pastExecutions,
+ boolean createErrorEmail(ExecutableFlow flow, List<ExecutableFlow> pastExecutions,
EmailMessage message, String azkabanName, String scheme, String clientHostname,
- String clientPortNumber, String... vars);
+ String clientPortNumber, String... reasons);
- public boolean createSuccessEmail(ExecutableFlow flow, EmailMessage message,
+ boolean createSuccessEmail(ExecutableFlow flow, EmailMessage message,
String azkabanName, String scheme, String clientHostname,
- String clientPortNumber, String... vars);
+ String clientPortNumber);
- public boolean createFailedUpdateMessage(List<ExecutableFlow> flows, Executor executor,
+ boolean createFailedUpdateMessage(List<ExecutableFlow> flows, Executor executor,
ExecutorManagerException updateException, EmailMessage message,
String azkabanName, String scheme, String clientHostname,
String clientPortNumber);
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 302e3ed..7da9c95 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -331,7 +331,8 @@ public class ExecutorManagerTest {
verify(this.apiGateway)
.callWithExecutable(flow1, this.manager.fetchExecutor(2), ConnectorParams.EXECUTE_ACTION);
verify(this.loader, Mockito.times(2)).unassignExecutor(-1);
- verify(this.mailAlerter).alertOnError(flow1);
+ verify(this.mailAlerter).alertOnError(eq(flow1),
+ eq("Failed to dispatch because reached azkaban.maxDispatchingErrors (tried 2 executors)"));
}
private void mockFlowDoesNotExist() throws Exception {
diff --git a/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalMailCreator.java b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalMailCreator.java
index 3175b07..8539182 100644
--- a/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalMailCreator.java
+++ b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalMailCreator.java
@@ -16,8 +16,21 @@
package azkaban.viewer.reportal;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutionOptions;
import azkaban.executor.Executor;
import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.mail.DefaultMailCreator;
+import azkaban.executor.mail.MailCreator;
+import azkaban.project.Project;
+import azkaban.reportal.util.IStreamProvider;
+import azkaban.reportal.util.ReportalHelper;
+import azkaban.reportal.util.ReportalUtil;
+import azkaban.reportal.util.StreamProviderHDFS;
+import azkaban.security.commons.HadoopSecurityManager;
+import azkaban.utils.EmailMessage;
+import azkaban.webapp.AzkabanWebServer;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
@@ -34,23 +47,9 @@ import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.Set;
-
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringEscapeUtils;
-
-import azkaban.executor.ExecutableFlow;
-import azkaban.executor.ExecutableNode;
-import azkaban.executor.ExecutionOptions;
-import azkaban.executor.mail.DefaultMailCreator;
-import azkaban.executor.mail.MailCreator;
-import azkaban.project.Project;
-import azkaban.reportal.util.IStreamProvider;
-import azkaban.reportal.util.ReportalHelper;
-import azkaban.reportal.util.ReportalUtil;
-import azkaban.reportal.util.StreamProviderHDFS;
-import azkaban.security.commons.HadoopSecurityManager;
-import azkaban.utils.EmailMessage;
-import azkaban.webapp.AzkabanWebServer;
+import org.apache.log4j.Logger;
public class ReportalMailCreator implements MailCreator {
public static final String REPORTAL_MAIL_CREATOR = "ReportalMailCreator";
@@ -64,6 +63,8 @@ public class ReportalMailCreator implements MailCreator {
public static String reportalStorageUser = "";
public static File reportalMailTempDirectory;
+ private static final Logger logger = Logger.getLogger(ReportalMailCreator.class);
+
static {
DefaultMailCreator.registerCreator(REPORTAL_MAIL_CREATOR,
new ReportalMailCreator());
@@ -72,7 +73,7 @@ public class ReportalMailCreator implements MailCreator {
@Override
public boolean createFirstErrorMessage(ExecutableFlow flow,
EmailMessage message, String azkabanName, String scheme,
- String clientHostname, String clientPortNumber, String... vars) {
+ String clientHostname, String clientPortNumber) {
ExecutionOptions option = flow.getExecutionOptions();
Set<String> emailList = new HashSet<String>(option.getFailureEmails());
@@ -84,7 +85,7 @@ public class ReportalMailCreator implements MailCreator {
@Override
public boolean createErrorEmail(ExecutableFlow flow, List<ExecutableFlow>
pastExecutions, EmailMessage message, String azkabanName, String scheme, String clientHostname,
- String clientPortNumber, String... vars) {
+ String clientPortNumber, String... reasons) {
ExecutionOptions option = flow.getExecutionOptions();
Set<String> emailList = new HashSet<String>(option.getFailureEmails());
@@ -96,7 +97,7 @@ public class ReportalMailCreator implements MailCreator {
@Override
public boolean createSuccessEmail(ExecutableFlow flow, EmailMessage message,
String azkabanName, String scheme, String clientHostname,
- String clientPortNumber, String... vars) {
+ String clientPortNumber) {
ExecutionOptions option = flow.getExecutionOptions();
Set<String> emailList = new HashSet<String>(option.getSuccessEmails());
@@ -137,7 +138,7 @@ public class ReportalMailCreator implements MailCreator {
try {
return createMessage(project, flow, message, urlPrefix, printData);
} catch (Exception e) {
- e.printStackTrace();
+ logger.error("Message creation failed for " + flow.getId(), e);
}
}
@@ -281,7 +282,7 @@ public class ReportalMailCreator implements MailCreator {
try {
streamProvider.cleanUp();
} catch (IOException e) {
- e.printStackTrace();
+ logger.error("Stream provider cleanup failed for " + flow.getId(), e);
}
boolean emptyResults = true;