azkaban-aplcache
Changes
azkaban-exec-server/build.gradle 11(+11 -0)
azkaban-exec-server/src/main/java/azkaban/execapp/reporter/AzkabanKafkaAvroEventReporter.java 124(+124 -0)
build.gradle 5(+4 -1)
Details
diff --git a/azkaban-common/src/main/java/azkaban/Constants.java b/azkaban-common/src/main/java/azkaban/Constants.java
index 3ecaf78..fe66d4f 100644
--- a/azkaban-common/src/main/java/azkaban/Constants.java
+++ b/azkaban-common/src/main/java/azkaban/Constants.java
@@ -116,6 +116,17 @@ public class Constants {
public static final String AZKABAN_KEYTAB_PATH = "azkaban.keytab.path";
public static final String PROJECT_TEMP_DIR = "project.temp.dir";
+ // Event reporting properties
+ public static final String AZKABAN_EVENT_REPORTING_CLASS_PARAM =
+ "azkaban.event.reporting.class";
+ public static final String AZKABAN_EVENT_REPORTING_ENABLED = "azkaban.event.reporting.enabled";
+ public static final String AZKABAN_EVENT_REPORTING_KAFKA_BROKERS =
+ "azkaban.event.reporting.kafka.brokers";
+ public static final String AZKABAN_EVENT_REPORTING_KAFKA_TOPIC =
+ "azkaban.event.reporting.kafka.topic";
+ public static final String AZKABAN_EVENT_REPORTING_KAFKA_SCHEMA_REGISTRY_URL =
+ "azkaban.event.reporting.kafka.schema.registry.url";
+
/*
* The max number of artifacts retained per project.
* Accepted Values:
diff --git a/azkaban-common/src/main/java/azkaban/event/Event.java b/azkaban-common/src/main/java/azkaban/event/Event.java
index fc79b4a..832dd99 100644
--- a/azkaban-common/src/main/java/azkaban/event/Event.java
+++ b/azkaban-common/src/main/java/azkaban/event/Event.java
@@ -16,16 +16,17 @@
package azkaban.event;
+import azkaban.spi.EventType;
import com.google.common.base.Preconditions;
public class Event {
private final Object runner;
- private final Type type;
+ private final EventType type;
private final EventData eventData;
private final long time;
- private Event(final Object runner, final Type type, final EventData eventData) {
+ private Event(final Object runner, final EventType type, final EventData eventData) {
this.runner = runner;
this.type = type;
this.eventData = eventData;
@@ -41,7 +42,7 @@ public class Event {
* @return New Event instance.
* @throws NullPointerException if EventData is null.
*/
- public static Event create(final Object runner, final Type type, final EventData eventData)
+ public static Event create(final Object runner, final EventType type, final EventData eventData)
throws NullPointerException {
Preconditions.checkNotNull(eventData, "EventData was null");
return new Event(runner, type, eventData);
@@ -51,7 +52,7 @@ public class Event {
return this.runner;
}
- public Type getType() {
+ public EventType getType() {
return this.type;
}
@@ -63,14 +64,5 @@ public class Event {
return this.eventData;
}
- public enum Type {
- FLOW_STARTED,
- FLOW_FINISHED,
- JOB_STARTED,
- JOB_FINISHED,
- JOB_STATUS_CHANGED,
- EXTERNAL_FLOW_UPDATED,
- EXTERNAL_JOB_UPDATED
- }
}
azkaban-exec-server/build.gradle 11(+11 -0)
diff --git a/azkaban-exec-server/build.gradle b/azkaban-exec-server/build.gradle
index 0436e3d..c6030b9 100644
--- a/azkaban-exec-server/build.gradle
+++ b/azkaban-exec-server/build.gradle
@@ -4,6 +4,7 @@ dependencies {
compile(project(':azkaban-common'))
compile deps.kafkaLog4jAppender
+ compile deps.gobblinKafka
runtime(project(':azkaban-hadoop-security-plugin'))
@@ -13,6 +14,16 @@ dependencies {
testRuntime deps.h2
}
+configurations.compile {
+ exclude group: 'com.linkedin.gobblin', module: 'gobblin-api'
+ exclude group: 'com.linkedin.gobblin', module: 'gobblin-metrics-graphite'
+ exclude group: 'com.linkedin.gobblin', module: 'gobblin-metrics-hadoop'
+ exclude group: 'com.linkedin.gobblin', module: 'gobblin-metrics-influxdb'
+ exclude group: 'com.linkedin.gobblin', module: 'gobblin-runtime'
+
+ exclude group: 'org.projectlombok', module: 'lombok'
+}
+
distributions {
main {
contents {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
index 683ca35..75f84ef 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
@@ -17,9 +17,21 @@
package azkaban.execapp;
+import static azkaban.Constants.ConfigurationKeys.AZKABAN_EVENT_REPORTING_CLASS_PARAM;
+import static azkaban.Constants.ConfigurationKeys.AZKABAN_EVENT_REPORTING_ENABLED;
+
+import azkaban.execapp.reporter.AzkabanKafkaAvroEventReporter;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.JdbcExecutorLoader;
+import azkaban.spi.AzkabanEventReporter;
+import azkaban.utils.Props;
import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.apache.log4j.Logger;
/**
* This Guice module is currently a one place container for all bindings in the current module. This
@@ -28,9 +40,47 @@ import com.google.inject.AbstractModule;
*/
public class AzkabanExecServerModule extends AbstractModule {
+ private static final Logger logger = Logger.getLogger(AzkabanExecServerModule.class);
+
@Override
protected void configure() {
install(new ExecJettyServerModule());
bind(ExecutorLoader.class).to(JdbcExecutorLoader.class);
}
+
+ @Inject
+ @Provides
+ @Singleton
+ public AzkabanEventReporter createAzkabanEventReporter(final Props props) {
+ final boolean eventReporterEnabled =
+ props.getBoolean(AZKABAN_EVENT_REPORTING_ENABLED, false);
+
+ if (!eventReporterEnabled) {
+ logger.info("Event reporter is not enabled");
+ return null;
+ }
+
+ final Class<?> eventReporterClass =
+ props.getClass(AZKABAN_EVENT_REPORTING_CLASS_PARAM, AzkabanKafkaAvroEventReporter.class);
+ if (eventReporterClass != null && eventReporterClass.getConstructors().length > 0) {
+ this.logger.info("Loading event reporter class " + eventReporterClass.getName());
+ try {
+ final Constructor<?> eventReporterClassConstructor =
+ eventReporterClass.getConstructor(Props.class);
+ return (AzkabanEventReporter) eventReporterClassConstructor.newInstance(props);
+ } catch (final InvocationTargetException e) {
+ this.logger.error(e.getTargetException().getMessage());
+ if (e.getTargetException() instanceof IllegalArgumentException) {
+ throw new IllegalArgumentException(e);
+ } else {
+ throw new RuntimeException(e);
+ }
+ } catch (final Exception e) {
+ this.logger.error("Could not instantiate EventReporter " + eventReporterClass.getName());
+ throw new RuntimeException(e);
+ }
+ }
+ return null;
+ }
+
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackManager.java
index 30f96ce..44bcc59 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackManager.java
@@ -14,6 +14,7 @@ import azkaban.execapp.jmx.JmxJobCallback;
import azkaban.execapp.jmx.JmxJobCallbackMBean;
import azkaban.executor.Status;
import azkaban.jobcallback.JobCallbackStatusEnum;
+import azkaban.spi.EventType;
import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
import java.net.InetAddress;
@@ -108,9 +109,9 @@ public class JobCallbackManager implements EventListener {
if (event.getRunner() instanceof JobRunner) {
try {
- if (event.getType() == Event.Type.JOB_STARTED) {
+ if (event.getType() == EventType.JOB_STARTED) {
processJobCallOnStart(event);
- } else if (event.getType() == Event.Type.JOB_FINISHED) {
+ } else if (event.getType() == EventType.JOB_FINISHED) {
processJobCallOnFinish(event);
}
} catch (final Throwable e) {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/event/LocalFlowWatcher.java b/azkaban-exec-server/src/main/java/azkaban/execapp/event/LocalFlowWatcher.java
index 1376249..f90470a 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/event/LocalFlowWatcher.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/event/LocalFlowWatcher.java
@@ -17,12 +17,12 @@
package azkaban.execapp.event;
import azkaban.event.Event;
-import azkaban.event.Event.Type;
import azkaban.event.EventData;
import azkaban.event.EventListener;
import azkaban.execapp.FlowRunner;
import azkaban.execapp.JobRunner;
import azkaban.executor.ExecutableNode;
+import azkaban.spi.EventType;
public class LocalFlowWatcher extends FlowWatcher {
@@ -58,7 +58,7 @@ public class LocalFlowWatcher extends FlowWatcher {
@Override
public void handleEvent(final Event event) {
- if (event.getType() == Type.JOB_FINISHED) {
+ if (event.getType() == EventType.JOB_FINISHED) {
if (event.getRunner() instanceof FlowRunner) {
// The flow runner will finish a job without it running
final EventData eventData = event.getData();
@@ -72,7 +72,7 @@ public class LocalFlowWatcher extends FlowWatcher {
System.out.println(node + " looks like " + node.getStatus());
handleJobStatusChange(node.getNestedId(), node.getStatus());
}
- } else if (event.getType() == Type.FLOW_FINISHED) {
+ } else if (event.getType() == EventType.FLOW_FINISHED) {
stopWatcher();
}
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index 5d5af56..39ef01b 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -16,9 +16,10 @@
package azkaban.execapp;
+import static azkaban.Constants.ConfigurationKeys.AZKABAN_SERVER_HOST_NAME;
+
import azkaban.ServiceProvider;
import azkaban.event.Event;
-import azkaban.event.Event.Type;
import azkaban.event.EventData;
import azkaban.event.EventHandler;
import azkaban.event.EventListener;
@@ -42,6 +43,8 @@ import azkaban.metric.MetricReportManager;
import azkaban.project.ProjectLoader;
import azkaban.project.ProjectManagerException;
import azkaban.sla.SlaOption;
+import azkaban.spi.AzkabanEventReporter;
+import azkaban.spi.EventType;
import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
import azkaban.utils.SwapQueue;
@@ -91,10 +94,12 @@ public class FlowRunner extends EventHandler implements Runnable {
private final Props azkabanProps;
private final Map<String, Props> sharedProps = new HashMap<>();
private final JobRunnerEventListener listener = new JobRunnerEventListener();
+ private final FlowRunnerEventListener flowListener = new FlowRunnerEventListener();
private final Set<JobRunner> activeJobRunners = Collections
.newSetFromMap(new ConcurrentHashMap<JobRunner, Boolean>());
// Thread safe swap queue for finishedExecutions.
private final SwapQueue<ExecutableNode> finishedNodes;
+ private final AzkabanEventReporter azkabanEventReporter;
private Logger logger;
private Appender flowAppender;
private File logFile;
@@ -104,21 +109,16 @@ public class FlowRunner extends EventHandler implements Runnable {
// Used for pipelining
private Integer pipelineLevel = null;
private Integer pipelineExecId = null;
-
// Watches external flows for execution.
private FlowWatcher watcher = null;
-
private Set<String> proxyUsers = null;
private boolean validateUserProxy;
-
private String jobLogFileSize = "5MB";
private int jobLogNumFiles = 4;
-
private boolean flowPaused = false;
private boolean flowFailed = false;
private boolean flowFinished = false;
private boolean flowKilled = false;
-
// The following is state that will trigger a retry of all failed jobs
private boolean retryFailedJobs = false;
@@ -127,9 +127,10 @@ public class FlowRunner extends EventHandler implements Runnable {
*/
public FlowRunner(final ExecutableFlow flow, final ExecutorLoader executorLoader,
final ProjectLoader projectLoader, final JobTypeManager jobtypeManager,
- final Props azkabanProps)
+ final Props azkabanProps, final AzkabanEventReporter azkabanEventReporter)
throws ExecutorManagerException {
- this(flow, executorLoader, projectLoader, jobtypeManager, null, azkabanProps);
+ this(flow, executorLoader, projectLoader, jobtypeManager, null, azkabanProps,
+ azkabanEventReporter);
}
/**
@@ -137,7 +138,8 @@ public class FlowRunner extends EventHandler implements Runnable {
*/
public FlowRunner(final ExecutableFlow flow, final ExecutorLoader executorLoader,
final ProjectLoader projectLoader, final JobTypeManager jobtypeManager,
- final ExecutorService executorService, final Props azkabanProps)
+ final ExecutorService executorService, final Props azkabanProps,
+ final AzkabanEventReporter azkabanEventReporter)
throws ExecutorManagerException {
this.execId = flow.getExecutionId();
this.flow = flow;
@@ -154,10 +156,15 @@ public class FlowRunner extends EventHandler implements Runnable {
this.executorService = executorService;
this.finishedNodes = new SwapQueue<>();
this.azkabanProps = azkabanProps;
+ // Add the flow listener only if a non-null eventReporter is available.
+ if (azkabanEventReporter != null) {
+ this.addListener(this.flowListener);
+ }
// Create logger and execution dir in flowRunner initialization instead of flow runtime to avoid NPE
// where the uninitialized logger is used in flow preparing state
createLogger(this.flow.getFlowId());
+ this.azkabanEventReporter = azkabanEventReporter;
}
public FlowRunner setFlowWatcher(final FlowWatcher watcher) {
@@ -201,7 +208,7 @@ public class FlowRunner extends EventHandler implements Runnable {
loadAllProperties();
this.fireEventListeners(
- Event.create(this, Type.FLOW_STARTED, new EventData(this.getExecutableFlow())));
+ Event.create(this, EventType.FLOW_STARTED, new EventData(this.getExecutableFlow())));
runFlow();
} catch (final Throwable t) {
if (this.logger != null) {
@@ -226,7 +233,8 @@ public class FlowRunner extends EventHandler implements Runnable {
closeLogger();
updateFlow();
} finally {
- this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED, new EventData(this.flow)));
+ this.fireEventListeners(
+ Event.create(this, EventType.FLOW_FINISHED, new EventData(this.flow)));
}
}
}
@@ -288,7 +296,6 @@ public class FlowRunner extends EventHandler implements Runnable {
}
}
-
/**
* setup logger and execution dir for the flowId
*/
@@ -559,7 +566,7 @@ public class FlowRunner extends EventHandler implements Runnable {
private void finishExecutableNode(final ExecutableNode node) {
this.finishedNodes.add(node);
final EventData eventData = new EventData(node.getStatus(), node.getNestedId());
- fireEventListeners(Event.create(this, Type.JOB_FINISHED, eventData));
+ fireEventListeners(Event.create(this, EventType.JOB_FINISHED, eventData));
}
private void finalizeFlow(final ExecutableFlowBase flow) {
@@ -1099,20 +1106,79 @@ public class FlowRunner extends EventHandler implements Runnable {
return ImmutableSet.copyOf(this.activeJobRunners);
}
+ // Class helps report the flow start and stop events.
+ private class FlowRunnerEventListener implements EventListener {
+
+ public FlowRunnerEventListener() {
+ }
+
+ private synchronized Map<String, String> getFlowMetadata(final FlowRunner flowRunner) {
+ final ExecutableFlow flow = flowRunner.getExecutableFlow();
+ final Props props = ServiceProvider.SERVICE_PROVIDER.getInstance(Props.class);
+ final Map<String, String> metaData = new HashMap<>();
+ metaData.put("flowName", flow.getId());
+ metaData.put("azkabanHost", props.getString(AZKABAN_SERVER_HOST_NAME, "unknown"));
+ metaData.put("projectName", flow.getProjectName());
+ metaData.put("submitUser", flow.getSubmitUser());
+ metaData.put("executionId", String.valueOf(flow.getExecutionId()));
+ metaData.put("startTime", String.valueOf(flow.getStartTime()));
+ metaData.put("submitTime", String.valueOf(flow.getSubmitTime()));
+ return metaData;
+ }
+
+ @Override
+ public synchronized void handleEvent(final Event event) {
+ if (event.getType() == EventType.FLOW_STARTED) {
+ final FlowRunner flowRunner = (FlowRunner) event.getRunner();
+ final ExecutableFlow flow = flowRunner.getExecutableFlow();
+ FlowRunner.this.logger.info("Flow started: " + flow.getId());
+ FlowRunner.this.azkabanEventReporter.report(event.getType(), getFlowMetadata(flowRunner));
+ } else if (event.getType() == EventType.FLOW_FINISHED) {
+ final FlowRunner flowRunner = (FlowRunner) event.getRunner();
+ final ExecutableFlow flow = flowRunner.getExecutableFlow();
+ FlowRunner.this.logger.info("Flow ended: " + flow.getId());
+ final Map<String, String> flowMetadata = getFlowMetadata(flowRunner);
+ flowMetadata.put("endTime", String.valueOf(flow.getEndTime()));
+ flowMetadata.put("flowStatus", flow.getStatus().name());
+ FlowRunner.this.azkabanEventReporter.report(event.getType(), flowMetadata);
+ }
+ }
+ }
+
private class JobRunnerEventListener implements EventListener {
public JobRunnerEventListener() {
}
+ private synchronized Map<String, String> getJobMetadata(final JobRunner jobRunner) {
+ final ExecutableNode node = jobRunner.getNode();
+ final Props props = ServiceProvider.SERVICE_PROVIDER.getInstance(Props.class);
+ final Map<String, String> metaData = new HashMap<>();
+ metaData.put("jobId", node.getId());
+ metaData.put("executionID", String.valueOf(node.getExecutableFlow().getExecutionId()));
+ metaData.put("flowName", node.getExecutableFlow().getId());
+ metaData.put("startTime", String.valueOf(node.getStartTime()));
+ metaData.put("jobType", String.valueOf(node.getType()));
+ metaData.put("azkabanHost", props.getString(AZKABAN_SERVER_HOST_NAME, "unknown"));
+ metaData.put("jobProxyUser",
+ jobRunner.getProps().getString("user.to.proxy", null));
+ return metaData;
+ }
+
@Override
public synchronized void handleEvent(final Event event) {
-
- if (event.getType() == Type.JOB_STATUS_CHANGED) {
+ if (event.getType() == EventType.JOB_STATUS_CHANGED) {
updateFlow();
- } else if (event.getType() == Type.JOB_FINISHED) {
- final JobRunner runner = (JobRunner) event.getRunner();
- final ExecutableNode node = runner.getNode();
+ } else if (event.getType() == EventType.JOB_FINISHED) {
final EventData eventData = event.getData();
+ final JobRunner jobRunner = (JobRunner) event.getRunner();
+ final ExecutableNode node = jobRunner.getNode();
+ if (FlowRunner.this.azkabanEventReporter != null) {
+ final Map<String, String> jobMetadata = getJobMetadata(jobRunner);
+ jobMetadata.put("jobStatus", node.getStatus().name());
+ jobMetadata.put("endTime", String.valueOf(node.getEndTime()));
+ FlowRunner.this.azkabanEventReporter.report(event.getType(), jobMetadata);
+ }
final long seconds = (node.getEndTime() - node.getStartTime()) / 1000;
synchronized (FlowRunner.this.mainSyncObj) {
FlowRunner.this.logger.info("Job " + eventData.getNestedId() + " finished with status "
@@ -1127,12 +1193,18 @@ public class FlowRunner extends EventHandler implements Runnable {
}
FlowRunner.this.finishedNodes.add(node);
- FlowRunner.this.activeJobRunners.remove(runner);
+ FlowRunner.this.activeJobRunners.remove(jobRunner);
node.getParentFlow().setUpdateTime(System.currentTimeMillis());
interrupt();
fireEventListeners(event);
}
- } else if (event.getType() == Type.JOB_STARTED) {
+ } else if (event.getType() == EventType.JOB_STARTED) {
+ final EventData eventData = event.getData();
+ FlowRunner.this.logger.info("Job Started: " + eventData.getNestedId());
+ if (FlowRunner.this.azkabanEventReporter != null) {
+ final JobRunner jobRunner = (JobRunner) event.getRunner();
+ FlowRunner.this.azkabanEventReporter.report(event.getType(), getJobMetadata(jobRunner));
+ }
// add job level checker
final TriggerManager triggerManager = ServiceProvider.SERVICE_PROVIDER
.getInstance(TriggerManager.class);
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index b972c7d..c1b5c87 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -35,6 +35,8 @@ import azkaban.project.ProjectLoader;
import azkaban.project.ProjectWhitelist;
import azkaban.project.ProjectWhitelist.WhitelistType;
import azkaban.sla.SlaOption;
+import azkaban.spi.AzkabanEventReporter;
+import azkaban.spi.EventType;
import azkaban.storage.StorageManager;
import azkaban.utils.FileIOUtils;
import azkaban.utils.FileIOUtils.JobMetaData;
@@ -61,6 +63,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.io.FileUtils;
@@ -116,7 +119,7 @@ public class FlowRunnerManager implements EventListener,
private final JobTypeManager jobtypeManager;
private final FlowPreparer flowPreparer;
private final TriggerManager triggerManager;
-
+ private final AzkabanEventReporter azkabanEventReporter;
private final Props azkabanProps;
private final File executionDirectory;
@@ -151,11 +154,13 @@ public class FlowRunnerManager implements EventListener,
final ExecutorLoader executorLoader,
final ProjectLoader projectLoader,
final StorageManager storageManager,
- final TriggerManager triggerManager) throws IOException {
+ final TriggerManager triggerManager,
+ @Nullable final AzkabanEventReporter azkabanEventReporter) throws IOException {
this.azkabanProps = props;
this.executionDirRetention = props.getLong("execution.dir.retention",
this.executionDirRetention);
+ this.azkabanEventReporter = azkabanEventReporter;
logger.info("Execution dir retention set to " + this.executionDirRetention + " ms");
this.executionDirectory = new File(props.getString("azkaban.execution.dir", "executions"));
@@ -358,7 +363,7 @@ public class FlowRunnerManager implements EventListener,
final FlowRunner runner =
new FlowRunner(flow, this.executorLoader, this.projectLoader, this.jobtypeManager,
- this.azkabanProps);
+ this.azkabanProps, this.azkabanEventReporter);
runner.setFlowWatcher(watcher)
.setJobLogSettings(this.jobLogChunkSize, this.jobLogNumFiles)
.setValidateProxyUser(this.validateProxyUser)
@@ -485,16 +490,16 @@ public class FlowRunnerManager implements EventListener,
@Override
public void handleEvent(final Event event) {
- if (event.getType() == Event.Type.FLOW_FINISHED || event.getType() == Event.Type.FLOW_STARTED) {
+ if (event.getType() == EventType.FLOW_FINISHED || event.getType() == EventType.FLOW_STARTED) {
final FlowRunner flowRunner = (FlowRunner) event.getRunner();
final ExecutableFlow flow = flowRunner.getExecutableFlow();
- if (event.getType() == Event.Type.FLOW_FINISHED) {
+ if (event.getType() == EventType.FLOW_FINISHED) {
this.recentlyFinishedFlows.put(flow.getExecutionId(), flow);
logger.info("Flow " + flow.getExecutionId()
+ " is finished. Adding it to recently finished flows list.");
this.runningFlows.remove(flow.getExecutionId());
- } else if (event.getType() == Event.Type.FLOW_STARTED) {
+ } else if (event.getType() == EventType.FLOW_STARTED) {
// add flow level SLA checker
this.triggerManager
.addTrigger(flow.getExecutionId(), SlaOption.getFlowLevelSLAOptions(flow));
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java
index 461c4b1..24e2bef 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java
@@ -6,6 +6,7 @@ import azkaban.event.EventListener;
import azkaban.execapp.JobRunner;
import azkaban.executor.ExecutableNode;
import azkaban.executor.Status;
+import azkaban.spi.EventType;
import azkaban.utils.Props;
import java.util.HashMap;
import java.util.Map;
@@ -108,16 +109,16 @@ public class JmxJobMBeanManager implements JmxJobMXBean, EventListener {
+ eventData.getStatus());
}
- if (event.getType() == Event.Type.JOB_STARTED) {
+ if (event.getType() == EventType.JOB_STARTED) {
this.runningJobCount.incrementAndGet();
- } else if (event.getType() == Event.Type.JOB_FINISHED) {
+ } else if (event.getType() == EventType.JOB_FINISHED) {
this.totalExecutedJobCount.incrementAndGet();
if (this.runningJobCount.intValue() > 0) {
this.runningJobCount.decrementAndGet();
} else {
logger.warn("runningJobCount not messed up, it is already zero "
+ "and we are trying to decrement on job event "
- + Event.Type.JOB_FINISHED);
+ + EventType.JOB_FINISHED);
}
if (eventData.getStatus() == Status.FAILED) {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
index 85f5a4c..84fff8d 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
@@ -18,7 +18,6 @@ package azkaban.execapp;
import azkaban.Constants;
import azkaban.event.Event;
-import azkaban.event.Event.Type;
import azkaban.event.EventData;
import azkaban.event.EventHandler;
import azkaban.execapp.event.BlockingStatus;
@@ -34,6 +33,7 @@ import azkaban.jobExecutor.JavaProcessJob;
import azkaban.jobExecutor.Job;
import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypeManagerException;
+import azkaban.spi.EventType;
import azkaban.utils.ExternalLinkUtils;
import azkaban.utils.PatternLayoutEscaped;
import azkaban.utils.Props;
@@ -412,12 +412,12 @@ public class JobRunner extends EventHandler implements Runnable {
if (quickFinish) {
this.node.setStartTime(time);
fireEvent(
- Event.create(this, Type.JOB_STARTED,
+ Event.create(this, EventType.JOB_STARTED,
new EventData(nodeStatus, this.node.getNestedId())));
this.node.setEndTime(time);
fireEvent(
Event
- .create(this, Type.JOB_FINISHED,
+ .create(this, EventType.JOB_FINISHED,
new EventData(nodeStatus, this.node.getNestedId())));
return true;
}
@@ -580,13 +580,13 @@ public class JobRunner extends EventHandler implements Runnable {
Status finalStatus = this.node.getStatus();
uploadExecutableNode();
if (!errorFound && !isKilled()) {
- fireEvent(Event.create(this, Type.JOB_STARTED, new EventData(this.node)));
+ fireEvent(Event.create(this, EventType.JOB_STARTED, new EventData(this.node)));
final Status prepareStatus = prepareJob();
if (prepareStatus != null) {
// Writes status to the db
writeStatus();
- fireEvent(Event.create(this, Type.JOB_STATUS_CHANGED,
+ fireEvent(Event.create(this, EventType.JOB_STATUS_CHANGED,
new EventData(prepareStatus, this.node.getNestedId())));
finalStatus = runJob();
} else {
@@ -609,7 +609,7 @@ public class JobRunner extends EventHandler implements Runnable {
"Finishing job " + this.jobId + getNodeRetryLog() + " at " + this.node.getEndTime()
+ " with status " + this.node.getStatus());
- fireEvent(Event.create(this, Type.JOB_FINISHED,
+ fireEvent(Event.create(this, EventType.JOB_FINISHED,
new EventData(finalStatus, this.node.getNestedId())), false);
finalizeLogFile(this.node.getAttempt());
finalizeAttachmentFile();
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java
index 6116583..b48b85f 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java
@@ -17,13 +17,13 @@
package azkaban.execapp.metric;
import azkaban.event.Event;
-import azkaban.event.Event.Type;
import azkaban.event.EventListener;
import azkaban.execapp.FlowRunner;
import azkaban.executor.Status;
import azkaban.metric.MetricException;
import azkaban.metric.MetricReportManager;
import azkaban.metric.TimeBasedReportingMetric;
+import azkaban.spi.EventType;
/**
* Metric to keep track of number of failed flows in between the tracking events
@@ -47,7 +47,7 @@ public class NumFailedFlowMetric extends TimeBasedReportingMetric<Integer> imple
*/
@Override
public synchronized void handleEvent(final Event event) {
- if (event.getType() == Type.FLOW_FINISHED) {
+ if (event.getType() == EventType.FLOW_FINISHED) {
final FlowRunner runner = (FlowRunner) event.getRunner();
if (runner != null && runner.getExecutableFlow().getStatus().equals(Status.FAILED)) {
this.value = this.value + 1;
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
index b069b0b..7ad0a46 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
@@ -17,12 +17,12 @@
package azkaban.execapp.metric;
import azkaban.event.Event;
-import azkaban.event.Event.Type;
import azkaban.event.EventListener;
import azkaban.executor.Status;
import azkaban.metric.MetricException;
import azkaban.metric.MetricReportManager;
import azkaban.metric.TimeBasedReportingMetric;
+import azkaban.spi.EventType;
/**
* Metric to keep track of number of failed jobs in between the tracking events
@@ -45,7 +45,8 @@ public class NumFailedJobMetric extends TimeBasedReportingMetric<Integer> implem
*/
@Override
public synchronized void handleEvent(final Event event) {
- if (event.getType() == Type.JOB_FINISHED && Status.FAILED.equals(event.getData().getStatus())) {
+ if (event.getType() == EventType.JOB_FINISHED && Status.FAILED
+ .equals(event.getData().getStatus())) {
this.value = this.value + 1;
}
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
index 5be6b68..a6ac7ee 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
@@ -17,11 +17,11 @@
package azkaban.execapp.metric;
import azkaban.event.Event;
-import azkaban.event.Event.Type;
import azkaban.event.EventListener;
import azkaban.metric.MetricException;
import azkaban.metric.MetricReportManager;
import azkaban.metric.TimeBasedReportingMetric;
+import azkaban.spi.EventType;
/**
* Metric to keep track of number of running jobs in Azkaban exec server
@@ -49,9 +49,9 @@ public class NumRunningJobMetric extends TimeBasedReportingMetric<Integer> imple
*/
@Override
public synchronized void handleEvent(final Event event) {
- if (event.getType() == Type.JOB_STARTED) {
+ if (event.getType() == EventType.JOB_STARTED) {
this.value = this.value + 1;
- } else if (event.getType() == Type.JOB_FINISHED) {
+ } else if (event.getType() == EventType.JOB_FINISHED) {
this.value = this.value - 1;
}
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/reporter/AzkabanKafkaAvroEventReporter.java b/azkaban-exec-server/src/main/java/azkaban/execapp/reporter/AzkabanKafkaAvroEventReporter.java
new file mode 100644
index 0000000..fa1040e
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/reporter/AzkabanKafkaAvroEventReporter.java
@@ -0,0 +1,124 @@
+package azkaban.execapp.reporter;
+
+import static azkaban.Constants.ConfigurationKeys.AZKABAN_EVENT_REPORTING_KAFKA_BROKERS;
+import static azkaban.Constants.ConfigurationKeys.AZKABAN_EVENT_REPORTING_KAFKA_SCHEMA_REGISTRY_URL;
+import static azkaban.Constants.ConfigurationKeys.AZKABAN_EVENT_REPORTING_KAFKA_TOPIC;
+
+import azkaban.spi.AzkabanEventReporter;
+import azkaban.spi.EventType;
+import azkaban.utils.Props;
+import com.google.common.base.Preconditions;
+import gobblin.metrics.MetricContext;
+import gobblin.metrics.event.EventSubmitter;
+import gobblin.metrics.kafka.KafkaAvroEventReporter;
+import gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
+import gobblin.metrics.kafka.KafkaEventReporter;
+import gobblin.metrics.kafka.KafkaPusher;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Default implementation of the <code>AzkabanEventReporter</code> class.
+ * An instance of this class is injected via guice into the <code>FlowRunnerManager</code>
+ * class when event reporting is enabled.
+ */
+public class AzkabanKafkaAvroEventReporter implements AzkabanEventReporter {
+
+ private static final Logger logger = Logger.getLogger(AzkabanKafkaAvroEventReporter.class);
+ private static final String ROOT_CONTEXT_NAME = "AzkabanKafkaEvents";
+ private static final String AZKABAN_FLOW_EVENTS_NAMESPACE = "AzkabanFlowEvents";
+ private static final String AZKABAN_JOB_EVENTS_NAMESPACE = "AzkabanJobEvents";
+ private final Properties kafkaProps = new Properties();
+ private MetricContext rootContext;
+ private KafkaAvroEventReporter kafkaAvroEventReporter;
+
+ // For testing only.
+ public AzkabanKafkaAvroEventReporter(final KafkaAvroEventReporter kafkaAvroEventReporter,
+ final Props props) {
+ this.kafkaAvroEventReporter = kafkaAvroEventReporter;
+ initKafkaProperties(props);
+ }
+
+ // Constructed via guice provider
+ public AzkabanKafkaAvroEventReporter(final Props props) {
+
+ initKafkaProperties(props);
+
+ final String eventReportingKafkaBrokers =
+ props.get(AZKABAN_EVENT_REPORTING_KAFKA_BROKERS);
+ final String eventReportingKafkaTopic =
+ props.get(AZKABAN_EVENT_REPORTING_KAFKA_TOPIC);
+
+ this.rootContext = MetricContext.builder(ROOT_CONTEXT_NAME).build();
+ final KafkaEventReporter.Builder<? extends KafkaAvroEventReporter.Builder> builder =
+ KafkaAvroEventReporter.Factory.forContext(this.rootContext);
+ final KafkaPusher kafkaPusher =
+ new AzkabanKafkaPusher(eventReportingKafkaBrokers, eventReportingKafkaTopic);
+
+ try {
+ this.kafkaAvroEventReporter = builder.withKafkaPusher(kafkaPusher)
+ .withSchemaRegistry(new KafkaAvroSchemaRegistry(this.kafkaProps))
+ .build(eventReportingKafkaBrokers, eventReportingKafkaTopic);
+ logger.info("Initialized a new kafkaAvroEventReporter");
+ } catch (final Exception e) {
+ logger.error("Exception while initializing kafka reporter", e);
+ }
+ }
+
+ private void initKafkaProperties(final Props props) {
+ Preconditions.checkArgument(props.containsKey(AZKABAN_EVENT_REPORTING_KAFKA_BROKERS),
+ String.format("Property %s not provided.", AZKABAN_EVENT_REPORTING_KAFKA_BROKERS));
+ Preconditions.checkArgument(props.containsKey(AZKABAN_EVENT_REPORTING_KAFKA_TOPIC),
+ String.format("Property %s not provided.", AZKABAN_EVENT_REPORTING_KAFKA_TOPIC));
+ Preconditions
+ .checkArgument(props.containsKey(AZKABAN_EVENT_REPORTING_KAFKA_SCHEMA_REGISTRY_URL),
+ String.format("Property %s not provided.",
+ AZKABAN_EVENT_REPORTING_KAFKA_SCHEMA_REGISTRY_URL));
+ this.kafkaProps.put("kafka.schema.registry.url",
+ props.get(AZKABAN_EVENT_REPORTING_KAFKA_SCHEMA_REGISTRY_URL));
+ }
+
+ /**
+ * @param eventType - The event type to be reported via kafka.
+ * @param metaData - Additional data to accompany the event.
+ */
+ @Override
+ public boolean report(final EventType eventType, final Map<String, String> metaData) {
+ if (this.kafkaAvroEventReporter != null) {
+ switch (eventType) {
+ case FLOW_STARTED:
+ getEventSubmitter(AZKABAN_FLOW_EVENTS_NAMESPACE).submit("flowStarted", metaData);
+ break;
+ case FLOW_FINISHED:
+ getEventSubmitter(AZKABAN_FLOW_EVENTS_NAMESPACE).submit("flowFinished", metaData);
+ break;
+ case JOB_STARTED:
+ getEventSubmitter(AZKABAN_JOB_EVENTS_NAMESPACE).submit("jobStarted", metaData);
+ break;
+ case JOB_FINISHED:
+ getEventSubmitter(AZKABAN_JOB_EVENTS_NAMESPACE).submit("jobFinished", metaData);
+ break;
+ default:
+ logger.warn("Failed to report the event. Unrecognized event type "
+ + eventType.name());
+ return false;
+ }
+ logger.info("Sent " + eventType.name() + " event via kafka");
+ this.kafkaAvroEventReporter.report();
+ } else {
+ logger.warn("Kafka reporter isn't initialized. Failed to report the event");
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * @param namespace - The event namespace
+ * @return - Returns an <code>EventSubmitter</code>
+ */
+ private EventSubmitter getEventSubmitter(final String namespace) {
+ return new EventSubmitter.Builder(this.rootContext, namespace).build();
+ }
+}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/reporter/AzkabanKafkaPusher.java b/azkaban-exec-server/src/main/java/azkaban/execapp/reporter/AzkabanKafkaPusher.java
new file mode 100644
index 0000000..11537d2
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/reporter/AzkabanKafkaPusher.java
@@ -0,0 +1,30 @@
+package azkaban.execapp.reporter;
+
+import com.google.common.io.Closer;
+import gobblin.metrics.kafka.KafkaPusher;
+import gobblin.metrics.kafka.ProducerCloseable;
+import java.util.Properties;
+import kafka.producer.ProducerConfig;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Extends the KafkaPusher class to create an async type kafka producer.
+ */
+public class AzkabanKafkaPusher extends KafkaPusher {
+
+ private static final Logger logger = Logger.getLogger(AzkabanKafkaPusher.class.getName());
+
+ AzkabanKafkaPusher(final String brokers, final String topic) {
+ super(brokers, topic);
+ }
+
+ @Override
+ public ProducerCloseable<String, byte[]> createProducer(final ProducerConfig config) {
+ final Properties props = config.props().props();
+ props.put("producer.type", "async");
+ final ProducerConfig newConfig = new ProducerConfig(props);
+ logger.info("Kafka producer type is set to " + newConfig.producerType());
+ return Closer.create().register(new ProducerCloseable<String, byte[]>(newConfig));
+ }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/AzkabanExecServerModuleTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/AzkabanExecServerModuleTest.java
new file mode 100644
index 0000000..df48c28
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/AzkabanExecServerModuleTest.java
@@ -0,0 +1,111 @@
+package azkaban.execapp;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+
+import azkaban.spi.AzkabanEventReporter;
+import azkaban.spi.EventType;
+import azkaban.utils.Props;
+import java.util.Map;
+import org.junit.Test;
+
+public class AzkabanExecServerModuleTest {
+
+ /**
+ * Verify that alternate implementation of the <code>AzkabanEventReporter</code>
+ * is initialized.
+ */
+ @Test
+ public void testCreateAzkabanEventReporter() {
+ final AzkabanExecServerModule azkabanExecServerModule = new AzkabanExecServerModule();
+ final Props props = new Props();
+ props.put("azkaban.event.reporting.enabled", "true");
+ props.put("azkaban.event.reporting.class",
+ "azkaban.execapp.AzkabanEventReporterTest1");
+ final AzkabanEventReporter azkabanEventReporter = azkabanExecServerModule
+ .createAzkabanEventReporter(props);
+ assertThat(azkabanEventReporter).isNotNull();
+ assertThat(azkabanEventReporter).isInstanceOf(AzkabanEventReporterTest1.class);
+ }
+
+ /**
+ * Verify that <code>IllegalArgumentException</code> is thrown when required properties
+ * are missing.
+ */
+ @Test
+ public void testAzkabanEventReporterInvalidProperties() {
+ final AzkabanExecServerModule azkabanExecServerModule = new AzkabanExecServerModule();
+ final Props props = new Props();
+ props.put("azkaban.event.reporting.enabled", "true");
+ props.put("azkaban.event.reporting.class",
+ "azkaban.execapp.reporter.AzkabanKafkaAvroEventReporter");
+ assertThatIllegalArgumentException()
+ .isThrownBy(() -> azkabanExecServerModule.createAzkabanEventReporter(props));
+ }
+
+ /**
+ * Verify that a <code>RuntimeException</code> is thrown when valid constructor is
+ * not found in the event reporter implementation.
+ */
+ @Test
+ public void testAzkabanEventReporterInvalidConstructor() {
+ final AzkabanExecServerModule azkabanExecServerModule = new AzkabanExecServerModule();
+ final Props props = new Props();
+ props.put("azkaban.event.reporting.enabled", "true");
+ props.put("azkaban.event.reporting.class",
+ "azkaban.execapp.AzkabanEventReporterTest3");
+ assertThatExceptionOfType(RuntimeException.class)
+ .isThrownBy(() -> azkabanExecServerModule.createAzkabanEventReporter(props));
+ }
+
+ /**
+ * Ensures that the event reporter is not initialized when the property 'event.reporter.enabled'
+ * is not set.
+ */
+ @Test
+ public void testEventReporterDisabled() {
+ final AzkabanExecServerModule azkabanExecServerModule = new AzkabanExecServerModule();
+ final AzkabanEventReporter azkabanEventReporter = azkabanExecServerModule
+ .createAzkabanEventReporter(new Props());
+ assertThat(azkabanEventReporter).isNull();
+ }
+
+}
+
+// Dummy implementation of the AzkabanEventReporter interface
+// with valid constructor.
+class AzkabanEventReporterTest1 implements AzkabanEventReporter {
+
+ public AzkabanEventReporterTest1(final Props props) {
+
+ }
+
+ @Override
+ public boolean report(final EventType eventType, final Map<String, String> metadata) {
+ return false;
+ }
+}
+
+// Dummy implementation of the AzkabanEventReporter interface, for test.
+// Valid constructor is not available.
+class AzkabanEventReporterTest2 implements AzkabanEventReporter {
+
+ @Override
+ public boolean report(final EventType eventType, final Map<String, String> metadata) {
+ return false;
+ }
+}
+
+// Dummy implementation of the AzkabanEventReporter with an invalid constructor
+class AzkabanEventReporterTest3 implements AzkabanEventReporter {
+
+ public AzkabanEventReporterTest3() {
+
+ }
+
+ @Override
+ public boolean report(final EventType eventType, final Map<String, String> metadata) {
+ return false;
+ }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java
index 681acd5..f585ea6 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java
@@ -19,6 +19,7 @@ package azkaban.execapp.event;
import static org.mockito.Mockito.mock;
import azkaban.execapp.EventCollectorListener;
+import azkaban.execapp.EventReporterUtil;
import azkaban.execapp.FlowRunner;
import azkaban.execapp.FlowRunnerTestUtil;
import azkaban.executor.ExecutableFlow;
@@ -30,6 +31,7 @@ import azkaban.executor.MockExecutorLoader;
import azkaban.executor.Status;
import azkaban.jobtype.JobTypeManager;
import azkaban.project.ProjectLoader;
+import azkaban.spi.AzkabanEventReporter;
import azkaban.utils.Props;
import java.io.File;
import java.io.IOException;
@@ -42,6 +44,8 @@ import org.junit.Test;
public class LocalFlowWatcherTest {
+ private final AzkabanEventReporter azkabanEventReporter =
+ EventReporterUtil.getTestAzkabanEventReporter();
private JobTypeManager jobtypeManager;
private int dirVal = 0;
@@ -160,14 +164,14 @@ public class LocalFlowWatcherTest {
private void testPipelineLevel1(final ExecutableFlow first, final ExecutableFlow second) {
for (final ExecutableNode node : second.getExecutableNodes()) {
- Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
+ Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
// check it's start time is after the first's children.
final ExecutableNode watchedNode = first.getExecutableNode(node.getId());
if (watchedNode == null) {
continue;
}
- Assert.assertEquals(watchedNode.getStatus(), Status.SUCCEEDED);
+ Assert.assertEquals(Status.SUCCEEDED, watchedNode.getStatus());
System.out.println("Node " + node.getId() + " start: "
+ node.getStartTime() + " dependent on " + watchedNode.getId() + " "
@@ -194,14 +198,14 @@ public class LocalFlowWatcherTest {
private void testPipelineLevel2(final ExecutableFlow first, final ExecutableFlow second) {
for (final ExecutableNode node : second.getExecutableNodes()) {
- Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
+ Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
// check it's start time is after the first's children.
final ExecutableNode watchedNode = first.getExecutableNode(node.getId());
if (watchedNode == null) {
continue;
}
- Assert.assertEquals(watchedNode.getStatus(), Status.SUCCEEDED);
+ Assert.assertEquals(Status.SUCCEEDED, watchedNode.getStatus());
long minDiff = Long.MAX_VALUE;
for (final String watchedChild : watchedNode.getOutNodes()) {
@@ -209,7 +213,7 @@ public class LocalFlowWatcherTest {
if (child == null) {
continue;
}
- Assert.assertEquals(child.getStatus(), Status.SUCCEEDED);
+ Assert.assertEquals(Status.SUCCEEDED, child.getStatus());
final long diff = node.getStartTime() - child.getEndTime();
minDiff = Math.min(minDiff, diff);
System.out.println("Node " + node.getId() + " start: "
@@ -252,7 +256,7 @@ public class LocalFlowWatcherTest {
}
loader.uploadExecutableFlow(exFlow);
final FlowRunner runner = new FlowRunner(exFlow, loader, mock(ProjectLoader.class),
- this.jobtypeManager, azkabanProps);
+ this.jobtypeManager, azkabanProps, this.azkabanEventReporter);
runner.setFlowWatcher(watcher);
runner.addListener(eventCollector);
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
index f5b195a..b7cd06f 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
@@ -19,6 +19,7 @@ package azkaban.execapp.event;
import static org.mockito.Mockito.mock;
import azkaban.execapp.EventCollectorListener;
+import azkaban.execapp.EventReporterUtil;
import azkaban.execapp.FlowRunner;
import azkaban.execapp.FlowRunnerTestUtil;
import azkaban.execapp.jmx.JmxJobMBeanManager;
@@ -32,6 +33,7 @@ import azkaban.executor.MockExecutorLoader;
import azkaban.executor.Status;
import azkaban.jobtype.JobTypeManager;
import azkaban.project.ProjectLoader;
+import azkaban.spi.AzkabanEventReporter;
import azkaban.test.Utils;
import azkaban.test.executions.ExecutionsTestUtil;
import azkaban.utils.Props;
@@ -46,6 +48,8 @@ import org.junit.rules.TemporaryFolder;
public class RemoteFlowWatcherTest {
+ private final AzkabanEventReporter azkabanEventReporter =
+ EventReporterUtil.getTestAzkabanEventReporter();
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private JobTypeManager jobtypeManager;
@@ -148,14 +152,14 @@ public class RemoteFlowWatcherTest {
private void testPipelineLevel1(final ExecutableFlow first, final ExecutableFlow second) {
for (final ExecutableNode node : second.getExecutableNodes()) {
- Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
+ Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
// check it's start time is after the first's children.
final ExecutableNode watchedNode = first.getExecutableNode(node.getId());
if (watchedNode == null) {
continue;
}
- Assert.assertEquals(watchedNode.getStatus(), Status.SUCCEEDED);
+ Assert.assertEquals(Status.SUCCEEDED, watchedNode.getStatus());
System.out.println("Node " + node.getId() + " start: "
+ node.getStartTime() + " dependent on " + watchedNode.getId() + " "
@@ -180,14 +184,14 @@ public class RemoteFlowWatcherTest {
private void assertPipelineLevel2(final ExecutableFlow first, final ExecutableFlow second) {
for (final ExecutableNode node : second.getExecutableNodes()) {
- Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
+ Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
// check it's start time is after the first's children.
final ExecutableNode watchedNode = first.getExecutableNode(node.getId());
if (watchedNode == null) {
continue;
}
- Assert.assertEquals(watchedNode.getStatus(), Status.SUCCEEDED);
+ Assert.assertEquals(Status.SUCCEEDED, watchedNode.getStatus());
long minDiff = Long.MAX_VALUE;
for (final String watchedChild : watchedNode.getOutNodes()) {
@@ -195,7 +199,7 @@ public class RemoteFlowWatcherTest {
if (child == null) {
continue;
}
- Assert.assertEquals(child.getStatus(), Status.SUCCEEDED);
+ Assert.assertEquals(Status.SUCCEEDED, child.getStatus());
final long diff = node.getStartTime() - child.getEndTime();
minDiff = Math.min(minDiff, diff);
Assert.assertTrue(
@@ -238,7 +242,7 @@ public class RemoteFlowWatcherTest {
loader.uploadExecutableFlow(exFlow);
final FlowRunner runner = new FlowRunner(exFlow, loader, mock(ProjectLoader.class),
- this.jobtypeManager, azkabanProps);
+ this.jobtypeManager, azkabanProps, this.azkabanEventReporter);
runner.setFlowWatcher(watcher);
runner.addListener(eventCollector);
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/EventCollectorListener.java b/azkaban-exec-server/src/test/java/azkaban/execapp/EventCollectorListener.java
index adf17e0..28fb57a 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/EventCollectorListener.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/EventCollectorListener.java
@@ -19,8 +19,8 @@ package azkaban.execapp;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import azkaban.event.Event;
-import azkaban.event.Event.Type;
import azkaban.event.EventListener;
+import azkaban.spi.EventType;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
@@ -31,9 +31,9 @@ public class EventCollectorListener implements EventListener {
public static final Object handleEvent = new Object();
// CopyOnWriteArrayList allows concurrent iteration and modification
private final List<Event> eventList = new CopyOnWriteArrayList<>();
- private final HashSet<Event.Type> filterOutTypes = new HashSet<>();
+ private final HashSet<EventType> filterOutTypes = new HashSet<>();
- public void setEventFilterOut(final Event.Type... types) {
+ public void setEventFilterOut(final EventType... types) {
this.filterOutTypes.addAll(Arrays.asList(types));
}
@@ -62,7 +62,7 @@ public class EventCollectorListener implements EventListener {
return true;
}
- public void assertEvents(final Type... expected) {
+ public void assertEvents(final EventType... expected) {
final Object[] captured = this.eventList.stream()
.filter(event -> event.getRunner() != null)
.map(event -> event.getType())
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/EventReporterUtil.java b/azkaban-exec-server/src/test/java/azkaban/execapp/EventReporterUtil.java
new file mode 100644
index 0000000..9a34d8d
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/EventReporterUtil.java
@@ -0,0 +1,37 @@
+package azkaban.execapp;
+
+import static org.mockito.Mockito.mock;
+
+import azkaban.execapp.reporter.AzkabanKafkaAvroEventReporter;
+import azkaban.spi.AzkabanEventReporter;
+import azkaban.utils.Props;
+import gobblin.metrics.kafka.KafkaAvroEventReporter;
+import org.mockito.Mockito;
+
+public final class EventReporterUtil {
+ private EventReporterUtil() {
+
+ }
+
+ /**
+ *
+ * @return - Returns a mock <code>AzkabanEventReporter</code> instance.
+ */
+ public static AzkabanEventReporter getTestAzkabanEventReporter() {
+ final KafkaAvroEventReporter kafkaAvroEventReporter = mock(KafkaAvroEventReporter.class);
+ Mockito.doNothing().when(kafkaAvroEventReporter).report();
+ return new AzkabanKafkaAvroEventReporter(kafkaAvroEventReporter, getTestKafkaProps());
+ }
+
+ /**
+ * @return - Returns required kafka properties for test methods.
+ */
+ public static Props getTestKafkaProps() {
+ final Props kafkaProps = new Props();
+ kafkaProps.put("azkaban.event.reporting.kafka.brokers", "brokers.com:10950");
+ kafkaProps.put("azkaban.event.reporting.kafka.topic", "KafkaTopic");
+ kafkaProps
+ .put("azkaban.event.reporting.kafka.schema.registry.url", "registry.com/schemaRegistry");
+ return kafkaProps;
+ }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
index 83897d5..21511cc 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
@@ -34,6 +34,7 @@ import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypePluginSet;
import azkaban.project.Project;
import azkaban.project.ProjectLoader;
+import azkaban.spi.AzkabanEventReporter;
import azkaban.utils.Props;
import java.io.File;
import java.io.IOException;
@@ -61,6 +62,8 @@ public class FlowRunnerPipelineTest {
private static int id = 101;
private final Logger logger = Logger.getLogger(FlowRunnerTest2.class);
+ private final AzkabanEventReporter azkabanEventReporter =
+ EventReporterUtil.getTestAzkabanEventReporter();
private File workingDir;
private JobTypeManager jobtypeManager;
private ExecutorLoader fakeExecutorLoader;
@@ -677,8 +680,8 @@ public class FlowRunnerPipelineTest {
final FlowRunner runner =
new FlowRunner(this.fakeExecutorLoader.fetchExecutableFlow(exId),
- this.fakeExecutorLoader, mock(ProjectLoader.class), this.jobtypeManager, azkabanProps);
-
+ this.fakeExecutorLoader, mock(ProjectLoader.class), this.jobtypeManager, azkabanProps,
+ this.azkabanEventReporter);
runner.addListener(eventCollector);
return runner;
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
index 50e0eae..d202193 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
@@ -29,6 +29,7 @@ import azkaban.flow.Flow;
import azkaban.jobtype.JobTypeManager;
import azkaban.project.Project;
import azkaban.project.ProjectLoader;
+import azkaban.spi.AzkabanEventReporter;
import azkaban.utils.Props;
import java.io.File;
import java.io.IOException;
@@ -61,6 +62,8 @@ public class FlowRunnerPropertyResolutionTest {
private static int id = 101;
private final Logger logger = Logger.getLogger(FlowRunnerTest2.class);
+ private final AzkabanEventReporter azkabanEventReporter =
+ EventReporterUtil.getTestAzkabanEventReporter();
private File workingDir;
private JobTypeManager jobtypeManager;
private ExecutorLoader fakeExecutorLoader;
@@ -215,7 +218,8 @@ public class FlowRunnerPropertyResolutionTest {
final FlowRunner runner =
new FlowRunner(this.fakeExecutorLoader.fetchExecutableFlow(exId),
- this.fakeExecutorLoader, mock(ProjectLoader.class), this.jobtypeManager, azkabanProps);
+ this.fakeExecutorLoader, mock(ProjectLoader.class), this.jobtypeManager, azkabanProps,
+ this.azkabanEventReporter);
return runner;
}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
index c26a68e..e8ea268 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
@@ -20,8 +20,6 @@ import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.when;
-import azkaban.event.Event;
-import azkaban.event.Event.Type;
import azkaban.execapp.jmx.JmxJobMBeanManager;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
@@ -33,6 +31,8 @@ import azkaban.jobExecutor.AllJobExecutorTests;
import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypePluginSet;
import azkaban.project.ProjectLoader;
+import azkaban.spi.AzkabanEventReporter;
+import azkaban.spi.EventType;
import azkaban.test.Utils;
import azkaban.test.executions.ExecutionsTestUtil;
import azkaban.utils.Props;
@@ -48,6 +48,8 @@ import org.mockito.MockitoAnnotations;
public class FlowRunnerTest extends FlowRunnerTestBase {
private static final File TEST_DIR = ExecutionsTestUtil.getFlowDir("exectest1");
+ private final AzkabanEventReporter azkabanEventReporter =
+ EventReporterUtil.getTestAzkabanEventReporter();
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private File workingDir;
@@ -76,8 +78,8 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
@Test
public void exec1Normal() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
- eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
- Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
+ eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
+ EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
this.runner = createFlowRunner(this.loader, eventCollector, "exec1");
startThread(this.runner);
@@ -97,14 +99,14 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
assertStatus("job8", Status.SUCCEEDED);
assertStatus("job10", Status.SUCCEEDED);
- eventCollector.assertEvents(Type.FLOW_STARTED, Type.FLOW_FINISHED);
+ eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
}
@Test
public void exec1Disabled() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
- eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
- Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
+ eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
+ EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
final ExecutableFlow exFlow = FlowRunnerTestUtil
.prepareExecDir(this.workingDir, TEST_DIR, "exec1", 1);
@@ -137,14 +139,14 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
assertStatus("job8", Status.SUCCEEDED);
assertStatus("job10", Status.SKIPPED);
- eventCollector.assertEvents(Type.FLOW_STARTED, Type.FLOW_FINISHED);
+ eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
}
@Test
public void exec1Failed() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
- eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
- Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
+ eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
+ EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
final ExecutableFlow flow = FlowRunnerTestUtil
.prepareExecDir(this.workingDir, TEST_DIR, "exec2", 1);
@@ -168,14 +170,14 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
assertStatus("job10", Status.CANCELLED);
assertThreadShutDown();
- eventCollector.assertEvents(Type.FLOW_STARTED, Type.FLOW_FINISHED);
+ eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
}
@Test
public void exec1FailedKillAll() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
- eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
- Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
+ eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
+ EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
final ExecutableFlow flow = FlowRunnerTestUtil
.prepareExecDir(this.workingDir, TEST_DIR, "exec2", 1);
flow.getExecutionOptions().setFailureAction(FailureAction.CANCEL_ALL);
@@ -200,14 +202,14 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
assertStatus("job9", Status.CANCELLED);
assertStatus("job10", Status.CANCELLED);
- eventCollector.assertEvents(Type.FLOW_STARTED, Type.FLOW_FINISHED);
+ eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
}
@Test
public void exec1FailedFinishRest() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
- eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
- Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
+ eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
+ EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
final ExecutableFlow flow = FlowRunnerTestUtil
.prepareExecDir(this.workingDir, TEST_DIR, "exec3", 1);
flow.getExecutionOptions().setFailureAction(
@@ -231,14 +233,14 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
assertStatus("job10", Status.CANCELLED);
assertThreadShutDown();
- eventCollector.assertEvents(Type.FLOW_STARTED, Type.FLOW_FINISHED);
+ eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
}
@Test
public void execAndCancel() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
- eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
- Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
+ eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
+ EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
this.runner = createFlowRunner(this.loader, eventCollector, "exec1");
startThread(this.runner);
@@ -261,14 +263,14 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
assertFlowStatus(Status.KILLED);
- eventCollector.assertEvents(Type.FLOW_STARTED, Type.FLOW_FINISHED);
+ eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
}
@Test
public void execRetries() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
- eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
- Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
+ eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
+ EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
this.runner = createFlowRunner(this.loader, eventCollector, "exec4-retry");
startThread(this.runner);
@@ -340,7 +342,8 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
loader.uploadExecutableFlow(flow);
final FlowRunner runner =
- new FlowRunner(flow, loader, this.fakeProjectLoader, this.jobtypeManager, azkabanProps);
+ new FlowRunner(flow, loader, this.fakeProjectLoader, this.jobtypeManager, azkabanProps,
+ this.azkabanEventReporter);
runner.addListener(eventCollector);
@@ -361,7 +364,8 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
loader.uploadExecutableFlow(exFlow);
final FlowRunner runner =
- new FlowRunner(exFlow, loader, this.fakeProjectLoader, this.jobtypeManager, azkabanProps);
+ new FlowRunner(exFlow, loader, this.fakeProjectLoader, this.jobtypeManager, azkabanProps,
+ this.azkabanEventReporter);
runner.addListener(eventCollector);
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
index 30ea40e..974df4c 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
@@ -35,6 +35,7 @@ import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypePluginSet;
import azkaban.project.Project;
import azkaban.project.ProjectLoader;
+import azkaban.spi.AzkabanEventReporter;
import azkaban.test.Utils;
import azkaban.test.executions.ExecutionsTestUtil;
import azkaban.utils.Props;
@@ -97,6 +98,8 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
private static int id = 101;
private final Logger logger = Logger.getLogger(FlowRunnerTest2.class);
+ private final AzkabanEventReporter azkabanEventReporter =
+ EventReporterUtil.getTestAzkabanEventReporter();
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private File workingDir;
@@ -1129,8 +1132,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
final FlowRunner runner = new FlowRunner(
this.fakeExecutorLoader.fetchExecutableFlow(exId), this.fakeExecutorLoader,
- mock(ProjectLoader.class), this.jobtypeManager, azkabanProps);
-
+ mock(ProjectLoader.class), this.jobtypeManager, azkabanProps, this.azkabanEventReporter);
runner.addListener(eventCollector);
return runner;
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
index 765b141..6a162ed 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
@@ -17,7 +17,6 @@
package azkaban.execapp;
import azkaban.event.Event;
-import azkaban.event.Event.Type;
import azkaban.event.EventData;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
@@ -28,6 +27,7 @@ import azkaban.executor.SleepJavaJob;
import azkaban.executor.Status;
import azkaban.jobExecutor.ProcessJob;
import azkaban.jobtype.JobTypeManager;
+import azkaban.spi.EventType;
import azkaban.utils.Props;
import java.io.File;
import java.io.IOException;
@@ -82,12 +82,12 @@ public class JobRunnerTest {
createJobRunner(1, "testJob", 1, false, loader, eventCollector);
final ExecutableNode node = runner.getNode();
- eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED, new EventData(node)));
+ eventCollector.handleEvent(Event.create(null, EventType.JOB_STARTED, new EventData(node)));
Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED
|| runner.getStatus() != Status.FAILED);
runner.run();
- eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED, new EventData(node)));
+ eventCollector.handleEvent(Event.create(null, EventType.JOB_FINISHED, new EventData(node)));
Assert.assertTrue(runner.getStatus() == node.getStatus());
Assert.assertTrue("Node status is " + node.getStatus(),
@@ -102,7 +102,8 @@ public class JobRunnerTest {
Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
- eventCollector.assertEvents(Type.JOB_STARTED, Type.JOB_STATUS_CHANGED, Type.JOB_FINISHED);
+ eventCollector
+ .assertEvents(EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED, EventType.JOB_FINISHED);
}
@Ignore
@@ -131,7 +132,8 @@ public class JobRunnerTest {
Assert.assertTrue(!runner.isKilled());
Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
- eventCollector.assertEvents(Type.JOB_STARTED, Type.JOB_STATUS_CHANGED, Type.JOB_FINISHED);
+ eventCollector
+ .assertEvents(EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED, EventType.JOB_FINISHED);
}
@Test
@@ -162,7 +164,7 @@ public class JobRunnerTest {
Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == null);
- eventCollector.assertEvents(Type.JOB_STARTED, Type.JOB_FINISHED);
+ eventCollector.assertEvents(EventType.JOB_STARTED, EventType.JOB_FINISHED);
}
@Test
@@ -193,7 +195,7 @@ public class JobRunnerTest {
Assert.assertTrue(outputProps == null);
Assert.assertTrue(runner.getLogFilePath() == null);
Assert.assertTrue(!runner.isKilled());
- eventCollector.assertEvents(Type.JOB_STARTED, Type.JOB_FINISHED);
+ eventCollector.assertEvents(EventType.JOB_STARTED, EventType.JOB_FINISHED);
}
@Ignore
@@ -232,7 +234,8 @@ public class JobRunnerTest {
Assert.assertTrue(logFile.exists());
Assert.assertTrue(eventCollector.checkOrdering());
Assert.assertTrue(runner.isKilled());
- eventCollector.assertEvents(Type.JOB_STARTED, Type.JOB_STATUS_CHANGED, Type.JOB_FINISHED);
+ eventCollector
+ .assertEvents(EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED, EventType.JOB_FINISHED);
}
@Ignore
@@ -246,11 +249,11 @@ public class JobRunnerTest {
final long startTime = System.currentTimeMillis();
final ExecutableNode node = runner.getNode();
- eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED, new EventData(node)));
+ eventCollector.handleEvent(Event.create(null, EventType.JOB_STARTED, new EventData(node)));
Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED);
runner.run();
- eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED, new EventData(node)));
+ eventCollector.handleEvent(Event.create(null, EventType.JOB_FINISHED, new EventData(node)));
Assert.assertTrue(runner.getStatus() == node.getStatus());
Assert.assertTrue("Node status is " + node.getStatus(),
@@ -267,7 +270,8 @@ public class JobRunnerTest {
Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
Assert.assertTrue(eventCollector.checkOrdering());
- eventCollector.assertEvents(Type.JOB_STARTED, Type.JOB_STATUS_CHANGED, Type.JOB_FINISHED);
+ eventCollector
+ .assertEvents(EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED, EventType.JOB_FINISHED);
}
@Test
@@ -280,7 +284,7 @@ public class JobRunnerTest {
final long startTime = System.currentTimeMillis();
final ExecutableNode node = runner.getNode();
- eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED, new EventData(node)));
+ eventCollector.handleEvent(Event.create(null, EventType.JOB_STARTED, new EventData(node)));
Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED);
final Thread thread = new Thread(runner);
@@ -292,7 +296,7 @@ public class JobRunnerTest {
runner.kill();
StatusTestUtils.waitForStatus(node, Status.KILLED);
- eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED, new EventData(node)));
+ eventCollector.handleEvent(Event.create(null, EventType.JOB_FINISHED, new EventData(node)));
Assert.assertTrue(runner.getStatus() == node.getStatus());
Assert.assertTrue("Node status is " + node.getStatus(),
@@ -312,7 +316,7 @@ public class JobRunnerTest {
Thread.sleep(1000L);
Assert.assertEquals(2L, loader.getNodeUpdateCount("testJob").longValue());
Assert.assertEquals(2L, (long) loader.getNodeUpdateCount("testJob"));
- eventCollector.assertEvents(Type.JOB_FINISHED);
+ eventCollector.assertEvents(EventType.JOB_FINISHED);
}
private Props createProps(final int sleepSec, final boolean fail) {
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/reporter/AzkabanEventReporterTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/reporter/AzkabanEventReporterTest.java
new file mode 100644
index 0000000..ecd852d
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/reporter/AzkabanEventReporterTest.java
@@ -0,0 +1,64 @@
+package azkaban.execapp.reporter;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+import azkaban.event.Event;
+import azkaban.event.EventData;
+import azkaban.execapp.EventReporterUtil;
+import azkaban.spi.AzkabanEventReporter;
+import azkaban.spi.EventType;
+import java.util.HashMap;
+import org.junit.Test;
+
+/**
+ * Tests for the default implementation of <code>AzkabanEventReporter</code> class.
+ */
+public class AzkabanEventReporterTest {
+
+
+ @Test
+ public void testAzkabanKafkaEventReporter() {
+ final AzkabanEventReporter azkabanKafkaAvroEventReporter
+ = EventReporterUtil.getTestAzkabanEventReporter();
+ final EventData eventData = mock(EventData.class);
+
+ final Object runnerObject = new Object();
+ boolean status;
+
+ // test flow started event
+ Event event = Event.create(runnerObject, EventType.FLOW_STARTED, eventData);
+ status = azkabanKafkaAvroEventReporter.report(event.getType(), new HashMap<>());
+ assertThat(status).isTrue();
+
+ // test flow finished event
+ event = Event.create(runnerObject, EventType.FLOW_FINISHED, eventData);
+ status = azkabanKafkaAvroEventReporter.report(event.getType(), new HashMap<>());
+ assertThat(status).isTrue();
+
+ // test job started event
+ event = Event.create(runnerObject, EventType.JOB_STARTED, eventData);
+ status = azkabanKafkaAvroEventReporter.report(event.getType(), new HashMap<>());
+ assertThat(status).isTrue();
+
+ // test job finished event
+ event = Event.create(runnerObject, EventType.JOB_FINISHED, eventData);
+ status = azkabanKafkaAvroEventReporter.report(event.getType(), new HashMap<>());
+ assertThat(status).isTrue();
+
+ // test un-supported event type
+ event = Event.create(runnerObject, EventType.JOB_STATUS_CHANGED, eventData);
+ status = azkabanKafkaAvroEventReporter.report(event.getType(), new HashMap<>());
+ assertThat(status).isFalse();
+ }
+
+ @Test
+ public void testNullAzkabanKafkaEventReporter() {
+ final AzkabanKafkaAvroEventReporter azkabanKafkaAvroEventReporter
+ = new AzkabanKafkaAvroEventReporter(null, EventReporterUtil.getTestKafkaProps());
+ final Event event = mock(Event.class);
+ final boolean status = azkabanKafkaAvroEventReporter.report(event.getType(), new HashMap<>());
+ assertThat(status).isFalse();
+ }
+
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/reporter/AzkabanKafkaPusherTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/reporter/AzkabanKafkaPusherTest.java
new file mode 100644
index 0000000..cc45cc3
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/reporter/AzkabanKafkaPusherTest.java
@@ -0,0 +1,19 @@
+package azkaban.execapp.reporter;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Test;
+
+public class AzkabanKafkaPusherTest {
+
+ /**
+ * Test validates that an async kafka producer is created.
+ */
+ @Test
+ public void testCreateASyncKafkaProducer() {
+ final AzkabanKafkaPusher azkabanKafkaPusher =
+ new AzkabanKafkaPusher("broker.com:10251", "kafka.topic");
+ assertThat(azkabanKafkaPusher).isNotNull();
+ }
+
+}
diff --git a/azkaban-spi/src/main/java/azkaban/spi/AzkabanEventReporter.java b/azkaban-spi/src/main/java/azkaban/spi/AzkabanEventReporter.java
new file mode 100644
index 0000000..73c46d1
--- /dev/null
+++ b/azkaban-spi/src/main/java/azkaban/spi/AzkabanEventReporter.java
@@ -0,0 +1,19 @@
+package azkaban.spi;
+
+import java.util.Map;
+
+/**
+ * Implement this interface to report flow and job events. Event reporter
+ * can be turned on by setting the property {@code AZKABAN_EVENT_REPORTING_ENABLED} to true.
+ *
+ * By default, a KafkaAvroEventReporter is provided. Alternate implementations
+ * can be provided by setting the property {@code AZKABAN_EVENT_REPORTING_CLASS_PARAM}
+ * <br><br>
+ * The constructor will be called with a {@code azkaban.utils.Props} object passed as
+ * the only parameter. If such a constructor doesn't exist, then the AzkabanEventReporter
+ * instantiation will fail.
+ */
+public interface AzkabanEventReporter {
+
+ boolean report(EventType eventType, Map<String, String> metadata);
+}
diff --git a/azkaban-spi/src/main/java/azkaban/spi/EventType.java b/azkaban-spi/src/main/java/azkaban/spi/EventType.java
new file mode 100644
index 0000000..71f04f7
--- /dev/null
+++ b/azkaban-spi/src/main/java/azkaban/spi/EventType.java
@@ -0,0 +1,14 @@
+package azkaban.spi;
+
+/**
+ * Enum class defining the list of supported event types.
+ */
+public enum EventType {
+ FLOW_STARTED,
+ FLOW_FINISHED,
+ JOB_STARTED,
+ JOB_FINISHED,
+ JOB_STATUS_CHANGED,
+ EXTERNAL_FLOW_UPDATED,
+ EXTERNAL_JOB_UPDATED
+}
build.gradle 5(+4 -1)
diff --git a/build.gradle b/build.gradle
index c7f0093..eab8844 100644
--- a/build.gradle
+++ b/build.gradle
@@ -39,6 +39,9 @@ allprojects {
repositories {
mavenCentral()
mavenLocal()
+ maven {
+ url "http://packages.confluent.io/maven/"
+ }
}
}
@@ -57,6 +60,7 @@ ext.deps = [
dbcp2 : 'org.apache.commons:commons-dbcp2:2.1.1',
dbutils : 'commons-dbutils:commons-dbutils:1.5',
fileupload : 'commons-fileupload:commons-fileupload:1.2.1',
+ gobblinKafka : 'com.linkedin.gobblin:gobblin-kafka-08:0.11.0',
gson : 'com.google.code.gson:gson:2.8.1',
guava : 'com.google.guava:guava:21.0',
guice : 'com.google.inject:guice:4.1.0',
@@ -140,7 +144,6 @@ subprojects {
compile deps.log4j
compile deps.guice
compile deps.slf4j
-
runtime deps.slf4jLog4j
testCompile deps.assertj